Browse Source

initial prototype of ngrok

Alan Shreve 12 years ago
commit
8f4795ecac
29 changed files with 2233 additions and 0 deletions
  1. 2 0
      .gitignore
  2. 13 0
      Makefile
  3. 135 0
      client/cli.go
  4. 96 0
      client/history.go
  5. 215 0
      client/main.go
  6. 41 0
      client/metrics.go
  7. 57 0
      client/state.go
  8. 41 0
      client/ui/broadcast.go
  9. 145 0
      client/ui/http.go
  10. 24 0
      client/ui/interface.go
  11. 136 0
      client/ui/terminal.go
  12. 40 0
      client/ui/ui.go
  13. 98 0
      conn/conn.go
  14. 42 0
      conn/http.go
  15. 71 0
      conn/tee.go
  16. 65 0
      logger.go
  17. 9 0
      main/client.go
  18. 9 0
      main/server.go
  19. 71 0
      proto/conn.go
  20. 107 0
      proto/msg.go
  21. 54 0
      proto/pack.go
  22. 119 0
      server/control.go
  23. 63 0
      server/http.go
  24. 131 0
      server/main.go
  25. 117 0
      server/manager.go
  26. 77 0
      server/metrics.go
  27. 127 0
      server/tunnel.go
  28. 23 0
      templates/body.html
  29. 105 0
      templates/page.html

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+*.swp
+bin/

+ 13 - 0
Makefile

@@ -0,0 +1,13 @@
+.PHONY: default bindir server client
+
+default: client server
+
+bindir:
+	mkdir -p bin
+
+server: bindir
+	GOPATH=~ go build -o bin/ngrokd main/server.go
+
+client: bindir
+	GOPATH=~ go build -o bin/ngrok main/client.go
+

+ 135 - 0
client/cli.go

@@ -0,0 +1,135 @@
+package client
+
+import (
+	"errors"
+	"flag"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+)
+
+var (
+	PORT_OUT_OF_RANGE error = errors.New("Port number must be between 1 and 65535")
+)
+
+type Options struct {
+	server      string
+	auth        string
+	hostname    string
+	localaddr   string
+	protocol    string
+	url         string
+	subdomain   string
+	historySize int
+}
+
+func fail(msg string, args ...interface{}) {
+	//log.Error(msg, args..)
+	fmt.Printf(msg, args...)
+	flag.PrintDefaults()
+	os.Exit(1)
+}
+
+func parsePort(portString string) (err error) {
+	var port int
+	if port, err = strconv.Atoi(portString); err != nil {
+		return err
+	}
+
+	if port < 1 || port > 65535 {
+		return PORT_OUT_OF_RANGE
+	}
+
+	return
+}
+
+// Local address could be a port of a host:port string
+// we always return a host:port string from this function or fail
+func parseLocalAddr() string {
+	if flag.NArg() == 0 {
+		fail("LOCAL not specified, specify a port number or host:port connection string")
+	}
+
+	if flag.NArg() > 1 {
+		fail("Only one LOCAL may be specified, not %d", flag.NArg())
+	}
+
+	addr := flag.Arg(0)
+
+	// try to parse as a port number
+	if err := parsePort(addr); err == nil {
+		return fmt.Sprintf("127.0.0.1:%s", addr)
+	} else if err == PORT_OUT_OF_RANGE {
+		fail("%s is not in the valid port range 1-65535")
+	}
+
+	// try to parse as a connection string
+	parts := strings.Split(addr, ":")
+	if len(parts) != 2 {
+		fail("%s is not a port number of a host:port connection string", addr)
+	}
+
+	if parsePort(parts[1]) != nil {
+		fail("The port of the connection string '%s' is not a valid port number (1-65535)",
+			parts[1])
+	}
+
+	return addr
+}
+
+func parseProtocol(proto string) string {
+	switch proto {
+	case "http":
+		fallthrough
+	case "tcp":
+		return proto
+	default:
+		fail("%s is not a valid protocol", proto)
+	}
+	panic("unreachable")
+}
+
+func parseArgs() *Options {
+	server := flag.String(
+		"server",
+		"ngrok.com:2280",
+		"The remote ngrok server")
+
+	auth := flag.String(
+		"auth",
+		"",
+		"username:password HTTP basic auth creds protecting the public tunnel endpoint")
+
+	hostname := flag.String(
+		"hostname",
+		"",
+		"A full DNS hostname to identify public tunnel endpoint. (Advanced, requires you CNAME your DNS)")
+
+	subdomain := flag.String(
+		"subdomain",
+		"",
+		"Request a custom subdomain from the ngrok server. (HTTP mode only)")
+
+	protocol := flag.String(
+		"proto",
+		"http",
+		"The protocol of the traffic over the tunnel {'http', 'tcp'} (default: 'http')")
+
+	historySize := flag.Int(
+		"history",
+		20,
+		"The number of previous requests to keep in your history")
+
+	flag.Parse()
+
+	return &Options{
+		server:      *server,
+		auth:        *auth,
+		hostname:    *hostname,
+		subdomain:   *subdomain,
+		localaddr:   parseLocalAddr(),
+		protocol:    parseProtocol(*protocol),
+		historySize: *historySize,
+	}
+}

+ 96 - 0
client/history.go

@@ -0,0 +1,96 @@
+package client
+
+import (
+        "time"
+	"container/list"
+	"net/http"
+)
+
+type RequestHistoryEntry struct {
+	req  *http.Request
+	resp *http.Response
+        start time.Time
+        duration time.Duration
+}
+
+type RequestHistory struct {
+	maxSize    int
+	reqToEntry map[*http.Request]*RequestHistoryEntry
+	reqs       chan *http.Request
+	resps      chan *http.Response
+	history    *list.List
+	onChange   func([]*RequestHistoryEntry)
+        metrics *ClientMetrics
+}
+
+func NewRequestHistory(maxSize int, metrics *ClientMetrics, onChange func([]*RequestHistoryEntry)) *RequestHistory {
+	rh := &RequestHistory{
+		maxSize:    maxSize,
+		reqToEntry: make(map[*http.Request]*RequestHistoryEntry),
+		reqs:       make(chan *http.Request),
+		resps:      make(chan *http.Response),
+		history:    list.New(),
+		onChange:   onChange,
+                metrics:    metrics,
+	}
+
+	go func() {
+		for {
+			select {
+			case req := <-rh.reqs:
+				rh.addRequest(req)
+
+			case resp := <-rh.resps:
+				rh.addResponse(resp)
+			}
+		}
+	}()
+
+	return rh
+}
+
+func (rh *RequestHistory) addRequest(req *http.Request) {
+        rh.metrics.reqMeter.Mark(1)
+	if rh.history.Len() >= rh.maxSize {
+		entry := rh.history.Remove(rh.history.Back()).(*RequestHistoryEntry)
+		delete(rh.reqToEntry, entry.req)
+	}
+
+	entry := &RequestHistoryEntry{req: req, start: time.Now()}
+	rh.reqToEntry[req] = entry
+	rh.history.PushFront(entry)
+	rh.onChange(rh.copy())
+}
+
+func (rh *RequestHistory) addResponse(resp *http.Response) {
+	if entry, ok := rh.reqToEntry[resp.Request]; ok {
+                entry.duration = time.Since(entry.start)
+                rh.metrics.reqTimer.Update(entry.duration)
+
+		entry.resp = resp
+		rh.onChange(rh.copy())
+	} else {
+		// XXX: log warning instead of panic
+		panic("no request for response!")
+	}
+}
+
+func (rh *RequestHistory) copy() []*RequestHistoryEntry {
+	entries := make([]*RequestHistoryEntry, rh.history.Len())
+	i := 0
+	for e := rh.history.Front(); e != nil; e = e.Next() {
+		// force a copy
+		entry := *(e.Value.(*RequestHistoryEntry))
+		entries[i] = &entry
+		i++
+	}
+	return entries
+}
+
+func (rhe *RequestHistoryEntry) GetRequest() *http.Request {
+	return rhe.req
+}
+
+func (rhe *RequestHistoryEntry) GetResponse() *http.Response {
+	return rhe.resp
+}

+ 215 - 0
client/main.go

