summaryrefslogtreecommitdiff
path: root/rtmp
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-27 00:04:10 -0700
committerDavid Schleef <ds@schleef.org>2014-08-27 00:04:10 -0700
commit8b025005d2bff72434fd6fe0125109a6f23d2390 (patch)
tree1e353faf7b78ff20ba1c88c7c07bb8fa0ef24fa5 /rtmp
parentd84190d24394d3007ebad8f740cc32a16472028a (diff)
hacking, cleanup
Diffstat (limited to 'rtmp')
-rw-r--r--rtmp/rtmpchunk.c2
-rw-r--r--rtmp/rtmpclient.c39
-rw-r--r--rtmp/rtmpclient.h6
-rw-r--r--rtmp/rtmpconnection.c91
-rw-r--r--rtmp/rtmpconnection.h5
-rw-r--r--rtmp/rtmputils.c12
-rw-r--r--rtmp/rtmputils.h1
7 files changed, 103 insertions, 53 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
index f8bdd90..b23fe45 100644
--- a/rtmp/rtmpchunk.c
+++ b/rtmp/rtmpchunk.c
@@ -292,7 +292,7 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk,
memcpy (data + offset, chunkdata, chunksize);
offset += chunksize;
}
- GST_ERROR ("type: %d in: %" G_GSIZE_FORMAT " out: %d", chunk->message_type_id,
+ GST_DEBUG ("type: %d in: %" G_GSIZE_FORMAT " out: %d", chunk->message_type_id,
chunksize, offset);
return g_bytes_new_take (data, offset);
diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c
index 088b632..8b9eae8 100644
--- a/rtmp/rtmpclient.c
+++ b/rtmp/rtmpclient.c
@@ -49,9 +49,14 @@ gst_rtmp_client_handshake_done (GObject * source, GAsyncResult * result,
enum
{
- PROP_0
+ PROP_0,
+ PROP_SERVER_ADDRESS,
+ PROP_SERVER_PORT
};
+#define DEFAULT_SERVER_ADDRESS ""
+#define DEFAULT_SERVER_PORT 1935
+
/* pad templates */
@@ -75,6 +80,8 @@ gst_rtmp_client_class_init (GstRtmpClientClass * klass)
static void
gst_rtmp_client_init (GstRtmpClient * rtmpclient)
{
+ rtmpclient->server_address = g_strdup (DEFAULT_SERVER_ADDRESS);
+ rtmpclient->server_port = DEFAULT_SERVER_PORT;
}
void
@@ -86,6 +93,13 @@ gst_rtmp_client_set_property (GObject * object, guint property_id,
GST_DEBUG_OBJECT (rtmpclient, "set_property");
switch (property_id) {
+ case PROP_SERVER_ADDRESS:
+ gst_rtmp_client_set_server_address (rtmpclient,
+ g_value_get_string (value));
+ break;
+ case PROP_SERVER_PORT:
+ gst_rtmp_client_set_server_port (rtmpclient, g_value_get_int (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -101,6 +115,12 @@ gst_rtmp_client_get_property (GObject * object, guint property_id,
GST_DEBUG_OBJECT (rtmpclient, "get_property");
switch (property_id) {
+ case PROP_SERVER_ADDRESS:
+ g_value_set_string (value, rtmpclient->server_address);
+ break;
+ case PROP_SERVER_PORT:
+ g_value_set_int (value, rtmpclient->server_port);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -142,10 +162,17 @@ gst_rtmp_client_new (void)
}
void
-gst_rtmp_client_set_host (GstRtmpClient * client, const char *host)
+gst_rtmp_client_set_server_address (GstRtmpClient * client,
+ const char *server_address)
+{
+ g_free (client->server_address);
+ client->server_address = g_strdup (server_address);
+}
+
+void
+gst_rtmp_client_set_server_port (GstRtmpClient * client, int port)
{
- g_free (client->host);
- client->host = g_strdup (host);
+ client->server_port = port;
}
void
@@ -177,9 +204,7 @@ gst_rtmp_client_connect_async (GstRtmpClient * client,
client->cancellable = cancellable;
client->async = async;
- addr =
- g_network_address_new
- ("ec2-54-190-75-249.us-west-2.compute.amazonaws.com", 1935);
+ addr = g_network_address_new (client->server_address, client->server_port);
client->socket_client = g_socket_client_new ();
GST_DEBUG ("g_socket_client_connect_async");
diff --git a/rtmp/rtmpclient.h b/rtmp/rtmpclient.h
index 3a209db..280e161 100644
--- a/rtmp/rtmpclient.h
+++ b/rtmp/rtmpclient.h
@@ -58,8 +58,8 @@ struct _GstRtmpClient
GObject object;
/* properties */
- char *host;
- int port;
+ char *server_address;
+ int server_port;
char *stream;
/* private */
@@ -90,6 +90,8 @@ GType gst_rtmp_client_get_type (void);
GstRtmpClient *gst_rtmp_client_new (void);
void gst_rtmp_client_set_url (GstRtmpClient *client, const char *url);
+void gst_rtmp_client_set_server_address (GstRtmpClient * client, const char *host);
+void gst_rtmp_client_set_server_port (GstRtmpClient * client, int port);
void gst_rtmp_client_connect_async (GstRtmpClient *client,
GCancellable *cancellable, GAsyncReadyCallback callback,
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index d0b7d86..2e9e919 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -241,8 +241,6 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
sc->input_bytes = g_bytes_new_take (data, ret);
}
- //GST_ERROR("ic: %p", sc->input_callback);
- //GST_ERROR("in queue: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes));
GST_DEBUG ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes);
while (sc->input_callback && sc->input_bytes &&
@@ -263,33 +261,35 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
{
GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
GstRtmpChunk *chunk;
- GBytes *bytes;
- GstRtmpChunkCacheEntry *entry;
GST_DEBUG ("output ready");
- if (sc->writing) {
- GST_DEBUG ("busy writing");
+ if (sc->writing)
return G_SOURCE_REMOVE;
- }
- chunk = g_queue_pop_head (sc->output_queue);
- if (!chunk) {
- return G_SOURCE_REMOVE;
+ if (sc->output_chunk) {
+ chunk = sc->output_chunk;
+ } else {
+ GstRtmpChunkCacheEntry *entry;
+
+ chunk = g_queue_pop_head (sc->output_queue);
+ if (!chunk) {
+ return G_SOURCE_REMOVE;
+ }
+ sc->output_chunk = chunk;
+
+ entry = gst_rtmp_chunk_cache_get (sc->output_chunk_cache, chunk->stream_id);
+ sc->output_bytes = gst_rtmp_chunk_serialize (chunk,
+ &entry->previous_header, sc->out_chunk_size);
+ g_bytes_ref (sc->output_bytes);
+ gst_rtmp_chunk_cache_update (entry, chunk);
}
- sc->writing = TRUE;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
+ g_output_stream_write_bytes_async (os, sc->output_bytes, G_PRIORITY_DEFAULT,
+ sc->cancellable, gst_rtmp_connection_write_chunk_done, sc);
+ sc->writing = TRUE;
- entry = gst_rtmp_chunk_cache_get (sc->output_chunk_cache, chunk->stream_id);
- bytes = gst_rtmp_chunk_serialize (chunk, &entry->previous_header,
- sc->out_chunk_size);
- gst_rtmp_chunk_cache_update (entry, chunk);
- //gst_rtmp_dump_data (bytes);
- g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT,
- sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk);
-
- //return G_SOURCE_CONTINUE;
return G_SOURCE_REMOVE;
}
@@ -298,8 +298,8 @@ gst_rtmp_connection_write_chunk_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
- GstRtmpChunk *chunk = GST_RTMP_CHUNK (user_data);
- GstRtmpConnection *connection = GST_RTMP_CONNECTION (chunk->priv);
+ GstRtmpConnection *connection = GST_RTMP_CONNECTION (user_data);
+ GstRtmpChunk *chunk = connection->output_chunk;
GError *error = NULL;
gssize ret;
@@ -314,8 +314,18 @@ gst_rtmp_connection_write_chunk_done (GObject * obj,
g_error_free (error);
return;
}
- GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
- g_object_unref (chunk);
+ if (ret < g_bytes_get_size (connection->output_bytes)) {
+ GST_DEBUG ("short write %" G_GSIZE_FORMAT " < %" G_GSIZE_FORMAT,
+ ret, g_bytes_get_size (connection->output_bytes));
+
+ connection->output_bytes =
+ gst_rtmp_bytes_remove (connection->output_bytes, ret);
+ } else {
+ g_bytes_unref (connection->output_bytes);
+ connection->output_bytes = NULL;
+ g_object_unref (chunk);
+ connection->output_chunk = NULL;
+ }
gst_rtmp_connection_start_output (connection);
}
@@ -429,11 +439,11 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc)
g_bytes_unref (bytes);
/* handshake finished */
- GST_ERROR ("server handshake finished");
+ GST_INFO ("server handshake finished");
sc->handshake_complete = TRUE;
if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) {
- GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT,
+ GST_DEBUG ("spare bytes after handshake: %" G_GSIZE_FORMAT,
g_bytes_get_size (sc->input_bytes));
gst_rtmp_connection_chunk_callback (sc);
}
@@ -462,8 +472,6 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
if (sc->input_bytes == NULL)
break;
- //gst_rtmp_dump_data (sc->input_bytes);
-
ret = gst_rtmp_chunk_parse_header1 (&header, sc->input_bytes);
if (!ret) {
needed_bytes = header.header_size;
@@ -473,7 +481,7 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
entry = gst_rtmp_chunk_cache_get (sc->input_chunk_cache, header.stream_id);
if (entry->chunk && header.format != 3) {
- GST_ERROR ("expected message continuation, got new message");
+ GST_ERROR ("expected message continuation, but got new message");
}
ret = gst_rtmp_chunk_parse_header2 (&header, sc->input_bytes,
@@ -521,15 +529,13 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
header.message_length);
entry->payload = NULL;
- //gst_rtmp_dump_data (entry->chunk->payload);
-
if (entry->chunk->stream_id == 0x02) {
- GST_ERROR ("got protocol control message, type: %d",
+ GST_DEBUG ("got protocol control message, type: %d",
entry->chunk->message_type_id);
gst_rtmp_connection_handle_pcm (sc, entry->chunk);
g_object_unref (entry->chunk);
} else {
- GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes",
+ GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes",
entry->chunk->message_length);
g_signal_emit_by_name (sc, "got-chunk", entry->chunk);
g_object_unref (entry->chunk);
@@ -557,29 +563,29 @@ gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection,
switch (chunk->message_type_id) {
case 0x01:
moo = GST_READ_UINT32_BE (data);
- GST_ERROR ("new chunk size %d", moo);
+ GST_INFO ("new chunk size %d", moo);
connection->in_chunk_size = moo;
break;
case 0x02:
moo = GST_READ_UINT32_BE (data);
- GST_ERROR ("chunk abort, stream_id = %d", moo);
+ GST_INFO ("chunk abort, stream_id = %d", moo);
break;
case 0x03:
moo = GST_READ_UINT32_BE (data);
- GST_ERROR ("acknowledgement %d", moo);
+ GST_INFO ("acknowledgement %d", moo);
break;
case 0x04:
moo = GST_READ_UINT16_BE (data);
moo2 = GST_READ_UINT32_BE (data + 2);
- GST_ERROR ("control: %d, %d", moo, moo2);
+ GST_INFO ("control: %d, %d", moo, moo2);
break;
case 0x05:
moo = GST_READ_UINT32_BE (data);
- GST_ERROR ("window ack size: %d", moo);
+ GST_INFO ("window ack size: %d", moo);
break;
case 0x06:
moo = GST_READ_UINT32_BE (data);
- GST_ERROR ("set peer bandwidth: %d", moo);
+ GST_INFO ("set peer bandwidth: %d", moo);
break;
default:
GST_ERROR ("unimplemented");
@@ -594,7 +600,6 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
g_return_if_fail (GST_IS_RTMP_CHUNK (chunk));
- GST_ERROR ("queued");
chunk->priv = connection;
g_queue_push_tail (connection->output_queue, chunk);
gst_rtmp_connection_start_output (connection);
@@ -617,7 +622,7 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection,
g_bytes_get_size (connection->input_bytes) >=
connection->input_needed_bytes) {
GstRtmpConnectionCallback callback;
- GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback",
+ GST_DEBUG ("got %" G_GSIZE_FORMAT " bytes, calling callback",
g_bytes_get_size (connection->input_bytes));
callback = connection->input_callback;
connection->input_callback = NULL;
@@ -738,11 +743,11 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj,
GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
/* handshake finished */
- GST_ERROR ("client handshake finished");
+ GST_INFO ("client handshake finished");
sc->handshake_complete = TRUE;
if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) {
- GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT,
+ GST_DEBUG ("spare bytes after handshake: %" G_GSIZE_FORMAT,
g_bytes_get_size (sc->input_bytes));
gst_rtmp_connection_chunk_callback (sc);
} else {
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index deae843..c00ddee 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -60,6 +60,11 @@ struct _GstRtmpConnection
GstRtmpChunkCache *input_chunk_cache;
GstRtmpChunkCache *output_chunk_cache;
+ /* chunk currently being written */
+ GstRtmpChunk *output_chunk;
+ GBytes *output_bytes;
+
+ /* RTMP configuration */
gsize in_chunk_size;
gsize out_chunk_size;
};
diff --git a/rtmp/rtmputils.c b/rtmp/rtmputils.c
index 32354ea..cb0474f 100644
--- a/rtmp/rtmputils.c
+++ b/rtmp/rtmputils.c
@@ -70,3 +70,15 @@ gst_rtmp_bytes_append (GBytes * bytes, guint8 * data, gsize size)
return g_bytes_new_take (outdata, size1 + size);
}
+
+GBytes *
+gst_rtmp_bytes_remove (GBytes * bytes, gsize size)
+{
+ GBytes *new_bytes;
+
+ new_bytes =
+ g_bytes_new_from_bytes (bytes, size, g_bytes_get_size (bytes) - size);
+ g_bytes_unref (bytes);
+
+ return new_bytes;
+}
diff --git a/rtmp/rtmputils.h b/rtmp/rtmputils.h
index 8c902e3..2a58f82 100644
--- a/rtmp/rtmputils.h
+++ b/rtmp/rtmputils.h
@@ -26,6 +26,7 @@ G_BEGIN_DECLS
void gst_rtmp_dump_data (GBytes * bytes);
GBytes *gst_rtmp_bytes_append (GBytes *bytes, guint8 *data, gsize size);
+GBytes *gst_rtmp_bytes_remove (GBytes *bytes, gsize size);
G_END_DECLS