Compare commits

...

23 Commits

Author SHA1 Message Date
Martin Geno
665c55da41
add respond injector bridge to paint links 2017-07-28 08:55:11 +02:00
Martin Geno
075b3a078a
remove neigbours during breminale 2017-07-08 15:43:51 +02:00
Martin Geno
2676da918b
[TASK] add yanic database for injection 2017-07-08 11:01:39 +02:00
Martin Geno
890a2d6043
Merge branch 'breminale-corny' into breminale 2017-07-06 13:13:32 +02:00
Martin Geno
511327ca10
Merge branch 'master' into breminale 2017-07-06 13:12:44 +02:00
Julian Kornberger
b17d018dba Breminale 2017-07-06 12:38:14 +02:00
Martin Geno
27ec9c1ab9
[BUGFIX] wifi link 2017-07-06 10:10:24 +02:00
Martin Geno
0a207d0170
[TASk] put multicast address in config file 2017-07-05 15:51:06 +02:00
Martin Geno
b4e6cd5864
Merge branch 'interfaces-babel' into breminale 2017-07-03 12:21:05 +02:00
Martin Geno
a72e8593e2
Merge branch 'file-output' into breminale 2017-06-30 23:11:39 +02:00
Martin Geno
d1a52173c7
[TASK] add nodelist 2017-06-30 23:03:55 +02:00
Martin Geno
2cbdad54d9
[TASK] improve output (to stats page and filtering in meshviewer) 2017-06-30 23:03:54 +02:00
Martin Geno
23ac551e88
[BUGFIX] test once per time 2017-06-30 23:03:40 +02:00
Martin Geno
5628d7db22
[BUGFIX] test on tcp6 (instatt unix) 2017-06-30 23:03:39 +02:00
Martin Geno
faff3e4f63
[TASK] add golang client 2017-06-30 23:03:39 +02:00
Martin Geno
8ab3f832f9
[TASK] improve tests 2017-06-30 23:03:39 +02:00
Martin Geno
83c721ba4d
[BUGFIX] review 1 - locking 2017-06-30 23:03:39 +02:00
Martin Geno
b20c614a69
[BUGFIX] review 1 2017-06-30 23:03:39 +02:00
Martin Geno
73219323cf
[TASK] remove unreachable logs 2017-06-30 23:03:39 +02:00
Martin Geno
c8cc65e4c4
[TASK] add tests 2017-06-30 23:03:39 +02:00
Martin Geno
f9f50a4a54
[TASK] database as socket output 2017-06-30 23:03:34 +02:00
Martin Geno
a092b21c05
[TASK] new data by respondd 2017-06-30 07:44:39 +02:00
Martin Geno
55efa5f8bc
[TASK] split interfaces by listen, send multicast and send unicast (needed by babel) 2017-06-29 19:06:53 +02:00
41 changed files with 1479 additions and 134 deletions

View File

