gluon-mesh-babel: respondd: reduce load

* open babel socket in its own event loop, allowing to monitor instead of dump
* keep state from monitor in json object, providing that to clients on
  network on demand
* do not parse babel routes, instead use netlink to obtain nexthop

babeld still does not support limiting the data on the socket by object. When
that is merged, the load can be further reduced by limiting to neighbour
changes with a small change.
This commit is contained in:
Christof Schulze 2019-08-11 22:07:37 +02:00
parent 055a2337aa
commit 1db3c8b40b
2 changed files with 409 additions and 81 deletions

View File

@ -24,7 +24,7 @@ CFLAGS_JSONC = $(shell pkg-config --cflags json-c)
LDFLAGS_JSONC = $(shell pkg-config --libs json-c)
respondd.so: respondd.c handle_neighbour.c
respondd.so: respondd.c
$(CC) $(CPPFLAGS) $(CFLAGS) $(LDFLAGS) $(LDLIBS) -shared $(LDFLAGS_JSONC) -o $@ $^ -lgluonutil -lblobmsg_json -lubox -lubus -luci
neighbours-babel: neighbours-babel.c handle_neighbour.c

View File

@ -29,6 +29,10 @@
#include <json-c/json.h>
#include <libgluonutil.h>
#include <uci.h>
#include <sys/select.h>
#include <linux/rtnetlink.h>
#include <pthread.h>
#include <inttypes.h>
#include <stdbool.h>
@ -61,6 +65,245 @@
#define UBUS_TIMEOUT 30000
static struct babelhelper_ctx bhelper_ctx = {};
static struct json_object *babeld_version = NULL;
static char *model = NULL;
static struct json_object *neighbours = NULL;
static pthread_rwlock_t neighbours_lock;
static pthread_t babelmonitor;
struct thread_info {
pthread_t thread_id;
int thread_num;
char *argv_string;
};
struct kernel_route {
struct in6_addr prefix;
struct in6_addr src_prefix;
struct in6_addr gw;
int plen;
int src_plen; /* no source prefix <=> src_plen == 0 */
int metric;
int proto;
unsigned int ifindex;
unsigned int table;
};
struct nlrtreq {
struct nlmsghdr nl;
struct rtmsg rt;
char buf[1024];
};
#define ROUTE_PROTO 158
#define KERNEL_INFINITY 9999
static const char *print_ip(const struct in6_addr *addr, char *buf, size_t buflen) {
return inet_ntop(AF_INET6, &(addr->s6_addr), buf, buflen);
}
static int rtnl_addattr(struct nlmsghdr *n, size_t maxlen, int type, void *data, int datalen) {
int len = RTA_LENGTH(datalen);
struct rtattr *rta;
if (NLMSG_ALIGN(n->nlmsg_len) + len > maxlen)
return -1;
rta = (struct rtattr *)(((char *)n) + NLMSG_ALIGN(n->nlmsg_len));
rta->rta_type = type;
rta->rta_len = len;
memcpy(RTA_DATA(rta), data, datalen);
n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + len;
return 0;
}
static void rtmgr_rtnl_talk(int fd, struct nlmsghdr *req) {
struct sockaddr_nl nladdr = {.nl_family = AF_NETLINK};
struct iovec iov = {req, 0};
struct msghdr msg = {&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0};
iov.iov_len = req->nlmsg_len;
if (sendmsg(fd, &msg, 0) < 0) {
perror("sendmsg on rtmgr_rtnl_talk()");
}
}
static void get_route(int fd, const int ifindex, struct in6_addr *address, const int prefix_length) {
struct nlrtreq req = {
.nl = {
.nlmsg_type = RTM_GETROUTE,
.nlmsg_flags = NLM_F_REQUEST,
.nlmsg_len = NLMSG_LENGTH(sizeof(struct rtmsg)),
},
.rt = {
.rtm_family = AF_INET6,
.rtm_protocol = ROUTE_PROTO,
.rtm_scope = RT_SCOPE_UNIVERSE,
.rtm_type = RTN_UNICAST,
.rtm_dst_len = prefix_length
},
};
rtnl_addattr(&req.nl, sizeof(req), RTA_DST, (void *)address, sizeof(struct in6_addr));
if (ifindex > 0 )
rtnl_addattr(&req.nl, sizeof(req), RTA_OIF, (void *)&ifindex, sizeof(ifindex));
rtmgr_rtnl_talk(fd, (struct nlmsghdr *)&req);
}
static int parse_kernel_route_rta(struct rtmsg *rtm, int len, struct kernel_route *route) {
len -= NLMSG_ALIGN(sizeof(*rtm));
memset(route, 0, sizeof(struct kernel_route));
route->proto = rtm->rtm_protocol;
for (struct rtattr *rta = RTM_RTA(rtm); RTA_OK(rta, len); rta = RTA_NEXT(rta, len)) {
switch (rta->rta_type) {
case RTA_DST:
route->plen = rtm->rtm_dst_len;
memcpy(route->prefix.s6_addr, RTA_DATA(rta), 16);
break;
case RTA_SRC:
route->src_plen = rtm->rtm_src_len;
memcpy(route->src_prefix.s6_addr, RTA_DATA(rta), 16);
break;
case RTA_GATEWAY:
memcpy(route->gw.s6_addr, RTA_DATA(rta), 16);
break;
case RTA_OIF:
route->ifindex = *(int *)RTA_DATA(rta);
break;
case RTA_PRIORITY:
route->metric = *(int *)RTA_DATA(rta);
if (route->metric < 0 || route->metric > KERNEL_INFINITY)
route->metric = KERNEL_INFINITY;
break;
default:
break;
}
}
return 1;
}
static bool handle_kernel_routes(const struct nlmsghdr *nh, struct kernel_route *route) {
int len = nh->nlmsg_len;
struct rtmsg *rtm;
rtm = (struct rtmsg *)NLMSG_DATA(nh);
len -= NLMSG_LENGTH(0);
/* Ignore cached routes, advertised by some kernels (linux 3.x). */
if (rtm->rtm_flags & RTM_F_CLONED) return false;
if (parse_kernel_route_rta(rtm, len, route) < 0) return false;
return true;
}
static bool rtnl_handle_msg(const struct nlmsghdr *nh,
struct kernel_route *route) {
if (nh->nlmsg_type == RTM_NEWROUTE) {
handle_kernel_routes(nh, route);
if (!(route->plen == 0 && route->metric >= KERNEL_INFINITY))
return true;
}
return false;
}
static int get_default_route(struct json_object *ret) {
int nlfd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
if (nlfd < 0) {
perror("can't open RTNL socket");
return -1;
}
struct sockaddr_nl snl = {
.nl_family = AF_NETLINK,
.nl_groups = RTMGRP_IPV6_ROUTE,
};
if (bind(nlfd, (struct sockaddr *)&snl, sizeof(snl)) < 0)
perror("can't bind RTNL socket");
struct in6_addr addr = {};
inet_pton(AF_INET6, "2000::/3", &addr);
get_route(nlfd, 0, &addr, 3);
struct nlmsghdr readbuffer[8192/sizeof(struct nlmsghdr)];
int count = recv(nlfd, readbuffer, sizeof(readbuffer), 0);
struct nlmsghdr *nh;
struct nlmsgerr *ne;
struct kernel_route route;
nh = (struct nlmsghdr *)readbuffer;
if (NLMSG_OK(nh, count)) {
switch (nh->nlmsg_type) {
case NLMSG_DONE:
break;
case NLMSG_ERROR:
ne = NLMSG_DATA(nh);
if (ne->error <= 0)
break;
/* Falls through. */
default:
if (rtnl_handle_msg(nh, &route) == true) {
char strbuf[64];
// TODO: for each route that is retrieved, create a json object
// containing src, via, interface Yanic currently requires this layout
// but it makes sense to adjust it. See https://github.com/FreifunkBremen/yanic/issues/170
if (ret) {
json_object_object_add(ret, "gateway_src", json_object_new_string(print_ip(&route.src_prefix, strbuf, 64)));
json_object_object_add(ret, "gateway_nexthop", json_object_new_string(print_ip(&route.gw, strbuf, 64)));
char ifname[IFNAMSIZ];
json_object_object_add(ret, "gateway_interface", json_object_new_string(if_indextoname(route.ifindex, ifname)));
}
}
break;
}
}
close(nlfd);
return 0;
}
static int babeld_connect() {
int fd = -1;
fd_set rfds;
FD_ZERO(&rfds);
printf("connecting to babeld\n");
do {
fd = babelhelper_babel_connect(BABEL_PORT);
if (fd < 0) {
fprintf(stderr, "Connecting to babel socket failed. Retrying.\n");
sleep(1);
}
} while (fd < 0);
FD_SET(fd, &rfds);
// receive and ignore babel header
while (true) {
if ( babelhelper_input_pump(&bhelper_ctx, fd, NULL, babelhelper_discard_response))
break;
if (select(fd + 1, &rfds, NULL, NULL, NULL) < 0) {
perror("select (babel header):");
};
}
int amount = 0;
while (amount != 8) {
amount = babelhelper_sendcommand(&bhelper_ctx, fd, "monitor\n");
}
return fd;
}
static void obtain_if_addr(const char *ifname, char *lladdr) {
struct ifaddrs *ifaddr, *ifa;
@ -96,8 +339,106 @@ cleanup:
freeifaddrs(ifaddr);
}
static void add_neighbour(char **data) {
struct json_object *neigh = json_object_new_object();
static char* get_line_from_run(const char* command) {
if (!neigh)
return;
if (data[RXCOST])
json_object_object_add(neigh, "rxcost", json_object_new_int(atoi(data[RXCOST])));
if (data[TXCOST])
json_object_object_add(neigh, "txcost", json_object_new_int(atoi(data[TXCOST])));
if (data[COST])
json_object_object_add(neigh, "cost", json_object_new_int(atoi(data[COST])));
if (data[REACH])
json_object_object_add(neigh, "reachability", json_object_new_double(strtod(data[REACH], NULL)));
struct json_object *nif = NULL;
if (data[IF] && !json_object_object_get_ex(neighbours, data[IF], &nif)) {
char str_ip[NI_MAXHOST] = {};
obtain_if_addr((const char *)data[IF], str_ip);
nif = json_object_new_object();
if (nif) {
json_object_object_add(nif, "ll-addr", json_object_new_string(str_ip));
json_object_object_add(nif, "protocol", json_object_new_string("babel"));
json_object_object_add(neighbours, data[IF], nif);
}
}
struct json_object *nif_neighbours = NULL;
if (!json_object_object_get_ex(nif, "neighbours", &nif_neighbours)) {
nif_neighbours = json_object_new_object();
if (nif_neighbours) {
json_object_object_add(nif, "neighbours", nif_neighbours);
json_object_object_add(nif_neighbours, data[ADDRESS], neigh);
} else {
json_object_put(neigh);
}
} else {
json_object_object_add(nif_neighbours, data[ADDRESS], neigh);
}
}
static void del_neighbour(char **data) {
struct json_object *nif = NULL;
if (json_object_object_get_ex(neighbours, data[IF], &nif)) {
struct json_object *neighbour = NULL;
if (json_object_object_get_ex(nif, "neighbours", &neighbour)) {
json_object_object_del(neighbour, data[ADDRESS]);
}
}
}
static bool handle_neighbour(char **data, void *obj) {
if (data[NEIGHBOUR]) {
pthread_rwlock_wrlock(&neighbours_lock);
if (strncmp(data[VERB], "add", 3) == 0) {
del_neighbour(data);
add_neighbour(data);
} else if (strncmp(data[VERB], "del", 3) == 0) {
del_neighbour(data);
} else if (strncmp(data[VERB], "change", 6) == 0) {
del_neighbour(data);
add_neighbour(data);
}
pthread_rwlock_unlock(&neighbours_lock);
}
return false;
}
static bool babel_lineprocessor(char **data, void *object) {
return handle_neighbour(data, object);
}
static void *babeld_monitor_thread_start(void *arg) {
while (true) {
int babelfd = babeld_connect();
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(babelfd, &rfds);
while (true) {
if ( babelhelper_input_pump(&bhelper_ctx, babelfd, NULL, babel_lineprocessor) < 0 ) {
perror("input pump");
break;
}
if (select(babelfd + 1, &rfds, NULL, NULL, NULL) < 0) {
perror("select (babel data):");
break;
};
}
close(babelfd);
}
return NULL;
}
static char *get_line_from_run(const char *command) {
FILE *fp;
char *line = NULL;
size_t len = 0;
@ -122,9 +463,42 @@ static char* get_line_from_run(const char* command) {
return line;
}
__attribute__((constructor)) static void init(void) {
if (pthread_rwlock_init(&neighbours_lock, NULL) != 0) {
perror("rwlock init failed for neighbours");
exit(-2);
}
neighbours = json_object_new_object();
char *version = get_line_from_run("exec babeld -V 2>&1");
babeld_version = gluonutil_wrap_string(version);
free(version);
model = gluonutil_read_line("/tmp/sysinfo/model");
if (pthread_create(&babelmonitor, NULL, &babeld_monitor_thread_start, NULL) < 0 ) {
perror("error on pthread_create for babel monitor");
}
}
__attribute__((destructor)) static void deinit(void) {
pthread_cancel(babelmonitor);
int s = pthread_join(babelmonitor, NULL);
if (s)
perror("pthread_cancel");
pthread_rwlock_destroy(&neighbours_lock);
}
static struct json_object * get_addresses(void) {
char *primarymac = gluonutil_get_sysconfig("primary_mac");
char *address = malloc(INET6_ADDRSTRLEN+1);
if (!address) {
fprintf(stderr, "Could not allocate memory for ipv6 address, not adding addresses to json data.\n");
goto free;
}
char node_prefix_str[INET6_ADDRSTRLEN+1];
struct in6_addr node_prefix = {};
struct json_object *retval = json_object_new_array();
@ -140,14 +514,20 @@ static struct json_object * get_addresses(void) {
}
char *prefix_addresspart = strndup(node_prefix_str, INET6_ADDRSTRLEN);
if (! babelhelper_generateip_str(address, primarymac, prefix_addresspart) ) {
fprintf(stderr, "IP-address could not be generated by babelhelper");
if (!prefix_addresspart) {
fprintf(stderr, "could not allocate memory to hold node_prefix_str. Not adding address to json data\n");
goto free;
}
if (! babelhelper_generateip_str(address, primarymac, prefix_addresspart) ) {
fprintf(stderr, "IP-address could not be generated by babelhelper\n");
goto free;
}
free(prefix_addresspart);
json_object_array_add(retval, json_object_new_string(address));
free:
free(prefix_addresspart);
free(address);
free(primarymac);
@ -176,57 +556,6 @@ static void mesh_add_if(const char *ifname, struct json_object *wireless,
json_object_array_add(tunnel, address);
else
json_object_array_add(other, address);
}
static bool handle_neighbour(char **data, void *obj) {
if (data[NEIGHBOUR]) {
struct json_object *neigh = json_object_new_object();
if (data[RXCOST])
json_object_object_add(neigh, "rxcost", json_object_new_int(atoi(data[RXCOST])));
if (data[TXCOST])
json_object_object_add(neigh, "txcost", json_object_new_int(atoi(data[TXCOST])));
if (data[COST])
json_object_object_add(neigh, "cost", json_object_new_int(atoi(data[COST])));
if (data[REACH])
json_object_object_add(neigh, "reachability", json_object_new_double(strtod(data[REACH], NULL)));
struct json_object *nif = 0;
if (data[IF] && !json_object_object_get_ex(obj, data[IF], &nif)) {
char str_ip[NI_MAXHOST] = {};
obtain_if_addr( (const char*)data[IF], str_ip );
nif = json_object_new_object();
json_object_object_add(nif, "ll-addr", json_object_new_string(str_ip));
json_object_object_add(nif, "protocol", json_object_new_string("babel"));
json_object_object_add(obj, data[IF], nif);
}
struct json_object *neighborcollector = 0;
if (!json_object_object_get_ex(nif, "neighbours", &neighborcollector)) {
neighborcollector = json_object_new_object();
json_object_object_add(nif, "neighbours", neighborcollector);
}
json_object_object_add(neighborcollector, data[ADDRESS], neigh);
}
return true;
}
static struct json_object * get_babel_neighbours(void) {
struct json_object *neighbours;
neighbours = json_object_new_object();
if (!neighbours)
return NULL;
babelhelper_readbabeldata(&bhelper_ctx, "dump", (void*)neighbours, handle_neighbour);
return(neighbours);
}
static void blobmsg_handle_list(struct blob_attr *attr, int len, bool array, struct json_object *wireless, struct json_object *tunnel, struct json_object *other);
@ -305,7 +634,6 @@ static void receive_call_result_data(struct ubus_request *req, int type, struct
*((struct json_object**)(req->priv)) = ret;
}
static struct json_object * get_mesh_ifs() {
struct ubus_context *ubus_ctx;
struct json_object *ret = NULL;
@ -344,15 +672,13 @@ static struct json_object * get_mesh(void) {
return ret;
}
static struct json_object * get_babeld_version(void) {
char *version = get_line_from_run("exec babeld -V 2>&1");
struct json_object *ret = gluonutil_wrap_string(version);
free(version);
return ret;
return babeld_version ? json_object_get(babeld_version) : json_object_new_string("unknown");
}
static struct json_object * respondd_provider_nodeinfo(void) {
bhelper_ctx.debug=false;
bhelper_ctx.debug = false;
struct json_object *ret = json_object_new_object();
struct json_object *network = json_object_new_object();
@ -390,9 +716,7 @@ static struct json_object * read_number(const char *ifname, const char *stat) {
return ret;
}
static struct json_object * get_traffic(void) {
const char *ifname = "br-client";
static struct json_object * get_traffic_if(const char *ifname) {
struct json_object *ret = NULL;
struct json_object *rx = json_object_new_object();
struct json_object *tx = json_object_new_object();
@ -407,21 +731,17 @@ static struct json_object * get_traffic(void) {
ret = json_object_new_object();
json_object_object_add(ret, "rx", rx);
json_object_object_add(ret, "tx", tx);
return ret;
}
static bool handle_route_addgw_nexthop(char **data, void *arg) {
struct json_object *obj = (struct json_object*) arg;
if (data[PREFIX] && data[FROM] && data[VIA] && data[IF]) {
if ( (! strncmp(data[PREFIX], "::/0", 4) ) && ( ! strncmp(data[FROM], "::/0", 4) ) ) {
int gw_nexthoplen=strlen(data[VIA]) + strlen(data[IF])+2;
char gw_nexthop[gw_nexthoplen];
snprintf(gw_nexthop, gw_nexthoplen , "%s%%%s", data[VIA], data[IF]);
json_object_object_add(obj, "gateway_nexthop", json_object_new_string(gw_nexthop));
}
}
return true;
static struct json_object * get_traffic(void) {
struct json_object *ret = get_traffic_if("br-client"); // keep for compatibility
json_object_object_add(ret, "local-node", get_traffic_if("local-node"));
json_object_object_add(ret, "br-client", get_traffic_if("br-client"));
// TODO: add traffic stats for all mesh interfaces
return ret;
}
static int json_parse_get_clients(json_object * object) {
@ -507,18 +827,26 @@ static struct json_object * respondd_provider_statistics(void) {
json_object_object_add(ret, "clients", get_clients());
json_object_object_add(ret, "traffic", get_traffic());
babelhelper_readbabeldata(&bhelper_ctx, "dump", (void*)ret, handle_route_addgw_nexthop );
get_default_route(ret);
return ret;
}
static struct json_object * respondd_provider_neighbours(void) {
struct json_object *ret = json_object_new_object();
struct json_object *neighbours_copy = NULL;
struct json_object *babel = get_babel_neighbours();
if (babel)
json_object_object_add(ret, "babel", babel);
if (!ret)
return NULL;
if (neighbours) {
pthread_rwlock_rdlock(&neighbours_lock);
int deepcopy_state = json_object_deep_copy(neighbours, neighbours_copy, NULL);
pthread_rwlock_unlock(&neighbours_lock);
if (!deepcopy_state)
json_object_object_add(ret, "babel", neighbours_copy);
}
return ret;
}