summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJunyan He <junyan.he@intel.com>2016-04-25 17:57:29 +0800
committerJunyan He <junyan.he@intel.com>2016-04-25 17:57:29 +0800
commit1ca82c1cd31c29b96effe0565332cfb4b7eb08e2 (patch)
tree82c4cd1ced9aa99a2c3e6ff4174cf75d80c50506
parentf0daeb7a225316cae4c63125db2a1421c72841a0 (diff)
move worker to cl
-rw-r--r--include/cl_command_queue.h1
-rw-r--r--libclapi/cl_enqueue.c201
-rw-r--r--libclapi/cl_event.c13
-rw-r--r--libclapi/cl_internals.h2
4 files changed, 208 insertions, 9 deletions
diff --git a/include/cl_command_queue.h b/include/cl_command_queue.h
index 1cc29904..ee6c5c3b 100644
--- a/include/cl_command_queue.h
+++ b/include/cl_command_queue.h
@@ -42,6 +42,7 @@ typedef struct _cl_command_queue {
/* Represent one enqueued work item, eg, EnqueueNDRange. */
typedef struct _cl_command_queue_work_item {
struct _cl_command_queue_work_item *prev, *next;
+ cl_int status; // Same as event->status, but sometimes the event maybe NULL
cl_event event; // The event represent this work.
cl_event* depend_events;
cl_uint depend_event_num;
diff --git a/libclapi/cl_enqueue.c b/libclapi/cl_enqueue.c
index 018b263f..d48299a5 100644
--- a/libclapi/cl_enqueue.c
+++ b/libclapi/cl_enqueue.c
@@ -24,43 +24,228 @@
#include "cl_alloc.h"
#include "cl_internals.h"
#include "cl_command_queue.h"
+#include "cl_event.h"
typedef struct _cl_command_queue_worker {
+ cl_command_queue queue;
pthread_t tid;
pthread_cond_t cond;
pthread_mutex_t mutex;
cl_uint cookie;
cl_bool quit;
cl_bool in_exec;
- cl_command_queue_work_item* work_items;
- cl_int work_item_num;
+ cl_command_queue_work_item work_items;
} _cl_command_queue_worker;
typedef _cl_command_queue_worker* cl_command_queue_worker;
+/* Call this must with worker->mutex locked. */
+static void insert_one_work_item(cl_command_queue_worker worker, cl_command_queue_work_item it)
+{
+ it->next = worker->work_items;
+ if (worker->work_items != NULL)
+ worker->work_items->prev = it;
+ worker->work_items = it;
+}
+/* Call this must with worker->mutex locked. */
+static void delete_one_work_item(cl_command_queue_worker worker, cl_command_queue_work_item it)
+{
+ if (it->prev)
+ it->prev->next = it->next;
+ if (it->next)
+ it->next->prev = it->prev;
+ if (worker->work_items == it)
+ worker->work_items = it->next;
+}
+
+static void set_work_item_status(cl_command_queue_work_item it, cl_int status)
+{
+ it->status = status;
+ if (it->event)
+ cl_event_set_status(it->event, status);
+}
+
+
+/* Return 0 means still not ready, 1 means ready, -1 means err and need to cancel. */
+static cl_int check_work_item_ready(cl_command_queue_work_item item)
+{
+ int i;
+ cl_event e;
+ cl_int ret_val;
+ cl_int status;
+
+ if (item->depend_event_num == 0)
+ return 1;
+
+ assert(item->depend_events);
+ ret_val = 1;
+ for (i = 0; i < item->depend_event_num; i++) {
+ e = item->depend_events[i];
+ status = cl_event_get_status(e);
+
+ if (status > CL_COMPLETE) {
+ ret_val = 0;
+ }
+
+ if (status < 0) {
+ return -1;
+ }
+ }
+
+ return ret_val;
+}
+
+static void *worker_thread_function(void *Arg)
+{
+
+#if 0
+ cl_command_queue_worker worker = (cl_command_queue_worker)Arg;
+ CL_MUTEX_LOCK(&worker->mutex);
+ cl_uint last_cookie = worker->cookie;
+ cl_command_queue_work_item it;
+
+ while (worker->quit == false) {
+ while (worker->work_items == NULL || last_cookie == worker->cookie) {
+ CL_COND_WAIT(&worker->cond, &worker->mutex);
+ if (worker->quit) {
+ CL_MUTEX_UNLOCK(&worker->mutex);
+ return NULL;
+ }
+ }
+
+ last_cookie = worker->cookie;
+ if (worker->work_items != NULL) { // Do the real jobs.
+ cl_command_queue_work_item ready_one = NULL;
+ cl_int is_ready = 0;
+
+ /* Get the first available one and execute it. */
+ it = worker->work_items;
+ while(it) {
+ if ((is_ready = check_work_item_ready(it)) != 0) { // error or ready
+ ready_one = it;
+ delete_one_work_item(worker, it);
+ break;
+ }
+ it = it->next;
+ }
+
+ if (readyOne == NULL) { // nothing new is ready, wait again.
+ assert(is_ready == 0);
+ continue;
+ }
+
+ /* We execute it without lock. */
+ worker->in_exec = CL_TRUE;
+ pthread_mutex_unlock(&worker->mutex);
+
+ assert(is_ready != 0);
+ if (is_ready < 0) // Error happend, just cancel.
+ set_work_item_status(ready_one, -1);
+
+ if (ready_one->status == CL_QUEUE) {
+ if (readyOne->submit() == CL_SUCCESS) {
+ set_work_item_status(ready_one, CL_SUBMITTED);
+ } else {
+ set_work_item_status(ready_one, -1);
+ }
+ }
+
+ if (ready_one->status == CL_SUBMITTED) {
+ if (readyOne->run() == CL_SUCCESS) {
+ set_work_item_status(ready_one, CL_RUNNING);
+ } else {
+ set_work_item_status(ready_one, -1);
+ }
+ }
+
+ if (ready_one->status == CL_RUNNING) {
+ if (readyOne->complete() == CL_SUCCESS) {
+ set_work_item_status(ready_one, CL_COMPLETE);
+ } else {
+ set_work_item_status(ready_one, -1);
+ }
+ }
+
+ fffffffffffffff(readyOne);
+ CL_MUTEX_LOCK(&worker->mutex);
+ worker->in_exec = CL_FALSE;
+ worker->cookie++; // something has changed.
+ /* Some event should have changed, notify the thread block on event to check status. */
+ CL_COND_BROADCAST(&worker->cond);
+ }
+ }
+
+ CL_MUTEX_UNLOCK(&worker->mutex);
+#endif
+
+ return NULL;
+}
+
LOCAL cl_int cl_command_queue_worker_init(cl_command_queue queue)
{
cl_command_queue_worker worker = NULL;
assert(queue->worker == NULL);
-
+
worker = CL_CALLOC(1, sizeof(_cl_command_queue_worker));
if (worker == NULL)
return CL_OUT_OF_HOST_MEMORY;
-
+
worker->cookie = 1; // start from 1
worker->quit = CL_FALSE;
worker->in_exec = CL_FALSE;
worker->work_items = NULL;
- worker->work_item_num = 0;
+
+ CL_MUTEX_INIT(&worker->mutex);
+ CL_COND_INIT(&worker->cond);
+
+ if (pthread_create(&worker->tid, NULL, worker_thread_function, worker)) {
+ printf("Can not create worker thread for queue %p...\n", queue);
+ CL_MUTEX_DESTROY(&worker->mutex);
+ CL_COND_DESTROY(&worker->cond);
+ CL_FREE(worker);
+ return CL_OUT_OF_RESOURCES;
+ }
+
+ queue->worker = worker;
+ worker->queue = queue;
+ return CL_SUCCESS;
}
LOCAL void cl_command_queue_worker_destroy(cl_command_queue queue)
{
+ cl_command_queue_worker worker = NULL;
+ assert(queue->worker != NULL);
+ worker = queue->worker;
+ CL_MUTEX_LOCK(&worker->mutex);
+ if (worker->quit) { // already destroy the queue?
+ CL_MUTEX_UNLOCK(&worker->mutex);
+ assert(0);
+ return;
+ }
-}
+ worker->quit = 1;
+ CL_COND_BROADCAST(&worker->cond);
+ CL_MUTEX_UNLOCK(&worker->mutex);
-static void *worker_thread_function(void *Arg)
-{
+ pthread_join(worker->tid, NULL);
+
+ CL_MUTEX_DESTROY(&worker->mutex);
+ CL_COND_DESTROY(&worker->cond);
+
+ if (worker->work_items) {
+ printf("There are still some enqueued works in the queue %p when this queue is destroyed,"
+ " this may cause very serious problems.\n", queue);
+ assert(0);
+ /* Free the work items. */
+
+
+
+ }
+
+
+ CL_FREE(worker);
+ queue->worker = NULL;
}
+
diff --git a/libclapi/cl_event.c b/libclapi/cl_event.c
index 171a966d..2af16f3e 100644
--- a/libclapi/cl_event.c
+++ b/libclapi/cl_event.c
@@ -28,7 +28,18 @@
#include "cl_context.h"
#include "cl_internals.h"
-cl_int cl_event_set_status(cl_event event, cl_int status)
+LOCAL cl_int cl_event_get_status(cl_event event)
+{
+ cl_int ret;
+
+ assert(event);
+ CL_MUTEX_LOCK(&event->lock);
+ ret = event->status;
+ CL_MUTEX_UNLOCK(&event->lock);
+ return ret;
+}
+
+LOCAL cl_int cl_event_set_status(cl_event event, cl_int status)
{
cl_event_user_cb user_cb = NULL;
cl_command_queue* queue_array = NULL;
diff --git a/libclapi/cl_internals.h b/libclapi/cl_internals.h
index 1c6b03c0..7290f33d 100644
--- a/libclapi/cl_internals.h
+++ b/libclapi/cl_internals.h
@@ -485,5 +485,7 @@ extern cl_int cl_driver_check(cl_driver drv);
extern cl_int cl_retain_command_queue(cl_command_queue queue);
extern void cl_release_command_queue(cl_command_queue queue);
extern cl_int cl_retain_event(cl_event e);
+extern cl_int cl_event_get_status(cl_event event);
+extern cl_int cl_event_set_status(cl_event event, cl_int status);
#endif /* __CL_INTERNALS_H__ */