[TASK] add respond(d) daemon - WIP

This commit is contained in:
Martin/Geno 2019-01-24 02:54:21 +01:00
parent bd13b99378
commit 2b7bfd724d
No known key found for this signature in database
GPG Key ID: 9D7D3C6BFF600C6A
14 changed files with 367 additions and 57 deletions

1
.gitignore vendored
View File

@ -24,4 +24,5 @@ _testmain.go
*.prof *.prof
webroot webroot
/config.toml /config.toml
/config-respondd.toml
/vendor /vendor

View File

@ -1,54 +1,31 @@
package cmd package cmd
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"os"
"github.com/naoina/toml" "github.com/naoina/toml"
"github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/respond" "github.com/FreifunkBremen/yanic/respond"
"github.com/FreifunkBremen/yanic/runtime" "github.com/FreifunkBremen/yanic/runtime"
"github.com/FreifunkBremen/yanic/webserver"
) )
// Config represents the whole configuration
type Config struct {
Respondd respond.Config
Webserver webserver.Config
Nodes runtime.NodesConfig
Database database.Config
}
var ( var (
configPath string configPath string
collector *respond.Collector collector *respond.Collector
nodes *runtime.Nodes nodes *runtime.Nodes
) )
func loadConfig() *Config {
config, err := ReadConfigFile(configPath)
if err != nil {
fmt.Fprintln(os.Stderr, "unable to load config file:", err)
os.Exit(2)
}
return config
}
// ReadConfigFile reads a config model from path of a yml file // ReadConfigFile reads a config model from path of a yml file
func ReadConfigFile(path string) (config *Config, err error) { func ReadConfigFile(path string, config interface{}) error {
config = &Config{}
file, err := ioutil.ReadFile(path) file, err := ioutil.ReadFile(path)
if err != nil { if err != nil {
return nil, err return err
} }
err = toml.Unmarshal(file, config) err = toml.Unmarshal(file, config)
if err != nil { if err != nil {
return nil, err return err
} }
return return nil
} }

View File

@ -10,7 +10,9 @@ import (
func TestReadConfig(t *testing.T) { func TestReadConfig(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
config, err := ReadConfigFile("../config_example.toml") config := &ServeConfig{}
err := ReadConfigFile("../config_example.toml", config)
assert.NoError(err) assert.NoError(err)
assert.NotNil(config) assert.NotNil(config)
@ -40,11 +42,11 @@ func TestReadConfig(t *testing.T) {
}, },
}, meshviewer) }, meshviewer)
_, err = ReadConfigFile("testdata/config_invalid.toml") err = ReadConfigFile("testdata/config_invalid.toml", config)
assert.Error(err, "not unmarshalable") assert.Error(err, "not unmarshalable")
assert.Contains(err.Error(), "invalid TOML syntax") assert.Contains(err.Error(), "invalid TOML syntax")
_, err = ReadConfigFile("testdata/adsa.toml") err = ReadConfigFile("testdata/adsa.toml", config)
assert.Error(err, "not found able") assert.Error(err, "not found able")
assert.Contains(err.Error(), "no such file or directory") assert.Contains(err.Error(), "no such file or directory")
} }

View File

@ -19,7 +19,10 @@ var importCmd = &cobra.Command{
path := args[0] path := args[0]
site := args[1] site := args[1]
domain := args[2] domain := args[2]
config := loadConfig() config := &ServeConfig{}
if err := ReadConfigFile(configPath, config); err != nil {
log.Panicf("unable to load config file: %s", err)
}
err := allDatabase.Start(config.Database) err := allDatabase.Start(config.Database)
if err != nil { if err != nil {

40
cmd/respondd.go Normal file
View File

@ -0,0 +1,40 @@
package cmd
import (
"os"
"os/signal"
"syscall"
"github.com/bdlm/log"
"github.com/spf13/cobra"
"github.com/FreifunkBremen/yanic/respond/daemon"
)
// serveCmd represents the serve command
var responddCMD = &cobra.Command{
Use: "respondd",
Short: "Runs a respond daemon",
Example: "yanic respondd --config /etc/respondd.toml",
Run: func(cmd *cobra.Command, args []string) {
daemon := &respondd.Daemon{}
if err := ReadConfigFile(configPath, daemon); err != nil {
log.Panicf("unable to load config file: %s", err)
}
go daemon.Start()
log.Info("respondd daemon started")
// Wait for INT/TERM
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
log.Infof("received %s", sig)
},
}
func init() {
RootCmd.AddCommand(responddCMD)
responddCMD.Flags().StringVarP(&configPath, "config", "c", "config-respondd.toml", "Path to configuration file")
}

View File

@ -9,6 +9,7 @@ import (
"github.com/bdlm/log" "github.com/bdlm/log"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/FreifunkBremen/yanic/database"
allDatabase "github.com/FreifunkBremen/yanic/database/all" allDatabase "github.com/FreifunkBremen/yanic/database/all"
allOutput "github.com/FreifunkBremen/yanic/output/all" allOutput "github.com/FreifunkBremen/yanic/output/all"
"github.com/FreifunkBremen/yanic/respond" "github.com/FreifunkBremen/yanic/respond"
@ -16,16 +17,26 @@ import (
"github.com/FreifunkBremen/yanic/webserver" "github.com/FreifunkBremen/yanic/webserver"
) )
// Config represents the whole configuration
type ServeConfig struct {
Respondd respond.Config
Webserver webserver.Config
Nodes runtime.NodesConfig
Database database.Config
}
// serveCmd represents the serve command // serveCmd represents the serve command
var serveCmd = &cobra.Command{ var serveCmd = &cobra.Command{
Use: "serve", Use: "serve",
Short: "Runs the yanic server", Short: "Runs the yanic server",
Example: "yanic serve --config /etc/yanic.toml", Example: "yanic serve --config /etc/yanic.toml",
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
config := loadConfig() config := &ServeConfig{}
if err := ReadConfigFile(configPath, config); err != nil {
log.Panicf("unable to load config file: %s", err)
}
err := allDatabase.Start(config.Database) if err := allDatabase.Start(config.Database); err != nil {
if err != nil {
log.Panicf("could not connect to database: %s", err) log.Panicf("could not connect to database: %s", err)
} }
defer allDatabase.Close() defer allDatabase.Close()
@ -33,8 +44,7 @@ var serveCmd = &cobra.Command{
nodes = runtime.NewNodes(&config.Nodes) nodes = runtime.NewNodes(&config.Nodes)
nodes.Start() nodes.Start()
err = allOutput.Start(nodes, config.Nodes) if err := allOutput.Start(nodes, config.Nodes); err != nil {
if err != nil {
log.Panicf("error on init outputs: %s", err) log.Panicf("error on init outputs: %s", err)
} }
defer allOutput.Close() defer allOutput.Close()

View File

@ -0,0 +1,15 @@
# how ofter the cache respond of a respondd request is calculated
data_interval = "3m"
# if set true, respond will contain data from batman interface
multi_instance = false
[[listen]]
address = "ff02::2:1001"
interface = ""
port = 1001
# manuelle data for respond
[data.nodeinfo.location]
latitude = 53.112446246
longitude = 8.734087944

View File

@ -0,0 +1,13 @@
[Unit]
Description=yanic
[Service]
Type=simple
User=yanic
ExecStart=/opt/go/bin/yanic respondd --config /etc/respondd.conf
Restart=always
RestartSec=5s
Environment=PATH=/usr/bin:/usr/local/bin
[Install]
WantedBy=multi-user.target

View File

@ -2,7 +2,7 @@ package data
// ResponseData struct // ResponseData struct
type ResponseData struct { type ResponseData struct {
Neighbours *Neighbours `json:"neighbours"` Nodeinfo *Nodeinfo `json:"nodeinfo" toml:"nodeinfo"`
Nodeinfo *Nodeinfo `json:"nodeinfo"` Statistics *Statistics `json:"statistics" toml:"statistics"`
Statistics *Statistics `json:"statistics"` Neighbours *Neighbours `json:"neighbours" toml:"neighbours"`
} }

View File

@ -1,9 +1,6 @@
package respond package respond
import ( import (
"bytes"
"compress/flate"
"encoding/json"
"fmt" "fmt"
"net" "net"
"time" "time"
@ -86,7 +83,7 @@ func (coll *Collector) listenUDP(iface InterfaceConfig) {
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
} }
conn.SetReadBuffer(maxDataGramSize) conn.SetReadBuffer(MaxDataGramSize)
coll.connections = append(coll.connections, multicastConn{ coll.connections = append(coll.connections, multicastConn{
Conn: conn, Conn: conn,
@ -245,18 +242,6 @@ func (coll *Collector) parser() {
} }
} }
func (res *Response) parse() (*data.ResponseData, error) {
// Deflate
deflater := flate.NewReader(bytes.NewReader(res.Raw))
defer deflater.Close()
// Unmarshal
rdata := &data.ResponseData{}
err := json.NewDecoder(deflater).Decode(rdata)
return rdata, err
}
func (coll *Collector) saveResponse(addr *net.UDPAddr, res *data.ResponseData) { func (coll *Collector) saveResponse(addr *net.UDPAddr, res *data.ResponseData) {
// Search for NodeID // Search for NodeID
var nodeID string var nodeID string
@ -308,7 +293,7 @@ func (coll *Collector) saveResponse(addr *net.UDPAddr, res *data.ResponseData) {
} }
func (coll *Collector) receiver(conn *net.UDPConn) { func (coll *Collector) receiver(conn *net.UDPConn) {
buf := make([]byte, maxDataGramSize) buf := make([]byte, MaxDataGramSize)
for { for {
n, src, err := conn.ReadFromUDP(buf) n, src, err := conn.ReadFromUDP(buf)

65
respond/daemon/data.go Normal file
View File

@ -0,0 +1,65 @@
package respondd
import (
"io/ioutil"
"os"
"strings"
"github.com/FreifunkBremen/yanic/data"
)
func trim(s string) string {
return strings.TrimSpace(strings.Trim(s, "\n"))
}
func (d *Daemon) updateData() {
nodeID := ""
// Nodeinfo
if d.Data.Nodeinfo == nil {
d.Data.Nodeinfo = &data.Nodeinfo{}
} else {
nodeID = d.Data.Nodeinfo.NodeID
}
if d.Data.Nodeinfo.Hostname == "" {
d.Data.Nodeinfo.Hostname, _ = os.Hostname()
}
// Statistics
if d.Data.Statistics == nil {
d.Data.Statistics = &data.Statistics{}
} else if nodeID == "" {
nodeID = d.Data.Statistics.NodeID
}
// Neighbours
if d.Data.Neighbours == nil {
d.Data.Neighbours = &data.Neighbours{}
} else if nodeID == "" {
nodeID = d.Data.Neighbours.NodeID
}
if nodeID == "" && !d.MultiInstance {
if v, err := ioutil.ReadFile("/etc/machine-id"); err == nil {
nodeID = trim(string(v))[:12]
}
}
d.Data.Nodeinfo.NodeID = nodeID
d.Data.Statistics.NodeID = nodeID
d.Data.Neighbours.NodeID = nodeID
for _, data := range d.dataByInterface {
data.Nodeinfo = d.Data.Nodeinfo
}
}
func (d *Daemon) getData(iface string) *data.ResponseData {
if !d.MultiInstance {
return d.Data
}
if data, ok := d.dataByInterface[iface]; ok {
return data
}
d.dataByInterface[iface] = &data.ResponseData{}
d.updateData()
return d.dataByInterface[iface]
}

91
respond/daemon/handler.go Normal file
View File

@ -0,0 +1,91 @@
package respondd
import (
"encoding/json"
"net"
"reflect"
"github.com/bdlm/log"
"github.com/FreifunkBremen/yanic/respond"
)
func (d *Daemon) handler(socket *net.UDPConn) {
socket.SetReadBuffer(respond.MaxDataGramSize)
// Loop forever reading from the socket
for {
buf := make([]byte, respond.MaxDataGramSize)
n, src, err := socket.ReadFromUDP(buf)
if err != nil {
log.Errorf("ReadFromUDP failed: %s", err)
}
raw := make([]byte, n)
copy(raw, buf)
get := string(raw)
data := d.getData(src.Zone)
log.WithFields(map[string]interface{}{
"bytes": n,
"data": get,
"src": src.String(),
}).Debug("recieve request")
if get[:3] == "GET" {
res, err := respond.NewRespone(data, src)
if err != nil {
log.Errorf("Decode failed: %s", err)
continue
}
n, err = socket.WriteToUDP(res.Raw, res.Address)
if err != nil {
log.Errorf("WriteToUDP failed: %s", err)
continue
}
log.WithFields(map[string]interface{}{
"bytes": n,
"dest": res.Address.String(),
}).Debug("send respond")
continue
}
found := false
t := reflect.TypeOf(data).Elem()
v := reflect.ValueOf(data).Elem()
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
fv := v.FieldByName(f.Name)
if f.Tag.Get("json") == get {
log.WithFields(map[string]interface{}{
"param": get,
"dest": src.String(),
}).Debug("found")
raw, err = json.Marshal(fv.Interface())
found = true
break
}
}
if !found {
log.WithFields(map[string]interface{}{
"param": get,
"dest": src.String(),
}).Debug("not found")
raw = []byte("ressource not found")
}
n, err = socket.WriteToUDP(raw, src)
if err != nil {
log.Errorf("WriteToUDP failed: %s", err)
continue
}
log.WithFields(map[string]interface{}{
"bytes": n,
"dest": src.String(),
}).Debug("send respond")
}
}

71
respond/daemon/main.go Normal file
View File

@ -0,0 +1,71 @@
package respondd
import (
"net"
"time"
"github.com/bdlm/log"
"github.com/FreifunkBremen/yanic/data"
"github.com/FreifunkBremen/yanic/lib/duration"
)
type Daemon struct {
MultiInstance bool `toml:"multi_instance"`
DataInterval duration.Duration `toml:"data_interval"`
Listen []struct {
Address string `toml:"address"`
Interface string `toml:"interface"`
Port int `toml:"port"`
} `toml:"listen"`
Data *data.ResponseData `toml:"data"`
dataByInterface map[string]*data.ResponseData
}
func (d *Daemon) Start() {
if d.Data == nil {
d.Data = &data.ResponseData{}
}
d.updateData()
go d.updateWorker()
for _, listen := range d.Listen {
var socket *net.UDPConn
var err error
addr := net.ParseIP(listen.Address)
if addr.IsMulticast() {
var iface *net.Interface
if listen.Interface != "" {
iface, err = net.InterfaceByName(listen.Interface)
if err != nil {
log.Fatal(err)
}
}
if socket, err = net.ListenMulticastUDP("udp6", iface, &net.UDPAddr{
IP: addr,
Port: listen.Port,
}); err != nil {
log.Fatal(err)
}
} else {
if socket, err = net.ListenUDP("udp6", &net.UDPAddr{
IP: addr,
Port: listen.Port,
}); err != nil {
log.Fatal(err)
}
}
go d.handler(socket)
}
log.Debug("all listener started")
}
func (d *Daemon) updateWorker() {
c := time.Tick(d.DataInterval.Duration)
for range c {
d.updateData()
}
}

View File

@ -1,7 +1,12 @@
package respond package respond
import ( import (
"bytes"
"compress/flate"
"encoding/json"
"net" "net"
"github.com/FreifunkBremen/yanic/data"
) )
const ( const (
@ -12,7 +17,7 @@ const (
port = 1001 port = 1001
// maximum receivable size // maximum receivable size
maxDataGramSize = 8192 MaxDataGramSize = 8192
) )
// Response of the respond request // Response of the respond request
@ -20,3 +25,35 @@ type Response struct {
Address *net.UDPAddr Address *net.UDPAddr
Raw []byte Raw []byte
} }
func NewRespone(res *data.ResponseData, addr *net.UDPAddr) (*Response, error) {
buf := new(bytes.Buffer)
flater, err := flate.NewWriter(buf, flate.BestCompression)
if err != nil {
return nil, err
}
defer flater.Close()
if err = json.NewEncoder(flater).Encode(res); err != nil {
return nil, err
}
err = flater.Flush()
return &Response{
Raw: buf.Bytes(),
Address: addr,
}, err
}
func (res *Response) parse() (*data.ResponseData, error) {
// Deflate
deflater := flate.NewReader(bytes.NewReader(res.Raw))
defer deflater.Close()
// Unmarshal
rdata := &data.ResponseData{}
err := json.NewDecoder(deflater).Decode(rdata)
return rdata, err
}