@@ -0,0 +1,215 @@
+package client
+
+import (
+	log "code.google.com/p/log4go"
+	"crypto/rand"
+	"fmt"
+	"net"
+	"ngrok"
+	"ngrok/client/ui"
+	"ngrok/conn"
+	"ngrok/proto"
+	"runtime"
+	"time"
+)
+
+/** 
+ * Connect to the ngrok server
+ */
+func connect(addr string, typ string) (c conn.Conn, err error) {
+	var (
+		tcpAddr *net.TCPAddr
+		tcpConn *net.TCPConn
+	)
+
+	if tcpAddr, err = net.ResolveTCPAddr("tcp", addr); err != nil {
+		return
+	}
+
+	//log.Debug("Dialing %v", addr)
+	if tcpConn, err = net.DialTCP("tcp", nil, tcpAddr); err != nil {
+		return
+	}
+
+	c = conn.NewLogged(tcpConn, typ)
+	c.Debug("Connected to: %v", tcpAddr)
+	return c, nil
+}
+
+/**
+ * Establishes and manages a tunnel proxy connection with the server
+ */
+func proxy(proxyAddr string, s *State) {
+	start := time.Now()
+	remoteConn, err := connect(proxyAddr, "pxy")
+	if err != nil {
+		panic(err)
+	}
+
+	defer remoteConn.Close()
+	err = proto.WriteMsg(remoteConn, &proto.RegProxyMsg{Url: s.publicUrl})
+
+	if err != nil {
+		panic(err)
+	}
+
+	localConn, err := connect(s.opts.localaddr, "prv")
+	if err != nil {
+		remoteConn.Warn("Failed to open private leg %s: %v", s.opts.localaddr, err)
+		return
+	}
+	defer localConn.Close()
+
+	m := s.metrics
+	m.proxySetupTimer.Update(time.Since(start))
+	m.connMeter.Mark(1)
+	s.Update()
+	m.connTimer.Time(func() {
+		if s.opts.protocol == "http" {
+			teeConn := conn.NewTee(remoteConn)
+			remoteConn = teeConn
+			go conn.ParseHttp(teeConn, s.history.reqs, s.history.resps)
+		}
+		bytesIn, bytesOut := conn.Join(localConn, remoteConn)
+		m.bytesIn.Update(bytesIn)
+		m.bytesOut.Update(bytesOut)
+		m.bytesInCount.Inc(bytesIn)
+		m.bytesOutCount.Inc(bytesOut)
+	})
+	s.Update()
+}
+
+/**
+ * Establishes and manages a tunnel control connection with the server
+ */
+func control(s *State) {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Error("Recovering from failure %v, attempting to reconnect to server after 10 seconds . . .", r)
+			time.Sleep(10 * time.Second)
+			s.status = "reconnecting"
+			s.Update()
+			go control(s)
+		}
+	}()
+
+	// establish control channel
+	conn, err := connect(s.opts.server, "ctl")
+	if err != nil {
+		panic(err)
+	}
+	defer conn.Close()
+
+	// register with the server
+	err = proto.WriteMsg(conn, &proto.RegMsg{
+		Protocol:  s.opts.protocol,
+		OS:        runtime.GOOS,
+		Hostname:  s.opts.hostname,
+		Subdomain: s.opts.subdomain,
+		ClientId:  s.id,
+	})
+
+	if err != nil {
+		panic(err)
+	}
+
+	// wait for the server to ack our register
+	var regAck proto.RegAckMsg
+	if err = proto.ReadMsgInto(conn, &regAck); err != nil {
+		panic(err)
+	}
+
+	// update UI state
+	conn.Info("Tunnel established at %v", regAck.Url)
+	//state.version = regAck.Version
+	s.publicUrl = regAck.Url
+	s.status = "online"
+	s.Update()
+
+	// main control loop
+	for {
+		var msg proto.Message
+		if msg, err = proto.ReadMsg(conn); err != nil {
+			panic(err)
+		}
+
+		switch msg.GetType() {
+		case "ReqProxyMsg":
+			go proxy(regAck.ProxyAddr, s)
+
+		case "PingMsg":
+			proto.WriteMsg(conn, &proto.PongMsg{})
+		}
+	}
+}
+
+// create a random identifier for this client
+func mkid() string {
+	b := make([]byte, 8)
+	_, err := rand.Read(b)
+	if err != nil {
+		panic(fmt.Sprintf("Couldn't create random client identifier, %v", err))
+	}
+	return fmt.Sprintf("%x", b)
+
+}
+
+func Main() {
+	ngrok.LogToFile()
+	//	ngrok.LogToConsole()
+
+	// parse options
+	opts := parseArgs()
+
+	// init terminal, http UI
+	//termView := ui.NewTerm()
+	httpView := ui.NewHttp(9999)
+
+	// init client state
+	s := &State{
+		// unique client id
+		id: mkid(),
+
+		// ui communication channels
+		//            ui: ui.NewUi(termView, httpView),
+		ui: ui.NewUi(httpView),
+
+		// command-line options
+		opts: opts,
+
+		// metrics
+		metrics: NewClientMetrics(),
+	}
+
+	// request history
+	// XXX: don't use a callback, use a channel
+	// and define it inline in the struct
+	s.history = NewRequestHistory(opts.historySize, s.metrics, func(history []*RequestHistoryEntry) {
+		s.historyEntries = history
+		s.Update()
+	})
+
+	// set initial ui state
+	s.status = "connecting"
+	s.Update()
+
+	go control(s)
+
+	s.ui.Wait.Add(1)
+	go func() {
+		defer s.ui.Wait.Done()
+		for {
+			select {
+			case cmd := <-s.ui.Cmds:
+				switch cmd {
+				case ui.QUIT:
+					s.stopping = true
+					s.Update()
+					return
+				}
+			}
+		}
+	}()
+
+	s.ui.Wait.Wait()
+}

+ 41 - 0
client/metrics.go

@@ -0,0 +1,41 @@
+package client
+
+import (
+	metrics "github.com/rcrowley/go-metrics"
+)
+
+const (
+	sampleSize  int     = 1028
+	sampleAlpha float64 = 0.015
+)
+
+type ClientMetrics struct {
+	// metrics
+	connGauge       metrics.Gauge
+	connMeter       metrics.Meter
+	connTimer       metrics.Timer
+        reqGauge    metrics.Gauge
+        reqMeter    metrics.Meter
+        reqTimer    metrics.Timer
+	proxySetupTimer metrics.Timer
+	bytesIn         metrics.Histogram
+	bytesOut        metrics.Histogram
+	bytesInCount    metrics.Counter
+	bytesOutCount   metrics.Counter
+}
+
+func NewClientMetrics() *ClientMetrics {
+	return &ClientMetrics{
+		connGauge:       metrics.NewGauge(),
+		connMeter:       metrics.NewMeter(),
+		connTimer:       metrics.NewTimer(),
+		reqGauge:       metrics.NewGauge(),
+		reqMeter:       metrics.NewMeter(),
+		reqTimer:       metrics.NewTimer(),
+		proxySetupTimer: metrics.NewTimer(),
+		bytesIn:         metrics.NewHistogram(metrics.NewExpDecaySample(sampleSize, sampleAlpha)),
+		bytesOut:        metrics.NewHistogram(metrics.NewExpDecaySample(sampleSize, sampleAlpha)),
+		bytesInCount:    metrics.NewCounter(),
+		bytesOutCount:   metrics.NewCounter(),
+	}
+}

+ 57 - 0
client/state.go

