tunnel.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package server
  2. import (
  3. log "code.google.com/p/log4go"
  4. "fmt"
  5. "net"
  6. "ngrok/conn"
  7. nlog "ngrok/log"
  8. "ngrok/proto"
  9. )
  10. /**
  11. * Tunnel: A control connection, metadata and proxy connections which
  12. * route public traffic to a firewalled endpoint.
  13. */
  14. type Tunnel struct {
  15. regMsg *proto.RegMsg
  16. // public url
  17. url string
  18. // tcp listener
  19. listener *net.TCPListener
  20. // control connection
  21. ctl *Control
  22. // proxy connections
  23. proxies chan conn.Conn
  24. // logger
  25. nlog.Logger
  26. }
  27. func newTunnel(msg *proto.RegMsg, ctl *Control) (t *Tunnel) {
  28. t = &Tunnel{
  29. regMsg: msg,
  30. ctl: ctl,
  31. proxies: make(chan conn.Conn),
  32. Logger: nlog.NewPrefixLogger(),
  33. }
  34. switch t.regMsg.Protocol {
  35. case "tcp":
  36. var err error
  37. t.listener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), 0})
  38. if err != nil {
  39. panic(err)
  40. }
  41. go t.listenTcp(t.listener)
  42. default:
  43. }
  44. if err := tunnels.Add(t); err != nil {
  45. t.ctl.stop <- &proto.RegAckMsg{Error: fmt.Sprint(err)}
  46. return
  47. }
  48. t.ctl.conn.AddLogPrefix(t.Id())
  49. t.AddLogPrefix(t.Id())
  50. t.Info("Registered new tunnel")
  51. t.ctl.out <- &proto.RegAckMsg{Url: t.url, ProxyAddr: fmt.Sprintf("%s", proxyAddr)}
  52. return
  53. }
  54. func (t *Tunnel) shutdown() {
  55. t.Info("Shutting down")
  56. // if we have a public listener (this is a raw TCP tunnel, shut it down
  57. if t.listener != nil {
  58. t.listener.Close()
  59. }
  60. // remove ourselves from the tunnel registry
  61. tunnels.Del(t.url)
  62. // XXX: should we shut down all of the proxy connections?
  63. // XXX: will this block if this is being called from Control's shutdown code?
  64. t.ctl.stop <- nil
  65. }
  66. func (t *Tunnel) Id() string {
  67. return t.url
  68. }
  69. /**
  70. * Listens for new public tcp connections from the internet.
  71. */
  72. func (t *Tunnel) listenTcp(listener *net.TCPListener) {
  73. for {
  74. defer func() {
  75. if r := recover(); r != nil {
  76. log.Warn("listenTcp failed with error %v", r)
  77. }
  78. }()
  79. // accept public connections
  80. tcpConn, err := listener.AcceptTCP()
  81. if err != nil {
  82. panic(err)
  83. }
  84. conn := conn.NewTCP(tcpConn, "pub")
  85. conn.AddLogPrefix(t.Id())
  86. go t.HandlePublicConnection(conn)
  87. }
  88. }
  89. func (t *Tunnel) HandlePublicConnection(publicConn conn.Conn) {
  90. defer publicConn.Close()
  91. defer func() {
  92. if r := recover(); r != nil {
  93. publicConn.Warn("HandlePublicConnection failed with error %v", r)
  94. }
  95. }()
  96. metrics.requestTimer.Time(func() {
  97. metrics.requestMeter.Mark(1)
  98. t.Debug("Requesting new proxy connection")
  99. t.ctl.out <- &proto.ReqProxyMsg{}
  100. proxyConn := <-t.proxies
  101. t.Info("Returning proxy connection %s", proxyConn.Id())
  102. defer proxyConn.Close()
  103. conn.Join(publicConn, proxyConn)
  104. })
  105. }
  106. func (t *Tunnel) RegisterProxy(conn conn.Conn) {
  107. t.Info("Registered proxy connection %s", conn.Id())
  108. conn.AddLogPrefix(t.Id())
  109. t.proxies <- conn
  110. }