diff options
author | Junyan He <junyan.he@intel.com> | 2016-04-25 17:57:29 +0800 |
---|---|---|
committer | Junyan He <junyan.he@intel.com> | 2016-04-25 17:57:29 +0800 |
commit | 1ca82c1cd31c29b96effe0565332cfb4b7eb08e2 (patch) | |
tree | 82c4cd1ced9aa99a2c3e6ff4174cf75d80c50506 | |
parent | f0daeb7a225316cae4c63125db2a1421c72841a0 (diff) |
move worker to cl
-rw-r--r-- | include/cl_command_queue.h | 1 | ||||
-rw-r--r-- | libclapi/cl_enqueue.c | 201 | ||||
-rw-r--r-- | libclapi/cl_event.c | 13 | ||||
-rw-r--r-- | libclapi/cl_internals.h | 2 |
4 files changed, 208 insertions, 9 deletions
diff --git a/include/cl_command_queue.h b/include/cl_command_queue.h index 1cc29904..ee6c5c3b 100644 --- a/include/cl_command_queue.h +++ b/include/cl_command_queue.h @@ -42,6 +42,7 @@ typedef struct _cl_command_queue { /* Represent one enqueued work item, eg, EnqueueNDRange. */ typedef struct _cl_command_queue_work_item { struct _cl_command_queue_work_item *prev, *next; + cl_int status; // Same as event->status, but sometimes the event maybe NULL cl_event event; // The event represent this work. cl_event* depend_events; cl_uint depend_event_num; diff --git a/libclapi/cl_enqueue.c b/libclapi/cl_enqueue.c index 018b263f..d48299a5 100644 --- a/libclapi/cl_enqueue.c +++ b/libclapi/cl_enqueue.c @@ -24,43 +24,228 @@ #include "cl_alloc.h" #include "cl_internals.h" #include "cl_command_queue.h" +#include "cl_event.h" typedef struct _cl_command_queue_worker { + cl_command_queue queue; pthread_t tid; pthread_cond_t cond; pthread_mutex_t mutex; cl_uint cookie; cl_bool quit; cl_bool in_exec; - cl_command_queue_work_item* work_items; - cl_int work_item_num; + cl_command_queue_work_item work_items; } _cl_command_queue_worker; typedef _cl_command_queue_worker* cl_command_queue_worker; +/* Call this must with worker->mutex locked. */ +static void insert_one_work_item(cl_command_queue_worker worker, cl_command_queue_work_item it) +{ + it->next = worker->work_items; + if (worker->work_items != NULL) + worker->work_items->prev = it; + worker->work_items = it; +} +/* Call this must with worker->mutex locked. */ +static void delete_one_work_item(cl_command_queue_worker worker, cl_command_queue_work_item it) +{ + if (it->prev) + it->prev->next = it->next; + if (it->next) + it->next->prev = it->prev; + if (worker->work_items == it) + worker->work_items = it->next; +} + +static void set_work_item_status(cl_command_queue_work_item it, cl_int status) +{ + it->status = status; + if (it->event) + cl_event_set_status(it->event, status); +} + + +/* Return 0 means still not ready, 1 means ready, -1 means err and need to cancel. */ +static cl_int check_work_item_ready(cl_command_queue_work_item item) +{ + int i; + cl_event e; + cl_int ret_val; + cl_int status; + + if (item->depend_event_num == 0) + return 1; + + assert(item->depend_events); + ret_val = 1; + for (i = 0; i < item->depend_event_num; i++) { + e = item->depend_events[i]; + status = cl_event_get_status(e); + + if (status > CL_COMPLETE) { + ret_val = 0; + } + + if (status < 0) { + return -1; + } + } + + return ret_val; +} + +static void *worker_thread_function(void *Arg) +{ + +#if 0 + cl_command_queue_worker worker = (cl_command_queue_worker)Arg; + CL_MUTEX_LOCK(&worker->mutex); + cl_uint last_cookie = worker->cookie; + cl_command_queue_work_item it; + + while (worker->quit == false) { + while (worker->work_items == NULL || last_cookie == worker->cookie) { + CL_COND_WAIT(&worker->cond, &worker->mutex); + if (worker->quit) { + CL_MUTEX_UNLOCK(&worker->mutex); + return NULL; + } + } + + last_cookie = worker->cookie; + if (worker->work_items != NULL) { // Do the real jobs. + cl_command_queue_work_item ready_one = NULL; + cl_int is_ready = 0; + + /* Get the first available one and execute it. */ + it = worker->work_items; + while(it) { + if ((is_ready = check_work_item_ready(it)) != 0) { // error or ready + ready_one = it; + delete_one_work_item(worker, it); + break; + } + it = it->next; + } + + if (readyOne == NULL) { // nothing new is ready, wait again. + assert(is_ready == 0); + continue; + } + + /* We execute it without lock. */ + worker->in_exec = CL_TRUE; + pthread_mutex_unlock(&worker->mutex); + + assert(is_ready != 0); + if (is_ready < 0) // Error happend, just cancel. + set_work_item_status(ready_one, -1); + + if (ready_one->status == CL_QUEUE) { + if (readyOne->submit() == CL_SUCCESS) { + set_work_item_status(ready_one, CL_SUBMITTED); + } else { + set_work_item_status(ready_one, -1); + } + } + + if (ready_one->status == CL_SUBMITTED) { + if (readyOne->run() == CL_SUCCESS) { + set_work_item_status(ready_one, CL_RUNNING); + } else { + set_work_item_status(ready_one, -1); + } + } + + if (ready_one->status == CL_RUNNING) { + if (readyOne->complete() == CL_SUCCESS) { + set_work_item_status(ready_one, CL_COMPLETE); + } else { + set_work_item_status(ready_one, -1); + } + } + + fffffffffffffff(readyOne); + CL_MUTEX_LOCK(&worker->mutex); + worker->in_exec = CL_FALSE; + worker->cookie++; // something has changed. + /* Some event should have changed, notify the thread block on event to check status. */ + CL_COND_BROADCAST(&worker->cond); + } + } + + CL_MUTEX_UNLOCK(&worker->mutex); +#endif + + return NULL; +} + LOCAL cl_int cl_command_queue_worker_init(cl_command_queue queue) { cl_command_queue_worker worker = NULL; assert(queue->worker == NULL); - + worker = CL_CALLOC(1, sizeof(_cl_command_queue_worker)); if (worker == NULL) return CL_OUT_OF_HOST_MEMORY; - + worker->cookie = 1; // start from 1 worker->quit = CL_FALSE; worker->in_exec = CL_FALSE; worker->work_items = NULL; - worker->work_item_num = 0; + + CL_MUTEX_INIT(&worker->mutex); + CL_COND_INIT(&worker->cond); + + if (pthread_create(&worker->tid, NULL, worker_thread_function, worker)) { + printf("Can not create worker thread for queue %p...\n", queue); + CL_MUTEX_DESTROY(&worker->mutex); + CL_COND_DESTROY(&worker->cond); + CL_FREE(worker); + return CL_OUT_OF_RESOURCES; + } + + queue->worker = worker; + worker->queue = queue; + return CL_SUCCESS; } LOCAL void cl_command_queue_worker_destroy(cl_command_queue queue) { + cl_command_queue_worker worker = NULL; + assert(queue->worker != NULL); + worker = queue->worker; + CL_MUTEX_LOCK(&worker->mutex); + if (worker->quit) { // already destroy the queue? + CL_MUTEX_UNLOCK(&worker->mutex); + assert(0); + return; + } -} + worker->quit = 1; + CL_COND_BROADCAST(&worker->cond); + CL_MUTEX_UNLOCK(&worker->mutex); -static void *worker_thread_function(void *Arg) -{ + pthread_join(worker->tid, NULL); + + CL_MUTEX_DESTROY(&worker->mutex); + CL_COND_DESTROY(&worker->cond); + + if (worker->work_items) { + printf("There are still some enqueued works in the queue %p when this queue is destroyed," + " this may cause very serious problems.\n", queue); + assert(0); + /* Free the work items. */ + + + + } + + + CL_FREE(worker); + queue->worker = NULL; } + diff --git a/libclapi/cl_event.c b/libclapi/cl_event.c index 171a966d..2af16f3e 100644 --- a/libclapi/cl_event.c +++ b/libclapi/cl_event.c @@ -28,7 +28,18 @@ #include "cl_context.h" #include "cl_internals.h" -cl_int cl_event_set_status(cl_event event, cl_int status) +LOCAL cl_int cl_event_get_status(cl_event event) +{ + cl_int ret; + + assert(event); + CL_MUTEX_LOCK(&event->lock); + ret = event->status; + CL_MUTEX_UNLOCK(&event->lock); + return ret; +} + +LOCAL cl_int cl_event_set_status(cl_event event, cl_int status) { cl_event_user_cb user_cb = NULL; cl_command_queue* queue_array = NULL; diff --git a/libclapi/cl_internals.h b/libclapi/cl_internals.h index 1c6b03c0..7290f33d 100644 --- a/libclapi/cl_internals.h +++ b/libclapi/cl_internals.h @@ -485,5 +485,7 @@ extern cl_int cl_driver_check(cl_driver drv); extern cl_int cl_retain_command_queue(cl_command_queue queue); extern void cl_release_command_queue(cl_command_queue queue); extern cl_int cl_retain_event(cl_event e); +extern cl_int cl_event_get_status(cl_event event); +extern cl_int cl_event_set_status(cl_event event, cl_int status); #endif /* __CL_INTERNALS_H__ */ |