summaryrefslogtreecommitdiff
path: root/rtmp/rtmpconnection.c
diff options
context:
space:
mode:
Diffstat (limited to 'rtmp/rtmpconnection.c')
-rw-r--r--rtmp/rtmpconnection.c378
1 files changed, 225 insertions, 153 deletions
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 86181bd..6a217ce 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -25,6 +25,7 @@
#include "rtmpconnection.h"
#include "rtmpchunk.h"
#include "amf.h"
+#include "rtmputils.h"
#include <string.h>
@@ -39,9 +40,44 @@ static void gst_rtmp_connection_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
static void gst_rtmp_connection_dispose (GObject * object);
static void gst_rtmp_connection_finalize (GObject * object);
+static void gst_rtmp_connection_client_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_client_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc,
+ GBytes * bytes);
+static void gst_rtmp_connection_client_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake1 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_client_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_client_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc,
+ GBytes * bytes);
+static void gst_rtmp_connection_client_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc);
+static void gst_rtmp_connection_read_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_write_chunk (GstRtmpConnection * sc);
+static void gst_rtmp_connection_write_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_server_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc,
+ GBytes * bytes);
+static void gst_rtmp_connection_server_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_server_handshake3 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_server_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+
-static void proxy_connect (GstRtmpConnection * sc);
-static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc);
enum
{
@@ -75,6 +111,7 @@ static void
gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
{
rtmpconnection->cancellable = g_cancellable_new ();
+ rtmpconnection->output_queue = g_queue_new ();
}
void
@@ -139,85 +176,12 @@ gst_rtmp_connection_new (GSocketConnection * connection)
sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
sc->connection = connection;
- //proxy_connect (sc);
- //gst_rtmp_connection_read_chunk (sc);
- gst_rtmp_connection_handshake1 (sc);
-
return sc;
}
-typedef struct _ChunkRead ChunkRead;
-struct _ChunkRead
-{
- gpointer data;
- gsize alloc_size;
- gsize size;
- GstRtmpConnection *connection;
-};
-
-
-static void proxy_connect (GstRtmpConnection * sc);
-static void proxy_connect_done (GObject * obj, GAsyncResult * res,
- gpointer user_data);
-static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc);
-static void gst_rtmp_connection_read_chunk_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk);
-static void proxy_write_done (GObject * obj, GAsyncResult * res,
- gpointer user_data);
-static void proxy_read_chunk (GstRtmpConnection * sc);
-static void proxy_read_done (GObject * obj, GAsyncResult * res,
- gpointer user_data);
-static void gst_rtmp_connection_write_chunk (GstRtmpConnection *
- sc, ChunkRead * chunk);
-static void gst_rtmp_connection_write_chunk_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc);
-static void gst_rtmp_connection_handshake1_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void gst_rtmp_connection_handshake2 (GstRtmpConnection * sc,
- GBytes * bytes);
-static void gst_rtmp_connection_handshake2_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void gst_rtmp_connection_handshake3 (GstRtmpConnection * sc);
-static void gst_rtmp_connection_handshake3_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
static void
-proxy_connect (GstRtmpConnection * sc)
-{
- GSocketConnectable *addr;
-
- GST_ERROR ("proxy_connect");
-
- addr = g_network_address_new ("localhost", 1935);
-
- sc->socket_client = g_socket_client_new ();
- g_socket_client_connect_async (sc->socket_client, addr,
- sc->cancellable, proxy_connect_done, sc);
-}
-
-static void
-proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data)
-{
- GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
- GError *error = NULL;
-
- GST_ERROR ("proxy_connect_done");
-
- sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client,
- res, &error);
- if (sc->proxy_connection == NULL) {
- GST_ERROR ("connection error: %s", error->message);
- g_error_free (error);
- return;
- }
-
- gst_rtmp_connection_read_chunk (sc);
-}
-
-static void
gst_rtmp_connection_read_chunk (GstRtmpConnection * sc)
{
GInputStream *is;
@@ -272,7 +236,7 @@ gst_rtmp_connection_read_chunk_done (GObject * obj,
gsize chunk_size;
GBytes *bytes;
- GST_ERROR ("gst_rtmp_connection_read_chunk_done");
+ GST_DEBUG ("gst_rtmp_connection_read_chunk_done");
bytes = g_input_stream_read_bytes_finish (is, res, &error);
if (bytes == NULL) {
@@ -282,6 +246,7 @@ gst_rtmp_connection_read_chunk_done (GObject * obj,
}
GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+ gst_rtmp_dump_data (bytes);
chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size);
if (chunk) {
@@ -289,133 +254,152 @@ gst_rtmp_connection_read_chunk_done (GObject * obj,
g_object_unref (chunk);
}
+
+ gst_rtmp_connection_read_chunk (connection);
}
-G_GNUC_UNUSED static void
-proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk)
+static void
+gst_rtmp_connection_write_chunk (GstRtmpConnection * sc)
{
GOutputStream *os;
+ GBytes *bytes;
+ GstRtmpChunk *chunk;
+
+ if (sc->writing) {
+ GST_WARNING ("gst_rtmp_connection_write_chunk while writing == TRUE");
+ return;
+ }
- os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection));
+ chunk = g_queue_pop_head (sc->output_queue);
+ if (!chunk) {
+ return;
+ }
- g_output_stream_write_async (os, chunk->data, chunk->size,
- G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk);
+ sc->writing = TRUE;
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
+ bytes = gst_rtmp_chunk_serialize (chunk);
+ gst_rtmp_dump_data (bytes);
+ g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT,
+ sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk);
}
static void
-proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+gst_rtmp_connection_write_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
+ GstRtmpChunk *chunk = GST_RTMP_CHUNK (user_data);
+ GstRtmpConnection *connection = GST_RTMP_CONNECTION (chunk->priv);
GError *error = NULL;
gssize ret;
- GST_ERROR ("proxy_write_done");
+ GST_DEBUG ("gst_rtmp_connection_write_chunk_done");
+
+ connection->writing = FALSE;
- ret = g_output_stream_write_finish (os, res, &error);
+ ret = g_output_stream_write_bytes_finish (os, res, &error);
if (ret < 0) {
GST_ERROR ("write error: %s", error->message);
g_error_free (error);
return;
}
- GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret);
-
- proxy_read_chunk (chunk->connection);
+ GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
+ g_object_unref (chunk);
- g_free (chunk->data);
- g_free (chunk);
+ gst_rtmp_connection_write_chunk (connection);
}
static void
-proxy_read_chunk (GstRtmpConnection * sc)
+gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc)
{
GInputStream *is;
- ChunkRead *chunk;
-
- chunk = g_malloc0 (sizeof (ChunkRead));
- chunk->alloc_size = 4096;
- chunk->data = g_malloc (chunk->alloc_size);
- chunk->connection = sc;
- is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection));
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
- g_input_stream_read_async (is, chunk->data, chunk->alloc_size,
- G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk);
+ g_input_stream_read_bytes_async (is, 4096,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_connection_server_handshake1_done, sc);
}
static void
-proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+gst_rtmp_connection_server_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
{
GInputStream *is = G_INPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
GError *error = NULL;
- gssize ret;
+ GBytes *bytes;
- GST_ERROR ("proxy_read_done");
+ GST_DEBUG ("gst_rtmp_connection_server_handshake1_done");
- ret = g_input_stream_read_finish (is, res, &error);
- if (ret < 0) {
+ bytes = g_input_stream_read_bytes_finish (is, res, &error);
+ if (bytes == NULL) {
GST_ERROR ("read error: %s", error->message);
g_error_free (error);
return;
}
- GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret);
- chunk->size = ret;
+ GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
- gst_rtmp_connection_write_chunk (chunk->connection, chunk);
+ gst_rtmp_connection_server_handshake2 (sc, bytes);
}
static void
-gst_rtmp_connection_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk)
+gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc, GBytes * bytes)
{
GOutputStream *os;
+ guint8 *data;
+ GBytes *out_bytes;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
- g_output_stream_write_async (os, chunk->data, chunk->size,
+ data = g_malloc (1 + 1536 + 1536);
+ memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536);
+ memset (data + 1537, 0, 8);
+ memset (data + 1537 + 8, 0xef, 1528);
+
+ out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536);
+
+ g_output_stream_write_bytes_async (os, out_bytes,
G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_connection_write_chunk_done, chunk);
+ gst_rtmp_connection_server_handshake2_done, sc);
}
static void
-gst_rtmp_connection_write_chunk_done (GObject * obj,
+gst_rtmp_connection_server_handshake2_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
GError *error = NULL;
gssize ret;
- GST_ERROR ("gst_rtmp_connection_write_chunk_done");
+ GST_DEBUG ("gst_rtmp_connection_server_handshake2_done");
- ret = g_output_stream_write_finish (os, res, &error);
- if (ret < 0) {
- GST_ERROR ("write error: %s", error->message);
+ ret = g_output_stream_write_bytes_finish (os, res, &error);
+ if (ret < 1 + 1536 + 1536) {
+ GST_ERROR ("read error: %s", error->message);
g_error_free (error);
return;
}
- GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
-
- gst_rtmp_connection_read_chunk (chunk->connection);
+ GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
- g_free (chunk->data);
- g_free (chunk);
+ gst_rtmp_connection_server_handshake3 (sc);
}
static void
-gst_rtmp_connection_handshake1 (GstRtmpConnection * sc)
+gst_rtmp_connection_server_handshake3 (GstRtmpConnection * sc)
{
GInputStream *is;
is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
- g_input_stream_read_bytes_async (is, 4096,
+ g_input_stream_read_bytes_async (is, 1536,
G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_connection_handshake1_done, sc);
+ gst_rtmp_connection_server_handshake3_done, sc);
}
static void
-gst_rtmp_connection_handshake1_done (GObject * obj,
+gst_rtmp_connection_server_handshake3_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GInputStream *is = G_INPUT_STREAM (obj);
@@ -423,7 +407,7 @@ gst_rtmp_connection_handshake1_done (GObject * obj,
GError *error = NULL;
GBytes *bytes;
- GST_ERROR ("gst_rtmp_connection_handshake1_done");
+ GST_DEBUG ("gst_rtmp_connection_server_handshake3_done");
bytes = g_input_stream_read_bytes_finish (is, res, &error);
if (bytes == NULL) {
@@ -431,34 +415,68 @@ gst_rtmp_connection_handshake1_done (GObject * obj,
g_error_free (error);
return;
}
- GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+ GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ gst_rtmp_connection_read_chunk (sc);
+}
+
+void
+gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
+ GstRtmpChunk * chunk)
+{
+ g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
+ g_return_if_fail (GST_IS_RTMP_CHUNK (chunk));
- gst_rtmp_connection_handshake2 (sc, bytes);
+ chunk->priv = connection;
+ g_queue_push_tail (connection->output_queue, chunk);
+ if (!connection->writing) {
+ gst_rtmp_connection_write_chunk (connection);
+ }
+}
+
+void
+gst_rtmp_connection_handshake_async (GstRtmpConnection * connection,
+ gboolean is_server, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data)
+{
+ GSimpleAsyncResult *async;
+
+ async = g_simple_async_result_new (G_OBJECT (connection),
+ callback, user_data, gst_rtmp_connection_handshake_async);
+ g_simple_async_result_set_check_cancellable (async, cancellable);
+
+ connection->cancellable = cancellable;
+ connection->async = async;
+
+ if (is_server) {
+ gst_rtmp_connection_server_handshake1 (connection);
+ } else {
+ gst_rtmp_connection_client_handshake1 (connection);
+ }
}
static void
-gst_rtmp_connection_handshake2 (GstRtmpConnection * sc, GBytes * bytes)
+gst_rtmp_connection_client_handshake1 (GstRtmpConnection * sc)
{
GOutputStream *os;
guint8 *data;
- GBytes *out_bytes;
+ GBytes *bytes;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
- data = g_malloc (1 + 1536 + 1536);
- memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536);
- memset (data + 1537, 0, 8);
- memset (data + 1537 + 8, 0xef, 1528);
-
- out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536);
+ data = g_malloc (1 + 1536);
+ data[0] = 3;
+ memset (data + 1, 0, 8);
+ memset (data + 9, 0xa5, 1528);
+ bytes = g_bytes_new_take (data, 1 + 1536);
- g_output_stream_write_bytes_async (os, out_bytes,
+ g_output_stream_write_bytes_async (os, bytes,
G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_connection_handshake2_done, sc);
+ gst_rtmp_connection_client_handshake1_done, sc);
}
static void
-gst_rtmp_connection_handshake2_done (GObject * obj,
+gst_rtmp_connection_client_handshake1_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
@@ -466,33 +484,33 @@ gst_rtmp_connection_handshake2_done (GObject * obj,
GError *error = NULL;
gssize ret;
- GST_ERROR ("gst_rtmp_connection_handshake2_done");
+ GST_DEBUG ("gst_rtmp_connection_client_handshake2_done");
ret = g_output_stream_write_bytes_finish (os, res, &error);
- if (ret < 1 + 1536 + 1536) {
- GST_ERROR ("read error: %s", error->message);
+ if (ret < 1 + 1536) {
+ GST_ERROR ("write error: %s", error->message);
g_error_free (error);
return;
}
- GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
+ GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
- gst_rtmp_connection_handshake3 (sc);
+ gst_rtmp_connection_client_handshake2 (sc);
}
static void
-gst_rtmp_connection_handshake3 (GstRtmpConnection * sc)
+gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc)
{
GInputStream *is;
is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
- g_input_stream_read_bytes_async (is, 1536,
+ g_input_stream_read_bytes_async (is, 1 + 1536 + 1536,
G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_connection_handshake3_done, sc);
+ gst_rtmp_connection_client_handshake2_done, sc);
}
static void
-gst_rtmp_connection_handshake3_done (GObject * obj,
+gst_rtmp_connection_client_handshake2_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GInputStream *is = G_INPUT_STREAM (obj);
@@ -500,7 +518,7 @@ gst_rtmp_connection_handshake3_done (GObject * obj,
GError *error = NULL;
GBytes *bytes;
- GST_ERROR ("gst_rtmp_connection_handshake3_done");
+ GST_DEBUG ("gst_rtmp_connection_client_handshake2_done");
bytes = g_input_stream_read_bytes_finish (is, res, &error);
if (bytes == NULL) {
@@ -508,8 +526,62 @@ gst_rtmp_connection_handshake3_done (GObject * obj,
g_error_free (error);
return;
}
- GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+ GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ gst_rtmp_connection_client_handshake3 (sc, bytes);
+}
+
+static void
+gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc, GBytes * bytes)
+{
+ GOutputStream *os;
+ GBytes *out_bytes;
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
+
+ out_bytes = g_bytes_new_from_bytes (bytes, 1 + 1536, 1536);
+
+ g_output_stream_write_bytes_async (os, out_bytes, G_PRIORITY_DEFAULT,
+ sc->cancellable, gst_rtmp_connection_client_handshake3_done, sc);
+}
+
+static void
+gst_rtmp_connection_client_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (obj);
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
+ GError *error = NULL;
+ gssize ret;
+
+ GST_DEBUG ("gst_rtmp_connection_client_handshake3_done");
+
+ ret = g_output_stream_write_bytes_finish (os, res, &error);
+ if (ret < 1536) {
+ GST_ERROR ("write error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
+
+ g_simple_async_result_complete (sc->async);
gst_rtmp_connection_read_chunk (sc);
- (void) &proxy_connect;
+}
+
+gboolean
+gst_rtmp_connection_handshake_finish (GstRtmpConnection * connection,
+ GAsyncResult * result, GError ** error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (g_simple_async_result_is_valid (result,
+ G_OBJECT (connection), gst_rtmp_connection_handshake_async), FALSE);
+
+ simple = (GSimpleAsyncResult *) result;
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return FALSE;
+
+ return TRUE;
}