diff --git a/config_example.toml b/config_example.toml index 34ebcf7..d10b8ea 100644 --- a/config_example.toml +++ b/config_example.toml @@ -79,7 +79,18 @@ system = "testing" enable = false path = "/var/log/yanic.log" + [[database.connection.graphite]] enable = false address = "localhost:2003" prefix = "freifunk" + +[[database.connection.socket]] +enable = false +type = "tcp" +address = ":8081" + +[[database.connection.socket]] +enable = false +type = "unix" +address = "/var/lib/collector/database.socket" diff --git a/database/all/main.go b/database/all/main.go index fae0160..fdf8ca0 100644 --- a/database/all/main.go +++ b/database/all/main.go @@ -4,4 +4,5 @@ import ( _ "github.com/FreifunkBremen/yanic/database/graphite" _ "github.com/FreifunkBremen/yanic/database/influxdb" _ "github.com/FreifunkBremen/yanic/database/logging" + _ "github.com/FreifunkBremen/yanic/database/socket" ) diff --git a/database/socket/database.go b/database/socket/database.go new file mode 100644 index 0000000..baf9735 --- /dev/null +++ b/database/socket/database.go @@ -0,0 +1,82 @@ +package socket + +/** + * This database type is just for, + * - debugging without a influxconn + * - example for other developers for new databases + */ +import ( + "log" + "net" + "time" + + "github.com/FreifunkBremen/yanic/database" + "github.com/FreifunkBremen/yanic/runtime" +) + +type Connection struct { + database.Connection + config Config + listener net.Listener + clients map[net.Addr]net.Conn +} + +type Config map[string]interface{} + +func (c Config) Enable() bool { + return c["enable"].(bool) +} +func (c Config) Type() string { + return c["type"].(string) +} +func (c Config) Address() string { + return c["address"].(string) +} + +func init() { + database.RegisterAdapter("socket", Connect) +} + +func Connect(configuration interface{}) (database.Connection, error) { + var config Config + config = configuration.(map[string]interface{}) + if !config.Enable() { + return nil, nil + } + + ln, err := net.Listen(config.Type(), config.Address()) + if err != nil { + return nil, err + } + conn := &Connection{config: config, listener: ln, clients: make(map[net.Addr]net.Conn)} + go conn.handleSocketConnection(ln) + + log.Println("[socket-database] listen on: ", ln.Addr()) + + return conn, nil +} + +func (conn *Connection) InsertNode(node *runtime.Node) { + conn.sendJSON(EventMessage{Event: "insert_node", Body: node}) +} + +func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) { + conn.sendJSON(EventMessage{Event: "insert_globals", Body: stats}) +} + +func (conn *Connection) PruneNodes(deleteAfter time.Duration) { + conn.sendJSON(EventMessage{Event: "prune_nodes"}) +} + +func (conn *Connection) Close() { + for _, c := range conn.clients { + err := c.Close() + if err != nil { + log.Println("[socket-database] client was not able to close:", err) + } + } + err := conn.listener.Close() + if err != nil { + log.Println("[socket-database] server was not able to close:", err) + } +} diff --git a/database/socket/internal.go b/database/socket/internal.go new file mode 100644 index 0000000..6f827c7 --- /dev/null +++ b/database/socket/internal.go @@ -0,0 +1,38 @@ +package socket + +import ( + "encoding/json" + "log" + "net" +) + +type EventMessage struct { + Event string `json:"event"` + Body interface{} `json:"body,omitempty"` +} + +func (config *Connection) handleSocketConnection(ln net.Listener) { + for { + c, err := ln.Accept() + if err != nil { + log.Println("[socket-database] error during connection of a client", err) + continue + } + config.clients[c.RemoteAddr()] = c + } +} + +func (conn *Connection) sendJSON(msg EventMessage) { + for i, c := range conn.clients { + d := json.NewEncoder(c) + + err := d.Encode(&msg) + if err != nil { + err = c.Close() + if err != nil { + log.Println("[socket-database] connection could not close after error on sending event:", err) + } + delete(conn.clients, i) + } + } +}