summaryrefslogtreecommitdiff
path: root/rtmp/rtmpconnection.c
diff options
context:
space:
mode:
Diffstat (limited to 'rtmp/rtmpconnection.c')
-rw-r--r--rtmp/rtmpconnection.c50
1 files changed, 31 insertions, 19 deletions
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 26f8538..625f721 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -66,6 +66,7 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection,
void (*input_callback) (GstRtmpConnection * connection),
gsize needed_bytes);
static void gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc);
+static void gst_rtmp_connection_start_output (GstRtmpConnection * sc);
static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc);
@@ -164,7 +165,6 @@ gst_rtmp_connection_new (GSocketConnection * connection)
{
GstRtmpConnection *sc;
GInputStream *is;
- GOutputStream *os;
sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
sc->connection = connection;
@@ -177,14 +177,26 @@ gst_rtmp_connection_new (GSocketConnection * connection)
(GSourceFunc) gst_rtmp_connection_input_ready, sc, NULL);
g_source_attach (sc->input_source, NULL);
+
+ return sc;
+}
+
+static void
+gst_rtmp_connection_start_output (GstRtmpConnection * sc)
+{
+ GSource *source;
+ GOutputStream *os;
+
+ if (!sc->handshake_complete)
+ return;
+
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
- sc->output_source =
+ source =
g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (os),
sc->cancellable);
- g_source_set_callback (sc->output_source,
- (GSourceFunc) gst_rtmp_connection_output_ready, sc, NULL);
-
- return sc;
+ g_source_set_callback (source, (GSourceFunc) gst_rtmp_connection_output_ready,
+ sc, NULL);
+ g_source_attach (source, NULL);
}
static gboolean
@@ -236,6 +248,11 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
GST_ERROR ("output ready");
+ if (sc->writing) {
+ GST_ERROR ("busy writing");
+ return G_SOURCE_REMOVE;
+ }
+
chunk = g_queue_pop_head (sc->output_queue);
if (!chunk) {
return G_SOURCE_REMOVE;
@@ -244,10 +261,11 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
sc->writing = TRUE;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
bytes = gst_rtmp_chunk_serialize (chunk);
- gst_rtmp_dump_data (bytes);
+ //gst_rtmp_dump_data (bytes);
g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT,
sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk);
+ //return G_SOURCE_CONTINUE;
return G_SOURCE_REMOVE;
}
@@ -274,9 +292,7 @@ gst_rtmp_connection_write_chunk_done (GObject * obj,
GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
g_object_unref (chunk);
- if (g_source_get_context (connection->output_source) == NULL) {
- g_source_attach (connection->output_source, NULL);
- }
+ gst_rtmp_connection_start_output (connection);
}
@@ -389,10 +405,12 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc)
/* handshake finished */
GST_ERROR ("server handshake finished");
+ sc->handshake_complete = TRUE;
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, 0);
+ gst_rtmp_connection_start_output (sc);
}
static void
@@ -406,9 +424,6 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
if (sc->input_bytes == NULL)
break;
- GST_ERROR ("parsing %" G_GSIZE_FORMAT " bytes",
- g_bytes_get_size (sc->input_bytes));
- gst_rtmp_dump_data (sc->input_bytes);
chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size);
if (chunk == NULL)
@@ -434,9 +449,7 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
chunk->priv = connection;
g_queue_push_tail (connection->output_queue, chunk);
- if (connection->handshake_complete) {
- g_source_attach (connection->output_source, NULL);
- }
+ gst_rtmp_connection_start_output (connection);
}
static void
@@ -566,10 +579,9 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj,
/* handshake finished */
GST_ERROR ("client handshake finished");
+ sc->handshake_complete = TRUE;
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, 0);
- if (!g_queue_is_empty (sc->output_queue)) {
- g_source_attach (sc->output_source, NULL);
- }
+ gst_rtmp_connection_start_output (sc);
}