/* pop-transaction.c - queues actions to be run sequentially * * Copyright (C) 2007 Ray Strode * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2, or (at your option) * any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA * 02111-1307, USA. */ #include "config.h" #include "pop-transaction.h" #include #include #include #include #include enum { PROP_0 = 0, PROP_STATUS, PROP_ERROR, PROP_RESULT, }; enum { PROCESS = 0, WAIT, RESUME, CANCEL, FINISH, ROLLBACK, NUMBER_OF_SIGNALS }; typedef enum _PopTransactionState { POP_TRANSACTION_STATE_UNCOMMITED = 0, POP_TRANSACTION_STATE_COMMITED, POP_TRANSACTION_STATE_PROCESSING, POP_TRANSACTION_STATE_ROLLING_BACK, POP_TRANSACTION_STATE_FINISHED, } PopTransactionState; typedef struct _PopAction PopAction; typedef struct _PopActionTree PopActionTree; struct _PopTransactionPrivate { /* The transaction state is what the transaction * is currently doing, where as the status is * just whether its still pending, succeeded, or * failed. The state is used internally, whereas * the status is for the user of the api. */ PopTransactionState state; PopTransactionStatus status; /* Actions get run sequentially, returning to the event * loop in between. Actions are set up by the user of the * transaction api, or potentially by other actions during * the run. */ PopActionTree *action_tree; /* which event loop to hook up with. This is normally NULL, * which means "use the default context". */ GMainContext *context; /* state variable that means an action is running right now; * the api function getting called is getting called from * within an action handler */ guint is_waiting : 1; /* we process actions with an idle handler. We do one action * each time the idle handler is dispatched. These ids are * used for removing the idle handler later. For instance, if * the transaction gets paused temporarily by an action, then * we would cancel the idle handler and recreate it when the * transaction gets unpaused. */ guint process_idle_id; guint rollback_idle_id; /* two common reasons to want to pause a transaction * temporarily are to 1) wait for a timeout 2) wait for * activity on a file descriptor. Since they seem like pretty * common things to want to do, we provide convenience * functions for doing them. The convenience functions use * a timeout source and an io watch source respectively. * wait_id is id of one these sources when they are attached * to the event loop context. */ guint wait_id; /* when the transaction finishes there needs to be a way to * extract the output of the transaction. Rather than forcing * the user of the api to come up some ad-hoc way of moving * the result from the last action to caller of the * transaction, we provide an api to do it. */ gpointer result; GDestroyNotify free_result_func; /* when an action fails the transaction it can optionally set * an error on why it failed the transaction */ GError *error; }; typedef enum { POP_ACTION_TREE_WALK_DIRECTION_FORWARD = 0, POP_ACTION_TREE_WALK_DIRECTION_BACKWARD, } PopActionTreeWalkDirection; struct _PopActionTree { GNode *root; GNode *current_action_node; GNode *last_action_node; guint has_new_nodes : 1; }; static PopActionTree *pop_action_tree_new (void); static void pop_action_tree_reset (PopActionTree *action_tree); static gboolean pop_action_tree_is_at_first_action (PopActionTree *action_tree); static gboolean pop_action_tree_is_empty (PopActionTree *action_tree); static gboolean pop_action_tree_seek (PopActionTree *action_tree, PopActionTreeWalkDirection direction); static void pop_action_tree_add_action (PopActionTree *action_tree, PopAction *action); static PopAction *pop_action_tree_get_current_action (PopActionTree *tree); static void pop_action_tree_foreach (PopActionTree *action_tree, GFunc func, gpointer user_data); static void pop_action_tree_free (PopActionTree *action_tree); struct _PopAction { PopActionProcessFunc process_func; PopActionProcessStatus process_status; PopActionRollbackFunc rollback_func; PopActionRollbackStatus rollback_status; GHashTable *state; gpointer user_data; GDestroyNotify free_func; }; static PopAction *pop_action_new (PopActionProcessFunc action_process_func, PopActionRollbackFunc action_rollback_func, gpointer user_data, GDestroyNotify free_func); static void pop_action_free (PopAction *action); static void pop_transaction_finalize (GObject *object); static void pop_transaction_class_install_signals (PopTransactionClass * transaction_class); static void pop_transaction_class_install_properties (PopTransactionClass * transaction_class); static void pop_transaction_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void pop_transaction_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void pop_transaction_set_status (PopTransaction *transaction, PopTransactionStatus status); typedef gboolean (*PopTransactionIdleFunc) (PopTransaction *transaction); static guint pop_transaction_call_on_idle (PopTransaction *transaction, PopTransactionIdleFunc callback); static gboolean pop_transaction_rewind (PopTransaction *transaction); static gboolean pop_transaction_seek_forward (PopTransaction *transaction); static void pop_transaction_fail (PopTransaction *transaction); static void pop_transaction_finish (PopTransaction *transaction); static gboolean pop_transaction_process_action_and_seek_forward (PopTransaction *transaction); static void pop_transaction_process_on_idle (PopTransaction *transaction); static gboolean pop_transaction_rollback_action_and_rewind (PopTransaction *transaction); static void pop_transaction_rollback_on_idle (PopTransaction *transaction); static void pop_transaction_rollback_and_finish (PopTransaction *transaction); static gboolean pop_transaction_is_at_first_action (PopTransaction *transaction); static gboolean pop_transaction_is_attached (PopTransaction *transaction); static gboolean pop_transaction_is_empty (PopTransaction *transaction); static gboolean pop_transaction_is_committed (PopTransaction *transaction); static gboolean pop_transaction_is_waiting (PopTransaction *transaction); static gboolean pop_transaction_is_finished (PopTransaction *transaction); static guint pop_transaction_signals[NUMBER_OF_SIGNALS]; G_DEFINE_TYPE (PopTransaction, pop_transaction, G_TYPE_OBJECT); static void pop_transaction_class_init (PopTransactionClass *transaction_class) { GObjectClass *object_class; object_class = G_OBJECT_CLASS (transaction_class); object_class->finalize = pop_transaction_finalize; pop_transaction_class_install_properties (transaction_class); pop_transaction_class_install_signals (transaction_class); g_type_class_add_private (transaction_class, sizeof (PopTransactionPrivate)); } static void pop_transaction_class_install_signals (PopTransactionClass * transaction_class) { GObjectClass *object_class; object_class = G_OBJECT_CLASS (transaction_class); pop_transaction_signals[PROCESS] = g_signal_new ("process", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (PopTransactionClass, process), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); transaction_class->process = NULL; pop_transaction_signals[WAIT] = g_signal_new ("wait", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (PopTransactionClass, wait), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); transaction_class->wait = NULL; pop_transaction_signals[RESUME] = g_signal_new ("resume", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (PopTransactionClass, resume), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); transaction_class->resume = NULL; pop_transaction_signals[CANCEL] = g_signal_new ("cancel", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (PopTransactionClass, cancel), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); transaction_class->cancel = NULL; pop_transaction_signals[FINISH] = g_signal_new ("finish", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (PopTransactionClass, finish), NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); transaction_class->finish = NULL; pop_transaction_signals[ROLLBACK] = g_signal_new ("rollback", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (PopTransactionClass, rollback), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); transaction_class->rollback = NULL; } static void pop_transaction_class_install_properties ( PopTransactionClass *transaction_class) { GObjectClass *object_class; GParamSpec *param_spec; object_class = G_OBJECT_CLASS (transaction_class); object_class->set_property = pop_transaction_set_property; object_class->get_property = pop_transaction_get_property; /* FIXME: do the whole enum gtype song and dance */ /** * PopTransaction:status: * * A transaction may be pending, successfully or * unsuccessfully finished. This property says which state the * transaction is currently in. */ param_spec = g_param_spec_int ("status", _ ("Status"), _ ("Current status of the transaction"), POP_TRANSACTION_STATUS_NOT_FINISHED, POP_TRANSACTION_STATUS_SUCCEEDED, POP_TRANSACTION_STATUS_NOT_FINISHED, G_PARAM_READABLE); /** * PopTransaction:result: * * The last action in a transaction may set some sort of output or result. * If it does, then this property contains that result. */ g_object_class_install_property (object_class, PROP_STATUS, param_spec); param_spec = g_param_spec_pointer ("result", _ ("Result"), _ ("Output of transaction"), G_PARAM_READABLE); g_object_class_install_property (object_class, PROP_RESULT, param_spec); /** * PopTransaction:error: * * When an action fails it may set a #GError on the transaction with a * reason why the action failed. This property is the error. */ param_spec = g_param_spec_pointer ("error", _ ("Error"), _ ("Reason for transaction error"), G_PARAM_READWRITE); g_object_class_install_property (object_class, PROP_ERROR, param_spec); } static void pop_transaction_init (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); transaction->priv = G_TYPE_INSTANCE_GET_PRIVATE (transaction, POP_TYPE_TRANSACTION, PopTransactionPrivate); transaction->priv->action_tree = pop_action_tree_new (); transaction->priv->status = POP_TRANSACTION_STATUS_NOT_FINISHED; } static void pop_transaction_finalize (GObject *object) { PopTransaction *transaction; GObjectClass *parent_class; g_assert (POP_IS_TRANSACTION (object)); transaction = POP_TRANSACTION (object); parent_class = G_OBJECT_CLASS (pop_transaction_parent_class); pop_action_tree_foreach (transaction->priv->action_tree, (GFunc) pop_action_free, NULL); pop_action_tree_free (transaction->priv->action_tree); pop_transaction_set_result (transaction, NULL, NULL); if (parent_class->finalize != NULL) { parent_class->finalize (object); } } static void pop_transaction_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { PopTransaction *transaction; g_assert (POP_IS_TRANSACTION (object)); transaction = POP_TRANSACTION (object); switch (prop_id) { case PROP_ERROR: pop_transaction_set_error (transaction, (GError *) g_value_get_pointer (value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); } } static void pop_transaction_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { PopTransaction *transaction; g_assert (POP_IS_TRANSACTION (object)); transaction = POP_TRANSACTION (object); switch (prop_id) { case PROP_STATUS: g_value_set_int (value, pop_transaction_get_status (transaction)); break; case PROP_ERROR: g_value_set_pointer (value, pop_transaction_get_error (transaction)); break; case PROP_RESULT: g_value_set_pointer (value, pop_transaction_get_result (transaction)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); } } static void pop_transaction_set_status (PopTransaction *transaction, PopTransactionStatus status) { g_return_if_fail (POP_IS_TRANSACTION (transaction)); if (transaction->priv->status != status) { transaction->priv->status = status; g_object_notify (G_OBJECT (transaction), "status"); } } static PopActionTree * pop_action_tree_new (void) { PopActionTree *action_tree; action_tree = g_slice_new0 (PopActionTree); action_tree->root = g_node_new (NULL); pop_action_tree_reset (action_tree); return action_tree; } static void pop_action_tree_reset (PopActionTree *action_tree) { action_tree->current_action_node = NULL; action_tree->has_new_nodes = FALSE; } static gboolean pop_action_tree_is_at_first_action (PopActionTree *action_tree) { return action_tree->current_action_node == NULL; } static gboolean pop_action_tree_is_empty (PopActionTree *action_tree) { return g_node_n_children (action_tree->root) == 0; } static GNode * pop_action_tree_walk_one_step (PopActionTree *action_tree, GNode *current_node, PopActionTreeWalkDirection direction) { GNode *node; if (direction == POP_ACTION_TREE_WALK_DIRECTION_FORWARD) { /* go to the next sibling at the current level. * If there aren't anymore at the current level, * go to the parent. */ node = g_node_next_sibling (current_node); if (node == NULL) { node = current_node->parent; /* if we ever hit the root, we're done */ if (node == action_tree->root) return NULL; } } else { /* go to the last child of the current node. * if current node doesn't have any children, * go to the next sibling in the direction * we're walking (backward). */ node = g_node_last_child (current_node); if (node == NULL) { node = g_node_prev_sibling (current_node); if (node == NULL) return NULL; } } return node; } static GNode * pop_action_tree_walk_to_leaf (PopActionTree *action_tree, GNode *current_node, PopActionTreeWalkDirection direction) { GNode *node; node = current_node; while (node->children != NULL) { node = g_node_first_child (node); } return node; } static gboolean pop_action_tree_seek (PopActionTree *action_tree, PopActionTreeWalkDirection direction) { GNode *node; g_assert ((direction == POP_ACTION_TREE_WALK_DIRECTION_FORWARD) || (direction == POP_ACTION_TREE_WALK_DIRECTION_BACKWARD)); /* we got to backtrack a little if the node hierarchy changed out * from under us. Note by doing this, some nodes will get * visited more than once, so the caller needs to check the * action status and skip it if it's finished. We could color * the nodes to prevent revisiting nodes, but there isn't much * point. */ if (action_tree->has_new_nodes) { GNode *last_node; last_node = action_tree->last_action_node; action_tree->last_action_node = action_tree->current_action_node; action_tree->current_action_node = last_node; action_tree->has_new_nodes = FALSE; } /* if current_action_node is NULL then we haven't started yet, * so set node to a child of the root. Otherwise, walk node * one step in the tree (to the next sibling, or the parent * if there are no more siblings) */ if (action_tree->current_action_node == NULL) { node = action_tree->root; if (node->children == NULL) return FALSE; if (direction == POP_ACTION_TREE_WALK_DIRECTION_BACKWARD) node = g_node_last_child (node); } else { node = pop_action_tree_walk_one_step (action_tree, action_tree->current_action_node, direction); if (node == NULL) return FALSE; } /* we walked one step in the direction we care about above. * now from our new location walk the left outside path to * it's leaf. * * Note, if we're already at a leaf node, then this function is * a no-op */ if (direction == POP_ACTION_TREE_WALK_DIRECTION_FORWARD) node = pop_action_tree_walk_to_leaf (action_tree, node, direction); action_tree->last_action_node = action_tree->current_action_node; action_tree->current_action_node = node; return TRUE; } static void pop_action_tree_add_action (PopActionTree *action_tree, PopAction *action) { if (action_tree->current_action_node != NULL) { g_node_insert_data (action_tree->current_action_node, -1, action); } else { g_node_insert_data (action_tree->root, -1, action); } action_tree->has_new_nodes = TRUE; } static PopAction * pop_action_tree_get_current_action (PopActionTree *tree) { g_assert (tree->current_action_node != NULL); return (PopAction *) tree->current_action_node->data; } typedef struct { PopActionTree *action_tree; GFunc callback; gpointer user_data; } PopActionTreeForeachClosure; static gboolean pop_action_tree_on_each_node (GNode *node, PopActionTreeForeachClosure *closure) { if (node == closure->action_tree->root) return FALSE; closure->callback (node->data, closure->user_data); return FALSE; } static void pop_action_tree_foreach (PopActionTree *action_tree, GFunc func, gpointer user_data) { PopActionTreeForeachClosure closure; closure.action_tree = action_tree; closure.callback = func; closure.user_data = user_data; /* FIXME: think about just using the _step function and a * loop, for consistency with the seek function, and so * we can drop the ugly ad-hoc closure. */ g_node_traverse (action_tree->root, G_PRE_ORDER, G_TRAVERSE_ALL, -1, (GNodeTraverseFunc) pop_action_tree_on_each_node, &closure); } static void pop_action_tree_free (PopActionTree *action_tree) { g_node_destroy (action_tree->root); g_slice_free (PopActionTree, action_tree); } static PopAction * pop_action_new (PopActionProcessFunc action_process_func, PopActionRollbackFunc action_rollback_func, gpointer user_data, GDestroyNotify free_func) { PopAction *action; action = g_slice_new0 (PopAction); action->process_func = action_process_func; action->process_status = POP_ACTION_PROCESS_STATUS_NOT_FINISHED; action->rollback_func = action_rollback_func; action->rollback_status = POP_ACTION_ROLLBACK_STATUS_NOT_FINISHED; action->state = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); action->user_data = user_data; action->free_func = free_func; return action; } static void pop_action_free (PopAction *action) { g_assert (action != NULL); if (action->free_func != NULL) { action->free_func (action->user_data); } g_hash_table_destroy (action->state); g_slice_free (PopAction, action); } void pop_transaction_set_action_data (PopTransaction *transaction, const char *key, gpointer data) { PopAction *action; g_return_if_fail (POP_IS_TRANSACTION (transaction)); g_return_if_fail (key != NULL); g_return_if_fail (!pop_transaction_has_action_data (transaction, key)); action = pop_action_tree_get_current_action (transaction->priv->action_tree); g_hash_table_insert (action->state, g_strdup (key), data); } void pop_transaction_unset_action_data (PopTransaction *transaction, const gchar *key, gpointer data) { PopAction *action; gboolean data_was_removed; g_return_if_fail (POP_IS_TRANSACTION (transaction)); g_return_if_fail (key != NULL); g_return_if_fail (!pop_transaction_has_action_data (transaction, key)); action = pop_action_tree_get_current_action (transaction->priv->action_tree); data_was_removed = g_hash_table_remove (action->state, key); g_assert (data_was_removed); } gpointer pop_transaction_get_action_data (PopTransaction *transaction, const gchar *key) { PopAction *action; gpointer data; g_return_val_if_fail (POP_IS_TRANSACTION (transaction), NULL); g_return_val_if_fail (key != NULL, NULL); action = pop_action_tree_get_current_action (transaction->priv->action_tree); data = NULL; g_hash_table_lookup_extended (action->state, key, NULL, &data); return data; } gboolean pop_transaction_has_action_data (PopTransaction *transaction, const gchar *key) { PopAction *action; g_return_val_if_fail (POP_IS_TRANSACTION (transaction), FALSE); g_return_val_if_fail (key != NULL, FALSE); action = pop_action_tree_get_current_action (transaction->priv->action_tree); return g_hash_table_lookup_extended (action->state, key, NULL, NULL); } static PopActionProcessStatus pop_transaction_process_subtransaction (PopTransaction *transaction, PopTransaction *subtransaction) { PopTransactionStatus status; g_assert (POP_IS_TRANSACTION (transaction)); g_assert (POP_IS_TRANSACTION (subtransaction)); if (!pop_transaction_is_committed (transaction)) { pop_transaction_wait (transaction); g_signal_connect_swapped (G_OBJECT (subtransaction), "finish", G_CALLBACK (pop_transaction_resume), transaction); pop_transaction_commit (transaction); return POP_ACTION_PROCESS_STATUS_NOT_FINISHED; } status = pop_transaction_get_status (subtransaction); g_assert (status != POP_TRANSACTION_STATUS_NOT_FINISHED); g_assert (status == POP_TRANSACTION_STATUS_CANCELED || status == POP_TRANSACTION_STATUS_FAILED || status == POP_TRANSACTION_STATUS_SUCCEEDED); if ((status == POP_TRANSACTION_STATUS_CANCELED) || (status == POP_TRANSACTION_STATUS_FAILED)) { return POP_ACTION_PROCESS_STATUS_FAILED; } return POP_ACTION_PROCESS_STATUS_SUCCEEDED; } static PopActionRollbackStatus pop_transaction_rollback_subtransaction (PopTransaction *transaction, PopTransaction *subtransaction) { PopTransactionStatus status; status = pop_transaction_get_status (subtransaction); /* FIXME: things are a little confusing here. when you * rollback a transaction it puts the status in NOT_FINISHED * and emits a "finish" signal. That's pretty counter-intuitive. * * Maybe we need to change the wording a bit, or use a separate signal * for when the rollback completes */ g_assert ((status == POP_TRANSACTION_STATUS_SUCCEEDED) || (status == POP_TRANSACTION_STATUS_NOT_FINISHED)); /* not rolled back yet */ if (status == POP_TRANSACTION_STATUS_SUCCEEDED) { pop_transaction_wait (transaction); pop_transaction_rollback (subtransaction); g_signal_connect_swapped (G_OBJECT (subtransaction), "finish", G_CALLBACK (pop_transaction_resume), transaction); return POP_ACTION_ROLLBACK_STATUS_NOT_FINISHED; } /* the second time we've been called. After the pop_transaction_resume() */ return POP_ACTION_ROLLBACK_STATUS_FINISHED; } static gboolean pop_transaction_on_fd_ready_resume (GIOChannel *channel, GIOCondition condition, gpointer data) { PopTransaction *transaction; g_assert (POP_IS_TRANSACTION (data)); transaction = POP_TRANSACTION (data); pop_transaction_resume (transaction); return FALSE; } static gboolean pop_transaction_on_timeout_resume (gpointer data) { PopTransaction *transaction; g_assert (POP_IS_TRANSACTION (data)); transaction = POP_TRANSACTION (data); pop_transaction_resume (transaction); return FALSE; } static guint pop_transaction_call_on_idle (PopTransaction *transaction, PopTransactionIdleFunc callback) { GSource *source; guint idle_id; g_assert (POP_IS_TRANSACTION (transaction)); source = g_idle_source_new (); g_object_ref (transaction); g_source_set_callback (source, (GSourceFunc) callback, transaction, (GDestroyNotify) g_object_unref); idle_id = g_source_attach (source, transaction->priv->context); g_source_unref (source); return idle_id; } static gboolean pop_transaction_rewind (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); return pop_action_tree_seek (transaction->priv->action_tree, POP_ACTION_TREE_WALK_DIRECTION_BACKWARD); } static gboolean pop_transaction_seek_forward (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); return pop_action_tree_seek (transaction->priv->action_tree, POP_ACTION_TREE_WALK_DIRECTION_FORWARD); } static void pop_transaction_fail (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); pop_transaction_set_status (transaction, POP_TRANSACTION_STATUS_FAILED); pop_transaction_rollback_and_finish (transaction); } static void pop_transaction_finish (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); transaction->priv->state = POP_TRANSACTION_STATE_FINISHED; g_signal_emit (transaction, pop_transaction_signals[FINISH], 0, transaction->priv->status); } static gboolean pop_transaction_process_action_and_seek_forward (PopTransaction *transaction) { PopAction *action; gboolean should_continue; g_assert (POP_IS_TRANSACTION (transaction)); if (transaction->priv->state == POP_TRANSACTION_STATE_COMMITED) { transaction->priv->state = POP_TRANSACTION_STATE_PROCESSING; g_signal_emit (transaction, pop_transaction_signals[PROCESS], 0); } g_assert (transaction->priv->state == POP_TRANSACTION_STATE_PROCESSING); action = pop_action_tree_get_current_action (transaction->priv->action_tree); if (action->process_func != NULL) { if (action->process_status == POP_ACTION_PROCESS_STATUS_NOT_FINISHED) action->process_status = action->process_func (transaction, action->user_data); } else { action->process_status = POP_ACTION_PROCESS_STATUS_SUCCEEDED; } switch (action->process_status) { case POP_ACTION_PROCESS_STATUS_NOT_FINISHED: should_continue = TRUE; break; case POP_ACTION_PROCESS_STATUS_SUCCEEDED: if (!pop_transaction_seek_forward (transaction)) { pop_transaction_set_status (transaction, POP_TRANSACTION_STATUS_SUCCEEDED); pop_transaction_finish (transaction); should_continue = FALSE; } else { should_continue = TRUE; } break; default: g_warning ("status %d is not valid processing status\n", (int) action->process_status); /* Fall through and fail the action because it's not behaving */ case POP_ACTION_PROCESS_STATUS_FAILED: should_continue = FALSE; pop_transaction_fail (transaction); break; } if (!should_continue) { g_source_remove (transaction->priv->process_idle_id); transaction->priv->process_idle_id = 0; } return should_continue; } static void pop_transaction_process_on_idle (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); pop_action_tree_reset (transaction->priv->action_tree); pop_transaction_seek_forward (transaction); transaction->priv->process_idle_id = pop_transaction_call_on_idle (transaction, pop_transaction_process_action_and_seek_forward); } static gboolean pop_transaction_rollback_action_and_rewind (PopTransaction *transaction) { PopAction *action; gboolean should_continue; g_assert (POP_IS_TRANSACTION (transaction)); if (transaction->priv->state == POP_TRANSACTION_STATE_PROCESSING) { transaction->priv->state = POP_TRANSACTION_STATE_ROLLING_BACK; } g_assert (transaction->priv->state == POP_TRANSACTION_STATE_ROLLING_BACK); action = pop_action_tree_get_current_action (transaction->priv->action_tree); if (action->rollback_func != NULL) { if (action->rollback_status == POP_ACTION_ROLLBACK_STATUS_NOT_FINISHED) action->rollback_status = action->rollback_func (transaction, action->user_data); } else { action->rollback_status = POP_ACTION_ROLLBACK_STATUS_FINISHED; } switch (action->rollback_status) { case POP_ACTION_ROLLBACK_STATUS_NOT_FINISHED: should_continue = TRUE; break; default: g_warning ("status %d is not valid rollback status\n", (int) action->rollback_status); /* fall through and finish the action because it's not behaving */ case POP_ACTION_ROLLBACK_STATUS_FINISHED: should_continue = pop_transaction_rewind (transaction); break; should_continue = FALSE; break; } if (!should_continue) { g_source_remove (transaction->priv->rollback_idle_id); transaction->priv->rollback_idle_id = 0; pop_transaction_finish (transaction); } return should_continue; } static void pop_transaction_rollback_on_idle (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); pop_transaction_rewind (transaction); transaction->priv->rollback_idle_id = pop_transaction_call_on_idle (transaction, pop_transaction_rollback_action_and_rewind); } static void pop_transaction_rollback_and_finish (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); if (pop_transaction_is_at_first_action (transaction)) { pop_transaction_finish (transaction); return; } pop_transaction_rollback_on_idle (transaction); } static gboolean pop_transaction_is_at_first_action (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); return pop_action_tree_is_at_first_action (transaction->priv->action_tree); } static gboolean pop_transaction_is_attached (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); return transaction->priv->context != NULL; } static gboolean pop_transaction_is_empty (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); return pop_action_tree_is_empty (transaction->priv->action_tree); } static gboolean pop_transaction_is_committed (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); return transaction->priv->state >= POP_TRANSACTION_STATE_COMMITED; } static gboolean pop_transaction_is_waiting (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); return transaction->priv->is_waiting; } static gboolean pop_transaction_is_finished (PopTransaction *transaction) { return transaction->priv->state == POP_TRANSACTION_STATE_FINISHED; } PopTransaction * pop_transaction_new (void) { PopTransaction *transaction; transaction = g_object_new (POP_TYPE_TRANSACTION, NULL); return transaction; } PopTransactionStatus pop_transaction_get_status (PopTransaction *transaction) { g_return_val_if_fail (POP_IS_TRANSACTION (transaction), POP_TRANSACTION_STATUS_FAILED); return transaction->priv->status; } void pop_transaction_add_action_full (PopTransaction *transaction, PopActionProcessFunc action_process_func, PopActionRollbackFunc action_rollback_func, gpointer user_data, GDestroyNotify free_func) { PopAction *action; g_return_if_fail (POP_IS_TRANSACTION (transaction)); action = pop_action_new (action_process_func, action_rollback_func, user_data, free_func); pop_action_tree_add_action (transaction->priv->action_tree, action); } void pop_transaction_add_action (PopTransaction *transaction, PopActionProcessFunc action_process_func, PopActionRollbackFunc action_rollback_func, gpointer user_data) { pop_transaction_add_action_full (transaction, action_process_func, action_rollback_func, user_data, NULL); } void pop_transaction_add_subtransaction (PopTransaction *transaction, PopTransaction *subtransaction) { g_return_if_fail (POP_IS_TRANSACTION (transaction)); g_return_if_fail (POP_IS_TRANSACTION (subtransaction)); g_return_if_fail (!pop_transaction_is_committed (subtransaction)); pop_transaction_add_action_full (transaction, (PopActionProcessFunc) pop_transaction_process_subtransaction, (PopActionRollbackFunc) pop_transaction_rollback_subtransaction, g_object_ref (subtransaction), (GDestroyNotify) g_object_unref); } void pop_transaction_resume (PopTransaction *transaction) { g_assert (POP_IS_TRANSACTION (transaction)); g_assert (pop_transaction_is_waiting (transaction)); transaction->priv->is_waiting = FALSE; g_signal_emit (transaction, pop_transaction_signals[RESUME], 0); if (transaction->priv->wait_id != 0) { g_source_remove (transaction->priv->wait_id); transaction->priv->wait_id = 0; } if (!pop_transaction_is_committed (transaction)) return; switch (transaction->priv->state) { case POP_TRANSACTION_STATE_PROCESSING: pop_transaction_process_on_idle (transaction); break; case POP_TRANSACTION_STATE_ROLLING_BACK: pop_transaction_rollback_on_idle (transaction); break; default: break; } } void pop_transaction_wait (PopTransaction *transaction) { g_return_if_fail (POP_IS_TRANSACTION (transaction)); switch (transaction->priv->state) { case POP_TRANSACTION_STATE_PROCESSING: g_assert (transaction->priv->process_idle_id != 0); g_assert (transaction->priv->rollback_idle_id == 0); g_source_remove (transaction->priv->process_idle_id); transaction->priv->process_idle_id = 0; break; case POP_TRANSACTION_STATE_ROLLING_BACK: g_assert (transaction->priv->rollback_idle_id != 0); g_assert (transaction->priv->process_idle_id == 0); g_source_remove (transaction->priv->rollback_idle_id); transaction->priv->rollback_idle_id = 0; break; default: break; } transaction->priv->is_waiting = TRUE; g_signal_emit (transaction, pop_transaction_signals[WAIT], 0); } void pop_transaction_wait_for_fd (PopTransaction *transaction, int fd, GIOCondition condition, GDestroyNotify destroy_notify) { GIOChannel *channel; GSource *source; g_return_if_fail (POP_IS_TRANSACTION (transaction)); pop_transaction_wait (transaction); channel = g_io_channel_unix_new (fd); g_io_channel_set_encoding (channel, NULL, NULL); g_io_channel_set_buffered (channel, FALSE); source = g_io_create_watch (channel, condition); g_source_set_callback (source, (GSourceFunc) pop_transaction_on_fd_ready_resume, transaction, NULL); transaction->priv->wait_id = g_source_attach (source, transaction->priv->context); g_source_unref (source); g_io_channel_unref (channel); } void pop_transaction_wait_a_while (PopTransaction *transaction, int milliseconds) { GSource *source; g_return_if_fail (POP_IS_TRANSACTION (transaction)); pop_transaction_wait (transaction); source = g_timeout_source_new (milliseconds); g_source_set_callback (source, (GSourceFunc) pop_transaction_on_timeout_resume, transaction, NULL); transaction->priv->wait_id = g_source_attach (source, transaction->priv->context); g_source_unref (source); } void pop_transaction_attach (PopTransaction *transaction, GMainContext *context) { g_return_if_fail (POP_IS_TRANSACTION (transaction)); g_return_if_fail (!pop_transaction_is_attached (transaction)); transaction->priv->context = g_main_context_ref (context); } void pop_transaction_commit (PopTransaction *transaction) { g_return_if_fail (POP_IS_TRANSACTION (transaction)); g_return_if_fail (!pop_transaction_is_empty (transaction)); g_return_if_fail (!pop_transaction_is_committed (transaction)); pop_transaction_process_on_idle (transaction); transaction->priv->state = POP_TRANSACTION_STATE_COMMITED; } void pop_transaction_cancel (PopTransaction *transaction) { g_return_if_fail (POP_IS_TRANSACTION (transaction)); g_return_if_fail (!pop_transaction_is_finished (transaction)); pop_transaction_set_status (transaction, POP_TRANSACTION_STATUS_CANCELED); g_signal_emit (transaction, pop_transaction_signals[CANCEL], 0); pop_transaction_rollback_and_finish (transaction); } void pop_transaction_rollback (PopTransaction *transaction) { g_return_if_fail (POP_IS_TRANSACTION (transaction)); g_return_if_fail (pop_transaction_is_finished (transaction)); g_return_if_fail (pop_transaction_get_status (transaction) == POP_TRANSACTION_STATUS_SUCCEEDED); pop_transaction_set_status (transaction, POP_TRANSACTION_STATUS_NOT_FINISHED); g_signal_emit (transaction, pop_transaction_signals[ROLLBACK], 0); pop_transaction_rollback_and_finish (transaction); } void pop_transaction_set_result (PopTransaction *transaction, gpointer result, GDestroyNotify free_func) { g_return_if_fail (POP_IS_TRANSACTION (transaction)); if (transaction->priv->free_result_func != NULL) { transaction->priv->free_result_func (transaction->priv->result); } transaction->priv->result = result; transaction->priv->free_result_func = free_func; } gpointer pop_transaction_get_result (PopTransaction *transaction) { g_return_val_if_fail (POP_IS_TRANSACTION (transaction), NULL); return transaction->priv->result; } void pop_transaction_set_error (PopTransaction *transaction, const GError *error) { g_return_if_fail (POP_IS_TRANSACTION (transaction)); if (transaction->priv->error != error) { if (transaction->priv->error != NULL) { g_error_free (transaction->priv->error); } transaction->priv->error = g_error_copy (error); g_object_notify (G_OBJECT (transaction), "error"); } } void pop_transaction_set_error_from_errno (PopTransaction *transaction) { GError *error; error = g_error_new_literal (G_FILE_ERROR, g_file_error_from_errno (errno), g_strerror (errno)); pop_transaction_set_error (transaction, error); g_error_free (error); } GError * pop_transaction_get_error (PopTransaction *transaction) { g_return_val_if_fail (POP_IS_TRANSACTION (transaction), NULL); return g_error_copy (transaction->priv->error); } #ifdef POP_TRANSACTION_ENABLE_TEST #include #include #include #include typedef struct { int fd; guint8 *buffer; gssize number_of_bytes_to_process; } InputOutputArguments; typedef ssize_t (*InputOutputFunc) (int fd, void *buf, size_t count); static PopActionProcessStatus proces_io_action (PopTransaction *transaction, InputOutputFunc io_func, InputOutputArguments *arguments, GIOCondition condition) { gboolean is_ready; size_t bytes_left, bytes_done; ssize_t bytes_done_this_time; gpointer action_data; g_assert (POP_IS_TRANSACTION (transaction)); g_return_val_if_fail (arguments->fd >= 0, POP_ACTION_PROCESS_STATUS_FAILED); g_return_val_if_fail (arguments->buffer != NULL, POP_ACTION_PROCESS_STATUS_FAILED); is_ready = GPOINTER_TO_INT (g_object_get_data (G_OBJECT (transaction), "is-ready")); if (!is_ready) { g_printerr ("waiting for fd %d to become ready.\n", arguments->fd); /* pause transaction until fd is ready for reading (or writing * depending on the io_func) */ pop_transaction_wait_for_fd (transaction, arguments->fd, condition, NULL); action_data = GINT_TO_POINTER (arguments->number_of_bytes_to_process); g_object_set_data (transaction, "bytes-left-to-process", action_data); /* when the transaction gets unpaused we want to continue on */ action_data = GINT_TO_POINTER (TRUE) pop_transaction_set_action_data (G_OBJECT (transaction), "is-ready", action_data); return POP_ACTION_PROCESS_STATUS_NOT_FINISHED; } g_printerr ("fd %d is now ready!\n", arguments->fd); bytes_done = GPOINTER_TO_INT (g_object_get_data (G_OBJECT (transaction), "bytes-already-processed")); bytes_left = GPOINTER_TO_INT (g_object_get_data (G_OBJECT (transaction), "bytes-left-to-process")); bytes_done_this_time = io_func (arguments->fd, arguments->buffer + bytes_done, bytes_left); if (bytes_done_this_time < 0) { GError *error; if (errno == EINTR) { /* EINTR we can handle automatically by just trying again */ return POP_ACTION_PROCESS_STATUS_NOT_FINISHED; } else if (errno == EAGAIN) { /* EAGAIN means * "there's no more data to work with right now" * so we pause the transaction until there is data */ pop_transaction_wait_for_fd (transaction, arguments->fd, condition, NULL); return POP_ACTION_PROCESS_STATUS_NOT_FINISHED; } else { error = g_error_new_literal (G_FILE_ERROR, g_file_error_from_errno (errno), g_strerror (errno)); pop_transaction_set_error (transaction, error); g_error_free (error); return POP_ACTION_PROCESS_STATUS_FAILED; } } g_assert (bytes_done_this_time <= bytes_left); /* end of file case */ if (bytes_done_this_time == 0) { bytes_left = 0; } else { bytes_done += bytes_done_this_time; bytes_left -= bytes_done_this_time; } g_object_set_data (G_OBJECT (transaction), "bytes-already-processed", GINT_TO_POINTER (bytes_done)); g_object_set_data (G_OBJECT (transaction), "bytes-left-to-process", GINT_TO_POINTER (bytes_left)); if (bytes_left == 0) { pop_transaction_set_result (transaction, GINT_TO_POINTER (bytes_done), NULL); g_object_set_data (G_OBJECT (transaction), "bytes-already-processed", GINT_TO_POINTER (0)); pop_transaction_set_action_data (G_OBJECT (transaction), "is-ready", GINT_TO_POINTER (FALSE)); return POP_ACTION_PROCESS_STATUS_SUCCEEDED; } return POP_ACTION_PROCESS_STATUS_NOT_FINISHED; } static PopActionProcessStatus process_read_action (PopTransaction *transaction, gpointer arguments) { g_printerr ("processing read action\n"); return proces_io_action (transaction, (InputOutputFunc) read, arguments, G_IO_IN); } static PopActionProcessStatus process_write_action (PopTransaction *transaction, gpointer arguments) { g_printerr ("processing write action\n"); return proces_io_action (transaction, (InputOutputFunc) write, arguments, G_IO_OUT); } static PopActionProcessStatus process_set_fd_nonblocking_action (PopTransaction *transaction, gpointer data) { int fd; int flags, status; fd = GPOINTER_TO_INT (data); flags = fcntl (fd, F_GETFL); if (flags < 0) { pop_transaction_set_error_from_errno (transaction); return POP_ACTION_PROCESS_STATUS_FAILED; } status = fcntl (fd, F_SETFL, flags | O_NONBLOCK); if (status < 0) { pop_transaction_set_error_from_errno (transaction); return POP_ACTION_PROCESS_STATUS_FAILED; } pop_transaction_set_action_data (G_OBJECT (transaction), "file-flags", GINT_TO_POINTER (flags)); return POP_ACTION_PROCESS_STATUS_SUCCEEDED; } static PopActionRollbackStatus rollback_set_fd_nonblocking_action (PopTransaction *transaction, gpointer data) { int fd, flags; gpointer action_data; fd = GPOINTER_TO_INT (data); action_data = pop_transaction_get_action_data (transaction, "file-flags"); flags = GPOINTER_TO_INT (action_data); fcntl (fd, F_SETFL, flags); return POP_ACTION_ROLLBACK_STATUS_FINISHED; } static PopTransaction * input_transaction_new (guint8 *buffer, gsize buffer_size) { PopTransaction *transaction; InputOutputArguments *input_arguments; transaction = pop_transaction_new (); pop_transaction_add_action (transaction, process_set_fd_nonblocking_action, rollback_set_fd_nonblocking_action, GINT_TO_POINTER (STDIN_FILENO)); input_arguments = g_slice_new (InputOutputArguments); input_arguments->fd = STDIN_FILENO; input_arguments->buffer = buffer; input_arguments->number_of_bytes_to_process = buffer_size; pop_transaction_add_action (transaction, process_read_action, NULL, input_arguments); return transaction; } static PopTransaction * output_transaction_new (guint8 *buffer, gsize buffer_size) { PopTransaction *transaction; InputOutputArguments *output_arguments; transaction = pop_transaction_new (); pop_transaction_add_action (transaction, process_set_fd_nonblocking_action, rollback_set_fd_nonblocking_action, GINT_TO_POINTER (STDOUT_FILENO)); output_arguments = g_slice_new (InputOutputArguments); output_arguments->fd = STDOUT_FILENO; output_arguments->buffer = buffer; output_arguments->number_of_bytes_to_process = buffer_size; pop_transaction_add_action (transaction, process_write_action, NULL, output_arguments); return transaction; } static void on_input_transaction_wait (PopTransaction *input_transaction, PopTransaction *output_transaction) { gint bytes_read, bytes_written; bytes_read = GPOINTER_TO_INT (g_object_get_data (G_OBJECT (input_transaction), "bytes-already-processed")); bytes_written = GPOINTER_TO_INT (g_object_get_data (G_OBJECT (output_transaction), "bytes-already-processed")); if (bytes_read > bytes_written) pop_transaction_resume (output_transaction); } static void on_input_transaction_finish (PopTransaction *input_transaction, PopTransactionStatus status, PopTransaction *output_transaction) { pop_transaction_resume (output_transaction); } static PopActionProcessStatus process_cat_action (PopTransaction *transaction, gpointer data) { PopTransaction *input_transaction, *output_transaction; char *buffer; static const int buffer_size = 4096; buffer = g_malloc0 (buffer_size); input_transaction = input_transaction_new (buffer, buffer_size); pop_transaction_commit (input_transaction); output_transaction = output_transaction_new (buffer, buffer_size); g_signal_connect (G_OBJECT (input_transaction), "wait", G_CALLBACK (on_input_transaction_wait), output_transaction); g_signal_connect (G_OBJECT (input_transaction), "finish", G_CALLBACK (on_input_transaction_finish), output_transaction); pop_transaction_wait (output_transaction); pop_transaction_commit (output_transaction); return POP_ACTION_PROCESS_STATUS_NOT_FINISHED; } static PopActionRollbackStatus rollback_cat_action (PopTransaction *transaction, gpointer data) { InputOutputArguments *input_arguments, *output_arguments; input_arguments = g_object_get_data (G_OBJECT (transaction), "cat-input-args"); g_slice_free (InputOutputArguments, input_arguments); output_arguments = g_object_get_data (G_OBJECT (transaction), "cat-output-args"); g_slice_free (InputOutputArguments, output_arguments); g_free (output_arguments->buffer); return POP_ACTION_ROLLBACK_STATUS_FINISHED; } static void on_transaction_finish (PopTransaction *transaction, PopTransactionStatus status, gpointer data) { GMainLoop *loop; loop = (GMainLoop *) data; g_print ("transaction completed with status %d\n", status); g_main_loop_quit (loop); } int main (int argc, char **argv) { GMainLoop *loop; PopTransaction *transaction; int exit_code; g_log_set_always_fatal (G_LOG_LEVEL_ERROR | G_LOG_LEVEL_CRITICAL | G_LOG_LEVEL_WARNING); g_type_init (); loop = g_main_loop_new (NULL, FALSE); transaction = pop_transaction_new (); g_signal_connect (G_OBJECT (transaction), "finish", G_CALLBACK (on_transaction_finish), loop); pop_transaction_add_action (transaction, process_cat_action, rollback_cat_action, NULL); pop_transaction_commit (transaction); g_main_loop_run (loop); g_main_loop_unref (loop); g_object_unref (transaction); exit_code = 0; return exit_code; } #endif /* POP_TRANSACTION_ENABLE_TEST */