@ -9,7 +9,7 @@ FAIL=0
for dir in $(find . -maxdepth 10 -not -path './.git*' -not -path '*/_*' -type d); for dir in $(find . -maxdepth 10 -not -path './.git*' -not -path '*/_*' -type d);
do do
if ls $dir/*.go &> /dev/null; then if ls $dir/*.go &> /dev/null; then
go test -v -covermode=count -coverprofile=profile.tmp $dir || FAIL=$? go test -p 1 -v -covermode=count -coverprofile=profile.tmp $dir || FAIL=$?
if [ -f profile.tmp ] if [ -f profile.tmp ]
then then
tail -n +2 < profile.tmp >> profile.cov tail -n +2 < profile.tmp >> profile.cov

216
cmd/respondd-bridge/main.go Normal file
View File

@ -0,0 +1,216 @@
package main
import (
"bufio"
"bytes"
"compress/flate"
"encoding/json"
"fmt"
"log"
"net"
"os"
"time"
"github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/respond"
)
const maxDataGramSize = 8192
// Collector for a specificle respond messages
type Collector struct {
connection *net.UDPConn // UDP socket
queue chan *respond.Response // received responses
interval time.Duration // Interval for multicast packets
stop chan interface{}
nodes map[string]*data.ResponseData
interMac map[string]string
addrFrom net.UDPAddr
addrTo net.UDPAddr
}
func main() {
iface := os.Args[1]
addrFrom := os.Args[2]
addrTo := os.Args[3]
linkLocalAddr, err := getLinkLocalAddr(iface)
if err != nil {
log.Panic(err)
}
conn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: linkLocalAddr,
Zone: iface,
})
if err != nil {
log.Panic(err)
}
conn.SetReadBuffer(maxDataGramSize)
collector := &Collector{
connection: conn,
queue: make(chan *respond.Response, 400),
stop: make(chan interface{}),
addrFrom: net.UDPAddr{IP: net.ParseIP(addrFrom)},
addrTo: net.UDPAddr{IP: net.ParseIP(addrTo)},
interval: time.Second * 10,
nodes: make(map[string]*data.ResponseData),
interMac: make(map[string]string),
}
go collector.receiver(conn)
go collector.parser()
collector.sendOnce()
collector.sender()
collector.Close()
}
// Returns the first link local unicast address for the given interface name
func getLinkLocalAddr(ifname string) (net.IP, error) {
iface, err := net.InterfaceByName(ifname)
if err != nil {
return nil, err
}
addresses, err := iface.Addrs()
if err != nil {
return nil, err
}
for _, addr := range addresses {
if ipnet := addr.(*net.IPNet); ipnet.IP.IsLinkLocalUnicast() {
return ipnet.IP, nil
}
}
return nil, fmt.Errorf("unable to find link local unicast address for %s", ifname)
}
// SendPacket sends a UDP request to the given unicast or multicast address
func (coll *Collector) SendRequestPacket(addr net.UDPAddr) {
addr.Port = 1001
if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil {
log.Println("WriteToUDP failed:", err)
}
}
func (coll *Collector) saveResponse(addr net.UDPAddr, node *data.ResponseData) {
if val := node.NodeInfo; val == nil {
log.Printf("no nodeinfo from %s", addr.String())
return
}
// save current node
coll.nodes[addr.IP.String()] = node
// Process the data and update IP address
var otherIP string
if addr.IP.Equal(coll.addrFrom.IP) {
otherIP = coll.addrTo.IP.String()
} else {
otherIP = coll.addrFrom.IP.String()
}
otherNode := coll.nodes[otherIP]
if otherIP == "" || otherNode == nil {
log.Print("othernode not found")
return
}
if node.Neighbours == nil {
node.Neighbours = &data.Neighbours{
Batadv: make(map[string]data.BatadvNeighbours),
NodeID: node.NodeInfo.NodeID,
}
}
interMac := node.NodeInfo.Network.Mesh["bat0"].Interfaces.Other[0]
if newMac, ok := coll.interMac[addr.IP.String()]; ok {
interMac = newMac
} else {
coll.interMac[addr.IP.String()] = interMac
}
if _, ok := node.Neighbours.Batadv[interMac]; !ok {
node.Neighbours.Batadv[interMac] = data.BatadvNeighbours{
Neighbours: make(map[string]data.BatmanLink),
}
}
interOtherMac := otherNode.NodeInfo.Network.Mesh["bat0"].Interfaces.Other[0]
if newMac, ok := coll.interMac[coll.addrTo.IP.String()]; ok {
interOtherMac = newMac
} else {
coll.interMac[otherIP] = interMac
}
node.Neighbours.Batadv[interMac].Neighbours[interOtherMac] = data.BatmanLink{
Tq: 253,
Lastseen: 0.2,
}
buf := bytes.Buffer{}
writer := bufio.NewWriter(&buf)
deflater, err := flate.NewWriter(writer, flate.DefaultCompression)
err = json.NewEncoder(deflater).Encode(node)
if err != nil {
panic(err)
}
deflater.Close()
writer.Flush()
coll.connection.WriteToUDP(buf.Bytes(), &net.UDPAddr{
IP: net.ParseIP("fe80::de:faff:fe9f:2414"),
Port: 12345,
})
log.Print("send response from: ", addr.IP.String())
}
func (coll *Collector) receiver(conn *net.UDPConn) {
buf := make([]byte, maxDataGramSize)
for {
n, src, err := conn.ReadFromUDP(buf)
if err != nil {
log.Println("ReadFromUDP failed:", err)
return
}
raw := make([]byte, n)
copy(raw, buf)
coll.queue <- &respond.Response{
Address: *src,
Raw: raw,
}
}
}
func (coll *Collector) parser() {
for obj := range coll.queue {
if data, err := obj.Parse(); err != nil {
log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw))
} else {
coll.saveResponse(obj.Address, data)
}
}
}
func (coll *Collector) sendOnce() {
coll.SendRequestPacket(coll.addrFrom)
coll.SendRequestPacket(coll.addrTo)
log.Print("send request")
}
// send packets continously
func (coll *Collector) sender() {
ticker := time.NewTicker(coll.interval)
for {
select {
case <-coll.stop:
ticker.Stop()
return
case <-ticker.C:
// send the multicast packet to request per-node statistics
coll.sendOnce()
}
}
}
// Close Collector
func (coll *Collector) Close() {
close(coll.stop)
coll.connection.Close()
close(coll.queue)
}

View File

@ -19,8 +19,11 @@ func main() {
nodes := runtime.NewNodes(&runtime.Config{}) nodes := runtime.NewNodes(&runtime.Config{})
collector := respond.NewCollector(nil, nodes, iface, 0) collector := respond.NewCollector(nil, nodes, iface, iface, iface, 0)
collector.SendPacket(net.ParseIP(dstAddress)) collector.SendPacket(net.UDPAddr{
IP: net.ParseIP(dstAddress),
Zone: iface,
})
time.Sleep(time.Second) time.Sleep(time.Second)

View File

@ -9,8 +9,9 @@ import (
"time" "time"
"github.com/FreifunkBremen/yanic/database" "github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/database/all" allDB "github.com/FreifunkBremen/yanic/database/all"
"github.com/FreifunkBremen/yanic/meshviewer" "github.com/FreifunkBremen/yanic/output"
allOutput "github.com/FreifunkBremen/yanic/output/all"
"github.com/FreifunkBremen/yanic/respond" "github.com/FreifunkBremen/yanic/respond"
"github.com/FreifunkBremen/yanic/rrd" "github.com/FreifunkBremen/yanic/rrd"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
@ -42,8 +43,17 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
if config.Respondd.InterfaceSendUnicast == "" {
config.Respondd.InterfaceSendUnicast = config.Respondd.InterfaceListen
}
if config.Respondd.InterfaceSendMulticast == "" {
config.Respondd.InterfaceSendMulticast = config.Respondd.InterfaceListen
}
if config.Respondd.MulticastDestination != "" {
respond.MulticastGroup = config.Respondd.MulticastDestination
}
connections, err = all.Connect(config.Database.Connection) connections, err = allDB.Connect(config.Database.Connection)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -57,7 +67,14 @@ func main() {
nodes = runtime.NewNodes(config) nodes = runtime.NewNodes(config)
nodes.Start() nodes.Start()
meshviewer.Start(config, nodes)
outputs, err := allOutput.Register(nodes, config.Nodes.Output)
if err != nil {
panic(err)
}
output.Start(outputs, config)
defer output.Close()
if config.Webserver.Enable { if config.Webserver.Enable {
log.Println("starting webserver on", config.Webserver.Bind) log.Println("starting webserver on", config.Webserver.Bind)
@ -74,7 +91,7 @@ func main() {
time.Sleep(delay) time.Sleep(delay)
} }
collector = respond.NewCollector(connections, nodes, config.Respondd.Interface, config.Respondd.Port) collector = respond.NewCollector(connections, nodes, config.Respondd.InterfaceListen, config.Respondd.InterfaceSendMulticast, config.Respondd.InterfaceSendUnicast, config.Respondd.ListenPort)
collector.Start(config.Respondd.CollectInterval.Duration) collector.Start(config.Respondd.CollectInterval.Duration)
defer collector.Close() defer collector.Close()
} }

View File

@ -5,8 +5,13 @@ enable = true
synchronize = "1m" synchronize = "1m"
# how oftern request per multicast # how oftern request per multicast
collect_interval = "1m" collect_interval = "1m"
# on which interface # on which interface to listen
interface = "eth0" interface = "eth0"
# send unicast request (default: see interface)
## interface_send_unicast = "eth0"
# send multicast request (default: see interface)
# interface_send_multicast = "eth1"
# define a port to listen # define a port to listen
# (no or 0 would choose at port at his own) # (no or 0 would choose at port at his own)
#port = 10001 #port = 10001
@ -37,7 +42,8 @@ offline_after = "10m"
prune_after = "7d" prune_after = "7d"
[meshviewer] [[nodes.output.meshviewer]]
enable = true
# structur of nodes.json, which to support # structur of nodes.json, which to support
# version 1 is to support legacy meshviewer (which are in master branch) # version 1 is to support legacy meshviewer (which are in master branch)
# i.e. https://github.com/ffnord/meshviewer/tree/master # i.e. https://github.com/ffnord/meshviewer/tree/master
@ -50,6 +56,27 @@ nodes_path = "/var/www/html/meshviewer/data/nodes.json"
# path where to store graph.json # path where to store graph.json
graph_path = "/var/www/html/meshviewer/data/graph.json" graph_path = "/var/www/html/meshviewer/data/graph.json"
[nodes.output.meshviewer.filter]
# no_owner = true
has_location = true
blacklist = ["vpnid"]
[nodes.output.meshviewer.filter.in_area]
latitude_min = 34.30
latitude_max = 71.85
longitude_min = -24.96
longitude_max = 39.72
[[nodes.output.template]]
enable = false
template_path = "/var/lib/collector/html-template.tmp"
output_path = "/var/www/html/index.html"
[[nodes.output.nodelist]]
enable = true
path = "/var/www/html/meshviewer/data/nodelist.json"
[database] [database]
# cleaning data of measurement node, # cleaning data of measurement node,
# which are older than 7d # which are older than 7d
@ -79,7 +106,18 @@ system = "testing"
enable = false enable = false
path = "/var/log/yanic.log" path = "/var/log/yanic.log"
[[database.connection.graphite]] [[database.connection.graphite]]
enable = false enable = false
address = "localhost:2003" address = "localhost:2003"
prefix = "freifunk" prefix = "freifunk"
[[database.connection.socket]]
enable = false
type = "tcp"
address = ":8081"
[[database.connection.socket]]
enable = false
type = "unix"
address = "/var/lib/collector/database.socket"

View File

@ -0,0 +1,16 @@
function ffhbCurrentStats(data) {
$("#freifunk").html("
<h1><a href="https://bremen.freifunk.net/" style="color: #444;">bremen.freifunk.net</a></h1>
<p>
Nutzer: <span id="freifunk_clients">0</span><br>
<i style="font-style: italic;">(auf <span id="freifunk_nodes">0</span> Geräte verteilt)</i>
</p>
<p style="text-align: right;">
<a href="https://events.ffhb.de/meshviewer">mehr</a>
</p>");
$("#freifunk_clients").html(data.Clients);
$("#freifunk_nodes").html(data.Nodes);
};
ffhbCurrentStats({{json .GlobalStatistic}});

View File

@ -4,7 +4,8 @@ package data
type Neighbours struct { type Neighbours struct {
Batadv map[string]BatadvNeighbours `json:"batadv"` Batadv map[string]BatadvNeighbours `json:"batadv"`
LLDP map[string]LLDPNeighbours `json:"lldp"` LLDP map[string]LLDPNeighbours `json:"lldp"`
//WifiNeighbours map[string]WifiNeighbours `json:"wifi"` Babel map[string]BabelNeighbours `json:"babel"`
WifiNeighbours map[string]WifiNeighbours `json:"wifi"`
NodeID string `json:"node_id"` NodeID string `json:"node_id"`
} }
@ -21,6 +22,11 @@ type BatmanLink struct {
Tq int `json:"tq"` Tq int `json:"tq"`
} }
// BabelLink struct
type BabelLink struct {
Address string `json:"address"`
}
// LLDPLink struct // LLDPLink struct
type LLDPLink struct { type LLDPLink struct {
Name string `json:"name"` Name string `json:"name"`
@ -32,6 +38,9 @@ type BatadvNeighbours struct {
Neighbours map[string]BatmanLink `json:"neighbours"` Neighbours map[string]BatmanLink `json:"neighbours"`
} }
// BabelNeighbours struct
type BabelNeighbours []BabelLink
// WifiNeighbours struct // WifiNeighbours struct
type WifiNeighbours struct { type WifiNeighbours struct {
Neighbours map[string]WifiLink `json:"neighbours"` Neighbours map[string]WifiLink `json:"neighbours"`

View File

@ -4,7 +4,7 @@ package data
type NodeInfo struct { type NodeInfo struct {
NodeID string `json:"node_id"` NodeID string `json:"node_id"`
Network Network `json:"network"` Network Network `json:"network"`
Owner *Owner `json:"-"` // Removed for privacy reasons Owner *Owner `json:"owner"`
System System `json:"system"` System System `json:"system"`
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
Location *Location `json:"location,omitempty"` Location *Location `json:"location,omitempty"`

View File

@ -4,4 +4,6 @@ import (
_ "github.com/FreifunkBremen/yanic/database/graphite" _ "github.com/FreifunkBremen/yanic/database/graphite"
_ "github.com/FreifunkBremen/yanic/database/influxdb" _ "github.com/FreifunkBremen/yanic/database/influxdb"
_ "github.com/FreifunkBremen/yanic/database/logging" _ "github.com/FreifunkBremen/yanic/database/logging"
_ "github.com/FreifunkBremen/yanic/database/socket"
_ "github.com/FreifunkBremen/yanic/database/yanic"
) )

View File

@ -50,6 +50,13 @@ func (c *Connection) InsertNode(node *runtime.Node) {
} }
addField("neighbours.batadv", batadv) addField("neighbours.batadv", batadv)
// protocol: Babel
babel := 0
for _, babelNeighbours := range neighbours.Babel {
babel += len(babelNeighbours)
}
addField("neighbours.babel", babel)
// protocol: LLDP // protocol: LLDP
lldp := 0 lldp := 0
for _, lldpNeighbours := range neighbours.LLDP { for _, lldpNeighbours := range neighbours.LLDP {

View File

@ -80,6 +80,13 @@ func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields)
} }
fields["neighbours.batadv"] = batadv fields["neighbours.batadv"] = batadv
// protocol: Babel
babel := 0
for _, babelNeighbours := range neighbours.Babel {
babel += len(babelNeighbours)
}
fields["neighbours.babel"] = babel
// protocol: LLDP // protocol: LLDP
lldp := 0 lldp := 0
for _, lldpNeighbours := range neighbours.LLDP { for _, lldpNeighbours := range neighbours.LLDP {

View File

@ -0,0 +1,86 @@
package yanic
import (
"encoding/json"
"log"
"net"
"github.com/FreifunkBremen/yanic/database/socket"
"github.com/FreifunkBremen/yanic/runtime"
)
type Dialer struct {
conn net.Conn
queue chan socket.Message
quit chan struct{}
NodeHandler func(*runtime.Node)
GlobalsHandler func(*runtime.GlobalStats)
PruneNodesHandler func()
}
func Dial(ctype, addr string) *Dialer {
conn, err := net.Dial(ctype, addr)
if err != nil {
log.Panicf("yanic dial to %s:%s failed", ctype, addr)
}
dialer := &Dialer{
conn: conn,
queue: make(chan socket.Message),
quit: make(chan struct{}),
}
return dialer
}
func (d *Dialer) Start() {
go d.reciever()
d.parser()
}
func (d *Dialer) Close() {
d.conn.Close()
close(d.quit)
}
func (d *Dialer) reciever() {
decoder := json.NewDecoder(d.conn)
var msg socket.Message
for {
select {
case <-d.quit:
close(d.queue)
return
default:
decoder.Decode(&msg)
d.queue <- msg
}
}
}
func (d *Dialer) parser() {
for msg := range d.queue {
switch msg.Event {
case socket.MessageEventInsertNode:
if d.NodeHandler != nil {
var node runtime.Node
obj, _ := json.Marshal(msg.Body)
json.Unmarshal(obj, &node)
d.NodeHandler(&node)
}
case socket.MessageEventInsertGlobals:
if d.GlobalsHandler != nil {
var globals runtime.GlobalStats
obj, _ := json.Marshal(msg.Body)
json.Unmarshal(obj, &globals)
d.GlobalsHandler(&globals)
}
case socket.MessageEventPruneNodes:
if d.PruneNodesHandler != nil {
d.PruneNodesHandler()
}
}
}
}

View File

@ -0,0 +1,69 @@
package yanic
import (
"testing"
"time"
"github.com/FreifunkBremen/yanic/database/socket"
"github.com/FreifunkBremen/yanic/runtime"
"github.com/stretchr/testify/assert"
)
func TestConnectError(t *testing.T) {
assert := assert.New(t)
assert.Panics(func() {
Dial("tcp6", "[::]:30303")
}, "could connect")
}
func TestRecieveMessages(t *testing.T) {
assert := assert.New(t)
server, err := socket.Connect(map[string]interface{}{
"enable": true,
"type": "tcp6",
"address": "[::]:1337",
})
assert.NoError(err)
d := Dial("tcp6", "[::]:1337")
assert.NotNil(d)
go d.Start()
time.Sleep(5 * time.Millisecond)
runned := false
d.NodeHandler = func(node *runtime.Node) {
runned = true
}
server.InsertNode(nil)
time.Sleep(5 * time.Millisecond)
assert.True(runned, "node not inserted")
runned = false
d.GlobalsHandler = func(stats *runtime.GlobalStats) {
runned = true
}
server.InsertGlobals(nil, time.Now())
time.Sleep(5 * time.Millisecond)
assert.True(runned, "global stats not inserted")
runned = false
d.PruneNodesHandler = func() {
runned = true
}
server.PruneNodes(time.Hour * 24 * 7)
time.Sleep(5 * time.Millisecond)
assert.True(runned, "node not pruned")
d.Close()
time.Sleep(5 * time.Millisecond)
runned = false
d.PruneNodesHandler = func() {
runned = true
}
server.PruneNodes(time.Hour * 24 * 7)
time.Sleep(5 * time.Millisecond)
assert.False(runned, "message recieve")
server.Close()
}

View File

@ -0,0 +1,68 @@
package socket
/*
* This socket database is to run another service
* (without flooding the network with respondd packages)
* e.g. https://github.com/FreifunkBremen/freifunkmanager
*/
import (
"log"
"net"
"sync"
"time"
"github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime"
)
type Connection struct {
database.Connection
listener net.Listener
clients map[net.Addr]net.Conn
clientMux sync.Mutex
}
func init() {
database.RegisterAdapter("socket", Connect)
}
func Connect(configuration interface{}) (database.Connection, error) {
config := configuration.(map[string]interface{})
if !config["enable"].(bool) {
return nil, nil
}
ln, err := net.Listen(config["type"].(string), config["address"].(string))
if err != nil {
return nil, err
}
conn := &Connection{listener: ln, clients: make(map[net.Addr]net.Conn)}
go conn.handleSocketConnection(ln)
log.Println("[socket-database] listen on: ", ln.Addr())
return conn, nil
}
func (conn *Connection) InsertNode(node *runtime.Node) {
conn.sendJSON(Message{Event: MessageEventInsertNode, Body: node})
}
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
conn.sendJSON(Message{Event: MessageEventInsertGlobals, Body: stats})
}
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
conn.sendJSON(Message{Event: MessageEventPruneNodes})
}
func (conn *Connection) Close() {
conn.clientMux.Lock()
for _, c := range conn.clients {
c.Close()
}
conn.clientMux.Unlock()
conn.listener.Close()
}

