Browse Source

client initiates heartbeats and detects when connections are stale

Alan Shreve 12 years ago
parent
commit
01eb8eefb3
2 changed files with 53 additions and 9 deletions
  1. 49 3
      src/ngrok/client/main.go
  2. 4 6
      src/ngrok/server/control.go

+ 49 - 3
src/ngrok/client/main.go

@@ -14,9 +14,15 @@ import (
 	"ngrok/proto"
 	"ngrok/util"
 	"runtime"
+	"sync/atomic"
 	"time"
 )
 
+const (
+	pingInterval   = 20 * time.Second
+	maxPongLatency = 15 * time.Second
+)
+
 /** 
  * Connect to the ngrok server
  */
@@ -79,6 +85,44 @@ func proxy(proxyAddr string, s *State, ctl *ui.Controller) {
 	ctl.Update(s)
 }
 
+/*
+ * Hearbeating ensure our connection ngrokd is still live
+ */
+func heartbeat(lastPongAddr *int64, c conn.Conn) {
+	lastPing := time.Unix(atomic.LoadInt64(lastPongAddr)-1, 0)
+	ping := time.NewTicker(pingInterval)
+	pongCheck := time.NewTicker(time.Second)
+
+	defer func() {
+		c.Close()
+		ping.Stop()
+		pongCheck.Stop()
+	}()
+
+	for {
+		select {
+		case <-pongCheck.C:
+			lastPong := time.Unix(0, atomic.LoadInt64(lastPongAddr))
+			needPong := lastPong.Sub(lastPing) < 0
+			pongLatency := time.Since(lastPing)
+
+			if needPong && pongLatency > maxPongLatency {
+				c.Info("Last ping: %v, Last pong: %v", lastPing, lastPong)
+				c.Info("Connection stale, haven't gotten PongMsg in %d seconds", int(pongLatency.Seconds()))
+				return
+			}
+
+		case <-ping.C:
+			err := msg.WriteMsg(c, &msg.PingMsg{})
+			if err != nil {
+				c.Debug("Got error %v when writing PingMsg", err)
+				return
+			}
+			lastPing = time.Now()
+		}
+	}
+}
+
 /**
  * Establishes and manages a tunnel control connection with the server
  */
@@ -129,12 +173,15 @@ func control(s *State, ctl *ui.Controller) {
 
 	// update UI state
 	conn.Info("Tunnel established at %v", regAck.Url)
-	//state.version = regAck.Version
 	s.publicUrl = regAck.Url
 	s.status = "online"
 	s.serverVersion = regAck.Version
 	ctl.Update(s)
 
+	// start the heartbeat
+	lastPong := time.Now().UnixNano()
+	go heartbeat(&lastPong, conn)
+
 	// main control loop
 	for {
 		var m msg.Message
@@ -147,8 +194,7 @@ func control(s *State, ctl *ui.Controller) {
 			go proxy(regAck.ProxyAddr, s, ctl)
 
 		case *msg.PongMsg:
-			//msg.WriteMsg(conn, &msg.PongMsg{})
-			// XXX: update our live status
+			atomic.StoreInt64(&lastPong, time.Now().UnixNano())
 		}
 	}
 }

+ 4 - 6
src/ngrok/server/control.go

@@ -6,7 +6,6 @@ import (
 	"ngrok/conn"
 	"ngrok/msg"
 	"runtime/debug"
-	"sync/atomic"
 	"time"
 )
 
@@ -25,7 +24,7 @@ type Control struct {
 	stop chan (msg.Message)
 
 	// heartbeat
-	lastPing int64
+	lastPing time.Time
 
 	// tunnel
 	tun *Tunnel
@@ -37,7 +36,7 @@ func NewControl(tcpConn *net.TCPConn) {
 		out:      make(chan (interface{}), 1),
 		in:       make(chan (msg.Message), 1),
 		stop:     make(chan (msg.Message), 1),
-		lastPing: time.Now().Unix(),
+		lastPing: time.Now(),
 	}
 
 	go c.managerThread()
@@ -67,8 +66,7 @@ func (c *Control) managerThread() {
 			msg.WriteMsg(c.conn, m)
 
 		case <-reap.C:
-			lastPing := time.Unix(c.lastPing, 0)
-			if time.Since(lastPing) > pingTimeoutInterval {
+			if time.Since(c.lastPing) > pingTimeoutInterval {
 				c.conn.Info("Lost heartbeat")
 				metrics.lostHeartbeatMeter.Mark(1)
 				return
@@ -87,7 +85,7 @@ func (c *Control) managerThread() {
 				c.tun = newTunnel(m, c)
 
 			case *msg.PingMsg:
-				atomic.StoreInt64(&c.lastPing, time.Now().Unix())
+				c.lastPing = time.Now()
 				c.out <- &msg.PongMsg{}
 
 			case *msg.VersionMsg: