Commit 96c5226a authored by Jeroen Vreeken's avatar Jeroen Vreeken
Browse files

Working traces at different rates with low cpu load

Also fixes setting of interval and name by j2000 tracker. (was a bug in
select loop of trace_proxy)
parent 000a212b
......@@ -67,6 +67,23 @@ void trace_initialize(struct trace *trace, size_t buffer)
trace->fd = -1;
}
int trace_interval_set(struct trace *trace, struct timespec *interval, enum trace_interval_type type)
{
struct trace_pkt *pkt;
trace->interval_req.tv_sec = interval->tv_sec;
trace->interval_req.tv_nsec = interval->tv_nsec;
trace->interval_type_req = type;
trace->interval_set = true;
pkt = trace_packet_new();
trace_packet_interval_set(pkt, interval, type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
return 0;
}
int trace_name_set(struct trace *trace, char *name)
{
struct trace_pkt *pkt;
......@@ -322,7 +339,8 @@ void trace_fd_set(struct trace *trace, fd_set *set, int *high)
return;
trace->recovertime = time(NULL);
log_send(LOG_T_DEBUG, "Attempt to recover trace");
log_send(LOG_T_DEBUG, "Attempt to recover trace %s",
trace->name);
trace->fd = tcp_connect(trace->host, trace->port);
if (trace->fd >= 0) {
struct trace_pkt *pkt;
......@@ -334,10 +352,12 @@ void trace_fd_set(struct trace *trace, fd_set *set, int *high)
trace_packet_name_set(pkt, trace->name);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
if (trace->interval_set) {
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&trace->interval, trace->interval_type);
&trace->interval_req,
trace->interval_type_req);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
......@@ -346,6 +366,7 @@ void trace_fd_set(struct trace *trace, fd_set *set, int *high)
log_send(LOG_T_DEBUG, "Failed to recover");
return;
}
log_send(LOG_T_DEBUG, "Reconnected trace");
}
FD_SET(trace->fd, set);
......@@ -488,7 +509,7 @@ int trace_handle(struct trace *trace, fd_set *set)
return 0;
}
return 1;
return 0;
}
enum trace_state trace_state_get(struct trace *trace)
......
......@@ -96,9 +96,12 @@ struct trace {
struct timespec timestamp;
struct timespec interval;
enum trace_interval_type interval_type;
struct timespec interval_req;
enum trace_interval_type interval_type_req;
char *name;
bool interval_set;
bool name_set;
bool list_received;
bool recover;
......@@ -136,6 +139,8 @@ struct trace *trace_alloc(void);
struct trace *trace_open(char *host, int port);
void trace_initialize(struct trace *trace, size_t buffer);
void trace_initialize_fd(struct trace *trace, int fd);
int trace_interval_set(struct trace *trace,
struct timespec *interval, enum trace_interval_type type);
int trace_name_set(struct trace *trace, char *name);
void trace_autorecover(struct trace *trace, bool value);
void trace_close(struct trace *trace);
......
......@@ -201,7 +201,7 @@ int trace_packet_write(struct trace *trace, struct trace_pkt *pkt)
unsigned char *seq;
r = write(trace->fd, pkt->data + start, i - start);
if (r < 0)
if (r != i - start)
goto err_write;
start = i + 1;
......@@ -210,12 +210,12 @@ int trace_packet_write(struct trace *trace, struct trace_pkt *pkt)
else
seq = (unsigned char[2]){ TRACE_ESC, TRACE_ESC_END };
r = write(trace->fd, seq, 2);
if (r < 0)
if (r != 2)
goto err_write;
}
if (i == pkt->len -1) {
r = write(trace->fd, pkt->data + start, i - start + 1);
if (r < 0)
if (r != i - start + 1)
goto err_write;
start = i + 1;
}
......
......@@ -226,10 +226,14 @@ void output(struct status_server *stat_srv)
int do_output = 1;
double pressure, temperature, alt_adj;
if (traceval_az->value.t.tv_sec != traceval_el->value.t.tv_sec) {
do_output = 0;
}
now = traceval_az->value.t.tv_sec;
if (now <= last)
if (now <= last) {
do_output = 0;
}
if (do_output == 0) {
return;
......@@ -320,7 +324,6 @@ int main(int argc, char **argv)
struct setpoint_command *sp_command_el = NULL;
struct command_server *cmd_srv;
struct status_server *stat_srv;
struct trace_pkt *pkt;
time_t lastt = 0;
struct timespec t_int;
......@@ -393,12 +396,8 @@ int main(int argc, char **argv)
t_int.tv_sec = 1;
t_int.tv_nsec = 0;
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&t_int, TRACE_INTERVAL_TYPE_INTERVAL);
trace_packet_write(traceval_az, pkt);
trace_packet_write(traceval_el, pkt);
trace_packet_put(pkt);
trace_interval_set(traceval_az, &t_int, TRACE_INTERVAL_TYPE_INTERVAL);
trace_interval_set(traceval_el, &t_int, TRACE_INTERVAL_TYPE_INTERVAL);
cmd_srv = command_server_create(cmd_port, 0, 100);
......
......@@ -33,6 +33,7 @@
#include <errno.h>
#define TIMEOUT 10
#define VALCLIENT_UPDATE_NSEC (100*1000*1000)
static char *tr_host;
static int tr_port;
......@@ -49,6 +50,135 @@ struct private {
};
static int nr_client_traces = 0;
static struct trace **client_traces;
static int nr_server_traces = 0;
static struct trace **server_traces;
static void common_denom(struct timespec *common, struct timespec *int1, struct timespec *int2)
{
uint64_t i1, i2, b;
uint64_t d1, d2;
uint64_t inew;
i1 = int1->tv_sec * 1000000000 + int1->tv_nsec;
i2 = int2->tv_sec * 1000000000 + int2->tv_nsec;
b = base_interval.tv_sec * 1000000000 + base_interval.tv_nsec;
d1 = i1 / b;
d2 = i2 / b;
if (d2 && d1 > d2 && !(d1 % d2)) {
inew = d2;
} else if (d1 && d2 > d1 && !(d2 % d1)) {
inew = d1;
} else {
inew = 1;
}
common->tv_sec = base_interval.tv_sec * inew;
common->tv_nsec = base_interval.tv_nsec * inew;
common->tv_sec += common->tv_nsec / 1000000000;
common->tv_nsec %= 1000000000;
}
static void server_interval_check(struct trace *server)
{
struct private *priv = server->private;
enum trace_interval_type type = 0;
struct timespec interval;
int i;
bool has_valclient;
bool fullrate = false;
interval.tv_sec = 0;
interval.tv_nsec = 0;
has_valclient = priv->lastclient + TIMEOUT > time(NULL);
if (has_valclient) {
interval.tv_sec = 0;
interval.tv_nsec = VALCLIENT_UPDATE_NSEC;
}
for (i = 0; i < priv->nr_clients; i++) {
switch (priv->clients[i]->interval_type) {
case TRACE_INTERVAL_TYPE_INTERVAL:
if (type) {
fullrate = true;
} else {
type = TRACE_INTERVAL_TYPE_INTERVAL;
}
/* Search for a common interval for all clients */
common_denom(&interval,
&interval, &priv->clients[i]->interval);
break;
case TRACE_INTERVAL_TYPE_CHANGED:
if (type) {
fullrate = true;
} else {
type = TRACE_INTERVAL_TYPE_CHANGED;
}
break;
default:
break;
}
}
if (!type && has_valclient) {
type = TRACE_INTERVAL_TYPE_INTERVAL;
interval.tv_sec = 0;
interval.tv_nsec = VALCLIENT_UPDATE_NSEC;
}
if (fullrate) {
type = TRACE_INTERVAL_TYPE_INTERVAL;
interval.tv_sec = base_interval.tv_sec;
interval.tv_nsec = base_interval.tv_nsec;
}
if ((server->interval_type != type) ||
(server->interval.tv_sec != interval.tv_sec) ||
(server->interval.tv_nsec != interval.tv_nsec)) {
struct trace_pkt *pkt;
printf("Changing server trace %s from %ld.%09ld, %d to %ld.%09ld, %d\n",
server->name,
server->interval.tv_sec, server->interval.tv_nsec,
server->interval_type,
interval.tv_sec, interval.tv_nsec, type);
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&interval, type);
trace_packet_write(server, pkt);
trace_packet_put(pkt);
}
}
static struct trace *server_find_client(struct trace *trace)
{
int i, j;
for (i = 0; i < nr_server_traces; i++) {
struct trace *server;
struct private *priv;
server = server_traces[i];
priv = server->private;
for (j = 0; j < priv->nr_clients; j++) {
if (priv->clients[j] == trace)
return server;
}
}
return NULL;
}
static void handler_interval(struct trace *trace,
struct timespec *interval, enum trace_interval_type type)
......@@ -64,12 +194,23 @@ static void client_interval(struct trace *trace,
struct timespec *interval, enum trace_interval_type type)
{
struct trace_pkt *pkt;
struct trace *server;
printf("Client %s requests %ld.%09ld type %d\n",
trace->name,
interval->tv_sec, interval->tv_nsec, type);
/* Send back interval to client (as ACK) */
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
interval, type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
server = server_find_client(trace);
if (!server)
return;
server_interval_check(server);
}
static struct trace_pkt *list_pkt;
......@@ -148,6 +289,7 @@ static void server_value(struct trace *trace,
trace_packet_write(priv->clients[i], pkt);
if (check_client(priv, i)) {
server_interval_check(trace);
i--;
continue;
}
......@@ -170,6 +312,7 @@ static void server_timestamp(struct trace *trace,
for (i = 0; i < priv->nr_clients; i++) {
trace_packet_write(priv->clients[i], pkt);
if (check_client(priv, i)) {
server_interval_check(trace);
i--;
continue;
}
......@@ -201,12 +344,6 @@ static void server_type(struct trace *trace, enum trace_value_type type)
trace_packet_put(pkt);
}
static int nr_client_traces = 0;
static struct trace **client_traces;
static int nr_server_traces = 0;
static struct trace **server_traces;
static void client_accept(int fd_listen)
{
int fd;
......@@ -331,17 +468,7 @@ static void client_add(struct trace *client)
priv->clients[priv->nr_clients] = client;
priv->nr_clients++;
if (priv->nr_clients == 1) {
struct trace_pkt *pkt;
trace->interval_type = TRACE_INTERVAL_TYPE_INTERVAL;
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&base_interval, trace->interval_type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
server_interval_check(trace);
printf("Trace now has %d clients\n", priv->nr_clients);
}
......@@ -416,22 +543,17 @@ static void val_handle(struct valclient *valclient)
name += strlen(name) + 1) {
struct trace *trace;
struct private *priv;
bool int_check;
trace = server_find(name);
priv = trace->private;
if (priv->nr_clients == 0 &&
trace->interval_type != TRACE_INTERVAL_TYPE_CHANGED) {
struct trace_pkt *pkt;
trace->interval_type = TRACE_INTERVAL_TYPE_CHANGED;
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&base_interval, trace->interval_type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
int_check = priv->lastclient + TIMEOUT < now;
priv->lastclient = now;
if (int_check);
server_interval_check(trace);
switch(trace->type) {
case TRACE_VALUE_TYPE_FLOAT:
......@@ -467,8 +589,6 @@ static void val_handle(struct valclient *valclient)
trace->value.value.s32);
break;
}
priv->lastclient = now;
}
sprintf(replyp, "\n");
write(valclient->fd, reply, strlen(reply));
......@@ -483,7 +603,7 @@ int main (int argc, char **argv)
int fd_listen, fd_listenval;
fd_set fd_rd;
struct trace *list;
int high, i;
int high, i, j;
fd_listen = tcp_listen(CONSOLE_TRACE_PORT, 0, 100);
if (fd_listen < 0) {
......@@ -553,7 +673,12 @@ int main (int argc, char **argv)
trace_fd_set(client_traces[i], &fd_rd, &high);
}
for (i = 0; i < nr_server_traces; i++) {
struct private *priv = server_traces[i]->private;
trace_fd_set(server_traces[i], &fd_rd, &high);
for (j = 0; j < priv->nr_clients; j++) {
trace_fd_set(priv->clients[j], &fd_rd, &high);
}
}
for (i = 0; i < nr_valclients; i++) {
FD_SET(valclients[i]->fd, &fd_rd);
......@@ -600,8 +725,13 @@ int main (int argc, char **argv)
}
}
for (i = 0; i < nr_server_traces; i++) {
struct private *priv = server_traces[i]->private;
if (trace_handle(server_traces[i], &fd_rd))
continue;
for (j = 0; j < priv->nr_clients; j++) {
trace_handle(priv->clients[j], &fd_rd);
}
if (trace_state_get(server_traces[i]) ==
TRACE_STATE_DISCONNECTED &&
......
......@@ -42,6 +42,17 @@ static int nr_traces = 0;
static struct controller_trace_name *trace_list = NULL;
long interval_nsec;
static bool timespec_mod(struct timespec *t, struct timespec *tdiv)
{
/* TODO: make a proper mod taking both fields into account */
if (tdiv->tv_sec) {
return t->tv_sec % tdiv->tv_sec || t->tv_nsec;
} else if (tdiv->tv_nsec) {
return t->tv_nsec % tdiv->tv_nsec;
}
return false;
}
struct controller_trace_name *controller_trace_get(int nr)
{
return &trace_list[nr];
......@@ -184,7 +195,12 @@ static void *controller_trace_handle(void *arg)
}
if (trace_hdl[i].type == TRACE_INTERVAL_TYPE_INTERVAL) {
sendval = true;
struct timespec t;
t.tv_sec = trace_hdl[i].ctrace.timestamp;
t.tv_nsec = trace_hdl[i].nsec;
sendval = !timespec_mod(&t, &trace_hdl[i].trace.interval);
} else if (trace_hdl[i].type == TRACE_INTERVAL_TYPE_CHANGED) {
uint32_t bufval = 0;
uint32_t last = 0;
......@@ -336,10 +352,20 @@ static void handler_interval(struct trace *trace,
struct timespec *interval, enum trace_interval_type type)
{
struct trace_hdl *hdl = trace->private;
struct trace_pkt *pkt;
log_send(LOG_T_DEBUG, "Setting interval type %d", type);
log_send(LOG_T_DEBUG, "Setting interval %ld.%09ld type %d",
interval->tv_sec, interval->tv_nsec, type);
hdl->type = type;
/* Send back interval to client (as ACK) */
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
interval, type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment