yanic/stats_db.go

199 lines
4.6 KiB
Go
Raw Normal View History

2016-03-12 02:36:02 +00:00
package main
import (
"log"
"strconv"
2016-03-12 02:36:02 +00:00
"sync"
"time"
2016-03-15 22:26:30 +00:00
"github.com/FreifunkBremen/respond-collector/data"
2016-07-13 23:19:03 +00:00
"github.com/FreifunkBremen/respond-collector/models"
2016-03-12 02:36:02 +00:00
"github.com/influxdata/influxdb/client/v2"
)
// batchDuration is how long the worker collects points into a batch before
// the batch is written to InfluxDB.
const batchDuration = 5 * time.Second
type StatsDb struct {
points chan *client.Point
wg sync.WaitGroup
2016-07-22 20:16:01 +00:00
nodes *models.Nodes
2016-03-12 02:36:02 +00:00
client client.Client
}
func NewStatsDb() *StatsDb {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
2016-04-29 10:39:53 +00:00
Addr: config.Influxdb.Addr,
2016-03-12 02:36:02 +00:00
Username: config.Influxdb.Username,
Password: config.Influxdb.Password,
})
if err != nil {
panic(err)
}
db := &StatsDb{
client: c,
points: make(chan *client.Point, 500),
2016-07-22 20:16:01 +00:00
nodes: nodes,
2016-03-12 02:36:02 +00:00
}
// start worker
db.wg.Add(1)
go db.worker()
return db
}
2016-07-13 23:19:03 +00:00
func (c *StatsDb) Add(nodeId string, node *models.Node) {
stats := node.Statistics
2016-03-12 02:36:02 +00:00
tags := map[string]string{
2016-07-13 23:19:03 +00:00
"nodeid": nodeId,
2016-03-12 02:36:02 +00:00
}
2016-07-13 23:19:03 +00:00
2016-03-12 02:36:02 +00:00
fields := map[string]interface{}{
"load": stats.LoadAverage,
"idletime": int64(stats.Idletime),
"uptime": int64(stats.Uptime),
"processes.running": stats.Processes.Running,
"clients.wifi": stats.Clients.Wifi,
"clients.wifi24": stats.Clients.Wifi24,
"clients.wifi5": stats.Clients.Wifi5,
"clients.total": stats.Clients.Total,
"memory.buffers": stats.Memory.Buffers,
"memory.cached": stats.Memory.Cached,
"memory.free": stats.Memory.Free,
"memory.total": stats.Memory.Total,
}
2016-07-15 14:34:16 +00:00
if nodeinfo := node.Nodeinfo; nodeinfo != nil {
if owner := nodeinfo.Owner; owner != nil {
tags["owner"] = owner.Contact
}
if wireless := nodeinfo.Wireless; wireless != nil {
fields["wireless.txpower24"] = wireless.TxPower24
fields["wireless.txpower5"] = wireless.TxPower5
}
// morpheus needs
tags["hostname"] = nodeinfo.Hostname
}
if t := stats.Traffic.Rx; t != nil {
fields["traffic.rx.bytes"] = int64(t.Bytes)
fields["traffic.rx.packets"] = t.Packets
}
if t := stats.Traffic.Tx; t != nil {
fields["traffic.tx.bytes"] = int64(t.Bytes)
fields["traffic.tx.packets"] = t.Packets
fields["traffic.tx.dropped"] = t.Dropped
}
if t := stats.Traffic.Forward; t != nil {
fields["traffic.forward.bytes"] = int64(t.Bytes)
fields["traffic.forward.packets"] = t.Packets
}
if t := stats.Traffic.MgmtRx; t != nil {
fields["traffic.mgmt_rx.bytes"] = int64(t.Bytes)
fields["traffic.mgmt_rx.packets"] = t.Packets
}
if t := stats.Traffic.MgmtTx; t != nil {
fields["traffic.mgmt_tx.bytes"] = int64(t.Bytes)
fields["traffic.mgmt_tx.packets"] = t.Packets
2016-03-12 02:36:02 +00:00
}
if w := stats.Wireless; w != nil {
2016-07-13 23:19:03 +00:00
addAirtime := func(suffix string, time *data.WirelessAirtime) {
fields["airtime"+suffix+".chan_util"] = time.ChanUtil
fields["airtime"+suffix+".rx_util"] = time.RxUtil
fields["airtime"+suffix+".tx_util"] = time.TxUtil
fields["airtime"+suffix+".noise"] = time.Noise
fields["airtime"+suffix+".frequency"] = time.Frequency
tags["frequency"+suffix+""] = strconv.Itoa(int(time.Frequency))
}
if time := w.Airtime24; time != nil {
2016-07-13 23:19:03 +00:00
addAirtime("24", w.Airtime24)
}
if time := w.Airtime5; time != nil {
2016-07-13 23:19:03 +00:00
addAirtime("5", w.Airtime5)
}
}
2016-03-12 02:36:02 +00:00
point, err := client.NewPoint("node", tags, fields, time.Now())
if err != nil {
panic(err)
}
c.points <- point
}
// Close closes the point queue, waits for the worker to write the remaining
// batch and exit, and then closes the InfluxDB client.
//
// Add must not be called after Close: sending on the closed queue panics.
func (c *StatsDb) Close() {
	close(c.points)
	c.wg.Wait()

	// The signature has no error return, so report a failed client shutdown
	// in the log instead of dropping it.
	if err := c.client.Close(); err != nil {
		log.Printf("closing influxdb client: %v", err)
	}
}
2016-03-12 02:58:36 +00:00
// stores data points in batches into the influxdb
2016-03-12 02:36:02 +00:00
func (c *StatsDb) worker() {
bpConfig := client.BatchPointsConfig{
Database: config.Influxdb.Database,
Precision: "m",
}
var bp client.BatchPoints
var err error
2016-03-28 13:46:22 +00:00
var writeNow, closed bool
timer := time.NewTimer(batchDuration)
2016-07-22 20:16:01 +00:00
globalDuration := time.Second * time.Duration(config.Nodes.SaveInterval)
globalTimer := time.NewTimer(globalDuration)
2016-03-12 02:36:02 +00:00
2016-03-12 15:27:52 +00:00
for !closed {
2016-03-12 02:36:02 +00:00
// wait for new points
select {
2016-07-22 20:16:01 +00:00
case <-globalTimer.C:
point, err := client.NewPoint("global", nil, nodes.GetStats(), time.Now())
if err != nil {
panic(err)
}
c.points <- point
globalTimer.Reset(globalDuration)
log.Print("saving global point")
2016-03-12 02:36:02 +00:00
case point, ok := <-c.points:
if ok {
2016-03-28 13:46:22 +00:00
if bp == nil {
// create new batch
timer.Reset(batchDuration)
if bp, err = client.NewBatchPoints(bpConfig); err != nil {
log.Fatal(err)
2016-03-28 13:46:22 +00:00
}
2016-03-12 15:27:52 +00:00
}
2016-03-28 13:46:22 +00:00
bp.AddPoint(point)
2016-03-12 02:36:02 +00:00
} else {
2016-03-12 15:27:52 +00:00
closed = true
2016-03-12 02:36:02 +00:00
}
2016-03-28 13:46:22 +00:00
case <-timer.C:
if bp == nil {
timer.Reset(batchDuration)
} else {
writeNow = true
}
2016-03-12 02:36:02 +00:00
}
2016-03-12 02:58:36 +00:00
// write batch now?
2016-03-28 13:46:22 +00:00
if bp != nil && (writeNow || closed) {
2016-03-12 02:36:02 +00:00
log.Println("saving", len(bp.Points()), "points")
2016-03-12 02:58:36 +00:00
if err = c.client.Write(bp); err != nil {
log.Fatal(err)
2016-03-12 02:36:02 +00:00
}
2016-03-28 13:46:22 +00:00
writeNow = false
2016-03-12 02:36:02 +00:00
bp = nil
}
}
2016-07-22 20:16:01 +00:00
globalTimer.Stop()
2016-03-28 13:46:22 +00:00
timer.Stop()
2016-03-12 02:36:02 +00:00
c.wg.Done()
}