manager.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package server
  2. import (
  3. "fmt"
  4. cache "github.com/pmylund/go-cache"
  5. "math/rand"
  6. "net"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  // Settings for the affinity caches built in NewTunnelManager; both values are
  // handed to cache.New.
  const (
  	// cacheDuration is the default lifetime given to the affinity caches.
  	cacheDuration = 24 * time.Hour

  	// cacheCleanupInterval is the cleanup interval given to the affinity caches.
  	cacheCleanupInterval = time.Minute
  )
  15. /**
  16. * TunnelManager: Manages a set of tunnels
  17. */
  18. type TunnelManager struct {
  19. domain string
  20. tunnels map[string]*Tunnel
  21. idDomainAffinity *cache.Cache
  22. ipDomainAffinity *cache.Cache
  23. sync.RWMutex
  24. }
  25. func NewTunnelManager(domain string) *TunnelManager {
  26. return &TunnelManager{
  27. domain: domain,
  28. tunnels: make(map[string]*Tunnel),
  29. idDomainAffinity: cache.New(cacheDuration, cacheCleanupInterval),
  30. ipDomainAffinity: cache.New(cacheDuration, cacheCleanupInterval),
  31. }
  32. }
  33. func (m *TunnelManager) Add(t *Tunnel) error {
  34. assignTunnel := func(url string) bool {
  35. m.Lock()
  36. defer m.Unlock()
  37. if m.tunnels[url] == nil {
  38. m.tunnels[url] = t
  39. return true
  40. }
  41. return false
  42. }
  43. url := ""
  44. switch t.regMsg.Protocol {
  45. case "tcp":
  46. addr := t.listener.Addr().(*net.TCPAddr)
  47. url = fmt.Sprintf("tcp://%s:%d", m.domain, addr.Port)
  48. if !assignTunnel(url) {
  49. return t.Error("TCP at %s already registered!", url)
  50. }
  51. metrics.tcpTunnelMeter.Mark(1)
  52. case "http":
  53. if strings.TrimSpace(t.regMsg.Hostname) != "" {
  54. url = fmt.Sprintf("http://%s", t.regMsg.Hostname)
  55. } else if strings.TrimSpace(t.regMsg.Subdomain) != "" {
  56. url = fmt.Sprintf("http://%s.%s", t.regMsg.Subdomain, m.domain)
  57. }
  58. if url != "" {
  59. if !assignTunnel(url) {
  60. return t.Warn("The tunnel address %s is already registered!", url)
  61. }
  62. } else {
  63. clientIp := t.ctl.conn.RemoteAddr().(*net.TCPAddr).IP.String()
  64. clientId := t.regMsg.ClientId
  65. // try to give the same subdomain back if it's available
  66. subdomain := fmt.Sprintf("%x", rand.Int31())
  67. if lastDomain, ok := m.idDomainAffinity.Get(clientId); ok {
  68. t.Debug("Found affinity for subdomain %s with client id %s", subdomain, clientId)
  69. subdomain = lastDomain.(string)
  70. } else if lastDomain, ok = m.ipDomainAffinity.Get(clientIp); ok {
  71. t.Debug("Found affinity for subdomain %s with client ip %s", subdomain, clientIp)
  72. subdomain = lastDomain.(string)
  73. }
  74. // pick one randomly
  75. for {
  76. url = fmt.Sprintf("http://%s.%s", subdomain, m.domain)
  77. if assignTunnel(url) {
  78. break
  79. } else {
  80. subdomain = fmt.Sprintf("%x", rand.Int31())
  81. }
  82. }
  83. // save our choice for later
  84. // XXX: this is going to leak memory
  85. m.idDomainAffinity.Set(clientId, subdomain, 0)
  86. m.ipDomainAffinity.Set(clientIp, subdomain, 0)
  87. }
  88. default:
  89. return t.Error("Unrecognized protocol type %s", t.regMsg.Protocol)
  90. }
  91. t.url = url
  92. metrics.tunnelMeter.Mark(1)
  93. //metrics.tunnelGauge.Update(int64(len(m.tunnels)))
  94. switch t.regMsg.OS {
  95. case "windows":
  96. metrics.windowsCounter.Inc(1)
  97. case "linux":
  98. metrics.linuxCounter.Inc(1)
  99. case "darwin":
  100. metrics.osxCounter.Inc(1)
  101. default:
  102. metrics.otherCounter.Inc(1)
  103. }
  104. return nil
  105. }
  106. func (m *TunnelManager) Del(url string) {
  107. m.Lock()
  108. defer m.Unlock()
  109. delete(m.tunnels, url)
  110. }
  111. func (m *TunnelManager) Get(url string) *Tunnel {
  112. m.RLock()
  113. defer m.RUnlock()
  114. return m.tunnels[url]
  115. }