tunnel.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package server
  2. import (
  3. "fmt"
  4. "net"
  5. "ngrok/conn"
  6. "ngrok/log"
  7. "ngrok/proto"
  8. )
  9. /**
  10. * Tunnel: A control connection, metadata and proxy connections which
  11. * route public traffic to a firewalled endpoint.
  12. */
  13. type Tunnel struct {
  14. regMsg *proto.RegMsg
  15. // public url
  16. url string
  17. // tcp listener
  18. listener *net.TCPListener
  19. // control connection
  20. ctl *Control
  21. // proxy connections
  22. proxies chan conn.Conn
  23. // logger
  24. log.Logger
  25. }
  26. func newTunnel(msg *proto.RegMsg, ctl *Control) {
  27. t := &Tunnel{
  28. regMsg: msg,
  29. ctl: ctl,
  30. proxies: make(chan conn.Conn),
  31. Logger: log.NewPrefixLogger(),
  32. }
  33. switch t.regMsg.Protocol {
  34. case "tcp":
  35. var err error
  36. t.listener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), 0})
  37. if err != nil {
  38. panic(err)
  39. }
  40. go t.listenTcp(t.listener)
  41. default:
  42. }
  43. tunnels.Add(t)
  44. t.ctl.conn.AddLogPrefix(t.Id())
  45. t.AddLogPrefix(t.Id())
  46. t.Info("Registered new tunnel")
  47. t.ctl.out <- &proto.RegAckMsg{Url: t.url, ProxyAddr: fmt.Sprintf("%s", proxyAddr)}
  48. //go t.managerThread()
  49. }
  50. func (t *Tunnel) shutdown() {
  51. t.Info("Shutting down")
  52. // stop any go routines
  53. // close all proxy and public connections
  54. // stop any metrics
  55. t.ctl.stop <- 1
  56. }
  57. func (t *Tunnel) Id() string {
  58. return t.url
  59. }
  60. func (t *Tunnel) managerThread() {
  61. }
  62. /**
  63. * Listens for new public tcp connections from the internet.
  64. */
  65. func (t *Tunnel) listenTcp(listener *net.TCPListener) {
  66. for {
  67. // accept public connections
  68. tcpConn, err := listener.AcceptTCP()
  69. if err != nil {
  70. panic(err)
  71. }
  72. conn := conn.NewTCP(tcpConn, "pub")
  73. conn.AddLogPrefix(t.Id())
  74. go func() {
  75. defer func() {
  76. if r := recover(); r != nil {
  77. conn.Warn("Failed with error %v", r)
  78. }
  79. }()
  80. defer conn.Close()
  81. t.HandlePublicConnection(conn)
  82. }()
  83. }
  84. }
  85. func (t *Tunnel) HandlePublicConnection(publicConn conn.Conn) {
  86. metrics.requestTimer.Time(func() {
  87. metrics.requestMeter.Mark(1)
  88. t.Debug("Requesting new proxy connection")
  89. t.ctl.out <- &proto.ReqProxyMsg{}
  90. proxyConn := <-t.proxies
  91. t.Info("Returning proxy connection %s", proxyConn.Id())
  92. defer proxyConn.Close()
  93. conn.Join(publicConn, proxyConn)
  94. })
  95. }
  96. func (t *Tunnel) RegisterProxy(conn conn.Conn) {
  97. t.Info("Registered proxy connection %s", conn.Id())
  98. conn.AddLogPrefix(t.Id())
  99. t.proxies <- conn
  100. }