@@ -0,0 +1,57 @@
+package client
+
+import (
+	metrics "github.com/rcrowley/go-metrics"
+	"ngrok/client/ui"
+)
+
+// client state
+type State struct {
+	id        string
+	ui        *ui.Ui
+	publicUrl string
+	history   *RequestHistory
+	opts      *Options
+	metrics   *ClientMetrics
+
+	// just for UI purposes
+	status         string
+	historyEntries []*RequestHistoryEntry
+	stopping       bool
+}
+
+// implement client.ui.State
+func (s State) GetVersion() string   { return "" }
+func (s State) GetPublicUrl() string { return s.publicUrl }
+func (s State) GetLocalAddr() string { return s.opts.localaddr }
+func (s State) GetStatus() string    { return s.status }
+func (s State) GetHistory() []ui.HttpRequest {
+	// go sucks
+	historyEntries := make([]ui.HttpRequest, len(s.historyEntries))
+
+	for i, entry := range s.historyEntries {
+		historyEntries[i] = entry
+	}
+	return historyEntries
+}
+func (s State) IsStopping() bool { return s.stopping }
+
+func (s State) GetConnectionMetrics() (metrics.Meter, metrics.Timer) {
+	return s.metrics.connMeter, s.metrics.connTimer
+}
+
+func (s State) GetRequestMetrics() (metrics.Meter, metrics.Timer) {
+	return s.metrics.reqMeter, s.metrics.reqTimer
+}
+
+func (s State) GetBytesInMetrics() (metrics.Counter, metrics.Histogram) {
+	return s.metrics.bytesInCount, s.metrics.bytesIn
+}
+
+func (s State) GetBytesOutMetrics() (metrics.Counter, metrics.Histogram) {
+	return s.metrics.bytesOutCount, s.metrics.bytesOut
+}
+
+func (s *State) Update() {
+	s.ui.Updates.In() <- *s
+}

+ 41 - 0
client/ui/broadcast.go

@@ -0,0 +1,41 @@
+package ui
+
+type Broadcast struct {
+	listeners []chan State
+	reg       chan (chan State)
+	in        chan State
+}
+
+func NewBroadcast() *Broadcast {
+	b := &Broadcast{
+		listeners: make([]chan State, 0),
+		reg:       make(chan (chan State)),
+		in:        make(chan State),
+	}
+
+	go func() {
+		for {
+			select {
+			case l := <-b.reg:
+				b.listeners = append(b.listeners, l)
+
+			case item := <-b.in:
+				for _, l := range b.listeners {
+					l <- item
+				}
+			}
+		}
+	}()
+
+	return b
+}
+
+func (b *Broadcast) In() chan State {
+	return b.in
+}
+
+func (b *Broadcast) Reg() chan State {
+	listener := make(chan State)
+	b.reg <- listener
+	return listener
+}

+ 145 - 0
client/ui/http.go

@@ -0,0 +1,145 @@
+// interative http client user interface
+package ui
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"html/template"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/http/httputil"
+	"net/url"
+)
+
+type RepeatableReader struct {
+	io.Reader
+	buffer []byte
+}
+
+func NewRepeatableReader(rd io.ReadCloser) *RepeatableReader {
+	buffer := new(bytes.Buffer)
+	buffer.ReadFrom(rd)
+	return &RepeatableReader{
+		bytes.NewBuffer(buffer.Bytes()),
+		buffer.Bytes(),
+	}
+}
+
+func (rr *RepeatableReader) Read(b []byte) (n int, err error) {
+	n, err = rr.Reader.Read(b)
+
+	if err == io.EOF {
+		rr.Reader = bytes.NewBuffer(rr.buffer)
+	}
+
+	return n, err
+}
+
+func (rr *RepeatableReader) Close() error {
+	return nil
+}
+
+type Http struct {
+	ui   *Ui
+	port int
+}
+
+func NewHttp(port int) *Http {
+	return &Http{port: port}
+}
+
+func (h *Http) SetUi(ui *Ui) {
+	h.ui = ui
+	go h.run()
+}
+
+func (h *Http) run() {
+	// open channels for incoming application state changes
+	// and broadbasts
+
+	updates := h.ui.Updates.Reg()
+	var s State
+	go func() {
+		for {
+			select {
+			case obj := <-updates:
+				s = obj.(State)
+			}
+		}
+	}()
+
+	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+		funcMap := template.FuncMap{
+			"dumpResponse": func(resp *http.Response) (interface{}, error) {
+				b, err := httputil.DumpResponse(resp, false)
+				body := new(bytes.Buffer)
+				body.ReadFrom(resp.Body)
+				return string(b) + string(body.Bytes()), err
+			},
+			"dumpRequest": func(req *http.Request) (interface{}, error) {
+				b, err := httputil.DumpRequest(req, false)
+				body := new(bytes.Buffer)
+				body.ReadFrom(req.Body)
+				return string(b) + string(body.Bytes()), err
+			},
+			"handleForm": func(req *http.Request) (values interface{}, err error) {
+				if req.Header.Get("Content-Type") != "application/x-www-form-urlencoded" {
+					return
+				}
+
+				b, err := ioutil.ReadAll(req.Body)
+				if err != nil {
+					return
+				}
+
+				values, err = url.ParseQuery(string(b))
+				return
+			},
+			"handleJson": func(req *http.Request) interface{} {
+				if req.Header.Get("Content-Type") != "application/json" {
+					return nil
+				}
+
+				src := new(bytes.Buffer)
+				dst := new(bytes.Buffer)
+				src.ReadFrom(req.Body)
+				err := json.Indent(dst, src.Bytes(), "", "    ")
+
+				retval := struct {
+					Str string
+					Err error
+				}{
+					string(dst.Bytes()),
+					err,
+				}
+
+				if err != nil {
+					retval.Str = string(src.Bytes())
+				}
+
+				return retval
+			},
+		}
+
+		tmpl := template.Must(
+			template.New("page.html").Funcs(funcMap).ParseFiles("./templates/page.html", "./templates/body.html"))
+
+		for _, htxn := range s.GetHistory() {
+                        req, resp := htxn.GetRequest(), htxn.GetResponse()
+
+			req.Body = NewRepeatableReader(req.Body)
+                        if resp != nil && resp.Body != nil {
+                            resp.Body = NewRepeatableReader(resp.Body)
+                        }
+		}
+
+		// write the response
+		if err := tmpl.Execute(w, s); err != nil {
+			panic(err)
+		}
+	})
+
+	http.ListenAndServe(fmt.Sprintf(":%d", h.port), nil)
+}

+ 24 - 0
client/ui/interface.go

@@ -0,0 +1,24 @@
+package ui
+
+import (
+	metrics "github.com/rcrowley/go-metrics"
+	"net/http"
+)
+
+type HttpRequest interface {
+	GetRequest() *http.Request
+	GetResponse() *http.Response
+}
+
+type State interface {
+	GetVersion() string
+	GetPublicUrl() string
+	GetLocalAddr() string
+	GetStatus() string
+	GetHistory() []HttpRequest
+	IsStopping() bool
+	GetConnectionMetrics() (metrics.Meter, metrics.Timer)
+	GetRequestMetrics() (metrics.Meter, metrics.Timer)
+	GetBytesInMetrics() (metrics.Counter, metrics.Histogram)
+	GetBytesOutMetrics() (metrics.Counter, metrics.Histogram)
+}

+ 136 - 0
client/ui/terminal.go

