Browse Source

batch metric reporting to keen to improve performance at high request loads

Alan Shreve 12 years ago
parent
commit
1860b0a1e1
1 changed files with 48 additions and 25 deletions
  1. 48 25
      src/ngrok/server/metrics.go

+ 48 - 25
src/ngrok/server/metrics.go

@@ -19,7 +19,7 @@ func init() {
 	keenApiKey := os.Getenv("KEEN_API_KEY")
 
 	if keenApiKey != "" {
-		metrics = NewKeenIoMetrics()
+		metrics = NewKeenIoMetrics(60 * time.Second)
 	} else {
 		metrics = NewLocalMetrics(30 * time.Second)
 	}
@@ -154,9 +154,9 @@ func (m *LocalMetrics) Report() {
 	}
 }
 
-type KeenIoRequest struct {
-	Path string
-	Body []byte
+type KeenIoMetric struct {
+	Collection string
+	Event interface{}
 }
 
 type KeenIoMetrics struct {
@@ -164,20 +164,50 @@ type KeenIoMetrics struct {
 	ApiKey       string
 	ProjectToken string
 	HttpClient   http.Client
-	Requests     chan *KeenIoRequest
+	Metrics      chan *KeenIoMetric
 }
 
-func NewKeenIoMetrics() *KeenIoMetrics {
+func NewKeenIoMetrics(batchInterval time.Duration) *KeenIoMetrics {
 	k := &KeenIoMetrics{
 		Logger:       log.NewPrefixLogger("metrics"),
 		ApiKey:       os.Getenv("KEEN_API_KEY"),
 		ProjectToken: os.Getenv("KEEN_PROJECT_TOKEN"),
-		Requests:     make(chan *KeenIoRequest, 100),
+		Metrics:      make(chan *KeenIoMetric, 1000),
 	}
 
 	go func() {
-		for req := range k.Requests {
-			k.AuthedRequest("POST", req.Path, bytes.NewReader(req.Body))
+		defer func() {
+			if r := recover(); r != nil {
+				k.Error("KeenIoMetrics failed: %v", r)
+			}
+		}()
+
+		batches := make(map[string][]interface{})
+		batchTimer := time.Tick(batchInterval)
+
+		for {
+			select {
+			case m := <-k.Metrics:
+				list, ok := batches[m.Collection]
+				if !ok {
+					list = make([]interface{}, 0)
+				}
+				batches[m.Collection] = append(list, m.Event)
+
+			case <-batchTimer:
+				// no metrics to report
+				if len(batches) == 0 {
+					continue
+				}
+
+				payload, err := json.Marshal(batches)
+				if err != nil {
+					k.Error("Failed to serialize metrics payload: %v, %v", batches, err)
+				} else {
+					k.AuthedRequest("POST", "/events", bytes.NewReader(payload))
+				}
+				batches = make(map[string][]interface{})
+			}
 		}
 	}()
 
@@ -198,13 +228,15 @@ func (k *KeenIoMetrics) AuthedRequest(method, path string, body *bytes.Reader) (
 		req.ContentLength = int64(body.Len())
 	}
 
+	requestStartAt := time.Now()
 	resp, err = k.HttpClient.Do(req)
 
 	if err != nil {
 		k.Error("Failed to send metric event to keen.io %v", err)
 	} else {
+		k.Info("keen.io processed request in %v sec", time.Since(requestStartAt).Seconds)
 		defer resp.Body.Close()
-		if resp.StatusCode != 201 {
+		if resp.StatusCode != 200 {
 			bytes, _ := ioutil.ReadAll(resp.Body)
 			k.Error("Got %v response from keen.io: %s", resp.StatusCode, bytes)
 		}
@@ -217,7 +249,7 @@ func (k *KeenIoMetrics) OpenConnection(t *Tunnel, c conn.Conn) {
 }
 
 func (k *KeenIoMetrics) CloseConnection(t *Tunnel, c conn.Conn, start time.Time, in, out int64) {
-	buf, err := json.Marshal(struct {
+	event := struct {
 		Keen               KeenStruct `json:"keen"`
 		OS                 string
 		ClientId           string
@@ -248,13 +280,9 @@ func (k *KeenIoMetrics) CloseConnection(t *Tunnel, c conn.Conn, start time.Time,
 		ConnectionDuration: time.Since(start).Seconds(),
 		BytesIn:            in,
 		BytesOut:           out,
-	})
-
-	if err != nil {
-		k.Error("Error serializing metric %v", err)
-	} else {
-		k.Requests <- &KeenIoRequest{Path: "/events/CloseConnection", Body: buf}
 	}
+
+	k.Metrics <- &KeenIoMetric{Collection: "CloseConnection", Event: event}
 }
 
 func (k *KeenIoMetrics) OpenTunnel(t *Tunnel) {
@@ -265,7 +293,7 @@ type KeenStruct struct {
 }
 
 func (k *KeenIoMetrics) CloseTunnel(t *Tunnel) {
-	buf, err := json.Marshal(struct {
+	event := struct {
 		Keen      KeenStruct `json:"keen"`
 		OS        string
 		ClientId  string
@@ -291,12 +319,7 @@ func (k *KeenIoMetrics) CloseTunnel(t *Tunnel) {
 		Duration:  time.Since(t.start).Seconds(),
 		HttpAuth:  t.req.HttpAuth != "",
 		Subdomain: t.req.Subdomain != "",
-	})
-
-	if err != nil {
-		k.Error("Error serializing metric %v", err)
-		return
-	} else {
-		k.Requests <- &KeenIoRequest{Path: "/events/CloseTunnel", Body: buf}
 	}
+
+	k.Metrics <- &KeenIoMetric{Collection: "CloseTunnel", Event: event}
 }