summaryrefslogtreecommitdiff
path: root/rtmp/rtmpconnection.c
diff options
context:
space:
mode:
Diffstat (limited to 'rtmp/rtmpconnection.c')
-rw-r--r--rtmp/rtmpconnection.c93
1 files changed, 75 insertions, 18 deletions
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 625f721..6e37caa 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -104,6 +104,8 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
{
rtmpconnection->cancellable = g_cancellable_new ();
rtmpconnection->output_queue = g_queue_new ();
+ rtmpconnection->input_chunk_cache = gst_rtmp_chunk_cache_new ();
+ rtmpconnection->output_chunk_cache = gst_rtmp_chunk_cache_new ();
}
void
@@ -156,6 +158,8 @@ gst_rtmp_connection_finalize (GObject * object)
GST_DEBUG_OBJECT (rtmpconnection, "finalize");
/* clean up object here */
+ gst_rtmp_chunk_cache_free (rtmpconnection->input_chunk_cache);
+ gst_rtmp_chunk_cache_free (rtmpconnection->output_chunk_cache);
G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
}
@@ -177,7 +181,6 @@ gst_rtmp_connection_new (GSocketConnection * connection)
(GSourceFunc) gst_rtmp_connection_input_ready, sc, NULL);
g_source_attach (sc->input_source, NULL);
-
return sc;
}
@@ -209,15 +212,22 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
GST_DEBUG ("input ready");
- data = g_malloc (128);
+ data = g_malloc (4096);
ret =
g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
- data, 128, sc->cancellable, &error);
+ data, 4096, sc->cancellable, &error);
if (ret < 0) {
GST_ERROR ("read error: %s", error->message);
g_error_free (error);
return G_SOURCE_REMOVE;
}
+ if (ret == 0) {
+ /* FIXME probably closed */
+ GST_ERROR ("closed?");
+ return G_SOURCE_REMOVE;
+ }
+
+ GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", ret);
if (sc->input_bytes) {
sc->input_bytes = gst_rtmp_bytes_append (sc->input_bytes, data, ret);
@@ -225,15 +235,18 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
sc->input_bytes = g_bytes_new_take (data, ret);
}
- if (sc->input_callback) {
- if (g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) {
- GstRtmpConnectionCallback callback;
- GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback",
- g_bytes_get_size (sc->input_bytes));
- callback = sc->input_callback;
- sc->input_callback = NULL;
- (*callback) (sc);
- }
+ //GST_ERROR("ic: %p", sc->input_callback);
+ //GST_ERROR("in queue: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes));
+ GST_ERROR ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes);
+
+ while (sc->input_callback && sc->input_bytes &&
+ g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) {
+ GstRtmpConnectionCallback callback;
+ GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback",
+ g_bytes_get_size (sc->input_bytes));
+ callback = sc->input_callback;
+ sc->input_callback = NULL;
+ (*callback) (sc);
}
return G_SOURCE_CONTINUE;
@@ -246,10 +259,10 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
GstRtmpChunk *chunk;
GBytes *bytes;
- GST_ERROR ("output ready");
+ GST_DEBUG ("output ready");
if (sc->writing) {
- GST_ERROR ("busy writing");
+ GST_DEBUG ("busy writing");
return G_SOURCE_REMOVE;
}
@@ -257,11 +270,13 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
if (!chunk) {
return G_SOURCE_REMOVE;
}
+ GST_ERROR ("popped");
sc->writing = TRUE;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
- bytes = gst_rtmp_chunk_serialize (chunk);
- //gst_rtmp_dump_data (bytes);
+ bytes = gst_rtmp_chunk_serialize (chunk, sc->output_chunk_cache);
+ gst_rtmp_chunk_cache_update (sc->output_chunk_cache, chunk);
+ gst_rtmp_dump_data (bytes);
g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT,
sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk);
@@ -407,6 +422,12 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc)
GST_ERROR ("server handshake finished");
sc->handshake_complete = TRUE;
+ if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) {
+ GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT,
+ g_bytes_get_size (sc->input_bytes));
+ gst_rtmp_connection_chunk_callback (sc);
+ }
+
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, 0);
@@ -417,7 +438,7 @@ static void
gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
{
GstRtmpChunk *chunk;
- gsize size;
+ gsize size = 0;
while (1) {
GBytes *bytes;
@@ -425,17 +446,23 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
if (sc->input_bytes == NULL)
break;
- chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size);
+ chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size,
+ sc->input_chunk_cache);
if (chunk == NULL)
break;
+ gst_rtmp_chunk_cache_update (sc->input_chunk_cache, chunk);
+
bytes = gst_rtmp_connection_take_input_bytes (sc, size);
g_bytes_unref (bytes);
GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length);
g_signal_emit_by_name (sc, "got-chunk", chunk);
g_object_unref (chunk);
+
+ size = 0;
}
+ GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT, size);
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, size);
}
@@ -447,6 +474,7 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
g_return_if_fail (GST_IS_RTMP_CHUNK (chunk));
+ GST_ERROR ("queued");
chunk->priv = connection;
g_queue_push_tail (connection->output_queue, chunk);
gst_rtmp_connection_start_output (connection);
@@ -464,6 +492,18 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection,
if (needed_bytes == 0) {
connection->input_needed_bytes = 1;
}
+
+ if (connection->input_callback && connection->input_bytes &&
+ g_bytes_get_size (connection->input_bytes) >=
+ connection->input_needed_bytes) {
+ GstRtmpConnectionCallback callback;
+ GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback",
+ g_bytes_get_size (connection->input_bytes));
+ callback = connection->input_callback;
+ connection->input_callback = NULL;
+ (*callback) (connection);
+ }
+
}
void
@@ -581,7 +621,24 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj,
GST_ERROR ("client handshake finished");
sc->handshake_complete = TRUE;
+ if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) {
+ GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT,
+ g_bytes_get_size (sc->input_bytes));
+ gst_rtmp_connection_chunk_callback (sc);
+ }
+
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, 0);
gst_rtmp_connection_start_output (sc);
}
+
+void
+gst_rtmp_connection_dump (GstRtmpConnection * connection)
+{
+ g_print (" output_queue: %d\n",
+ g_queue_get_length (connection->output_queue));
+ g_print (" input_bytes: %" G_GSIZE_FORMAT "\n",
+ connection->input_bytes ? g_bytes_get_size (connection->input_bytes) : 0);
+ g_print (" needed: %" G_GSIZE_FORMAT "\n", connection->input_needed_bytes);
+
+}