#ifndef _MAINLOOP_H #define _MAINLOOP_H #include #include "mainloop2.h" struct FdInfo { JobQueue * queue; MainFdFunc func; gpointer data; }; struct MainLoop { EPoll *epoll; GMutex *mutex; int wake_up_pipe_write; int wake_up_pipe_read; GQueue *new_fds; GMutex *new_fds_mutex; }; struct FdCallbackInfo { EPoll *epoll; int fd; EPollEventType mask; MainFdFunc func; gpointer data; }; struct MainLoop { }; #if 0 typedef void (* MainFdFunc) (int fd, EPollEventType mask, gpointer data); #endif static void main_loop_lock (MainLoop *loop) { g_mutex_lock (loop->mutex); } static void main_loop_unlock (MainLoop *loop) { g_mutex_unlock (loop->mutex); } static void setup_wakeup_pipe (MainLoop *main_loop) { /* FIXME: check errors */ int p[2]; pipe (p); main_loop->wake_up_pipe_read = p[0]; main_loop->wake_up_pipe_write = p[1]; epoll_add_fd (main_loop->epoll, main_loop->wake_up_pipe_read, NULL); } MainLoop * main_loop_new (void) { MainLoop *main_loop = g_new0 (MainLoop, 1); int p[2]; main_loop->epoll = epoll_new (void); main_loop->mutex = g_mutex_new (); setup_wakeup_pipe (main_loop); main_loop->new_fds = g_queue_new (); main_loop->new_fds_mutex = g_mutex_new (); return main_loop; } typedef struct { int fd; MainFdFunc func; gpointer data; EPollEventType mask; } EventInfo; static void handle_fd (gpointer data) { EventInfo *info = data; info->func (info->fd, info->mask, info->data); g_free (info); } void main_loop_run (MainLoop *loop) { while (TRUE) { EPollEvent *events; int n_events; int i; GList *list; main_loop_lock (loop); events = epoll_wait (loop->epoll, &n_events, -1); main_loop_unlock (loop); for (i = 0; i < n_events; ++i) { EPollEvent *event = &(events[i]); if (event->fd == loop->wake_up_pipe_read) { read_wake_up_pipe (event->fd); } else { FdInfo *info = epoll_get_fd_data (loop->epoll); EventInfo *event_info = g_new (EventInfo, 1); event_info->fd = event->fd; event_info->func = info->func; event_info->data = info->func; event_info->mask = event->mask; job_queue_add (info->queue, handle_fd, event_info); } } main_loop_lock (loop); g_mutex_lock (loop->new_fds_mutex); for (list = loop->new_fds; list != NULL; list = list->next->next) { int fd = GPOINTER_TO_INT (list->data); FdInfo *info = list->next->data; epoll_add_fd (loop->epoll, fd, info); } g_list_free (loop->new_fds); loop->new_fds = NULL; g_mutex_unlock (loop->new_fds_mutex); } } void main_loop_add_fd (MainLoop *loop, int fd, JobQueue *queue, MainFdFunc func, gpointer data) { FdInfo *info; info = g_new0 (FdInfo, 1); info->queue = queue; info->func = func; info->data = data; g_mutex_lock (loop->new_fds_mutex); loop->new_fds = g_list_prepend (loop->new_fds, info); loop->new_fds = g_list_prepend (loop->new_fds, GINT_TO_POINTER (fd)); g_mutex_unlock (loop->new_fds_mutex); main_loop_wake_up (loop); } void main_loop_set_fd_poll_mask (MainLoop * loop, int fd, EPollEventType poll_mask) { FdInfo *info; } void main_loop_remove_fd (MainLoop *loop, int fd) { } /* Missing: */ /* Maybe these functions really belong in the JobQueue? * That would require support from the executor though. * But fundamentally these are valid (in a threaded server) * without fds, which is what the main loop provides. * The executor already has a gcond_wait() that could be * made a timed one. * * Note that a good priority queue is going to be needed in * whatever class eventually implements these. * * Also, note that cancellation support is essential for these * tasks. * * Yes, should probably happen in JobQueue + Executor: * * executor should maintain all jobs in a priority queue * sorted by execution time. * * normal jobs get an execution time of "now", where timeout * jobs get an execution of sometime in the future. * * this avoids starvation of timeouts since a timeout will * eventually move to the front of the queue (with a time * in the past possibly) * * * */ #if 0 void main_loop_add_timeout (MainLoop *loop, JobQueue *queue, MainFunc *task); void main_loop_add_idle (MainLoop *loop, JobQueue *queue, MainFunc *task); #endif #endif /* _MAINLOO_H_ */