[TASK] add golang client
This commit is contained in:
parent
8ab3f832f9
commit
faff3e4f63
86
database/socket/client/dial.go
Normal file
86
database/socket/client/dial.go
Normal file
@ -0,0 +1,86 @@
|
||||
package yanic
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net"
|
||||
|
||||
"github.com/FreifunkBremen/yanic/database/socket"
|
||||
"github.com/FreifunkBremen/yanic/runtime"
|
||||
)
|
||||
|
||||
type Dialer struct {
|
||||
conn net.Conn
|
||||
queue chan socket.Message
|
||||
quit chan struct{}
|
||||
NodeHandler func(*runtime.Node)
|
||||
GlobalsHandler func(*runtime.GlobalStats)
|
||||
PruneNodesHandler func()
|
||||
}
|
||||
|
||||
func Dial(ctype, addr string) *Dialer {
|
||||
conn, err := net.Dial(ctype, addr)
|
||||
if err != nil {
|
||||
log.Panicf("yanic dial to %s:%s failed", ctype, addr)
|
||||
}
|
||||
dialer := &Dialer{
|
||||
conn: conn,
|
||||
queue: make(chan socket.Message),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
|
||||
return dialer
|
||||
}
|
||||
|
||||
func (d *Dialer) Start() {
|
||||
go d.reciever()
|
||||
d.parser()
|
||||
}
|
||||
func (d *Dialer) Close() {
|
||||
d.conn.Close()
|
||||
close(d.quit)
|
||||
}
|
||||
|
||||
func (d *Dialer) reciever() {
|
||||
decoder := json.NewDecoder(d.conn)
|
||||
var msg socket.Message
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-d.quit:
|
||||
close(d.queue)
|
||||
return
|
||||
default:
|
||||
decoder.Decode(&msg)
|
||||
d.queue <- msg
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dialer) parser() {
|
||||
for msg := range d.queue {
|
||||
switch msg.Event {
|
||||
case socket.MessageEventInsertNode:
|
||||
if d.NodeHandler != nil {
|
||||
var node runtime.Node
|
||||
|
||||
obj, _ := json.Marshal(msg.Body)
|
||||
json.Unmarshal(obj, &node)
|
||||
d.NodeHandler(&node)
|
||||
}
|
||||
case socket.MessageEventInsertGlobals:
|
||||
if d.GlobalsHandler != nil {
|
||||
var globals runtime.GlobalStats
|
||||
|
||||
obj, _ := json.Marshal(msg.Body)
|
||||
json.Unmarshal(obj, &globals)
|
||||
|
||||
d.GlobalsHandler(&globals)
|
||||
}
|
||||
case socket.MessageEventPruneNodes:
|
||||
if d.PruneNodesHandler != nil {
|
||||
d.PruneNodesHandler()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
69
database/socket/client/dial_test.go
Normal file
69
database/socket/client/dial_test.go
Normal file
@ -0,0 +1,69 @@
|
||||
package yanic
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/FreifunkBremen/yanic/database/socket"
|
||||
"github.com/FreifunkBremen/yanic/runtime"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestConnectError(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
assert.Panics(func() {
|
||||
Dial("unix", "/tmp/yanic-database-error.socket")
|
||||
}, "could connect")
|
||||
}
|
||||
|
||||
func TestRecieveMessages(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
server, err := socket.Connect(map[string]interface{}{
|
||||
"enable": true,
|
||||
"type": "unix",
|
||||
"address": "/tmp/yanic-database.socket",
|
||||
})
|
||||
assert.NoError(err)
|
||||
|
||||
d := Dial("unix", "/tmp/yanic-database.socket")
|
||||
assert.NotNil(d)
|
||||
go d.Start()
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
runned := false
|
||||
d.NodeHandler = func(node *runtime.Node) {
|
||||
runned = true
|
||||
}
|
||||
server.InsertNode(nil)
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
assert.True(runned, "node not inserted")
|
||||
|
||||
runned = false
|
||||
d.GlobalsHandler = func(stats *runtime.GlobalStats) {
|
||||
runned = true
|
||||
}
|
||||
server.InsertGlobals(nil, time.Now())
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
assert.True(runned, "global stats not inserted")
|
||||
|
||||
runned = false
|
||||
d.PruneNodesHandler = func() {
|
||||
runned = true
|
||||
}
|
||||
server.PruneNodes(time.Hour * 24 * 7)
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
assert.True(runned, "node not pruned")
|
||||
|
||||
d.Close()
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
runned = false
|
||||
d.PruneNodesHandler = func() {
|
||||
runned = true
|
||||
}
|
||||
server.PruneNodes(time.Hour * 24 * 7)
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
assert.False(runned, "message recieve")
|
||||
|
||||
server.Close()
|
||||
}
|
Loading…
Reference in New Issue
Block a user