summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJunyan He <junyan.he@intel.com>2017-06-11 13:50:00 +0800
committerYang Rong <rong.r.yang@intel.com>2017-08-02 17:16:29 +0800
commit94cf9e40a59b8a34a5f86d5c4a56ef97ec0264c0 (patch)
tree8f62157e3e85f6d23bdd2226605285139fa63c32
parentfa1ea94be3444dfb58f3b68729e97bb07de52bba (diff)
Add cl_command_queue define to runtime.
We create a worker thread in command queue to handle the event requirement. Each equeueXXX api will generate a enqueue event, and will be add to exec list in command queue worker thread. Signed-off-by: Junyan He <junyan.he@intel.com>
-rw-r--r--runtime/cl_command_queue.c465
-rw-r--r--runtime/cl_command_queue.h75
2 files changed, 540 insertions, 0 deletions
diff --git a/runtime/cl_command_queue.c b/runtime/cl_command_queue.c
new file mode 100644
index 00000000..b54f1cb5
--- /dev/null
+++ b/runtime/cl_command_queue.c
@@ -0,0 +1,465 @@
+/*
+ * Copyright © 2012 Intel Corporation
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: He Junyan <junyan.he@intel.com>
+ */
+
+#include "cl_command_queue.h"
+#include "cl_alloc.h"
+#include "cl_device_id.h"
+#include "cl_event.h"
+
+static cl_command_queue
+cl_command_queue_new(cl_context ctx)
+{
+ cl_command_queue queue = NULL;
+
+ assert(ctx);
+ queue = CL_CALLOC(1, sizeof(_cl_command_queue));
+ if (queue == NULL)
+ return NULL;
+
+ CL_OBJECT_INIT_BASE(queue, CL_OBJECT_COMMAND_QUEUE_MAGIC);
+ if (cl_command_queue_init_enqueue(queue) != CL_SUCCESS) {
+ CL_FREE(queue);
+ return NULL;
+ }
+
+ /* Append the command queue in the list */
+ cl_context_add_queue(ctx, queue);
+ return queue;
+}
+
+LOCAL cl_command_queue
+cl_command_queue_create(cl_context ctx, cl_device_id device, cl_command_queue_properties properties,
+ cl_uint queue_size, cl_int *errcode_ret)
+{
+ cl_command_queue queue = cl_command_queue_new(ctx);
+ if (queue == NULL) {
+ *errcode_ret = CL_OUT_OF_HOST_MEMORY;
+ }
+
+ queue->props = properties;
+ queue->device = device;
+ queue->size = queue_size;
+
+ *errcode_ret = device->api.command_queue_create(device, queue);
+ if (*errcode_ret != CL_SUCCESS) {
+ cl_command_queue_delete(queue);
+ return NULL;
+ }
+
+ return queue;
+}
+
+LOCAL void
+cl_command_queue_delete(cl_command_queue queue)
+{
+ assert(queue);
+ if (CL_OBJECT_DEC_REF(queue) > 1)
+ return;
+
+ cl_command_queue_destroy_enqueue(queue);
+
+ queue->device->api.command_queue_create(queue->device, queue);
+
+ cl_context_remove_queue(queue->ctx, queue);
+ if (queue->barrier_events) {
+ CL_FREE(queue->barrier_events);
+ }
+ CL_OBJECT_DESTROY_BASE(queue);
+ CL_FREE(queue);
+}
+
+LOCAL void
+cl_command_queue_add_ref(cl_command_queue queue)
+{
+ CL_OBJECT_INC_REF(queue);
+}
+
+LOCAL void
+cl_command_queue_insert_barrier_event(cl_command_queue queue, cl_event event)
+{
+ cl_int i = 0;
+
+ cl_event_add_ref(event);
+
+ assert(queue != NULL);
+ CL_OBJECT_LOCK(queue);
+
+ if (queue->barrier_events == NULL) {
+ queue->barrier_events_size = 4;
+ queue->barrier_events = CL_CALLOC(queue->barrier_events_size, sizeof(cl_event));
+ assert(queue->barrier_events);
+ }
+
+ for (i = 0; i < queue->barrier_events_num; i++) {
+ assert(queue->barrier_events[i] != event);
+ }
+
+ if (queue->barrier_events_num < queue->barrier_events_size) {
+ queue->barrier_events[queue->barrier_events_num++] = event;
+ CL_OBJECT_UNLOCK(queue);
+ return;
+ }
+
+ /* Array is full, double expand. */
+ queue->barrier_events_size *= 2;
+ queue->barrier_events = CL_REALLOC(queue->barrier_events,
+ queue->barrier_events_size * sizeof(cl_event));
+ assert(queue->barrier_events);
+
+ queue->barrier_events[queue->barrier_events_num++] = event;
+ CL_OBJECT_UNLOCK(queue);
+ return;
+}
+
+LOCAL void
+cl_command_queue_remove_barrier_event(cl_command_queue queue, cl_event event)
+{
+ cl_int i = 0;
+ assert(queue != NULL);
+
+ CL_OBJECT_LOCK(queue);
+
+ assert(queue->barrier_events_num > 0);
+ assert(queue->barrier_events);
+
+ for (i = 0; i < queue->barrier_events_num; i++) {
+ if (queue->barrier_events[i] == event)
+ break;
+ }
+ assert(i < queue->barrier_events_num); // Must find it.
+
+ if (i == queue->barrier_events_num - 1) { // The last one.
+ queue->barrier_events[i] = NULL;
+ } else {
+ for (; i < queue->barrier_events_num - 1; i++) { // Move forward.
+ queue->barrier_events[i] = queue->barrier_events[i + 1];
+ }
+ }
+ queue->barrier_events_num -= 1;
+ CL_OBJECT_UNLOCK(queue);
+
+ cl_event_delete(event);
+}
+
+static void *
+worker_thread_function(void *Arg)
+{
+ cl_command_queue_enqueue_worker worker = (cl_command_queue_enqueue_worker)Arg;
+ cl_command_queue queue = worker->queue;
+ cl_event e;
+ cl_uint cookie = -1;
+ list_node *pos;
+ list_node *n;
+ list_head ready_list;
+ cl_int exec_status;
+
+ CL_OBJECT_LOCK(queue);
+
+ while (1) {
+ /* Must have locked here. */
+
+ if (worker->quit == CL_TRUE) {
+ CL_OBJECT_UNLOCK(queue);
+ return NULL;
+ }
+
+ if (list_empty(&worker->enqueued_events)) {
+ CL_OBJECT_WAIT_ON_COND(queue);
+ continue;
+ }
+
+ /* The cookie will change when event status change or something happend to
+ this command queue. If we already checked the event list and do not find
+ anything to exec, we need to wait the cookie update, to avoid loop for ever. */
+ if (cookie == worker->cookie) {
+ CL_OBJECT_WAIT_ON_COND(queue);
+ continue;
+ }
+
+ /* Here we hold lock to check event status, to avoid missing the status notify*/
+ list_init(&ready_list);
+ list_for_each_safe(pos, n, &worker->enqueued_events)
+ {
+ e = list_entry(pos, _cl_event, enqueue_node);
+ if (cl_event_is_ready(e) <= CL_COMPLETE) {
+ list_node_del(&e->enqueue_node);
+ list_add_tail(&ready_list, &e->enqueue_node);
+ }
+ }
+
+ if (list_empty(&ready_list)) { /* Nothing to do, just wait. */
+ cookie = worker->cookie;
+ continue;
+ }
+
+ /* Notify waiters, we change the event list. */
+ CL_OBJECT_NOTIFY_COND(queue);
+
+ worker->in_exec_status = CL_QUEUED;
+ CL_OBJECT_UNLOCK(queue);
+
+ /* Do the really job without lock.*/
+ exec_status = CL_SUBMITTED;
+ list_for_each_safe(pos, n, &ready_list)
+ {
+ e = list_entry(pos, _cl_event, enqueue_node);
+ cl_event_exec(e, exec_status, CL_FALSE);
+ }
+
+ /* Notify all waiting for flush. */
+ CL_OBJECT_LOCK(queue);
+ worker->in_exec_status = CL_SUBMITTED;
+ CL_OBJECT_NOTIFY_COND(queue);
+ CL_OBJECT_UNLOCK(queue);
+
+ list_for_each_safe(pos, n, &ready_list)
+ {
+ e = list_entry(pos, _cl_event, enqueue_node);
+ cl_event_exec(e, CL_COMPLETE, CL_FALSE);
+ }
+
+ /* Clear and delete all the events. */
+ list_for_each_safe(pos, n, &ready_list)
+ {
+ e = list_entry(pos, _cl_event, enqueue_node);
+ list_node_del(&e->enqueue_node);
+ cl_event_delete(e);
+ }
+
+ CL_OBJECT_LOCK(queue);
+ worker->in_exec_status = CL_COMPLETE;
+
+ /* Notify finish waiters, we have done all the ready event. */
+ CL_OBJECT_NOTIFY_COND(queue);
+ }
+}
+
+LOCAL void
+cl_command_queue_notify(cl_command_queue queue)
+{
+ if (CL_OBJECT_GET_REF(queue) < 1) {
+ return;
+ }
+
+ assert(queue && (((cl_base_object)queue)->magic == CL_OBJECT_COMMAND_QUEUE_MAGIC));
+ CL_OBJECT_LOCK(queue);
+ queue->worker.cookie++;
+ CL_OBJECT_NOTIFY_COND(queue);
+ CL_OBJECT_UNLOCK(queue);
+}
+
+LOCAL void
+cl_command_queue_enqueue_event(cl_command_queue queue, cl_event event)
+{
+ CL_OBJECT_INC_REF(event);
+ assert(CL_OBJECT_IS_COMMAND_QUEUE(queue));
+ CL_OBJECT_LOCK(queue);
+ assert(queue->worker.quit == CL_FALSE);
+ assert(list_node_out_of_list(&event->enqueue_node));
+ list_add_tail(&queue->worker.enqueued_events, &event->enqueue_node);
+ queue->worker.cookie++;
+ CL_OBJECT_NOTIFY_COND(queue);
+ CL_OBJECT_UNLOCK(queue);
+}
+
+LOCAL cl_int
+cl_command_queue_init_enqueue(cl_command_queue queue)
+{
+ cl_command_queue_enqueue_worker worker = &queue->worker;
+ worker->queue = queue;
+ worker->quit = CL_FALSE;
+ worker->in_exec_status = CL_COMPLETE;
+ worker->cookie = 8;
+ list_init(&worker->enqueued_events);
+
+ if (pthread_create(&worker->tid, NULL, worker_thread_function, worker)) {
+ CL_LOG_ERROR("Can not create worker thread for queue %p...\n", queue);
+ return CL_OUT_OF_RESOURCES;
+ }
+
+ return CL_SUCCESS;
+}
+
+LOCAL void
+cl_command_queue_destroy_enqueue(cl_command_queue queue)
+{
+ cl_command_queue_enqueue_worker worker = &queue->worker;
+ list_node *pos;
+ list_node *n;
+ cl_event e;
+
+ assert(worker->queue == queue);
+ assert(worker->quit == CL_FALSE);
+
+ CL_OBJECT_LOCK(queue);
+ worker->quit = 1;
+ CL_OBJECT_NOTIFY_COND(queue);
+ CL_OBJECT_UNLOCK(queue);
+
+ pthread_join(worker->tid, NULL);
+
+ /* We will wait for finish before destroy the command queue. */
+ if (!list_empty(&worker->enqueued_events)) {
+ CL_LOG_WARNING("There are still some enqueued works in the queue %p when this"
+ " queue is destroyed, this may cause very serious problems.\n",
+ queue);
+
+ list_for_each_safe(pos, n, &worker->enqueued_events)
+ {
+ e = list_entry(pos, _cl_event, enqueue_node);
+ list_node_del(&e->enqueue_node);
+ cl_event_set_status(e, -1); // Give waiters a chance to wakeup.
+ cl_event_delete(e);
+ }
+ }
+}
+
+/* Note: Must call this function with queue's lock. */
+LOCAL cl_event *
+cl_command_queue_record_in_queue_events(cl_command_queue queue, cl_uint *list_num)
+{
+ int event_num = 0;
+ list_node *pos;
+ cl_command_queue_enqueue_worker worker = &queue->worker;
+ cl_event *enqueued_list = NULL;
+ int i;
+ cl_event tmp_e = NULL;
+
+ list_for_each(pos, &worker->enqueued_events)
+ {
+ event_num++;
+ }
+ assert(event_num > 0);
+
+ enqueued_list = CL_CALLOC(event_num, sizeof(cl_event));
+ assert(enqueued_list);
+
+ i = 0;
+ list_for_each(pos, &worker->enqueued_events)
+ {
+ tmp_e = list_entry(pos, _cl_event, enqueue_node);
+ cl_event_add_ref(tmp_e); // Add ref temp avoid delete.
+ enqueued_list[i] = tmp_e;
+ i++;
+ }
+ assert(i == event_num);
+
+ *list_num = event_num;
+ return enqueued_list;
+}
+
+LOCAL cl_int
+cl_command_queue_wait_flush(cl_command_queue queue)
+{
+ cl_command_queue_enqueue_worker worker = &queue->worker;
+ cl_event *enqueued_list = NULL;
+ cl_uint enqueued_num = 0;
+ int i;
+
+ CL_OBJECT_LOCK(queue);
+
+ if (worker->quit) { // already destroy the queue?
+ CL_OBJECT_UNLOCK(queue);
+ return CL_INVALID_COMMAND_QUEUE;
+ }
+
+ if (!list_empty(&worker->enqueued_events)) {
+ enqueued_list = cl_command_queue_record_in_queue_events(queue, &enqueued_num);
+ assert(enqueued_num > 0);
+ assert(enqueued_list);
+ }
+
+ while (worker->in_exec_status == CL_QUEUED) {
+ CL_OBJECT_WAIT_ON_COND(queue);
+
+ if (worker->quit) { // already destroy the queue?
+ CL_OBJECT_UNLOCK(queue);
+ return CL_INVALID_COMMAND_QUEUE;
+ }
+ }
+
+ CL_OBJECT_UNLOCK(queue);
+
+ /* Wait all event enter submitted status. */
+ for (i = 0; i < enqueued_num; i++) {
+ CL_OBJECT_LOCK(enqueued_list[i]);
+ while (enqueued_list[i]->status > CL_SUBMITTED) {
+ CL_OBJECT_WAIT_ON_COND(enqueued_list[i]);
+ }
+ CL_OBJECT_UNLOCK(enqueued_list[i]);
+ }
+
+ for (i = 0; i < enqueued_num; i++) {
+ cl_event_delete(enqueued_list[i]);
+ }
+ if (enqueued_list)
+ CL_FREE(enqueued_list);
+
+ return CL_SUCCESS;
+}
+
+LOCAL cl_int
+cl_command_queue_wait_finish(cl_command_queue queue)
+{
+ cl_command_queue_enqueue_worker worker = &queue->worker;
+ cl_event *enqueued_list = NULL;
+ cl_uint enqueued_num = 0;
+ int i;
+
+ CL_OBJECT_LOCK(queue);
+
+ if (worker->quit) { // already destroy the queue?
+ CL_OBJECT_UNLOCK(queue);
+ return CL_INVALID_COMMAND_QUEUE;
+ }
+
+ if (!list_empty(&worker->enqueued_events)) {
+ enqueued_list = cl_command_queue_record_in_queue_events(queue, &enqueued_num);
+ assert(enqueued_num > 0);
+ assert(enqueued_list);
+ }
+
+ while (worker->in_exec_status > CL_COMPLETE) {
+ CL_OBJECT_WAIT_ON_COND(queue);
+
+ if (worker->quit) { // already destroy the queue?
+ CL_OBJECT_UNLOCK(queue);
+ return CL_INVALID_COMMAND_QUEUE;
+ }
+ }
+
+ CL_OBJECT_UNLOCK(queue);
+
+ /* Wait all event enter submitted status. */
+ for (i = 0; i < enqueued_num; i++) {
+ CL_OBJECT_LOCK(enqueued_list[i]);
+ while (enqueued_list[i]->status > CL_COMPLETE) {
+ CL_OBJECT_WAIT_ON_COND(enqueued_list[i]);
+ }
+ CL_OBJECT_UNLOCK(enqueued_list[i]);
+ }
+
+ for (i = 0; i < enqueued_num; i++) {
+ cl_event_delete(enqueued_list[i]);
+ }
+ if (enqueued_list)
+ CL_FREE(enqueued_list);
+
+ return CL_SUCCESS;
+}
diff --git a/runtime/cl_command_queue.h b/runtime/cl_command_queue.h
new file mode 100644
index 00000000..5058443c
--- /dev/null
+++ b/runtime/cl_command_queue.h
@@ -0,0 +1,75 @@
+/*
+ * Copyright © 2012 Intel Corporation
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Benjamin Segovia <benjamin.segovia@intel.com>
+ */
+
+#ifndef __CL_COMMAND_QUEUE_H__
+#define __CL_COMMAND_QUEUE_H__
+
+#include "cl_base_object.h"
+#include "cl_context.h"
+#include "CL/cl.h"
+
+typedef struct _cl_command_queue_enqueue_worker {
+ cl_command_queue queue;
+ pthread_t tid;
+ cl_uint cookie;
+ cl_bool quit;
+ list_head enqueued_events;
+ cl_uint in_exec_status; // Same value as CL_COMPLETE, CL_SUBMITTED ...
+} _cl_command_queue_enqueue_worker;
+
+typedef _cl_command_queue_enqueue_worker *cl_command_queue_enqueue_worker;
+
+/* Basically, this is a (kind-of) batch buffer */
+typedef struct _cl_command_queue {
+ _cl_base_object base;
+ _cl_command_queue_enqueue_worker worker;
+ cl_context ctx; /* Its parent context */
+ cl_device_id device; /* Its device */
+ cl_event *barrier_events; /* Point to array of non-complete user events that block this command queue */
+ cl_int barrier_events_num; /* Number of Non-complete user events */
+ cl_int barrier_events_size; /* The size of array that wait_events point to */
+ cl_command_queue_properties props; /* Queue properties */
+ cl_uint size; /* Store the specified size for queueu */
+ void *command_queue_for_device; /* device's specific content */
+} _cl_command_queue;
+
+#define CL_OBJECT_COMMAND_QUEUE_MAGIC 0x83650a12b79ce4efLL
+#define CL_OBJECT_IS_COMMAND_QUEUE(obj) ((obj && \
+ ((cl_base_object)obj)->magic == CL_OBJECT_COMMAND_QUEUE_MAGIC && \
+ CL_OBJECT_GET_REF(obj) >= 1))
+
+/* Allocate and initialize a new command queue. Also insert it in the list of
+ * command queue in the associated context */
+extern cl_command_queue cl_command_queue_create(cl_context, cl_device_id,
+ cl_command_queue_properties, cl_uint, cl_int *);
+/* Destroy and deallocate the command queue */
+extern void cl_command_queue_delete(cl_command_queue);
+/* Keep one more reference on the queue */
+extern void cl_command_queue_add_ref(cl_command_queue);
+extern void cl_command_queue_insert_barrier_event(cl_command_queue queue, cl_event event);
+extern void cl_command_queue_remove_barrier_event(cl_command_queue queue, cl_event event);
+extern void cl_command_queue_notify(cl_command_queue queue);
+extern void cl_command_queue_enqueue_event(cl_command_queue queue, cl_event event);
+extern cl_int cl_command_queue_init_enqueue(cl_command_queue queue);
+extern void cl_command_queue_destroy_enqueue(cl_command_queue queue);
+extern cl_int cl_command_queue_wait_finish(cl_command_queue queue);
+extern cl_int cl_command_queue_wait_flush(cl_command_queue queue);
+/* Note: Must call this function with queue's lock. */
+extern cl_event *cl_command_queue_record_in_queue_events(cl_command_queue queue, cl_uint *list_num);
+#endif /* __CL_COMMAND_QUEUE_H__ */