diff options
author | Julian Bouzas <julian.bouzas@collabora.com> | 2020-07-07 13:06:11 -0400 |
---|---|---|
committer | Julian Bouzas <julian.bouzas@collabora.com> | 2020-07-13 12:05:01 -0400 |
commit | ba96eecba00a632c325deb315a3d2840b0fd6528 (patch) | |
tree | ae1af7209d1ffd92d5a1a2b05231d5b37d0cc72e /spa | |
parent | f743fff694d1ec42e693c41a7b2c6a6095dcf92d (diff) |
sco-source: clean up and support different buffer sizes
Diffstat (limited to 'spa')
-rw-r--r-- | spa/plugins/bluez5/sco-source.c | 231 |
1 files changed, 99 insertions, 132 deletions
diff --git a/spa/plugins/bluez5/sco-source.c b/spa/plugins/bluez5/sco-source.c index cb85a7f7..5d95c9db 100644 --- a/spa/plugins/bluez5/sco-source.c +++ b/spa/plugins/bluez5/sco-source.c @@ -55,7 +55,6 @@ struct props { uint32_t max_latency; }; -#define FILL_FRAMES 2 #define MAX_BUFFERS 32 struct buffer { @@ -83,7 +82,8 @@ struct port { struct spa_list free; struct spa_list ready; - size_t ready_offset; + struct buffer *current_buffer; + uint32_t ready_offset; }; struct impl { @@ -109,12 +109,11 @@ struct impl { struct port port; unsigned int started:1; - unsigned int following:1; struct spa_source source; struct spa_io_clock *clock; - struct spa_io_position *position; + struct spa_io_position *position; struct timespec now; uint32_t sample_count; @@ -123,10 +122,10 @@ struct impl { #define NAME "sco-source" -#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0) +#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0) -static const uint32_t default_min_latency = 64; -static const uint32_t default_max_latency = 256; +static const uint32_t default_min_latency = 128; +static const uint32_t default_max_latency = 512; static void reset_props(struct props *props) { @@ -148,10 +147,10 @@ static int impl_node_enum_params(void *object, int seq, spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(num != 0, -EINVAL); - result.id = id; + result.id = id; result.next = start; next: - result.index = result.next++; + result.index = result.next++; spa_pod_builder_init(&b, buffer, sizeof(buffer)); @@ -211,25 +210,9 @@ static int impl_node_enum_params(void *object, int seq, return 0; } -static int do_reassign_follower(struct spa_loop *loop, - bool async, - uint32_t seq, - const void *data, - size_t size, - void *user_data) -{ - return 0; -} - -static inline bool is_following(struct impl *this) -{ - return this->position && this->clock && this->position->clock.id != this->clock->id; -} - static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) { struct impl *this = object; - bool following; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -244,12 +227,6 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) return -ENOENT; } - following = is_following(this); - if (this->started && following != this->following) { - spa_log_debug(this->log, "sco-source %p: reassign follower %d->%d", this, this->following, following); - this->following = following; - spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this); - } return 0; } @@ -296,36 +273,26 @@ static void reset_buffers(struct port *port) } } -static bool read_data(struct impl *this, uint8_t *data, uint32_t size, uint32_t *total_read) +static int read_data(struct impl *this, uint8_t *data, uint32_t data_size) { - const uint32_t mtu_size = this->read_mtu; - uint32_t local_total_read = 0; - - /* Read chunks of mtu_size */ - while (local_total_read <= (size - mtu_size)) { - const int bytes_read = read(this->sock_fd, data, mtu_size); - if (bytes_read < 0) { - /* Retry */ - if (errno == EINTR) - continue; - - /* Socked has no data */ - if (errno == EAGAIN || errno == EWOULDBLOCK) - goto done; - - /* Error */ - spa_log_error(this->log, "read error: %s", strerror(errno)); - return false; - } + int res = 0; + +again: + res = read(this->sock_fd, data, data_size); + if (res <= 0) { + /* retry if interrupted */ + if (errno == EINTR) + goto again; + + /* return socked has no data */ + if (errno == EAGAIN || errno == EWOULDBLOCK) + return res; - data += bytes_read; - local_total_read += bytes_read; + /* error */ + return -errno; } -done: - if (total_read) - *total_read = local_total_read; - return true; + return res; } static void recycle_buffer(struct impl *this, struct port *port, uint32_t buffer_id) @@ -344,70 +311,81 @@ static void sco_on_ready_read(struct spa_source *source) struct impl *this = source->data; struct port *port = &this->port; struct spa_io_buffers *io = port->io; - int32_t io_done_status = io->status; - struct buffer *buffer; - struct spa_data *buffer_data; - uint32_t total_read; + int size_read; + struct spa_data *datas; - spa_return_if_fail(io != NULL); - - /* Read a buffer if there is one free */ - if (!spa_list_is_empty(&port->free)) { - /* Get the free buffer and remove it from the free list */ - buffer = spa_list_first(&port->free, struct buffer, link); - spa_list_remove(&buffer->link); - - buffer_data = &buffer->buf->datas[0]; - spa_assert(buffer_data->data); + /* make sure the source has input data */ + if ((source->rmask & SPA_IO_IN) == 0) { + spa_log_error(this->log, "source has no input data, rmask=%d", source->rmask); + goto stop; + } + if (this->transport == NULL) { + spa_log_debug(this->log, "no transport, stop reading"); + goto stop; + } - /* Read sco data */ - if (!read_data(this, buffer_data->data, buffer_data->maxsize, &total_read)) { - if (this->source.loop) - spa_loop_remove_source(this->data_loop, &this->source); + /* get buffer */ + if (!port->current_buffer) { + if (spa_list_is_empty(&port->free)) { + spa_log_warn(this->log, "buffer not available"); return; } + port->current_buffer = spa_list_first(&port->free, struct buffer, link); + spa_list_remove(&port->current_buffer->link); + port->ready_offset = 0; + } + datas = port->current_buffer->buf->datas; - /* Append a ready buffer if data could be read */ - if (total_read > 0) { - /* Update the buffer offset, size and stride */ - buffer_data->chunk->offset = 0; - buffer_data->chunk->size = total_read; - buffer_data->chunk->stride = port->frame_size; - - /* Update the sample count */ - this->sample_count += buffer_data->chunk->size / port->frame_size; - - /* Add the buffer to the ready list */ - buffer->outstanding = true; - spa_list_append(&port->ready, &buffer->link); - } + /* read */ + size_read = read_data(this, (uint8_t *)datas[0].data + port->ready_offset, datas[0].maxsize); + if (size_read < 0) { + spa_log_error(this->log, "failed to read data"); + goto stop; + } + spa_log_debug(this->log, "read socket data %d", size_read); + + /* send buffer if full */ + port->ready_offset += size_read; + if ((this->read_mtu + port->ready_offset) > (this->props.max_latency * port->frame_size)) { + datas[0].chunk->offset = 0; + datas[0].chunk->size = port->ready_offset; + datas[0].chunk->stride = port->frame_size; + + this->sample_count += datas[0].chunk->size / port->frame_size; + spa_list_append(&port->ready, &port->current_buffer->link); + port->current_buffer = NULL; } - /* Process a buffer if there is one ready and IO does not have one */ - if (!spa_list_is_empty(&port->ready) && io->status != SPA_STATUS_HAVE_DATA) { - /* Get the ready buffer and remove it from the ready list */ - buffer = spa_list_first(&port->ready, struct buffer, link); - spa_list_remove(&buffer->link); + /* done if there are no buffers ready */ + if (spa_list_is_empty(&port->ready)) + return; - /* Mark the buffer to be processed */ - io->buffer_id = buffer->id; - io->status = SPA_STATUS_HAVE_DATA; + /* process the buffer if IO does not have any */ + if (io->status != SPA_STATUS_HAVE_DATA) { + struct buffer *b; + + if (io->buffer_id < port->n_buffers) + recycle_buffer(this, port, io->buffer_id); - /* Add the buffer to the free list */ - spa_list_append(&port->free, &buffer->link); - buffer->outstanding = false; + b = spa_list_first(&port->ready, struct buffer, link); + spa_list_remove(&b->link); + b->outstanding = true; - /* Set the done status as have buffer */ - io_done_status = SPA_STATUS_HAVE_DATA; + io->buffer_id = b->id; + io->status = SPA_STATUS_HAVE_DATA; } - /* Notify the current status */ - spa_node_call_ready(&this->callbacks, io_done_status); + /* notify ready */ + spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); + return; + +stop: + if (this->source.loop) + spa_loop_remove_source(this->data_loop, &this->source); } static int do_start(struct impl *this) { - int val; bool do_accept; /* Dont do anything if the node has already started */ @@ -425,22 +403,6 @@ static int do_start(struct impl *this) if (this->sock_fd < 0) return -1; - /* Set the write MTU */ - val = FILL_FRAMES * this->transport->write_mtu; - if (setsockopt(this->sock_fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)) < 0) - spa_log_warn(this->log, "sco-source %p: SO_SNDBUF %m", this); - - /* Set the read MTU */ - this->read_mtu = this->transport->read_mtu; - val = FILL_FRAMES * this->read_mtu; - if (setsockopt(this->sock_fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)) < 0) - spa_log_warn(this->log, "sco-source %p: SO_RCVBUF %m", this); - - /* Set the priority */ - val = 6; - if (setsockopt(this->sock_fd, SOL_SOCKET, SO_PRIORITY, &val, sizeof(val)) < 0) - spa_log_warn(this->log, "SO_PRIORITY failed: %m"); - /* Reset the buffers and sample count */ reset_buffers(&this->port); this->sample_count = 0; @@ -641,7 +603,7 @@ impl_node_port_enum_params(void *object, int seq, result.id = id; result.next = start; next: - result.index = result.next++; + result.index = result.next++; spa_pod_builder_init(&b, buffer, sizeof(buffer)); @@ -686,10 +648,12 @@ impl_node_port_enum_params(void *object, int seq, param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_ParamBuffers, id, - /* 8 buffers are enough to make sure we always have one available when decoding */ - SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(8, 8, MAX_BUFFERS), + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS), SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), - SPA_PARAM_BUFFERS_size, SPA_POD_Int(this->props.max_latency * port->frame_size), + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( + this->props.max_latency * port->frame_size, + this->props.min_latency * port->frame_size, + INT32_MAX), SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size), SPA_PARAM_BUFFERS_align, SPA_POD_Int(16)); break; @@ -932,22 +896,25 @@ static int impl_node_process(void *object) if (io->status == SPA_STATUS_HAVE_DATA) return SPA_STATUS_HAVE_DATA; - /* Return if there is not buffers ready to be processed */ + /* Recycle */ + if (io->buffer_id < port->n_buffers) { + recycle_buffer(this, port, io->buffer_id); + io->buffer_id = SPA_ID_INVALID; + } + + /* Return if there are no buffers ready to be processed */ if (spa_list_is_empty(&port->ready)) - return io->status; + return SPA_STATUS_OK; /* Get the new buffer from the ready list */ buffer = spa_list_first(&port->ready, struct buffer, link); spa_list_remove(&buffer->link); + buffer->outstanding = false; /* Set the new buffer in IO */ io->buffer_id = buffer->id; io->status = SPA_STATUS_HAVE_DATA; - /* Add the buffer to the free list */ - spa_list_append(&port->free, &buffer->link); - buffer->outstanding = false; - /* Notify we have a buffer ready to be processed */ return SPA_STATUS_HAVE_DATA; } @@ -980,7 +947,7 @@ static void transport_destroy(void *data) static const struct spa_bt_transport_events transport_events = { SPA_VERSION_BT_TRANSPORT_EVENTS, - .destroy = transport_destroy, + .destroy = transport_destroy, }; static int impl_get_interface(struct spa_handle *handle, const char *type, void **interface) |