From faff3e4f6303fe84467e85fb6bf63a0395425386 Mon Sep 17 00:00:00 2001 From: Martin Geno Date: Mon, 29 May 2017 21:34:10 +0200 Subject: [PATCH] [TASK] add golang client --- database/socket/client/dial.go | 86 +++++++++++++++++++++++++++++ database/socket/client/dial_test.go | 69 +++++++++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 database/socket/client/dial.go create mode 100644 database/socket/client/dial_test.go diff --git a/database/socket/client/dial.go b/database/socket/client/dial.go new file mode 100644 index 0000000..33edd02 --- /dev/null +++ b/database/socket/client/dial.go @@ -0,0 +1,86 @@ +package yanic + +import ( + "encoding/json" + "log" + "net" + + "github.com/FreifunkBremen/yanic/database/socket" + "github.com/FreifunkBremen/yanic/runtime" +) + +type Dialer struct { + conn net.Conn + queue chan socket.Message + quit chan struct{} + NodeHandler func(*runtime.Node) + GlobalsHandler func(*runtime.GlobalStats) + PruneNodesHandler func() +} + +func Dial(ctype, addr string) *Dialer { + conn, err := net.Dial(ctype, addr) + if err != nil { + log.Panicf("yanic dial to %s:%s failed", ctype, addr) + } + dialer := &Dialer{ + conn: conn, + queue: make(chan socket.Message), + quit: make(chan struct{}), + } + + return dialer +} + +func (d *Dialer) Start() { + go d.reciever() + d.parser() +} +func (d *Dialer) Close() { + d.conn.Close() + close(d.quit) +} + +func (d *Dialer) reciever() { + decoder := json.NewDecoder(d.conn) + var msg socket.Message + + for { + select { + case <-d.quit: + close(d.queue) + return + default: + decoder.Decode(&msg) + d.queue <- msg + } + } +} + +func (d *Dialer) parser() { + for msg := range d.queue { + switch msg.Event { + case socket.MessageEventInsertNode: + if d.NodeHandler != nil { + var node runtime.Node + + obj, _ := json.Marshal(msg.Body) + json.Unmarshal(obj, &node) + d.NodeHandler(&node) + } + case socket.MessageEventInsertGlobals: + if d.GlobalsHandler != nil { + var globals runtime.GlobalStats + + obj, _ := json.Marshal(msg.Body) + json.Unmarshal(obj, &globals) + + d.GlobalsHandler(&globals) + } + case socket.MessageEventPruneNodes: + if d.PruneNodesHandler != nil { + d.PruneNodesHandler() + } + } + } +} diff --git a/database/socket/client/dial_test.go b/database/socket/client/dial_test.go new file mode 100644 index 0000000..d85496d --- /dev/null +++ b/database/socket/client/dial_test.go @@ -0,0 +1,69 @@ +package yanic + +import ( + "testing" + "time" + + "github.com/FreifunkBremen/yanic/database/socket" + "github.com/FreifunkBremen/yanic/runtime" + "github.com/stretchr/testify/assert" +) + +func TestConnectError(t *testing.T) { + assert := assert.New(t) + assert.Panics(func() { + Dial("unix", "/tmp/yanic-database-error.socket") + }, "could connect") +} + +func TestRecieveMessages(t *testing.T) { + assert := assert.New(t) + server, err := socket.Connect(map[string]interface{}{ + "enable": true, + "type": "unix", + "address": "/tmp/yanic-database.socket", + }) + assert.NoError(err) + + d := Dial("unix", "/tmp/yanic-database.socket") + assert.NotNil(d) + go d.Start() + time.Sleep(5 * time.Millisecond) + + runned := false + d.NodeHandler = func(node *runtime.Node) { + runned = true + } + server.InsertNode(nil) + time.Sleep(5 * time.Millisecond) + assert.True(runned, "node not inserted") + + runned = false + d.GlobalsHandler = func(stats *runtime.GlobalStats) { + runned = true + } + server.InsertGlobals(nil, time.Now()) + time.Sleep(5 * time.Millisecond) + assert.True(runned, "global stats not inserted") + + runned = false + d.PruneNodesHandler = func() { + runned = true + } + server.PruneNodes(time.Hour * 24 * 7) + time.Sleep(5 * time.Millisecond) + assert.True(runned, "node not pruned") + + d.Close() + + time.Sleep(5 * time.Millisecond) + runned = false + d.PruneNodesHandler = func() { + runned = true + } + server.PruneNodes(time.Hour * 24 * 7) + time.Sleep(5 * time.Millisecond) + assert.False(runned, "message recieve") + + server.Close() +}