Commit 000a212b authored by Jeroen Vreeken's avatar Jeroen Vreeken
Browse files

trace_proxy now supports INTERVAL_TYPE_CHANGED clients and clients with a...

trace_proxy now supports INTERVAL_TYPE_CHANGED clients and clients with a bigger interval than the source.
parent 8538dc65
......@@ -3,7 +3,7 @@ CFLAGS+= -Wall -O3 -I../utils/ -I.. -I../include
LDFLAGS+= -L../lib
LOBJS= trace.lo
SRCS= trace_dump.c trace_dumpdiff.c trace_fft.c trace_tcp.c
SRCS= trace_dump.c trace_dumpdiff.c trace_fft.c trace_tcp.c trace.c trace_tcp.c
all: libs bins
......
......@@ -334,6 +334,12 @@ void trace_fd_set(struct trace *trace, fd_set *set, int *high)
trace_packet_name_set(pkt, trace->name);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&trace->interval, trace->interval_type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
}
if (trace->fd < 0) {
......@@ -367,6 +373,7 @@ int trace_handle(struct trace *trace, fd_set *set)
trace->interval.tv_sec = be64toh(pinterval->sec);
trace->interval.tv_nsec = be32toh(pinterval->nsec);
type = pinterval->type;
trace->interval_type = type;
if (trace->handler_interval) {
trace->handler_interval(trace, &trace->interval, type);
......@@ -468,6 +475,7 @@ int trace_handle(struct trace *trace, fd_set *set)
} else {
timespec_add(&trace->value.t, &trace->interval);
}
trace->value_set = true;
if (trace->handler_value) {
trace->handler_value(trace, &trace->value);
}
......
......@@ -51,9 +51,9 @@ enum trace_value_type {
char *enum_trace_value_type2str(enum trace_value_type type);
enum trace_interval_type {
TRACE_INTERVAL_TYPE_INTERVAL,
TRACE_INTERVAL_TYPE_CHANGED,
TRACE_INTERVAL_TYPE_CHANGED_INTERVAL,
TRACE_INTERVAL_TYPE_INTERVAL = 1,
TRACE_INTERVAL_TYPE_CHANGED = 2,
TRACE_INTERVAL_TYPE_CHANGED_INTERVAL = 3,
};
char *enum_trace_interval_type2str(enum trace_interval_type type);
......@@ -95,13 +95,15 @@ struct trace {
enum trace_value_type type;
struct timespec timestamp;
struct timespec interval;
enum trace_interval_type interval_type;
char *name;
bool name_set;
bool list_received;
bool recover;
bool type_set;
bool value_set;
time_t recovertime;
struct trace_value value;
......
......@@ -130,7 +130,7 @@ int main(int argc, char **argv)
t_int.tv_sec *= interval;
t_int.tv_nsec *= interval;
if (t_int.tv_nsec > 1000000000) {
if (t_int.tv_nsec >= 1000000000) {
t_int.tv_sec++;
t_int.tv_nsec -= 1000000000;
}
......
......@@ -38,19 +38,10 @@ static void handler_interval(struct trace *trace,
printf("interval: %ld.%09ld\n", interval->tv_sec, interval->tv_nsec);
}
int cnt =0;
static void handler_value(struct trace *trace,
struct trace_value *value)
{
static struct trace_value prev;
if (cnt || memcmp(&prev.value, &value->value, sizeof(prev.value))) {
return;
}
prev.value = value->value;
cnt = 1;
switch (trace->type) {
case TRACE_VALUE_TYPE_FLOAT:
printf("%ld.%09ld %e\n",
......@@ -91,11 +82,10 @@ int main(int argc, char **argv)
{
struct trace *trace;
struct trace_pkt *pkt;
int interval = 1;
if (argc < 4) {
printf("Usage:\n\n");
printf("%s [host] [port] [trace] <interval>\n", argv[0]);
printf("%s [host] [port] [trace]\n", argv[0]);
return 0;
}
......@@ -103,10 +93,6 @@ int main(int argc, char **argv)
trace = trace_open(argv[1], atoi(argv[2]));
if (argc >= 5) {
interval = atoi(argv[4]);
}
trace->handler_interval = handler_interval;
trace->handler_value = handler_value;
......@@ -125,21 +111,11 @@ int main(int argc, char **argv)
printf("Connection ready for tracing\n");
if (interval != 1) {
t_int.tv_sec *= interval;
t_int.tv_nsec *= interval;
if (t_int.tv_nsec > 1000000000) {
t_int.tv_sec++;
t_int.tv_nsec -= 1000000000;
}
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&t_int, TRACE_INTERVAL_TYPE_INTERVAL);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&t_int, TRACE_INTERVAL_TYPE_CHANGED);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
pkt = trace_packet_new();
trace_packet_name_set(pkt, tracename);
......
......@@ -193,6 +193,9 @@ int trace_packet_write(struct trace *trace, struct trace_pkt *pkt)
int start = 0;
ssize_t r;
if (!pkt->len)
return 0;
while (start < pkt->len) for (i = start; i < pkt->len; i++) {
if (pkt->data[i] == TRACE_ESC || pkt->data[i] == TRACE_END) {
unsigned char *seq;
......
......@@ -23,10 +23,12 @@
#include <stdio.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <errno.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include "tcp_connect.h"
......@@ -82,6 +84,20 @@ int tcp_connect(char *host, int port)
if (sock >= 0) {
ioctl(sock, FIONBIO, &(int){0});
setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
&(int){1}, sizeof(int));
/* number of probes which may fail */
setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
&(int){5}, sizeof(int));
/* Idle time before starting with probes */
setsockopt(sock, SOL_TCP, TCP_KEEPIDLE,
&(int){10}, sizeof(int));
/* interval between probes */
setsockopt(sock, SOL_TCP, TCP_KEEPINTVL,
&(int){2}, sizeof(int));
break;
}
}
......
......@@ -226,29 +226,12 @@ void output(struct status_server *stat_srv)
int do_output = 1;
double pressure, temperature, alt_adj;
if (traceval_az->value.t.tv_sec != traceval_el->value.t.tv_sec)
do_output = 0;
now = traceval_az->value.t.tv_sec;
if (now <= last)
do_output = 0;
if (do_output == 0) {
time_t localtime;
localtime = time(NULL);
if (localtime > last + 5 && last) {
trace_close(traceval_az);
trace_autorecover(traceval_az, true);
last = 0;
}
if (localtime > last + 5 && last) {
trace_close(traceval_el);
trace_autorecover(traceval_el, true);
last = 0;
}
return;
} else {
last = now;
......@@ -324,12 +307,10 @@ void output(struct status_server *stat_srv)
status_server_send(stat_srv, statline);
}
static void handler_value_az(struct trace *trace, struct trace_value *value)
{
}
static void handler_value_el(struct trace *trace, struct trace_value *value)
static void handler_interval(struct trace *trace,
struct timespec *interval, enum trace_interval_type type)
{
printf("interval: %ld.%09ld\n", interval->tv_sec, interval->tv_nsec);
}
......@@ -339,7 +320,9 @@ int main(int argc, char **argv)
struct setpoint_command *sp_command_el = NULL;
struct command_server *cmd_srv;
struct status_server *stat_srv;
struct trace_pkt *pkt;
time_t lastt = 0;
struct timespec t_int;
dt_model_init();
......@@ -405,8 +388,18 @@ int main(int argc, char **argv)
trace_name_set(traceval_el, el_trace_name);
trace_autorecover(traceval_az, true);
trace_autorecover(traceval_el, true);
traceval_az->handler_value = handler_value_az;
traceval_el->handler_value = handler_value_el;
traceval_az->handler_interval = handler_interval;
traceval_el->handler_interval = handler_interval;
t_int.tv_sec = 1;
t_int.tv_nsec = 0;
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&t_int, TRACE_INTERVAL_TYPE_INTERVAL);
trace_packet_write(traceval_az, pkt);
trace_packet_write(traceval_el, pkt);
trace_packet_put(pkt);
cmd_srv = command_server_create(cmd_port, 0, 100);
if (!cmd_srv) {
......
......@@ -45,6 +45,7 @@ struct private {
struct trace **clients;
time_t lastclient;
struct trace_value lastvalue;
};
......@@ -59,6 +60,18 @@ static void handler_interval(struct trace *trace,
interval, TRACE_INTERVAL_TYPE_INTERVAL);
}
static void client_interval(struct trace *trace,
struct timespec *interval, enum trace_interval_type type)
{
struct trace_pkt *pkt;
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
interval, type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
static struct trace_pkt *list_pkt;
static void handler_list_entry(struct trace *trace,
......@@ -69,6 +82,31 @@ static void handler_list_entry(struct trace *trace,
trace_autorecover(trace, false);
}
static bool check_client(struct private *priv, int nr)
{
if (trace_state_get(priv->clients[nr]) == TRACE_STATE_DISCONNECTED) {
trace_free(priv->clients[nr]);
memmove(&priv->clients[nr], &priv->clients[nr+1],
(priv->nr_clients-nr-1)*sizeof(struct trace *));
priv->nr_clients--;
printf("Trace lost a client, %d left\n",
priv->nr_clients);
return true;
}
return false;
}
static bool timespec_mod(struct timespec *t, struct timespec *tdiv)
{
/* TODO: make a proper mod taking both fields into account */
if (tdiv->tv_sec) {
return t->tv_sec % tdiv->tv_sec || t->tv_nsec;
} else if (tdiv->tv_nsec) {
return t->tv_nsec % tdiv->tv_nsec;
}
return false;
}
static void server_value(struct trace *trace,
struct trace_value *value)
......@@ -78,18 +116,38 @@ static void server_value(struct trace *trace,
int i;
if (priv->nr_clients) {
bool changed;
if (priv->lastvalue.value.u32 != value->value.u32)
changed = true;
else
changed = false;
priv->lastvalue.value.u32 = value->value.u32;
pkt = trace_packet_new();
trace_packet_value_add(pkt, value, trace->type);
for (i = 0; i < priv->nr_clients; i++) {
bool send = false;
switch (priv->clients[i]->interval_type) {
case TRACE_INTERVAL_TYPE_CHANGED:
send = changed;
break;
case TRACE_INTERVAL_TYPE_INTERVAL:
send =!timespec_mod(&value->t,
&priv->clients[i]->interval);
break;
default:
send = true;
break;
}
if (!send)
continue;
trace_packet_write(priv->clients[i], pkt);
if (trace_state_get(priv->clients[i]) == TRACE_STATE_DISCONNECTED) {
trace_free(priv->clients[i]);
memmove(&priv->clients[i], &priv->clients[i+1],
(priv->nr_clients-i-1)*sizeof(struct trace *));
priv->nr_clients--;
printf("Trace lost a client, %d left\n",
priv->nr_clients);
if (check_client(priv, i)) {
i--;
continue;
}
......@@ -111,6 +169,10 @@ static void server_timestamp(struct trace *trace,
for (i = 0; i < priv->nr_clients; i++) {
trace_packet_write(priv->clients[i], pkt);
if (check_client(priv, i)) {
i--;
continue;
}
}
trace_packet_put(pkt);
}
......@@ -166,6 +228,7 @@ static void client_accept(int fd_listen)
trace_packet_write(client, interval_pkt);
trace_packet_write(client, list_pkt);
client->handler_interval = client_interval;
client_traces = realloc(client_traces,
sizeof(struct trace *) * (nr_client_traces + 1));
......@@ -203,9 +266,19 @@ static struct trace *server_find(char *name)
}
if (i == nr_server_traces) {
struct trace_pkt *pkt;
trace = trace_open(tr_host, tr_port);
trace_name_set(trace, name);
trace->interval_type = TRACE_INTERVAL_TYPE_INTERVAL;
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&base_interval, trace->interval_type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
server_traces = realloc(server_traces,
sizeof(struct trace *) * (nr_server_traces + 1));
......@@ -244,13 +317,32 @@ static void client_add(struct trace *client)
trace_packet_write(client, pkt);
trace_packet_put(pkt);
}
if (trace->value_set) {
struct trace_pkt *pkt;
pkt = trace_packet_new();
trace_packet_value_add(pkt, &trace->value, trace->type);
trace_packet_write(client, pkt);
trace_packet_put(pkt);
}
priv->clients = realloc(priv->clients,
sizeof(struct trace *) * (priv->nr_clients + 1));
priv->clients[priv->nr_clients] = client;
priv->nr_clients++;
if (priv->nr_clients == 1) {
struct trace_pkt *pkt;
trace->interval_type = TRACE_INTERVAL_TYPE_INTERVAL;
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&base_interval, trace->interval_type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
printf("Trace now has %d clients\n", priv->nr_clients);
}
......@@ -328,6 +420,19 @@ static void val_handle(struct valclient *valclient)
trace = server_find(name);
priv = trace->private;
if (priv->nr_clients == 0 &&
trace->interval_type != TRACE_INTERVAL_TYPE_CHANGED) {
struct trace_pkt *pkt;
trace->interval_type = TRACE_INTERVAL_TYPE_CHANGED;
pkt = trace_packet_new();
trace_packet_interval_set(pkt,
&base_interval, trace->interval_type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
switch(trace->type) {
case TRACE_VALUE_TYPE_FLOAT:
replyp += sprintf(replyp, "%g ",
......
......@@ -86,6 +86,7 @@ int controller_block_trace_add(char *blockname, char *outterm,
return -1;
}
trace->timestamp = controller_time_seconds;
trace->rd_pos = 0;
trace->wr_pos = 0;
trace->type = type;
......
......@@ -40,6 +40,7 @@
static int nr_traces = 0;
static struct controller_trace_name *trace_list = NULL;
long interval_nsec;
struct controller_trace_name *controller_trace_get(int nr)
{
......@@ -75,9 +76,12 @@ struct trace_hdl {
struct trace trace;
bool added;
enum trace_interval_type type;
long nsec;
char tracename[100];
struct controller_trace ctrace;
union controller_trace_value last;
union controller_trace_value buffer[TRACE_LEN];
};
......@@ -161,16 +165,59 @@ static void *controller_trace_handle(void *arg)
len = trace_hdl[i].ctrace.len - rd_pos;
else {
len = wr_pos - rd_pos;
if (len < 2)
continue;
}
pkt = trace_packet_new();
for (j = 0; j < len; j++) {
size_t pos = rd_pos+j;
bool sendtime = false;
bool sendval = false;
if (trace_hdl[i].nsec < 0) {
sendval = true;
trace_hdl[i].nsec = 0;
}
if (pos == trace_hdl[i].ctrace.timestamp_pos) {
trace_hdl[i].nsec = 0;
sendtime = true;
}
if (trace_hdl[i].type == TRACE_INTERVAL_TYPE_INTERVAL) {
sendval = true;
} else if (trace_hdl[i].type == TRACE_INTERVAL_TYPE_CHANGED) {
uint32_t bufval = 0;
uint32_t last = 0;
switch (trace_hdl[i].trace.type) {
case TRACE_VALUE_TYPE_BOOL:
case TRACE_VALUE_TYPE_SINT8:
case TRACE_VALUE_TYPE_UINT8:
bufval = trace_hdl[i].ctrace.buffer[rd_pos+j].u8;
last = trace_hdl[i].last.u8;
trace_hdl[i].last.u8 = bufval;
break;
case TRACE_VALUE_TYPE_UINT16:
case TRACE_VALUE_TYPE_SINT16:
bufval = trace_hdl[i].ctrace.buffer[rd_pos+j].u16;
last = trace_hdl[i].last.u16;
trace_hdl[i].last.u16 = bufval;
break;
case TRACE_VALUE_TYPE_UINT32:
case TRACE_VALUE_TYPE_SINT32:
case TRACE_VALUE_TYPE_FLOAT:
bufval = trace_hdl[i].ctrace.buffer[rd_pos+j].u32;
last = trace_hdl[i].last.u32;
trace_hdl[i].last.u32 = bufval;
break;
}
if (bufval != last) {
sendtime = true;
sendval = true;
}
}
if (sendtime) {
struct timespec t;
if (pkt->len) {
......@@ -179,9 +226,9 @@ static void *controller_trace_handle(void *arg)
pkt = trace_packet_new();
}
t.tv_sec = trace_hdl[i].ctrace.timestamp;
t.tv_nsec = 0;
t.tv_nsec = trace_hdl[i].nsec;
trace_packet_timestamp_set(
pkt, &t);
trace_packet_write(&trace_hdl[i].trace, pkt);
......@@ -190,9 +237,12 @@ static void *controller_trace_handle(void *arg)
pkt = trace_packet_new();
}
trace_packet_value_add(pkt,
(void*)&trace_hdl[i].ctrace.buffer[rd_pos+j],
trace_hdl[i].trace.type);
if (sendval) {
trace_packet_value_add(pkt,
(void*)&trace_hdl[i].ctrace.buffer[rd_pos+j],
trace_hdl[i].trace.type);
}
trace_hdl[i].nsec += interval_nsec;
}
trace_packet_write(&trace_hdl[i].trace, pkt);
......@@ -261,6 +311,8 @@ static void handler_name(struct trace *trace, char *name)
hdl->ctrace.len = TRACE_LEN;
hdl->ctrace.buffer = hdl->buffer;
memset(hdl->buffer, 0, sizeof(union controller_trace_value) * TRACE_LEN);
if (controller_block_trace_add(
blockname,
termname,
......@@ -280,6 +332,17 @@ static void handler_name(struct trace *trace, char *name)
trace_packet_put(pkt);
}
static void handler_interval(struct trace *trace,
struct timespec *interval, enum trace_interval_type type)
{
struct trace_hdl *hdl = trace->private;
log_send(LOG_T_DEBUG, "Setting interval type %d", type);
hdl->type = type;
}
static void *controller_trace_server(void *arg)
{
int listen_sock;
......@@ -340,6 +403,8 @@ static void *controller_trace_server(void *arg)
trace_hdl[i].trace.name_set = false;
trace_hdl[i].added = false;
trace_hdl[i].type = TRACE_INTERVAL_TYPE_INTERVAL;
trace_hdl[i].nsec = -1;
trace_hdl[i].free = 0;
break;
......@@ -360,6 +425,8 @@ void controller_trace_server_start(int portnr, int max)
pthread_t thread_id;
pthread_attr_t attr;
int i, j;
double period;