diff options
author | Wim Taymans <wim.taymans@collabora.co.uk> | 2010-12-15 19:43:10 +0100 |
---|---|---|
committer | Wim Taymans <wim.taymans@collabora.co.uk> | 2010-12-20 12:50:40 +0100 |
commit | 28b15a60ebb713a42a214d70b55b7f4a7a7ea10b (patch) | |
tree | 1dc18bff56872e295ce3b1b2e4402159a7d53ddd | |
parent | 2286326203b6ac229828f46662f6498b1044d413 (diff) |
wj2depay: rwrite depayloader
Make depayloader more resilient to packet loss.
-rw-r--r-- | gst/rtp/gstrtpwj2depay.c | 895 | ||||
-rw-r--r-- | gst/rtp/gstrtpwj2depay.h | 28 |
2 files changed, 560 insertions, 363 deletions
diff --git a/gst/rtp/gstrtpwj2depay.c b/gst/rtp/gstrtpwj2depay.c index 9d0ea73ce..96d609c9f 100644 --- a/gst/rtp/gstrtpwj2depay.c +++ b/gst/rtp/gstrtpwj2depay.c @@ -1,5 +1,5 @@ /* GStreamer - * Copyright (C) <2010> Wim Taymans <wim.taymans@gmail.com> + * Copyright (C) <2009> Wim Taymans <wim.taymans@gmail.com> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -46,11 +46,37 @@ GST_STATIC_PAD_TEMPLATE ("sink", "clock-rate = (int) 90000, " "encoding-name = (string) \"X-WJ2\"") ); +#define HEADER_SIZE 10 + +typedef enum +{ + WJ2_MARKER = 0xFF, + WJ2_MARKER_SOC = 0x4F, + WJ2_MARKER_SOT = 0x90, + WJ2_MARKER_SOP = 0x91, + WJ2_MARKER_SOD = 0x93, + WJ2_MARKER_EOC = 0xD9 +} RtpWJ2Marker; + +#define DEFAULT_BUFFER_LIST TRUE + +enum +{ + PROP_0, + PROP_BUFFER_LIST, + PROP_LAST +}; + GST_BOILERPLATE (GstRtpWJ2Depay, gst_rtp_wj2_depay, GstBaseRTPDepayload, GST_TYPE_BASE_RTP_DEPAYLOAD); static void gst_rtp_wj2_depay_finalize (GObject * object); +static void gst_rtp_wj2_depay_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_rtp_wj2_depay_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + static GstStateChangeReturn gst_rtp_wj2_depay_change_state (GstElement * element, GstStateChange transition); @@ -89,6 +115,14 @@ gst_rtp_wj2_depay_class_init (GstRtpWJ2DepayClass * klass) gobject_class->finalize = gst_rtp_wj2_depay_finalize; + gobject_class->set_property = gst_rtp_wj2_depay_set_property; + gobject_class->get_property = gst_rtp_wj2_depay_get_property; + + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_LIST, + g_param_spec_boolean ("buffer-list", "Buffer List", + "Use Buffer Lists", + DEFAULT_BUFFER_LIST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = gst_rtp_wj2_depay_change_state; gstbasertpdepayload_class->set_caps = gst_rtp_wj2_depay_setcaps; @@ -99,59 +133,28 @@ gst_rtp_wj2_depay_class_init (GstRtpWJ2DepayClass * klass) } static void -gst_rtp_wj2_store_wheader (GstRtpWJ2Depay * rtpwj2depay, guint idx, - GstBuffer * buf) -{ - GstBuffer *old; - - GST_DEBUG_OBJECT (rtpwj2depay, "storing WJ2 header %p at index %u", buf, idx); - if ((old = rtpwj2depay->WH[idx])) - gst_buffer_unref (old); - rtpwj2depay->WH[idx] = buf; - /* after we completed the header, we can accept packets with any WH_id */ - rtpwj2depay->WH_id = -1; -} - -static void -gst_rtp_wj2_store_mheader (GstRtpWJ2Depay * rtpwj2depay, guint idx, - GstBuffer * buf) +free_buffer (gpointer data) { - GstBuffer *old; - - GST_DEBUG_OBJECT (rtpwj2depay, "storing main header %p at index %u", buf, - idx); - if ((old = rtpwj2depay->MH[idx])) - gst_buffer_unref (old); - rtpwj2depay->MH[idx] = buf; + if (data) + gst_buffer_unref (GST_BUFFER_CAST (data)); } static void -gst_rtp_wj2_clear_caches (GstRtpWJ2Depay * rtpwj2depay) +gst_rtp_wj2_depay_init (GstRtpWJ2Depay * rtpwj2depay, + GstRtpWJ2DepayClass * klass) { - guint i; + rtpwj2depay->buffer_list = DEFAULT_BUFFER_LIST; - for (i = 0; i < 16; i++) - gst_rtp_wj2_store_wheader (rtpwj2depay, i, NULL); + rtpwj2depay->tiles = g_ptr_array_new_with_free_func (free_buffer); - for (i = 0; i < 8; i++) - gst_rtp_wj2_store_mheader (rtpwj2depay, i, NULL); + rtpwj2depay->pu_adapter = gst_adapter_new (); + rtpwj2depay->t_adapter = gst_adapter_new (); + rtpwj2depay->f_adapter = gst_adapter_new (); } -static void -gst_rtp_wj2_clear_adapter (GstRtpWJ2Depay * rtpwj2depay, gboolean reset_offset) -{ - GST_LOG_OBJECT (rtpwj2depay, "clearing adapter"); - gst_adapter_clear (rtpwj2depay->adapter); - rtpwj2depay->WH_id = -1; - rtpwj2depay->MH_id = -1; - if (reset_offset) - rtpwj2depay->offset = 0; - rtpwj2depay->state = GST_RTP_WJ2_STATE_NONE; -} static void -gst_rtp_wj2_store_tile (GstRtpWJ2Depay * rtpwj2depay, guint idx, - GstBuffer * buf) +store_tile (GstRtpWJ2Depay * rtpwj2depay, guint idx, GstBuffer * buf) { if (idx < rtpwj2depay->tiles->len) { GstBuffer *old; @@ -160,6 +163,7 @@ gst_rtp_wj2_store_tile (GstRtpWJ2Depay * rtpwj2depay, guint idx, if ((old = g_ptr_array_index (rtpwj2depay->tiles, idx))) gst_buffer_unref (old); + g_ptr_array_index (rtpwj2depay->tiles, idx) = buf; } else { GST_DEBUG_OBJECT (rtpwj2depay, "invalid tile index %u", idx); @@ -169,28 +173,66 @@ gst_rtp_wj2_store_tile (GstRtpWJ2Depay * rtpwj2depay, guint idx, } static void -gst_rtp_wj2_clear_tiles (GstRtpWJ2Depay * rtpwj2depay) +clear_tiles (GstRtpWJ2Depay * rtpwj2depay) { guint i, len; len = rtpwj2depay->tiles->len; for (i = 0; i < len; i++) - gst_rtp_wj2_store_tile (rtpwj2depay, i, NULL); + store_tile (rtpwj2depay, i, NULL); } static void -free_buffer (gpointer data) +store_wheader (GstRtpWJ2Depay * rtpwj2depay, guint idx, GstBuffer * buf) { - if (data) - gst_buffer_unref (GST_BUFFER_CAST (data)); + GstBuffer *old; + + GST_DEBUG_OBJECT (rtpwj2depay, "storing WJ2 header %p at index %u", buf, idx); + if ((old = rtpwj2depay->WH[idx])) + gst_buffer_unref (old); + rtpwj2depay->WH[idx] = buf; } static void -gst_rtp_wj2_depay_init (GstRtpWJ2Depay * rtpwj2depay, - GstRtpWJ2DepayClass * klass) +clear_wheaders (GstRtpWJ2Depay * rtpwj2depay) { - rtpwj2depay->adapter = gst_adapter_new (); - rtpwj2depay->tiles = g_ptr_array_new_with_free_func (free_buffer); + guint i; + + for (i = 0; i < 16; i++) + store_wheader (rtpwj2depay, i, NULL); +} + +static void +store_mheader (GstRtpWJ2Depay * rtpwj2depay, guint idx, GstBuffer * buf) +{ + GstBuffer *old; + + GST_DEBUG_OBJECT (rtpwj2depay, "storing main header %p at index %u", buf, + idx); + if ((old = rtpwj2depay->MH[idx])) + gst_buffer_unref (old); + rtpwj2depay->MH[idx] = buf; +} + +static void +clear_mheaders (GstRtpWJ2Depay * rtpwj2depay) +{ + guint i; + + for (i = 0; i < 8; i++) + store_mheader (rtpwj2depay, i, NULL); +} + +static void +gst_rtp_wj2_depay_reset (GstRtpWJ2Depay * rtpwj2depay) +{ + clear_tiles (rtpwj2depay); + clear_wheaders (rtpwj2depay); + clear_mheaders (rtpwj2depay); + gst_adapter_clear (rtpwj2depay->pu_adapter); + gst_adapter_clear (rtpwj2depay->t_adapter); + gst_adapter_clear (rtpwj2depay->f_adapter); + rtpwj2depay->next_frag = 0; } static void @@ -200,11 +242,11 @@ gst_rtp_wj2_depay_finalize (GObject * object) rtpwj2depay = GST_RTP_WJ2_DEPAY (object); - gst_rtp_wj2_clear_caches (rtpwj2depay); - gst_rtp_wj2_clear_tiles (rtpwj2depay); + clear_mheaders (rtpwj2depay); - g_object_unref (rtpwj2depay->adapter); - g_ptr_array_free (rtpwj2depay->tiles, TRUE); + g_object_unref (rtpwj2depay->pu_adapter); + g_object_unref (rtpwj2depay->t_adapter); + g_object_unref (rtpwj2depay->f_adapter); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -230,11 +272,230 @@ gst_rtp_wj2_depay_setcaps (GstBaseRTPDepayload * depayload, GstCaps * caps) return res; } +static void +gst_rtp_wj2_depay_clear_pu (GstRtpWJ2Depay * rtpwj2depay) +{ + gst_adapter_clear (rtpwj2depay->pu_adapter); + rtpwj2depay->have_sync = FALSE; +} + +/* calculate how many tiles there are in the wj2 header */ +static void +gst_rtp_wj2_ensure_ntiles (GstRtpWJ2Depay * rtpwj2depay, GstBuffer * buf) +{ + guint size, tiles; + + size = GST_BUFFER_SIZE (buf); + if (size < 32) + return; + + tiles = (size - 32) / 4; + GST_DEBUG_OBJECT (rtpwj2depay, "ensure %u tiles from size %u", tiles, size); + g_ptr_array_set_size (rtpwj2depay->tiles, tiles); +} + +static GstFlowReturn +gst_rtp_wj2_depay_flush_pu (GstBaseRTPDepayload * depayload) +{ + GstRtpWJ2Depay *rtpwj2depay; + GstBuffer *header; + guint avail, MHF, MH_id, tp, tci; + + rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload); + + /* take all available buffers */ + avail = gst_adapter_available (rtpwj2depay->pu_adapter); + if (avail == 0) + goto done; + + tp = rtpwj2depay->pu_tp; + tci = rtpwj2depay->pu_tci; + MHF = rtpwj2depay->pu_MHF; + MH_id = rtpwj2depay->last_MH_id; + + GST_DEBUG_OBJECT (rtpwj2depay, "flushing PU of size %u", avail); + + if (tp == 3) { + /* we have a WJ2 header */ + GST_DEBUG_OBJECT (rtpwj2depay, "keeping WJ2 header %u", MH_id); + header = gst_adapter_take_buffer (rtpwj2depay->pu_adapter, avail); + + gst_rtp_wj2_ensure_ntiles (rtpwj2depay, header); + store_wheader (rtpwj2depay, MH_id, header); + } else if (MHF == 0) { + GList *packets, *walk; + + packets = gst_adapter_take_list (rtpwj2depay->pu_adapter, avail); + /* append packets */ + for (walk = packets; walk; walk = g_list_next (walk)) { + GstBuffer *buf = GST_BUFFER_CAST (walk->data); + GST_DEBUG_OBJECT (rtpwj2depay, "append pu packet of size %u", + GST_BUFFER_SIZE (buf)); + gst_adapter_push (rtpwj2depay->t_adapter, buf); + } + g_list_free (packets); + } else { + /* we have a header */ + GST_DEBUG_OBJECT (rtpwj2depay, "keeping header %u", MH_id); + /* we managed to see the start and end of the header, take all from + * adapter and keep in header */ + header = gst_adapter_take_buffer (rtpwj2depay->pu_adapter, avail); + + store_mheader (rtpwj2depay, MH_id, header); + } + +done: + rtpwj2depay->have_sync = FALSE; + + return GST_FLOW_OK; +} + +static GstFlowReturn +gst_rtp_wj2_depay_flush_tile (GstBaseRTPDepayload * depayload) +{ + GstRtpWJ2Depay *rtpwj2depay; + guint avail, MH_id; + GList *packets, *walk; + guint8 end[2]; + GstFlowReturn ret = GST_FLOW_OK; + + rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload); + + /* flush pending PU */ + gst_rtp_wj2_depay_flush_pu (depayload); + + /* take all available buffers */ + avail = gst_adapter_available (rtpwj2depay->t_adapter); + if (avail == 0) + goto done; + + MH_id = rtpwj2depay->last_MH_id; + + GST_DEBUG_OBJECT (rtpwj2depay, "flushing tile of size %u", avail); + + if (gst_adapter_available (rtpwj2depay->f_adapter) == 0) { + GstBuffer *mheader; + + /* we need a header now */ + if ((mheader = rtpwj2depay->MH[MH_id]) == NULL) + goto waiting_header; + + rtpwj2depay->frame_tci = rtpwj2depay->pu_tci; + + /* push header in the adapter */ + GST_DEBUG_OBJECT (rtpwj2depay, "pushing header %u", MH_id); + gst_adapter_push (rtpwj2depay->f_adapter, gst_buffer_ref (mheader)); + } + + /* check for last bytes */ + gst_adapter_copy (rtpwj2depay->t_adapter, end, avail - 2, 2); + + /* now append the tile packets to the frame */ + packets = gst_adapter_take_list (rtpwj2depay->t_adapter, avail); + for (walk = packets; walk; walk = g_list_next (walk)) { + GstBuffer *buf = GST_BUFFER_CAST (walk->data); + + if (walk == packets) { + guint8 *data; + guint size; + + /* first buffer should contain the SOT */ + data = GST_BUFFER_DATA (buf); + size = GST_BUFFER_SIZE (buf); + + if (size < 12) + goto invalid_tile; + + if (data[0] == 0xff && data[1] == WJ2_MARKER_SOT) { + guint Psot, nPsot; + + if (end[0] == 0xff && end[1] == WJ2_MARKER_EOC) + nPsot = avail - 2; + else + nPsot = avail; + + Psot = GST_READ_UINT32_BE (&data[6]); + if (Psot != nPsot && Psot != 0) { + /* Psot must match the size of the tile */ + GST_DEBUG_OBJECT (rtpwj2depay, "set Psot from %u to %u", Psot, nPsot); + buf = gst_buffer_make_writable (buf); + data = GST_BUFFER_DATA (buf); + GST_WRITE_UINT32_BE (&data[6], nPsot); + } + } + } + + GST_DEBUG_OBJECT (rtpwj2depay, "append pu packet of size %u", + GST_BUFFER_SIZE (buf)); + gst_adapter_push (rtpwj2depay->f_adapter, buf); + } + g_list_free (packets); + +done: + rtpwj2depay->last_tile = -1; + + return ret; + + /* errors */ +waiting_header: + { + GST_DEBUG_OBJECT (rtpwj2depay, "waiting for header %u", MH_id); + gst_adapter_clear (rtpwj2depay->t_adapter); + rtpwj2depay->last_tile = -1; + return ret; + } +invalid_tile: + { + GST_ELEMENT_WARNING (rtpwj2depay, STREAM, DECODE, ("Invalid tile"), (NULL)); + gst_adapter_clear (rtpwj2depay->t_adapter); + rtpwj2depay->last_tile = -1; + return ret; + } +} + +static GstFlowReturn +gst_rtp_wj2_depay_flush_frame (GstBaseRTPDepayload * depayload) +{ + GstRtpWJ2Depay *rtpwj2depay; + guint avail; + GstBuffer *outbuf; + + GstFlowReturn ret = GST_FLOW_OK; + + rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload); + + /* flush pending tile */ + gst_rtp_wj2_depay_flush_tile (depayload); + + /* last buffer take all data out of the adapter */ + avail = gst_adapter_available (rtpwj2depay->f_adapter); + if (avail == 0) + goto done; + + outbuf = gst_adapter_take_buffer (rtpwj2depay->f_adapter, avail); + store_tile (rtpwj2depay, rtpwj2depay->frame_tci, outbuf); + + /* we accept any tci now */ + rtpwj2depay->frame_tci = -1; + + /* reset state */ + rtpwj2depay->next_frag = 0; + rtpwj2depay->have_sync = FALSE; + +done: + /* we can't keep headers with MH_id of 0 */ + store_mheader (rtpwj2depay, 0, NULL); + + return ret; +} + /* parse and store the WJ2 header packet */ -static GstBuffer * -gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay) +static GstFlowReturn +gst_rtp_wj2_depay_flush_wj2 (GstBaseRTPDepayload * depayload) { - GstBuffer *wheader, *buf, *out = NULL; + GstRtpWJ2Depay *rtpwj2depay; + GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *wheader, *buf; gint WH_id, MH_id, size, wsize; guint Xsiz, Ysiz, XOsiz, YOsiz, XTsiz, YTsiz, XTOsiz, YTOsiz; guint TC_count, T_count, C_count; @@ -243,8 +504,13 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay) GstBufferList *blist; GstBufferListIterator *it; - WH_id = rtpwj2depay->WH_id; - MH_id = rtpwj2depay->MH_id; + rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload); + + /* flush pending tile */ + gst_rtp_wj2_depay_flush_frame (depayload); + + WH_id = rtpwj2depay->last_WH_id; + MH_id = rtpwj2depay->last_MH_id; if (WH_id == -1 || MH_id == -1) goto done; @@ -272,6 +538,15 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay) T_count = (Xsiz / XTsiz) * (Ysiz / YTsiz); C_count = TC_count / T_count; + if (rtpwj2depay->tiles->len < TC_count) + goto missing_buffer; + + /* check if we have all tiles */ + for (i = 0; i < TC_count; i++) { + if (g_ptr_array_index (rtpwj2depay->tiles, i) == NULL) + goto missing_buffer; + } + GST_DEBUG_OBJECT (rtpwj2depay, "TC %u, T %u, C %u", TC_count, T_count, C_count); @@ -280,7 +555,6 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay) buf = gst_buffer_new_and_alloc (wsize); wdata = GST_BUFFER_DATA (buf); offset = wsize; - GST_DEBUG_OBJECT (rtpwj2depay, "Header size %u", wsize); /* fill partial header */ @@ -309,10 +583,9 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay) wdata[74 + (9 * i)] = data[32 + (4 * i)] & 0xf; /* fill in offset */ GST_WRITE_UINT32_LE (wdata + 75 + (9 * i), offset); + /* take the data buffer and its length */ J_buf = g_ptr_array_index (rtpwj2depay->tiles, i); - if (J_buf == NULL) - goto missing_buffer; length = GST_BUFFER_SIZE (J_buf); /* align length and add ADV header length */ @@ -355,6 +628,8 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay) /* take the data buffer and its length */ J_buf = g_ptr_array_index (rtpwj2depay->tiles, i); + g_ptr_array_index (rtpwj2depay->tiles, i) = NULL; + wsize = GST_BUFFER_SIZE (J_buf); /* align length and add ADV header length */ length = GST_ROUND_UP_32 (wsize); @@ -372,16 +647,20 @@ gst_rtp_wj2_flush_frame (GstRtpWJ2Depay * rtpwj2depay) } gst_buffer_list_iterator_free (it); - it = gst_buffer_list_iterate (blist); - gst_buffer_list_iterator_next_group (it); - out = gst_buffer_list_iterator_merge_group (it); - gst_buffer_list_iterator_free (it); + ret = gst_base_rtp_depayload_push_list (depayload, blist); + + /* we accept any WH_id and MH_id now */ + rtpwj2depay->last_WH_id = -1; + rtpwj2depay->last_MH_id = -1; + + /* reset state */ + rtpwj2depay->next_frag = 0; + rtpwj2depay->have_sync = FALSE; done: - gst_rtp_wj2_clear_tiles (rtpwj2depay); - gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE); + clear_tiles (rtpwj2depay); - return out; + return ret; /* ERRORS */ missing_buffer: @@ -391,305 +670,186 @@ missing_buffer: } } -static void -gst_rtp_wj2_new_state (GstRtpWJ2Depay * rtpwj2depay, GstRtpWJ2State state) -{ - GstRtpWJ2State old = rtpwj2depay->state; - - GST_DEBUG_OBJECT (rtpwj2depay, "state %d -> %d", old, state); - - if (old == GST_RTP_WJ2_STATE_NONE || old == state) { - /* we're at the desired state, all fine */ - } else if (state == GST_RTP_WJ2_STATE_WHEADER) { - if (old == GST_RTP_WJ2_STATE_MHEADER) { - /* we found a wj2 header and we were doing Main headers, - * clear main headers */ - gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE); - } else if (old == GST_RTP_WJ2_STATE_MDATA) { - /* we found a wj2 header and we were doing J2K data, - * flush the accumulated data */ - } - } else if (state == GST_RTP_WJ2_STATE_MHEADER) { - if (old == GST_RTP_WJ2_STATE_WHEADER) { - /* we found a main header and we were doing wj2 headers, - * clear wj2 headers */ - gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE); - } else if (old == GST_RTP_WJ2_STATE_MDATA) { - /* we found a main header and we were doing j2k data, - * flush data */ - rtpwj2depay->offset = 0; - } - } else if (state == GST_RTP_WJ2_STATE_MDATA) { - if (old == GST_RTP_WJ2_STATE_WHEADER) { - /* we found j2k data and we were doing w2j headers, - * flush headers */ - } else if (old == GST_RTP_WJ2_STATE_MHEADER) { - /* we found j2k data and we were doing main headers, - * flush headers */ - } - } - rtpwj2depay->state = state;; -} -/* parse and store the WJ2 header packet */ -static void -gst_rtp_wj2_handle_w_header (GstRtpWJ2Depay * rtpwj2depay, GstBuffer * buf, - guint8 * payload) +static GstBuffer * +gst_rtp_wj2_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf) { - GstBuffer *WH_buf; - guint offset, WHF, avail, WH_id; + GstRtpWJ2Depay *rtpwj2depay; + guint8 *payload; + guint header_size, frag_offset, tile, payload_len, wj2len; + gint gap; + guint32 rtptime; + guint tp, MHF, L, MH_id, WH_id, tci; + + rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload); + + payload = gst_rtp_buffer_get_payload (buf); + payload_len = gst_rtp_buffer_get_payload_len (buf); + + /* we need at least a header */ + if (payload_len < 8) + goto empty_packet; + + rtptime = gst_rtp_buffer_get_timestamp (buf); - /* first update our state */ - gst_rtp_wj2_new_state (rtpwj2depay, GST_RTP_WJ2_STATE_WHEADER); + /* new timestamp marks new frame */ + if (rtpwj2depay->last_rtptime != rtptime) { + rtpwj2depay->last_rtptime = rtptime; + /* flush pending WJ2 frame */ + gst_rtp_wj2_depay_flush_wj2 (depayload); + } /* 0 1 2 3 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * |tp |MHF|MH_id|T| priority | tile number | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | MBZ |L| WH_id | fragment offset | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | Tile Component index | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * + * With tp = 3: + * + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * |1 1|WHF| WH_id | MBZ | fragment offset | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * * WHF = 1 piece, 2 end, 3 complete */ - WHF = (payload[0] & 0x30) >> 4; - WH_id = (payload[0] & 0xf); - offset = (payload[2] << 8) | payload[3]; - - GST_DEBUG_OBJECT (rtpwj2depay, "WHF %u, WH_id %u, offset %u, expect %u", - WHF, WH_id, offset, rtpwj2depay->offset); - - /* we have a valid WH_id if we didn't have a previous one of it it matched the - * previous one */ - if (rtpwj2depay->WH_id == -1) - rtpwj2depay->WH_id = WH_id; - else if (rtpwj2depay->WH_id != WH_id) - goto unexpected_header; - - /* valid offset when it matches our expected offset, we can't really generate - * dummy data for this header*/ - if (rtpwj2depay->offset != offset) - goto unexpected_header; - - /* take WJ2 header data, push in the adapter */ - WH_buf = gst_rtp_buffer_get_payload_subbuffer (buf, 4, -1); - gst_adapter_push (rtpwj2depay->adapter, WH_buf); - - /* adjust the expected next offset */ - rtpwj2depay->offset += GST_BUFFER_SIZE (WH_buf); + tp = payload[0] >> 6; + MHF = (payload[0] & 0x30) >> 4; - if ((WHF & 2) == 2) { - /* we have a complete WJ2 header */ - avail = gst_adapter_available (rtpwj2depay->adapter); - WH_buf = gst_adapter_take_buffer (rtpwj2depay->adapter, avail); + if (tp == 3) { + /* WJ2 header */ + MH_id = (payload[0] & 0xf); + frag_offset = (payload[2] << 8) | payload[3]; + tile = 0; + header_size = 4; + tci = 0; + } else { + MH_id = (payload[0] & 0xe) >> 1; + /* else jpeg 2000 packet */ + tile = (payload[2] << 8) | payload[3]; + L = (payload[4] & 0x10) >> 4; + WH_id = payload[4] & 0xf; + frag_offset = (payload[5] << 16) | (payload[6] << 8) | payload[7]; + tci = (payload[8] << 8) | payload[9]; - GST_LOG_OBJECT (rtpwj2depay, "complete header %u of size %u", WH_id, avail); + if (rtpwj2depay->last_WH_id == -1) + rtpwj2depay->last_WH_id = WH_id; + else if (rtpwj2depay->last_WH_id != WH_id) + goto wrong_WH_id; - /* store the new header */ - gst_rtp_wj2_store_wheader (rtpwj2depay, WH_id, WH_buf); + header_size = HEADER_SIZE; } - return; - - /* ERRORS */ -unexpected_header: - { - /* doesn't match, we need to discard data */ - gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE); - return; - } -} - -/* calculate how many tiles there are in the wj2 header */ -static void -gst_rtp_wj2_ensure_ntiles (GstRtpWJ2Depay * rtpwj2depay, GstBuffer * buf) -{ - guint size, tiles; - if (buf == NULL) - return; - - size = GST_BUFFER_SIZE (buf); - if (size < 32) - return; + wj2len = payload_len - header_size; - tiles = (size - 32) / 4; - GST_DEBUG_OBJECT (rtpwj2depay, "ensure %u tiles from size %u", tiles, size); - g_ptr_array_set_size (rtpwj2depay->tiles, tiles); -} + if (rtpwj2depay->last_MH_id == -1) + rtpwj2depay->last_MH_id = MH_id; + else if (rtpwj2depay->last_MH_id != MH_id) + goto wrong_MH_id; -static GstBuffer * -gst_rtp_wj2_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf) -{ - GstRtpWJ2Depay *rtpwj2depay; - guint8 *payload; - guint tp; - GstBuffer *out = NULL; + GST_DEBUG_OBJECT (rtpwj2depay, + "tp %u, tci %u, MHF %u, MH_id %u, tile %u, frag %u, expected %u", tp, tci, + MHF, MH_id, tile, frag_offset, rtpwj2depay->next_frag); - rtpwj2depay = GST_RTP_WJ2_DEPAY (depayload); + /* calculate the gap between expected frag */ + gap = frag_offset - rtpwj2depay->next_frag; + /* calculate next frag */ + rtpwj2depay->next_frag = frag_offset + wj2len; - /* flush everything on discont for now */ - if (GST_BUFFER_IS_DISCONT (buf)) { - if (rtpwj2depay->state != GST_RTP_WJ2_STATE_MDATA) { - GST_DEBUG_OBJECT (rtpwj2depay, "DISCONT, flushing data"); - gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE); - } else { - GST_DEBUG_OBJECT (rtpwj2depay, - "DISCONT, not flushing data, we in data mode"); - } + if (gap != 0) { + GST_DEBUG_OBJECT (rtpwj2depay, "discont of %d, clear PU", gap); + /* discont, clear pu adapter and resync */ + gst_rtp_wj2_depay_clear_pu (rtpwj2depay); } - if (gst_rtp_buffer_get_payload_len (buf) < 8) - goto empty_packet; - - payload = gst_rtp_buffer_get_payload (buf); - - /* look at the header type */ - tp = payload[0] >> 6; - - GST_LOG_OBJECT (rtpwj2depay, "GOT tp %d", tp); - + /* check for sync code */ if (tp == 3) { - /* handle a WJ2 header packet */ - gst_rtp_wj2_handle_w_header (rtpwj2depay, buf, payload); - } else { - guint MHF, L, offset, MH_id, WH_id, tci; - GstBuffer *J_buf; - /* 0 1 2 3 - * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * |tp |MHF|MH_id|T| priority | tile number | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * | MBZ |L| WH_id | fragment offset | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * | Tile Component index | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - */ - MHF = (payload[0] & 0x30) >> 4; - MH_id = (payload[0] & 0xe) >> 1; - L = (payload[4] & 0x10) >> 4; - WH_id = payload[4] & 0xf; - offset = (payload[5] << 16) | (payload[6] << 8) | payload[7]; - tci = (payload[8] << 8) | payload[9]; + GST_DEBUG_OBJECT (rtpwj2depay, "found WJ2 header packet"); - GST_DEBUG_OBJECT (rtpwj2depay, - "MHF %u, L %u, MH %u, WH %u, offset %u, TCI %u", MHF, L, MH_id, WH_id, - offset, tci); - - GST_DEBUG_OBJECT (rtpwj2depay, "current WH_id %d, MH_id %d", - rtpwj2depay->WH_id, rtpwj2depay->MH_id); - - if (MHF != 0) { - gst_rtp_wj2_new_state (rtpwj2depay, GST_RTP_WJ2_STATE_MHEADER); - - /* we have a Main JPEG 2000 header */ - GST_DEBUG_OBJECT (rtpwj2depay, - "we have a main header, expected offset %u", rtpwj2depay->offset); - /* valid offset when it matches our expected offset, we can't really generate - * dummy data for this header*/ - if (rtpwj2depay->offset != offset) - goto invalid_m_header; - } else { - GstBuffer *mheader = NULL; - - gst_rtp_wj2_new_state (rtpwj2depay, GST_RTP_WJ2_STATE_MDATA); - - GST_DEBUG_OBJECT (rtpwj2depay, - "we have JPEG2000 data, expected offset %u", rtpwj2depay->offset); - - /* check if we have the WJ2 header */ - if (rtpwj2depay->WH[WH_id] == NULL) - goto waiting_header; - - if (MH_id != 0) { - /* check if we have the main header */ - if ((mheader = rtpwj2depay->MH[MH_id]) == NULL) - goto waiting_header; - } else if (rtpwj2depay->MH_id == -1) - goto waiting_header; - - if (rtpwj2depay->offset != offset) { - guint fillsize; - - if (offset < rtpwj2depay->offset) - goto invalid_packet; - - fillsize = offset - rtpwj2depay->offset; - - /* gap in the data, this could either be a missing header or a gap in the - * data, check for missing header first */ - if (mheader && fillsize == GST_BUFFER_SIZE (mheader)) { - GST_DEBUG_OBJECT (rtpwj2depay, "missing main header of %u bytes", - fillsize); - /* push header in the adapter */ - gst_adapter_push (rtpwj2depay->adapter, mheader); + if (frag_offset == 0) { + /* flush the previous frame, should have happened when the timestamp + * changed above. */ + gst_rtp_wj2_depay_flush_wj2 (depayload); + rtpwj2depay->have_sync = TRUE; + } + } else if (wj2len > 2 && payload[HEADER_SIZE] == 0xff) { + guint marker = payload[HEADER_SIZE + 1]; + + /* packets must start with SOC, SOT or SOP */ + switch (marker) { + case WJ2_MARKER_SOC: + GST_DEBUG_OBJECT (rtpwj2depay, "found SOC packet"); + /* flush the previous frame */ + gst_rtp_wj2_depay_flush_frame (depayload); + rtpwj2depay->have_sync = TRUE; + break; + case WJ2_MARKER_SOT: + /* flush the previous tile */ + gst_rtp_wj2_depay_flush_tile (depayload); + GST_DEBUG_OBJECT (rtpwj2depay, "found SOT packet"); + rtpwj2depay->have_sync = TRUE; + /* we sync on the tile now */ + rtpwj2depay->last_tile = tile; + break; + case WJ2_MARKER_SOP: + GST_DEBUG_OBJECT (rtpwj2depay, "found SOP packet"); + /* flush the previous PU */ + gst_rtp_wj2_depay_flush_pu (depayload); + if (rtpwj2depay->last_tile != tile) { + /* wrong tile, we lose sync and we need a new SOT or SOC to regain + * sync. First flush out the previous tile if we have one. */ + if (rtpwj2depay->last_tile != -1) + gst_rtp_wj2_depay_flush_tile (depayload); + /* now we have no more valid tile and no sync */ + rtpwj2depay->last_tile = -1; + rtpwj2depay->have_sync = FALSE; } else { - /* fill with 0 */ - GST_DEBUG_OBJECT (rtpwj2depay, "filling gap of %u bytes", fillsize); - - J_buf = gst_buffer_new_and_alloc (fillsize); - gst_adapter_push (rtpwj2depay->adapter, J_buf); + rtpwj2depay->have_sync = TRUE; } - } + break; + default: + GST_DEBUG_OBJECT (rtpwj2depay, "no sync packet 0x%02d", marker); + break; } + } - if (rtpwj2depay->WH_id == -1) { - /* first time we lock onto a WH_id */ - rtpwj2depay->WH_id = WH_id; - gst_rtp_wj2_ensure_ntiles (rtpwj2depay, rtpwj2depay->WH[WH_id]); - } else if (rtpwj2depay->WH_id != WH_id) - goto invalid_m_header; - - /* we have a valid MH_id if we didn't have a previous one or if it matched the - * previous one */ - if (rtpwj2depay->MH_id == -1) - rtpwj2depay->MH_id = MH_id; - else if (rtpwj2depay->MH_id != MH_id) - goto invalid_m_header; - - /* take JPEG2000 data, push in the adapter */ - J_buf = gst_rtp_buffer_get_payload_subbuffer (buf, 10, -1); - GST_DEBUG_OBJECT (rtpwj2depay, "adding buffer of %u bytes", - GST_BUFFER_SIZE (J_buf)); - rtpwj2depay->offset += GST_BUFFER_SIZE (J_buf); - gst_adapter_push (rtpwj2depay->adapter, J_buf); - - if (MHF != 0) { - if ((MHF & 2) == 2) { - guint avail; - - /* we have a complete JPEG2000 header */ - avail = gst_adapter_available (rtpwj2depay->adapter); - GST_LOG_OBJECT (rtpwj2depay, "complete main header %u of size %u", - MH_id, avail); - - /* only cache headers when != 0 */ - if (MH_id != 0) { - J_buf = gst_adapter_take_buffer (rtpwj2depay->adapter, avail); - /* store the new header */ - gst_rtp_wj2_store_mheader (rtpwj2depay, MH_id, J_buf); - /* and push it back in */ - gst_adapter_push (rtpwj2depay->adapter, J_buf); - } - } - } else { - /* we have a complete JPEG2000 packet */ - if (L) { - guint avail; - - avail = gst_adapter_available (rtpwj2depay->adapter); - J_buf = gst_adapter_take_buffer (rtpwj2depay->adapter, avail); - - GST_LOG_OBJECT (rtpwj2depay, "complete data of size %u", avail); + if (rtpwj2depay->have_sync) { + GstBuffer *pu_frag; - gst_rtp_wj2_store_tile (rtpwj2depay, tci, J_buf); - } + if (gst_adapter_available (rtpwj2depay->pu_adapter) == 0) { + /* first part of pu, record state */ + GST_DEBUG_OBJECT (rtpwj2depay, "first PU"); + rtpwj2depay->pu_tp = tp; + rtpwj2depay->pu_tci = tci; + rtpwj2depay->pu_MHF = MHF; } - - /* last packet, construct WJ2 image from collected data */ - if (gst_rtp_buffer_get_marker (buf)) { - GST_LOG_OBJECT (rtpwj2depay, "complete wj2 frame"); - out = gst_rtp_wj2_flush_frame (rtpwj2depay); + /* and push in pu adapter */ + GST_DEBUG_OBJECT (rtpwj2depay, "push pu of size %u in adapter", wj2len); + pu_frag = gst_rtp_buffer_get_payload_subbuffer (buf, header_size, -1); + gst_adapter_push (rtpwj2depay->pu_adapter, pu_frag); + + if (MHF & 2) { + /* last part of main header received, we can flush it */ + GST_DEBUG_OBJECT (rtpwj2depay, "header end, flush pu"); + gst_rtp_wj2_depay_flush_pu (depayload); } + } else { + GST_DEBUG_OBJECT (rtpwj2depay, "discard packet, no sync"); + } + + /* marker bit finishes the frame */ + if (gst_rtp_buffer_get_marker (buf)) { + GST_DEBUG_OBJECT (rtpwj2depay, "marker set, last buffer"); + /* then flush frame */ + gst_rtp_wj2_depay_flush_frame (depayload); } - return out; + return NULL; /* ERRORS */ empty_packet: @@ -698,24 +858,57 @@ empty_packet: ("Empty Payload."), (NULL)); return NULL; } -waiting_header: +wrong_WH_id: { - GST_DEBUG_OBJECT (rtpwj2depay, "we are waiting for a header"); + GST_ELEMENT_WARNING (rtpwj2depay, STREAM, DECODE, + ("Invalid WH_id %u, expected %u", WH_id, rtpwj2depay->last_WH_id), + (NULL)); + gst_rtp_wj2_depay_clear_pu (rtpwj2depay); return NULL; } -invalid_m_header: +wrong_MH_id: { - /* doesn't match, we need to discard data */ - GST_DEBUG_OBJECT (rtpwj2depay, "invalid header"); - gst_adapter_clear (rtpwj2depay->adapter); - rtpwj2depay->offset = 0; - rtpwj2depay->MH_id = -1; - rtpwj2depay->WH_id = -1; + GST_ELEMENT_WARNING (rtpwj2depay, STREAM, DECODE, + ("Invalid MH_id %u, expected %u", MH_id, rtpwj2depay->last_MH_id), + (NULL)); + gst_rtp_wj2_depay_clear_pu (rtpwj2depay); return NULL; } -invalid_packet: - { - return NULL; +} + +static void +gst_rtp_wj2_depay_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRtpWJ2Depay *rtpwj2depay; + + rtpwj2depay = GST_RTP_WJ2_DEPAY (object); + + switch (prop_id) { + case PROP_BUFFER_LIST: + rtpwj2depay->buffer_list = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_wj2_depay_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRtpWJ2Depay *rtpwj2depay; + + rtpwj2depay = GST_RTP_WJ2_DEPAY (object); + + switch (prop_id) { + case PROP_BUFFER_LIST: + g_value_set_boolean (value, rtpwj2depay->buffer_list); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; } } @@ -731,7 +924,7 @@ gst_rtp_wj2_depay_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE); + gst_rtp_wj2_depay_reset (rtpwj2depay); break; default: break; @@ -741,7 +934,7 @@ gst_rtp_wj2_depay_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_rtp_wj2_clear_adapter (rtpwj2depay, TRUE); + gst_rtp_wj2_depay_reset (rtpwj2depay); break; case GST_STATE_CHANGE_READY_TO_NULL: break; diff --git a/gst/rtp/gstrtpwj2depay.h b/gst/rtp/gstrtpwj2depay.h index 5b118d0cd..df5be0232 100644 --- a/gst/rtp/gstrtpwj2depay.h +++ b/gst/rtp/gstrtpwj2depay.h @@ -40,27 +40,31 @@ G_BEGIN_DECLS typedef struct _GstRtpWJ2Depay GstRtpWJ2Depay; typedef struct _GstRtpWJ2DepayClass GstRtpWJ2DepayClass; -typedef enum { - GST_RTP_WJ2_STATE_NONE, - GST_RTP_WJ2_STATE_WHEADER, - GST_RTP_WJ2_STATE_MHEADER, - GST_RTP_WJ2_STATE_MDATA -} GstRtpWJ2State; - struct _GstRtpWJ2Depay { GstBaseRTPDepayload depayload; + guint64 last_rtptime; + guint last_WH_id; + guint last_MH_id; + guint last_tile; + GstBuffer *WH[16]; GstBuffer *MH[8]; GPtrArray *tiles; - gint WH_id; - gint MH_id; - guint offset; - GstAdapter *adapter; - GstRtpWJ2State state; + guint pu_tp; + guint pu_MHF; + guint pu_tci; + GstAdapter *pu_adapter; + GstAdapter *t_adapter; + guint frame_tci; + GstAdapter *f_adapter; + + guint next_frag; + gboolean have_sync; + gboolean buffer_list; gint width, height; }; |