control.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package server
  2. import (
  3. "io"
  4. "net"
  5. "ngrok/conn"
  6. "ngrok/proto"
  7. "runtime/debug"
  8. "sync/atomic"
  9. "time"
  10. )
  11. const (
  12. pingInterval = 30 * time.Second
  13. connReapInterval = pingInterval * 5
  14. )
  15. type Control struct {
  16. // actual connection
  17. conn conn.Conn
  18. // channels for communicating messages over the connection
  19. out chan (interface{})
  20. in chan (proto.Message)
  21. stop chan (int)
  22. // heartbeat
  23. lastPong int64
  24. // tunnel
  25. tun *Tunnel
  26. }
  27. func NewControl(tcpConn *net.TCPConn) {
  28. c := &Control{
  29. conn: conn.NewTCP(tcpConn, "ctl"),
  30. out: make(chan (interface{}), 1),
  31. in: make(chan (proto.Message), 1),
  32. stop: make(chan (int), 1),
  33. lastPong: time.Now().Unix(),
  34. }
  35. go c.managerThread()
  36. go c.readThread()
  37. }
  38. func (c *Control) managerThread() {
  39. ping := time.NewTicker(pingInterval)
  40. reap := time.NewTicker(connReapInterval)
  41. // all shutdown functionality in here
  42. defer func() {
  43. if err := recover(); err != nil {
  44. c.conn.Info("Control::managerThread failed with error %v: %s", err, debug.Stack())
  45. }
  46. ping.Stop()
  47. reap.Stop()
  48. close(c.out)
  49. close(c.in)
  50. close(c.stop)
  51. c.conn.Close()
  52. }()
  53. for {
  54. select {
  55. case m := <-c.out:
  56. proto.WriteMsg(c.conn, m)
  57. case <-ping.C:
  58. proto.WriteMsg(c.conn, &proto.PingMsg{})
  59. case <-reap.C:
  60. if (time.Now().Unix() - c.lastPong) > 60 {
  61. c.conn.Info("Lost heartbeat")
  62. metrics.lostHeartbeatMeter.Mark(1)
  63. return
  64. }
  65. case <-c.stop:
  66. return
  67. case msg := <-c.in:
  68. switch msg.GetType() {
  69. case "RegMsg":
  70. c.conn.Info("Registering new tunnel")
  71. newTunnel(msg.(*proto.RegMsg), c)
  72. case "PongMsg":
  73. atomic.StoreInt64(&c.lastPong, time.Now().Unix())
  74. case "VersionReqMsg":
  75. c.out <- &proto.VersionRespMsg{Version: version}
  76. }
  77. }
  78. }
  79. }
  80. func (c *Control) readThread() {
  81. defer func() {
  82. if err := recover(); err != nil {
  83. c.conn.Info("Control::readThread failed with error %v: %s", err, debug.Stack())
  84. }
  85. c.stop <- 1
  86. }()
  87. // read messages from the control channel
  88. for {
  89. if msg, err := proto.ReadMsg(c.conn); err != nil {
  90. if err == io.EOF {
  91. c.conn.Info("EOF")
  92. return
  93. } else {
  94. panic(err)
  95. }
  96. } else {
  97. c.in <- msg
  98. }
  99. }
  100. }