View File

@ -0,0 +1,80 @@
package socket
import (
"encoding/json"
"net"
"testing"
"time"
"github.com/FreifunkBremen/yanic/runtime"
"github.com/stretchr/testify/assert"
)
func TestStartup(t *testing.T) {
assert := assert.New(t)
config := make(map[string]interface{})
config["enable"] = false
conn, err := Connect(config)
assert.Nil(conn)
config["enable"] = true
config["type"] = ""
config["address"] = ""
conn, err = Connect(config)
assert.Error(err, "connection should not work")
assert.Nil(conn)
config["type"] = "tcp6"
config["address"] = "[::]:1337"
conn, err = Connect(config)
assert.NoError(err, "connection should work")
assert.NotNil(conn)
conn.Close()
}
func TestClient(t *testing.T) {
assert := assert.New(t)
config := make(map[string]interface{})
config["enable"] = true
config["type"] = "tcp6"
config["address"] = "[::]:1337"
conn, err := Connect(config)
assert.NoError(err, "connection should work")
assert.NotNil(conn)
client, err := net.Dial("tcp6", "[::]:1337")
assert.NoError(err, "connection should work")
assert.NotNil(client)
time.Sleep(time.Duration(3) * time.Microsecond)
decoder := json.NewDecoder(client)
var msg Message
conn.InsertNode(&runtime.Node{})
decoder.Decode(&msg)
assert.Equal("insert_node", msg.Event)
conn.InsertGlobals(&runtime.GlobalStats{}, time.Now())
decoder.Decode(&msg)
assert.Equal("insert_globals", msg.Event)
conn.PruneNodes(time.Hour * 24 * 7)
decoder.Decode(&msg)
assert.Equal("prune_nodes", msg.Event)
time.Sleep(time.Duration(3) * time.Microsecond)
// to reach in sendJSON removing of disconnection
conn.Close()
conn.InsertNode(&runtime.Node{})
err = decoder.Decode(&msg)
assert.Error(err)
}

