Browse Source

Merge branch 'custom'

Alan Shreve 12 years ago
parent
commit
eda8f3ca17

+ 1 - 0
src/ngrok/client/main.go

@@ -194,6 +194,7 @@ func control(s *State, ctl *ui.Controller) {
 		Subdomain: s.opts.subdomain,
 		ClientId:  s.id,
 		Version:   version.Proto,
+		MmVersion: version.MajorMinor(),
 	})
 
 	if err != nil {

+ 0 - 1
src/ngrok/server/control.go

@@ -68,7 +68,6 @@ func (c *Control) managerThread() {
 		case <-reap.C:
 			if time.Since(c.lastPing) > pingTimeoutInterval {
 				c.conn.Info("Lost heartbeat")
-				metrics.lostHeartbeatMeter.Mark(1)
 				return
 			}
 

+ 0 - 14
src/ngrok/server/manager.go

@@ -56,7 +56,6 @@ func (m *TunnelManager) Add(t *Tunnel) error {
 		if !assignTunnel(url) {
 			return t.Error("TCP at %s already registered!", url)
 		}
-		metrics.tcpTunnelMeter.Mark(1)
 
 	case "http":
 		if strings.TrimSpace(t.regMsg.Hostname) != "" {
@@ -104,19 +103,6 @@ func (m *TunnelManager) Add(t *Tunnel) error {
 	}
 
 	t.url = url
-	metrics.tunnelMeter.Mark(1)
-	//metrics.tunnelGauge.Update(int64(len(m.tunnels)))
-
-	switch t.regMsg.OS {
-	case "windows":
-		metrics.windowsCounter.Inc(1)
-	case "linux":
-		metrics.linuxCounter.Inc(1)
-	case "darwin":
-		metrics.osxCounter.Inc(1)
-	default:
-		metrics.otherCounter.Inc(1)
-	}
 
 	return nil
 }

+ 259 - 50
src/ngrok/server/metrics.go

@@ -1,77 +1,286 @@
 package server
 
 import (
+	"bytes"
 	"encoding/json"
+	"fmt"
 	gometrics "github.com/inconshreveable/go-metrics"
+	"io/ioutil"
+	"net/http"
+	"ngrok/conn"
 	"ngrok/log"
+	"os"
 	"time"
 )
 
-var reportInterval = 30 * time.Second
+var metrics Metrics
 
-var metrics struct {
+func init() {
+	keenApiKey := os.Getenv("KEEN_API_KEY")
+
+	if keenApiKey != "" {
+		metrics = NewKeenIoMetrics()
+	} else {
+		metrics = NewLocalMetrics(30 * time.Second)
+	}
+
+	metrics.AddLogPrefix("metrics")
+}
+
+type Metrics interface {
+	log.Logger
+	OpenConnection(*Tunnel, conn.Conn)
+	CloseConnection(*Tunnel, conn.Conn, time.Time, int64, int64)
+	OpenTunnel(*Tunnel)
+	CloseTunnel(*Tunnel)
+}
+
+type LocalMetrics struct {
+	log.Logger
+	reportInterval time.Duration
 	windowsCounter gometrics.Counter
 	linuxCounter   gometrics.Counter
 	osxCounter     gometrics.Counter
 	otherCounter   gometrics.Counter
-	/*
-	   bytesInCount gometrics.Counter
-	   bytesOutCount gometrics.Counter
-	*/
+
+	tunnelMeter        gometrics.Meter
+	tcpTunnelMeter     gometrics.Meter
+	httpTunnelMeter    gometrics.Meter
+	connMeter          gometrics.Meter
+	lostHeartbeatMeter gometrics.Meter
+
+	connTimer gometrics.Timer
+
+	bytesInCount  gometrics.Counter
+	bytesOutCount gometrics.Counter
 
 	/*
 	   tunnelGauge gometrics.Gauge
 	   tcpTunnelGauge gometrics.Gauge
-	   requestGauge gometrics.Gauge
+	   connGauge gometrics.Gauge
 	*/
+}
 
-	tunnelMeter        gometrics.Meter
-	tcpTunnelMeter     gometrics.Meter
-	requestMeter       gometrics.Meter
-	lostHeartbeatMeter gometrics.Meter
+func NewLocalMetrics(reportInterval time.Duration) *LocalMetrics {
+	metrics := LocalMetrics{
+		Logger:         log.NewPrefixLogger(),
+		reportInterval: reportInterval,
+		windowsCounter: gometrics.NewCounter(),
+		linuxCounter:   gometrics.NewCounter(),
+		osxCounter:     gometrics.NewCounter(),
+		otherCounter:   gometrics.NewCounter(),
+
+		tunnelMeter:        gometrics.NewMeter(),
+		tcpTunnelMeter:     gometrics.NewMeter(),
+		httpTunnelMeter:    gometrics.NewMeter(),
+		connMeter:          gometrics.NewMeter(),
+		lostHeartbeatMeter: gometrics.NewMeter(),
+
+		connTimer: gometrics.NewTimer(),
 
-	requestTimer gometrics.Timer
+		bytesInCount:  gometrics.NewCounter(),
+		bytesOutCount: gometrics.NewCounter(),
+
+		/*
+		   metrics.tunnelGauge = gometrics.NewGauge(),
+		   metrics.tcpTunnelGauge = gometrics.NewGauge(),
+		   metrics.connGauge = gometrics.NewGauge(),
+		*/
+	}
+
+	go metrics.Report()
+
+	return &metrics
 }
 
-func init() {
-	metrics.windowsCounter = gometrics.NewCounter()
-	metrics.linuxCounter = gometrics.NewCounter()
-	metrics.osxCounter = gometrics.NewCounter()
-	metrics.otherCounter = gometrics.NewCounter()
-	/*
-	   metrics.bytesInCount = gometrics.NewCounter()
-	   metrics.bytesOutCount = gometrics.NewCounter()
-	*/
+func (m *LocalMetrics) OpenTunnel(t *Tunnel) {
+	m.tunnelMeter.Mark(1)
 
-	/*
-	   metrics.tunnelGauge = gometrics.NewGauge()
-	   metrics.tcpTunnelGauge = gometrics.NewGauge()
-	   metrics.requestGauge = gometrics.NewGauge()
-	*/
+	switch t.regMsg.OS {
+	case "windows":
+		m.windowsCounter.Inc(1)
+	case "linux":
+		m.linuxCounter.Inc(1)
+	case "darwin":
+		m.osxCounter.Inc(1)
+	default:
+		m.otherCounter.Inc(1)
+	}
+
+	switch t.regMsg.Protocol {
+	case "tcp":
+		m.tcpTunnelMeter.Mark(1)
+	case "http":
+		m.httpTunnelMeter.Mark(1)
+	}
+}
+
+func (m *LocalMetrics) CloseTunnel(t *Tunnel) {
+}
 
-	metrics.tunnelMeter = gometrics.NewMeter()
-	metrics.tcpTunnelMeter = gometrics.NewMeter()
-	metrics.requestMeter = gometrics.NewMeter()
-	metrics.lostHeartbeatMeter = gometrics.NewMeter()
-
-	metrics.requestTimer = gometrics.NewTimer()
-
-	go func() {
-		time.Sleep(reportInterval)
-		log.Info("Server metrics: %s", MetricsJson())
-	}()
-}
-
-func MetricsJson() []byte {
-	buffer, _ := json.Marshal(map[string]interface{}{
-		"windows":            metrics.windowsCounter.Count(),
-		"linux":              metrics.linuxCounter.Count(),
-		"osx":                metrics.osxCounter.Count(),
-		"other":              metrics.otherCounter.Count(),
-		"tunnelMeter.count":  metrics.tunnelMeter.Count(),
-		"tunnelMeter.m1":     metrics.tunnelMeter.Rate1(),
-		"requestMeter.count": metrics.requestMeter.Count(),
-		"requestMeter.m1":    metrics.requestMeter.Rate1(),
+func (m *LocalMetrics) OpenConnection(t *Tunnel, c conn.Conn) {
+	m.connMeter.Mark(1)
+}
+
+func (m *LocalMetrics) CloseConnection(t *Tunnel, c conn.Conn, start time.Time, bytesIn, bytesOut int64) {
+	m.bytesInCount.Inc(bytesIn)
+	m.bytesOutCount.Inc(bytesOut)
+}
+
+func (m *LocalMetrics) Report() {
+	m.Info("Reporting every %d seconds", m.reportInterval.Seconds())
+
+	for {
+		time.Sleep(m.reportInterval)
+		buffer, err := json.Marshal(map[string]interface{}{
+			"windows":               m.windowsCounter.Count(),
+			"linux":                 m.linuxCounter.Count(),
+			"osx":                   m.osxCounter.Count(),
+			"other":                 m.otherCounter.Count(),
+			"httpTunnelMeter.count": m.httpTunnelMeter.Count(),
+			"tcpTunnelMeter.count":  m.tcpTunnelMeter.Count(),
+			"tunnelMeter.count":     m.tunnelMeter.Count(),
+			"tunnelMeter.m1":        m.tunnelMeter.Rate1(),
+			"connMeter.count":       m.connMeter.Count(),
+			"connMeter.m1":          m.connMeter.Rate1(),
+			"bytesIn.count":         m.bytesInCount.Count(),
+			"bytesOut.count":        m.bytesOutCount.Count(),
+		})
+
+		if err != nil {
+			m.Error("Failed to serialize metrics: %v", err)
+			continue
+		}
+
+		m.Info("Reporting: %s", buffer)
+	}
+}
+
+type KeenIoMetrics struct {
+	log.Logger
+	ApiKey       string
+	ProjectToken string
+	HttpClient   http.Client
+}
+
+func NewKeenIoMetrics() *KeenIoMetrics {
+	return &KeenIoMetrics{
+		Logger:       log.NewPrefixLogger(),
+		ApiKey:       os.Getenv("KEEN_API_KEY"),
+		ProjectToken: os.Getenv("KEEN_PROJECT_TOKEN"),
+	}
+}
+
+func (k *KeenIoMetrics) AuthedRequest(method, path string, body *bytes.Reader) (resp *http.Response, err error) {
+	path = fmt.Sprintf("https://api.keen.io/3.0/projects/%s%s", k.ProjectToken, path)
+	req, err := http.NewRequest(method, path, body)
+	if err != nil {
+		return
+	}
+
+	req.Header.Add("Authorization", k.ApiKey)
+
+	if body != nil {
+		req.Header.Add("Content-Type", "application/json")
+		req.ContentLength = int64(body.Len())
+	}
+
+	resp, err = k.HttpClient.Do(req)
+
+	if err != nil {
+		k.Error("Failed to send metric event to keen.io %v", err)
+	} else if resp.StatusCode != 201 {
+		bytes, _ := ioutil.ReadAll(resp.Body)
+		k.Debug("Got %v response from keen.io: %s", resp.StatusCode, bytes)
+	}
+
+	return
+}
+
+func (k *KeenIoMetrics) OpenConnection(t *Tunnel, c conn.Conn) {
+}
+
+func (k *KeenIoMetrics) CloseConnection(t *Tunnel, c conn.Conn, start time.Time, in, out int64) {
+	buf, err := json.Marshal(struct {
+		Keen               KeenStruct `json:"keen"`
+		OS                 string
+		ClientId           string
+		Protocol           string
+		Url                string
+		User               string
+		Version            string
+		Reason             string
+		HttpAuth           bool
+		Subdomain          bool
+		TunnelDuration     float64
+		ConnectionDuration float64
+		BytesIn            int64
+		BytesOut           int64
+	}{
+		Keen: KeenStruct{
+			Timestamp: start.UTC().Format("2006-01-02T15:04:05.000Z"),
+		},
+		OS:                 t.regMsg.OS,
+		ClientId:           t.regMsg.ClientId,
+		Protocol:           t.regMsg.Protocol,
+		Url:                t.url,
+		User:               t.regMsg.User,
+		Version:            t.regMsg.MmVersion,
+		HttpAuth:           t.regMsg.HttpAuth != "",
+		Subdomain:          t.regMsg.Subdomain != "",
+		TunnelDuration:     time.Since(t.start).Seconds(),
+		ConnectionDuration: time.Since(start).Seconds(),
+		BytesIn:            in,
+		BytesOut:           out,
+	})
+
+	if err != nil {
+		k.Error("Error serializing metric %v", err)
+	} else {
+		k.AuthedRequest("POST", "/events/CloseConnection", bytes.NewReader(buf))
+	}
+}
+
+func (k *KeenIoMetrics) OpenTunnel(t *Tunnel) {
+}
+
+type KeenStruct struct {
+	Timestamp string `json:"timestamp"`
+}
+
+func (k *KeenIoMetrics) CloseTunnel(t *Tunnel) {
+	buf, err := json.Marshal(struct {
+		Keen      KeenStruct `json:"keen"`
+		OS        string
+		ClientId  string
+		Protocol  string
+		Url       string
+		User      string
+		Version   string
+		Reason    string
+		Duration  float64
+		HttpAuth  bool
+		Subdomain bool
+	}{
+		Keen: KeenStruct{
+			Timestamp: t.start.UTC().Format("2006-01-02T15:04:05.000Z"),
+		},
+		OS:       t.regMsg.OS,
+		ClientId: t.regMsg.ClientId,
+		Protocol: t.regMsg.Protocol,
+		Url:      t.url,
+		User:     t.regMsg.User,
+		Version:  t.regMsg.MmVersion,
+		//Reason: reason,
+		Duration:  time.Since(t.start).Seconds(),
+		HttpAuth:  t.regMsg.HttpAuth != "",
+		Subdomain: t.regMsg.Subdomain != "",
 	})
-	return buffer
+
+	if err != nil {
+		k.Error("Error serializing metric %v", err)
+		return
+	}
+
+	k.AuthedRequest("POST", "/events/CloseTunnel", bytes.NewReader(buf))
 }

+ 18 - 12
src/ngrok/server/tunnel.go

@@ -8,6 +8,7 @@ import (
 	log "ngrok/log"
 	"ngrok/msg"
 	"ngrok/version"
+	"time"
 )
 
 /**
@@ -17,6 +18,9 @@ import (
 type Tunnel struct {
 	regMsg *msg.RegMsg
 
+	// time when the tunnel was opened
+	start time.Time
+
 	// public url
 	url string
 
@@ -36,6 +40,7 @@ type Tunnel struct {
 func newTunnel(m *msg.RegMsg, ctl *Control) (t *Tunnel) {
 	t = &Tunnel{
 		regMsg:  m,
+		start:   time.Now(),
 		ctl:     ctl,
 		proxies: make(chan conn.Conn),
 		Logger:  log.NewPrefixLogger(),
@@ -79,6 +84,7 @@ func newTunnel(m *msg.RegMsg, ctl *Control) (t *Tunnel) {
 		MmVersion: version.MajorMinor(),
 	}
 
+	metrics.OpenTunnel(t)
 	return
 }
 
@@ -93,10 +99,9 @@ func (t *Tunnel) shutdown() {
 	// remove ourselves from the tunnel registry
 	tunnels.Del(t.url)
 
-	// XXX: should we shut down all of the proxy connections?
+	// XXX: shut down all of the proxy connections?
 
-	// XXX: will this block if this is being called from Control's shutdown code?
-	t.ctl.stop <- nil
+	metrics.CloseTunnel(t)
 }
 
 func (t *Tunnel) Id() string {
@@ -137,18 +142,19 @@ func (t *Tunnel) HandlePublicConnection(publicConn conn.Conn) {
 		}
 	}()
 
-	metrics.requestTimer.Time(func() {
-		metrics.requestMeter.Mark(1)
+	startTime := time.Now()
+	metrics.OpenConnection(t, publicConn)
+
+	t.Debug("Requesting new proxy connection")
+	t.ctl.out <- &msg.ReqProxyMsg{}
 
-		t.Debug("Requesting new proxy connection")
-		t.ctl.out <- &msg.ReqProxyMsg{}
+	proxyConn := <-t.proxies
+	t.Info("Returning proxy connection %s", proxyConn.Id())
 
-		proxyConn := <-t.proxies
-		t.Info("Returning proxy connection %s", proxyConn.Id())
+	defer proxyConn.Close()
+	bytesIn, bytesOut := conn.Join(publicConn, proxyConn)
 
-		defer proxyConn.Close()
-		conn.Join(publicConn, proxyConn)
-	})
+	metrics.CloseConnection(t, publicConn, startTime, bytesIn, bytesOut)
 }
 
 func (t *Tunnel) RegisterProxy(conn conn.Conn) {