Browse Source

Fixed an issues where the affinity cache could not be serialized/deserialized or saved/loaded from disk. Added better logging around affinity cache load/save. TCP tunnel urls are now handled in the affinity cache and will attempt to return to you the same port you had previously.

Alan Shreve 12 years ago
parent
commit
7032606103
4 changed files with 113 additions and 42 deletions
  1. 2 7
      src/ngrok/cache/lru.go
  2. 8 2
      src/ngrok/log/logger.go
  3. 62 22
      src/ngrok/server/registry.go
  4. 41 11
      src/ngrok/server/tunnel.go

+ 2 - 7
src/ngrok/cache/lru.go

@@ -167,17 +167,12 @@ func (lru *LRUCache) Items() []Item {
 
 func (lru *LRUCache) SaveItems(w io.Writer) error {
 	items := lru.Items()
-
-	for _, v := range items {
-		gob.Register(v)
-	}
-
 	encoder := gob.NewEncoder(w)
 	return encoder.Encode(items)
 }
 
 func (lru *LRUCache) SaveItemsToFile(path string) error {
-	if wr, err := os.Open(path); err != nil {
+	if wr, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644); err != nil {
 		return err
 	} else {
 		defer wr.Close()
@@ -188,7 +183,7 @@ func (lru *LRUCache) SaveItemsToFile(path string) error {
 func (lru *LRUCache) LoadItems(r io.Reader) error {
 	items := make([]Item, 0)
 	decoder := gob.NewDecoder(r)
-	if err := decoder.Decode(items); err != nil {
+	if err := decoder.Decode(&items); err != nil {
 		return err
 	}
 

+ 8 - 2
src/ngrok/log/logger.go

@@ -37,8 +37,14 @@ type PrefixLogger struct {
 	prefix string
 }
 
-func NewPrefixLogger() Logger {
-	return &PrefixLogger{Logger: &root}
+func NewPrefixLogger(prefixes ...string) Logger {
+	logger := &PrefixLogger{Logger: &root}
+
+	for _, p := range prefixes {
+		logger.AddLogPrefix(p)
+	}
+
+	return logger
 }
 
 func (pl *PrefixLogger) pfx(fmtstr string) interface{} {

+ 62 - 22
src/ngrok/server/registry.go

@@ -1,9 +1,11 @@
 package server
 
 import (
+	"encoding/gob"
 	"fmt"
 	"net"
 	"ngrok/cache"
+	"ngrok/log"
 	"sync"
 	"time"
 )
@@ -22,32 +24,54 @@ func (url cacheUrl) Size() int {
 type TunnelRegistry struct {
 	tunnels  map[string]*Tunnel
 	affinity *cache.LRUCache
+	log.Logger
 	sync.RWMutex
 }
 
 func NewTunnelRegistry(cacheSize uint64, cacheFile string) *TunnelRegistry {
-	manager := &TunnelRegistry{
+	registry := &TunnelRegistry{
 		tunnels:  make(map[string]*Tunnel),
 		affinity: cache.NewLRUCache(cacheSize),
+		Logger:   log.NewPrefixLogger("registry"),
 	}
 
+	// LRUCache uses Gob encoding. Unfortunately, Gob is fickle and will fail
+	// to encode or decode any non-primitive types that haven't been "registered"
+	// with it. Since we store cacheUrl objects, we need to register them here first
+	// for the encoding/decoding to work
+	var urlobj cacheUrl
+	gob.Register(urlobj)
+
 	if cacheFile != "" {
 		// load cache entries from file
-		manager.affinity.LoadItemsFromFile(cacheFile)
+		err := registry.affinity.LoadItemsFromFile(cacheFile)
+		if err != nil {
+			registry.Error("Failed to load affinity cache %s: %v", cacheFile, err)
+		}
 
 		// save cache periodically to file
-		manager.SaveCacheThread(cacheFile, cacheSaveInterval)
+		registry.SaveCacheThread(cacheFile, cacheSaveInterval)
+	} else {
+		registry.Info("No affinity cache specified")
 	}
 
-	return manager
+	return registry
 }
 
 // Spawns a goroutine the periodically saves the cache to a file.
 func (r *TunnelRegistry) SaveCacheThread(path string, interval time.Duration) {
 	go func() {
+		r.Info("Saving affinity cache to %s every %s", path, interval.String())
 		for {
 			time.Sleep(interval)
-			r.affinity.SaveItemsToFile(path)
+
+			r.Debug("Saving affinity cache")
+			err := r.affinity.SaveItemsToFile(path)
+			if err != nil {
+				r.Error("Failed to save affinity cache: %v", err)
+			} else {
+				r.Info("Saved affinity cache")
+			}
 		}
 	}()
 }
@@ -67,17 +91,15 @@ func (r *TunnelRegistry) Register(url string, t *Tunnel) error {
 	return nil
 }
 
-// Register a tunnel with the following process:
-// Consult the affinity cache to try to assign a previously used tunnel url if possible
-// Generate new urls repeatedly with the urlFn and register until one is available.
-func (r *TunnelRegistry) RegisterRepeat(urlFn func() string, t *Tunnel) string {
-	var url string
-
+func (r *TunnelRegistry) cacheKeys(t *Tunnel) (ip string, id string) {
 	clientIp := t.ctl.conn.RemoteAddr().(*net.TCPAddr).IP.String()
 	clientId := t.regMsg.ClientId
 
-	ipCacheKey := fmt.Sprintf("client-ip:%s", clientIp)
-	idCacheKey := fmt.Sprintf("client-id:%s", clientId)
+	return fmt.Sprintf("client-ip:%s", clientIp), fmt.Sprintf("client-id:%s", clientId)
+}
+
+func (r *TunnelRegistry) GetCachedRegistration(t *Tunnel) (url string) {
+	ipCacheKey, idCacheKey := r.cacheKeys(t)
 
 	// check cache for ID first, because we prefer that over IP which might
 	// not be specific to a user because of NATs
@@ -87,24 +109,42 @@ func (r *TunnelRegistry) RegisterRepeat(urlFn func() string, t *Tunnel) string {
 	} else if v, ok := r.affinity.Get(ipCacheKey); ok {
 		url = string(v.(cacheUrl))
 		t.Debug("Found registry affinity %s for %s", url, ipCacheKey)
-	} else {
+	}
+	return
+}
+
+func (r *TunnelRegistry) RegisterAndCache(url string, t *Tunnel) (err error) {
+	if err = r.Register(url, t); err == nil {
+		// we successfully assigned a url, cache it
+		ipCacheKey, idCacheKey := r.cacheKeys(t)
+		r.affinity.Set(ipCacheKey, cacheUrl(url))
+		r.affinity.Set(idCacheKey, cacheUrl(url))
+	}
+	return
+
+}
+
+// Register a tunnel with the following process:
+// Consult the affinity cache to try to assign a previously used tunnel url if possible
+// Generate new urls repeatedly with the urlFn and register until one is available.
+func (r *TunnelRegistry) RegisterRepeat(urlFn func() string, t *Tunnel) (string, error) {
+	url := r.GetCachedRegistration(t)
+	if url == "" {
 		url = urlFn()
 	}
 
-	for {
-		if err := r.Register(url, t); err != nil {
+	maxAttempts := 5
+	for i := 0; i < maxAttempts; i++ {
+		if err := r.RegisterAndCache(url, t); err != nil {
 			// pick a new url and try again
 			url = urlFn()
 		} else {
 			// we successfully assigned a url, we're done
-
-			// save our choice in the cache
-			r.affinity.Set(ipCacheKey, cacheUrl(url))
-			r.affinity.Set(idCacheKey, cacheUrl(url))
-
-			return url
+			return url, nil
 		}
 	}
+
+	return "", fmt.Errorf("Failed to assign a URL after %d attempts!", maxAttempts)
 }
 
 func (r *TunnelRegistry) Del(url string) {

+ 41 - 11
src/ngrok/server/tunnel.go

@@ -9,6 +9,7 @@ import (
 	"ngrok/log"
 	"ngrok/msg"
 	"ngrok/version"
+	"strconv"
 	"strings"
 	"sync/atomic"
 	"time"
@@ -56,26 +57,50 @@ func newTunnel(m *msg.RegMsg, ctl *Control) (t *Tunnel) {
 		t.ctl.stop <- &msg.RegAckMsg{Error: err.Error()}
 	}
 
+	var err error
+
 	switch t.regMsg.Protocol {
 	case "tcp":
-		var err error
-		t.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
+		var port int = 0
+
+		// try to return to you the same port you had before
+		cachedUrl := tunnels.GetCachedRegistration(t)
+		if cachedUrl != "" {
+			parts := strings.Split(cachedUrl, ":")
+			portPart := parts[len(parts)-1]
+			port, err = strconv.Atoi(portPart)
+			if err != nil {
+				t.ctl.conn.Error("Failed to parse cached url port as integer: %s", portPart)
+				// continue with zero
+				port = 0
+			}
+		}
 
-		if err != nil {
-			t.ctl.conn.Error("Failed to create tunnel. Error binding TCP listener: %v", err)
+		// Bind for TCP connections
+		t.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: port})
+
+		// If we failed with a custom port, try with a random one
+		if err != nil && port != 0 {
+			t.ctl.conn.Warn("Failed to get custom port %d: %v, trying a random one", port, err)
+			t.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
+		}
 
-			t.ctl.stop <- &msg.RegAckMsg{Error: "Internal server error"}
+		// we tried to bind with a random port and failed (no more ports available?)
+		if err != nil {
+			failReg(t.ctl.conn.Error("Error binding TCP listener: %v", err))
+			return
 		}
 
+		// create the url
 		addr := t.listener.Addr().(*net.TCPAddr)
 		t.url = fmt.Sprintf("tcp://%s:%d", domain, addr.Port)
 
-		if err = tunnels.Register(t.url, t); err != nil {
-			// This should never be possible because the OS will only assign
-			// available ports to us.
-			t.Error("TCP listener bound, but failed to register: %s", err.Error())
+		// register it
+		if err = tunnels.RegisterAndCache(t.url, t); err != nil {
+			// This should never be possible because the OS will
+			// only assign available ports to us.
 			t.listener.Close()
-			failReg(err)
+			failReg(fmt.Errorf("TCP listener bound, but failed to register %s", t.url))
 			return
 		}
 
@@ -94,9 +119,14 @@ func newTunnel(m *msg.RegMsg, ctl *Control) (t *Tunnel) {
 				return
 			}
 		} else {
-			t.url = tunnels.RegisterRepeat(func() string {
+			t.url, err = tunnels.RegisterRepeat(func() string {
 				return fmt.Sprintf("http://%x.%s", rand.Int31(), domain)
 			}, t)
+
+			if err != nil {
+				failReg(err)
+				return
+			}
 		}
 	}