diff options
Diffstat (limited to 'libclapi')
-rw-r--r-- | libclapi/cl_command_queue.c | 9 | ||||
-rw-r--r-- | libclapi/cl_enqueue.c | 79 |
2 files changed, 62 insertions, 26 deletions
diff --git a/libclapi/cl_command_queue.c b/libclapi/cl_command_queue.c index ae02cd1f..d529f79c 100644 --- a/libclapi/cl_command_queue.c +++ b/libclapi/cl_command_queue.c @@ -93,6 +93,8 @@ LOCAL void cl_release_command_queue(cl_command_queue queue) assert(queue->device); queue->device->driver->release_command_queue(queue); + + cl_command_queue_worker_destroy(queue); cl_command_queue_delete(queue); } @@ -109,6 +111,13 @@ static cl_command_queue cl_create_command_queue(cl_context ctx, cl_device_id dev goto exit; } + err = cl_command_queue_worker_init(queue); + if (err != CL_SUCCESS) { + cl_command_queue_delete(queue); + queue = NULL; + goto exit; + } + err = queue->device->driver->create_command_queue(queue); if (err != CL_SUCCESS) { cl_command_queue_delete(queue); diff --git a/libclapi/cl_enqueue.c b/libclapi/cl_enqueue.c index 5509b6c3..8d4470a5 100644 --- a/libclapi/cl_enqueue.c +++ b/libclapi/cl_enqueue.c @@ -46,7 +46,6 @@ static void insert_one_work_item(cl_command_queue_worker worker, cl_command_queu if (worker->work_items == NULL) { worker->work_items = it; it->prev = it->next = it; - worker->work_items = it; return; } else { worker->work_items->prev->next = it; @@ -106,12 +105,13 @@ static cl_int check_work_item_ready(cl_command_queue_work_item item) static void *worker_thread_function(void *Arg) { - + cl_int ret; cl_command_queue_worker worker = (cl_command_queue_worker)Arg; - CL_MUTEX_LOCK(&worker->mutex); cl_uint last_cookie = worker->cookie; cl_command_queue_work_item it, start_it; + CL_MUTEX_LOCK(&worker->mutex); + while (worker->quit == CL_FALSE) { while (worker->work_items == NULL || last_cookie == worker->cookie) { CL_COND_WAIT(&worker->cond, &worker->mutex); @@ -129,14 +129,17 @@ static void *worker_thread_function(void *Arg) /* Get the first available one and execute it. */ it = worker->work_items; start_it = it; - while(it && it != start_it) { + while (it) { if ((is_ready = check_work_item_ready(it)) != 0) { // error or ready ready_one = it; delete_one_work_item(worker, it); it->queued = CL_FALSE; break; } + it = it->next; + if (it == start_it) + it = NULL; } if (ready_one == NULL) { // nothing new is ready, wait again. @@ -152,31 +155,20 @@ static void *worker_thread_function(void *Arg) if (is_ready < 0) // Error happend, just cancel. set_work_item_status(ready_one, -1); -#if 0 - if (ready_one->status == CL_QUEUE) { - if (readyOne->submit() == CL_SUCCESS) { - set_work_item_status(ready_one, CL_SUBMITTED); - } else { - set_work_item_status(ready_one, -1); - } + if (ready_one->status == CL_QUEUED) { + ret = ready_one->submit(ready_one); + set_work_item_status(ready_one, ret); } if (ready_one->status == CL_SUBMITTED) { - if (readyOne->run() == CL_SUCCESS) { - set_work_item_status(ready_one, CL_RUNNING); - } else { - set_work_item_status(ready_one, -1); - } + ret = ready_one->run(ready_one); + set_work_item_status(ready_one, ret); } if (ready_one->status == CL_RUNNING) { - if (readyOne->complete() == CL_SUCCESS) { - set_work_item_status(ready_one, CL_COMPLETE); - } else { - set_work_item_status(ready_one, -1); - } + ret = ready_one->complete(ready_one); + set_work_item_status(ready_one, ret); } -#endif cl_enqueue_destroy_work_item(worker->queue, ready_one); CL_MUTEX_LOCK(&worker->mutex); @@ -339,20 +331,55 @@ LOCAL void cl_command_queue_worker_destroy(cl_command_queue queue) CL_MUTEX_DESTROY(&worker->mutex); CL_COND_DESTROY(&worker->cond); + /* We will wait for finish before destroy the command queue. */ if (worker->work_items) { printf("There are still some enqueued works in the queue %p when this queue is destroyed," " this may cause very serious problems.\n", queue); assert(0); + } - /* Free the work items. */ + CL_FREE(worker); + queue->worker = NULL; +} +LOCAL void cl_enqueue_wait_for_flush(cl_command_queue queue) +{ + cl_command_queue_worker worker = NULL; + cl_command_queue_work_item it, start_it; + assert(queue->worker != NULL); + worker = queue->worker; + CL_MUTEX_LOCK(&worker->mutex); + + if (worker->quit) { // already destroy the queue? + CL_MUTEX_UNLOCK(&worker->mutex); + return; } + while (worker->quit == CL_FALSE) { + cl_bool need_to_wait = CL_FALSE; + it = worker->work_items; + start_it = it; + + while (it) { + if (it->status > CL_SUBMITTED) { + need_to_wait = CL_TRUE; + break; + } - CL_FREE(worker); - queue->worker = NULL; -} + it = it->next; + if (it == start_it) + it = NULL; + } + + if (need_to_wait) { + CL_COND_WAIT(&worker->cond, &worker->mutex); + } else { + break; + } + } + CL_MUTEX_UNLOCK(&worker->mutex); +} |