Got rid of a malloc/free per line of input.

This patch alone improved throughput ~20% (52MB/s -> 63MB/s)
This commit is contained in:
Marcel Hellkamp 2018-03-31 21:42:50 +02:00
parent b2b4403b67
commit c3171ab76d

View file

@ -35,43 +35,76 @@ typedef struct NetClient {
#define NET_CSTATE_OPEN 0 #define NET_CSTATE_OPEN 0
#define NET_CSTATE_CLOSING 1 #define NET_CSTATE_CLOSING 1
static inline int min(int a, int b) {
return a < b ? a : b;
}
// global state // global state
static struct event_base *base; static struct event_base *base;
static char * line_buffer;
// User defined callbacks // User defined callbacks
static net_on_connect netcb_on_connect = NULL; static net_on_connect netcb_on_connect = NULL;
static net_on_read netcb_on_read = NULL; static net_on_read netcb_on_read = NULL;
static net_on_close netcb_on_close = NULL; static net_on_close netcb_on_close = NULL;
// libevent callbacks // libevent callbacks
// Like evbuffer_readln, but read into an existing string instead of allocating a new one.
// Return 1 if a line ending was found and the line was read completely.
// Return 2 if a line ending was found but the line was read only partially (longer than maxread-1).
// Return 0 if no line ending not found within the buffer.
int net_evbuffer_readln(struct evbuffer *buffer, char *line, size_t maxread, size_t * read_out, enum evbuffer_eol_style eol_style) {
struct evbuffer_ptr it;
size_t read_len=0, eol_len=0;
it = evbuffer_search_eol(buffer, NULL, &eol_len, eol_style);
if (it.pos < 0)
return 0;
read_len = min(maxread-1, it.pos);
if(read_out)
*read_out = read_len;
evbuffer_remove(buffer, line, read_len);
line[read_len] = '\0';
// Eat EOL if we have read all bytes before it
if(read_len == it.pos) {
evbuffer_drain(buffer, eol_len);
return 1;
} else {
return 2;
}
}
static void netev_on_read(struct bufferevent *bev, void *ctx) { static void netev_on_read(struct bufferevent *bev, void *ctx) {
struct evbuffer *input; struct evbuffer *input;
char *line;
size_t n;
NetClient *client = ctx; NetClient *client = ctx;
int r = 0;
input = bufferevent_get_input(bev); input = bufferevent_get_input(bev);
// Change while->if for less throughput but more fair pixel distribution across client connections. // Change while->if for less throughput but more fair pixel distribution across client connections.
while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) { while ((r = net_evbuffer_readln(input, line_buffer, NET_MAX_LINE, NULL, EVBUFFER_EOL_LF)) == 1) {
(*netcb_on_read)(client, line); (*netcb_on_read)(client, line_buffer);
free(line);
} }
if (evbuffer_get_length(input) >= NET_MAX_LINE) { if (r == 2) {
net_err(client, "Line to long"); net_err(client, "Line to long");
} }
} }
static void netev_on_write(struct bufferevent *bev, void *arg) { static void netev_on_write(struct bufferevent *bev, void *arg) {
NetClient *client = arg; NetClient *client = arg;
if (client->state == NET_CSTATE_CLOSING && evbuffer_get_length(bufferevent_get_output(bev)) == 0) { if (client->state == NET_CSTATE_CLOSING
&& evbuffer_get_length(bufferevent_get_output(bev)) == 0) {
if(netcb_on_close) if (netcb_on_close)
(*netcb_on_close)(client, 0); (*netcb_on_close)(client, 0);
bufferevent_free(bev); bufferevent_free(bev);
@ -89,7 +122,7 @@ static void netev_on_error(struct bufferevent *bev, short error, void *arg) {
} }
client->state = NET_CSTATE_CLOSING; client->state = NET_CSTATE_CLOSING;
if(netcb_on_close) if (netcb_on_close)
(*netcb_on_close)(client, error); (*netcb_on_close)(client, error);
bufferevent_free(bev); bufferevent_free(bev);
@ -114,28 +147,32 @@ static void on_accept(evutil_socket_t listener, short event, void *arg) {
} }
client->sock_fd = fd; client->sock_fd = fd;
client->buf_ev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); client->buf_ev = bufferevent_socket_new(base, fd,
BEV_OPT_CLOSE_ON_FREE);
evutil_make_socket_nonblocking(fd); evutil_make_socket_nonblocking(fd);
bufferevent_setcb(client->buf_ev, netev_on_read, netev_on_write, netev_on_error, client); bufferevent_setcb(client->buf_ev, netev_on_read, netev_on_write,
netev_on_error, client);
bufferevent_setwatermark(client->buf_ev, EV_READ, 0, NET_MAX_BUFFER); bufferevent_setwatermark(client->buf_ev, EV_READ, 0, NET_MAX_BUFFER);
if(netcb_on_connect) if (netcb_on_connect)
(*netcb_on_connect)(client); (*netcb_on_connect)(client);
if(client->state == NET_CSTATE_OPEN) if (client->state == NET_CSTATE_OPEN)
bufferevent_enable(client->buf_ev, EV_READ | EV_WRITE); bufferevent_enable(client->buf_ev, EV_READ | EV_WRITE);
} }
} }
// Public functions // Public functions
void net_start(int port, net_on_connect on_connect, net_on_read on_read, net_on_close on_close) { void net_start(int port, net_on_connect on_connect, net_on_read on_read,
net_on_close on_close) {
evutil_socket_t listener; evutil_socket_t listener;
struct sockaddr_in sin; struct sockaddr_in sin;
struct event *listener_event; struct event *listener_event;
line_buffer = malloc(sizeof(char)*NET_MAX_LINE);
evthread_use_pthreads(); evthread_use_pthreads();
//setvbuf(stdout, NULL, _IONBF, 0); //setvbuf(stdout, NULL, _IONBF, 0);
@ -165,18 +202,21 @@ void net_start(int port, net_on_connect on_connect, net_on_read on_read, net_on_
err(1, "listen failed"); err(1, "listen failed");
} }
listener_event = event_new(base, listener, EV_READ | EV_PERSIST, on_accept, (void*) base); listener_event = event_new(base, listener, EV_READ | EV_PERSIST, on_accept,
(void*) base);
event_add(listener_event, NULL); event_add(listener_event, NULL);
event_base_dispatch(base); event_base_dispatch(base);
} }
void net_stop() { void net_stop() {
event_base_loopbreak(base); event_base_loopbreak(base);
if(line_buffer) {
free(line_buffer);
line_buffer = NULL;
}
} }
void net_send(NetClient *client, const char * msg) { void net_send(NetClient *client, const char * msg) {
struct evbuffer *output = bufferevent_get_output(client->buf_ev); struct evbuffer *output = bufferevent_get_output(client->buf_ev);
evbuffer_add(output, msg, strlen(msg)); evbuffer_add(output, msg, strlen(msg));
@ -184,7 +224,7 @@ void net_send(NetClient *client, const char * msg) {
} }
void net_close(NetClient *client) { void net_close(NetClient *client) {
if(client->state == NET_CSTATE_OPEN) { if (client->state == NET_CSTATE_OPEN) {
client->state = NET_CSTATE_CLOSING; client->state = NET_CSTATE_CLOSING;
bufferevent_disable(client->buf_ev, EV_READ); bufferevent_disable(client->buf_ev, EV_READ);
} }