@@ -0,0 +1,136 @@
+/* 
+   interactive terminal interface for local clients
+*/
+package ui
+
+import (
+	"fmt"
+	termbox "github.com/nsf/termbox-go"
+	"time"
+)
+
+const (
+	fgColor = termbox.ColorWhite
+	bgColor = termbox.ColorDefault
+)
+
+func clear() {
+	w, h := termbox.Size()
+
+	for i := 0; i < w; i++ {
+		for j := 0; j < h; j++ {
+			termbox.SetCell(i, j, ' ', fgColor, bgColor)
+		}
+	}
+}
+
+func printfAttr(x, y int, fg termbox.Attribute, arg0 string, args ...interface{}) {
+	s := fmt.Sprintf(arg0, args...)
+	for i, ch := range s {
+		termbox.SetCell(x+i, y, ch, fg, bgColor)
+	}
+}
+
+func printf(x, y int, arg0 string, args ...interface{}) {
+	printfAttr(x, y, fgColor, arg0, args...)
+}
+
+type Term struct {
+	ui             *Ui
+	statusColorMap map[string]termbox.Attribute
+}
+
+func NewTerm() *Term {
+	return &Term{
+		statusColorMap: map[string]termbox.Attribute{
+			"connecting":   termbox.ColorCyan,
+			"reconnecting": termbox.ColorRed,
+			"online":       termbox.ColorGreen,
+		},
+	}
+}
+
+func (t *Term) SetUi(ui *Ui) {
+	t.ui = ui
+	go t.run()
+}
+
+func (t *Term) run() {
+	// make sure we shut down cleanly
+	t.ui.Wait.Add(1)
+	defer t.ui.Wait.Done()
+
+	// open channels for incoming application state changes
+	// and broadbasts
+	updates := t.ui.Updates.Reg()
+
+	// init/close termbox library
+	termbox.Init()
+	defer termbox.Close()
+
+	go t.input()
+
+	t.draw(updates)
+}
+
+func (t *Term) draw(updates chan State) {
+	for {
+		select {
+		case state := <-updates:
+			// program is shutting down
+			if state.IsStopping() {
+				return
+			}
+
+			clear()
+
+			printfAttr(0, 0, termbox.ColorBlue|termbox.AttrBold, "ngrok")
+
+			msec := float64(time.Millisecond)
+
+			printf(0, 2, "%-30s%s", "Version", state.GetVersion())
+			printf(0, 3, "%-30s%s", "Public URL", state.GetPublicUrl())
+			printf(0, 4, "%-30s%s", "Local Address", state.GetLocalAddr())
+			printfAttr(0, 5, t.statusColorMap[state.GetStatus()], "%-30s%s", "Tunnel Status", state.GetStatus())
+
+			connMeter, connTimer := state.GetConnectionMetrics()
+			printf(0, 6, "%-30s%d", "# Conn", connMeter.Count())
+			printf(0, 7, "%-30s%.2fms", "Mean Conn Time", connTimer.Mean()/msec)
+			printf(0, 8, "%-30s%.2fms", "Conn Time 95th PCTL", connTimer.Percentile(0.95)/msec)
+
+			bytesInCount, bytesIn := state.GetBytesInMetrics()
+			printf(0, 9, "%-30s%d", "Bytes In", bytesInCount.Count())
+			printf(0, 10, "%-30s%.2f", "Avg Bytes/req", bytesIn.Mean())
+
+			bytesOutCount, bytesOut := state.GetBytesOutMetrics()
+			printf(0, 11, "%-30s%d", "Bytes Out", bytesOutCount.Count())
+			printf(0, 12, "%-30s%.2f", "Bytes Out/req", bytesOut.Mean())
+
+			printf(0, 14, "Last HTTP Requests")
+			for i, http := range state.GetHistory() {
+				req := http.GetRequest()
+				resp := http.GetResponse()
+				printf(0, 15+i, "%s %v", req.Method, req.URL)
+				if resp != nil {
+					printf(30, 15+i, "%s", resp.Status)
+				}
+			}
+
+			termbox.Flush()
+		}
+	}
+}
+
+func (t *Term) input() {
+	for {
+		ev := termbox.PollEvent()
+		switch ev.Type {
+		case termbox.EventKey:
+			switch ev.Key {
+			case termbox.KeyCtrlC:
+				t.ui.Cmds <- QUIT
+				return
+			}
+		}
+	}
+}

+ 40 - 0
client/ui/ui.go

@@ -0,0 +1,40 @@
+package ui
+
+import (
+	"sync"
+)
+
+type Command int
+
+const (
+	QUIT = iota
+)
+
+type View interface {
+	SetUi(*Ui)
+}
+
+type Ui struct {
+	// the model always updates
+	Updates *Broadcast
+
+	// all views put their commands into this channel
+	Cmds chan Command
+
+	// all threads may add themself to this to wait for clean shutdown
+	Wait *sync.WaitGroup
+}
+
+func NewUi(views ...View) *Ui {
+	ui := &Ui{
+		Updates: NewBroadcast(),
+		Cmds:    make(chan Command),
+		Wait:    new(sync.WaitGroup),
+	}
+
+	for _, v := range views {
+		v.SetUi(ui)
+	}
+
+	return ui
+}

+ 98 - 0
conn/conn.go

@@ -0,0 +1,98 @@
+package conn
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io"
+	"math/rand"
+	"net"
+	"net/http"
+	"ngrok"
+)
+
+type Conn interface {
+	net.Conn
+	ngrok.Logger
+	Id() string
+}
+
+type loggedConn struct {
+	net.Conn
+	ngrok.Logger
+	id  int32
+	typ string
+}
+
+func NewLogged(conn net.Conn, typ string) *loggedConn {
+	c := &loggedConn{conn, ngrok.NewPrefixLogger(), rand.Int31(), typ}
+	c.AddLogPrefix(c.Id())
+	c.Info("New connection from %v", conn.RemoteAddr())
+	return c
+}
+
+func (c *loggedConn) Close() error {
+	c.Debug("Closing")
+	return c.Conn.Close()
+}
+
+func (c *loggedConn) Id() string {
+	return fmt.Sprintf("%s:%x", c.typ, c.id)
+}
+
+func Join(c Conn, c2 Conn) (int64, int64) {
+	done := make(chan error)
+	pipe := func(to Conn, from Conn, bytesCopied *int64) {
+		var err error
+		*bytesCopied, err = io.Copy(to, from)
+		if err != nil {
+			from.Warn("Copied %d bytes to %s before failing with error %v", *bytesCopied, to.Id(), err)
+			done <- err
+		} else {
+			from.Debug("Copied %d bytes from to %s", *bytesCopied, to.Id())
+			done <- nil
+		}
+	}
+
+	var fromBytes, toBytes int64
+	go pipe(c, c2, &fromBytes)
+	go pipe(c2, c, &toBytes)
+	c.Info("Joined with connection %s", c2.Id())
+	<-done
+	c.Close()
+	c2.Close()
+	<-done
+	return fromBytes, toBytes
+}
+
+type loggedHttpConn struct {
+	*loggedConn
+	reqBuf *bytes.Buffer
+}
+
+func NewHttp(conn net.Conn, typ string) *loggedHttpConn {
+	return &loggedHttpConn{
+		NewLogged(conn, typ),
+		bytes.NewBuffer(make([]byte, 0, 1024)),
+	}
+}
+
+func (c *loggedHttpConn) ReadRequest() (*http.Request, error) {
+	r := io.TeeReader(c.loggedConn, c.reqBuf)
+	return http.ReadRequest(bufio.NewReader(r))
+}
+
+func (c *loggedConn) ReadFrom(r io.Reader) (n int64, err error) {
+	// special case when we're reading from an http request where
+	// we had to parse the request and consume bytes from the socket
+	// and store them in a temporary request buffer
+	if httpConn, ok := r.(*loggedHttpConn); ok {
+		if n, err = httpConn.reqBuf.WriteTo(c); err != nil {
+			return
+		}
+	}
+
+	nCopied, err := io.Copy(c.Conn, r)
+	n += nCopied
+	return
+}

+ 42 - 0
conn/http.go

@@ -0,0 +1,42 @@
+package conn
+
+import (
+	"net/http"
+	"net/http/httputil"
+)
+
+func ParseHttp(tee *Tee, reqs chan *http.Request, resps chan *http.Response) {
+	lastReq := make(chan *http.Request)
+
+	go func() {
+		for {
+			req, err := http.ReadRequest(tee.ReadBuffer())
+			if err != nil {
+				// no more requests to be read, we're done
+				break
+			}
+			lastReq <- req
+			// make sure we read the body of the request so that
+			// we don't block the reader 
+			_, _ = httputil.DumpRequest(req, true)
+			reqs <- req
+		}
+	}()
+
+	go func() {
+		for {
+			req := <-lastReq
+			resp, err := http.ReadResponse(tee.WriteBuffer(), req)
+			if err != nil {
+				tee.Warn("Error reading response from server: %v", err)
+				// no more responses to be read, we're done
+				break
+			}
+			// make sure we read the body of the response so that
+			// we don't block the writer 
+			_, _ = httputil.DumpResponse(resp, true)
+			resps <- resp
+		}
+	}()
+
+}

+ 71 - 0
conn/tee.go

@@ -0,0 +1,71 @@
+package conn
+
+import (
+	"bufio"
+	"io"
+)
+
+// conn.Tee is a wraps a conn.Conn
+// causing all writes/reads to be tee'd just
+// like the unix command such that all data that
+// is read and written to the connection through its
+// interfaces will also be copied into two dedicated pipes
+// used for consuming a copy of the data stream
+//
+// this is useful for introspecting the traffic flowing
+// over a connection without having to tamper with the actual
+// code that reads and writes over the connection
+//
+// NB: the data is Tee'd into a shared-memory io.Pipe which
+// has a limited (and small) buffer. If you are not consuming from
+// the ReadBuffer() and WriteBuffer(), you are going to block
+// your application's real traffic from flowing over the connection
+
+type Tee struct {
+	rd       io.Reader
+	wr       io.Writer
+	readPipe struct {
+		rd *io.PipeReader
+		wr *io.PipeWriter
+	}
+	writePipe struct {
+		rd *io.PipeReader
+		wr *io.PipeWriter
+	}
+	Conn
+}
+
+func NewTee(conn Conn) *Tee {
+	c := &Tee{
+		rd:   nil,
+		wr:   nil,
+		Conn: conn,
+	}
+
+	c.readPipe.rd, c.readPipe.wr = io.Pipe()
+	c.writePipe.rd, c.writePipe.wr = io.Pipe()
+
+	c.rd = io.TeeReader(c.Conn, c.readPipe.wr)
+	c.wr = io.MultiWriter(c.Conn, c.writePipe.wr)
+	return c
+}
+
+func (c *Tee) ReadBuffer() *bufio.Reader {
+	return bufio.NewReader(c.readPipe.rd)
+}
+
+func (c *Tee) WriteBuffer() *bufio.Reader {
+	return bufio.NewReader(c.writePipe.rd)
+}
+
+func (c *Tee) Read(b []byte) (n int, err error) {
+	return c.rd.Read(b)
+}
+
+func (c *Tee) ReadFrom(r io.Reader) (n int64, err error) {
+	return io.Copy(c.wr, r)
+}
+
+func (c *Tee) Write(b []byte) (n int, err error) {
+	return c.wr.Write(b)
+}

+ 65 - 0
logger.go

@@ -0,0 +1,65 @@
+package ngrok
+
+import (
+	log "code.google.com/p/log4go"
+	"fmt"
+)
+
+var Log log.Logger
+
+func init() {
+	Log = make(log.Logger)
+	//    Log.AddFilter("log", log.DEBUG, log.NewFileLogWriter("ngrok.log", true))
+}
+
+func LogToConsole() {
+	Log.AddFilter("log", log.DEBUG, log.NewConsoleLogWriter())
+}
+
+func LogToFile() {
+	Log.AddFilter("log", log.DEBUG, log.NewFileLogWriter("ngrok.log", true))
+}
+
+type Logger interface {
+	AddLogPrefix(string)
+	Debug(string, ...interface{})
+	Info(string, ...interface{})
+	Warn(string, ...interface{}) error
+	Error(string, ...interface{}) error
+}
+
+type PrefixLogger struct {
+	prefix string
+}
+
+func NewPrefixLogger() Logger {
+	return &PrefixLogger{}
+}
+
+func (pl *PrefixLogger) pfx(fmtstr string) interface{} {
+	return fmt.Sprintf("%s %s", pl.prefix, fmtstr)
+}
+
+func (pl *PrefixLogger) Debug(arg0 string, args ...interface{}) {
+	Log.Debug(pl.pfx(arg0), args...)
+}
+
+func (pl *PrefixLogger) Info(arg0 string, args ...interface{}) {
+	Log.Info(pl.pfx(arg0), args...)
+}
+
+func (pl *PrefixLogger) Warn(arg0 string, args ...interface{}) error {
+	return Log.Warn(pl.pfx(arg0), args...)
+}
+
+func (pl *PrefixLogger) Error(arg0 string, args ...interface{}) error {
+	return Log.Error(pl.pfx(arg0), args...)
+}
+
+func (pl *PrefixLogger) AddLogPrefix(prefix string) {
+	if len(pl.prefix) > 0 {
+		pl.prefix += " "
+	}
+
+	pl.prefix += "[" + prefix + "]"
+}

+ 9 - 0
main/client.go

@@ -0,0 +1,9 @@
+package main
+
+import (
+	"ngrok/client"
+)
+
+func main() {
+	client.Main()
+}

+ 9 - 0
main/server.go

@@ -0,0 +1,9 @@
+package main
+
+import (
+	"ngrok/server"
+)
+
+func main() {
+	server.Main()
+}

+ 71 - 0
proto/conn.go

@@ -0,0 +1,71 @@
+package proto
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"ngrok/conn"
+)
+
+func readMsgShared(c conn.Conn) (buffer []byte, err error) {
+	c.Debug("Waiting to read message")
+
+	var sz int64
+	err = binary.Read(c, binary.LittleEndian, &sz)
+	if err != nil {
+		return
+	}
+	c.Debug("Reading message with length: %d", sz)
+
+	buffer = make([]byte, sz)
+	n, err := c.Read(buffer)
+	c.Debug("Read message %s", buffer)
+
+	if err != nil {
+		return
+	}
+
+	if int64(n) != sz {
+		err = errors.New(fmt.Sprintf("Expected to read %d bytes, but only read %d", sz, n))
+		return
+	}
+
+	return
+}
+
+func ReadMsg(c conn.Conn) (msg Message, err error) {
+	buffer, err := readMsgShared(c)
+	if err != nil {
+		return
+	}
+
+	return Unpack(buffer)
+}
+
+func ReadMsgInto(c conn.Conn, msg Message) (err error) {
+	buffer, err := readMsgShared(c)
+	if err != nil {
+		return
+	}
+	return UnpackInto(buffer, msg)
+}
+
+func WriteMsg(c conn.Conn, msg interface{}) (err error) {
+	buffer, err := Pack(msg)
+	if err != nil {
+		return
+	}
+
+	c.Debug("Writing message: %s", string(buffer))
+	err = binary.Write(c, binary.LittleEndian, int64(len(buffer)))
+
+	if err != nil {
+		return
+	}
+
+	if _, err = c.Write(buffer); err != nil {
+		return
+	}
+
+	return nil
+}

+ 107 - 0
proto/msg.go

@@ -0,0 +1,107 @@
+package proto
+
+import (
+	"encoding/json"
+	"reflect"
+)
+
+var TypeMap map[string]reflect.Type
+
+const (
+	Version = "0.1"
+)
+
+func init() {
+	TypeMap = make(map[string]reflect.Type)
+
+	t := func(obj interface{}) reflect.Type { return reflect.TypeOf(obj).Elem() }
+	TypeMap["RegMsg"] = t((*RegMsg)(nil))
+	TypeMap["RegAckMsg"] = t((*RegAckMsg)(nil))
+	TypeMap["RegProxyMsg"] = t((*RegProxyMsg)(nil))
+	TypeMap["ReqProxyMsg"] = t((*ReqProxyMsg)(nil))
+	TypeMap["PingMsg"] = t((*PingMsg)(nil))
+	TypeMap["PongMsg"] = t((*PongMsg)(nil))
+	TypeMap["VerisonMsg"] = t((*VersionMsg)(nil))
+	TypeMap["VersionRespMsg"] = t((*VersionRespMsg)(nil))
+	TypeMap["MetricsMsg"] = t((*MetricsMsg)(nil))
+	TypeMap["MetricsRespMsg"] = t((*MetricsRespMsg)(nil))
+}
+
+type Message interface {
+	GetType() string
+	SetType(string)
+}
+
+type Envelope struct {
+	Version string
+	Type    string
+	Payload json.RawMessage
+}
+
+type TypeEmbed struct {
+	Type string
+}
+
+type RegMsg struct {
+	TypeEmbed
+	Protocol         string
+	Hostname         string
+	Subdomain        string
+	ClientId         string
+	HttpAuthUser     string
+	HttpAuthPassword string
+	User             string
+	Password         string
+	OS               string
+	Arch             string
+}
+
+type RegAckMsg struct {
+	TypeEmbed
+	Type      string
+	Url       string
+	ProxyAddr string
+}
+
+type RegProxyMsg struct {
+	TypeEmbed
+	Url string
+}
+
+type ReqProxyMsg struct {
+	TypeEmbed
+}
+
+type PingMsg struct {
+	TypeEmbed
+}
+
+type PongMsg struct {
+	TypeEmbed
+}
+
+type VersionMsg struct {
+	TypeEmbed
+}
+
+type VersionRespMsg struct {
+	TypeEmbed
+	Version string
+}
+
+type MetricsMsg struct {
+	TypeEmbed
+}
+
+type MetricsRespMsg struct {
+	TypeEmbed
+	Metrics string
+}
+
+func (m *TypeEmbed) GetType() string {
+	return m.Type
+}
+
+func (m *TypeEmbed) SetType(typ string) {
+	m.Type = typ
+}

+ 54 - 0
proto/pack.go

@@ -0,0 +1,54 @@
+package proto
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"reflect"
+)
+
+func unpack(buffer []byte, msgIn Message) (msg Message, err error) {
+	var env Envelope
+	if err = json.Unmarshal(buffer, &env); err != nil {
+		return
+	}
+
+	if msgIn == nil {
+		t, ok := TypeMap[env.Type]
+
+		if !ok {
+			err = errors.New(fmt.Sprintf("Unsupposted message type %s", env.Type))
+			return
+		}
+
+		// guess type
+		msg = reflect.New(t).Interface().(Message)
+		msg.SetType(env.Type)
+	} else {
+		msg = msgIn
+	}
+
+	err = json.Unmarshal(env.Payload, &msg)
+	return
+}
+
+func UnpackInto(buffer []byte, msg Message) (err error) {
+	_, err = unpack(buffer, msg)
+	return
+}
+
+func Unpack(buffer []byte) (msg Message, err error) {
+	return unpack(buffer, nil)
+}
+
+func Pack(payload interface{}) ([]byte, error) {
+	return json.Marshal(struct {
+		Version string
+		Type    string
+		Payload interface{}
+	}{
+		Version: Version,
+		Type:    reflect.TypeOf(payload).Elem().Name(),
+		Payload: payload,
+	})
+}

+ 119 - 0
server/control.go

@@ -0,0 +1,119 @@
+package server
+
+import (
+	"io"
+	"net"
+	"ngrok/conn"
+	"ngrok/proto"
+	"runtime/debug"
+	"sync/atomic"
+	"time"
+)
+
+const (
+	pingInterval     = 30 * time.Second
+	connReapInterval = pingInterval * 5
+)
+
+type Control struct {
+	// actual connection
+	conn conn.Conn
+
+	// channels for communicating messages over the connection
+	out  chan (interface{})
+	in   chan (proto.Message)
+	stop chan (int)
+
+	// heartbeat
+	lastPong int64
+
+	// tunnel
+	tun *Tunnel
+}
+
+func NewControl(tcpConn *net.TCPConn) {
+	c := &Control{
+		conn:     conn.NewLogged(tcpConn, "ctl"),
+		out:      make(chan (interface{}), 1),
+		in:       make(chan (proto.Message), 1),
+		stop:     make(chan (int), 1),
+		lastPong: time.Now().Unix(),
+	}
+
+	go c.managerThread()
+	go c.readThread()
+}
+
+func (c *Control) managerThread() {
+	ping := time.NewTicker(pingInterval)
+	reap := time.NewTicker(connReapInterval)
+
+	// all shutdown functionality in here
+	defer func() {
+		if err := recover(); err != nil {
+			c.conn.Info("Control::managerThread failed with error %v: %s", err, debug.Stack())
+		}
+		ping.Stop()
+		reap.Stop()
+		close(c.out)
+		close(c.in)
+		close(c.stop)
+		c.conn.Close()
+	}()
+
+	for {
+		select {
+		case m := <-c.out:
+			proto.WriteMsg(c.conn, m)
+
+		case <-ping.C:
+			proto.WriteMsg(c.conn, &proto.PingMsg{})
+
+		case <-reap.C:
+			if (time.Now().Unix() - c.lastPong) > 60 {
+				c.conn.Info("Lost heartbeat")
+				metrics.lostHeartbeatMeter.Mark(1)
+				return
+			}
+
+		case <-c.stop:
+			return
+
+		case msg := <-c.in:
+			switch msg.GetType() {
+			case "RegMsg":
+				c.conn.Info("Registering new tunnel")
+				newTunnel(msg.(*proto.RegMsg), c)
+
+			case "PongMsg":
+				atomic.StoreInt64(&c.lastPong, time.Now().Unix())
+
+			case "VersionReqMsg":
+				c.out <- &proto.VersionRespMsg{Version: version}
+			}
+		}
+	}
+}
+
+func (c *Control) readThread() {
+	defer func() {
+		if err := recover(); err != nil {
+			c.conn.Info("Control::readThread failed with error %v: %s", err, debug.Stack())
+		}
+		c.stop <- 1
+	}()
+
+	// read messages from the control channel
+	for {
+		if msg, err := proto.ReadMsg(c.conn); err != nil {
+			if err == io.EOF {
+				c.conn.Info("EOF")
+				return
+			} else {
+				panic(err)
+			}
+		} else {
+			c.in <- msg
+		}
+	}
+}

+ 63 - 0
server/http.go

@@ -0,0 +1,63 @@
+package server
+
+import (
+	log "code.google.com/p/log4go"
+	"net"
+	"ngrok/conn"
+)
+
+/**
+ * Listens for new http connections from the public internet
+ */
+func httpListener(addr *net.TCPAddr) {
+	// bind/listen for incoming connections
+	listener, err := net.ListenTCP("tcp", addr)
+	if err != nil {
+		panic(err)
+	}
+
+	log.Info("Listening for public http connections on %v", getTCPPort(listener.Addr()))
+	for {
+		// accept new public connections
+		tcpConn, err := listener.AcceptTCP()
+
+		if err != nil {
+			panic(err)
+		}
+
+		// handle the new connection asynchronously
+		go httpHandler(tcpConn)
+	}
+}
+
+/**
+ * Handles a new http connection from the public internet
+ */
+func httpHandler(tcpConn net.Conn) {
+	// wrap up the connection for logging
+	conn := conn.NewHttp(tcpConn, "pub")
+
+	defer conn.Close()
+	defer func() {
+		// recover from failures
+		if r := recover(); r != nil {
+			conn.Warn("Failed with error %v", r)
+		}
+	}()
+
+	// read out the http request
+	req, err := conn.ReadRequest()
+	if err != nil {
+		panic(err)
+	}
+	conn.Debug("Found hostname %s in request", req.Host)
+
+	tunnel := tunnels.Get("http://" + req.Host)
+
+	if tunnel == nil {
+		conn.Info("Not tunnel found for hostname %s", req.Host)
+                return
+	}
+
+	tunnel.HandlePublicConnection(conn)
+}

+ 131 - 0
server/main.go

