rtworkq: Add support for serial work queues.

Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
feature/deterministic
Nikolay Sivov 2020-02-11 12:19:08 +03:00 committed by Alexandre Julliard
parent 9b1fac2dfd
commit 5737cdc97d
3 changed files with 280 additions and 50 deletions

View File

@ -97,6 +97,16 @@ enum rtwq_callback_queue_id
RTWQ_CALLBACK_QUEUE_ALL = 0xffffffff,
};
/* Should be kept in sync with corresponding MFASYNC_ constants. */
enum rtwq_callback_flags
{
RTWQ_FAST_IO_PROCESSING_CALLBACK = 0x00000001,
RTWQ_SIGNAL_CALLBACK = 0x00000002,
RTWQ_BLOCKING_CALLBACK = 0x00000004,
RTWQ_REPLY_CALLBACK = 0x00000008,
RTWQ_LOCALIZE_REMOTE_CALLBACK = 0x00000010,
};
enum system_queue_index
{
SYS_QUEUE_STANDARD = 0,
@ -115,10 +125,12 @@ struct work_item
LONG refcount;
struct list entry;
IRtwqAsyncResult *result;
IRtwqAsyncResult *reply_result;
struct queue *queue;
RTWQWORKITEM_KEY key;
LONG priority;
DWORD flags;
PTP_SIMPLE_CALLBACK finalization_callback;
union
{
TP_WAIT *wait_object;
@ -152,15 +164,123 @@ struct queue_desc
{
RTWQ_WORKQUEUE_TYPE queue_type;
const struct queue_ops *ops;
DWORD target_queue;
};
struct queue
{
IRtwqAsyncCallback IRtwqAsyncCallback_iface;
const struct queue_ops *ops;
TP_POOL *pool;
TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
CRITICAL_SECTION cs;
struct list pending_items;
DWORD id;
/* Data used for serial queues only. */
PTP_SIMPLE_CALLBACK finalization_callback;
DWORD target_queue;
};
static void shutdown_queue(struct queue *queue);
static HRESULT lock_user_queue(DWORD queue)
{
HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
struct queue_handle *entry;
if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
return S_OK;
EnterCriticalSection(&queues_section);
entry = get_queue_obj(queue);
if (entry && entry->refcount)
{
entry->refcount++;
hr = S_OK;
}
LeaveCriticalSection(&queues_section);
return hr;
}
static HRESULT unlock_user_queue(DWORD queue)
{
HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
struct queue_handle *entry;
if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
return S_OK;
EnterCriticalSection(&queues_section);
entry = get_queue_obj(queue);
if (entry && entry->refcount)
{
if (--entry->refcount == 0)
{
shutdown_queue((struct queue *)entry->obj);
heap_free(entry->obj);
entry->obj = next_free_user_queue;
next_free_user_queue = entry;
}
hr = S_OK;
}
LeaveCriticalSection(&queues_section);
return hr;
}
static struct queue *queue_impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface)
{
return CONTAINING_RECORD(iface, struct queue, IRtwqAsyncCallback_iface);
}
static HRESULT WINAPI queue_serial_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj)
{
if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) ||
IsEqualIID(riid, &IID_IUnknown))
{
*obj = iface;
IRtwqAsyncCallback_AddRef(iface);
return S_OK;
}
*obj = NULL;
return E_NOINTERFACE;
}
static ULONG WINAPI queue_serial_callback_AddRef(IRtwqAsyncCallback *iface)
{
return 2;
}
static ULONG WINAPI queue_serial_callback_Release(IRtwqAsyncCallback *iface)
{
return 1;
}
static HRESULT WINAPI queue_serial_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue_id)
{
struct queue *queue = queue_impl_from_IRtwqAsyncCallback(iface);
*flags = 0;
*queue_id = queue->id;
return S_OK;
}
static HRESULT WINAPI queue_serial_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result)
{
/* Reply callback won't be called in a regular way, pending items and chained queues will make it
unnecessary complicated to reach actual work queue that's able to execute this item. Instead
serial queues are cleaned up right away on submit(). */
return S_OK;
}
static const IRtwqAsyncCallbackVtbl queue_serial_callback_vtbl =
{
queue_serial_callback_QueryInterface,
queue_serial_callback_AddRef,
queue_serial_callback_Release,
queue_serial_callback_GetParameters,
queue_serial_callback_Invoke,
};
static struct queue system_queues[SYS_QUEUE_COUNT];
@ -181,6 +301,8 @@ static struct queue *get_system_queue(DWORD queue_id)
}
}
static HRESULT grab_queue(DWORD queue_id, struct queue **ret);
static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
{
}
@ -237,7 +359,11 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void
TRACE("result object %p.\n", result);
IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
/* Submitting from serial queue in reply mode, use different result object acting as receipt token.
It's submitted to user callback still, but when invoked, special serial queue callback will be used
to ensure correct destination queue. */
IRtwqAsyncCallback_Invoke(result->pCallback, item->reply_result ? item->reply_result : item->result);
IUnknown_Release(&item->IUnknown_iface);
}
@ -245,6 +371,7 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void
static void pool_queue_submit(struct queue *queue, struct work_item *item)
{
TP_CALLBACK_PRIORITY callback_priority;
TP_CALLBACK_ENVIRON_V3 env;
TP_WORK *work_object;
if (item->priority == 0)
@ -253,8 +380,14 @@ static void pool_queue_submit(struct queue *queue, struct work_item *item)
callback_priority = TP_CALLBACK_PRIORITY_LOW;
else
callback_priority = TP_CALLBACK_PRIORITY_HIGH;
work_object = CreateThreadpoolWork(standard_queue_worker, item,
(TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
env = queue->envs[callback_priority];
env.FinalizationCallback = item->finalization_callback;
/* Worker pool callback will release one reference. Grab one more to keep object alive when
we need finalization callback. */
if (item->finalization_callback)
IUnknown_AddRef(&item->IUnknown_iface);
work_object = CreateThreadpoolWork(standard_queue_worker, item, (TP_CALLBACK_ENVIRON *)&env);
SubmitThreadpoolWork(work_object);
TRACE("dispatched %p.\n", item->result);
@ -267,6 +400,129 @@ static const struct queue_ops pool_queue_ops =
pool_queue_submit,
};
static struct work_item * serial_queue_get_next(struct queue *queue, struct work_item *item)
{
struct work_item *next_item = NULL;
list_remove(&item->entry);
if (!list_empty(&item->queue->pending_items))
next_item = LIST_ENTRY(list_head(&item->queue->pending_items), struct work_item, entry);
return next_item;
}
static void CALLBACK serial_queue_finalization_callback(PTP_CALLBACK_INSTANCE instance, void *user_data)
{
struct work_item *item = (struct work_item *)user_data, *next_item;
struct queue *target_queue, *queue = item->queue;
HRESULT hr;
EnterCriticalSection(&queue->cs);
if ((next_item = serial_queue_get_next(queue, item)))
{
if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
target_queue->ops->submit(target_queue, next_item);
else
WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr);
}
LeaveCriticalSection(&queue->cs);
IUnknown_Release(&item->IUnknown_iface);
}
static HRESULT serial_queue_init(const struct queue_desc *desc, struct queue *queue)
{
queue->IRtwqAsyncCallback_iface.lpVtbl = &queue_serial_callback_vtbl;
queue->target_queue = desc->target_queue;
lock_user_queue(queue->target_queue);
queue->finalization_callback = serial_queue_finalization_callback;
return S_OK;
}
static BOOL serial_queue_shutdown(struct queue *queue)
{
unlock_user_queue(queue->target_queue);
return TRUE;
}
static struct work_item * serial_queue_is_ack_token(struct queue *queue, struct work_item *item)
{
RTWQASYNCRESULT *async_result = (RTWQASYNCRESULT *)item->result;
struct work_item *head;
if (list_empty(&queue->pending_items))
return NULL;
head = LIST_ENTRY(list_head(&queue->pending_items), struct work_item, entry);
if (head->reply_result == item->result && async_result->pCallback == &queue->IRtwqAsyncCallback_iface)
return head;
return NULL;
}
static void serial_queue_submit(struct queue *queue, struct work_item *item)
{
struct work_item *head, *next_item = NULL;
struct queue *target_queue;
HRESULT hr;
/* In reply mode queue will advance when 'reply_result' is invoked, in regular mode it will advance automatically,
via finalization callback. */
if (item->flags & RTWQ_REPLY_CALLBACK)
{
if (FAILED(hr = RtwqCreateAsyncResult(NULL, &queue->IRtwqAsyncCallback_iface, NULL, &item->reply_result)))
WARN("Failed to create reply object, hr %#x.\n", hr);
}
else
item->finalization_callback = queue->finalization_callback;
/* Serial queues could be chained together, detach from current queue before transitioning item to this one.
Items are not detached when submitted to pool queues, because pool queues won't forward them further. */
EnterCriticalSection(&item->queue->cs);
list_remove(&item->entry);
LeaveCriticalSection(&item->queue->cs);
EnterCriticalSection(&queue->cs);
item->queue = queue;
if ((head = serial_queue_is_ack_token(queue, item)))
{
/* Ack receipt token - pop waiting item, advance. */
next_item = serial_queue_get_next(queue, head);
IUnknown_Release(&head->IUnknown_iface);
}
else
{
if (list_empty(&queue->pending_items))
next_item = item;
list_add_tail(&queue->pending_items, &item->entry);
IUnknown_AddRef(&item->IUnknown_iface);
}
if (next_item)
{
if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
target_queue->ops->submit(target_queue, next_item);
else
WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr);
}
LeaveCriticalSection(&queue->cs);
}
static const struct queue_ops serial_queue_ops =
{
serial_queue_init,
serial_queue_shutdown,
serial_queue_submit,
};
static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
{
if (IsEqualIID(riid, &IID_IUnknown))
@ -293,8 +549,10 @@ static ULONG WINAPI work_item_Release(IUnknown *iface)
if (!refcount)
{
IRtwqAsyncResult_Release(item->result);
heap_free(item);
if (item->reply_result)
IRtwqAsyncResult_Release(item->reply_result);
IRtwqAsyncResult_Release(item->result);
heap_free(item);
}
return refcount;
@ -375,6 +633,7 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
desc.queue_type = queue_type;
desc.ops = &pool_queue_ops;
desc.target_queue = 0;
init_work_queue(&desc, queue);
LeaveCriticalSection(&queues_section);
*ret = queue;
@ -388,25 +647,6 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE;
}
static HRESULT lock_user_queue(DWORD queue)
{
HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
struct queue_handle *entry;
if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
return S_OK;
EnterCriticalSection(&queues_section);
entry = get_queue_obj(queue);
if (entry && entry->refcount)
{
entry->refcount++;
hr = S_OK;
}
LeaveCriticalSection(&queues_section);
return hr;
}
static void shutdown_queue(struct queue *queue)
{
struct work_item *item, *item2;
@ -427,31 +667,6 @@ static void shutdown_queue(struct queue *queue)
memset(queue, 0, sizeof(*queue));
}
static HRESULT unlock_user_queue(DWORD queue)
{
HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
struct queue_handle *entry;
if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
return S_OK;
EnterCriticalSection(&queues_section);
entry = get_queue_obj(queue);
if (entry && entry->refcount)
{
if (--entry->refcount == 0)
{
shutdown_queue((struct queue *)entry->obj);
heap_free(entry->obj);
entry->obj = next_free_user_queue;
next_free_user_queue = entry;
}
hr = S_OK;
}
LeaveCriticalSection(&queues_section);
return hr;
}
static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
{
struct work_item *item;
@ -915,6 +1130,7 @@ static void init_system_queues(void)
desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
desc.ops = &pool_queue_ops;
desc.target_queue = 0;
init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
LeaveCriticalSection(&queues_section);
@ -1168,6 +1384,7 @@ HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queu
desc.queue_type = queue_type;
desc.ops = &pool_queue_ops;
desc.target_queue = 0;
return alloc_user_queue(&desc, queue);
}
@ -1233,3 +1450,15 @@ HRESULT WINAPI RtwqCancelDeadline(HANDLE request)
return E_NOTIMPL;
}
HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue)
{
struct queue_desc desc;
TRACE("%#x, %p.\n", target_queue, queue);
desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
desc.ops = &serial_queue_ops;
desc.target_queue = target_queue;
return alloc_user_queue(&desc, queue);
}

View File

@ -1,5 +1,5 @@
@ stdcall RtwqAddPeriodicCallback(ptr ptr ptr)
@ stub RtwqAllocateSerialWorkQueue
@ stdcall RtwqAllocateSerialWorkQueue(long ptr)
@ stdcall RtwqAllocateWorkQueue(long ptr)
@ stub RtwqBeginRegisterWorkQueueWithMMCSS
@ stub RtwqBeginUnregisterWorkQueueWithMMCSS

View File

@ -78,6 +78,7 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("typedef void (WINAPI *RTWQPERIODICCALLBACK)(IUnknown *context);")
cpp_quote("HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key);")
cpp_quote("HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue);")
cpp_quote("HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue);")
cpp_quote("HRESULT WINAPI RtwqCancelDeadline(HANDLE request);")
cpp_quote("HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key);")