View File

@ -0,0 +1,35 @@
package socket
import (
"encoding/json"
"log"
"net"
)
func (conn *Connection) handleSocketConnection(ln net.Listener) {
for {
c, err := ln.Accept()
if err != nil {
log.Println("[socket-database] error during connection of a client", err)
continue
}
conn.clientMux.Lock()
conn.clients[c.RemoteAddr()] = c
conn.clientMux.Unlock()
}
}
func (conn *Connection) sendJSON(msg Message) {
conn.clientMux.Lock()
for addr, c := range conn.clients {
d := json.NewEncoder(c)
err := d.Encode(&msg)
if err != nil {
log.Println("[socket-database] client has not recieve event:", err)
c.Close()
delete(conn.clients, addr)
}
}
conn.clientMux.Unlock()
}

View File

@ -0,0 +1,12 @@
package socket
type Message struct {
Event string `json:"event"`
Body interface{} `json:"body,omitempty"`
}
const (
MessageEventInsertNode = "insert_node"
MessageEventInsertGlobals = "insert_globals"
MessageEventPruneNodes = "prune_nodes"
)

81
database/yanic/main.go Normal file
View File

@ -0,0 +1,81 @@
package yanic
import (
"bufio"
"bytes"
"compress/flate"
"encoding/json"
"log"
"net"
"time"
"github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime"
)
type Connection struct {
database.Connection
config Config
conn *net.UDPConn
}
type Config map[string]interface{}
func (c Config) Enable() bool {
return c["enable"].(bool)
}
func (c Config) Address() string {
return c["address"].(string)
}
func init() {
database.RegisterAdapter("yanic", Connect)
}
func Connect(configuration interface{}) (database.Connection, error) {
var config Config
config = configuration.(map[string]interface{})
if !config.Enable() {
return nil, nil
}
udpAddr, err := net.ResolveUDPAddr("udp", config.Address())
if err != nil {
log.Panicf("Invalid yanic address: %s", err)
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
log.Panicf("Unable to dial yanic: %s", err)
}
return &Connection{config: config, conn: conn}, nil
}
func (conn *Connection) InsertNode(node *runtime.Node) {
buf := bytes.Buffer{}
writer := bufio.NewWriter(&buf)
deflater, err := flate.NewWriter(writer, flate.DefaultCompression)
err = json.NewEncoder(deflater).Encode(&data.ResponseData{
Statistics: node.Statistics,
NodeInfo: node.Nodeinfo,
})
if err != nil {
panic(err)
}
deflater.Close()
writer.Flush()
conn.conn.Write(buf.Bytes())
}
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
}
func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
}
func (conn *Connection) Close() {
conn.conn.Close()
}

View File

@ -1,50 +0,0 @@
package meshviewer
import (
"log"
"time"
"github.com/FreifunkBremen/yanic/runtime"
)
type nodeBuilder func(*runtime.Nodes) interface{}
var nodeFormats = map[int]nodeBuilder{
1: BuildNodesV1,
2: BuildNodesV2,
}
// Start all services to manage Nodes
func Start(config *runtime.Config, nodes *runtime.Nodes) {
go worker(config, nodes)
}
// Periodically saves the cached DB to json file
func worker(config *runtime.Config, nodes *runtime.Nodes) {
c := time.Tick(config.Nodes.SaveInterval.Duration)
for range c {
saveMeshviewer(config, nodes)
}
}
func saveMeshviewer(config *runtime.Config, nodes *runtime.Nodes) {
// Locking foo
nodes.RLock()
defer nodes.RUnlock()
if path := config.Meshviewer.NodesPath; path != "" {
version := config.Meshviewer.Version
builder := nodeFormats[version]
if builder != nil {
runtime.SaveJSON(builder(nodes), path)
} else {
log.Panicf("invalid nodes version: %d", version)
}
}
if path := config.Meshviewer.GraphPath; path != "" {
runtime.SaveJSON(BuildGraph(nodes), path)
}
}

37
output/all/internal.go Normal file
View File

