diff options
author | David Schleef <ds@schleef.org> | 2014-08-27 00:04:10 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-27 00:04:10 -0700 |
commit | 8b025005d2bff72434fd6fe0125109a6f23d2390 (patch) | |
tree | 1e353faf7b78ff20ba1c88c7c07bb8fa0ef24fa5 /rtmp | |
parent | d84190d24394d3007ebad8f740cc32a16472028a (diff) |
hacking, cleanup
Diffstat (limited to 'rtmp')
-rw-r--r-- | rtmp/rtmpchunk.c | 2 | ||||
-rw-r--r-- | rtmp/rtmpclient.c | 39 | ||||
-rw-r--r-- | rtmp/rtmpclient.h | 6 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 91 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 5 | ||||
-rw-r--r-- | rtmp/rtmputils.c | 12 | ||||
-rw-r--r-- | rtmp/rtmputils.h | 1 |
7 files changed, 103 insertions, 53 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c index f8bdd90..b23fe45 100644 --- a/rtmp/rtmpchunk.c +++ b/rtmp/rtmpchunk.c @@ -292,7 +292,7 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, memcpy (data + offset, chunkdata, chunksize); offset += chunksize; } - GST_ERROR ("type: %d in: %" G_GSIZE_FORMAT " out: %d", chunk->message_type_id, + GST_DEBUG ("type: %d in: %" G_GSIZE_FORMAT " out: %d", chunk->message_type_id, chunksize, offset); return g_bytes_new_take (data, offset); diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c index 088b632..8b9eae8 100644 --- a/rtmp/rtmpclient.c +++ b/rtmp/rtmpclient.c @@ -49,9 +49,14 @@ gst_rtmp_client_handshake_done (GObject * source, GAsyncResult * result, enum { - PROP_0 + PROP_0, + PROP_SERVER_ADDRESS, + PROP_SERVER_PORT }; +#define DEFAULT_SERVER_ADDRESS "" +#define DEFAULT_SERVER_PORT 1935 + /* pad templates */ @@ -75,6 +80,8 @@ gst_rtmp_client_class_init (GstRtmpClientClass * klass) static void gst_rtmp_client_init (GstRtmpClient * rtmpclient) { + rtmpclient->server_address = g_strdup (DEFAULT_SERVER_ADDRESS); + rtmpclient->server_port = DEFAULT_SERVER_PORT; } void @@ -86,6 +93,13 @@ gst_rtmp_client_set_property (GObject * object, guint property_id, GST_DEBUG_OBJECT (rtmpclient, "set_property"); switch (property_id) { + case PROP_SERVER_ADDRESS: + gst_rtmp_client_set_server_address (rtmpclient, + g_value_get_string (value)); + break; + case PROP_SERVER_PORT: + gst_rtmp_client_set_server_port (rtmpclient, g_value_get_int (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -101,6 +115,12 @@ gst_rtmp_client_get_property (GObject * object, guint property_id, GST_DEBUG_OBJECT (rtmpclient, "get_property"); switch (property_id) { + case PROP_SERVER_ADDRESS: + g_value_set_string (value, rtmpclient->server_address); + break; + case PROP_SERVER_PORT: + g_value_set_int (value, rtmpclient->server_port); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -142,10 +162,17 @@ gst_rtmp_client_new (void) } void -gst_rtmp_client_set_host (GstRtmpClient * client, const char *host) +gst_rtmp_client_set_server_address (GstRtmpClient * client, + const char *server_address) +{ + g_free (client->server_address); + client->server_address = g_strdup (server_address); +} + +void +gst_rtmp_client_set_server_port (GstRtmpClient * client, int port) { - g_free (client->host); - client->host = g_strdup (host); + client->server_port = port; } void @@ -177,9 +204,7 @@ gst_rtmp_client_connect_async (GstRtmpClient * client, client->cancellable = cancellable; client->async = async; - addr = - g_network_address_new - ("ec2-54-190-75-249.us-west-2.compute.amazonaws.com", 1935); + addr = g_network_address_new (client->server_address, client->server_port); client->socket_client = g_socket_client_new (); GST_DEBUG ("g_socket_client_connect_async"); diff --git a/rtmp/rtmpclient.h b/rtmp/rtmpclient.h index 3a209db..280e161 100644 --- a/rtmp/rtmpclient.h +++ b/rtmp/rtmpclient.h @@ -58,8 +58,8 @@ struct _GstRtmpClient GObject object; /* properties */ - char *host; - int port; + char *server_address; + int server_port; char *stream; /* private */ @@ -90,6 +90,8 @@ GType gst_rtmp_client_get_type (void); GstRtmpClient *gst_rtmp_client_new (void); void gst_rtmp_client_set_url (GstRtmpClient *client, const char *url); +void gst_rtmp_client_set_server_address (GstRtmpClient * client, const char *host); +void gst_rtmp_client_set_server_port (GstRtmpClient * client, int port); void gst_rtmp_client_connect_async (GstRtmpClient *client, GCancellable *cancellable, GAsyncReadyCallback callback, diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index d0b7d86..2e9e919 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -241,8 +241,6 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) sc->input_bytes = g_bytes_new_take (data, ret); } - //GST_ERROR("ic: %p", sc->input_callback); - //GST_ERROR("in queue: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes)); GST_DEBUG ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes); while (sc->input_callback && sc->input_bytes && @@ -263,33 +261,35 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) { GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); GstRtmpChunk *chunk; - GBytes *bytes; - GstRtmpChunkCacheEntry *entry; GST_DEBUG ("output ready"); - if (sc->writing) { - GST_DEBUG ("busy writing"); + if (sc->writing) return G_SOURCE_REMOVE; - } - chunk = g_queue_pop_head (sc->output_queue); - if (!chunk) { - return G_SOURCE_REMOVE; + if (sc->output_chunk) { + chunk = sc->output_chunk; + } else { + GstRtmpChunkCacheEntry *entry; + + chunk = g_queue_pop_head (sc->output_queue); + if (!chunk) { + return G_SOURCE_REMOVE; + } + sc->output_chunk = chunk; + + entry = gst_rtmp_chunk_cache_get (sc->output_chunk_cache, chunk->stream_id); + sc->output_bytes = gst_rtmp_chunk_serialize (chunk, + &entry->previous_header, sc->out_chunk_size); + g_bytes_ref (sc->output_bytes); + gst_rtmp_chunk_cache_update (entry, chunk); } - sc->writing = TRUE; os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); + g_output_stream_write_bytes_async (os, sc->output_bytes, G_PRIORITY_DEFAULT, + sc->cancellable, gst_rtmp_connection_write_chunk_done, sc); + sc->writing = TRUE; - entry = gst_rtmp_chunk_cache_get (sc->output_chunk_cache, chunk->stream_id); - bytes = gst_rtmp_chunk_serialize (chunk, &entry->previous_header, - sc->out_chunk_size); - gst_rtmp_chunk_cache_update (entry, chunk); - //gst_rtmp_dump_data (bytes); - g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT, - sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk); - - //return G_SOURCE_CONTINUE; return G_SOURCE_REMOVE; } @@ -298,8 +298,8 @@ gst_rtmp_connection_write_chunk_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GOutputStream *os = G_OUTPUT_STREAM (obj); - GstRtmpChunk *chunk = GST_RTMP_CHUNK (user_data); - GstRtmpConnection *connection = GST_RTMP_CONNECTION (chunk->priv); + GstRtmpConnection *connection = GST_RTMP_CONNECTION (user_data); + GstRtmpChunk *chunk = connection->output_chunk; GError *error = NULL; gssize ret; @@ -314,8 +314,18 @@ gst_rtmp_connection_write_chunk_done (GObject * obj, g_error_free (error); return; } - GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret); - g_object_unref (chunk); + if (ret < g_bytes_get_size (connection->output_bytes)) { + GST_DEBUG ("short write %" G_GSIZE_FORMAT " < %" G_GSIZE_FORMAT, + ret, g_bytes_get_size (connection->output_bytes)); + + connection->output_bytes = + gst_rtmp_bytes_remove (connection->output_bytes, ret); + } else { + g_bytes_unref (connection->output_bytes); + connection->output_bytes = NULL; + g_object_unref (chunk); + connection->output_chunk = NULL; + } gst_rtmp_connection_start_output (connection); } @@ -429,11 +439,11 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc) g_bytes_unref (bytes); /* handshake finished */ - GST_ERROR ("server handshake finished"); + GST_INFO ("server handshake finished"); sc->handshake_complete = TRUE; if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) { - GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT, + GST_DEBUG ("spare bytes after handshake: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes)); gst_rtmp_connection_chunk_callback (sc); } @@ -462,8 +472,6 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) if (sc->input_bytes == NULL) break; - //gst_rtmp_dump_data (sc->input_bytes); - ret = gst_rtmp_chunk_parse_header1 (&header, sc->input_bytes); if (!ret) { needed_bytes = header.header_size; @@ -473,7 +481,7 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) entry = gst_rtmp_chunk_cache_get (sc->input_chunk_cache, header.stream_id); if (entry->chunk && header.format != 3) { - GST_ERROR ("expected message continuation, got new message"); + GST_ERROR ("expected message continuation, but got new message"); } ret = gst_rtmp_chunk_parse_header2 (&header, sc->input_bytes, @@ -521,15 +529,13 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) header.message_length); entry->payload = NULL; - //gst_rtmp_dump_data (entry->chunk->payload); - if (entry->chunk->stream_id == 0x02) { - GST_ERROR ("got protocol control message, type: %d", + GST_DEBUG ("got protocol control message, type: %d", entry->chunk->message_type_id); gst_rtmp_connection_handle_pcm (sc, entry->chunk); g_object_unref (entry->chunk); } else { - GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", + GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes", entry->chunk->message_length); g_signal_emit_by_name (sc, "got-chunk", entry->chunk); g_object_unref (entry->chunk); @@ -557,29 +563,29 @@ gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection, switch (chunk->message_type_id) { case 0x01: moo = GST_READ_UINT32_BE (data); - GST_ERROR ("new chunk size %d", moo); + GST_INFO ("new chunk size %d", moo); connection->in_chunk_size = moo; break; case 0x02: moo = GST_READ_UINT32_BE (data); - GST_ERROR ("chunk abort, stream_id = %d", moo); + GST_INFO ("chunk abort, stream_id = %d", moo); break; case 0x03: moo = GST_READ_UINT32_BE (data); - GST_ERROR ("acknowledgement %d", moo); + GST_INFO ("acknowledgement %d", moo); break; case 0x04: moo = GST_READ_UINT16_BE (data); moo2 = GST_READ_UINT32_BE (data + 2); - GST_ERROR ("control: %d, %d", moo, moo2); + GST_INFO ("control: %d, %d", moo, moo2); break; case 0x05: moo = GST_READ_UINT32_BE (data); - GST_ERROR ("window ack size: %d", moo); + GST_INFO ("window ack size: %d", moo); break; case 0x06: moo = GST_READ_UINT32_BE (data); - GST_ERROR ("set peer bandwidth: %d", moo); + GST_INFO ("set peer bandwidth: %d", moo); break; default: GST_ERROR ("unimplemented"); @@ -594,7 +600,6 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection, g_return_if_fail (GST_IS_RTMP_CONNECTION (connection)); g_return_if_fail (GST_IS_RTMP_CHUNK (chunk)); - GST_ERROR ("queued"); chunk->priv = connection; g_queue_push_tail (connection->output_queue, chunk); gst_rtmp_connection_start_output (connection); @@ -617,7 +622,7 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection, g_bytes_get_size (connection->input_bytes) >= connection->input_needed_bytes) { GstRtmpConnectionCallback callback; - GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback", + GST_DEBUG ("got %" G_GSIZE_FORMAT " bytes, calling callback", g_bytes_get_size (connection->input_bytes)); callback = connection->input_callback; connection->input_callback = NULL; @@ -738,11 +743,11 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj, GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret); /* handshake finished */ - GST_ERROR ("client handshake finished"); + GST_INFO ("client handshake finished"); sc->handshake_complete = TRUE; if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) { - GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT, + GST_DEBUG ("spare bytes after handshake: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes)); gst_rtmp_connection_chunk_callback (sc); } else { diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index deae843..c00ddee 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -60,6 +60,11 @@ struct _GstRtmpConnection GstRtmpChunkCache *input_chunk_cache; GstRtmpChunkCache *output_chunk_cache; + /* chunk currently being written */ + GstRtmpChunk *output_chunk; + GBytes *output_bytes; + + /* RTMP configuration */ gsize in_chunk_size; gsize out_chunk_size; }; diff --git a/rtmp/rtmputils.c b/rtmp/rtmputils.c index 32354ea..cb0474f 100644 --- a/rtmp/rtmputils.c +++ b/rtmp/rtmputils.c @@ -70,3 +70,15 @@ gst_rtmp_bytes_append (GBytes * bytes, guint8 * data, gsize size) return g_bytes_new_take (outdata, size1 + size); } + +GBytes * +gst_rtmp_bytes_remove (GBytes * bytes, gsize size) +{ + GBytes *new_bytes; + + new_bytes = + g_bytes_new_from_bytes (bytes, size, g_bytes_get_size (bytes) - size); + g_bytes_unref (bytes); + + return new_bytes; +} diff --git a/rtmp/rtmputils.h b/rtmp/rtmputils.h index 8c902e3..2a58f82 100644 --- a/rtmp/rtmputils.h +++ b/rtmp/rtmputils.h @@ -26,6 +26,7 @@ G_BEGIN_DECLS void gst_rtmp_dump_data (GBytes * bytes); GBytes *gst_rtmp_bytes_append (GBytes *bytes, guint8 *data, gsize size); +GBytes *gst_rtmp_bytes_remove (GBytes *bytes, gsize size); G_END_DECLS |