summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-24 18:18:43 -0700
committerDavid Schleef <ds@schleef.org>2014-08-24 18:21:12 -0700
commit233d7e887e079b0f1c3da2366443744e54573324 (patch)
treebad1109e3bd05912b9dcbee470186a40dff3ef7f
parentd09625be30625611a213ff618f99c990ff0a23bc (diff)
hacking
-rw-r--r--rtmp/Makefile.am4
-rw-r--r--rtmp/rtmpchunk.c32
-rw-r--r--rtmp/rtmpchunk.h2
-rw-r--r--rtmp/rtmpclient.c243
-rw-r--r--rtmp/rtmpclient.h13
-rw-r--r--rtmp/rtmpconnection.c378
-rw-r--r--rtmp/rtmpconnection.h17
-rw-r--r--rtmp/rtmpserver.c25
-rw-r--r--rtmp/rtmputils.c51
-rw-r--r--rtmp/rtmputils.h32
-rw-r--r--tools/proxy-server.c72
11 files changed, 485 insertions, 384 deletions
diff --git a/rtmp/Makefile.am b/rtmp/Makefile.am
index 18bc37c..4cab66b 100644
--- a/rtmp/Makefile.am
+++ b/rtmp/Makefile.am
@@ -30,4 +30,6 @@ sources = \
rtmpserver.c \
rtmpserver.h \
rtmpstream.c \
- rtmpstream.h
+ rtmpstream.h \
+ rtmputils.c \
+ rtmputils.h
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
index de0f46b..7cc5f43 100644
--- a/rtmp/rtmpchunk.c
+++ b/rtmp/rtmpchunk.c
@@ -23,6 +23,7 @@
#include <gst/gst.h>
#include "rtmpchunk.h"
+#include <string.h>
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_debug_category);
#define GST_CAT_DEFAULT gst_rtmp_chunk_debug_category
@@ -236,6 +237,37 @@ gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size)
return NULL;
}
+GBytes *
+gst_rtmp_chunk_serialize (GstRtmpChunk * chunk)
+{
+ guint8 *data;
+ const guint8 *chunkdata;
+ gsize chunksize;
+ int header_fmt;
+
+ /* FIXME this is incomplete and inefficient */
+ chunkdata = g_bytes_get_data (chunk->payload, &chunksize);
+ data = g_malloc (chunksize + 12);
+ header_fmt = 0;
+ g_assert (chunk->stream_id < 64);
+ data[0] = (header_fmt << 6) | (chunk->stream_id);
+ g_assert (chunk->timestamp < 0xffffff);
+ data[1] = (chunk->timestamp >> 16) & 0xff;
+ data[2] = (chunk->timestamp >> 8) & 0xff;
+ data[3] = chunk->timestamp & 0xff;
+ data[4] = (chunk->message_length >> 16) & 0xff;
+ data[5] = (chunk->message_length >> 8) & 0xff;
+ data[6] = chunk->message_length & 0xff;
+ data[7] = chunk->message_type_id;
+ data[8] = 0;
+ data[9] = 0;
+ data[10] = 0;
+ data[11] = 0;
+ memcpy (data + 12, chunkdata, chunksize);
+
+ return g_bytes_new_take (data, chunksize + 12);
+}
+
void
gst_rtmp_chunk_set_stream_id (GstRtmpChunk * chunk, guint32 stream_id)
{
diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h
index 68edfcb..1165fb7 100644
--- a/rtmp/rtmpchunk.h
+++ b/rtmp/rtmpchunk.h
@@ -43,6 +43,7 @@ struct _GstRtmpChunk
int message_type_id;
GBytes *payload;
+ gpointer priv;
};
struct _GstRtmpChunkClass
@@ -63,6 +64,7 @@ GstRtmpChunk *gst_rtmp_chunk_new (void);
GstRtmpChunkParseStatus gst_rtmp_chunk_can_parse (GBytes *bytes,
gsize *chunk_size);
GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size);
+GBytes * gst_rtmp_chunk_serialize (GstRtmpChunk *chunk);
void gst_rtmp_chunk_set_stream_id (GstRtmpChunk *chunk, guint32 stream_id);
void gst_rtmp_chunk_set_timestamp (GstRtmpChunk *chunk, guint32 timestamp);
diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c
index 9db1519..6a4f373 100644
--- a/rtmp/rtmpclient.c
+++ b/rtmp/rtmpclient.c
@@ -39,19 +39,10 @@ static void gst_rtmp_client_dispose (GObject * object);
static void gst_rtmp_client_finalize (GObject * object);
static void
-gst_rtmp_client_connect_start (GstRtmpClient * client,
- GCancellable * cancellable, GSimpleAsyncResult * async);
-static void
-gst_rtmp_client_connect_1 (GObject * source, GAsyncResult * result,
- gpointer user_data);
-static void
-gst_rtmp_client_connect_2 (GObject * source, GAsyncResult * result,
+gst_rtmp_client_connect_done (GObject * source, GAsyncResult * result,
gpointer user_data);
static void
-gst_rtmp_client_connect_3 (GObject * source, GAsyncResult * result,
- gpointer user_data);
-static void
-gst_rtmp_client_connect_4 (GObject * source, GAsyncResult * result,
+gst_rtmp_client_handshake_done (GObject * source, GAsyncResult * result,
gpointer user_data);
enum
@@ -167,7 +158,8 @@ gst_rtmp_client_connect_async (GstRtmpClient * client,
GCancellable * cancellable, GAsyncReadyCallback callback,
gpointer user_data)
{
- GSimpleAsyncResult *simple;
+ GSimpleAsyncResult *async;
+ GSocketConnectable *addr;
if (client->state != GST_RTMP_CLIENT_STATE_NEW) {
g_simple_async_report_error_in_idle (G_OBJECT (client),
@@ -176,226 +168,89 @@ gst_rtmp_client_connect_async (GstRtmpClient * client,
return;
}
- simple = g_simple_async_result_new (G_OBJECT (client),
+ async = g_simple_async_result_new (G_OBJECT (client),
callback, user_data, gst_rtmp_client_connect_async);
+ g_simple_async_result_set_check_cancellable (async, cancellable);
- gst_rtmp_client_connect_start (client, cancellable, simple);
-}
-
-gboolean
-gst_rtmp_client_connect_finish (GstRtmpClient * client,
- GAsyncResult * result, GError ** error)
-{
- GSimpleAsyncResult *simple;
-
- g_return_val_if_fail (g_simple_async_result_is_valid (result,
- G_OBJECT (client), gst_rtmp_client_connect_async), FALSE);
-
- simple = (GSimpleAsyncResult *) result;
-
- if (g_simple_async_result_propagate_error (simple, error))
- return FALSE;
-
- return TRUE;
-}
-
-
-/* async implementation */
-
-/* connect */
-
-typedef struct _ConnectContext ConnectContext;
-struct _ConnectContext
-{
- GstRtmpClient *client;
- GSimpleAsyncResult *async;
- GCancellable *cancellable;
-
- GSocketClient *socket_client;
- GSocketConnection *connection;
- guint8 *handshake_send;
- guint8 *handshake_recv;
-};
-
-static void
-gst_rtmp_client_connect_complete (ConnectContext * context)
-{
- GSimpleAsyncResult *async = context->async;
-
- if (context->socket_client)
- g_object_unref (context->socket_client);
- if (context->connection)
- g_object_unref (context->connection);
- g_free (context->handshake_send);
- g_free (context->handshake_recv);
- g_free (context);
- g_simple_async_result_complete (async);
-}
-
-static void
-gst_rtmp_client_connect_start (GstRtmpClient * client,
- GCancellable * cancellable, GSimpleAsyncResult * async)
-{
- ConnectContext *context;
- GSocketConnectable *addr;
-
- context = g_malloc0 (sizeof (ConnectContext));
- context->cancellable = cancellable;
- context->client = client;
- context->async = async;
+ client->cancellable = cancellable;
+ client->async = async;
addr = g_network_address_new ("localhost", 1935);
- context->socket_client = g_socket_client_new ();
+ client->socket_client = g_socket_client_new ();
- GST_ERROR ("g_socket_client_connect_async");
- g_socket_client_connect_async (context->socket_client, addr,
- context->cancellable, gst_rtmp_client_connect_1, context);
+ GST_DEBUG ("g_socket_client_connect_async");
+ g_socket_client_connect_async (client->socket_client, addr,
+ client->cancellable, gst_rtmp_client_connect_done, client);
}
-
static void
-gst_rtmp_client_connect_1 (GObject * source, GAsyncResult * result,
+gst_rtmp_client_connect_done (GObject * source, GAsyncResult * result,
gpointer user_data)
{
- ConnectContext *context = user_data;
+ GstRtmpClient *client = GST_RTMP_CLIENT (user_data);
GError *error = NULL;
- GOutputStream *os;
- GST_ERROR ("g_socket_client_connect_finish");
- context->connection = g_socket_client_connect_finish (context->socket_client,
- result, &error);
- if (context->connection == NULL) {
+ GST_DEBUG ("g_socket_client_connect_done");
+ client->socket_connection =
+ g_socket_client_connect_finish (client->socket_client, result, &error);
+ if (client->socket_connection == NULL) {
GST_ERROR ("error");
- g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
+ g_simple_async_result_set_error (client->async, GST_RTMP_ERROR,
GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
g_error_free (error);
- gst_rtmp_client_connect_complete (context);
+ client->cancellable = NULL;
+ g_simple_async_result_complete (client->async);
return;
}
- context->handshake_send = g_malloc (1537);
- context->handshake_send[0] = 3;
- memset (context->handshake_send + 1, 0, 8);
- memset (context->handshake_send + 9, 0xa5, 1528);
-
- os = g_io_stream_get_output_stream (G_IO_STREAM (context->connection));
-
- GST_ERROR ("g_output_stream_write_async");
- g_output_stream_write_async (os, context->handshake_send, 1537,
- G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_2,
- context);
+ client->connection = gst_rtmp_connection_new (client->socket_connection);
+ gst_rtmp_connection_handshake_async (client->connection, FALSE,
+ client->cancellable, gst_rtmp_client_handshake_done, client);
}
static void
-gst_rtmp_client_connect_2 (GObject * source, GAsyncResult * result,
+gst_rtmp_client_handshake_done (GObject * source, GAsyncResult * result,
gpointer user_data)
{
- ConnectContext *context = user_data;
- GOutputStream *os;
- GInputStream *is;
+ GstRtmpClient *client = GST_RTMP_CLIENT (user_data);
GError *error = NULL;
- gssize n;
+ gboolean ret;
- GST_ERROR ("g_output_stream_write_finish");
- os = g_io_stream_get_output_stream (G_IO_STREAM (context->connection));
- n = g_output_stream_write_finish (os, result, &error);
- GST_ERROR ("wrote %" G_GSIZE_FORMAT " bytes", n);
- if (error) {
- GST_ERROR ("error");
- g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
+ GST_DEBUG ("g_socket_client_connect_done");
+ ret = gst_rtmp_connection_handshake_finish (client->connection,
+ result, &error);
+ if (!ret) {
+ g_simple_async_result_set_error (client->async, GST_RTMP_ERROR,
GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
g_error_free (error);
- gst_rtmp_client_connect_complete (context);
+ client->cancellable = NULL;
+ g_simple_async_result_complete (client->async);
return;
}
- context->handshake_recv = g_malloc (1537);
-
- GST_ERROR ("g_input_stream_read_async");
- is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection));
- g_input_stream_read_async (is, context->handshake_recv, 1537,
- G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_3,
- context);
+ client->cancellable = NULL;
+ g_simple_async_result_complete (client->async);
}
-static void
-gst_rtmp_client_connect_3 (GObject * source, GAsyncResult * result,
- gpointer user_data)
+gboolean
+gst_rtmp_client_connect_finish (GstRtmpClient * client,
+ GAsyncResult * result, GError ** error)
{
- ConnectContext *context = user_data;
- GInputStream *is;
- GError *error = NULL;
- gssize n;
+ GSimpleAsyncResult *simple;
- GST_ERROR ("g_input_stream_read_finish");
- is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection));
- n = g_input_stream_read_finish (is, result, &error);
- GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", n);
- if (error) {
- GST_ERROR ("error");
- g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
- GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
- g_error_free (error);
- gst_rtmp_client_connect_complete (context);
- return;
- }
-#if 0
- GST_ERROR ("recv: %02x %02x %02x %02x %02x %02x %02x %02x %02x",
- context->handshake_recv[0], context->handshake_recv[1],
- context->handshake_recv[2], context->handshake_recv[3],
- context->handshake_recv[4], context->handshake_recv[5],
- context->handshake_recv[6], context->handshake_recv[7],
- context->handshake_recv[8]);
-#endif
+ g_return_val_if_fail (g_simple_async_result_is_valid (result,
+ G_OBJECT (client), gst_rtmp_client_connect_async), FALSE);
- n = g_output_stream_write (g_io_stream_get_output_stream (G_IO_STREAM
- (context->connection)), context->handshake_recv + 1, 1536,
- context->cancellable, &error);
- if (n < 1536) {
- GST_ERROR ("error");
- g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
- GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
- g_error_free (error);
- gst_rtmp_client_connect_complete (context);
- return;
- }
+ simple = (GSimpleAsyncResult *) result;
- GST_ERROR ("g_input_stream_read_async");
- is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection));
- g_input_stream_read_async (is, context->handshake_recv + 1, 1536,
- G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_4,
- context);
+ if (g_simple_async_result_propagate_error (simple, error))
+ return FALSE;
+ return TRUE;
}
-static void
-gst_rtmp_client_connect_4 (GObject * source, GAsyncResult * result,
- gpointer user_data)
+GstRtmpConnection *
+gst_rtmp_client_get_connection (GstRtmpClient * client)
{
- ConnectContext *context = user_data;
- GInputStream *is;
- GError *error = NULL;
- gssize n;
-
- GST_ERROR ("g_input_stream_read_finish");
- is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection));
- n = g_input_stream_read_finish (is, result, &error);
- GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", n);
- if (error) {
- GST_ERROR ("error");
- g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
- GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
- g_error_free (error);
- gst_rtmp_client_connect_complete (context);
- return;
- }
-
- context->client->socket_client = context->socket_client;
- context->socket_client = NULL;
- context->client->connection = context->connection;
- context->connection = NULL;
- context->client->state = GST_RTMP_CLIENT_STATE_CONNECTED;
-
- GST_ERROR ("got here");
- gst_rtmp_client_connect_complete (context);
+ return client->connection;
}
diff --git a/rtmp/rtmpclient.h b/rtmp/rtmpclient.h
index 8a1b8b1..3a209db 100644
--- a/rtmp/rtmpclient.h
+++ b/rtmp/rtmpclient.h
@@ -22,6 +22,7 @@
#include <rtmp/rtmpmessage.h>
#include <rtmp/rtmpchunk.h>
+#include <rtmp/rtmpconnection.h>
G_BEGIN_DECLS
@@ -67,9 +68,12 @@ struct _GstRtmpClient
GCond cond;
GMainContext *context;
+ GCancellable *cancellable;
+ GSimpleAsyncResult *async;
GSocketClient *socket_client;
- GSocketConnection *connection;
+ GSocketConnection *socket_connection;
+ GstRtmpConnection *connection;
};
struct _GstRtmpClientClass
@@ -93,12 +97,7 @@ void gst_rtmp_client_connect_async (GstRtmpClient *client,
gboolean gst_rtmp_client_connect_finish (GstRtmpClient *client,
GAsyncResult *result, GError **error);
-void gst_rtmp_client_queue_message (GstRtmpClient *client,
- GstRtmpMessage *message, GstRtmpClientMessageCallback callback,
- gpointer user_data);
-void gst_rtmp_client_queue_chunk (GstRtmpClient *client,
- GstRtmpChunk *Chunk, GstRtmpClientChunkCallback callback,
- gpointer user_data);
+GstRtmpConnection *gst_rtmp_client_get_connection (GstRtmpClient *client);
G_END_DECLS
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 86181bd..6a217ce 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -25,6 +25,7 @@
#include "rtmpconnection.h"
#include "rtmpchunk.h"
#include "amf.h"
+#include "rtmputils.h"
#include <string.h>
@@ -39,9 +40,44 @@ static void gst_rtmp_connection_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
static void gst_rtmp_connection_dispose (GObject * object);
static void gst_rtmp_connection_finalize (GObject * object);
+static void gst_rtmp_connection_client_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_client_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc,
+ GBytes * bytes);
+static void gst_rtmp_connection_client_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake1 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_client_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_client_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc,
+ GBytes * bytes);
+static void gst_rtmp_connection_client_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc);
+static void gst_rtmp_connection_read_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_write_chunk (GstRtmpConnection * sc);
+static void gst_rtmp_connection_write_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_server_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc,
+ GBytes * bytes);
+static void gst_rtmp_connection_server_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_server_handshake3 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_server_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+
-static void proxy_connect (GstRtmpConnection * sc);
-static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc);
enum
{
@@ -75,6 +111,7 @@ static void
gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
{
rtmpconnection->cancellable = g_cancellable_new ();
+ rtmpconnection->output_queue = g_queue_new ();
}
void
@@ -139,85 +176,12 @@ gst_rtmp_connection_new (GSocketConnection * connection)
sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
sc->connection = connection;
- //proxy_connect (sc);
- //gst_rtmp_connection_read_chunk (sc);
- gst_rtmp_connection_handshake1 (sc);
-
return sc;
}
-typedef struct _ChunkRead ChunkRead;
-struct _ChunkRead
-{
- gpointer data;
- gsize alloc_size;
- gsize size;
- GstRtmpConnection *connection;
-};
-
-
-static void proxy_connect (GstRtmpConnection * sc);
-static void proxy_connect_done (GObject * obj, GAsyncResult * res,
- gpointer user_data);
-static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc);
-static void gst_rtmp_connection_read_chunk_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk);
-static void proxy_write_done (GObject * obj, GAsyncResult * res,
- gpointer user_data);
-static void proxy_read_chunk (GstRtmpConnection * sc);
-static void proxy_read_done (GObject * obj, GAsyncResult * res,
- gpointer user_data);
-static void gst_rtmp_connection_write_chunk (GstRtmpConnection *
- sc, ChunkRead * chunk);
-static void gst_rtmp_connection_write_chunk_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc);
-static void gst_rtmp_connection_handshake1_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void gst_rtmp_connection_handshake2 (GstRtmpConnection * sc,
- GBytes * bytes);
-static void gst_rtmp_connection_handshake2_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void gst_rtmp_connection_handshake3 (GstRtmpConnection * sc);
-static void gst_rtmp_connection_handshake3_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
static void
-proxy_connect (GstRtmpConnection * sc)
-{
- GSocketConnectable *addr;
-
- GST_ERROR ("proxy_connect");
-
- addr = g_network_address_new ("localhost", 1935);
-
- sc->socket_client = g_socket_client_new ();
- g_socket_client_connect_async (sc->socket_client, addr,
- sc->cancellable, proxy_connect_done, sc);
-}
-
-static void
-proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data)
-{
- GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
- GError *error = NULL;
-
- GST_ERROR ("proxy_connect_done");
-
- sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client,
- res, &error);
- if (sc->proxy_connection == NULL) {
- GST_ERROR ("connection error: %s", error->message);
- g_error_free (error);
- return;
- }
-
- gst_rtmp_connection_read_chunk (sc);
-}
-
-static void
gst_rtmp_connection_read_chunk (GstRtmpConnection * sc)
{
GInputStream *is;
@@ -272,7 +236,7 @@ gst_rtmp_connection_read_chunk_done (GObject * obj,
gsize chunk_size;
GBytes *bytes;
- GST_ERROR ("gst_rtmp_connection_read_chunk_done");
+ GST_DEBUG ("gst_rtmp_connection_read_chunk_done");
bytes = g_input_stream_read_bytes_finish (is, res, &error);
if (bytes == NULL) {
@@ -282,6 +246,7 @@ gst_rtmp_connection_read_chunk_done (GObject * obj,
}
GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+ gst_rtmp_dump_data (bytes);
chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size);
if (chunk) {
@@ -289,133 +254,152 @@ gst_rtmp_connection_read_chunk_done (GObject * obj,
g_object_unref (chunk);
}
+
+ gst_rtmp_connection_read_chunk (connection);
}
-G_GNUC_UNUSED static void
-proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk)
+static void
+gst_rtmp_connection_write_chunk (GstRtmpConnection * sc)
{
GOutputStream *os;
+ GBytes *bytes;
+ GstRtmpChunk *chunk;
+
+ if (sc->writing) {
+ GST_WARNING ("gst_rtmp_connection_write_chunk while writing == TRUE");
+ return;
+ }
- os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection));
+ chunk = g_queue_pop_head (sc->output_queue);
+ if (!chunk) {
+ return;
+ }
- g_output_stream_write_async (os, chunk->data, chunk->size,
- G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk);
+ sc->writing = TRUE;
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
+ bytes = gst_rtmp_chunk_serialize (chunk);
+ gst_rtmp_dump_data (bytes);
+ g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT,
+ sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk);
}
static void
-proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+gst_rtmp_connection_write_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
+ GstRtmpChunk *chunk = GST_RTMP_CHUNK (user_data);
+ GstRtmpConnection *connection = GST_RTMP_CONNECTION (chunk->priv);
GError *error = NULL;
gssize ret;
- GST_ERROR ("proxy_write_done");
+ GST_DEBUG ("gst_rtmp_connection_write_chunk_done");
+
+ connection->writing = FALSE;
- ret = g_output_stream_write_finish (os, res, &error);
+ ret = g_output_stream_write_bytes_finish (os, res, &error);
if (ret < 0) {
GST_ERROR ("write error: %s", error->message);
g_error_free (error);
return;
}
- GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret);
-
- proxy_read_chunk (chunk->connection);
+ GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
+ g_object_unref (chunk);
- g_free (chunk->data);
- g_free (chunk);
+ gst_rtmp_connection_write_chunk (connection);
}
static void
-proxy_read_chunk (GstRtmpConnection * sc)
+gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc)
{
GInputStream *is;
- ChunkRead *chunk;
-
- chunk = g_malloc0 (sizeof (ChunkRead));
- chunk->alloc_size = 4096;
- chunk->data = g_malloc (chunk->alloc_size);
- chunk->connection = sc;
- is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection));
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
- g_input_stream_read_async (is, chunk->data, chunk->alloc_size,
- G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk);
+ g_input_stream_read_bytes_async (is, 4096,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_connection_server_handshake1_done, sc);
}
static void
-proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+gst_rtmp_connection_server_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
{
GInputStream *is = G_INPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
GError *error = NULL;
- gssize ret;
+ GBytes *bytes;
- GST_ERROR ("proxy_read_done");
+ GST_DEBUG ("gst_rtmp_connection_server_handshake1_done");
- ret = g_input_stream_read_finish (is, res, &error);
- if (ret < 0) {
+ bytes = g_input_stream_read_bytes_finish (is, res, &error);
+ if (bytes == NULL) {
GST_ERROR ("read error: %s", error->message);
g_error_free (error);
return;
}
- GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret);
- chunk->size = ret;
+ GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
- gst_rtmp_connection_write_chunk (chunk->connection, chunk);
+ gst_rtmp_connection_server_handshake2 (sc, bytes);
}
static void
-gst_rtmp_connection_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk)
+gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc, GBytes * bytes)
{
GOutputStream *os;
+ guint8 *data;
+ GBytes *out_bytes;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
- g_output_stream_write_async (os, chunk->data, chunk->size,
+ data = g_malloc (1 + 1536 + 1536);
+ memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536);
+ memset (data + 1537, 0, 8);
+ memset (data + 1537 + 8, 0xef, 1528);
+
+ out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536);
+
+ g_output_stream_write_bytes_async (os, out_bytes,
G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_connection_write_chunk_done, chunk);
+ gst_rtmp_connection_server_handshake2_done, sc);
}
static void
-gst_rtmp_connection_write_chunk_done (GObject * obj,
+gst_rtmp_connection_server_handshake2_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
GError *error = NULL;
gssize ret;
- GST_ERROR ("gst_rtmp_connection_write_chunk_done");
+ GST_DEBUG ("gst_rtmp_connection_server_handshake2_done");
- ret = g_output_stream_write_finish (os, res, &error);
- if (ret < 0) {
- GST_ERROR ("write error: %s", error->message);
+ ret = g_output_stream_write_bytes_finish (os, res, &error);
+ if (ret < 1 + 1536 + 1536) {
+ GST_ERROR ("read error: %s", error->message);
g_error_free (error);
return;
}
- GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
-
- gst_rtmp_connection_read_chunk (chunk->connection);
+ GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
- g_free (chunk->data);
- g_free (chunk);
+ gst_rtmp_connection_server_handshake3 (sc);
}
static void
-gst_rtmp_connection_handshake1 (GstRtmpConnection * sc)
+gst_rtmp_connection_server_handshake3 (GstRtmpConnection * sc)
{
GInputStream *is;
is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
- g_input_stream_read_bytes_async (is, 4096,
+ g_input_stream_read_bytes_async (is, 1536,
G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_connection_handshake1_done, sc);
+ gst_rtmp_connection_server_handshake3_done, sc);
}
static void
-gst_rtmp_connection_handshake1_done (GObject * obj,
+gst_rtmp_connection_server_handshake3_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GInputStream *is = G_INPUT_STREAM (obj);
@@ -423,7 +407,7 @@ gst_rtmp_connection_handshake1_done (GObject * obj,
GError *error = NULL;
GBytes *bytes;
- GST_ERROR ("gst_rtmp_connection_handshake1_done");
+ GST_DEBUG ("gst_rtmp_connection_server_handshake3_done");
bytes = g_input_stream_read_bytes_finish (is, res, &error);
if (bytes == NULL) {
@@ -431,34 +415,68 @@ gst_rtmp_connection_handshake1_done (GObject * obj,
g_error_free (error);
return;
}
- GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+ GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ gst_rtmp_connection_read_chunk (sc);
+}
+
+void
+gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
+ GstRtmpChunk * chunk)
+{
+ g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
+ g_return_if_fail (GST_IS_RTMP_CHUNK (chunk));
- gst_rtmp_connection_handshake2 (sc, bytes);
+ chunk->priv = connection;
+ g_queue_push_tail (connection->output_queue, chunk);
+ if (!connection->writing) {
+ gst_rtmp_connection_write_chunk (connection);
+ }
+}
+
+void
+gst_rtmp_connection_handshake_async (GstRtmpConnection * connection,
+ gboolean is_server, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data)
+{
+ GSimpleAsyncResult *async;
+
+ async = g_simple_async_result_new (G_OBJECT (connection),
+ callback, user_data, gst_rtmp_connection_handshake_async);
+ g_simple_async_result_set_check_cancellable (async, cancellable);
+
+ connection->cancellable = cancellable;
+ connection->async = async;
+
+ if (is_server) {
+ gst_rtmp_connection_server_handshake1 (connection);
+ } else {
+ gst_rtmp_connection_client_handshake1 (connection);
+ }
}
static void
-gst_rtmp_connection_handshake2 (GstRtmpConnection * sc, GBytes * bytes)
+gst_rtmp_connection_client_handshake1 (GstRtmpConnection * sc)
{
GOutputStream *os;
guint8 *data;
- GBytes *out_bytes;
+ GBytes *bytes;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
- data = g_malloc (1 + 1536 + 1536);
- memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536);
- memset (data + 1537, 0, 8);
- memset (data + 1537 + 8, 0xef, 1528);
-
- out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536);
+ data = g_malloc (1 + 1536);
+ data[0] = 3;
+ memset (data + 1, 0, 8);
+ memset (data + 9, 0xa5, 1528);
+ bytes = g_bytes_new_take (data, 1 + 1536);
- g_output_stream_write_bytes_async (os, out_bytes,
+ g_output_stream_write_bytes_async (os, bytes,
G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_connection_handshake2_done, sc);
+ gst_rtmp_connection_client_handshake1_done, sc);
}
static void
-gst_rtmp_connection_handshake2_done (GObject * obj,
+gst_rtmp_connection_client_handshake1_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
@@ -466,33 +484,33 @@ gst_rtmp_connection_handshake2_done (GObject * obj,
GError *error = NULL;
gssize ret;
- GST_ERROR ("gst_rtmp_connection_handshake2_done");
+ GST_DEBUG ("gst_rtmp_connection_client_handshake2_done");
ret = g_output_stream_write_bytes_finish (os, res, &error);
- if (ret < 1 + 1536 + 1536) {
- GST_ERROR ("read error: %s", error->message);
+ if (ret < 1 + 1536) {
+ GST_ERROR ("write error: %s", error->message);
g_error_free (error);
return;
}
- GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
+ GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
- gst_rtmp_connection_handshake3 (sc);
+ gst_rtmp_connection_client_handshake2 (sc);
}
static void
-gst_rtmp_connection_handshake3 (GstRtmpConnection * sc)
+gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc)
{
GInputStream *is;
is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
- g_input_stream_read_bytes_async (is, 1536,
+ g_input_stream_read_bytes_async (is, 1 + 1536 + 1536,
G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_connection_handshake3_done, sc);
+ gst_rtmp_connection_client_handshake2_done, sc);
}
static void
-gst_rtmp_connection_handshake3_done (GObject * obj,
+gst_rtmp_connection_client_handshake2_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GInputStream *is = G_INPUT_STREAM (obj);
@@ -500,7 +518,7 @@ gst_rtmp_connection_handshake3_done (GObject * obj,
GError *error = NULL;
GBytes *bytes;
- GST_ERROR ("gst_rtmp_connection_handshake3_done");
+ GST_DEBUG ("gst_rtmp_connection_client_handshake2_done");
bytes = g_input_stream_read_bytes_finish (is, res, &error);
if (bytes == NULL) {
@@ -508,8 +526,62 @@ gst_rtmp_connection_handshake3_done (GObject * obj,
g_error_free (error);
return;
}
- GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+ GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ gst_rtmp_connection_client_handshake3 (sc, bytes);
+}
+
+static void
+gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc, GBytes * bytes)
+{
+ GOutputStream *os;
+ GBytes *out_bytes;
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
+
+ out_bytes = g_bytes_new_from_bytes (bytes, 1 + 1536, 1536);
+
+ g_output_stream_write_bytes_async (os, out_bytes, G_PRIORITY_DEFAULT,
+ sc->cancellable, gst_rtmp_connection_client_handshake3_done, sc);
+}
+
+static void
+gst_rtmp_connection_client_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (obj);
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
+ GError *error = NULL;
+ gssize ret;
+
+ GST_DEBUG ("gst_rtmp_connection_client_handshake3_done");
+
+ ret = g_output_stream_write_bytes_finish (os, res, &error);
+ if (ret < 1536) {
+ GST_ERROR ("write error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
+
+ g_simple_async_result_complete (sc->async);
gst_rtmp_connection_read_chunk (sc);
- (void) &proxy_connect;
+}
+
+gboolean
+gst_rtmp_connection_handshake_finish (GstRtmpConnection * connection,
+ GAsyncResult * result, GError ** error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (g_simple_async_result_is_valid (result,
+ G_OBJECT (connection), gst_rtmp_connection_handshake_async), FALSE);
+
+ simple = (GSimpleAsyncResult *) result;
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return FALSE;
+
+ return TRUE;
}
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index 8713147..cde9e58 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -38,12 +38,18 @@ struct _GstRtmpConnection
{
GObject object;
+ /* should be properties */
+ gboolean input_paused;
+
/* private */
GSocketConnection *connection;
GSocketConnection *proxy_connection;
GCancellable *cancellable;
int state;
GSocketClient *socket_client;
+ GQueue *output_queue;
+ GSimpleAsyncResult *async;
+ gboolean writing;
};
struct _GstRtmpConnectionClass
@@ -57,8 +63,15 @@ struct _GstRtmpConnectionClass
GType gst_rtmp_connection_get_type (void);
-GstRtmpConnection *gst_rtmp_connection_new (
- GSocketConnection *connection);
+GstRtmpConnection *gst_rtmp_connection_new (GSocketConnection *connection);
+
+void gst_rtmp_connection_handshake_async (GstRtmpConnection *connection,
+ gboolean is_server, GCancellable *cancellable,
+ GAsyncReadyCallback callback, gpointer user_data);
+gboolean gst_rtmp_connection_handshake_finish (GstRtmpConnection *connection,
+ GAsyncResult *result, GError **error);
+void gst_rtmp_connection_queue_chunk (GstRtmpConnection *connection,
+ GstRtmpChunk *chunk);
G_END_DECLS
diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c
index e35bff6..e9527c8 100644
--- a/rtmp/rtmpserver.c
+++ b/rtmp/rtmpserver.c
@@ -38,6 +38,9 @@ static void gst_rtmp_server_finalize (GObject * object);
static gboolean gst_rtmp_server_incoming (GSocketService * service,
GSocketConnection * connection, GObject * source_object,
gpointer user_data);
+static void
+gst_rtmp_server_handshake_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
enum
@@ -179,15 +182,35 @@ gst_rtmp_server_incoming (GSocketService * service,
GstRtmpServer *rtmpserver = GST_RTMP_SERVER (user_data);
GstRtmpConnection *connection;
- GST_ERROR ("client connected");
+ GST_INFO ("client connected");
g_object_ref (socket_connection);
connection = gst_rtmp_connection_new (socket_connection);
gst_rtmp_server_add_connection (rtmpserver, connection);
+ gst_rtmp_connection_handshake_async (connection, TRUE,
+ NULL, gst_rtmp_server_handshake_done, rtmpserver);
return TRUE;
}
+static void
+gst_rtmp_server_handshake_done (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ //GstRtmpServer *rtmpserver = GST_RTMP_SERVER (user_data);
+ GstRtmpConnection *connection = GST_RTMP_CONNECTION (source);
+ GError *error = NULL;
+ gboolean ret;
+
+ GST_ERROR ("handshake done");
+
+ ret = gst_rtmp_connection_handshake_finish (connection, result, &error);
+ if (!ret) {
+ g_error_free (error);
+ return;
+ }
+}
+
void
gst_rtmp_server_add_connection (GstRtmpServer * rtmpserver,
GstRtmpConnection * connection)
diff --git a/rtmp/rtmputils.c b/rtmp/rtmputils.c
new file mode 100644
index 0000000..3e4a1b5
--- /dev/null
+++ b/rtmp/rtmputils.c
@@ -0,0 +1,51 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "rtmputils.h"
+
+
+void
+gst_rtmp_dump_data (GBytes * bytes)
+{
+ const guint8 *data;
+ gsize size;
+ int i, j;
+
+ data = g_bytes_get_data (bytes, &size);
+ for (i = 0; i < size; i += 16) {
+ g_print ("%04x: ", i);
+ for (j = 0; j < 16; j++) {
+ if (i + j < size) {
+ g_print ("%02x ", data[i + j]);
+ } else {
+ g_print (" ");
+ }
+ }
+ for (j = 0; j < 16; j++) {
+ if (i + j < size) {
+ g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.');
+ }
+ }
+ g_print ("\n");
+ }
+}
diff --git a/rtmp/rtmputils.h b/rtmp/rtmputils.h
new file mode 100644
index 0000000..5cc4e6e
--- /dev/null
+++ b/rtmp/rtmputils.h
@@ -0,0 +1,32 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_UTILS_H_
+#define _GST_RTMP_UTILS_H_
+
+#include <glib.h>
+
+G_BEGIN_DECLS
+
+void gst_rtmp_dump_data (GBytes * bytes);
+
+G_END_DECLS
+
+#endif
+
diff --git a/tools/proxy-server.c b/tools/proxy-server.c
index fe8fc18..5abc013 100644
--- a/tools/proxy-server.c
+++ b/tools/proxy-server.c
@@ -25,6 +25,8 @@
#include <gio/gio.h>
#include <stdlib.h>
#include "rtmpserver.h"
+#include "rtmpclient.h"
+#include "rtmputils.h"
#define GETTEXT_PACKAGE NULL
@@ -34,8 +36,13 @@ add_connection (GstRtmpServer * server, GstRtmpConnection * connection,
static void
got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data);
-static void dump_data (GBytes * bytes);
+static void
+connect_done (GObject * source, GAsyncResult * result, gpointer user_data);
+GstRtmpServer *server;
+GstRtmpClient *client;
+GCancellable *cancellable;
+GstRtmpChunk *proxy_chunk;
gboolean verbose;
@@ -51,7 +58,6 @@ main (int argc, char *argv[])
GError *error = NULL;
GOptionContext *context;
GMainLoop *main_loop;
- GstRtmpServer *server;
context = g_option_context_new ("- FIXME");
g_option_context_add_main_entries (context, entries, GETTEXT_PACKAGE);
@@ -67,6 +73,10 @@ main (int argc, char *argv[])
NULL);
gst_rtmp_server_start (server);
+ client = gst_rtmp_client_new ();
+ cancellable = g_cancellable_new ();
+
+
main_loop = g_main_loop_new (NULL, TRUE);
g_main_loop_run (main_loop);
@@ -77,9 +87,11 @@ static void
add_connection (GstRtmpServer * server, GstRtmpConnection * connection,
gpointer user_data)
{
- GST_ERROR ("new connection");
+ GST_INFO ("new connection");
g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL);
+
+ gst_rtmp_client_connect_async (client, cancellable, connect_done, client);
}
static void
@@ -87,35 +99,43 @@ got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data)
{
GBytes *bytes;
+ GstRtmpConnection *proxy_conn;
- GST_ERROR ("got chunk");
+ GST_INFO ("got chunk");
bytes = gst_rtmp_chunk_get_payload (chunk);
- dump_data (bytes);
+ gst_rtmp_dump_data (bytes);
+
+ proxy_conn = gst_rtmp_client_get_connection (client);
+
+ g_object_ref (chunk);
+ if (proxy_conn) {
+ gst_rtmp_connection_queue_chunk (proxy_conn, chunk);
+ } else {
+ /* save it for after the connection is complete */
+ proxy_chunk = chunk;
+ }
}
static void
-dump_data (GBytes * bytes)
+connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
{
- const guint8 *data;
- gsize size;
- int i, j;
-
- data = g_bytes_get_data (bytes, &size);
- for (i = 0; i < size; i += 16) {
- g_print ("%04x: ", i);
- for (j = 0; j < 16; j++) {
- if (i + j < size) {
- g_print ("%02x ", data[i + j]);
- } else {
- g_print (" ");
- }
- }
- for (j = 0; j < 16; j++) {
- if (i + j < size) {
- g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.');
- }
- }
- g_print ("\n");
+ GstRtmpClient *client = user_data;
+ GError *error = NULL;
+ gboolean ret;
+
+ GST_INFO ("connect_done");
+
+ ret = gst_rtmp_client_connect_finish (client, result, &error);
+ if (!ret) {
+ GST_ERROR ("error: %s", error->message);
+ g_error_free (error);
+ }
+ if (proxy_chunk) {
+ GstRtmpConnection *proxy_conn;
+
+ proxy_conn = gst_rtmp_client_get_connection (client);
+ gst_rtmp_connection_queue_chunk (proxy_conn, proxy_chunk);
+ proxy_chunk = NULL;
}
}