@ -0,0 +1,37 @@
package all
import (
"github.com/FreifunkBremen/yanic/output"
"github.com/FreifunkBremen/yanic/runtime"
)
type Output struct {
output.Output
nodes *runtime.Nodes
list []output.Output
}
func Register(nodes *runtime.Nodes, configuration interface{}) (output.Output, error) {
var list []output.Output
allOutputs := configuration.(map[string][]interface{})
for outputType, outputRegister := range output.Adapters {
outputConfigs := allOutputs[outputType]
for _, config := range outputConfigs {
output, err := outputRegister(nodes, config)
if err != nil {
return nil, err
}
if output == nil {
continue
}
list = append(list, output)
}
}
return &Output{list: list, nodes: nodes}, nil
}
func (o *Output) Save() {
for _, item := range o.list {
item.Save()
}
}

6
output/all/main.go Normal file
View File

@ -0,0 +1,6 @@
package all
import (
_ "github.com/FreifunkBremen/yanic/output/meshviewer"
_ "github.com/FreifunkBremen/yanic/output/template"
)

37
output/internal.go Normal file
View File

@ -0,0 +1,37 @@
package output
import (
"time"
"github.com/FreifunkBremen/yanic/runtime"
)
var quit chan struct{}
// Start workers of database
// WARNING: Do not override this function
// you should use New()
func Start(output Output, config *runtime.Config) {
quit = make(chan struct{})
go saveWorker(output, config.Nodes.SaveInterval.Duration)
}
func Close() {
if quit != nil {
close(quit)
}
}
// save periodically to output
func saveWorker(output Output, saveInterval time.Duration) {
ticker := time.NewTicker(saveInterval)
for {
select {
case <-ticker.C:
output.Save()
case <-quit:
ticker.Stop()
return
}
}
}

157
output/meshviewer/filter.go Normal file
View File

@ -0,0 +1,157 @@
package meshviewer
import (
"github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/runtime"
)
type filter func(node *runtime.Node) *runtime.Node
// Config Filter
type filterConfig map[string]interface{}
func (f filterConfig) Blacklist() *map[string]interface{} {
if v, ok := f["blacklist"]; ok {
list := make(map[string]interface{})
for _, nodeid := range v.([]interface{}) {
list[nodeid.(string)] = true
}
return &list
}
return nil
}
func (f filterConfig) NoOwner() bool {
if v, ok := f["no_owner"]; ok {
return v.(bool)
}
return true
}
func (f filterConfig) HasLocation() *bool {
if v, ok := f["has_location"].(bool); ok {
return &v
}
return nil
}
type area struct {
xA float64
xB float64
yA float64
yB float64
}
func (f filterConfig) InArea() *area {
if areaConfigInt, ok := f["in_area"]; ok {
areaConfig := areaConfigInt.(map[string]interface{})
a := area{}
if v, ok := areaConfig["latitude_min"]; ok {
a.xA = v.(float64)
}
if v, ok := areaConfig["latitude_max"]; ok {
a.xB = v.(float64)
}
if v, ok := areaConfig["longitude_min"]; ok {
a.yA = v.(float64)
}
if v, ok := areaConfig["longitude_max"]; ok {
a.yB = v.(float64)
}
return &a
}
return nil
}
// Create Filter
func createFilter(config filterConfig) filter {
return func(n *runtime.Node) *runtime.Node {
//maybe cloning of this object is better?
node := n
if config.NoOwner() {
node = filterNoOwner(node)
}
if ok := config.HasLocation(); ok != nil {
node = filterHasLocation(node, *ok)
}
if area := config.InArea(); area != nil {
node = filterLocationInArea(node, *area)
}
if list := config.Blacklist(); list != nil {
node = filterBlacklist(node, *list)
}
return node
}
}
func filterBlacklist(node *runtime.Node, list map[string]interface{}) *runtime.Node {
if node != nil {
if nodeinfo := node.Nodeinfo; nodeinfo != nil {
if _, ok := list[nodeinfo.NodeID]; !ok {
return node
}
}
}
return nil
}
func filterNoOwner(node *runtime.Node) *runtime.Node {
if node == nil {
return nil
}
return &runtime.Node{
Address: node.Address,
Firstseen: node.Firstseen,
Lastseen: node.Lastseen,
Online: node.Online,
Statistics: node.Statistics,
Nodeinfo: &data.NodeInfo{
NodeID: node.Nodeinfo.NodeID,
Network: node.Nodeinfo.Network,
System: node.Nodeinfo.System,
Owner: nil,
Hostname: node.Nodeinfo.Hostname,
Location: node.Nodeinfo.Location,
Software: node.Nodeinfo.Software,
Hardware: node.Nodeinfo.Hardware,
VPN: node.Nodeinfo.VPN,
Wireless: node.Nodeinfo.Wireless,
},
Neighbours: node.Neighbours,
}
}
func filterHasLocation(node *runtime.Node, withLocation bool) *runtime.Node {
if node != nil {
if nodeinfo := node.Nodeinfo; nodeinfo != nil {
if withLocation {
if location := nodeinfo.Location; location != nil {
return node
}
} else {
if location := nodeinfo.Location; location == nil {
return node
}
}
}
}
return nil
}
func filterLocationInArea(node *runtime.Node, a area) *runtime.Node {
if node != nil {
if nodeinfo := node.Nodeinfo; nodeinfo != nil {
if location := nodeinfo.Location; location != nil {
if location.Latitude >= a.xA && location.Latitude <= a.xB {
if location.Longtitude >= a.yA && location.Longtitude <= a.yB {
return node
}
}
} else {
return node
}
}
}
return nil
}

View File

