Commit 798488cf authored by Jeroen Vreeken's avatar Jeroen Vreeken
Browse files

Add write queue to http server

Fix some cleanup issues in poll code
parent ae3e6837
......@@ -34,6 +34,136 @@
magic_t magic;
struct writebuf {
char *data;
char *msg;
size_t msg_len;
struct writebuf *next;
};
struct ws_client {
struct libwebsocket_context *context;
struct libwebsocket *wsi;
struct writebuf *writeq;
struct ws_client *next;
};
struct writebuf *writebuf_alloc(size_t msglen)
{
struct writebuf *wb;
wb = malloc(sizeof(struct writebuf));
if (!wb)
return NULL;
wb->data = malloc(msglen +
LWS_SEND_BUFFER_PRE_PADDING +
LWS_SEND_BUFFER_POST_PADDING);
wb->msg = wb->data + LWS_SEND_BUFFER_PRE_PADDING;
return wb;
}
void writebuf_free(struct writebuf *wb)
{
free(wb->data);
free(wb);
}
void writebuf_add(struct ws_client *client, struct writebuf *wb)
{
struct writebuf **entry;
for (entry = &client->writeq; *entry; entry = &(*entry)->next);
*entry = wb;
wb->next = NULL;
}
struct writebuf *writebuf_next(struct ws_client *client)
{
struct writebuf *wb;
if (!client->writeq)
return NULL;
wb = client->writeq;
client->writeq = wb->next;
return wb;
}
struct ws_client *ws_client_list = NULL;
struct ws_client *ws_client_add(struct libwebsocket_context *context, struct libwebsocket *wsi)
{
struct ws_client *client;
client = calloc(sizeof(struct ws_client), 1);
if (!client)
return NULL;
client->context = context;
client->wsi = wsi;
client->next = ws_client_list;
ws_client_list = client;
return client;
}
void ws_client_remove(struct ws_client *client)
{
struct ws_client **entry;
for (entry = &ws_client_list; *entry; entry = &(*entry)->next) {
if (*entry == client) {
struct writebuf *wb;
while ((wb = writebuf_next(client))) {
writebuf_free(wb);
}
*entry = (*entry)->next;
free(client);
return;
}
}
}
struct ws_client *ws_client_get_by_wsi(struct libwebsocket *wsi)
{
struct ws_client *entry;
for (entry = ws_client_list; entry; entry = entry->next)
if (entry->wsi == wsi)
return entry;
printf("wsi %p not found\n", wsi);
return NULL;
}
void ws_client_flush(struct ws_client *client)
{
while (client->writeq) {
struct writebuf *wb;
if (lws_send_pipe_choked(client->wsi))
break;
wb = writebuf_next(client);
libwebsocket_write(client->wsi, (unsigned char *)wb->msg, wb->msg_len, LWS_WRITE_TEXT);
}
if (client->writeq) {
libwebsocket_callback_on_writable(client->context, client->wsi);
}
}
#define MAX_POLL_ELEMENTS 256
struct pollfd pollfds[MAX_POLL_ELEMENTS];
int count_pollfds = 0;
......@@ -143,7 +273,7 @@ static void start_status(struct libwebsocket *wsi, char *ident)
static int status_handle(struct status *status)
{
int r;
unsigned char *linebuf = status->linebuf;
char *linebuf = (char *)status->linebuf;
int pos = status->pos;
int fd = status->fd;
char *ident = status->ident;
......@@ -154,16 +284,19 @@ static int status_handle(struct status *status)
for (i = 0; i <r; i++) {
if (linebuf[pos + i] == '\n') {
char *msg = status->msg;
struct ws_client *ws_client;
struct writebuf *wb;
ws_client = ws_client_get_by_wsi(status->wsi);
wb = writebuf_alloc(strlen(ident)+strlen(linebuf)+100);
linebuf[pos + i] = 0;
sprintf(msg,
wb->msg_len = sprintf(wb->msg,
"status %s %s\n",
ident, linebuf);
if (!lws_send_pipe_choked(status->wsi))
libwebsocket_write (status->wsi,
(unsigned char*)msg, strlen(msg),
LWS_WRITE_TEXT);
writebuf_add(ws_client, wb);
ws_client_flush(ws_client);
if (r - i - 1 > 0)
memmove(
linebuf,
......@@ -175,8 +308,8 @@ static int status_handle(struct status *status)
pos += r;
} else if (errno != EAGAIN) {
close(fd);
fd = -1;
poll_remove(fd);
fd = -1;
}
status->pos = pos;
status->fd = fd;
......@@ -251,56 +384,57 @@ static void handler_trace_value(struct trace *trace,
struct trace_value *value)
{
struct libwebsocket *wsi;
char premsg[LWS_SEND_BUFFER_PRE_PADDING + LWS_SEND_BUFFER_POST_PADDING + strlen(trace->name) + 100];
char *msg = premsg + LWS_SEND_BUFFER_PRE_PADDING;
struct ws_client *ws_client;
struct writebuf *wb;
wsi = trace->private;
ws_client = ws_client_get_by_wsi(wsi);
wb = writebuf_alloc(strlen(trace->name) + 100);
switch (trace->type) {
case TRACE_VALUE_TYPE_FLOAT:
sprintf(msg, "trace %s %ld.%09ld %e\n",
wb->msg_len = sprintf(wb->msg, "trace %s %ld.%09ld %e\n",
trace->name,
value->t.tv_sec, value->t.tv_nsec, value->value.f);
break;
case TRACE_VALUE_TYPE_BOOL:
sprintf(msg, "trace %s %ld.%09ld %d\n",
wb->msg_len = sprintf(wb->msg, "trace %s %ld.%09ld %d\n",
trace->name,
value->t.tv_sec, value->t.tv_nsec, value->value.b);
break;
case TRACE_VALUE_TYPE_UINT8:
sprintf(msg, "trace %s %ld.%09ld %u\n",
wb->msg_len = sprintf(wb->msg, "trace %s %ld.%09ld %u\n",
trace->name,
value->t.tv_sec, value->t.tv_nsec, value->value.u8);
break;
case TRACE_VALUE_TYPE_UINT16:
sprintf(msg, "trace %s %ld.%09ld %u\n",
wb->msg_len = sprintf(wb->msg, "trace %s %ld.%09ld %u\n",
trace->name,
value->t.tv_sec, value->t.tv_nsec, value->value.u16);
break;
case TRACE_VALUE_TYPE_UINT32:
sprintf(msg, "trace %s %ld.%09ld %u\n",
wb->msg_len = sprintf(wb->msg, "trace %s %ld.%09ld %u\n",
trace->name,
value->t.tv_sec, value->t.tv_nsec, value->value.u32);
break;
case TRACE_VALUE_TYPE_SINT8:
sprintf(msg, "trace %s %ld.%09ld %d\n",
wb->msg_len = sprintf(wb->msg, "trace %s %ld.%09ld %d\n",
trace->name,
value->t.tv_sec, value->t.tv_nsec, value->value.s8);
break;
case TRACE_VALUE_TYPE_SINT16:
sprintf(msg, "trace %s %ld.%09ld %d\n",
wb->msg_len = sprintf(wb->msg, "trace %s %ld.%09ld %d\n",
trace->name,
value->t.tv_sec, value->t.tv_nsec, value->value.s16);
break;
case TRACE_VALUE_TYPE_SINT32:
sprintf(msg, "trace %s %ld.%09ld %d\n",
wb->msg_len = sprintf(wb->msg, "trace %s %ld.%09ld %d\n",
trace->name,
value->t.tv_sec, value->t.tv_nsec, value->value.s32);
break;
}
if (!lws_send_pipe_choked(wsi))
libwebsocket_write(wsi, (unsigned char*)msg, strlen(msg),
LWS_WRITE_TEXT);
writebuf_add(ws_client, wb);
ws_client_flush(ws_client);
}
static void start_trace(struct libwebsocket *wsi, int freq, char *variable)
......@@ -477,8 +611,14 @@ static int callback_http(struct libwebsocket_context *context,
switch (reason) {
case LWS_CALLBACK_RECEIVE: {
char *rcv = in;
struct ws_client *ws_client;
printf("lws receive: %s\n", rcv);
ws_client = ws_client_get_by_wsi(wsi);
if (!ws_client) {
ws_client = ws_client_add(context, wsi);
}
if (!strncmp(rcv, "status ", 7)) {
char *ident;
......@@ -529,7 +669,8 @@ static int callback_http(struct libwebsocket_context *context,
case LWS_CALLBACK_CLOSED:
status_remove(wsi);
trace_remove(wsi);
printf("Close connection\n");
ws_client_remove(ws_client_get_by_wsi(wsi));
printf("Close connection %p\n", wsi);
break;
case LWS_CALLBACK_HTTP: {
......@@ -589,6 +730,16 @@ static int callback_http(struct libwebsocket_context *context,
break;
}
case LWS_CALLBACK_SERVER_WRITEABLE: {
struct ws_client *ws_client;
ws_client = ws_client_get_by_wsi(wsi);
if (ws_client) {
ws_client_flush(ws_client);
}
break;
}
default:
// if (reason != 30)
// printf("unhandled callback (%d)\n", reason);
......
......@@ -191,7 +191,6 @@ static void client_interval(struct trace *trace,
{
struct trace_pkt *pkt;
struct trace *server;
struct private *priv;
printf("Client %s requests %ld.%09ld type %d\n",
trace->name,
......@@ -208,15 +207,6 @@ static void client_interval(struct trace *trace,
return;
server_interval_check(server);
priv = server->private;
if (type == TRACE_INTERVAL_TYPE_CHANGED && !priv->firstvalue) {
pkt = trace_packet_new();
trace_packet_value_add(pkt, &priv->lastvalue, server->type);
trace_packet_write(trace, pkt);
trace_packet_put(pkt);
}
}
static struct trace_pkt *list_pkt;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment