From d4183a466d3bfc8512b6a50dc5fe4b6aa190c5ef Mon Sep 17 00:00:00 2001 From: Julian Kornberger Date: Tue, 29 Dec 2015 14:05:47 +0100 Subject: [PATCH] Save multiple types of data --- collector.go | 78 ++++++++++++++++++++++++--------------------------- main.go | 37 +++++++++++++++++++++--- nodes.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 149 insertions(+), 45 deletions(-) create mode 100644 nodes.go diff --git a/collector.go b/collector.go index 574f57f..3e800ed 100644 --- a/collector.go +++ b/collector.go @@ -4,6 +4,8 @@ import ( "encoding/json" "log" "net" + "reflect" + "strings" "time" ) @@ -11,22 +13,15 @@ const ( maxDatagramSize = 8192 ) -type Node struct { - Firstseen time.Time `json:"firstseen"` - Lastseen time.Time `json:"lastseen"` - Statistics interface{} `json:"statistics"` - Nodeinfo interface{} `json:"nodeinfo"` -} - type Collector struct { - connection *net.UDPConn // UDP socket - queue chan string // received responses - nodes map[string]*Node // the current nodemap + collectType string + connection *net.UDPConn // UDP socket + queue chan string // received responses } -func NewCollector() *Collector { +func NewCollector(collectType string) *Collector { // Parse address - addr, err := net.ResolveUDPAddr("udp", "[::]:1001") + addr, err := net.ResolveUDPAddr("udp", "[::]:0") if err != nil { log.Panic(err) } @@ -39,11 +34,14 @@ func NewCollector() *Collector { conn.SetReadBuffer(maxDatagramSize) collector := &Collector{ - connection: conn, - queue: make(chan string, 100), - nodes: make(map[string]*Node), + collectType: collectType, + connection: conn, + queue: make(chan string, 100), } + go collector.sendOnce() + go collector.sender() + go collector.receiver() go collector.parser() @@ -55,26 +53,29 @@ func (coll *Collector) Close() { close(coll.queue) } -func (coll *Collector) send(address string) { - addr, err := net.ResolveUDPAddr("udp", address) - if err != nil { - log.Panic(err) - } - coll.connection.WriteToUDP([]byte("nodeinfo"), addr) +func (coll *Collector) sendOnce() { + coll.sendPacket("[2a06:8782:ffbb:1337:c24a:ff:fe2c:c7ac]:1001") + coll.sendPacket("[2001:bf7:540:0:32b5:c2ff:fe6e:99d5]:1001") } -func (coll *Collector) print() { - b, err := json.Marshal(coll.nodes) - if err != nil { - log.Panic(err) +func (coll *Collector) sendPacket(address string) { + addr, err := net.ResolveUDPAddr("udp", address) + check(err) + + coll.connection.WriteToUDP([]byte(coll.collectType), addr) +} + +func (coll *Collector) sender() { + c := time.Tick(collectInterval) + + for range c { + coll.sendOnce() } - log.Println(string(b)) } func (coll *Collector) parser() { for str := range coll.queue { coll.parseSingle(str) - coll.print() } } @@ -90,29 +91,24 @@ func (coll *Collector) parseSingle(str string) { return } - now := time.Now() - node, _ := coll.nodes[nodeId] + node := nodes.get(nodeId) - if node == nil { - node = &Node{ - Firstseen: now, - } - coll.nodes[nodeId] = node - } - - node.Lastseen = now - node.Nodeinfo = result + // Set result + elem := reflect.ValueOf(node).Elem() + field := elem.FieldByName(strings.Title(coll.collectType)) + field.Set(reflect.ValueOf(result)) } func (coll *Collector) receiver() { b := make([]byte, maxDatagramSize) for { - n, _, err := coll.connection.ReadFromUDP(b) + n, src, err := coll.connection.ReadFromUDP(b) if err != nil { log.Println("ReadFromUDP failed:", err) - } else { - coll.queue <- string(b[:n]) + return } + coll.queue <- string(b[:n]) + log.Println("received", coll.collectType, "from", src) } } diff --git a/main.go b/main.go index 71fd2ff..a9bbc95 100644 --- a/main.go +++ b/main.go @@ -1,17 +1,37 @@ package main import ( + "flag" "log" "os" "os/signal" "syscall" + "time" +) + +var ( + nodes = NewNodes() + outputFile string + collectInterval time.Duration + saveInterval time.Duration ) func main() { - collector := NewCollector() - defer collector.Close() - collector.send("[2a06:8782:ffbb:1337:c24a:ff:fe2c:c7ac]:1001") - collector.send("[2001:bf7:540:0:32b5:c2ff:fe6e:99d5]:1001") + var collectSeconds, saveSeconds int + + flag.StringVar(&outputFile, "output", "nodes.json", "path output file") + flag.IntVar(&collectSeconds, "collectInterval", 15, "interval for data collections") + flag.IntVar(&saveSeconds, "saveInterval", 5, "interval for data saving") + flag.Parse() + + collectInterval = time.Second * time.Duration(collectSeconds) + saveInterval = time.Second * time.Duration(saveSeconds) + + collectors := []*Collector{ + NewCollector("statistics"), + NewCollector("nodeinfo"), + NewCollector("neighbours"), + } // Wait for SIGINT or SIGTERM sigs := make(chan os.Signal, 1) @@ -19,4 +39,13 @@ func main() { sig := <-sigs log.Println("received", sig) + for _, c := range collectors { + c.Close() + } +} + +func check(e error) { + if e != nil { + log.Panic(e) + } } diff --git a/nodes.go b/nodes.go new file mode 100644 index 0000000..48ae29b --- /dev/null +++ b/nodes.go @@ -0,0 +1,79 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "log" + "os" + "sync" + "time" +) + +type Node struct { + Firstseen time.Time `json:"firstseen"` + Lastseen time.Time `json:"lastseen"` + Statistics interface{} `json:"statistics"` + Nodeinfo interface{} `json:"nodeinfo"` + Neighbours interface{} `json:"neighbours"` +} + +type Nodes struct { + Version int `json:"version"` + Timestamp time.Time `json:"timestamp"` + List map[string]*Node `json:"nodes"` // the current nodemap + sync.Mutex +} + +func NewNodes() *Nodes { + nodes := &Nodes{ + Version: 1, + List: make(map[string]*Node), + } + + go nodes.saver() + + return nodes +} + +func (nodes *Nodes) get(nodeId string) *Node { + now := time.Now() + + nodes.Lock() + node, _ := nodes.List[nodeId] + + if node == nil { + node = &Node{ + Firstseen: now, + } + nodes.List[nodeId] = node + } + nodes.Unlock() + + node.Lastseen = now + + return node +} + +func (nodes *Nodes) saver() { + c := time.Tick(saveInterval) + + for range c { + nodes.save() + } +} + +func (nodes *Nodes) save() { + nodes.Timestamp = time.Now() + + nodes.Lock() + data, err := json.Marshal(nodes) + nodes.Unlock() + + check(err) + log.Println("saving", len(nodes.List), "nodes") + + tmpFile := outputFile + ".tmp" + + check(ioutil.WriteFile(tmpFile, data, 0644)) + check(os.Rename(tmpFile, outputFile)) +}