diff --git a/.gitignore b/.gitignore index 894526c..d25e864 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ _testmain.go *.prof webroot /config.toml +/config-respondd.toml /vendor diff --git a/cmd/config.go b/cmd/config.go index 21ab5dd..17428ea 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -1,54 +1,31 @@ package cmd import ( - "fmt" "io/ioutil" - "os" "github.com/naoina/toml" - "github.com/FreifunkBremen/yanic/database" "github.com/FreifunkBremen/yanic/respond" "github.com/FreifunkBremen/yanic/runtime" - "github.com/FreifunkBremen/yanic/webserver" ) -// Config represents the whole configuration -type Config struct { - Respondd respond.Config - Webserver webserver.Config - Nodes runtime.NodesConfig - Database database.Config -} - var ( configPath string collector *respond.Collector nodes *runtime.Nodes ) -func loadConfig() *Config { - config, err := ReadConfigFile(configPath) - if err != nil { - fmt.Fprintln(os.Stderr, "unable to load config file:", err) - os.Exit(2) - } - return config -} - // ReadConfigFile reads a config model from path of a yml file -func ReadConfigFile(path string) (config *Config, err error) { - config = &Config{} - +func ReadConfigFile(path string, config interface{}) error { file, err := ioutil.ReadFile(path) if err != nil { - return nil, err + return err } err = toml.Unmarshal(file, config) if err != nil { - return nil, err + return err } - return + return nil } diff --git a/cmd/config_test.go b/cmd/config_test.go index ce6cd61..df296ce 100644 --- a/cmd/config_test.go +++ b/cmd/config_test.go @@ -10,7 +10,9 @@ import ( func TestReadConfig(t *testing.T) { assert := assert.New(t) - config, err := ReadConfigFile("../config_example.toml") + config := &ServeConfig{} + err := ReadConfigFile("../config_example.toml", config) + assert.NoError(err) assert.NotNil(config) @@ -40,11 +42,11 @@ func TestReadConfig(t *testing.T) { }, }, meshviewer) - _, err = ReadConfigFile("testdata/config_invalid.toml") + err = ReadConfigFile("testdata/config_invalid.toml", config) assert.Error(err, "not unmarshalable") assert.Contains(err.Error(), "invalid TOML syntax") - _, err = ReadConfigFile("testdata/adsa.toml") + err = ReadConfigFile("testdata/adsa.toml", config) assert.Error(err, "not found able") assert.Contains(err.Error(), "no such file or directory") } diff --git a/cmd/import.go b/cmd/import.go index 89ff203..ef5edaf 100644 --- a/cmd/import.go +++ b/cmd/import.go @@ -19,7 +19,10 @@ var importCmd = &cobra.Command{ path := args[0] site := args[1] domain := args[2] - config := loadConfig() + config := &ServeConfig{} + if err := ReadConfigFile(configPath, config); err != nil { + log.Panicf("unable to load config file: %s", err) + } err := allDatabase.Start(config.Database) if err != nil { diff --git a/cmd/respondd.go b/cmd/respondd.go new file mode 100644 index 0000000..9141ed4 --- /dev/null +++ b/cmd/respondd.go @@ -0,0 +1,40 @@ +package cmd + +import ( + "os" + "os/signal" + "syscall" + + "github.com/bdlm/log" + "github.com/spf13/cobra" + + "github.com/FreifunkBremen/yanic/respond/daemon" +) + +// serveCmd represents the serve command +var responddCMD = &cobra.Command{ + Use: "respondd", + Short: "Runs a respond daemon", + Example: "yanic respondd --config /etc/respondd.toml", + Run: func(cmd *cobra.Command, args []string) { + daemon := &respondd.Daemon{} + if err := ReadConfigFile(configPath, daemon); err != nil { + log.Panicf("unable to load config file: %s", err) + } + + go daemon.Start() + + log.Info("respondd daemon started") + // Wait for INT/TERM + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigs + log.Infof("received %s", sig) + + }, +} + +func init() { + RootCmd.AddCommand(responddCMD) + responddCMD.Flags().StringVarP(&configPath, "config", "c", "config-respondd.toml", "Path to configuration file") +} diff --git a/cmd/serve.go b/cmd/serve.go index 2b1f648..0096e74 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -9,6 +9,7 @@ import ( "github.com/bdlm/log" "github.com/spf13/cobra" + "github.com/FreifunkBremen/yanic/database" allDatabase "github.com/FreifunkBremen/yanic/database/all" allOutput "github.com/FreifunkBremen/yanic/output/all" "github.com/FreifunkBremen/yanic/respond" @@ -16,16 +17,26 @@ import ( "github.com/FreifunkBremen/yanic/webserver" ) +// Config represents the whole configuration +type ServeConfig struct { + Respondd respond.Config + Webserver webserver.Config + Nodes runtime.NodesConfig + Database database.Config +} + // serveCmd represents the serve command var serveCmd = &cobra.Command{ Use: "serve", Short: "Runs the yanic server", Example: "yanic serve --config /etc/yanic.toml", Run: func(cmd *cobra.Command, args []string) { - config := loadConfig() + config := &ServeConfig{} + if err := ReadConfigFile(configPath, config); err != nil { + log.Panicf("unable to load config file: %s", err) + } - err := allDatabase.Start(config.Database) - if err != nil { + if err := allDatabase.Start(config.Database); err != nil { log.Panicf("could not connect to database: %s", err) } defer allDatabase.Close() @@ -33,8 +44,7 @@ var serveCmd = &cobra.Command{ nodes = runtime.NewNodes(&config.Nodes) nodes.Start() - err = allOutput.Start(nodes, config.Nodes) - if err != nil { + if err := allOutput.Start(nodes, config.Nodes); err != nil { log.Panicf("error on init outputs: %s", err) } defer allOutput.Close() diff --git a/config-respondd_example.toml b/config-respondd_example.toml new file mode 100644 index 0000000..32a3314 --- /dev/null +++ b/config-respondd_example.toml @@ -0,0 +1,15 @@ +# how ofter the cache respond of a respondd request is calculated +data_interval = "3m" + +# if set true, respond will contain data from batman interface +multi_instance = false + +[[listen]] +address = "ff02::2:1001" +interface = "" +port = 1001 + +# manuelle data for respond +[data.nodeinfo.location] +latitude = 53.112446246 +longitude = 8.734087944 diff --git a/contrib/init/linux-systemd/respondd.service b/contrib/init/linux-systemd/respondd.service new file mode 100644 index 0000000..f7f9bee --- /dev/null +++ b/contrib/init/linux-systemd/respondd.service @@ -0,0 +1,13 @@ +[Unit] +Description=yanic + +[Service] +Type=simple +User=yanic +ExecStart=/opt/go/bin/yanic respondd --config /etc/respondd.conf +Restart=always +RestartSec=5s +Environment=PATH=/usr/bin:/usr/local/bin + +[Install] +WantedBy=multi-user.target diff --git a/data/response.go b/data/response.go index b52874d..af55cc2 100644 --- a/data/response.go +++ b/data/response.go @@ -2,7 +2,7 @@ package data // ResponseData struct type ResponseData struct { - Neighbours *Neighbours `json:"neighbours"` - Nodeinfo *Nodeinfo `json:"nodeinfo"` - Statistics *Statistics `json:"statistics"` + Nodeinfo *Nodeinfo `json:"nodeinfo" toml:"nodeinfo"` + Statistics *Statistics `json:"statistics" toml:"statistics"` + Neighbours *Neighbours `json:"neighbours" toml:"neighbours"` } diff --git a/respond/collector.go b/respond/collector.go index 929d58a..e2da2f3 100644 --- a/respond/collector.go +++ b/respond/collector.go @@ -1,9 +1,6 @@ package respond import ( - "bytes" - "compress/flate" - "encoding/json" "fmt" "net" "time" @@ -86,7 +83,7 @@ func (coll *Collector) listenUDP(iface InterfaceConfig) { if err != nil { log.Panic(err) } - conn.SetReadBuffer(maxDataGramSize) + conn.SetReadBuffer(MaxDataGramSize) coll.connections = append(coll.connections, multicastConn{ Conn: conn, @@ -245,18 +242,6 @@ func (coll *Collector) parser() { } } -func (res *Response) parse() (*data.ResponseData, error) { - // Deflate - deflater := flate.NewReader(bytes.NewReader(res.Raw)) - defer deflater.Close() - - // Unmarshal - rdata := &data.ResponseData{} - err := json.NewDecoder(deflater).Decode(rdata) - - return rdata, err -} - func (coll *Collector) saveResponse(addr *net.UDPAddr, res *data.ResponseData) { // Search for NodeID var nodeID string @@ -308,7 +293,7 @@ func (coll *Collector) saveResponse(addr *net.UDPAddr, res *data.ResponseData) { } func (coll *Collector) receiver(conn *net.UDPConn) { - buf := make([]byte, maxDataGramSize) + buf := make([]byte, MaxDataGramSize) for { n, src, err := conn.ReadFromUDP(buf) diff --git a/respond/daemon/data.go b/respond/daemon/data.go new file mode 100644 index 0000000..e04f465 --- /dev/null +++ b/respond/daemon/data.go @@ -0,0 +1,65 @@ +package respondd + +import ( + "io/ioutil" + "os" + "strings" + + "github.com/FreifunkBremen/yanic/data" +) + +func trim(s string) string { + return strings.TrimSpace(strings.Trim(s, "\n")) +} + +func (d *Daemon) updateData() { + nodeID := "" + // Nodeinfo + if d.Data.Nodeinfo == nil { + d.Data.Nodeinfo = &data.Nodeinfo{} + } else { + nodeID = d.Data.Nodeinfo.NodeID + } + if d.Data.Nodeinfo.Hostname == "" { + d.Data.Nodeinfo.Hostname, _ = os.Hostname() + } + + // Statistics + if d.Data.Statistics == nil { + d.Data.Statistics = &data.Statistics{} + } else if nodeID == "" { + nodeID = d.Data.Statistics.NodeID + } + + // Neighbours + if d.Data.Neighbours == nil { + d.Data.Neighbours = &data.Neighbours{} + } else if nodeID == "" { + nodeID = d.Data.Neighbours.NodeID + } + + if nodeID == "" && !d.MultiInstance { + if v, err := ioutil.ReadFile("/etc/machine-id"); err == nil { + nodeID = trim(string(v))[:12] + } + } + d.Data.Nodeinfo.NodeID = nodeID + d.Data.Statistics.NodeID = nodeID + d.Data.Neighbours.NodeID = nodeID + + for _, data := range d.dataByInterface { + data.Nodeinfo = d.Data.Nodeinfo + } +} + +func (d *Daemon) getData(iface string) *data.ResponseData { + if !d.MultiInstance { + return d.Data + } + if data, ok := d.dataByInterface[iface]; ok { + return data + } + d.dataByInterface[iface] = &data.ResponseData{} + d.updateData() + return d.dataByInterface[iface] +} diff --git a/respond/daemon/handler.go b/respond/daemon/handler.go new file mode 100644 index 0000000..e7e73fa --- /dev/null +++ b/respond/daemon/handler.go @@ -0,0 +1,91 @@ +package respondd + +import ( + "encoding/json" + "net" + "reflect" + + "github.com/bdlm/log" + + "github.com/FreifunkBremen/yanic/respond" +) + +func (d *Daemon) handler(socket *net.UDPConn) { + socket.SetReadBuffer(respond.MaxDataGramSize) + + // Loop forever reading from the socket + for { + buf := make([]byte, respond.MaxDataGramSize) + n, src, err := socket.ReadFromUDP(buf) + if err != nil { + log.Errorf("ReadFromUDP failed: %s", err) + } + raw := make([]byte, n) + copy(raw, buf) + + get := string(raw) + + data := d.getData(src.Zone) + + log.WithFields(map[string]interface{}{ + "bytes": n, + "data": get, + "src": src.String(), + }).Debug("recieve request") + + if get[:3] == "GET" { + res, err := respond.NewRespone(data, src) + if err != nil { + log.Errorf("Decode failed: %s", err) + continue + } + n, err = socket.WriteToUDP(res.Raw, res.Address) + if err != nil { + log.Errorf("WriteToUDP failed: %s", err) + continue + } + log.WithFields(map[string]interface{}{ + "bytes": n, + "dest": res.Address.String(), + }).Debug("send respond") + continue + } + + found := false + + t := reflect.TypeOf(data).Elem() + v := reflect.ValueOf(data).Elem() + + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + fv := v.FieldByName(f.Name) + if f.Tag.Get("json") == get { + log.WithFields(map[string]interface{}{ + "param": get, + "dest": src.String(), + }).Debug("found") + raw, err = json.Marshal(fv.Interface()) + found = true + break + } + } + + if !found { + log.WithFields(map[string]interface{}{ + "param": get, + "dest": src.String(), + }).Debug("not found") + raw = []byte("ressource not found") + } + + n, err = socket.WriteToUDP(raw, src) + if err != nil { + log.Errorf("WriteToUDP failed: %s", err) + continue + } + log.WithFields(map[string]interface{}{ + "bytes": n, + "dest": src.String(), + }).Debug("send respond") + } +} diff --git a/respond/daemon/main.go b/respond/daemon/main.go new file mode 100644 index 0000000..2acd29b --- /dev/null +++ b/respond/daemon/main.go @@ -0,0 +1,71 @@ +package respondd + +import ( + "net" + "time" + + "github.com/bdlm/log" + + "github.com/FreifunkBremen/yanic/data" + "github.com/FreifunkBremen/yanic/lib/duration" +) + +type Daemon struct { + MultiInstance bool `toml:"multi_instance"` + DataInterval duration.Duration `toml:"data_interval"` + Listen []struct { + Address string `toml:"address"` + Interface string `toml:"interface"` + Port int `toml:"port"` + } `toml:"listen"` + Data *data.ResponseData `toml:"data"` + dataByInterface map[string]*data.ResponseData +} + +func (d *Daemon) Start() { + if d.Data == nil { + d.Data = &data.ResponseData{} + } + + d.updateData() + go d.updateWorker() + + for _, listen := range d.Listen { + var socket *net.UDPConn + var err error + addr := net.ParseIP(listen.Address) + + if addr.IsMulticast() { + var iface *net.Interface + if listen.Interface != "" { + iface, err = net.InterfaceByName(listen.Interface) + if err != nil { + log.Fatal(err) + } + } + if socket, err = net.ListenMulticastUDP("udp6", iface, &net.UDPAddr{ + IP: addr, + Port: listen.Port, + }); err != nil { + log.Fatal(err) + } + } else { + if socket, err = net.ListenUDP("udp6", &net.UDPAddr{ + IP: addr, + Port: listen.Port, + }); err != nil { + log.Fatal(err) + } + } + go d.handler(socket) + } + log.Debug("all listener started") +} + +func (d *Daemon) updateWorker() { + c := time.Tick(d.DataInterval.Duration) + + for range c { + d.updateData() + } +} diff --git a/respond/respond.go b/respond/respond.go index 9564b8e..2728fbe 100644 --- a/respond/respond.go +++ b/respond/respond.go @@ -1,7 +1,12 @@ package respond import ( + "bytes" + "compress/flate" + "encoding/json" "net" + + "github.com/FreifunkBremen/yanic/data" ) const ( @@ -12,7 +17,7 @@ const ( port = 1001 // maximum receivable size - maxDataGramSize = 8192 + MaxDataGramSize = 8192 ) // Response of the respond request @@ -20,3 +25,35 @@ type Response struct { Address *net.UDPAddr Raw []byte } + +func NewRespone(res *data.ResponseData, addr *net.UDPAddr) (*Response, error) { + buf := new(bytes.Buffer) + flater, err := flate.NewWriter(buf, flate.BestCompression) + if err != nil { + return nil, err + } + defer flater.Close() + + if err = json.NewEncoder(flater).Encode(res); err != nil { + return nil, err + } + + err = flater.Flush() + + return &Response{ + Raw: buf.Bytes(), + Address: addr, + }, err +} + +func (res *Response) parse() (*data.ResponseData, error) { + // Deflate + deflater := flate.NewReader(bytes.NewReader(res.Raw)) + defer deflater.Close() + + // Unmarshal + rdata := &data.ResponseData{} + err := json.NewDecoder(deflater).Decode(rdata) + + return rdata, err +}