summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2016-10-18 11:11:38 +0200
committerWim Taymans <wtaymans@redhat.com>2016-10-18 11:11:38 +0200
commit7ee66cfc35e2f0208d6ce484e9e4ce6a344a3095 (patch)
tree74dc0feee19a8248119f60ee4cbeb5d2e00f8f87
parentd711e15f0ad631371d88e2d6376277fa682b7df3 (diff)
dynamically resize connection buffers
-rw-r--r--pinos/client/connection.c191
-rw-r--r--pinos/client/stream.c17
-rw-r--r--pinos/server/client-node.c2
3 files changed, 107 insertions, 103 deletions
diff --git a/pinos/client/connection.c b/pinos/client/connection.c
index f49d0d09..d6f434fb 100644
--- a/pinos/client/connection.c
+++ b/pinos/client/connection.c
@@ -28,25 +28,27 @@
#include "connection.h"
#include "serialize.h"
-#define MAX_BUFFER_SIZE 4096
+#define MAX_BUFFER_SIZE 1024
#define MAX_FDS 28
typedef struct {
- uint8_t buffer_data[MAX_BUFFER_SIZE];
- size_t buffer_size;
- int fds[MAX_FDS];
- unsigned int n_fds;
+ uint8_t *buffer_data;
+ size_t buffer_size;
+ size_t buffer_maxsize;
+ int fds[MAX_FDS];
+ unsigned int n_fds;
SpaControlCmd cmd;
off_t offset;
void *data;
size_t size;
+
+ bool update;
} ConnectionBuffer;
struct _SpaConnection {
ConnectionBuffer in, out;
int fd;
- bool update;
};
#if 0
@@ -58,26 +60,22 @@ struct _SpaConnection {
static bool
read_length (uint8_t * data, unsigned int size, size_t * length, size_t * skip)
{
- size_t len, offset;
uint8_t b;
/* start reading the length, we need this to skip to the data later */
- len = offset = 0;
+ *length = *skip = 0;
do {
- if (offset >= size)
+ if (*skip >= size)
return false;
- b = data[offset++];
- len = (len << 7) | (b & 0x7f);
+ b = data[(*skip)++];
+ *length = (*length << 7) | (b & 0x7f);
} while (b & 0x80);
/* check remaining command size */
- if (size - offset < len)
+ if (size - *skip < *length)
return false;
- *length = len;
- *skip = offset;
-
return true;
}
@@ -160,12 +158,14 @@ connection_parse_node_command (SpaConnection *conn, SpaControlCmdNodeCommand *cm
#define MAX(a,b) ((a) > (b) ? (a) : (b))
static void *
-connection_ensure_size (SpaConnection *conn, size_t size)
+connection_ensure_size (SpaConnection *conn, ConnectionBuffer *buf, size_t size)
{
- if (conn->out.buffer_size + size > MAX_BUFFER_SIZE) {
- fprintf (stderr, "error connection %p: overflow\n", conn);
+ if (buf->buffer_size + size > buf->buffer_maxsize) {
+ buf->buffer_maxsize = buf->buffer_size + MAX_BUFFER_SIZE * ((size + MAX_BUFFER_SIZE-1) / MAX_BUFFER_SIZE);
+ buf->buffer_data = realloc (buf->buffer_data, buf->buffer_maxsize);
+ fprintf (stderr, "connection %p: resize buffer to %zd\n", conn, buf->buffer_maxsize);
}
- return (uint8_t *) conn->out.buffer_data + conn->out.buffer_size;
+ return (uint8_t *) buf->buffer_data + buf->buffer_size;
}
static void *
@@ -173,17 +173,18 @@ connection_add_cmd (SpaConnection *conn, SpaControlCmd cmd, size_t size)
{
uint8_t *p;
unsigned int plen;
+ ConnectionBuffer *buf = &conn->out;
plen = 1;
while (size >> (7 * plen))
plen++;
/* 1 for cmd, plen for size and size for payload */
- p = connection_ensure_size (conn, 1 + plen + size);
+ p = connection_ensure_size (conn, buf, 1 + plen + size);
- conn->out.cmd = cmd;
- conn->out.offset = conn->out.buffer_size;
- conn->out.buffer_size += 1 + plen + size;
+ buf->cmd = cmd;
+ buf->offset = buf->buffer_size;
+ buf->buffer_size += 1 + plen + size;
*p++ = cmd;
/* write length */
@@ -382,7 +383,7 @@ connection_add_node_command (SpaConnection *conn, SpaControlCmdNodeCommand *cm)
}
static SpaResult
-refill_buffer (SpaConnection *conn)
+refill_buffer (SpaConnection *conn, ConnectionBuffer *buf)
{
ssize_t len;
struct cmsghdr *cmsg;
@@ -390,13 +391,8 @@ refill_buffer (SpaConnection *conn)
struct iovec iov[1];
char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))];
- conn->in.cmd = SPA_CONTROL_CMD_INVALID;
- conn->in.offset = 0;
- conn->in.size = 0;
- conn->in.buffer_size = 0;
-
- iov[0].iov_base = conn->in.buffer_data;
- iov[0].iov_len = MAX_BUFFER_SIZE;
+ iov[0].iov_base = buf->buffer_data + buf->buffer_size;
+ iov[0].iov_len = buf->buffer_maxsize - buf->buffer_size;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = cmsgbuf;
@@ -417,17 +413,17 @@ refill_buffer (SpaConnection *conn)
if (len < 4)
return SPA_RESULT_ERROR;
- conn->in.buffer_size = len;
+ buf->buffer_size += len;
/* handle control messages */
for (cmsg = CMSG_FIRSTHDR (&msg); cmsg != NULL; cmsg = CMSG_NXTHDR (&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS)
continue;
- conn->in.n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int);
- memcpy (conn->in.fds, CMSG_DATA (cmsg), conn->in.n_fds * sizeof (int));
+ buf->n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int);
+ memcpy (buf->fds, CMSG_DATA (cmsg), buf->n_fds * sizeof (int));
}
- SPA_DEBUG_CONTROL ("connection %p: %d read %zd bytes and %d fds\n", conn, conn->fd, len, conn->in.n_fds);
+ SPA_DEBUG_CONTROL ("connection %p: %d read %zd bytes and %d fds\n", conn, conn->fd, len, buf->n_fds);
return SPA_RESULT_OK;
@@ -439,6 +435,24 @@ recv_error:
}
}
+static void
+clear_buffer (ConnectionBuffer *buf)
+{
+ unsigned int i;
+
+ for (i = 0; i < buf->n_fds; i++) {
+ if (buf->fds[i] > 0) {
+ if (close (buf->fds[i]) < 0)
+ perror ("close");
+ }
+ }
+ buf->n_fds = 0;
+ buf->cmd = SPA_CONTROL_CMD_INVALID;
+ buf->offset = 0;
+ buf->size = 0;
+ buf->buffer_size = 0;
+}
+
SpaConnection *
spa_connection_new (int fd)
{
@@ -446,10 +460,23 @@ spa_connection_new (int fd)
c = calloc (1, sizeof (SpaConnection));
c->fd = fd;
+ c->out.buffer_data = malloc (MAX_BUFFER_SIZE);
+ c->out.buffer_maxsize = MAX_BUFFER_SIZE;
+ c->in.buffer_data = malloc (MAX_BUFFER_SIZE);
+ c->in.buffer_maxsize = MAX_BUFFER_SIZE;
+ c->in.update = true;
return c;
}
+void
+spa_connection_free (SpaConnection *conn)
+{
+ free (conn->out.buffer_data);
+ free (conn->in.buffer_data);
+ free (conn);
+}
+
/**
* spa_connection_has_next:
* @iter: a #SpaConnection
@@ -463,44 +490,47 @@ spa_connection_has_next (SpaConnection *conn)
{
size_t len, size, skip;
uint8_t *data;
+ ConnectionBuffer *buf;
if (conn == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
+ buf = &conn->in;
+
/* move to next packet */
- conn->in.offset += conn->in.size;
+ buf->offset += buf->size;
- if (conn->update) {
- refill_buffer (conn);
- conn->update = false;
+again:
+ if (buf->update) {
+ refill_buffer (conn, buf);
+ buf->update = false;
}
/* now read packet */
- data = conn->in.buffer_data;
- size = conn->in.buffer_size;
+ data = buf->buffer_data;
+ size = buf->buffer_size;
- if (conn->in.offset >= size) {
- conn->update = true;
+ if (buf->offset >= size) {
+ clear_buffer (buf);
+ buf->update = true;
return SPA_RESULT_ERROR;
}
- data += conn->in.offset;
- size -= conn->in.offset;
-
- if (size < 1)
- return SPA_RESULT_ERROR;
-
- conn->in.cmd = *data;
+ data += buf->offset;
+ size -= buf->offset;
+ buf->cmd = *data;
data++;
size--;
- if (!read_length (data, size, &len, &skip))
- return SPA_RESULT_ERROR;
-
- conn->in.size = len;
- conn->in.data = data + skip;
- conn->in.offset += 1 + skip;
+ if (!read_length (data, size, &len, &skip)) {
+ connection_ensure_size (conn, buf, len + skip);
+ buf->update = true;
+ goto again;
+ }
+ buf->size = len;
+ buf->data = data + skip;
+ buf->offset += 1 + skip;
return SPA_RESULT_OK;
}
@@ -619,10 +649,7 @@ spa_connection_get_fd (SpaConnection *conn,
{
int fd;
- if (conn == NULL)
- return -1;
-
- if (conn->in.n_fds < index)
+ if (conn == NULL || conn->in.n_fds < index)
return -1;
fd = conn->in.fds[index];
@@ -766,21 +793,24 @@ spa_connection_flush (SpaConnection *conn)
struct cmsghdr *cmsg;
char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))];
int *cm, i, fds_len;
+ ConnectionBuffer *buf;
if (conn == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
- if (conn->out.buffer_size == 0)
+ buf = &conn->out;
+
+ if (buf->buffer_size == 0)
return SPA_RESULT_OK;
- fds_len = conn->out.n_fds * sizeof (int);
+ fds_len = buf->n_fds * sizeof (int);
- iov[0].iov_base = conn->out.buffer_data;
- iov[0].iov_len = conn->out.buffer_size;
+ iov[0].iov_base = buf->buffer_data;
+ iov[0].iov_len = buf->buffer_size;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
- if (conn->out.n_fds > 0) {
+ if (buf->n_fds > 0) {
msg.msg_control = cmsgbuf;
msg.msg_controllen = CMSG_SPACE (fds_len);
cmsg = CMSG_FIRSTHDR(&msg);
@@ -788,8 +818,8 @@ spa_connection_flush (SpaConnection *conn)
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN (fds_len);
cm = (int*)CMSG_DATA (cmsg);
- for (i = 0; i < conn->out.n_fds; i++)
- cm[i] = conn->out.fds[i] > 0 ? conn->out.fds[i] : -conn->out.fds[i];
+ for (i = 0; i < buf->n_fds; i++)
+ cm[i] = buf->fds[i] > 0 ? buf->fds[i] : -buf->fds[i];
msg.msg_controllen = cmsg->cmsg_len;
} else {
msg.msg_control = NULL;
@@ -806,10 +836,10 @@ spa_connection_flush (SpaConnection *conn)
}
break;
}
- conn->out.buffer_size -= len;
- conn->out.n_fds = 0;
+ buf->buffer_size -= len;
+ buf->n_fds = 0;
- SPA_DEBUG_CONTROL ("connection %p: %d written %zd bytes and %u fds\n", conn, conn->fd, len, conn->out.n_fds);
+ SPA_DEBUG_CONTROL ("connection %p: %d written %zd bytes and %u fds\n", conn, conn->fd, len, buf->n_fds);
return SPA_RESULT_OK;
@@ -824,27 +854,12 @@ send_error:
SpaResult
spa_connection_clear (SpaConnection *conn)
{
- unsigned int i;
-
if (conn == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
- for (i = 0; i < conn->out.n_fds; i++) {
- if (conn->out.fds[i] > 0) {
- if (close (conn->out.fds[i]) < 0)
- perror ("close");
- }
- }
- conn->out.n_fds = 0;
- conn->out.buffer_size = 0;
- for (i = 0; i < conn->in.n_fds; i++) {
- if (conn->in.fds[i] > 0) {
- if (close (conn->in.fds[i]) < 0)
- perror ("close");
- }
- }
- conn->in.n_fds = 0;
- conn->in.buffer_size = 0;
+ clear_buffer (&conn->out);
+ clear_buffer (&conn->in);
+ conn->in.update = true;
return SPA_RESULT_OK;
}
diff --git a/pinos/client/stream.c b/pinos/client/stream.c
index 778ff620..a2848032 100644
--- a/pinos/client/stream.c
+++ b/pinos/client/stream.c
@@ -662,7 +662,7 @@ add_port_update (PinosStream *stream, uint32_t change_mask)
}
static void
-add_need_input (PinosStream *stream, uint32_t port_id)
+send_need_input (PinosStream *stream, uint32_t port_id)
{
PinosStreamPrivate *priv = stream->priv;
SpaControlCmdNodeEvent cne;
@@ -675,14 +675,6 @@ add_need_input (PinosStream *stream, uint32_t port_id)
ne.size = sizeof (ni);
ni.port_id = port_id;
spa_connection_add_cmd (priv->rtconn, SPA_CONTROL_CMD_NODE_EVENT, &cne);
-}
-
-static void
-send_need_input (PinosStream *stream, uint32_t port_id)
-{
- PinosStreamPrivate *priv = stream->priv;
-
- add_need_input (stream, port_id);
if (spa_connection_flush (priv->rtconn) < 0)
g_warning ("stream %p: error writing connection", stream);
@@ -917,11 +909,8 @@ handle_node_command (PinosStream *stream,
if (spa_connection_flush (priv->conn) < 0)
g_warning ("stream %p: error writing connection", stream);
- if (priv->direction == SPA_DIRECTION_INPUT) {
- add_need_input (stream, priv->port_id);
- if (spa_connection_flush (priv->rtconn) < 0)
- g_warning ("stream %p: error writing connection", stream);
- }
+ if (priv->direction == SPA_DIRECTION_INPUT)
+ send_need_input (stream, priv->port_id);
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
break;
diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c
index a5405cb5..2c966a9b 100644
--- a/pinos/server/client-node.c
+++ b/pinos/server/client-node.c
@@ -1048,7 +1048,7 @@ parse_connection (SpaProxy *this)
case SPA_CONTROL_CMD_SET_PROPERTY:
case SPA_CONTROL_CMD_NODE_COMMAND:
case SPA_CONTROL_CMD_PROCESS_BUFFER:
- spa_log_error (this->log, "proxy %p: got unexpected connection %d\n", this, cmd);
+ spa_log_error (this->log, "proxy %p: got unexpected command %d\n", this, cmd);
break;
case SPA_CONTROL_CMD_NODE_UPDATE: