#include #include "engine.h" #include "epoll.h" typedef struct FdInfo FdInfo; typedef struct FdAction FdAction; struct FdInfo { }; struct Context { Engine *engine; gpointer data; GMutex *mutex; }; typedef void (* EngineFdFunc) (const EPollEvent *event, gpointer data); struct FdAction { int fd; EPollEventType mask; EngineFdFunc func; gpointer data; }; struct Engine { gboolean quitting; GMutex * mutex; GCond * new_job_cond; GCond * idle_cond; GCond * exit_cond; GQueue * jobs; int n_threads; int n_idle_threads; int n_exited_threads; GQueue * fd_actions; /* polling */ gboolean polling; EPoll * epoll; int read_fd; int write_fd; }; static gpointer engine_thread (gpointer); /* * Engine */ static void engine_lock (Engine *engine) { g_mutex_lock (engine->mutex); } static void engine_unlock (Engine *engine) { g_mutex_unlock (engine->mutex); } static void make_pipe (int *rd, int *wr) { int fds[2]; if (pipe (fds) < 0) g_error ("Could not create wake-up pipe\n"); *rd = fds[0]; *wr = fds[1]; } static void poll_job (gpointer data) { Engine *engine = data; int i, n_events; EPollEvent *events; GList *list; engine_lock (engine); for (list = engine->fd_actions->head; list != NULL; list = list->next) { FdAction *action = list->data; EPollEventType mask; } engine->polling = TRUE; engine_unlock (engine); events = epoll_wait (engine->epoll, &n_events, -1); for (i = 0; i < n_events; ++i) { #if 0 FdInfo *info; #endif EPollEvent *event = &(events[i]); #if 0 info = &(engine->fd_infos[event->fd]); #endif } g_free (events); engine_append_job (engine, poll_job, engine); } Engine * engine_new (guint n_threads) { Engine *engine; int i; g_return_val_if_fail (n_threads > 0, NULL); if (!g_thread_supported()) g_thread_init (NULL); engine = g_new0 (Engine, 1); engine->mutex = g_mutex_new (); engine->jobs = g_queue_new (); engine->new_job_cond = g_cond_new (); engine->idle_cond = g_cond_new (); engine->exit_cond = g_cond_new (); engine->n_idle_threads = 0; engine->n_exited_threads = 0; engine->quitting = FALSE; engine->n_threads = n_threads; engine->epoll = epoll_new (); engine->polling = FALSE; make_pipe (&engine->read_fd, &engine->write_fd); epoll_add_fd (engine->epoll, engine->read_fd, EPOLL_READ | EPOLL_ERROR, engine); engine->fd_actions = g_queue_new (); engine_lock (engine); for (i = 0; i < n_threads; ++i) g_thread_create (engine_thread, engine, TRUE, NULL); engine_unlock (engine); engine_append_job (engine, poll_job, engine); return engine; } static guint engine_n_jobs (Engine *engine) { g_assert (g_queue_get_length (engine->jobs) % 2 == 0); return (g_queue_get_length (engine->jobs)) / 2; } /* Wait until all previously submitted jobs have finished */ void engine_sync (Engine *engine) { engine_lock (engine); g_cond_broadcast (engine->new_job_cond); while (!(engine->n_idle_threads == engine->n_threads && engine_n_jobs (engine) == 0)) { g_cond_wait (engine->idle_cond, engine->mutex); } engine_unlock (engine); } void engine_free (Engine *engine) { engine_lock (engine); engine->quitting = TRUE; g_cond_broadcast (engine->new_job_cond); while (engine->n_exited_threads != engine->n_threads) g_cond_wait (engine->exit_cond, engine->mutex); engine_unlock (engine); while (!g_queue_is_empty (engine->jobs)) g_queue_pop_head (engine->jobs); g_mutex_free (engine->mutex); g_cond_free (engine->new_job_cond); g_cond_free (engine->idle_cond); g_cond_free (engine->exit_cond); g_queue_free (engine->jobs); } void engine_append_job (Engine *engine, EngineFunc job, gpointer data) { engine_lock (engine); g_queue_push_tail (engine->jobs, job); g_queue_push_tail (engine->jobs, data); g_cond_signal (engine->new_job_cond); engine_unlock (engine); } void engine_prepend_job (Engine *engine, EngineFunc job, gpointer data) { engine_lock (engine); g_queue_push_head (engine->jobs, data); g_queue_push_head (engine->jobs, job); g_cond_signal (engine->new_job_cond); engine_unlock (engine); } static gpointer engine_thread (gpointer data) { Engine *engine = data; engine_lock (engine); while (!engine->quitting) { if (!engine_n_jobs (engine)) { engine->n_idle_threads++; g_cond_signal (engine->idle_cond); g_cond_wait (engine->new_job_cond, engine->mutex); engine->n_idle_threads--; } if (engine->quitting) break; if (engine_n_jobs (engine)) { EngineFunc job; gpointer data; job = g_queue_pop_head (engine->jobs); data = g_queue_pop_head (engine->jobs); engine_unlock (engine); job (data); engine_lock (engine); } } engine->n_exited_threads++; g_cond_signal (engine->exit_cond); engine_unlock (engine); return NULL; } static void ensure_wake_up (Engine *engine) { if (engine->polling) { char c = '!'; write (engine->write_fd, &c, 1); engine->polling = FALSE; } } typedef struct { int fd; EngineFdFunc func; gpointer data; } EngineAddFd; typedef struct { int fd; EPollEventType mask; } EngineChangeFd; typedef struct { int fd; } EngineRemoveFd; static void engine_queue_add_fd (Engine *engine, int fd, EngineFdFunc func, gpointer data) { EngineAddFd *add_fd = g_new (EngineAddFd, 1); engine_lock (engine); add_fd->fd = fd; add_fd->func = func; add_fd->data = data; g_queue_push_tail (engine->fd_actions, add_fd); ensure_wake_up (engine); engine_unlock (engine); } static void engine_queue_change_fd (Engine *engine, int fd, EPollEventType mask) { EngineChangeFd *change_fd = g_new (EngineChangeFd, 1); engine_lock (engine); change_fd->fd = fd; change_fd->mask = mask; ensure_wake_up (engine); engine_unlock (engine); } static void engine_queue_remove_fd (Engine *engine, int fd) { EngineRemoveFd *remove_fd = g_new (EngineRemoveFd, 1); engine_lock (engine); remove_fd->fd = fd; ensure_wake_up (engine); engine_unlock (engine); } /* * Context */ static void context_lock (Context *context) { g_mutex_lock (context->mutex); } static void context_unlock (Context *context) { g_mutex_unlock (context->mutex); } Context * context_new (Engine *engine, gpointer data) { Context *context = g_new0 (Context, 1); context->engine = engine; context->data = data; context->mutex = g_mutex_new (); return context; } void context_free (Context *context) { context_lock (context); context_unlock (context); g_mutex_free (context->mutex); g_free (context); } Engine * context_get_engine (Context *context) { Engine *result; context_lock (context); result = context->engine; context_unlock (context); return result; } gpointer context_get_data (Context *context) { gpointer result; context_lock (context); result = context->data; context_unlock (context); return result; } static void on_fd_event (const EPollEvent *event, gpointer data) { Context *context = data; context_lock (context); /* find fd in context */ /* add relevant callbacks to queue */ context_unlock (context); } void context_add_fd (Context *context, int fd) { context_lock (context); engine_queue_add_fd (context->engine, fd, on_fd_event, context); context_unlock (context); } void context_remove_fd (Context *context, int fd) { context_lock (context); engine_queue_remove_fd (context->engine, fd); context_unlock (context); } void context_set_write (Context *context, int fd, ContextFdFunc callback) { EPollEventType mask; context_lock (context); mask = 0; /* FIXME */ engine_queue_change_fd (context->engine, fd, mask); context_unlock (context); } void context_set_read (Context *context, int fd, ContextFdFunc callback) { EPollEventType mask; context_lock (context); mask = 0; /* FIXME */ engine_queue_change_fd (context->engine, fd, mask); context_unlock (context); }