|
@@ -182,31 +182,35 @@ func NewKeenIoMetrics(batchInterval time.Duration) *KeenIoMetrics {
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- batches := make(map[string][]interface{})
|
|
|
+ batch := make(map[string][]interface{})
|
|
|
batchTimer := time.Tick(batchInterval)
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
case m := <-k.Metrics:
|
|
|
- list, ok := batches[m.Collection]
|
|
|
+ list, ok := batch[m.Collection]
|
|
|
if !ok {
|
|
|
list = make([]interface{}, 0)
|
|
|
}
|
|
|
- batches[m.Collection] = append(list, m.Event)
|
|
|
+ batch[m.Collection] = append(list, m.Event)
|
|
|
|
|
|
case <-batchTimer:
|
|
|
// no metrics to report
|
|
|
- if len(batches) == 0 {
|
|
|
+ if len(batch) == 0 {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- payload, err := json.Marshal(batches)
|
|
|
+ payload, err := json.Marshal(batch)
|
|
|
if err != nil {
|
|
|
- k.Error("Failed to serialize metrics payload: %v, %v", batches, err)
|
|
|
+ k.Error("Failed to serialize metrics payload: %v, %v", batch, err)
|
|
|
} else {
|
|
|
+ for key, val := range batch {
|
|
|
+ k.Debug("Reporting %d metrics for %s", len(val), key)
|
|
|
+ }
|
|
|
+
|
|
|
k.AuthedRequest("POST", "/events", bytes.NewReader(payload))
|
|
|
}
|
|
|
- batches = make(map[string][]interface{})
|
|
|
+ batch = make(map[string][]interface{})
|
|
|
}
|
|
|
}
|
|
|
}()
|
|
@@ -234,7 +238,7 @@ func (k *KeenIoMetrics) AuthedRequest(method, path string, body *bytes.Reader) (
|
|
|
if err != nil {
|
|
|
k.Error("Failed to send metric event to keen.io %v", err)
|
|
|
} else {
|
|
|
- k.Info("keen.io processed request in %v sec", time.Since(requestStartAt).Seconds)
|
|
|
+ k.Info("keen.io processed request in %f sec", time.Since(requestStartAt).Seconds())
|
|
|
defer resp.Body.Close()
|
|
|
if resp.StatusCode != 200 {
|
|
|
bytes, _ := ioutil.ReadAll(resp.Body)
|