diff options
author | Petr Kulhavy <brain@jikos.cz> | 2018-03-06 02:14:34 +0100 |
---|---|---|
committer | Nicolas Dufresne <nicolas.dufresne@collabora.com> | 2018-03-21 17:47:27 -0400 |
commit | 738eb0d8edd72eb5e76a5c1cbd9f2672c314840c (patch) | |
tree | 2982898d448959c803174be0e5ffbf6163bd80cd | |
parent | 589019d8f5072d4470ea2ed76cfffa75f45e9426 (diff) |
udpsrc: switch to using a buffer pool
This exposes a new property, mtu, which is used to determine the
initial size of buffers from the buffer pool. If received data
exceeds this, the element gracefully handles that in a manner similar
to what we had previously: a large memory gets filled and reallocated
at the next call to "fill".
The default size is set to 1500, which should cover most use cases.
With contributions from Mathieu Duponchelle <mathieu@centricular.com>
https://bugzilla.gnome.org/show_bug.cgi?id=772841
-rw-r--r-- | gst/udp/gstudpsrc.c | 284 | ||||
-rw-r--r-- | gst/udp/gstudpsrc.h | 14 |
2 files changed, 137 insertions, 161 deletions
diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index fdeca5dad..bf9de10f7 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -407,6 +407,43 @@ gst_ip_recvdstaddr_message_class_init (GstIPRecvdstaddrMessageClass * class) } #endif +static gboolean +gst_udpsrc_decide_allocation (GstBaseSrc * bsrc, GstQuery * query) +{ + GstUDPSrc *udpsrc; + GstBufferPool *pool; + gboolean update; + GstStructure *config; + GstCaps *caps = NULL; + + udpsrc = GST_UDPSRC (bsrc); + + if (gst_query_get_n_allocation_pools (query) > 0) { + update = TRUE; + } else { + update = FALSE; + } + + pool = gst_buffer_pool_new (); + + config = gst_buffer_pool_get_config (pool); + + gst_query_parse_allocation (query, &caps, NULL); + + gst_buffer_pool_config_set_params (config, caps, udpsrc->mtu, 0, 0); + + gst_buffer_pool_set_config (pool, config); + + if (update) + gst_query_set_nth_allocation_pool (query, 0, pool, udpsrc->mtu, 0, 0); + else + gst_query_add_allocation_pool (query, pool, udpsrc->mtu, 0, 0); + + gst_object_unref (pool); + + return TRUE; +} + /* not 100% correct, but a good upper bound for memory allocation purposes */ #define MAX_IPV4_UDP_PACKET_SIZE (65536 - 8) @@ -433,6 +470,7 @@ static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", #define UDP_DEFAULT_REUSE TRUE #define UDP_DEFAULT_LOOP TRUE #define UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS TRUE +#define UDP_DEFAULT_MTU (1492) enum { @@ -453,17 +491,17 @@ enum PROP_REUSE, PROP_ADDRESS, PROP_LOOP, - PROP_RETRIEVE_SENDER_ADDRESS + PROP_RETRIEVE_SENDER_ADDRESS, + PROP_MTU, }; static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data); static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter); -static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf); static gboolean gst_udpsrc_close (GstUDPSrc * src); static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc); static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc); -static gboolean gst_udpsrc_negotiate (GstBaseSrc * basesrc); +static GstFlowReturn gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf); static void gst_udpsrc_finalize (GObject * object); @@ -602,6 +640,24 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass) "meta. Disabling this might result in minor performance improvements " "in certain scenarios", UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstUDPSrc::mtu: + * + * Maximum expected packet size. This directly defines the allocation + * size of the receive buffer pool. + * + * In case more data is received, a new #GstMemory is appended to the + * output buffer, ensuring no data is lost, this however leads to that + * buffer being freed and reallocated. + * + * Since: 1.14 + */ + g_object_class_install_property (gobject_class, PROP_MTU, + g_param_spec_uint ("mtu", "Expected Maximum Transmission Unit", + "Maximum expected packet size. This directly defines the allocation" + "size of the receive buffer pool.", + 0, G_MAXINT, UDP_DEFAULT_MTU, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gst_element_class_add_static_pad_template (gstelement_class, &src_template); @@ -616,9 +672,9 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass) gstbasesrc_class->unlock = gst_udpsrc_unlock; gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop; gstbasesrc_class->get_caps = gst_udpsrc_getcaps; - gstbasesrc_class->negotiate = gst_udpsrc_negotiate; + gstbasesrc_class->decide_allocation = gst_udpsrc_decide_allocation; - gstpushsrc_class->create = gst_udpsrc_create; + gstpushsrc_class->fill = gst_udpsrc_fill; } static void @@ -642,6 +698,7 @@ gst_udpsrc_init (GstUDPSrc * udpsrc) udpsrc->reuse = UDP_DEFAULT_REUSE; udpsrc->loop = UDP_DEFAULT_LOOP; udpsrc->retrieve_sender_address = UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS; + udpsrc->mtu = UDP_DEFAULT_MTU; /* configure basesrc to be a live source */ gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE); @@ -680,6 +737,10 @@ gst_udpsrc_finalize (GObject * object) g_object_unref (udpsrc->used_socket); udpsrc->used_socket = NULL; + if (udpsrc->extra_mem) + gst_memory_unref (udpsrc->extra_mem); + udpsrc->extra_mem = NULL; + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -710,112 +771,6 @@ gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter) } static void -gst_udpsrc_reset_memory_allocator (GstUDPSrc * src) -{ - if (src->mem != NULL) { - gst_memory_unmap (src->mem, &src->map); - gst_memory_unref (src->mem); - src->mem = NULL; - } - if (src->mem_max != NULL) { - gst_memory_unmap (src->mem_max, &src->map_max); - gst_memory_unref (src->mem_max); - src->mem_max = NULL; - } - - src->vec[0].buffer = NULL; - src->vec[0].size = 0; - src->vec[1].buffer = NULL; - src->vec[1].size = 0; - - if (src->allocator != NULL) { - gst_object_unref (src->allocator); - src->allocator = NULL; - } -} - -static gboolean -gst_udpsrc_negotiate (GstBaseSrc * basesrc) -{ - GstUDPSrc *src = GST_UDPSRC_CAST (basesrc); - gboolean ret; - - /* just chain up to the default implementation, we just want to - * retrieve the allocator at the end of it (if there is one) */ - ret = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc); - - if (ret) { - GstAllocationParams new_params; - GstAllocator *new_allocator = NULL; - - /* retrieve new allocator */ - gst_base_src_get_allocator (basesrc, &new_allocator, &new_params); - - if (src->allocator != new_allocator || - memcmp (&src->params, &new_params, sizeof (GstAllocationParams)) != 0) { - /* drop old allocator and throw away any memory allocated with it */ - gst_udpsrc_reset_memory_allocator (src); - - /* and save the new allocator and/or new allocation parameters */ - src->allocator = new_allocator; - src->params = new_params; - - GST_INFO_OBJECT (src, "new allocator: %" GST_PTR_FORMAT, new_allocator); - } - } - - return ret; -} - -static gboolean -gst_udpsrc_alloc_mem (GstUDPSrc * src, GstMemory ** p_mem, GstMapInfo * map, - gsize size) -{ - GstMemory *mem; - - mem = gst_allocator_alloc (src->allocator, size, &src->params); - - if (!gst_memory_map (mem, map, GST_MAP_WRITE)) { - gst_memory_unref (mem); - memset (map, 0, sizeof (GstMapInfo)); - return FALSE; - } - *p_mem = mem; - return TRUE; -} - -static gboolean -gst_udpsrc_ensure_mem (GstUDPSrc * src) -{ - if (src->mem == NULL) { - gsize mem_size = 1500; /* typical max. MTU */ - - /* if packets are likely to be smaller, just use that size, otherwise - * default to assuming incoming packets are around MTU size */ - if (src->max_size > 0 && src->max_size < mem_size) - mem_size = src->max_size; - - if (!gst_udpsrc_alloc_mem (src, &src->mem, &src->map, mem_size)) - return FALSE; - - src->vec[0].buffer = src->map.data; - src->vec[0].size = src->map.size; - } - - if (src->mem_max == NULL) { - gsize max_size = MAX_IPV4_UDP_PACKET_SIZE; - - if (!gst_udpsrc_alloc_mem (src, &src->mem_max, &src->map_max, max_size)) - return FALSE; - - src->vec[1].buffer = src->map_max.data; - src->vec[1].size = src->map_max.size; - } - - return TRUE; -} - -static void gst_udpsrc_create_cancellable (GstUDPSrc * src) { GPollFD pollfd; @@ -836,10 +791,9 @@ gst_udpsrc_free_cancellable (GstUDPSrc * src) } static GstFlowReturn -gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) +gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf) { GstUDPSrc *udpsrc; - GstBuffer *outbuf = NULL; GSocketAddress *saddr = NULL; GSocketAddress **p_saddr; gint flags = G_SOCKET_MSG_NONE; @@ -850,12 +804,12 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) GSocketControlMessage **msgs = NULL; GSocketControlMessage ***p_msgs; gint n_msgs = 0, i; + GstMapInfo info; + GstMapInfo extra_info; + GInputVector ivec[2]; udpsrc = GST_UDPSRC_CAST (psrc); - if (!gst_udpsrc_ensure_mem (udpsrc)) - goto memory_alloc_error; - /* optimization: use messages only in multicast mode and * if we can't let the kernel do the filtering for us */ p_msgs = @@ -870,6 +824,38 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) /* Retrieve sender address unless we've been configured not to do so */ p_saddr = (udpsrc->retrieve_sender_address) ? &saddr : NULL; + if (!gst_buffer_map (outbuf, &info, GST_MAP_READWRITE)) + goto buffer_map_error; + + ivec[0].buffer = info.data; + ivec[0].size = info.size; + + /* Prepare memory in case the data size exceeds mtu */ + if (udpsrc->extra_mem == NULL) { + GstBufferPool *pool; + GstStructure *config; + GstAllocator *allocator = NULL; + GstAllocationParams params; + + pool = gst_base_src_get_buffer_pool (GST_BASE_SRC_CAST (psrc)); + config = gst_buffer_pool_get_config (pool); + gst_buffer_pool_config_get_allocator (config, &allocator, ¶ms); + + udpsrc->extra_mem = + gst_allocator_alloc (allocator, MAX_IPV4_UDP_PACKET_SIZE, ¶ms); + + gst_object_unref (pool); + gst_structure_free (config); + if (allocator) + gst_object_unref (allocator); + } + + if (!gst_memory_map (udpsrc->extra_mem, &extra_info, GST_MAP_READWRITE)) + goto memory_map_error; + + ivec[1].buffer = extra_info.data; + ivec[1].size = extra_info.size; + retry: if (saddr != NULL) { g_object_unref (saddr); @@ -909,7 +895,7 @@ retry: } while (G_UNLIKELY (try_again)); res = - g_socket_receive_message (udpsrc->used_socket, p_saddr, udpsrc->vec, 2, + g_socket_receive_message (udpsrc->used_socket, p_saddr, ivec, 2, p_msgs, &n_msgs, &flags, udpsrc->cancellable, &err); if (G_UNLIKELY (res < 0)) { @@ -929,10 +915,6 @@ retry: goto receive_error; } - /* remember maximum packet size */ - if (res > udpsrc->max_size) - udpsrc->max_size = res; - /* Retry if multicast and the destination address is not ours. We don't want * to receive arbitrary packets */ if (p_msgs) { @@ -983,27 +965,17 @@ retry: } } - outbuf = gst_buffer_new (); + gst_buffer_unmap (outbuf, &info); + gst_memory_unmap (udpsrc->extra_mem, &extra_info); - /* append first memory chunk to buffer */ - gst_buffer_append_memory (outbuf, udpsrc->mem); - - /* if the packet didn't fit into the first chunk, add second one as well */ - if (res > udpsrc->map.size) { - gst_buffer_append_memory (outbuf, udpsrc->mem_max); - gst_memory_unmap (udpsrc->mem_max, &udpsrc->map_max); - udpsrc->vec[1].buffer = NULL; - udpsrc->vec[1].size = 0; - udpsrc->mem_max = NULL; + /* If this is the case, the buffer will be freed once unreffed, + * and the buffer pool will have to reallocate a new one. + */ + if (res > udpsrc->mtu) { + gst_buffer_append_memory (outbuf, udpsrc->extra_mem); + udpsrc->extra_mem = NULL; } - /* make sure we allocate a new chunk next time (we do this only here because - * we look at map.size to see if the second memory chunk is needed above) */ - gst_memory_unmap (udpsrc->mem, &udpsrc->map); - udpsrc->vec[0].buffer = NULL; - udpsrc->vec[0].size = 0; - udpsrc->mem = NULL; - offset = udpsrc->skip_first_bytes; if (G_UNLIKELY (offset > 0 && res < offset)) @@ -1020,19 +992,26 @@ retry: GST_LOG_OBJECT (udpsrc, "read packet of %d bytes", (int) res); - *buf = GST_BUFFER_CAST (outbuf); - return GST_FLOW_OK; /* ERRORS */ -memory_alloc_error: +buffer_map_error: + { + GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), + ("Failed to map memory")); + return GST_FLOW_ERROR; + } +memory_map_error: { + gst_buffer_unmap (outbuf, &info); GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), - ("Failed to allocate or map memory")); + ("Failed to map memory")); return GST_FLOW_ERROR; } select_error: { + gst_buffer_unmap (outbuf, &info); + gst_memory_unmap (udpsrc->extra_mem, &extra_info); GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), ("select error: %s", err->message)); g_clear_error (&err); @@ -1040,12 +1019,16 @@ select_error: } stopped: { + gst_buffer_unmap (outbuf, &info); + gst_memory_unmap (udpsrc->extra_mem, &extra_info); GST_DEBUG ("stop called"); g_clear_error (&err); return GST_FLOW_FLUSHING; } receive_error: { + gst_buffer_unmap (outbuf, &info); + gst_memory_unmap (udpsrc->extra_mem, &extra_info); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { g_clear_error (&err); @@ -1059,8 +1042,6 @@ receive_error: } skip_error: { - gst_buffer_unref (outbuf); - GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL), ("UDP buffer to small to skip header")); return GST_FLOW_ERROR; @@ -1201,6 +1182,9 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, case PROP_RETRIEVE_SENDER_ADDRESS: udpsrc->retrieve_sender_address = g_value_get_boolean (value); break; + case PROP_MTU: + udpsrc->mtu = g_value_get_uint (value); + break; default: break; } @@ -1261,6 +1245,9 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, case PROP_RETRIEVE_SENDER_ADDRESS: g_value_set_boolean (value, udpsrc->retrieve_sender_address); break; + case PROP_MTU: + g_value_set_uint (value, udpsrc->mtu); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1535,11 +1522,6 @@ gst_udpsrc_open (GstUDPSrc * src) g_object_unref (addr); } - src->allocator = NULL; - gst_allocation_params_init (&src->params); - - src->max_size = 0; - return TRUE; /* ERRORS */ @@ -1666,8 +1648,6 @@ gst_udpsrc_close (GstUDPSrc * src) src->addr = NULL; } - gst_udpsrc_reset_memory_allocator (src); - gst_udpsrc_free_cancellable (src); return TRUE; diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h index 6c512562e..456bddc38 100644 --- a/gst/udp/gstudpsrc.h +++ b/gst/udp/gstudpsrc.h @@ -74,15 +74,11 @@ struct _GstUDPSrc { gboolean external_socket; gboolean made_cancel_fd; - /* memory management */ - GstAllocator *allocator; - GstAllocationParams params; - - GstMemory *mem; - GstMapInfo map; - GstMemory *mem_max; - GstMapInfo map_max; - GInputVector vec[2]; + /* Initial size of buffers in the buffer pool */ + guint mtu; + + /* Extra memory for buffers with a size superior to max_packet_size */ + GstMemory *extra_mem; gchar *uri; }; |