diff options
author | Wim Taymans <wtaymans@redhat.com> | 2016-10-18 11:11:38 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2016-10-18 11:11:38 +0200 |
commit | 7ee66cfc35e2f0208d6ce484e9e4ce6a344a3095 (patch) | |
tree | 74dc0feee19a8248119f60ee4cbeb5d2e00f8f87 | |
parent | d711e15f0ad631371d88e2d6376277fa682b7df3 (diff) |
dynamically resize connection buffers
-rw-r--r-- | pinos/client/connection.c | 191 | ||||
-rw-r--r-- | pinos/client/stream.c | 17 | ||||
-rw-r--r-- | pinos/server/client-node.c | 2 |
3 files changed, 107 insertions, 103 deletions
diff --git a/pinos/client/connection.c b/pinos/client/connection.c index f49d0d09..d6f434fb 100644 --- a/pinos/client/connection.c +++ b/pinos/client/connection.c @@ -28,25 +28,27 @@ #include "connection.h" #include "serialize.h" -#define MAX_BUFFER_SIZE 4096 +#define MAX_BUFFER_SIZE 1024 #define MAX_FDS 28 typedef struct { - uint8_t buffer_data[MAX_BUFFER_SIZE]; - size_t buffer_size; - int fds[MAX_FDS]; - unsigned int n_fds; + uint8_t *buffer_data; + size_t buffer_size; + size_t buffer_maxsize; + int fds[MAX_FDS]; + unsigned int n_fds; SpaControlCmd cmd; off_t offset; void *data; size_t size; + + bool update; } ConnectionBuffer; struct _SpaConnection { ConnectionBuffer in, out; int fd; - bool update; }; #if 0 @@ -58,26 +60,22 @@ struct _SpaConnection { static bool read_length (uint8_t * data, unsigned int size, size_t * length, size_t * skip) { - size_t len, offset; uint8_t b; /* start reading the length, we need this to skip to the data later */ - len = offset = 0; + *length = *skip = 0; do { - if (offset >= size) + if (*skip >= size) return false; - b = data[offset++]; - len = (len << 7) | (b & 0x7f); + b = data[(*skip)++]; + *length = (*length << 7) | (b & 0x7f); } while (b & 0x80); /* check remaining command size */ - if (size - offset < len) + if (size - *skip < *length) return false; - *length = len; - *skip = offset; - return true; } @@ -160,12 +158,14 @@ connection_parse_node_command (SpaConnection *conn, SpaControlCmdNodeCommand *cm #define MAX(a,b) ((a) > (b) ? (a) : (b)) static void * -connection_ensure_size (SpaConnection *conn, size_t size) +connection_ensure_size (SpaConnection *conn, ConnectionBuffer *buf, size_t size) { - if (conn->out.buffer_size + size > MAX_BUFFER_SIZE) { - fprintf (stderr, "error connection %p: overflow\n", conn); + if (buf->buffer_size + size > buf->buffer_maxsize) { + buf->buffer_maxsize = buf->buffer_size + MAX_BUFFER_SIZE * ((size + MAX_BUFFER_SIZE-1) / MAX_BUFFER_SIZE); + buf->buffer_data = realloc (buf->buffer_data, buf->buffer_maxsize); + fprintf (stderr, "connection %p: resize buffer to %zd\n", conn, buf->buffer_maxsize); } - return (uint8_t *) conn->out.buffer_data + conn->out.buffer_size; + return (uint8_t *) buf->buffer_data + buf->buffer_size; } static void * @@ -173,17 +173,18 @@ connection_add_cmd (SpaConnection *conn, SpaControlCmd cmd, size_t size) { uint8_t *p; unsigned int plen; + ConnectionBuffer *buf = &conn->out; plen = 1; while (size >> (7 * plen)) plen++; /* 1 for cmd, plen for size and size for payload */ - p = connection_ensure_size (conn, 1 + plen + size); + p = connection_ensure_size (conn, buf, 1 + plen + size); - conn->out.cmd = cmd; - conn->out.offset = conn->out.buffer_size; - conn->out.buffer_size += 1 + plen + size; + buf->cmd = cmd; + buf->offset = buf->buffer_size; + buf->buffer_size += 1 + plen + size; *p++ = cmd; /* write length */ @@ -382,7 +383,7 @@ connection_add_node_command (SpaConnection *conn, SpaControlCmdNodeCommand *cm) } static SpaResult -refill_buffer (SpaConnection *conn) +refill_buffer (SpaConnection *conn, ConnectionBuffer *buf) { ssize_t len; struct cmsghdr *cmsg; @@ -390,13 +391,8 @@ refill_buffer (SpaConnection *conn) struct iovec iov[1]; char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))]; - conn->in.cmd = SPA_CONTROL_CMD_INVALID; - conn->in.offset = 0; - conn->in.size = 0; - conn->in.buffer_size = 0; - - iov[0].iov_base = conn->in.buffer_data; - iov[0].iov_len = MAX_BUFFER_SIZE; + iov[0].iov_base = buf->buffer_data + buf->buffer_size; + iov[0].iov_len = buf->buffer_maxsize - buf->buffer_size; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_control = cmsgbuf; @@ -417,17 +413,17 @@ refill_buffer (SpaConnection *conn) if (len < 4) return SPA_RESULT_ERROR; - conn->in.buffer_size = len; + buf->buffer_size += len; /* handle control messages */ for (cmsg = CMSG_FIRSTHDR (&msg); cmsg != NULL; cmsg = CMSG_NXTHDR (&msg, cmsg)) { if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) continue; - conn->in.n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int); - memcpy (conn->in.fds, CMSG_DATA (cmsg), conn->in.n_fds * sizeof (int)); + buf->n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int); + memcpy (buf->fds, CMSG_DATA (cmsg), buf->n_fds * sizeof (int)); } - SPA_DEBUG_CONTROL ("connection %p: %d read %zd bytes and %d fds\n", conn, conn->fd, len, conn->in.n_fds); + SPA_DEBUG_CONTROL ("connection %p: %d read %zd bytes and %d fds\n", conn, conn->fd, len, buf->n_fds); return SPA_RESULT_OK; @@ -439,6 +435,24 @@ recv_error: } } +static void +clear_buffer (ConnectionBuffer *buf) +{ + unsigned int i; + + for (i = 0; i < buf->n_fds; i++) { + if (buf->fds[i] > 0) { + if (close (buf->fds[i]) < 0) + perror ("close"); + } + } + buf->n_fds = 0; + buf->cmd = SPA_CONTROL_CMD_INVALID; + buf->offset = 0; + buf->size = 0; + buf->buffer_size = 0; +} + SpaConnection * spa_connection_new (int fd) { @@ -446,10 +460,23 @@ spa_connection_new (int fd) c = calloc (1, sizeof (SpaConnection)); c->fd = fd; + c->out.buffer_data = malloc (MAX_BUFFER_SIZE); + c->out.buffer_maxsize = MAX_BUFFER_SIZE; + c->in.buffer_data = malloc (MAX_BUFFER_SIZE); + c->in.buffer_maxsize = MAX_BUFFER_SIZE; + c->in.update = true; return c; } +void +spa_connection_free (SpaConnection *conn) +{ + free (conn->out.buffer_data); + free (conn->in.buffer_data); + free (conn); +} + /** * spa_connection_has_next: * @iter: a #SpaConnection @@ -463,44 +490,47 @@ spa_connection_has_next (SpaConnection *conn) { size_t len, size, skip; uint8_t *data; + ConnectionBuffer *buf; if (conn == NULL) return SPA_RESULT_INVALID_ARGUMENTS; + buf = &conn->in; + /* move to next packet */ - conn->in.offset += conn->in.size; + buf->offset += buf->size; - if (conn->update) { - refill_buffer (conn); - conn->update = false; +again: + if (buf->update) { + refill_buffer (conn, buf); + buf->update = false; } /* now read packet */ - data = conn->in.buffer_data; - size = conn->in.buffer_size; + data = buf->buffer_data; + size = buf->buffer_size; - if (conn->in.offset >= size) { - conn->update = true; + if (buf->offset >= size) { + clear_buffer (buf); + buf->update = true; return SPA_RESULT_ERROR; } - data += conn->in.offset; - size -= conn->in.offset; - - if (size < 1) - return SPA_RESULT_ERROR; - - conn->in.cmd = *data; + data += buf->offset; + size -= buf->offset; + buf->cmd = *data; data++; size--; - if (!read_length (data, size, &len, &skip)) - return SPA_RESULT_ERROR; - - conn->in.size = len; - conn->in.data = data + skip; - conn->in.offset += 1 + skip; + if (!read_length (data, size, &len, &skip)) { + connection_ensure_size (conn, buf, len + skip); + buf->update = true; + goto again; + } + buf->size = len; + buf->data = data + skip; + buf->offset += 1 + skip; return SPA_RESULT_OK; } @@ -619,10 +649,7 @@ spa_connection_get_fd (SpaConnection *conn, { int fd; - if (conn == NULL) - return -1; - - if (conn->in.n_fds < index) + if (conn == NULL || conn->in.n_fds < index) return -1; fd = conn->in.fds[index]; @@ -766,21 +793,24 @@ spa_connection_flush (SpaConnection *conn) struct cmsghdr *cmsg; char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))]; int *cm, i, fds_len; + ConnectionBuffer *buf; if (conn == NULL) return SPA_RESULT_INVALID_ARGUMENTS; - if (conn->out.buffer_size == 0) + buf = &conn->out; + + if (buf->buffer_size == 0) return SPA_RESULT_OK; - fds_len = conn->out.n_fds * sizeof (int); + fds_len = buf->n_fds * sizeof (int); - iov[0].iov_base = conn->out.buffer_data; - iov[0].iov_len = conn->out.buffer_size; + iov[0].iov_base = buf->buffer_data; + iov[0].iov_len = buf->buffer_size; msg.msg_iov = iov; msg.msg_iovlen = 1; - if (conn->out.n_fds > 0) { + if (buf->n_fds > 0) { msg.msg_control = cmsgbuf; msg.msg_controllen = CMSG_SPACE (fds_len); cmsg = CMSG_FIRSTHDR(&msg); @@ -788,8 +818,8 @@ spa_connection_flush (SpaConnection *conn) cmsg->cmsg_type = SCM_RIGHTS; cmsg->cmsg_len = CMSG_LEN (fds_len); cm = (int*)CMSG_DATA (cmsg); - for (i = 0; i < conn->out.n_fds; i++) - cm[i] = conn->out.fds[i] > 0 ? conn->out.fds[i] : -conn->out.fds[i]; + for (i = 0; i < buf->n_fds; i++) + cm[i] = buf->fds[i] > 0 ? buf->fds[i] : -buf->fds[i]; msg.msg_controllen = cmsg->cmsg_len; } else { msg.msg_control = NULL; @@ -806,10 +836,10 @@ spa_connection_flush (SpaConnection *conn) } break; } - conn->out.buffer_size -= len; - conn->out.n_fds = 0; + buf->buffer_size -= len; + buf->n_fds = 0; - SPA_DEBUG_CONTROL ("connection %p: %d written %zd bytes and %u fds\n", conn, conn->fd, len, conn->out.n_fds); + SPA_DEBUG_CONTROL ("connection %p: %d written %zd bytes and %u fds\n", conn, conn->fd, len, buf->n_fds); return SPA_RESULT_OK; @@ -824,27 +854,12 @@ send_error: SpaResult spa_connection_clear (SpaConnection *conn) { - unsigned int i; - if (conn == NULL) return SPA_RESULT_INVALID_ARGUMENTS; - for (i = 0; i < conn->out.n_fds; i++) { - if (conn->out.fds[i] > 0) { - if (close (conn->out.fds[i]) < 0) - perror ("close"); - } - } - conn->out.n_fds = 0; - conn->out.buffer_size = 0; - for (i = 0; i < conn->in.n_fds; i++) { - if (conn->in.fds[i] > 0) { - if (close (conn->in.fds[i]) < 0) - perror ("close"); - } - } - conn->in.n_fds = 0; - conn->in.buffer_size = 0; + clear_buffer (&conn->out); + clear_buffer (&conn->in); + conn->in.update = true; return SPA_RESULT_OK; } diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 778ff620..a2848032 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -662,7 +662,7 @@ add_port_update (PinosStream *stream, uint32_t change_mask) } static void -add_need_input (PinosStream *stream, uint32_t port_id) +send_need_input (PinosStream *stream, uint32_t port_id) { PinosStreamPrivate *priv = stream->priv; SpaControlCmdNodeEvent cne; @@ -675,14 +675,6 @@ add_need_input (PinosStream *stream, uint32_t port_id) ne.size = sizeof (ni); ni.port_id = port_id; spa_connection_add_cmd (priv->rtconn, SPA_CONTROL_CMD_NODE_EVENT, &cne); -} - -static void -send_need_input (PinosStream *stream, uint32_t port_id) -{ - PinosStreamPrivate *priv = stream->priv; - - add_need_input (stream, port_id); if (spa_connection_flush (priv->rtconn) < 0) g_warning ("stream %p: error writing connection", stream); @@ -917,11 +909,8 @@ handle_node_command (PinosStream *stream, if (spa_connection_flush (priv->conn) < 0) g_warning ("stream %p: error writing connection", stream); - if (priv->direction == SPA_DIRECTION_INPUT) { - add_need_input (stream, priv->port_id); - if (spa_connection_flush (priv->rtconn) < 0) - g_warning ("stream %p: error writing connection", stream); - } + if (priv->direction == SPA_DIRECTION_INPUT) + send_need_input (stream, priv->port_id); stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); break; diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index a5405cb5..2c966a9b 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -1048,7 +1048,7 @@ parse_connection (SpaProxy *this) case SPA_CONTROL_CMD_SET_PROPERTY: case SPA_CONTROL_CMD_NODE_COMMAND: case SPA_CONTROL_CMD_PROCESS_BUFFER: - spa_log_error (this->log, "proxy %p: got unexpected connection %d\n", this, cmd); + spa_log_error (this->log, "proxy %p: got unexpected command %d\n", this, cmd); break; case SPA_CONTROL_CMD_NODE_UPDATE: |