Browse Source

refactor to make protocol parsing more generic in preparation to support other non-http protocols, make ui more MVC-oriented

add functionality to replay http requests
Alan Shreve 12 years ago
parent
commit
53c1b2d5b3

+ 1 - 1
src/ngrok/client/cli.go

@@ -26,7 +26,7 @@ type Options struct {
 
 func fail(msg string, args ...interface{}) {
 	//log.Error(msg, args..)
-	fmt.Printf(msg + "\n", args...)
+	fmt.Printf(msg+"\n", args...)
 	flag.PrintDefaults()
 	os.Exit(1)
 }

+ 0 - 96
src/ngrok/client/history.go

@@ -1,96 +0,0 @@
-package client
-
-import (
-	"container/list"
-	"net/http"
-	"time"
-)
-
-type RequestHistoryEntry struct {
-	req      *http.Request
-	resp     *http.Response
-	start    time.Time
-	duration time.Duration
-}
-
-type RequestHistory struct {
-	maxSize    int
-	reqToEntry map[*http.Request]*RequestHistoryEntry
-	reqs       chan *http.Request
-	resps      chan *http.Response
-	history    *list.List
-	onChange   func([]*RequestHistoryEntry)
-	metrics    *ClientMetrics
-}
-
-func NewRequestHistory(maxSize int, metrics *ClientMetrics, onChange func([]*RequestHistoryEntry)) *RequestHistory {
-	rh := &RequestHistory{
-		maxSize:    maxSize,
-		reqToEntry: make(map[*http.Request]*RequestHistoryEntry),
-		reqs:       make(chan *http.Request),
-		resps:      make(chan *http.Response),
-		history:    list.New(),
-		onChange:   onChange,
-		metrics:    metrics,
-	}
-
-	go func() {
-		for {
-			select {
-			case req := <-rh.reqs:
-				rh.addRequest(req)
-
-			case resp := <-rh.resps:
-				rh.addResponse(resp)
-			}
-		}
-	}()
-
-	return rh
-}
-
-func (rh *RequestHistory) addRequest(req *http.Request) {
-	rh.metrics.reqMeter.Mark(1)
-	if rh.history.Len() >= rh.maxSize {
-		entry := rh.history.Remove(rh.history.Back()).(*RequestHistoryEntry)
-		delete(rh.reqToEntry, entry.req)
-	}
-
-	entry := &RequestHistoryEntry{req: req, start: time.Now()}
-	rh.reqToEntry[req] = entry
-	rh.history.PushFront(entry)
-	rh.onChange(rh.copy())
-}
-
-func (rh *RequestHistory) addResponse(resp *http.Response) {
-	if entry, ok := rh.reqToEntry[resp.Request]; ok {
-		entry.duration = time.Since(entry.start)
-		rh.metrics.reqTimer.Update(entry.duration)
-
-		entry.resp = resp
-		rh.onChange(rh.copy())
-	} else {
-		// XXX: log warning instead of panic
-		panic("no request for response!")
-	}
-}
-
-func (rh *RequestHistory) copy() []*RequestHistoryEntry {
-	entries := make([]*RequestHistoryEntry, rh.history.Len())
-	i := 0
-	for e := rh.history.Front(); e != nil; e = e.Next() {
-		// force a copy
-		entry := *(e.Value.(*RequestHistoryEntry))
-		entries[i] = &entry
-		i++
-	}
-	return entries
-}
-
-func (rhe *RequestHistoryEntry) GetRequest() *http.Request {
-	return rhe.req
-}
-
-func (rhe *RequestHistoryEntry) GetResponse() *http.Response {
-	return rhe.resp
-}

+ 53 - 60
src/ngrok/client/main.go

@@ -2,13 +2,15 @@ package client
 
 import (
 	log "code.google.com/p/log4go"
-	"crypto/rand"
 	"fmt"
+	"io/ioutil"
 	"net"
 	"ngrok/client/ui"
 	"ngrok/conn"
 	nlog "ngrok/log"
+	"ngrok/msg"
 	"ngrok/proto"
+	"ngrok/util"
 	"runtime"
 	"time"
 )
@@ -39,7 +41,7 @@ func connect(addr string, typ string) (c conn.Conn, err error) {
 /**
  * Establishes and manages a tunnel proxy connection with the server
  */
-func proxy(proxyAddr string, s *State) {
+func proxy(proxyAddr string, s *State, ctl *ui.Controller) {
 	start := time.Now()
 	remoteConn, err := connect(proxyAddr, "pxy")
 	if err != nil {
@@ -47,7 +49,7 @@ func proxy(proxyAddr string, s *State) {
 	}
 
 	defer remoteConn.Close()
-	err = proto.WriteMsg(remoteConn, &proto.RegProxyMsg{Url: s.publicUrl})
+	err = msg.WriteMsg(remoteConn, &msg.RegProxyMsg{Url: s.publicUrl})
 
 	if err != nil {
 		panic(err)
@@ -63,33 +65,29 @@ func proxy(proxyAddr string, s *State) {
 	m := s.metrics
 	m.proxySetupTimer.Update(time.Since(start))
 	m.connMeter.Mark(1)
-	s.Update()
+	ctl.Update(s)
 	m.connTimer.Time(func() {
-		if s.opts.protocol == "http" {
-			teeConn := conn.NewTee(remoteConn)
-			remoteConn = teeConn
-			go conn.ParseHttp(teeConn, s.history.reqs, s.history.resps)
-		}
+		localConn := s.protocol.WrapConn(localConn)
 		bytesIn, bytesOut := conn.Join(localConn, remoteConn)
 		m.bytesIn.Update(bytesIn)
 		m.bytesOut.Update(bytesOut)
 		m.bytesInCount.Inc(bytesIn)
 		m.bytesOutCount.Inc(bytesOut)
 	})
-	s.Update()
+	ctl.Update(s)
 }
 
 /**
  * Establishes and manages a tunnel control connection with the server
  */
-func control(s *State) {
+func control(s *State, ctl *ui.Controller) {
 	defer func() {
 		if r := recover(); r != nil {
 			log.Error("Recovering from failure %v, attempting to reconnect to server after 10 seconds . . .", r)
 			s.status = "reconnecting"
-			s.Update()
+			ctl.Update(s)
 			time.Sleep(10 * time.Second)
-			go control(s)
+			go control(s, ctl)
 		}
 	}()
 
@@ -101,7 +99,7 @@ func control(s *State) {
 	defer conn.Close()
 
 	// register with the server
-	err = proto.WriteMsg(conn, &proto.RegMsg{
+	err = msg.WriteMsg(conn, &msg.RegMsg{
 		Protocol:  s.opts.protocol,
 		OS:        runtime.GOOS,
 		Hostname:  s.opts.hostname,
@@ -114,14 +112,14 @@ func control(s *State) {
 	}
 
 	// wait for the server to ack our register
-	var regAck proto.RegAckMsg
-	if err = proto.ReadMsgInto(conn, &regAck); err != nil {
+	var regAck msg.RegAckMsg
+	if err = msg.ReadMsgInto(conn, &regAck); err != nil {
 		panic(err)
 	}
 
 	if regAck.Error != "" {
 		emsg := fmt.Sprintf("Server failed to allocate tunnel: %s", regAck.Error)
-		s.ui.Cmds <- ui.Command{ui.QUIT, emsg}
+		ctl.Cmds <- ui.Command{ui.QUIT, emsg}
 		return
 	}
 
@@ -130,36 +128,25 @@ func control(s *State) {
 	//state.version = regAck.Version
 	s.publicUrl = regAck.Url
 	s.status = "online"
-	s.Update()
+	ctl.Update(s)
 
 	// main control loop
 	for {
-		var msg proto.Message
-		if msg, err = proto.ReadMsg(conn); err != nil {
+		var m msg.Message
+		if m, err = msg.ReadMsg(conn); err != nil {
 			panic(err)
 		}
 
-		switch msg.GetType() {
+		switch m.GetType() {
 		case "ReqProxyMsg":
-			go proxy(regAck.ProxyAddr, s)
+			go proxy(regAck.ProxyAddr, s, ctl)
 
 		case "PingMsg":
-			proto.WriteMsg(conn, &proto.PongMsg{})
+			msg.WriteMsg(conn, &msg.PongMsg{})
 		}
 	}
 }
 
-// create a random identifier for this client
-func mkid() string {
-	b := make([]byte, 8)
-	_, err := rand.Read(b)
-	if err != nil {
-		panic(fmt.Sprintf("Couldn't create random client identifier, %v", err))
-	}
-	return fmt.Sprintf("%x", b)
-
-}
-
 func Main() {
 	// XXX: should do this only if they ask us too
 	nlog.LogToFile()
@@ -167,18 +154,12 @@ func Main() {
 	// parse options
 	opts := parseArgs()
 
-	// init terminal, http UI
-	termView := ui.NewTerm()
-	httpView := ui.NewHttp(9999)
-
 	// init client state
 	s := &State{
-		// unique client id
-		id: mkid(),
+		status: "connecting",
 
-		// ui communication channels
-		ui: ui.NewUi(termView, httpView),
-		//ui: ui.NewUi(httpView),
+		// unique client id
+		id: util.RandId(),
 
 		// command-line options
 		opts: opts,
@@ -187,39 +168,51 @@ func Main() {
 		metrics: NewClientMetrics(),
 	}
 
-	// request history
-	// XXX: don't use a callback, use a channel
-	// and define it inline in the struct
-	s.history = NewRequestHistory(opts.historySize, s.metrics, func(history []*RequestHistoryEntry) {
-		s.historyEntries = history
-		s.Update()
-	})
+	switch opts.protocol {
+	case "http":
+		s.protocol = proto.NewHttp()
+	case "tcp":
+		s.protocol = proto.NewTcp()
+	}
 
-	// set initial ui state
-	s.status = "connecting"
-	s.protocol = opts.protocol
-	s.Update()
+	// init ui
+	ctl := ui.NewController()
+	ui.NewTermView(ctl)
+	ui.NewWebView(ctl, s, 9999)
 
-	go control(s)
+	go control(s, ctl)
 
 	quitMessage := ""
-	s.ui.Wait.Add(1)
+	ctl.Wait.Add(1)
 	go func() {
-		defer s.ui.Wait.Done()
+		defer ctl.Wait.Done()
 		for {
 			select {
-			case cmd := <-s.ui.Cmds:
+			case cmd := <-ctl.Cmds:
 				switch cmd.Code {
 				case ui.QUIT:
 					quitMessage = cmd.Payload.(string)
 					s.stopping = true
-					s.Update()
+					ctl.Update(s)
 					return
+				case ui.REPLAY:
+					go func() {
+						payload := cmd.Payload.([]byte)
+						localConn, err := connect(s.opts.localaddr, "prv")
+						if err != nil {
+							log.Warn("Failed to open private leg %s: %v", s.opts.localaddr, err)
+							return
+						}
+						//defer localConn.Close()
+						localConn = s.protocol.WrapConn(localConn)
+						localConn.Write(payload)
+						ioutil.ReadAll(localConn)
+					}()
 				}
 			}
 		}
 	}()
 
-	s.ui.Wait.Wait()
+	ctl.Wait.Wait()
 	fmt.Println(quitMessage)
 }

+ 0 - 6
src/ngrok/client/metrics.go

@@ -14,9 +14,6 @@ type ClientMetrics struct {
 	connGauge       metrics.Gauge
 	connMeter       metrics.Meter
 	connTimer       metrics.Timer
-	reqGauge        metrics.Gauge
-	reqMeter        metrics.Meter
-	reqTimer        metrics.Timer
 	proxySetupTimer metrics.Timer
 	bytesIn         metrics.Histogram
 	bytesOut        metrics.Histogram
@@ -29,9 +26,6 @@ func NewClientMetrics() *ClientMetrics {
 		connGauge:       metrics.NewGauge(),
 		connMeter:       metrics.NewMeter(),
 		connTimer:       metrics.NewTimer(),
-		reqGauge:        metrics.NewGauge(),
-		reqMeter:        metrics.NewMeter(),
-		reqTimer:        metrics.NewTimer(),
 		proxySetupTimer: metrics.NewTimer(),
 		bytesIn:         metrics.NewHistogram(metrics.NewExpDecaySample(sampleSize, sampleAlpha)),
 		bytesOut:        metrics.NewHistogram(metrics.NewExpDecaySample(sampleSize, sampleAlpha)),

+ 10 - 30
src/ngrok/client/state.go

@@ -2,50 +2,34 @@ package client
 
 import (
 	metrics "github.com/rcrowley/go-metrics"
-	"ngrok/client/ui"
+	"ngrok/proto"
 )
 
 // client state
 type State struct {
 	id        string
-	ui        *ui.Ui
 	publicUrl string
-	protocol  string
-	history   *RequestHistory
+	protocol  proto.Protocol
 	opts      *Options
 	metrics   *ClientMetrics
 
 	// just for UI purposes
-	status         string
-	historyEntries []*RequestHistoryEntry
-	stopping       bool
+	status   string
+	stopping bool
 }
 
 // implement client.ui.State
-func (s State) GetVersion() string   { return "" }
-func (s State) GetPublicUrl() string { return s.publicUrl }
-func (s State) GetLocalAddr() string { return s.opts.localaddr }
-func (s State) GetStatus() string    { return s.status }
-func (s State) GetProtocol() string  { return s.protocol }
-func (s State) GetHistory() []ui.HttpRequest {
-	// go sucks
-	historyEntries := make([]ui.HttpRequest, len(s.historyEntries))
-
-	for i, entry := range s.historyEntries {
-		historyEntries[i] = entry
-	}
-	return historyEntries
-}
-func (s State) IsStopping() bool { return s.stopping }
+func (s State) GetVersion() string          { return "" }
+func (s State) GetPublicUrl() string        { return s.publicUrl }
+func (s State) GetLocalAddr() string        { return s.opts.localaddr }
+func (s State) GetStatus() string           { return s.status }
+func (s State) GetProtocol() proto.Protocol { return s.protocol }
+func (s State) IsStopping() bool            { return s.stopping }
 
 func (s State) GetConnectionMetrics() (metrics.Meter, metrics.Timer) {
 	return s.metrics.connMeter, s.metrics.connTimer
 }
 
-func (s State) GetRequestMetrics() (metrics.Meter, metrics.Timer) {
-	return s.metrics.reqMeter, s.metrics.reqTimer
-}
-
 func (s State) GetBytesInMetrics() (metrics.Counter, metrics.Histogram) {
 	return s.metrics.bytesInCount, s.metrics.bytesIn
 }
@@ -53,7 +37,3 @@ func (s State) GetBytesInMetrics() (metrics.Counter, metrics.Histogram) {
 func (s State) GetBytesOutMetrics() (metrics.Counter, metrics.Histogram) {
 	return s.metrics.bytesOutCount, s.metrics.bytesOut
 }
-
-func (s *State) Update() {
-	s.ui.Updates.In() <- *s
-}

+ 0 - 41
src/ngrok/client/ui/broadcast.go

@@ -1,41 +0,0 @@
-package ui
-
-type Broadcast struct {
-	listeners []chan State
-	reg       chan (chan State)
-	in        chan State
-}
-
-func NewBroadcast() *Broadcast {
-	b := &Broadcast{
-		listeners: make([]chan State, 0),
-		reg:       make(chan (chan State)),
-		in:        make(chan State),
-	}
-
-	go func() {
-		for {
-			select {
-			case l := <-b.reg:
-				b.listeners = append(b.listeners, l)
-
-			case item := <-b.in:
-				for _, l := range b.listeners {
-					l <- item
-				}
-			}
-		}
-	}()
-
-	return b
-}
-
-func (b *Broadcast) In() chan State {
-	return b.in
-}
-
-func (b *Broadcast) Reg() chan State {
-	listener := make(chan State)
-	b.reg <- listener
-	return listener
-}

+ 44 - 0
src/ngrok/client/ui/controller.go

@@ -0,0 +1,44 @@
+/* The controller in the MVC
+ */
+
+package ui
+
+import (
+	"ngrok/util"
+	"sync"
+)
+
+type Command struct {
+	Code    int
+	Payload interface{}
+}
+
+const (
+	QUIT = iota
+	REPLAY
+)
+
+type Controller struct {
+	// the model sends updates through this broadcast channel
+	Updates *util.Broadcast
+
+	// all views put any commands into this channel
+	Cmds chan Command
+
+	// all threads may add themself to this to wait for clean shutdown
+	Wait *sync.WaitGroup
+}
+
+func NewController() *Controller {
+	ctl := &Controller{
+		Updates: util.NewBroadcast(),
+		Cmds:    make(chan Command),
+		Wait:    new(sync.WaitGroup),
+	}
+
+	return ctl
+}
+
+func (ctl *Controller) Update(state State) {
+	ctl.Updates.In() <- state
+}

+ 0 - 158
src/ngrok/client/ui/http.go

@@ -1,158 +0,0 @@
-// interative http client user interface
-package ui
-
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	"html/template"
-	"io"
-	"io/ioutil"
-	"net/http"
-	"net/http/httputil"
-	"net/url"
-)
-
-type RepeatableReader struct {
-	io.Reader
-	buffer []byte
-}
-
-func NewRepeatableReader(rd io.ReadCloser) *RepeatableReader {
-	buffer := new(bytes.Buffer)
-	buffer.ReadFrom(rd)
-	return &RepeatableReader{
-		bytes.NewBuffer(buffer.Bytes()),
-		buffer.Bytes(),
-	}
-}
-
-func (rr *RepeatableReader) Read(b []byte) (n int, err error) {
-	n, err = rr.Reader.Read(b)
-
-	if err == io.EOF {
-		rr.Reader = bytes.NewBuffer(rr.buffer)
-	}
-
-	return n, err
-}
-
-func (rr *RepeatableReader) Close() error {
-	return nil
-}
-
-type Http struct {
-	ui   *Ui
-	port int
-}
-
-func NewHttp(port int) *Http {
-	return &Http{port: port}
-}
-
-func (h *Http) SetUi(ui *Ui) {
-	h.ui = ui
-	go h.run()
-}
-
-func (h *Http) run() {
-	// open channels for incoming application state changes
-	// and broadbasts
-
-	updates := h.ui.Updates.Reg()
-	var s State
-	go func() {
-		for {
-			select {
-			case obj := <-updates:
-				s = obj.(State)
-			}
-		}
-	}()
-
-	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
-		funcMap := template.FuncMap{
-                        "classForStatus": func(status string) string {
-                            switch status[0] {
-                                case '2':
-                                    return "text-info"
-                                case '3':
-                                    return "muted"
-                                case '4':
-                                    return "text-warning"
-                                case '5':
-                                    return "text-error"
-                            }
-                            return ""
-                        },
-			"dumpResponse": func(resp *http.Response) (interface{}, error) {
-				b, err := httputil.DumpResponse(resp, false)
-				body := new(bytes.Buffer)
-				body.ReadFrom(resp.Body)
-				return string(b) + string(body.Bytes()), err
-			},
-			"dumpRequest": func(req *http.Request) (interface{}, error) {
-				b, err := httputil.DumpRequest(req, false)
-				body := new(bytes.Buffer)
-				body.ReadFrom(req.Body)
-				return string(b) + string(body.Bytes()), err
-			},
-			"handleForm": func(req *http.Request) (values interface{}, err error) {
-				if req.Header.Get("Content-Type") != "application/x-www-form-urlencoded" {
-					return
-				}
-
-				b, err := ioutil.ReadAll(req.Body)
-				if err != nil {
-					return
-				}
-
-				values, err = url.ParseQuery(string(b))
-				return
-			},
-			"handleJson": func(req *http.Request) interface{} {
-				if req.Header.Get("Content-Type") != "application/json" {
-					return nil
-				}
-
-				src := new(bytes.Buffer)
-				dst := new(bytes.Buffer)
-				src.ReadFrom(req.Body)
-				err := json.Indent(dst, src.Bytes(), "", "    ")
-
-				retval := struct {
-					Str string
-					Err error
-				}{
-					string(dst.Bytes()),
-					err,
-				}
-
-				if err != nil {
-					retval.Str = string(src.Bytes())
-				}
-
-				return retval
-			},
-		}
-
-		tmpl := template.Must(
-			template.New("page.html").Funcs(funcMap).ParseFiles("./templates/page.html", "./templates/body.html"))
-
-		for _, htxn := range s.GetHistory() {
-			req, resp := htxn.GetRequest(), htxn.GetResponse()
-
-			req.Body = NewRepeatableReader(req.Body)
-			if resp != nil && resp.Body != nil {
-				resp.Body = NewRepeatableReader(resp.Body)
-			}
-		}
-
-		// write the response
-		if err := tmpl.Execute(w, s); err != nil {
-			panic(err)
-		}
-	})
-
-	http.ListenAndServe(fmt.Sprintf(":%d", h.port), nil)
-}

+ 2 - 9
src/ngrok/client/ui/interface.go

@@ -2,24 +2,17 @@ package ui
 
 import (
 	metrics "github.com/rcrowley/go-metrics"
-	"net/http"
+	"ngrok/proto"
 )
 
-type HttpRequest interface {
-	GetRequest() *http.Request
-	GetResponse() *http.Response
-}
-
 type State interface {
 	GetVersion() string
 	GetPublicUrl() string
 	GetLocalAddr() string
 	GetStatus() string
-	GetProtocol() string
-	GetHistory() []HttpRequest
+	GetProtocol() proto.Protocol
 	IsStopping() bool
 	GetConnectionMetrics() (metrics.Meter, metrics.Timer)
-	GetRequestMetrics() (metrics.Meter, metrics.Timer)
 	GetBytesInMetrics() (metrics.Counter, metrics.Histogram)
 	GetBytesOutMetrics() (metrics.Counter, metrics.Histogram)
 }

+ 31 - 31
src/ngrok/client/ui/terminal.go

@@ -35,36 +35,34 @@ func printf(x, y int, arg0 string, args ...interface{}) {
 	printfAttr(x, y, fgColor, arg0, args...)
 }
 
-type Term struct {
-	ui             *Ui
+type TermView struct {
+	ctl            *Controller
 	statusColorMap map[string]termbox.Attribute
-	updates        chan (State)
+	updates        chan interface{}
 }
 
-func NewTerm() *Term {
-	return &Term{
+func NewTermView(ctl *Controller) *TermView {
+	t := &TermView{
+		ctl:     ctl,
+		updates: ctl.Updates.Reg(),
 		statusColorMap: map[string]termbox.Attribute{
 			"connecting":   termbox.ColorCyan,
 			"reconnecting": termbox.ColorRed,
 			"online":       termbox.ColorGreen,
 		},
-		updates: make(chan State),
 	}
-}
 
-func (t *Term) SetUi(ui *Ui) {
-	t.ui = ui
 	go t.run()
+	return t
 }
 
-func (t *Term) run() {
-	// make sure we shut down cleanly
-	t.ui.Wait.Add(1)
-	defer t.ui.Wait.Done()
+func (t *TermView) run() {
+	// XXX: clean this up? maybe Term should have its own waitgroup for
+	// both run and draw
 
-	// open channels for incoming application state changes
-	// and broadbasts
-	t.updates = t.ui.Updates.Reg()
+	// make sure we shut down cleanly
+	t.ctl.Wait.Add(1)
+	defer t.ctl.Wait.Done()
 
 	// init/close termbox library
 	termbox.Init()
@@ -75,13 +73,13 @@ func (t *Term) run() {
 	t.draw()
 }
 
-func (t *Term) draw() {
+func (t *TermView) draw() {
 	var state State
 	for {
 		select {
 		case newState := <-t.updates:
 			if newState != nil {
-				state = newState
+				state = newState.(State)
 			}
 
 			if state == nil {
@@ -106,7 +104,7 @@ func (t *Term) draw() {
 
 			printfAttr(0, 2, t.statusColorMap[state.GetStatus()], "%-30s%s", "Tunnel Status", state.GetStatus())
 			printf(0, 3, "%-30s%s", "Version", state.GetVersion())
-			printf(0, 4, "%-30s%s", "Protocol", state.GetProtocol())
+			printf(0, 4, "%-30s%s", "Protocol", state.GetProtocol().GetName())
 			printf(0, 5, "%-30s%s -> %s", "Forwarding", state.GetPublicUrl(), state.GetLocalAddr())
 			printf(0, 6, "%-30s%s", "HTTP Dashboard", "http://127.0.0.1:9999")
 
@@ -114,32 +112,34 @@ func (t *Term) draw() {
 			printf(0, 7, "%-30s%d", "# Conn", connMeter.Count())
 			printf(0, 8, "%-30s%.2fms", "Avg Conn Time", connTimer.Mean()/msec)
 
-			if state.GetProtocol() == "http" {
-				printf(0, 10, "HTTP Requests")
-				printf(0, 11, "-------------")
-				for i, http := range state.GetHistory() {
-					req := http.GetRequest()
-					resp := http.GetResponse()
-					printf(0, 12+i, "%s %v", req.Method, req.URL)
-					if resp != nil {
-						printf(30, 12+i, "%s", resp.Status)
+			/*
+				if state.GetProtocol() == "http" {
+					printf(0, 10, "HTTP Requests")
+					printf(0, 11, "-------------")
+					for i, http := range state.GetHistory() {
+						req := http.GetRequest()
+						resp := http.GetResponse()
+						printf(0, 12+i, "%s %v", req.Method, req.URL)
+						if resp != nil {
+							printf(30, 12+i, "%s", resp.Status)
+						}
 					}
 				}
-			}
+			*/
 
 			termbox.Flush()
 		}
 	}
 }
 
-func (t *Term) input() {
+func (t *TermView) input() {
 	for {
 		ev := termbox.PollEvent()
 		switch ev.Type {
 		case termbox.EventKey:
 			switch ev.Key {
 			case termbox.KeyCtrlC:
-				t.ui.Cmds <- Command{QUIT, ""}
+				t.ctl.Cmds <- Command{QUIT, ""}
 				return
 			}
 

+ 0 - 43
src/ngrok/client/ui/ui.go

@@ -1,43 +0,0 @@
-package ui
-
-import (
-	"sync"
-)
-
-type Command struct {
-	Code    int
-	Payload interface{}
-}
-
-const (
-	QUIT = iota
-)
-
-type View interface {
-	SetUi(*Ui)
-}
-
-type Ui struct {
-	// the model always updates
-	Updates *Broadcast
-
-	// all views put their commands into this channel
-	Cmds chan Command
-
-	// all threads may add themself to this to wait for clean shutdown
-	Wait *sync.WaitGroup
-}
-
-func NewUi(views ...View) *Ui {
-	ui := &Ui{
-		Updates: NewBroadcast(),
-		Cmds:    make(chan Command),
-		Wait:    new(sync.WaitGroup),
-	}
-
-	for _, v := range views {
-		v.SetUi(ui)
-	}
-
-	return ui
-}

+ 26 - 0
src/ngrok/client/ui/web.go

@@ -0,0 +1,26 @@
+// interative web user interface
+package ui
+
+import (
+	"fmt"
+	"net/http"
+	"ngrok/proto"
+)
+
+type WebView struct{}
+
+func NewWebView(ctl *Controller, state State, port int) *WebView {
+	w := &WebView{}
+
+	switch p := state.GetProtocol().(type) {
+	case *proto.Http:
+		NewWebHttpView(ctl, p)
+	}
+
+	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+		http.Redirect(w, r, "/http/in", 302)
+	})
+
+	go http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
+	return w
+}

+ 162 - 0
src/ngrok/client/ui/webhttp.go

@@ -0,0 +1,162 @@
+// interative web user interface
+package ui
+
+import (
+	"bytes"
+	"encoding/json"
+	"html/template"
+	"io/ioutil"
+	"net/http"
+	"net/http/httputil"
+	"net/url"
+	"ngrok/proto"
+	"ngrok/util"
+)
+
+func readBody(r *http.Request) ([]byte, error) {
+	buf := new(bytes.Buffer)
+	_, err := buf.ReadFrom(r.Body)
+	r.Body = ioutil.NopCloser(buf)
+	return buf.Bytes(), err
+}
+
+type WebHttpTxn struct {
+	Id string
+	*proto.HttpTxn
+}
+
+type WebHttpView struct {
+	ctl          *Controller
+	httpProto    *proto.Http
+	HttpRequests *util.Ring
+	idToTxn      map[string]*WebHttpTxn
+}
+
+func NewWebHttpView(ctl *Controller, proto *proto.Http) *WebHttpView {
+	w := &WebHttpView{
+		ctl:          ctl,
+		httpProto:    proto,
+		idToTxn:      make(map[string]*WebHttpTxn),
+		HttpRequests: util.NewRing(20),
+	}
+	go w.update()
+	w.register()
+	return w
+}
+
+func (whv *WebHttpView) update() {
+	// open channels for incoming http state changes
+	// and broadbasts
+	txnUpdates := whv.httpProto.Txns.Reg()
+	for {
+		select {
+		case txn := <-txnUpdates:
+			// XXX: it's not safe for proto.Http and this code
+			// to be accessing txn and txn.(req/resp) without synchronization
+			htxn := txn.(*proto.HttpTxn)
+
+			// XXX: golang, why do I have to do this to make DumpRequestOut work later?
+			htxn.Req.URL.Scheme = "http"
+
+			if htxn.Resp == nil {
+				whtxn := &WebHttpTxn{Id: util.RandId(), HttpTxn: htxn}
+				// XXX: unsafe map access from multiple go routines
+				whv.idToTxn[whtxn.Id] = whtxn
+				// XXX: use return value to delete from map so we don't leak memory
+				whv.HttpRequests.Add(whtxn)
+			}
+		}
+	}
+}
+
+func (h *WebHttpView) register() {
+	http.HandleFunc("/http/in/replay", func(w http.ResponseWriter, r *http.Request) {
+		r.ParseForm()
+		txnid := r.Form.Get("txnid")
+		if txn, ok := h.idToTxn[txnid]; ok {
+			bodyBytes, err := httputil.DumpRequestOut(txn.Req, true)
+			if err != nil {
+				panic(err)
+			}
+			h.ctl.Cmds <- Command{REPLAY, bodyBytes}
+			w.Write([]byte(http.StatusText(200)))
+		} else {
+			// XXX: 400
+			http.NotFound(w, r)
+		}
+	})
+
+	http.HandleFunc("/http/in", func(w http.ResponseWriter, r *http.Request) {
+		funcMap := template.FuncMap{
+			"classForStatus": func(status string) string {
+				switch status[0] {
+				case '2':
+					return "text-info"
+				case '3':
+					return "muted"
+				case '4':
+					return "text-warning"
+				case '5':
+					return "text-error"
+				}
+				return ""
+			},
+			"dumpResponse": func(resp *http.Response) (interface{}, error) {
+				b, err := httputil.DumpResponse(resp, true)
+				return string(b), err
+			},
+			"dumpRequest": func(req *http.Request) (interface{}, error) {
+				b, err := httputil.DumpRequestOut(req, true)
+				return string(b), err
+			},
+			"handleForm": func(req *http.Request) (values interface{}, err error) {
+				if req.Header.Get("Content-Type") != "application/x-www-form-urlencoded" {
+					return
+				}
+
+				b, err := readBody(req)
+				if err != nil {
+					return
+				}
+
+				values, err = url.ParseQuery(string(b))
+				return
+			},
+			"handleJson": func(req *http.Request) interface{} {
+				if req.Header.Get("Content-Type") != "application/json" {
+					return nil
+				}
+
+				raw, err := readBody(req)
+				if err != nil {
+					panic(err)
+				}
+
+				pretty := new(bytes.Buffer)
+				err = json.Indent(pretty, raw, "", "    ")
+
+				retval := struct {
+					Str string
+					Err error
+				}{
+					string(pretty.Bytes()),
+					err,
+				}
+
+				if err != nil {
+					retval.Str = string(raw)
+				}
+
+				return retval
+			},
+		}
+
+		tmpl := template.Must(
+			template.New("page.html").Funcs(funcMap).ParseFiles("./templates/page.html", "./templates/body.html"))
+
+		// write the response
+		if err := tmpl.Execute(w, h); err != nil {
+			panic(err)
+		}
+	})
+}

+ 0 - 42
src/ngrok/conn/http.go

@@ -1,42 +0,0 @@
-package conn
-
-import (
-	"net/http"
-	"net/http/httputil"
-)
-
-func ParseHttp(tee *Tee, reqs chan *http.Request, resps chan *http.Response) {
-	lastReq := make(chan *http.Request)
-
-	go func() {
-		for {
-			req, err := http.ReadRequest(tee.ReadBuffer())
-			if err != nil {
-				// no more requests to be read, we're done
-				break
-			}
-			lastReq <- req
-			// make sure we read the body of the request so that
-			// we don't block the reader 
-			_, _ = httputil.DumpRequest(req, true)
-			reqs <- req
-		}
-	}()
-
-	go func() {
-		for {
-			req := <-lastReq
-			resp, err := http.ReadResponse(tee.WriteBuffer(), req)
-			if err != nil {
-				tee.Warn("Error reading response from server: %v", err)
-				// no more responses to be read, we're done
-				break
-			}
-			// make sure we read the body of the response so that
-			// we don't block the writer 
-			_, _ = httputil.DumpResponse(resp, true)
-			resps <- resp
-		}
-	}()
-
-}

+ 1 - 1
src/ngrok/proto/conn.go → src/ngrok/msg/conn.go

@@ -1,4 +1,4 @@
-package proto
+package msg
 
 import (
 	"encoding/binary"

+ 1 - 1
src/ngrok/proto/msg.go → src/ngrok/msg/msg.go

@@ -1,4 +1,4 @@
-package proto
+package msg
 
 import (
 	"encoding/json"

+ 1 - 1
src/ngrok/proto/pack.go → src/ngrok/msg/pack.go

@@ -1,4 +1,4 @@
-package proto
+package msg
 
 import (
 	"encoding/json"

+ 82 - 0
src/ngrok/proto/http.go

@@ -0,0 +1,82 @@
+package proto
+
+import (
+	metrics "github.com/rcrowley/go-metrics"
+	"net/http"
+	"net/http/httputil"
+	"ngrok/conn"
+	"ngrok/util"
+	"time"
+)
+
+type HttpTxn struct {
+	Req      *http.Request
+	Resp     *http.Response
+	Start    time.Time
+	Duration time.Duration
+}
+
+type Http struct {
+	Txns     *util.Broadcast
+	reqGauge metrics.Gauge
+	reqMeter metrics.Meter
+	reqTimer metrics.Timer
+}
+
+func NewHttp() *Http {
+	return &Http{
+		Txns:     util.NewBroadcast(),
+		reqGauge: metrics.NewGauge(),
+		reqMeter: metrics.NewMeter(),
+		reqTimer: metrics.NewTimer(),
+	}
+}
+
+func (h *Http) GetName() string { return "http" }
+
+func (h *Http) WrapConn(c conn.Conn) conn.Conn {
+	tee := conn.NewTee(c)
+	lastTxn := make(chan *HttpTxn)
+	go h.readRequests(tee, lastTxn)
+	go h.readResponses(tee, lastTxn)
+	return tee
+}
+
+func (h *Http) readRequests(tee *conn.Tee, lastTxn chan *HttpTxn) {
+	for {
+		req, err := http.ReadRequest(tee.WriteBuffer())
+		if err != nil {
+			// no more requests to be read, we're done
+			break
+		}
+
+		// make sure we read the body of the request so that
+		// we don't block the writer 
+		_, _ = httputil.DumpRequest(req, true)
+
+		h.reqMeter.Mark(1)
+		txn := &HttpTxn{Req: req, Start: time.Now()}
+		lastTxn <- txn
+		h.Txns.In() <- txn
+	}
+}
+
+func (h *Http) readResponses(tee *conn.Tee, lastTxn chan *HttpTxn) {
+	for {
+		var err error
+		txn := <-lastTxn
+		txn.Resp, err = http.ReadResponse(tee.ReadBuffer(), txn.Req)
+		txn.Duration = time.Since(txn.Start)
+		h.reqTimer.Update(txn.Duration)
+		if err != nil {
+			tee.Warn("Error reading response from server: %v", err)
+			// no more responses to be read, we're done
+			break
+		}
+		// make sure we read the body of the response so that
+		// we don't block the reader 
+		_, _ = httputil.DumpResponse(txn.Resp, true)
+
+		h.Txns.In() <- txn
+	}
+}

+ 10 - 0
src/ngrok/proto/interface.go

@@ -0,0 +1,10 @@
+package proto
+
+import (
+	"ngrok/conn"
+)
+
+type Protocol interface {
+	GetName() string
+	WrapConn(conn.Conn) conn.Conn
+}

+ 17 - 0
src/ngrok/proto/tcp.go

@@ -0,0 +1,17 @@
+package proto
+
+import (
+	"ngrok/conn"
+)
+
+type Tcp struct{}
+
+func NewTcp() *Tcp {
+	return new(Tcp)
+}
+
+func (h *Tcp) GetName() string { return "tcp" }
+
+func (h *Tcp) WrapConn(c conn.Conn) conn.Conn {
+	return c
+}

+ 13 - 13
src/ngrok/server/control.go

@@ -4,7 +4,7 @@ import (
 	"io"
 	"net"
 	"ngrok/conn"
-	"ngrok/proto"
+	"ngrok/msg"
 	"runtime/debug"
 	"sync/atomic"
 	"time"
@@ -21,8 +21,8 @@ type Control struct {
 
 	// channels for communicating messages over the connection
 	out  chan (interface{})
-	in   chan (proto.Message)
-	stop chan (proto.Message)
+	in   chan (msg.Message)
+	stop chan (msg.Message)
 
 	// heartbeat
 	lastPong int64
@@ -35,8 +35,8 @@ func NewControl(tcpConn *net.TCPConn) {
 	c := &Control{
 		conn:     conn.NewTCP(tcpConn, "ctl"),
 		out:      make(chan (interface{}), 1),
-		in:       make(chan (proto.Message), 1),
-		stop:     make(chan (proto.Message), 1),
+		in:       make(chan (msg.Message), 1),
+		stop:     make(chan (msg.Message), 1),
 		lastPong: time.Now().Unix(),
 	}
 
@@ -66,10 +66,10 @@ func (c *Control) managerThread() {
 	for {
 		select {
 		case m := <-c.out:
-			proto.WriteMsg(c.conn, m)
+			msg.WriteMsg(c.conn, m)
 
 		case <-ping.C:
-			proto.WriteMsg(c.conn, &proto.PingMsg{})
+			msg.WriteMsg(c.conn, &msg.PingMsg{})
 
 		case <-reap.C:
 			if (time.Now().Unix() - c.lastPong) > 60 {
@@ -80,21 +80,21 @@ func (c *Control) managerThread() {
 
 		case m := <-c.stop:
 			if m != nil {
-				proto.WriteMsg(c.conn, m)
+				msg.WriteMsg(c.conn, m)
 			}
 			return
 
-		case msg := <-c.in:
-			switch msg.GetType() {
+		case m := <-c.in:
+			switch m.GetType() {
 			case "RegMsg":
 				c.conn.Info("Registering new tunnel")
-				c.tun = newTunnel(msg.(*proto.RegMsg), c)
+				c.tun = newTunnel(m.(*msg.RegMsg), c)
 
 			case "PongMsg":
 				atomic.StoreInt64(&c.lastPong, time.Now().Unix())
 
 			case "VersionReqMsg":
-				c.out <- &proto.VersionRespMsg{Version: version}
+				c.out <- &msg.VersionRespMsg{Version: version}
 			}
 		}
 	}
@@ -110,7 +110,7 @@ func (c *Control) readThread() {
 
 	// read messages from the control channel
 	for {
-		if msg, err := proto.ReadMsg(c.conn); err != nil {
+		if msg, err := msg.ReadMsg(c.conn); err != nil {
 			if err == io.EOF {
 				c.conn.Info("EOF")
 				return

+ 3 - 3
src/ngrok/server/main.go

@@ -7,7 +7,7 @@ import (
 	"net"
 	"ngrok/conn"
 	nlog "ngrok/log"
-	"ngrok/proto"
+	"ngrok/msg"
 	"regexp"
 )
 
@@ -99,8 +99,8 @@ func proxyListener(addr *net.TCPAddr, domain string) {
 				}
 			}()
 
-			var regPxy proto.RegProxyMsg
-			if err = proto.ReadMsgInto(conn, &regPxy); err != nil {
+			var regPxy msg.RegProxyMsg
+			if err = msg.ReadMsgInto(conn, &regPxy); err != nil {
 				panic(err)
 			}
 

+ 7 - 7
src/ngrok/server/tunnel.go

@@ -6,7 +6,7 @@ import (
 	"net"
 	"ngrok/conn"
 	nlog "ngrok/log"
-	"ngrok/proto"
+	"ngrok/msg"
 )
 
 /**
@@ -14,7 +14,7 @@ import (
  *         route public traffic to a firewalled endpoint.
  */
 type Tunnel struct {
-	regMsg *proto.RegMsg
+	regMsg *msg.RegMsg
 
 	// public url
 	url string
@@ -32,9 +32,9 @@ type Tunnel struct {
 	nlog.Logger
 }
 
-func newTunnel(msg *proto.RegMsg, ctl *Control) (t *Tunnel) {
+func newTunnel(m *msg.RegMsg, ctl *Control) (t *Tunnel) {
 	t = &Tunnel{
-		regMsg:  msg,
+		regMsg:  m,
 		ctl:     ctl,
 		proxies: make(chan conn.Conn),
 		Logger:  nlog.NewPrefixLogger(),
@@ -55,14 +55,14 @@ func newTunnel(msg *proto.RegMsg, ctl *Control) (t *Tunnel) {
 	}
 
 	if err := tunnels.Add(t); err != nil {
-		t.ctl.stop <- &proto.RegAckMsg{Error: fmt.Sprint(err)}
+		t.ctl.stop <- &msg.RegAckMsg{Error: fmt.Sprint(err)}
 		return
 	}
 
 	t.ctl.conn.AddLogPrefix(t.Id())
 	t.AddLogPrefix(t.Id())
 	t.Info("Registered new tunnel")
-	t.ctl.out <- &proto.RegAckMsg{Url: t.url, ProxyAddr: fmt.Sprintf("%s", proxyAddr)}
+	t.ctl.out <- &msg.RegAckMsg{Url: t.url, ProxyAddr: fmt.Sprintf("%s", proxyAddr)}
 	return
 }
 
@@ -124,7 +124,7 @@ func (t *Tunnel) HandlePublicConnection(publicConn conn.Conn) {
 		metrics.requestMeter.Mark(1)
 
 		t.Debug("Requesting new proxy connection")
-		t.ctl.out <- &proto.ReqProxyMsg{}
+		t.ctl.out <- &msg.ReqProxyMsg{}
 
 		proxyConn := <-t.proxies
 		t.Info("Returning proxy connection %s", proxyConn.Id())

+ 41 - 0
src/ngrok/util/broadcast.go

@@ -0,0 +1,41 @@
+package util
+
+type Broadcast struct {
+	listeners []chan interface{}
+	reg       chan (chan interface{})
+	in        chan interface{}
+}
+
+func NewBroadcast() *Broadcast {
+	b := &Broadcast{
+		listeners: make([]chan interface{}, 0),
+		reg:       make(chan (chan interface{})),
+		in:        make(chan interface{}),
+	}
+
+	go func() {
+		for {
+			select {
+			case l := <-b.reg:
+				b.listeners = append(b.listeners, l)
+
+			case item := <-b.in:
+				for _, l := range b.listeners {
+					l <- item
+				}
+			}
+		}
+	}()
+
+	return b
+}
+
+func (b *Broadcast) In() chan interface{} {
+	return b.in
+}
+
+func (b *Broadcast) Reg() chan interface{} {
+	listener := make(chan interface{})
+	b.reg <- listener
+	return listener
+}

+ 16 - 0
src/ngrok/util/id.go

@@ -0,0 +1,16 @@
+package util
+
+import (
+	"crypto/rand"
+	"fmt"
+)
+
+// create a random identifier for this client
+func RandId() string {
+	b := make([]byte, 8)
+	_, err := rand.Read(b)
+	if err != nil {
+		panic(fmt.Sprintf("rand.Rand failed trying to create identifier, %v", err))
+	}
+	return fmt.Sprintf("%x", b)
+}

+ 46 - 0
src/ngrok/util/ring.go

@@ -0,0 +1,46 @@
+package util
+
+import (
+	"container/list"
+	"sync"
+)
+
+type Ring struct {
+	sync.Mutex
+	*list.List
+	capacity int
+}
+
+func NewRing(capacity int) *Ring {
+	return &Ring{capacity: capacity, List: list.New()}
+}
+
+func (r *Ring) Add(item interface{}) interface{} {
+	r.Lock()
+	defer r.Unlock()
+
+	// add new item
+	r.PushFront(item)
+
+	// remove old item if at capacity
+	var old interface{}
+	if r.Len() >= r.capacity {
+		old = r.Remove(r.Back())
+	}
+
+	return old
+}
+
+func (r *Ring) Slice() []interface{} {
+	r.Lock()
+	defer r.Unlock()
+
+	i := 0
+	items := make([]interface{}, r.Len())
+	for e := r.Front(); e != nil; e = e.Next() {
+		items[i] = e.Value
+		i++
+	}
+
+	return items
+}

+ 15 - 4
templates/page.html

@@ -14,6 +14,16 @@
                     $(this).parent().addClass("active");
                     return false;
                 });
+
+                $(".replay").click(function() {
+                    $.ajax({
+                      type: "POST",
+                      url: "/http/in/replay",
+                      data: { txnid: $(this).closest(".txn").attr("txnid") }
+                    });
+                    
+                    return false;
+                });
             });
         </script>
         <style type="text/css">
@@ -38,16 +48,17 @@
                 </div>
             </div>
             <ul class="history unstyled">
-            {{ range .GetHistory }}
-                <li>
+            {{ range .HttpRequests.Slice }}
+                <li class="txn" txnid="{{ .Id }}">
                     <div class="row">
                         <div class="span6">
-                            {{ with .GetRequest }}
+                            {{ with .Req }}
                                 <h3>{{ .Method }} {{ .URL.Path }}</h3>
                                 <ul class="request nav nav-pills">
                                     <li class="active"><a target="summary" href="#">Summary</a></li>
                                     <li><a href="#" target="headers">Headers</a></li>
                                     <li><a href="#" target="raw">Raw</a></li>
+                                    <li class="pull-right"><button class="replay btn btn-primary">Replay</button></li>
                                 </ul>
 
                                 <div class="raw">
@@ -88,7 +99,7 @@
                             {{ end }}
                         </div>
                         <div class="span6">
-                            {{ with .GetResponse }}
+                            {{ with .Resp }}
                                 <h3 class="{{ classForStatus .Status }}">{{ .Status }}</h3>
 
                                 {{ with $raw := dumpResponse . }}