@@ -0,0 +1,131 @@
+package server
+
+import (
+	log "code.google.com/p/log4go"
+	"flag"
+	"net"
+	"ngrok"
+	"ngrok/conn"
+	"ngrok/proto"
+	"regexp"
+)
+
+type Options struct {
+	publicPort int
+	proxyPort  int
+	tunnelPort int
+	domain     string
+}
+
+/* GLOBALS */
+var (
+	hostRegex *regexp.Regexp
+	version   string = "0.1"
+	proxyAddr string
+	tunnels   *TunnelManager
+)
+
+func init() {
+	hostRegex = regexp.MustCompile("[H|h]ost: ([^\\(\\);:,<>]+)\n")
+}
+
+func parseArgs() *Options {
+	publicPort := flag.Int("publicport", 80, "Public port")
+	tunnelPort := flag.Int("tunnelport", 2280, "Tunnel port")
+	proxyPort := flag.Int("proxyPort", 0, "Proxy port")
+	domain := flag.String("domain", "ngrok.com", "Domain where the tunnels are hosted")
+
+	flag.Parse()
+
+	return &Options{
+		publicPort: *publicPort,
+		tunnelPort: *tunnelPort,
+		proxyPort:  *proxyPort,
+		domain:     *domain,
+	}
+}
+
+func getTCPPort(addr net.Addr) int {
+	return addr.(*net.TCPAddr).Port
+}
+
+/**
+ * Listens for new control connections from tunnel clients
+ */
+func controlListener(addr *net.TCPAddr, domain string) {
+	// listen for incoming connections
+	listener, err := net.ListenTCP("tcp", addr)
+	if err != nil {
+		panic(err)
+	}
+
+	log.Info("Listening for control connections on %d", getTCPPort(addr))
+	for {
+		tcpConn, err := listener.AcceptTCP()
+		if err != nil {
+			panic(err)
+		}
+
+		NewControl(tcpConn)
+	}
+}
+
+/**
+ * Listens for new proxy connections from tunnel clients
+ */
+func proxyListener(addr *net.TCPAddr) {
+	listener, err := net.ListenTCP("tcp", addr)
+	proxyAddr = listener.Addr().String()
+
+	if err != nil {
+		panic(err)
+	}
+
+	log.Info("Listening for proxy connection on %d", getTCPPort(listener.Addr()))
+	for {
+		tcpConn, err := listener.AcceptTCP()
+		if err != nil {
+			panic(err)
+		}
+
+		conn := conn.NewLogged(tcpConn, "pxy")
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					conn.Warn("Failed with error: %v", r)
+					conn.Close()
+				}
+			}()
+
+			var regPxy proto.RegProxyMsg
+			if err = proto.ReadMsgInto(conn, &regPxy); err != nil {
+				panic(err)
+			}
+
+			conn.Info("Registering new proxy for %s", regPxy.Url)
+
+			tunnel := tunnels.Get(regPxy.Url)
+			if tunnel == nil {
+				panic("No tunnel found for: " + regPxy.Url)
+			}
+
+			tunnel.RegisterProxy(conn)
+		}()
+	}
+}
+
+func Main() {
+	ngrok.LogToConsole()
+	done := make(chan int)
+	// parse options
+	opts := parseArgs()
+
+	tunnels = NewTunnelManager(opts.domain)
+
+	go proxyListener(&net.TCPAddr{net.ParseIP("0.0.0.0"), opts.proxyPort})
+	go controlListener(&net.TCPAddr{net.ParseIP("0.0.0.0"), opts.tunnelPort}, opts.domain)
+	go httpListener(&net.TCPAddr{net.ParseIP("0.0.0.0"), opts.publicPort})
+
+	<-done
+}

+ 117 - 0
server/manager.go

@@ -0,0 +1,117 @@
+package server
+
+import (
+	"fmt"
+	cache "github.com/pmylund/go-cache"
+	"math/rand"
+	"net"
+	"strings"
+	"sync"
+	"time"
+)
+
+/**
+ * TunnelManager: Manages a set of tunnels
+ */
+type TunnelManager struct {
+	domain         string
+	tunnels        map[string]*Tunnel
+	domainAffinity *cache.Cache
+	sync.RWMutex
+}
+
+func NewTunnelManager(domain string) *TunnelManager {
+	return &TunnelManager{
+		domain:         domain,
+		tunnels:        make(map[string]*Tunnel),
+		domainAffinity: cache.New(24*time.Hour, time.Minute),
+	}
+}
+
+func (m *TunnelManager) Add(t *Tunnel) {
+	assignTunnel := func(url string) bool {
+		m.Lock()
+		defer m.Unlock()
+
+		if m.tunnels[url] == nil {
+			m.tunnels[url] = t
+			return true
+		}
+
+		return false
+	}
+
+	url := ""
+	switch t.regMsg.Protocol {
+	case "tcp":
+		addr := t.listener.Addr().(*net.TCPAddr)
+		url = fmt.Sprintf("tcp://%s:%d", m.domain, addr.Port)
+		if !assignTunnel(url) {
+			panic("TCP at %s already registered!")
+		}
+		metrics.tcpTunnelMeter.Mark(1)
+
+	case "http":
+		if strings.TrimSpace(t.regMsg.Hostname) != "" {
+			url = fmt.Sprintf("http://%s", t.regMsg.Hostname)
+		} else if strings.TrimSpace(t.regMsg.Subdomain) != "" {
+			url = fmt.Sprintf("http://%s.%s", t.regMsg.Subdomain, m.domain)
+		}
+
+		if url != "" {
+			if !assignTunnel(url) {
+				panic(fmt.Sprintf("The tunnel address %s is already registered!", url))
+			}
+		} else {
+			// try to give the same subdomain back if it's available
+			subdomain, ok := m.domainAffinity.Get(t.regMsg.ClientId)
+			if !ok {
+				subdomain = fmt.Sprintf("%x", rand.Int31())
+			}
+
+			// pick one randomly
+			for {
+				url = fmt.Sprintf("http://%s.%s", subdomain, m.domain)
+				if assignTunnel(url) {
+					break
+				} else {
+					subdomain = fmt.Sprintf("%x", rand.Int31())
+				}
+			}
+
+			// save our choice for later
+			// XXX: this is going to leak memory
+			m.domainAffinity.Set(t.regMsg.ClientId, subdomain, 0)
+		}
+
+	default:
+		panic(t.Error("Unrecognized protocol type %s", t.regMsg.Protocol))
+	}
+
+	t.url = url
+	metrics.tunnelMeter.Mark(1)
+	//metrics.tunnelGauge.Update(int64(len(m.tunnels)))
+
+	switch t.regMsg.OS {
+	case "windows":
+		metrics.windowsCounter.Inc(1)
+	case "linux":
+		metrics.linuxCounter.Inc(1)
+	case "darwin":
+		metrics.osxCounter.Inc(1)
+	default:
+		metrics.otherCounter.Inc(1)
+	}
+}
+
+func (m *TunnelManager) Del(url string) {
+	m.Lock()
+	defer m.Unlock()
+	delete(m.tunnels, url)
+}
+
+func (m *TunnelManager) Get(url string) *Tunnel {
+	m.RLock()
+	defer m.RUnlock()
+	return m.tunnels[url]
+}

+ 77 - 0
server/metrics.go

@@ -0,0 +1,77 @@
+package server
+
+import (
+	log "code.google.com/p/log4go"
+	"encoding/json"
+	gometrics "github.com/rcrowley/go-metrics"
+	"time"
+)
+
+var reportInterval = 30 * time.Second
+
+var metrics struct {
+	windowsCounter gometrics.Counter
+	linuxCounter   gometrics.Counter
+	osxCounter     gometrics.Counter
+	otherCounter   gometrics.Counter
+	/*
+	   bytesInCount gometrics.Counter
+	   bytesOutCount gometrics.Counter
+	*/
+
+	/*
+	   tunnelGauge gometrics.Gauge
+	   tcpTunnelGauge gometrics.Gauge
+	   requestGauge gometrics.Gauge
+	*/
+
+	tunnelMeter        gometrics.Meter
+	tcpTunnelMeter     gometrics.Meter
+	requestMeter       gometrics.Meter
+	lostHeartbeatMeter gometrics.Meter
+
+	requestTimer gometrics.Timer
+}
+
+func init() {
+	metrics.windowsCounter = gometrics.NewCounter()
+	metrics.linuxCounter = gometrics.NewCounter()
+	metrics.osxCounter = gometrics.NewCounter()
+	metrics.otherCounter = gometrics.NewCounter()
+	/*
+	   metrics.bytesInCount = gometrics.NewCounter()
+	   metrics.bytesOutCount = gometrics.NewCounter()
+	*/
+
+	/*
+	   metrics.tunnelGauge = gometrics.NewGauge()
+	   metrics.tcpTunnelGauge = gometrics.NewGauge()
+	   metrics.requestGauge = gometrics.NewGauge()
+	*/
+
+	metrics.tunnelMeter = gometrics.NewMeter()
+	metrics.tcpTunnelMeter = gometrics.NewMeter()
+	metrics.requestMeter = gometrics.NewMeter()
+	metrics.lostHeartbeatMeter = gometrics.NewMeter()
+
+	metrics.requestTimer = gometrics.NewTimer()
+
+	go func() {
+		time.Sleep(reportInterval)
+		log.Info("Server metrics: %s", MetricsJson())
+	}()
+}
+
+func MetricsJson() []byte {
+	buffer, _ := json.Marshal(map[string]interface{}{
+		"windows":            metrics.windowsCounter.Count(),
+		"linux":              metrics.linuxCounter.Count(),
+		"osx":                metrics.osxCounter.Count(),
+		"other":              metrics.otherCounter.Count(),
+		"tunnelMeter.count":  metrics.tunnelMeter.Count(),
+		"tunnelMeter.m1":     metrics.tunnelMeter.Rate1(),
+		"requestMeter.count": metrics.requestMeter.Count(),
+		"requestMeter.m1":    metrics.requestMeter.Rate1(),
+	})
+	return buffer
+}

