diff options
author | Søren Sandmann <sandmann@redhat.com> | 2007-07-31 21:13:44 -0400 |
---|---|---|
committer | Søren Sandmann <sandmann@redhat.com> | 2007-07-31 21:13:44 -0400 |
commit | c6bdf7500b3ac117326d7d3fa6b2289fd2d39964 (patch) | |
tree | d35c157c4a974573de0c65516f7b5e1de60e1aa4 | |
parent | 2d6326cf8bd33604f6d8097e55a7d9165a5acaa7 (diff) |
Consolidate connections into LacConnection
-rw-r--r-- | src/Makefile.am | 3 | ||||
-rw-r--r-- | src/lac.h | 79 | ||||
-rw-r--r-- | src/lacconnection.c | 553 | ||||
-rw-r--r-- | src/lacdns-nameserver.c | 2 | ||||
-rw-r--r-- | src/lacgenconnection.c | 187 | ||||
-rw-r--r-- | src/lachttp.c | 2 | ||||
-rw-r--r-- | src/lacinternals.h | 54 | ||||
-rw-r--r-- | src/lactcpconnection.c | 524 | ||||
-rw-r--r-- | src/lactlsconnection.c | 116 | ||||
-rw-r--r-- | tests/connection-test.c | 2 |
10 files changed, 749 insertions, 773 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 489571a..8976d15 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -21,9 +21,8 @@ liblac_1_la_SOURCES = \ lacuri.c \ lacconnection.c \ lactlsconnection.c \ - lacgenconnection.c \ + lactcpconnection.c \ lachttp.c \ - lacimap.c \ \ lacinternals.h \ lacdns-messages.h \ @@ -264,7 +264,6 @@ gboolean lac_fd_is_watched (gint fd); /* * Connection */ -typedef struct _LacConnection LacConnection; typedef union _LacConnectionEvent LacConnectionEvent; typedef enum { @@ -306,13 +305,22 @@ union _LacConnectionEvent { LacConnectionErrorEvent error; }; +/* + * LacConnection + */ +typedef struct _LacConnection LacConnection; + typedef void (* LacConnectionFunc) (LacConnection *connection, - const LacConnectionEvent *event); + const LacConnectionEvent *event); -LacConnection * lac_connection_new (const LacAddress *address, - gint port, - LacConnectionFunc callback, - gpointer data); +LacConnection * lac_connection_new_tcp (const LacAddress *address, + gint port, + LacConnectionFunc callback, + gpointer data); +LacConnection * lac_connection_new_tls (const LacAddress *address, + gint port, + LacConnectionFunc callback, + gpointer data); gpointer lac_connection_get_data (LacConnection *connection); void lac_connection_write (LacConnection *connection, const gchar *data, @@ -330,65 +338,6 @@ void lac_connection_flush (LacConnection *con /* - * LacTlsConnection - */ -typedef struct _LacTlsConnection LacTlsConnection; - -typedef void (* LacTlsConnectionFunc) (LacTlsConnection *connection, - const LacConnectionEvent *event); - -LacTlsConnection * lac_tls_connection_new (const LacAddress *address, - gint port, - LacTlsConnectionFunc callback, - gpointer data); -gpointer lac_tls_connection_get_data (LacTlsConnection *connection); -void lac_tls_connection_write (LacTlsConnection *connection, - const gchar *data, - guint len); -void lac_tls_connection_write_cstr (LacTlsConnection *connection, - const gchar *data); -void lac_tls_connection_shutdown_write (LacTlsConnection *connection); -LacTlsConnection * lac_tls_connection_ref (LacTlsConnection *connection); -void lac_tls_connection_unref (LacTlsConnection *connection); -G_CONST_RETURN LacAddress *lac_tls_connection_get_address (LacTlsConnection *connection); -gint lac_tls_connection_get_port (LacTlsConnection *connection); -void lac_tls_connection_close (LacTlsConnection *connection); -gboolean lac_tls_connection_is_connected (LacTlsConnection *connection); -void lac_tls_connection_flush (LacTlsConnection *connection); - -/* - * LacGenConnection - */ -typedef struct _LacGenConnection LacGenConnection; - -typedef void (* LacGenConnectionFunc) (LacGenConnection *connection, - const LacConnectionEvent *event); - -LacGenConnection * lac_gen_connection_new_tcp (const LacAddress *address, - gint port, - LacGenConnectionFunc callback, - gpointer data); -LacGenConnection * lac_gen_connection_new_tls (const LacAddress *address, - gint port, - LacGenConnectionFunc callback, - gpointer data); -gpointer lac_gen_connection_get_data (LacGenConnection *connection); -void lac_gen_connection_write (LacGenConnection *connection, - const gchar *data, - guint len); -void lac_gen_connection_write_cstr (LacGenConnection *connection, - const gchar *data); -void lac_gen_connection_shutdown_write (LacGenConnection *connection); -LacGenConnection * lac_gen_connection_ref (LacGenConnection *connection); -void lac_gen_connection_unref (LacGenConnection *connection); -G_CONST_RETURN LacAddress *lac_gen_connection_get_address (LacGenConnection *connection); -gint lac_gen_connection_get_port (LacGenConnection *connection); -void lac_gen_connection_close (LacGenConnection *connection); -gboolean lac_gen_connection_is_connected (LacGenConnection *connection); -void lac_gen_connection_flush (LacGenConnection *connection); - - -/* * URI */ typedef enum { diff --git a/src/lacconnection.c b/src/lacconnection.c index 50ed238..b88b381 100644 --- a/src/lacconnection.c +++ b/src/lacconnection.c @@ -1,7 +1,7 @@ /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- */ /* Lac - Library for asynchronous communication - * Copyright (C) 2000, 2001, 2002, 2003, 2007 Søren Sandmann + * Copyright (C) 2007 Søren Sandmann * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -19,533 +19,176 @@ * Boston, MA 02111-1307, USA. */ -#include "lac.h" #include <string.h> +#include "lac.h" +#include "lacinternals.h" -/* - * Prototypes for a primitive abstraction covering TCP and SSL - */ -typedef enum { - UNDEFINED, - CONNECT_IN_PROGRESS, - CONNECTED, - DISCONNECTED, -} ConnectionState; - -struct _LacConnection { - int fd; - LacAddress * address; - gint port; - LacConnectionFunc callback; - gpointer data; - - GQueue * pending_events; +struct _LacConnection +{ + gboolean is_tcp; - ConnectionState state; - LacByteQueue * unwritten; + union + { + LacTcpConnection *tcp; + LacTlsConnection *tls; + } u; - gboolean has_fd; - gboolean write_shutdown; - gboolean in_emit_events; - gboolean need_flush; - gint ref_count; + LacConnectionFunc func; + gpointer data; + int ref_count; }; - static void -event_free (LacConnectionEvent *event) +on_tcp_event (LacTcpConnection *connection, + const LacConnectionEvent *event) { - if (event->type == LAC_CONNECTION_EVENT_READ) - { - /* FIXME: at some point we should support pushing - * data back into the queue, ie., it must be kept alive - */ - lac_byte_queue_free (event->read.byte_queue, TRUE); - } - else if (event->type == LAC_CONNECTION_EVENT_ERROR) - g_error_free ((GError *)event->error.err); + LacConnection *gen = lac_tcp_connection_get_data (connection); - g_free (event); + gen->func (gen, event); } static void -queue_connect (LacConnection *connection) +on_ssl_event (LacTlsConnection *connection, + const LacConnectionEvent *event) { - LacConnectionEvent *event = g_new (LacConnectionEvent, 1); - - event->type = LAC_CONNECTION_EVENT_CONNECT; - - g_queue_push_tail (connection->pending_events, event); -} - -static void -queue_read (LacConnection *connection, LacByteQueue *queue) -{ - LacConnectionEvent *event = g_new (LacConnectionEvent, 1); + LacConnection *gen = lac_tls_connection_get_data (connection); - event->type = LAC_CONNECTION_EVENT_READ; - event->read.byte_queue = queue; - event->read.data = lac_byte_queue_peek (queue, &(event->read.len)); - - g_queue_push_tail (connection->pending_events, event); -} - -static void -queue_close (LacConnection *connection, gboolean remote) -{ - LacConnectionEvent *event = g_new (LacConnectionEvent, 1); - - event->type = LAC_CONNECTION_EVENT_CLOSE; - event->close.remote_closed = remote; - - g_queue_push_tail (connection->pending_events, event); + gen->func (gen, event); } -static void -queue_error (LacConnection *connection, const GError *err) +LacConnection * +lac_connection_new_tcp (const LacAddress *address, + gint port, + LacConnectionFunc func, + gpointer data) { - LacConnectionEvent *event = g_new (LacConnectionEvent, 1); + LacConnection *connection = g_new0 (LacConnection, 1); - event->type = LAC_CONNECTION_EVENT_ERROR; - event->error.err = g_error_copy (err); - - g_queue_push_tail (connection->pending_events, event); -} - -static void -emit_events (LacConnection *connection) -{ - LacConnectionEvent *event; - - if (connection->in_emit_events) - return; - - connection->in_emit_events = TRUE; - lac_connection_ref (connection); - - while ((event = g_queue_pop_head (connection->pending_events))) - { - if (connection->state != DISCONNECTED) - { - if (event->type == LAC_CONNECTION_EVENT_CLOSE || - event->type == LAC_CONNECTION_EVENT_ERROR) - { - connection->state = DISCONNECTED; - - if (connection->has_fd) - { - lac_fd_remove_watch (connection->fd); - lac_close (connection->fd, NULL); - connection->has_fd = FALSE; - } - } - - connection->callback (connection, event); - } - - event_free (event); - } - - connection->in_emit_events = FALSE; - lac_connection_unref (connection); -} - -static gboolean -emit_events_idle_handler (gpointer data) -{ - LacConnection *connection = data; - emit_events (connection); - - lac_connection_unref (connection); - - return FALSE; /* don't call me again */ -} - -static void -emit_events_idle (LacConnection *connection) -{ - if (!g_queue_is_empty (connection->pending_events)) - { - lac_connection_ref (connection); - - g_idle_add ( - emit_events_idle_handler, connection); - } -} - -static void -lac_connection_do_reads (gpointer data) -{ - LacConnection *connection = data; - gint len; + connection->is_tcp = TRUE; + connection->u.tcp = lac_tcp_connection_new (address, port, on_tcp_event, connection); + connection->ref_count = 1; + connection->func = func; + connection->data = data; - enum { BUF_SIZE = 8280 }; - - do - { - GError *err = NULL; - LacByteQueue *queue = lac_byte_queue_new (); - gchar *buf = lac_byte_queue_alloc_tail (queue, BUF_SIZE); - - len = lac_recv (connection->fd, buf, BUF_SIZE, &err); - - if (len > 0) - { - lac_byte_queue_delete_tail (queue, BUF_SIZE - len); - - queue_read (connection, queue); - } - else - { - lac_byte_queue_free (queue, TRUE); - - if (len == 0) - { - queue_close (connection, TRUE); - } - else - { - g_assert (err); - - if (!g_error_matches ( - err, LAC_SOCKET_ERROR, LAC_SOCKET_ERROR_AGAIN)) - { - queue_error (connection, err); - } - - g_error_free (err); - } - } - - /* FIXME: check that we haven't used too much time? */ - } - while (len == BUF_SIZE); + return connection; } -static void lac_connection_writable (gpointer data); - -static void -lac_connection_do_writes (LacConnection *connection) +LacConnection * +lac_connection_new_tls (const LacAddress *address, + gint port, + LacConnectionFunc func, + gpointer data) { - if (connection->state != CONNECTED) - { - lac_fd_set_write_callback (connection->fd, lac_connection_writable); - return; - } - - while (lac_byte_queue_get_length (connection->unwritten) > 0) - { - GError *err = NULL; - const gchar *unwritten; - gsize len, sent; - - unwritten = lac_byte_queue_peek (connection->unwritten, &len); - - sent = lac_send (connection->fd, unwritten, len, &err); + LacConnection *connection = g_new0 (LacConnection, 1); - if (err) - { - if (g_error_matches ( - err, LAC_SOCKET_ERROR, LAC_SOCKET_ERROR_AGAIN)) - { - lac_fd_set_write_callback ( - connection->fd, lac_connection_writable); - } - else - { - queue_error (connection, err); - } - - g_error_free (err); - return; - } - - lac_byte_queue_delete_head (connection->unwritten, sent); - - /* FIXME check that we haven't used too much time? */ - } - - lac_fd_set_write_callback (connection->fd, NULL); - - if (connection->need_flush) - { - lac_set_nagle (connection->fd, FALSE, NULL); - lac_set_nagle (connection->fd, TRUE, NULL); - - connection->need_flush = FALSE; - } - - if (connection->write_shutdown) - { - GError *err = NULL; - - lac_shutdown (connection->fd, LAC_SHUTDOWN_WRITE, &err); - if (err) - { - queue_error (connection, err); - g_error_free (err); - } - } -} - -static void -lac_connection_readable (gpointer data) -{ - LacConnection *connection = data; + connection->is_tcp = FALSE; + connection->u.tls = lac_tls_connection_new (address, port, on_ssl_event, connection); + connection->ref_count = 1; + connection->func = func; + connection->data = data; - lac_connection_do_reads (connection); - emit_events (connection); + return connection; } static void -lac_connection_writable (gpointer data) +lac_connection_destroy (LacConnection *connection) { - LacConnection *connection = data; + if (connection->is_tcp) + lac_tcp_connection_unref (connection->u.tcp); + else + lac_tls_connection_unref (connection->u.tls); - if (connection->state == CONNECT_IN_PROGRESS) - { - connection->state = CONNECTED; - queue_connect (connection); - } - - lac_connection_do_writes (connection); - emit_events (connection); + g_free (connection); } -static void -lac_connection_connect (LacConnection *connection) +LacConnection * +lac_connection_ref (LacConnection *connection) { - GError *err = NULL; + connection->ref_count++; - lac_connect (connection->fd, - connection->address, connection->port, - &err); - - if (err) - { - if (g_error_matches ( - err, LAC_SOCKET_ERROR, LAC_SOCKET_ERROR_IN_PROGRESS)) - { - connection->state = CONNECT_IN_PROGRESS; - } - else - { - queue_error (connection, err); - } - - g_error_free (err); - } - else - { - connection->state = CONNECTED; - queue_connect (connection); - lac_connection_do_writes (connection); - } -} - -static void -lac_connection_discard_pending_events (LacConnection *connection) -{ - LacConnectionEvent *event; - - while ((event = g_queue_pop_head (connection->pending_events))) - event_free (event); -} - -static void -lac_connection_add_watch (LacConnection *connection) -{ - lac_fd_add_watch (connection->fd, connection); - - lac_fd_set_read_callback (connection->fd, lac_connection_readable); - lac_fd_set_write_callback (connection->fd, lac_connection_writable); + return connection; } -LacConnection * -lac_connection_new (const LacAddress *address, - gint port, - LacConnectionFunc callback, - gpointer data) +void +lac_connection_unref (LacConnection *connection) { - LacConnection *connection; - GError *err = NULL; - - g_return_val_if_fail (address != NULL, NULL); - g_return_val_if_fail (port > 0, NULL); - g_return_val_if_fail (callback != NULL, NULL); - - connection = g_new (LacConnection, 1); - - connection->ref_count = 1; - - connection->address = lac_address_copy (address); - connection->port = port; - connection->callback = callback; - connection->data = data; - - connection->pending_events = g_queue_new (); - - connection->state = UNDEFINED; - connection->has_fd = FALSE; - connection->unwritten = lac_byte_queue_new (); - - connection->write_shutdown = FALSE; - connection->in_emit_events = FALSE; - connection->need_flush = FALSE; - - connection->fd = lac_socket_tcp (&err); - if (err) - { - queue_error (connection, err); - emit_events_idle (connection); - g_error_free (err); - return connection; - } - - connection->has_fd = TRUE; - - lac_set_blocking (connection->fd, FALSE, &err); - if (err) - { - queue_error (connection, err); - emit_events_idle (connection); - g_error_free (err); - return connection; - } - - lac_connection_add_watch (connection); - lac_connection_connect (connection); - - emit_events_idle (connection); - - return connection; + if (--connection->ref_count == 0) + lac_connection_destroy (connection); } gpointer -lac_connection_get_data (LacConnection *connection) +lac_connection_get_data (LacConnection *connection) { - g_return_val_if_fail (connection != NULL, NULL); - return connection->data; } void -lac_connection_write (LacConnection *connection, - const gchar *data, - guint len) +lac_connection_write (LacConnection *connection, + const gchar *data, + guint len) { - gboolean do_writes; - - g_return_if_fail (connection != NULL); - g_return_if_fail (data != NULL); - g_return_if_fail (!connection->write_shutdown); - g_return_if_fail (connection->state != DISCONNECTED); - - if (len == 0) - return; - - do_writes = (lac_byte_queue_get_length (connection->unwritten) == 0); - - lac_byte_queue_append (connection->unwritten, data, len); - - if (do_writes) - { - lac_connection_do_writes (connection); - emit_events (connection); - } + if (connection->is_tcp) + lac_tcp_connection_write (connection->u.tcp, data, len); + else + lac_tls_connection_write (connection->u.tls, data, len); } void -lac_connection_write_cstr (LacConnection *connection, - const gchar *data) +lac_connection_write_cstr (LacConnection *connection, + const gchar *data) { - guint len; + gsize len; g_return_if_fail (connection != NULL); g_return_if_fail (data != NULL); - g_return_if_fail (!connection->write_shutdown); - + len = strlen (data); if (len > 0) lac_connection_write (connection, data, len); } -void -lac_connection_shutdown_write (LacConnection *connection) -{ - g_return_if_fail (connection != NULL); - - connection->write_shutdown = TRUE; - - lac_connection_do_writes (connection); - emit_events (connection); -} - -void -lac_connection_close (LacConnection *connection) +G_CONST_RETURN LacAddress * +lac_connection_get_address (LacConnection *connection) { - g_return_if_fail (connection != NULL); - - lac_connection_discard_pending_events (connection); - - queue_close (connection, FALSE); - emit_events (connection); + if (connection->is_tcp) + return lac_tcp_connection_get_address (connection->u.tcp); + else + return lac_tls_connection_get_address (connection->u.tls); } -LacConnection * -lac_connection_ref (LacConnection *connection) +gint +lac_connection_get_port (LacConnection *connection) { - ++connection->ref_count; - - return connection; + if (connection->is_tcp) + return lac_tcp_connection_get_port (connection->u.tcp); + else + return lac_tls_connection_get_port (connection->u.tls); } void -lac_connection_unref (LacConnection *connection) -{ - if (--connection->ref_count == 0) - { - lac_address_free (connection->address); - - lac_connection_discard_pending_events (connection); - g_queue_free (connection->pending_events); - - lac_byte_queue_free (connection->unwritten, TRUE); - - if (connection->has_fd) - { - lac_fd_remove_watch (connection->fd); - lac_close (connection->fd, NULL); - } - - g_free (connection); - } -} - -G_CONST_RETURN LacAddress * -lac_connection_get_address (LacConnection *connection) -{ - return connection->address; -} - -gint -lac_connection_get_port (LacConnection *connection) +lac_connection_close (LacConnection *connection) { - return connection->port; + if (connection->is_tcp) + lac_tcp_connection_close (connection->u.tcp); + else + lac_tls_connection_close (connection->u.tls); } gboolean lac_connection_is_connected (LacConnection *connection) { - return connection->state == CONNECTED; + if (connection->is_tcp) + return lac_tcp_connection_is_connected (connection->u.tcp); + else + return lac_tls_connection_is_connected (connection->u.tls); } void -lac_connection_flush (LacConnection *connection) +lac_connection_flush (LacConnection *connection) { - connection->need_flush = TRUE; - - if (lac_byte_queue_get_length (connection->unwritten) == 0) - lac_connection_do_writes (connection); + if (connection->is_tcp) + lac_tcp_connection_flush (connection->u.tcp); + else + lac_tls_connection_flush (connection->u.tls); } diff --git a/src/lacdns-nameserver.c b/src/lacdns-nameserver.c index 17856ff..2725a8f 100644 --- a/src/lacdns-nameserver.c +++ b/src/lacdns-nameserver.c @@ -398,7 +398,7 @@ name_server_dispatch_tcp_query (NameServer *server, Query *query) #if 0 g_print ("NEW CONNECTION\n"); #endif - connection = lac_connection_new ( + connection = lac_connection_new_tcp ( server->address, DNS_PORT, name_server_handle_tcp_event, closure); diff --git a/src/lacgenconnection.c b/src/lacgenconnection.c deleted file mode 100644 index 4c77144..0000000 --- a/src/lacgenconnection.c +++ /dev/null @@ -1,187 +0,0 @@ -/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- */ - -/* Lac - Library for asynchronous communication - * Copyright (C) 2007 Søren Sandmann - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#include <string.h> -#include "lac.h" - -struct _LacGenConnection -{ - gboolean is_tcp; - - union - { - LacConnection *tcp; - LacTlsConnection *tls; - } u; - - LacGenConnectionFunc func; - gpointer data; - int ref_count; -}; - -static void -on_tcp_event (LacConnection *connection, - const LacConnectionEvent *event) -{ - LacGenConnection *gen = lac_connection_get_data (connection); - - gen->func (gen, event); -} - -static void -on_ssl_event (LacTlsConnection *connection, - const LacConnectionEvent *event) -{ - LacGenConnection *gen = lac_tls_connection_get_data (connection); - - gen->func (gen, event); -} - -LacGenConnection * -lac_gen_connection_new_tcp (const LacAddress *address, - gint port, - LacGenConnectionFunc callback, - gpointer data) -{ - LacGenConnection *connection = g_new0 (LacGenConnection, 1); - - connection->u.tcp = lac_connection_new (address, port, on_tcp_event, connection); - connection->ref_count = 1; - - return connection; -} - -LacGenConnection * -lac_gen_connection_new_tls (const LacAddress *address, - gint port, - LacGenConnectionFunc callback, - gpointer data) -{ - LacGenConnection *connection = g_new0 (LacGenConnection, 1); - - connection->u.tls = lac_tls_connection_new (address, port, on_ssl_event, connection); - connection->ref_count = 1; - - return connection; -} - -static void -lac_gen_connection_destroy (LacGenConnection *connection) -{ - if (connection->is_tcp) - lac_connection_unref (connection->u.tcp); - else - lac_tls_connection_unref (connection->u.tls); - - g_free (connection); -} - -LacGenConnection * -lac_gen_connection_ref (LacGenConnection *connection) -{ - connection->ref_count++; - - return connection; -} - -void -lac_gen_connection_unref (LacGenConnection *connection) -{ - if (--connection->ref_count == 0) - lac_gen_connection_destroy (connection); -} - -gpointer -lac_gen_connection_get_data (LacGenConnection *connection) -{ - return connection->data; -} - -void -lac_gen_connection_write (LacGenConnection *connection, - const gchar *data, - guint len) -{ - if (connection->is_tcp) - lac_connection_write (connection->u.tcp, data, len); - else - lac_tls_connection_write (connection->u.tls, data, len); -} - -void -lac_gen_connection_write_cstr (LacGenConnection *connection, - const gchar *data) -{ - gsize len; - - g_return_if_fail (connection != NULL); - g_return_if_fail (data != NULL); - - len = strlen (data); - - if (len > 0) - lac_gen_connection_write (connection, data, len); -} - -G_CONST_RETURN LacAddress * -lac_gen_connection_get_address (LacGenConnection *connection) -{ - if (connection->is_tcp) - return lac_connection_get_address (connection->u.tcp); - else - return lac_tls_connection_get_address (connection->u.tls); -} - -gint -lac_gen_connection_get_port (LacGenConnection *connection) -{ - if (connection->is_tcp) - return lac_connection_get_port (connection->u.tcp); - else - return lac_tls_connection_get_port (connection->u.tls); -} - -void -lac_gen_connection_close (LacGenConnection *connection) -{ - if (connection->is_tcp) - lac_connection_close (connection->u.tcp); - else - lac_tls_connection_close (connection->u.tls); -} - -gboolean -lac_gen_connection_is_connected (LacGenConnection *connection) -{ - if (connection->is_tcp) - return lac_connection_is_connected (connection->u.tcp); - else - return lac_tls_connection_is_connected (connection->u.tls); -} - -void -lac_gen_connection_flush (LacGenConnection *connection) -{ - if (connection->is_tcp) - lac_connection_flush (connection->u.tcp); - else - lac_tls_connection_flush (connection->u.tls); -} diff --git a/src/lachttp.c b/src/lachttp.c index b385ff1..64e0b79 100644 --- a/src/lachttp.c +++ b/src/lachttp.c @@ -2563,7 +2563,7 @@ http_transport_new (HttpHost *host, #endif transport->host = host; - transport->connection = lac_connection_new ( + transport->connection = lac_connection_new_tcp ( host->address, host->port, connection_callback, transport); transport->unsent = g_queue_new (); diff --git a/src/lacinternals.h b/src/lacinternals.h index 6953391..1b4446e 100644 --- a/src/lacinternals.h +++ b/src/lacinternals.h @@ -52,6 +52,60 @@ void lac_address_set_in_addr (LacAddress *addr, /* + * TCP connection + */ +typedef struct _LacTcpConnection LacTcpConnection; + +typedef void (* LacTcpConnectionFunc) (LacTcpConnection *connection, + const LacConnectionEvent *event); + +LacTcpConnection * lac_tcp_connection_new (const LacAddress *address, + gint port, + LacTcpConnectionFunc callback, + gpointer data); +gpointer lac_tcp_connection_get_data (LacTcpConnection *connection); +void lac_tcp_connection_write (LacTcpConnection *connection, + const gchar *data, + guint len); +void lac_tcp_connection_write_cstr (LacTcpConnection *connection, + const gchar *data); +void lac_tcp_connection_shutdown_write (LacTcpConnection *connection); +LacTcpConnection * lac_tcp_connection_ref (LacTcpConnection *connection); +void lac_tcp_connection_unref (LacTcpConnection *connection); +G_CONST_RETURN LacAddress *lac_tcp_connection_get_address (LacTcpConnection *connection); +gint lac_tcp_connection_get_port (LacTcpConnection *connection); +void lac_tcp_connection_close (LacTcpConnection *connection); +gboolean lac_tcp_connection_is_connected (LacTcpConnection *connection); +void lac_tcp_connection_flush (LacTcpConnection *connection); + +/* + * TLS Connection + */ +typedef struct _LacTlsConnection LacTlsConnection; + +typedef void (* LacTlsConnectionFunc) (LacTlsConnection *connection, + const LacConnectionEvent *event); + +LacTlsConnection * lac_tls_connection_new (const LacAddress *address, + gint port, + LacTlsConnectionFunc callback, + gpointer data); +gpointer lac_tls_connection_get_data (LacTlsConnection *connection); +void lac_tls_connection_write (LacTlsConnection *connection, + const gchar *data, + guint len); +void lac_tls_connection_write_cstr (LacTlsConnection *connection, + const gchar *data); +void lac_tls_connection_shutdown_write (LacTlsConnection *connection); +LacTlsConnection * lac_tls_connection_ref (LacTlsConnection *connection); +void lac_tls_connection_unref (LacTlsConnection *connection); +G_CONST_RETURN LacAddress *lac_tls_connection_get_address (LacTlsConnection *connection); +gint lac_tls_connection_get_port (LacTlsConnection *connection); +void lac_tls_connection_close (LacTlsConnection *connection); +gboolean lac_tls_connection_is_connected (LacTlsConnection *connection); +void lac_tls_connection_flush (LacTlsConnection *connection); + +/* * Debug spew */ #define lac_debug_out(format,args...) G_STMT_START{ \ diff --git a/src/lactcpconnection.c b/src/lactcpconnection.c new file mode 100644 index 0000000..cfe1312 --- /dev/null +++ b/src/lactcpconnection.c @@ -0,0 +1,524 @@ +/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- */ + +/* Lac - Library for asynchronous communication + * Copyright (C) 2000, 2001, 2002, 2003, 2007 Søren Sandmann + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include <string.h> +#include "lac.h" +#include "lacinternals.h" + +/* + * Prototypes for a primitive abstraction covering TCP and SSL + */ +typedef enum { + UNDEFINED, + CONNECT_IN_PROGRESS, + CONNECTED, + DISCONNECTED, +} ConnectionState; + +struct _LacTcpConnection { + int fd; + LacAddress * address; + gint port; + LacTcpConnectionFunc callback; + gpointer data; + + GQueue * pending_events; + + ConnectionState state; + LacByteQueue * unwritten; + + gboolean has_fd; + gboolean in_emit_events; + gboolean need_flush; + gint ref_count; +}; + +static void +event_free (LacConnectionEvent *event) +{ + if (event->type == LAC_CONNECTION_EVENT_READ) + { + /* FIXME: at some point we should support pushing + * data back into the queue, ie., it must be kept alive + */ + lac_byte_queue_free (event->read.byte_queue, TRUE); + } + else if (event->type == LAC_CONNECTION_EVENT_ERROR) + g_error_free ((GError *)event->error.err); + + g_free (event); +} + +static void +queue_connect (LacTcpConnection *connection) +{ + LacConnectionEvent *event = g_new (LacConnectionEvent, 1); + + event->type = LAC_CONNECTION_EVENT_CONNECT; + + g_queue_push_tail (connection->pending_events, event); +} + +static void +queue_read (LacTcpConnection *connection, LacByteQueue *queue) +{ + LacConnectionEvent *event = g_new (LacConnectionEvent, 1); + + event->type = LAC_CONNECTION_EVENT_READ; + event->read.byte_queue = queue; + event->read.data = lac_byte_queue_peek (queue, &(event->read.len)); + + g_queue_push_tail (connection->pending_events, event); +} + +static void +queue_close (LacTcpConnection *connection, gboolean remote) +{ + LacConnectionEvent *event = g_new (LacConnectionEvent, 1); + + event->type = LAC_CONNECTION_EVENT_CLOSE; + event->close.remote_closed = remote; + + g_queue_push_tail (connection->pending_events, event); +} + +static void +queue_error (LacTcpConnection *connection, const GError *err) +{ + LacConnectionEvent *event = g_new (LacConnectionEvent, 1); + + event->type = LAC_CONNECTION_EVENT_ERROR; + event->error.err = g_error_copy (err); + + g_queue_push_tail (connection->pending_events, event); +} + +static void +emit_events (LacTcpConnection *connection) +{ + LacConnectionEvent *event; + + if (connection->in_emit_events) + return; + + connection->in_emit_events = TRUE; + lac_tcp_connection_ref (connection); + + while ((event = g_queue_pop_head (connection->pending_events))) + { + if (connection->state != DISCONNECTED) + { + if (event->type == LAC_CONNECTION_EVENT_CLOSE || + event->type == LAC_CONNECTION_EVENT_ERROR) + { + connection->state = DISCONNECTED; + + if (connection->has_fd) + { + lac_fd_remove_watch (connection->fd); + lac_close (connection->fd, NULL); + connection->has_fd = FALSE; + } + } + + connection->callback (connection, event); + } + + event_free (event); + } + + connection->in_emit_events = FALSE; + lac_tcp_connection_unref (connection); +} + +static gboolean +emit_events_idle_handler (gpointer data) +{ + LacTcpConnection *connection = data; + emit_events (connection); + + lac_tcp_connection_unref (connection); + + return FALSE; /* don't call me again */ +} + +static void +emit_events_idle (LacTcpConnection *connection) +{ + if (!g_queue_is_empty (connection->pending_events)) + { + lac_tcp_connection_ref (connection); + + g_idle_add ( + emit_events_idle_handler, connection); + } +} + +static void +lac_tcp_connection_do_reads (gpointer data) +{ + LacTcpConnection *connection = data; + gint len; + + enum { BUF_SIZE = 8280 }; + + do + { + GError *err = NULL; + LacByteQueue *queue = lac_byte_queue_new (); + gchar *buf = lac_byte_queue_alloc_tail (queue, BUF_SIZE); + + len = lac_recv (connection->fd, buf, BUF_SIZE, &err); + + if (len > 0) + { + lac_byte_queue_delete_tail (queue, BUF_SIZE - len); + + queue_read (connection, queue); + } + else + { + lac_byte_queue_free (queue, TRUE); + + if (len == 0) + { + queue_close (connection, TRUE); + } + else + { + g_assert (err); + + if (!g_error_matches ( + err, LAC_SOCKET_ERROR, LAC_SOCKET_ERROR_AGAIN)) + { + queue_error (connection, err); + } + + g_error_free (err); + } + } + + /* FIXME: check that we haven't used too much time? */ + } + while (len == BUF_SIZE); +} + +static void lac_tcp_connection_writable (gpointer data); + +static void +lac_tcp_connection_do_writes (LacTcpConnection *connection) +{ + if (connection->state != CONNECTED) + { + lac_fd_set_write_callback (connection->fd, lac_tcp_connection_writable); + return; + } + + while (lac_byte_queue_get_length (connection->unwritten) > 0) + { + GError *err = NULL; + const gchar *unwritten; + gsize len, sent; + + unwritten = lac_byte_queue_peek (connection->unwritten, &len); + + sent = lac_send (connection->fd, unwritten, len, &err); + + if (err) + { + if (g_error_matches ( + err, LAC_SOCKET_ERROR, LAC_SOCKET_ERROR_AGAIN)) + { + lac_fd_set_write_callback ( + connection->fd, lac_tcp_connection_writable); + } + else + { + queue_error (connection, err); + } + + g_error_free (err); + return; + } + + lac_byte_queue_delete_head (connection->unwritten, sent); + + /* FIXME check that we haven't used too much time? */ + } + + lac_fd_set_write_callback (connection->fd, NULL); + + if (connection->need_flush) + { + lac_set_nagle (connection->fd, FALSE, NULL); + lac_set_nagle (connection->fd, TRUE, NULL); + + connection->need_flush = FALSE; + } +} + +static void +lac_tcp_connection_readable (gpointer data) +{ + LacTcpConnection *connection = data; + + lac_tcp_connection_do_reads (connection); + emit_events (connection); +} + +static void +lac_tcp_connection_writable (gpointer data) +{ + LacTcpConnection *connection = data; + + if (connection->state == CONNECT_IN_PROGRESS) + { + connection->state = CONNECTED; + queue_connect (connection); + } + + lac_tcp_connection_do_writes (connection); + emit_events (connection); +} + +static void +lac_tcp_connection_connect (LacTcpConnection *connection) +{ + GError *err = NULL; + + lac_connect (connection->fd, + connection->address, connection->port, + &err); + + if (err) + { + if (g_error_matches ( + err, LAC_SOCKET_ERROR, LAC_SOCKET_ERROR_IN_PROGRESS)) + { + connection->state = CONNECT_IN_PROGRESS; + } + else + { + queue_error (connection, err); + } + + g_error_free (err); + } + else + { + connection->state = CONNECTED; + queue_connect (connection); + lac_tcp_connection_do_writes (connection); + } +} + +static void +lac_tcp_connection_discard_pending_events (LacTcpConnection *connection) +{ + LacConnectionEvent *event; + + while ((event = g_queue_pop_head (connection->pending_events))) + event_free (event); +} + +static void +lac_tcp_connection_add_watch (LacTcpConnection *connection) +{ + lac_fd_add_watch (connection->fd, connection); + + lac_fd_set_read_callback (connection->fd, lac_tcp_connection_readable); + lac_fd_set_write_callback (connection->fd, lac_tcp_connection_writable); +} + +LacTcpConnection * +lac_tcp_connection_new (const LacAddress *address, + gint port, + LacTcpConnectionFunc callback, + gpointer data) +{ + LacTcpConnection *connection; + GError *err = NULL; + + g_return_val_if_fail (address != NULL, NULL); + g_return_val_if_fail (port > 0, NULL); + g_return_val_if_fail (callback != NULL, NULL); + + connection = g_new (LacTcpConnection, 1); + + connection->ref_count = 1; + + connection->address = lac_address_copy (address); + connection->port = port; + connection->callback = callback; + connection->data = data; + + connection->pending_events = g_queue_new (); + + connection->state = UNDEFINED; + connection->has_fd = FALSE; + connection->unwritten = lac_byte_queue_new (); + + connection->in_emit_events = FALSE; + connection->need_flush = FALSE; + + connection->fd = lac_socket_tcp (&err); + if (err) + { + queue_error (connection, err); + emit_events_idle (connection); + g_error_free (err); + return connection; + } + + connection->has_fd = TRUE; + + lac_set_blocking (connection->fd, FALSE, &err); + if (err) + { + queue_error (connection, err); + emit_events_idle (connection); + g_error_free (err); + return connection; + } + + lac_tcp_connection_add_watch (connection); + lac_tcp_connection_connect (connection); + + emit_events_idle (connection); + + return connection; +} + +gpointer +lac_tcp_connection_get_data (LacTcpConnection *connection) +{ + g_return_val_if_fail (connection != NULL, NULL); + + return connection->data; +} + +void +lac_tcp_connection_write (LacTcpConnection *connection, + const gchar *data, + guint len) +{ + gboolean do_writes; + + g_return_if_fail (connection != NULL); + g_return_if_fail (data != NULL); + g_return_if_fail (connection->state != DISCONNECTED); + + if (len == 0) + return; + + do_writes = (lac_byte_queue_get_length (connection->unwritten) == 0); + + lac_byte_queue_append (connection->unwritten, data, len); + + if (do_writes) + { + lac_tcp_connection_do_writes (connection); + emit_events (connection); + } +} + +void +lac_tcp_connection_write_cstr (LacTcpConnection *connection, + const gchar *data) +{ + guint len; + + g_return_if_fail (connection != NULL); + g_return_if_fail (data != NULL); + + len = strlen (data); + + if (len > 0) + lac_tcp_connection_write (connection, data, len); +} + +void +lac_tcp_connection_close (LacTcpConnection *connection) +{ + g_return_if_fail (connection != NULL); + + lac_tcp_connection_discard_pending_events (connection); + + queue_close (connection, FALSE); + emit_events (connection); +} + +LacTcpConnection * +lac_tcp_connection_ref (LacTcpConnection *connection) +{ + ++connection->ref_count; + + return connection; +} + +void +lac_tcp_connection_unref (LacTcpConnection *connection) +{ + if (--connection->ref_count == 0) + { + lac_address_free (connection->address); + + lac_tcp_connection_discard_pending_events (connection); + g_queue_free (connection->pending_events); + + lac_byte_queue_free (connection->unwritten, TRUE); + + if (connection->has_fd) + { + lac_fd_remove_watch (connection->fd); + lac_close (connection->fd, NULL); + } + + g_free (connection); + } +} + +G_CONST_RETURN LacAddress * +lac_tcp_connection_get_address (LacTcpConnection *connection) +{ + return connection->address; +} + +gint +lac_tcp_connection_get_port (LacTcpConnection *connection) +{ + return connection->port; +} + +gboolean +lac_tcp_connection_is_connected (LacTcpConnection *connection) +{ + return connection->state == CONNECTED; +} + +void +lac_tcp_connection_flush (LacTcpConnection *connection) +{ + connection->need_flush = TRUE; + + if (lac_byte_queue_get_length (connection->unwritten) == 0) + lac_tcp_connection_do_writes (connection); +} diff --git a/src/lactlsconnection.c b/src/lactlsconnection.c index 3485e19..e686e4e 100644 --- a/src/lactlsconnection.c +++ b/src/lactlsconnection.c @@ -20,6 +20,7 @@ */ #include "lac.h" +#include "lacinternals.h" #include <string.h> #include <gnutls/gnutls.h> #include <errno.h> @@ -39,8 +40,6 @@ struct _LacTlsConnection gboolean need_handshake; int ref_count; - - gboolean write_shutdown; }; static void @@ -83,12 +82,20 @@ do_handshake (LacTlsConnection *tls) if (res == 0) { + LacConnectionEvent connect; #if 0 g_print (" handshake complete\n"); #endif tls->need_handshake = FALSE; - /* FIXME: emit handshake event */ + + /* We consider CONNECT to mean 'handshake complete' + * Maybe worthwhile considering a separate HANDSHAKE + * event. + */ + connect.type = LAC_CONNECTION_EVENT_CONNECT; + + tls->callback (tls, &connect); } else if (res < 0) { @@ -164,64 +171,59 @@ static void handle_read (LacTlsConnection *tls, const LacConnectionReadEvent *event) { - lac_byte_queue_append (tls->buffer, event->data, event->len); + LacByteQueue *queue; + gchar *buffer; + gssize n_read; + enum { BUF_SIZE = 8192 }; + GError *err = NULL; - do_handshake (tls); - do_writes (tls); + if (tls->need_handshake) + return; - if (!tls->need_handshake) + queue = lac_byte_queue_new (); + + do { - LacByteQueue *queue; - gchar *buffer; - gssize n_read; - enum { BUF_SIZE = 8192 }; - GError *err = NULL; + buffer = lac_byte_queue_alloc_tail (queue, BUF_SIZE); - queue = lac_byte_queue_new (); + n_read = gnutls_record_recv (tls->session, buffer, BUF_SIZE); - do + if (n_read < 0) { - buffer = lac_byte_queue_alloc_tail (queue, BUF_SIZE); - - n_read = gnutls_record_recv (tls->session, buffer, BUF_SIZE); - - if (n_read < 0) + if (n_read != GNUTLS_E_INTERRUPTED && + n_read != GNUTLS_E_AGAIN) { - if (n_read != GNUTLS_E_INTERRUPTED && - n_read != GNUTLS_E_AGAIN) - { - err = (GError *)0x01; /* FIXME - make a new error */ - } - - n_read = 0; + err = (GError *)0x01; /* FIXME - make a new error */ } - - lac_byte_queue_delete_tail (queue, BUF_SIZE - n_read); + + n_read = 0; } - while (n_read > 0); - if (!lac_byte_queue_is_empty (queue)) - { - LacConnectionEvent read_event; - gsize n_bytes; - - read_event.type = LAC_CONNECTION_EVENT_READ; - read_event.read.byte_queue = queue; - read_event.read.data = lac_byte_queue_peek (queue, &n_bytes); - read_event.read.len = n_bytes; - - tls->callback (tls, &read_event); - } + lac_byte_queue_delete_tail (queue, BUF_SIZE - n_read); + } + while (n_read > 0); + + if (!lac_byte_queue_is_empty (queue)) + { + LacConnectionEvent read_event; + gsize n_bytes; - if (err) - { - emit_error (tls, err); - - lac_connection_close (tls->tcp_connection); - } + read_event.type = LAC_CONNECTION_EVENT_READ; + read_event.read.byte_queue = queue; + read_event.read.data = lac_byte_queue_peek (queue, &n_bytes); + read_event.read.len = n_bytes; + + tls->callback (tls, &read_event); + } + + if (err) + { + emit_error (tls, err); - lac_byte_queue_free (queue, TRUE); + lac_connection_close (tls->tcp_connection); } + + lac_byte_queue_free (queue, TRUE); } static void @@ -238,12 +240,15 @@ tcp_callback (LacConnection *connection, break; case LAC_CONNECTION_EVENT_CONNECT: - tls->callback (tls, event); do_handshake (tls); do_writes (tls); break; case LAC_CONNECTION_EVENT_READ: + lac_byte_queue_append (tls->buffer, event->read.data, event->read.len); + do_handshake (tls); + do_writes (tls); + handle_read (tls, &event->read); break; } @@ -301,13 +306,12 @@ lac_tls_connection_new (const LacAddress *address, { LacTlsConnection *tls = g_new0 (LacTlsConnection, 1); - tls->tcp_connection = lac_connection_new (address, port, tcp_callback, tls); + tls->tcp_connection = lac_connection_new_tcp (address, port, tcp_callback, tls); tls->need_handshake = TRUE; tls->callback = callback; tls->data = data; tls->buffer = lac_byte_queue_new (); tls->unwritten = lac_byte_queue_new (); - tls->write_shutdown = FALSE; tls->ref_count = 1; gnutls_global_init (); @@ -376,7 +380,6 @@ lac_tls_connection_write_cstr (LacTlsConnection *connection, g_return_if_fail (connection != NULL); g_return_if_fail (data != NULL); - g_return_if_fail (!connection->write_shutdown); len = strlen (data); @@ -384,15 +387,6 @@ lac_tls_connection_write_cstr (LacTlsConnection *connection, lac_tls_connection_write (connection, data, len); } -void -lac_tls_connection_shutdown_write (LacTlsConnection *connection) -{ - /* FIXME: do we really need this? */ - connection->write_shutdown = TRUE; - - lac_connection_shutdown_write (connection->tcp_connection); -} - G_CONST_RETURN LacAddress * lac_tls_connection_get_address (LacTlsConnection *connection) { diff --git a/tests/connection-test.c b/tests/connection-test.c index e928f43..5096c93 100644 --- a/tests/connection-test.c +++ b/tests/connection-test.c @@ -92,7 +92,7 @@ callback (const LacAddress *addr, gpointer data, const GError *err) { LacConnection *connection; - connection = lac_connection_new (addr, 80, conn_callback, data); + connection = lac_connection_new_tcp (addr, 80, conn_callback, data); } } |