summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJunyan He <junyan.he@intel.com>2016-04-26 16:18:39 +0800
committerJunyan He <junyan.he@intel.com>2016-04-26 16:18:39 +0800
commitb9ab4ae79f7dcbfe8c891f8dfe04eeda10b6ca84 (patch)
treeea7ad8527d6dd5f0608ae7d2e811e17c01a0ab73
parent8870399c41866e4b28d4d40df003dfef525bf6fc (diff)
move worker to cl
-rw-r--r--include/cl_command_queue.h3
-rw-r--r--libclapi/cl_command_queue.c9
-rw-r--r--libclapi/cl_enqueue.c79
3 files changed, 65 insertions, 26 deletions
diff --git a/include/cl_command_queue.h b/include/cl_command_queue.h
index 7f673f00..30e05086 100644
--- a/include/cl_command_queue.h
+++ b/include/cl_command_queue.h
@@ -48,6 +48,9 @@ typedef struct _cl_command_queue_work_item {
cl_event* depend_events;
cl_uint depend_event_num;
cl_bool queued;
+ cl_int (*submit)(struct _cl_command_queue_work_item* it);
+ cl_int (*run)(struct _cl_command_queue_work_item* it);
+ cl_int (*complete)(struct _cl_command_queue_work_item* it);
void* pdata;
} _cl_command_queue_work_item;
typedef _cl_command_queue_work_item* cl_command_queue_work_item;
diff --git a/libclapi/cl_command_queue.c b/libclapi/cl_command_queue.c
index ae02cd1f..d529f79c 100644
--- a/libclapi/cl_command_queue.c
+++ b/libclapi/cl_command_queue.c
@@ -93,6 +93,8 @@ LOCAL void cl_release_command_queue(cl_command_queue queue)
assert(queue->device);
queue->device->driver->release_command_queue(queue);
+
+ cl_command_queue_worker_destroy(queue);
cl_command_queue_delete(queue);
}
@@ -109,6 +111,13 @@ static cl_command_queue cl_create_command_queue(cl_context ctx, cl_device_id dev
goto exit;
}
+ err = cl_command_queue_worker_init(queue);
+ if (err != CL_SUCCESS) {
+ cl_command_queue_delete(queue);
+ queue = NULL;
+ goto exit;
+ }
+
err = queue->device->driver->create_command_queue(queue);
if (err != CL_SUCCESS) {
cl_command_queue_delete(queue);
diff --git a/libclapi/cl_enqueue.c b/libclapi/cl_enqueue.c
index 5509b6c3..8d4470a5 100644
--- a/libclapi/cl_enqueue.c
+++ b/libclapi/cl_enqueue.c
@@ -46,7 +46,6 @@ static void insert_one_work_item(cl_command_queue_worker worker, cl_command_queu
if (worker->work_items == NULL) {
worker->work_items = it;
it->prev = it->next = it;
- worker->work_items = it;
return;
} else {
worker->work_items->prev->next = it;
@@ -106,12 +105,13 @@ static cl_int check_work_item_ready(cl_command_queue_work_item item)
static void *worker_thread_function(void *Arg)
{
-
+ cl_int ret;
cl_command_queue_worker worker = (cl_command_queue_worker)Arg;
- CL_MUTEX_LOCK(&worker->mutex);
cl_uint last_cookie = worker->cookie;
cl_command_queue_work_item it, start_it;
+ CL_MUTEX_LOCK(&worker->mutex);
+
while (worker->quit == CL_FALSE) {
while (worker->work_items == NULL || last_cookie == worker->cookie) {
CL_COND_WAIT(&worker->cond, &worker->mutex);
@@ -129,14 +129,17 @@ static void *worker_thread_function(void *Arg)
/* Get the first available one and execute it. */
it = worker->work_items;
start_it = it;
- while(it && it != start_it) {
+ while (it) {
if ((is_ready = check_work_item_ready(it)) != 0) { // error or ready
ready_one = it;
delete_one_work_item(worker, it);
it->queued = CL_FALSE;
break;
}
+
it = it->next;
+ if (it == start_it)
+ it = NULL;
}
if (ready_one == NULL) { // nothing new is ready, wait again.
@@ -152,31 +155,20 @@ static void *worker_thread_function(void *Arg)
if (is_ready < 0) // Error happend, just cancel.
set_work_item_status(ready_one, -1);
-#if 0
- if (ready_one->status == CL_QUEUE) {
- if (readyOne->submit() == CL_SUCCESS) {
- set_work_item_status(ready_one, CL_SUBMITTED);
- } else {
- set_work_item_status(ready_one, -1);
- }
+ if (ready_one->status == CL_QUEUED) {
+ ret = ready_one->submit(ready_one);
+ set_work_item_status(ready_one, ret);
}
if (ready_one->status == CL_SUBMITTED) {
- if (readyOne->run() == CL_SUCCESS) {
- set_work_item_status(ready_one, CL_RUNNING);
- } else {
- set_work_item_status(ready_one, -1);
- }
+ ret = ready_one->run(ready_one);
+ set_work_item_status(ready_one, ret);
}
if (ready_one->status == CL_RUNNING) {
- if (readyOne->complete() == CL_SUCCESS) {
- set_work_item_status(ready_one, CL_COMPLETE);
- } else {
- set_work_item_status(ready_one, -1);
- }
+ ret = ready_one->complete(ready_one);
+ set_work_item_status(ready_one, ret);
}
-#endif
cl_enqueue_destroy_work_item(worker->queue, ready_one);
CL_MUTEX_LOCK(&worker->mutex);
@@ -339,20 +331,55 @@ LOCAL void cl_command_queue_worker_destroy(cl_command_queue queue)
CL_MUTEX_DESTROY(&worker->mutex);
CL_COND_DESTROY(&worker->cond);
+ /* We will wait for finish before destroy the command queue. */
if (worker->work_items) {
printf("There are still some enqueued works in the queue %p when this queue is destroyed,"
" this may cause very serious problems.\n", queue);
assert(0);
+ }
- /* Free the work items. */
+ CL_FREE(worker);
+ queue->worker = NULL;
+}
+LOCAL void cl_enqueue_wait_for_flush(cl_command_queue queue)
+{
+ cl_command_queue_worker worker = NULL;
+ cl_command_queue_work_item it, start_it;
+ assert(queue->worker != NULL);
+ worker = queue->worker;
+ CL_MUTEX_LOCK(&worker->mutex);
+
+ if (worker->quit) { // already destroy the queue?
+ CL_MUTEX_UNLOCK(&worker->mutex);
+ return;
}
+ while (worker->quit == CL_FALSE) {
+ cl_bool need_to_wait = CL_FALSE;
+ it = worker->work_items;
+ start_it = it;
+
+ while (it) {
+ if (it->status > CL_SUBMITTED) {
+ need_to_wait = CL_TRUE;
+ break;
+ }
- CL_FREE(worker);
- queue->worker = NULL;
-}
+ it = it->next;
+ if (it == start_it)
+ it = NULL;
+ }
+
+ if (need_to_wait) {
+ CL_COND_WAIT(&worker->cond, &worker->mutex);
+ } else {
+ break;
+ }
+ }
+ CL_MUTEX_UNLOCK(&worker->mutex);
+}