+ 127 - 0
server/tunnel.go

@@ -0,0 +1,127 @@
+package server
+
+import (
+	"fmt"
+	"net"
+	"ngrok"
+	"ngrok/conn"
+	"ngrok/proto"
+)
+
+/**
+ * Tunnel: A control connection, metadata and proxy connections which
+ *         route public traffic to a firewalled endpoint.
+ */
+type Tunnel struct {
+	regMsg *proto.RegMsg
+
+	// public url
+	url string
+
+	// tcp listener
+	listener *net.TCPListener
+
+	// control connection
+	ctl *Control
+
+	// proxy connections
+	proxies chan conn.Conn
+
+	// logger
+	ngrok.Logger
+}
+
+func newTunnel(msg *proto.RegMsg, ctl *Control) {
+	t := &Tunnel{
+		regMsg:  msg,
+		ctl:     ctl,
+		proxies: make(chan conn.Conn),
+		Logger:  ngrok.NewPrefixLogger(),
+	}
+
+	switch t.regMsg.Protocol {
+	case "tcp":
+		var err error
+		t.listener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), 0})
+
+		if err != nil {
+			panic(err)
+		}
+
+		go t.listenTcp(t.listener)
+
+	default:
+	}
+
+	tunnels.Add(t)
+	t.ctl.conn.AddLogPrefix(t.Id())
+	t.AddLogPrefix(t.Id())
+	t.Info("Registered new tunnel")
+	t.ctl.out <- &proto.RegAckMsg{Url: t.url, ProxyAddr: fmt.Sprintf("%s", proxyAddr)}
+
+	//go t.managerThread()
+}
+
+func (t *Tunnel) shutdown() {
+	t.Info("Shutting down")
+	// stop any go routines
+	// close all proxy and public connections
+	// stop any metrics
+	t.ctl.stop <- 1
+}
+
+func (t *Tunnel) Id() string {
+	return t.url
+}
+
+func (t *Tunnel) managerThread() {
+}
+
+/**
+ * Listens for new public tcp connections from the internet.
+ */
+func (t *Tunnel) listenTcp(listener *net.TCPListener) {
+	for {
+		// accept public connections
+		tcpConn, err := listener.AcceptTCP()
+
+		if err != nil {
+			panic(err)
+		}
+
+		conn := conn.NewLogged(tcpConn, "pub")
+		conn.AddLogPrefix(t.Id())
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					conn.Warn("Failed with error %v", r)
+				}
+			}()
+			defer conn.Close()
+
+			t.HandlePublicConnection(conn)
+		}()
+	}
+}
+
+func (t *Tunnel) HandlePublicConnection(publicConn conn.Conn) {
+	metrics.requestTimer.Time(func() {
+		metrics.requestMeter.Mark(1)
+
+		t.Debug("Requesting new proxy connection")
+		t.ctl.out <- &proto.ReqProxyMsg{}
+
+		proxyConn := <-t.proxies
+		t.Info("Returning proxy connection %s", proxyConn.Id())
+
+		defer proxyConn.Close()
+		conn.Join(publicConn, proxyConn)
+	})
+}
+
+func (t *Tunnel) RegisterProxy(conn conn.Conn) {
+	t.Info("Registered proxy connection %s", conn.Id())
+	conn.AddLogPrefix(t.Id())
+	t.proxies <- conn
+}

+ 23 - 0
templates/body.html

@@ -0,0 +1,23 @@
+{{ define "body" }}
+
+{{ with $json := handleJson . }}
+    <pre><code class="json">{{ $json.Str }}</code></pre>
+    {{ with $json.Err }}
+        <div class="alert">{{ $json.Err }}</div>
+    {{ end }}
+{{ end }}
+
+{{ with $form := handleForm . }}
+    <h6>Form Params</h6>
+    <ul>
+    {{ range $key, $values := $form }}
+        {{ range $value := $values }}
+            <li>
+                {{ $key }}: {{ $value }}
+            </li>
+        {{ end }}
+    {{ end }}
+    </ul>
+{{ end }}
+
+{{ end }}

+ 105 - 0
templates/page.html

@@ -0,0 +1,105 @@
+<html>
+    <head>
+        <link href="//netdna.bootstrapcdn.com/twitter-bootstrap/2.3.1/css/bootstrap-combined.min.css" rel="stylesheet">
+        <link rel="stylesheet" href="http://yandex.st/highlightjs/7.3/styles/default.min.css">
+        <script src="http://yandex.st/highlightjs/7.3/highlight.min.js"></script>
+        <script src="http://code.jquery.com/jquery-1.9.1.min.js"></script>
+        <script>hljs.initHighlightingOnLoad();</script>
+        <script type="text/javascript">
+            $(function() {
+                $("ul.request.nav a").click(function() {
+                    $(this).closest("div").find(".raw, .headers, .summary").hide();
+                    $(this).closest("div").find("." + $(this).attr("target")).show();
+                    $(this).closest("ul").find("li.active").removeClass("active");
+                    $(this).parent().addClass("active");
+                    return false;
+                });
+            });
+        </script>
+        <style type="text/css">
+            body { margin-top: 50px; }
+            ul.history > li { none; margin-bottom: 20px; padding-bottom: 20px; border-bottom: 1px solid #ccc; }
+            .headers, .raw { display: none; }
+        </style>
+    </head>
+
+    <body>
+        <div class="container">
+            <div class="navbar navbar-inverse navbar-fixed-top">
+                <div class="navbar-inner">
+                    <div class="container">
+                        <a class="brand" href="#">ngrok</a>
+                        <ul class="nav">
+                            <li class="active"><a href="#">Inbound Requests</a></li>
+                            <li><a href="#">Outbound Requests</a></li>
+                            <li><a href="#">Configuration</a></li>
+                        </ul>
+                    </div>
+                </div>
+            </div>
+            <ul class="history unstyled">
+            {{ range .GetHistory }}
+                <li>
+                    <div class="row">
+                        <div class="span6">
+                            {{ with .GetRequest }}
+                                <h3>{{ .Method }} {{ .URL.Path }}</h3>
+                                <ul class="request nav nav-pills">
+                                    <li class="active"><a target="summary" href="#">Summary</a></li>
+                                    <li><a href="#" target="headers">Headers</a></li>
+                                    <li><a href="#" target="raw">Raw</a></li>
+                                </ul>
+
+                                <div class="raw">
+                                    {{ with $raw := dumpRequest . }}
+                                        <pre><code class="http">{{ $raw }}</code></pre>
+                                    {{ end }}
+                                </div>
+
+                                <div class="headers">
+                                    {{ with .Header }}
+                                        <h6>Headers</h6>
+                                        <table class="table">
+                                        {{ range $key, $value := .}}
+                                            <tr>
+                                                <th> {{ $key }} </th>
+                                                <td> {{ $value }} </td>
+                                            </tr>
+                                        {{ end }}
+                                        </table>
+                                    {{ end }}
+                                </div>
+
+                                <div class="summary">
+                                    {{ with .URL.Query }}
+                                        <h6>Query Params</h6>
+                                        <table class="table">
+                                        {{ range $key, $value := .}}
+                                            <tr>
+                                                <th> {{ $key }} </th>
+                                                <td> {{ $value }} </td>
+                                            </tr>
+                                        {{ end }}
+                                        </table>
+                                    {{ end }}
+
+                                    {{ template "body" . }}
+                                </div>
+                            {{ end }}
+                        </div>
+                        <div class="span6">
+                            {{ with .GetResponse }}
+                                <h3>{{ .Status }}</h3>
+
+                                {{ with $raw := dumpResponse . }}
+                                    <pre><code class="http">{{ $raw }}</code></pre>
+                                {{ end }}
+                            {{ end }}
+                        </div>
+                    </div>
+                </li>
+            {{ end }}
+            </ul>
+        </div>
+    </body>
+</html>