tunnel.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package server
  2. import (
  3. "fmt"
  4. "net"
  5. "ngrok/conn"
  6. "ngrok/log"
  7. "ngrok/proto"
  8. )
  9. /**
  10. * Tunnel: A control connection, metadata and proxy connections which
  11. * route public traffic to a firewalled endpoint.
  12. */
  13. type Tunnel struct {
  14. regMsg *proto.RegMsg
  15. // public url
  16. url string
  17. // tcp listener
  18. listener *net.TCPListener
  19. // control connection
  20. ctl *Control
  21. // proxy connections
  22. proxies chan conn.Conn
  23. // logger
  24. log.Logger
  25. }
  26. func newTunnel(msg *proto.RegMsg, ctl *Control) {
  27. t := &Tunnel{
  28. regMsg: msg,
  29. ctl: ctl,
  30. proxies: make(chan conn.Conn),
  31. Logger: log.NewPrefixLogger(),
  32. }
  33. switch t.regMsg.Protocol {
  34. case "tcp":
  35. var err error
  36. t.listener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), 0})
  37. if err != nil {
  38. panic(err)
  39. }
  40. go t.listenTcp(t.listener)
  41. default:
  42. }
  43. if err := tunnels.Add(t); err != nil {
  44. t.ctl.stop <- &proto.RegAckMsg{Error: fmt.Sprint(err)}
  45. return
  46. }
  47. t.ctl.conn.AddLogPrefix(t.Id())
  48. t.AddLogPrefix(t.Id())
  49. t.Info("Registered new tunnel")
  50. t.ctl.out <- &proto.RegAckMsg{Url: t.url, ProxyAddr: fmt.Sprintf("%s", proxyAddr)}
  51. }
  52. func (t *Tunnel) shutdown() {
  53. // XXX: this is completely unused right now
  54. t.Info("Shutting down")
  55. // stop any go routines
  56. // close all proxy and public connections
  57. // stop any metrics
  58. t.ctl.stop <- nil
  59. }
  60. func (t *Tunnel) Id() string {
  61. return t.url
  62. }
  63. /**
  64. * Listens for new public tcp connections from the internet.
  65. */
  66. func (t *Tunnel) listenTcp(listener *net.TCPListener) {
  67. for {
  68. // accept public connections
  69. tcpConn, err := listener.AcceptTCP()
  70. if err != nil {
  71. panic(err)
  72. }
  73. conn := conn.NewTCP(tcpConn, "pub")
  74. conn.AddLogPrefix(t.Id())
  75. go func() {
  76. defer func() {
  77. if r := recover(); r != nil {
  78. conn.Warn("Failed with error %v", r)
  79. }
  80. }()
  81. defer conn.Close()
  82. t.HandlePublicConnection(conn)
  83. }()
  84. }
  85. }
  86. func (t *Tunnel) HandlePublicConnection(publicConn conn.Conn) {
  87. metrics.requestTimer.Time(func() {
  88. metrics.requestMeter.Mark(1)
  89. t.Debug("Requesting new proxy connection")
  90. t.ctl.out <- &proto.ReqProxyMsg{}
  91. proxyConn := <-t.proxies
  92. t.Info("Returning proxy connection %s", proxyConn.Id())
  93. defer proxyConn.Close()
  94. conn.Join(publicConn, proxyConn)
  95. })
  96. }
  97. func (t *Tunnel) RegisterProxy(conn conn.Conn) {
  98. t.Info("Registered proxy connection %s", conn.Id())
  99. conn.AddLogPrefix(t.Id())
  100. t.proxies <- conn
  101. }