@ -80,6 +80,7 @@ func (builder *graphBuilder) readNodes(nodes map[string]*runtime.Node) {
builder.macToID[sourceAddress] = sourceID builder.macToID[sourceAddress] = sourceID
} }
} }
} }
// Iterate over local MAC addresses from LLDP // Iterate over local MAC addresses from LLDP
@ -102,6 +103,25 @@ func (builder *graphBuilder) readNodes(nodes map[string]*runtime.Node) {
} }
} }
} }
// Wifi neighbours
for _, wifiNeighbours := range neighbours.WifiNeighbours {
for targetAddress, link := range wifiNeighbours.Neighbours {
if targetID, found := builder.macToID[targetAddress]; found {
linkActive := link.Noise + link.Inactive + link.Signal
if linkActive > 0 {
builder.addLink(targetID, sourceID, link.Signal/linkActive)
}
}
}
}
// Babel neighbours
for _, babelNeighbours := range neighbours.Babel {
for _, link := range babelNeighbours {
if targetID, found := builder.macToID[link.Address]; found {
builder.addLink(targetID, sourceID, 1)
}
}
}
// LLDP // LLDP
for _, neighbours := range neighbours.LLDP { for _, neighbours := range neighbours.LLDP {
for targetAddress := range neighbours { for targetAddress := range neighbours {

View File

@ -58,7 +58,7 @@ func testGetNodeByFile(filename string) *runtime.Node {
} }
func testfile(name string, obj interface{}) { func testfile(name string, obj interface{}) {
file, err := ioutil.ReadFile("../runtime/testdata/" + name) file, err := ioutil.ReadFile("../../runtime/testdata/" + name)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -10,13 +10,13 @@ import (
) )
func TestNodesV1(t *testing.T) { func TestNodesV1(t *testing.T) {
nodes := BuildNodesV1(createTestNodes()).(*NodesV1) nodes := BuildNodesV1(func(n *runtime.Node) *runtime.Node { return n }, createTestNodes()).(*NodesV1)
assert := assert.New(t) assert := assert.New(t)
assert.Len(nodes.List, 2) assert.Len(nodes.List, 2)
} }
func TestNodesV2(t *testing.T) { func TestNodesV2(t *testing.T) {
nodes := BuildNodesV2(createTestNodes()).(*NodesV2) nodes := BuildNodesV2(func(n *runtime.Node) *runtime.Node { return n }, createTestNodes()).(*NodesV2)
assert := assert.New(t) assert := assert.New(t)
assert.Len(nodes.List, 2) assert.Len(nodes.List, 2)

View File

@ -14,30 +14,29 @@ type NodesV1 struct {
} }
// BuildNodesV1 transforms data to legacy meshviewer // BuildNodesV1 transforms data to legacy meshviewer
func BuildNodesV1(nodes *runtime.Nodes) interface{} { func BuildNodesV1(toFilter filter, nodes *runtime.Nodes) interface{} {
meshviewerNodes := &NodesV1{ meshviewerNodes := &NodesV1{
Version: 1, Version: 1,
List: make(map[string]*Node), List: make(map[string]*Node),
Timestamp: jsontime.Now(), Timestamp: jsontime.Now(),
} }
for nodeID := range nodes.List { for nodeID, nodeOrigin := range nodes.List {
nodeOrigin := nodes.List[nodeID] nodeFiltere := toFilter(nodeOrigin)
if nodeOrigin.Statistics == nil || nodeFiltere == nil {
if nodeOrigin.Statistics == nil {
continue continue
} }
node := &Node{ node := &Node{
Firstseen: nodeOrigin.Firstseen, Firstseen: nodeFiltere.Firstseen,
Lastseen: nodeOrigin.Lastseen, Lastseen: nodeFiltere.Lastseen,
Flags: Flags{ Flags: Flags{
Online: nodeOrigin.Online, Online: nodeFiltere.Online,
Gateway: nodeOrigin.IsGateway(), Gateway: nodeFiltere.IsGateway(),
}, },
Nodeinfo: nodeOrigin.Nodeinfo, Nodeinfo: nodeFiltere.Nodeinfo,
} }
node.Statistics = NewStatistics(nodeOrigin.Statistics) node.Statistics = NewStatistics(nodeFiltere.Statistics)
meshviewerNodes.List[nodeID] = node meshviewerNodes.List[nodeID] = node
} }
return meshviewerNodes return meshviewerNodes

View File

@ -14,27 +14,27 @@ type NodesV2 struct {
} }
// BuildNodesV2 transforms data to modern meshviewers // BuildNodesV2 transforms data to modern meshviewers
func BuildNodesV2(nodes *runtime.Nodes) interface{} { func BuildNodesV2(toFilter filter, nodes *runtime.Nodes) interface{} {
meshviewerNodes := &NodesV2{ meshviewerNodes := &NodesV2{
Version: 2, Version: 2,
Timestamp: jsontime.Now(), Timestamp: jsontime.Now(),
} }
for nodeID := range nodes.List { for _, nodeOrigin := range nodes.List {
nodeOrigin := nodes.List[nodeID] nodeFiltere := toFilter(nodeOrigin)
if nodeOrigin.Statistics == nil { if nodeOrigin.Statistics == nil || nodeFiltere == nil {
continue continue
} }
node := &Node{ node := &Node{
Firstseen: nodeOrigin.Firstseen, Firstseen: nodeFiltere.Firstseen,
Lastseen: nodeOrigin.Lastseen, Lastseen: nodeFiltere.Lastseen,
Flags: Flags{ Flags: Flags{
Online: nodeOrigin.Online, Online: nodeFiltere.Online,
Gateway: nodeOrigin.IsGateway(), Gateway: nodeFiltere.IsGateway(),
}, },
Nodeinfo: nodeOrigin.Nodeinfo, Nodeinfo: nodeFiltere.Nodeinfo,
} }
node.Statistics = NewStatistics(nodeOrigin.Statistics) node.Statistics = NewStatistics(nodeFiltere.Statistics)
meshviewerNodes.List = append(meshviewerNodes.List, node) meshviewerNodes.List = append(meshviewerNodes.List, node)
} }
return meshviewerNodes return meshviewerNodes

View File

@ -0,0 +1,88 @@
package meshviewer
import (
"log"
"github.com/FreifunkBremen/yanic/output"
"github.com/FreifunkBremen/yanic/runtime"
)
type Output struct {
output.Output
config Config
nodes *runtime.Nodes
builder nodeBuilder
filter filter
}
type Config map[string]interface{}
func (c Config) Enable() bool {
return c["enable"].(bool)
}
func (c Config) Version() int64 {
return c["version"].(int64)
}
func (c Config) NodesPath() string {
if c["nodes_path"] == nil {
log.Panic("in configuration of [[nodes.output.meshviewer]] was no nodes_path defined", c)
}
return c["nodes_path"].(string)
}
func (c Config) GraphPath() string {
return c["graph_path"].(string)
}
func (c Config) FilterOption() filterConfig {
if v, ok := c["filter"]; ok {
var filterMap filterConfig
filterMap = v.(map[string]interface{})
return filterMap
}
return nil
}
type nodeBuilder func(filter, *runtime.Nodes) interface{}
var nodeFormats = map[int64]nodeBuilder{
1: BuildNodesV1,
2: BuildNodesV2,
}
func init() {
output.RegisterAdapter("meshviewer", Register)
}
func Register(nodes *runtime.Nodes, configuration interface{}) (output.Output, error) {
var config Config
config = configuration.(map[string]interface{})
if !config.Enable() {
return nil, nil
}
builder := nodeFormats[config.Version()]
if builder == nil {
log.Panicf("invalid nodes version: %d", config.Version())
}
return &Output{
nodes: nodes,
config: config,
builder: builder,
filter: createFilter(config.FilterOption()),
}, nil
}
func (o *Output) Save() {
o.nodes.RLock()
defer o.nodes.RUnlock()
if path := o.config.NodesPath(); path != "" {
runtime.SaveJSON(o.builder(o.filter, o.nodes), path)
}
if path := o.config.GraphPath(); path != "" {
runtime.SaveJSON(BuildGraph(o.nodes), path)
}
}

View File

@ -0,0 +1,64 @@
package nodelist
import (
"github.com/FreifunkBremen/yanic/jsontime"
"github.com/FreifunkBremen/yanic/runtime"
)
// NodeList rewritten after: https://github.com/ffnord/ffmap-backend/blob/c33ebf62f013e18bf71b5a38bd058847340db6b7/lib/nodelist.py
type NodeList struct {
Version string `json:"version"`
Timestamp jsontime.Time `json:"updated_at"` // Timestamp of the generation
List []*Node `json:"nodes"`
}
type Node struct {
ID string `json:"id"`
Name string `json:"name"`
Position *Position `json:"position,omitempty"`
Status struct {
Online bool `json:"online"`
LastContact jsontime.Time `json:"lastcontact"`
Clients uint32 `json:"clients"`
} `json:"status"`
}
type Position struct {
Lat float64 `json:"lat"`
Long float64 `json:"long"`
}
func NewNode(n *runtime.Node) *Node {
if nodeinfo := n.Nodeinfo; nodeinfo != nil {
node := &Node{
ID: nodeinfo.NodeID,
Name: nodeinfo.Hostname,
}
if location := nodeinfo.Location; location != nil {
node.Position = &Position{Lat: location.Latitude, Long: location.Longtitude}
}
node.Status.Online = n.Online
node.Status.LastContact = n.Lastseen
if statistics := n.Statistics; statistics != nil {
node.Status.Clients = statistics.Clients.Total
}
return node
}
return nil
}
func transform(nodes *runtime.Nodes) *NodeList {
nodelist := &NodeList{
Version: "1.0.1",
Timestamp: jsontime.Now(),
}
for _, nodeOrigin := range nodes.List {
node := NewNode(nodeOrigin)
if node != nil {
nodelist.List = append(nodelist.List, node)
}
}
return nodelist
}

View File

@ -0,0 +1,60 @@
package nodelist
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/runtime"
)
func TestTransform(t *testing.T) {
nodes := transform(createTestNodes())
assert := assert.New(t)
assert.Len(nodes.List, 3)
}
func createTestNodes() *runtime.Nodes {
nodes := runtime.NewNodes(&runtime.Config{})
nodeData := &data.ResponseData{
Statistics: &data.Statistics{
Clients: data.Clients{
Total: 23,
},
},
NodeInfo: &data.NodeInfo{
Hardware: data.Hardware{
Model: "TP-Link 841",
},
},
}
nodeData.NodeInfo.Software.Firmware.Release = "2016.1.6+entenhausen1"
nodes.Update("abcdef012345", nodeData)
nodes.Update("112233445566", &data.ResponseData{
Statistics: &data.Statistics{
Clients: data.Clients{
Total: 2,
},
},
NodeInfo: &data.NodeInfo{
Hardware: data.Hardware{
Model: "TP-Link 841",
},
},
})
nodes.Update("0xdeadbeef0x", &data.ResponseData{
NodeInfo: &data.NodeInfo{
VPN: true,
Hardware: data.Hardware{
Model: "Xeon Multi-Core",
},
},
})
return nodes
}

51
output/nodelist/output.go Normal file
View File

@ -0,0 +1,51 @@
package nodelist
import (
goTemplate "text/template"
"github.com/FreifunkBremen/yanic/output"
"github.com/FreifunkBremen/yanic/runtime"
)
type Output struct {
output.Output
config Config
nodes *runtime.Nodes
template *goTemplate.Template
}
type Config map[string]interface{}
func (c Config) Enable() bool {
return c["enable"].(bool)
}
func (c Config) Path() string {
return c["path"].(string)
}
func init() {
output.RegisterAdapter("nodelist", Register)
}
func Register(nodes *runtime.Nodes, configuration interface{}) (output.Output, error) {
var config Config
config = configuration.(map[string]interface{})
if !config.Enable() {
return nil, nil
}
return &Output{
config: config,
nodes: nodes,
}, nil
}
func (o *Output) Save() {
o.nodes.RLock()
defer o.nodes.RUnlock()
if path := o.config.Path(); path != "" {
runtime.SaveJSON(transform(o.nodes), path)
}
}

19
output/output.go Normal file
View File

@ -0,0 +1,19 @@
package output
import "github.com/FreifunkBremen/yanic/runtime"
// Output interface to use for implementation in e.g. influxdb
type Output interface {
// InsertNode stores statistics per node
Save()
}
// Register function with config to get a output interface
type Register func(nodes *runtime.Nodes, config interface{}) (Output, error)
// Adapters is the list of registered output adapters
var Adapters = map[string]Register{}
func RegisterAdapter(name string, n Register) {
Adapters[name] = n
}

85
output/template/main.go Normal file
View File

@ -0,0 +1,85 @@
package template
import (
"bytes"
"encoding/json"
"io"
"log"
"os"
goTemplate "text/template"
"github.com/FreifunkBremen/yanic/output"
"github.com/FreifunkBremen/yanic/runtime"
)
type Output struct {
output.Output
config Config
nodes *runtime.Nodes
template *goTemplate.Template
}
type Config map[string]interface{}
func (c Config) Enable() bool {
return c["enable"].(bool)
}
func (c Config) TemplatePath() string {
return c["template_path"].(string)
}
func (c Config) ResultPath() string {
return c["result_path"].(string)
}
func init() {
output.RegisterAdapter("template", Register)
}
func Register(nodes *runtime.Nodes, configuration interface{}) (output.Output, error) {
var config Config
config = configuration.(map[string]interface{})
if !config.Enable() {
return nil, nil
}
t := goTemplate.New("some")
t = t.Funcs(goTemplate.FuncMap{"json": func(v interface{}) string {
a, _ := json.Marshal(v)
return string(a)
}})
buf := bytes.NewBuffer(nil)
f, err := os.Open(config.TemplatePath()) // Error handling elided for brevity.
if err != nil {
log.Panic(err)
}
io.Copy(buf, f) // Error handling elided for brevity.
f.Close()
s := string(buf.Bytes())
t.Parse(s)
return &Output{
config: config,
nodes: nodes,
template: t,
}, nil
}
func (o *Output) Save() {
stats := runtime.NewGlobalStats(o.nodes)
if stats == nil {
log.Panic("update of [output.template] not possible invalid data for the template generated")
}
tmpFile := o.config.ResultPath() + ".tmp"
f, err := os.OpenFile(tmpFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
log.Panic(err)
}
o.template.Execute(f, map[string]interface{}{"GlobalStatistic": stats})
if err != nil {
log.Panic(err)
}
f.Close()
if err := os.Rename(tmpFile, o.config.ResultPath()); err != nil {
log.Panic(err)
}
}

View File

@ -19,7 +19,8 @@ import (
type Collector struct { type Collector struct {
connection *net.UDPConn // UDP socket connection *net.UDPConn // UDP socket
queue chan *Response // received responses queue chan *Response // received responses
iface string ifaceSendUnicast string
ifaceSendMulticast string
db database.Connection db database.Connection
nodes *runtime.Nodes nodes *runtime.Nodes
interval time.Duration // Interval for multicast packets interval time.Duration // Interval for multicast packets
@ -27,17 +28,17 @@ type Collector struct {
} }
// NewCollector creates a Collector struct // NewCollector creates a Collector struct
func NewCollector(db database.Connection, nodes *runtime.Nodes, iface string, port int) *Collector { func NewCollector(db database.Connection, nodes *runtime.Nodes, ifaceListen string, ifaceSendUnicast string, ifaceSendMulticast string, port int) *Collector {
linkLocalAddr, err := getLinkLocalAddr(iface) linkLocalAddr, err := getLinkLocalAddr(ifaceListen)
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
} }
// Open socket // Open link local socket
conn, err := net.ListenUDP("udp", &net.UDPAddr{ conn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: linkLocalAddr, IP: linkLocalAddr,
Port: port, Port: port,
Zone: iface, Zone: ifaceListen,
}) })
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
@ -48,13 +49,15 @@ func NewCollector(db database.Connection, nodes *runtime.Nodes, iface string, po
connection: conn, connection: conn,
db: db, db: db,
nodes: nodes, nodes: nodes,
iface: iface, ifaceSendUnicast: ifaceSendUnicast,
ifaceSendMulticast: ifaceSendMulticast,
queue: make(chan *Response, 400), queue: make(chan *Response, 400),
stop: make(chan interface{}), stop: make(chan interface{}),
} }
go collector.receiver() go collector.receiver(conn)
go collector.parser() go collector.parser()
collector.listenPublic()
if collector.db != nil { if collector.db != nil {
go collector.globalStatsWorker() go collector.globalStatsWorker()
@ -83,6 +86,18 @@ func getLinkLocalAddr(ifname string) (net.IP, error) {
return nil, fmt.Errorf("unable to find link local unicast address for %s", ifname) return nil, fmt.Errorf("unable to find link local unicast address for %s", ifname)
} }
func (coll *Collector) listenPublic() {
conn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.ParseIP("::"),
Port: 12345,
})
if err != nil {
log.Panic(err)
}
conn.SetReadBuffer(maxDataGramSize)
go coll.receiver(conn)
}
// Start Collector // Start Collector
func (coll *Collector) Start(interval time.Duration) { func (coll *Collector) Start(interval time.Duration) {
if coll.interval != 0 { if coll.interval != 0 {
@ -117,7 +132,10 @@ func (coll *Collector) sendOnce() {
func (coll *Collector) sendMulticast() { func (coll *Collector) sendMulticast() {
log.Println("sending multicast") log.Println("sending multicast")
coll.SendPacket(net.ParseIP(multiCastGroup)) coll.SendPacket(net.UDPAddr{
IP: net.ParseIP(MulticastGroup),
Zone: coll.ifaceSendMulticast,
})
} }
// Send unicast packets to nodes that did not answer the multicast // Send unicast packets to nodes that did not answer the multicast
@ -132,19 +150,17 @@ func (coll *Collector) sendUnicasts(seenBefore jsontime.Time) {
// Send unicast packets // Send unicast packets
log.Printf("sending unicast to %d nodes", len(nodes)) log.Printf("sending unicast to %d nodes", len(nodes))
for _, node := range nodes { for _, node := range nodes {
coll.SendPacket(node.Address) coll.SendPacket(net.UDPAddr{
IP: node.Address,
Zone: coll.ifaceSendUnicast,
})
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
} }
} }
// SendPacket sends a UDP request to the given unicast or multicast address // SendPacket sends a UDP request to the given unicast or multicast address
func (coll *Collector) SendPacket(address net.IP) { func (coll *Collector) SendPacket(addr net.UDPAddr) {
addr := net.UDPAddr{ addr.Port = port
IP: address,
Port: port,
Zone: coll.iface,
}
if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil { if _, err := coll.connection.WriteToUDP([]byte("GET nodeinfo statistics neighbours"), &addr); err != nil {
log.Println("WriteToUDP failed:", err) log.Println("WriteToUDP failed:", err)
} }
@ -167,7 +183,8 @@ func (coll *Collector) sender() {
func (coll *Collector) parser() { func (coll *Collector) parser() {
for obj := range coll.queue { for obj := range coll.queue {
if data, err := obj.parse(); err != nil {
if data, err := obj.Parse(); err != nil {
log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw)) log.Println("unable to decode response from", obj.Address.String(), err, "\n", string(obj.Raw))
} else { } else {
coll.saveResponse(obj.Address, data) coll.saveResponse(obj.Address, data)
@ -175,7 +192,7 @@ func (coll *Collector) parser() {
} }
} }
func (res *Response) parse() (*data.ResponseData, error) { func (res *Response) Parse() (*data.ResponseData, error) {
// Deflate // Deflate
deflater := flate.NewReader(bytes.NewReader(res.Raw)) deflater := flate.NewReader(bytes.NewReader(res.Raw))
defer deflater.Close() defer deflater.Close()
@ -215,10 +232,10 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
} }
} }
func (coll *Collector) receiver() { func (coll *Collector) receiver(conn *net.UDPConn) {
buf := make([]byte, maxDataGramSize) buf := make([]byte, maxDataGramSize)
for { for {
n, src, err := coll.connection.ReadFromUDP(buf) n, src, err := conn.ReadFromUDP(buf)
if err != nil { if err != nil {
log.Println("ReadFromUDP failed:", err) log.Println("ReadFromUDP failed:", err)

View File

@ -18,7 +18,7 @@ func TestParse(t *testing.T) {
Raw: compressed, Raw: compressed,
} }
data, err := res.parse() data, err := res.Parse()
assert.NoError(err) assert.NoError(err)
assert.NotNil(data) assert.NotNil(data)

View File

@ -4,9 +4,10 @@ import (
"net" "net"
) )
// default multicast group used by announced
var MulticastGroup string = "ff02:0:0:0:0:0:2:1001"
const ( const (
// default multicast group used by announced
multiCastGroup = "ff02:0:0:0:0:0:2:1001"
// default udp port used by announced // default udp port used by announced
port = 1001 port = 1001

View File

@ -11,8 +11,11 @@ type Config struct {
Respondd struct { Respondd struct {
Enable bool `toml:"enable"` Enable bool `toml:"enable"`
Synchronize Duration `toml:"synchronize"` Synchronize Duration `toml:"synchronize"`
Interface string `toml:"interface"` InterfaceListen string `toml:"interface"`
Port int `toml:"port"` InterfaceSendMulticast string `toml:"interface_send_multicast"`
InterfaceSendUnicast string `toml:"interface_send_unicast"`
ListenPort int `toml:"port"`
MulticastDestination string `toml:"destination"`
CollectInterval Duration `toml:"collect_interval"` CollectInterval Duration `toml:"collect_interval"`
} }
Webserver struct { Webserver struct {
@ -26,6 +29,7 @@ type Config struct {
SaveInterval Duration `toml:"save_interval"` // Save nodes periodically SaveInterval Duration `toml:"save_interval"` // Save nodes periodically
OfflineAfter Duration `toml:"offline_after"` // Set node to offline if not seen within this period OfflineAfter Duration `toml:"offline_after"` // Set node to offline if not seen within this period
PruneAfter Duration `toml:"prune_after"` // Remove nodes after n days of inactivity PruneAfter Duration `toml:"prune_after"` // Remove nodes after n days of inactivity
Output map[string][]interface{}
} }
Meshviewer struct { Meshviewer struct {
Version int `toml:"version"` Version int `toml:"version"`

View File

@ -15,16 +15,20 @@ func TestReadConfig(t *testing.T) {
assert.NotNil(config) assert.NotNil(config)
assert.True(config.Respondd.Enable) assert.True(config.Respondd.Enable)
assert.Equal("eth0", config.Respondd.Interface) assert.Equal("eth0", config.Respondd.InterfaceListen)
assert.Equal(time.Minute, config.Respondd.CollectInterval.Duration) assert.Equal(time.Minute, config.Respondd.CollectInterval.Duration)
assert.Equal(2, config.Meshviewer.Version)
assert.Equal("/var/www/html/meshviewer/data/nodes.json", config.Meshviewer.NodesPath)
assert.Equal(time.Hour*24*7, config.Nodes.PruneAfter.Duration) assert.Equal(time.Hour*24*7, config.Nodes.PruneAfter.Duration)
assert.Equal(time.Hour*24*7, config.Database.DeleteAfter.Duration) assert.Equal(time.Hour*24*7, config.Database.DeleteAfter.Duration)
var meshviewer map[string]interface{}
outputs := config.Nodes.Output["meshviewer"]
assert.Len(outputs, 1, "more outputs are given")
meshviewer = outputs[0].(map[string]interface{})
assert.Equal(int64(2), meshviewer["version"])
assert.Equal("/var/www/html/meshviewer/data/nodes.json", meshviewer["nodes_path"])
var influxdb map[string]interface{} var influxdb map[string]interface{}
dbs := config.Database.Connection["influxdb"] dbs := config.Database.Connection["influxdb"]
assert.Len(dbs, 1, "more influxdb are given") assert.Len(dbs, 1, "more influxdb are given")

View File

@ -23,7 +23,7 @@ func NewGlobalStats(nodes *Nodes) (result *GlobalStats) {
Models: make(CounterMap), Models: make(CounterMap),
} }
nodes.Lock() nodes.RLock()
for _, node := range nodes.List { for _, node := range nodes.List {
if node.Online { if node.Online {
result.Nodes++ result.Nodes++
@ -42,7 +42,7 @@ func NewGlobalStats(nodes *Nodes) (result *GlobalStats) {
} }
} }
} }
nodes.Unlock() nodes.RUnlock()
return return
} }