rtworkq: Add RtwqStartup()/RtwqShutdown().

Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
feature/deterministic
Nikolay Sivov 2020-02-05 08:44:07 +03:00 committed by Alexandre Julliard
parent fa420eddba
commit 9ca02920f1
3 changed files with 186 additions and 2 deletions

View File

@ -23,11 +23,108 @@
#include "rtworkq.h"
#include "wine/debug.h"
#include "wine/heap.h"
#include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
static CRITICAL_SECTION queues_section;
static CRITICAL_SECTION_DEBUG queues_critsect_debug =
{
0, 0, &queues_section,
{ &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList },
0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") }
};
static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 };
static LONG platform_lock;
enum system_queue_index
{
SYS_QUEUE_STANDARD = 0,
SYS_QUEUE_RT,
SYS_QUEUE_IO,
SYS_QUEUE_TIMER,
SYS_QUEUE_MULTITHREADED,
SYS_QUEUE_DO_NOT_USE,
SYS_QUEUE_LONG_FUNCTION,
SYS_QUEUE_COUNT,
};
struct work_item
{
struct list entry;
LONG refcount;
IRtwqAsyncResult *result;
struct queue *queue;
RTWQWORKITEM_KEY key;
union
{
TP_WAIT *wait_object;
TP_TIMER *timer_object;
} u;
};
static const TP_CALLBACK_PRIORITY priorities[] =
{
TP_CALLBACK_PRIORITY_HIGH,
TP_CALLBACK_PRIORITY_NORMAL,
TP_CALLBACK_PRIORITY_LOW,
};
struct queue
{
TP_POOL *pool;
TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
CRITICAL_SECTION cs;
struct list pending_items;
};
static struct queue system_queues[SYS_QUEUE_COUNT];
static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
{
}
static void release_work_item(struct work_item *item)
{
if (InterlockedDecrement(&item->refcount) == 0)
{
IRtwqAsyncResult_Release(item->result);
heap_free(item);
}
}
static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
{
TP_CALLBACK_ENVIRON_V3 env;
unsigned int max_thread, i;
queue->pool = CreateThreadpool(NULL);
memset(&env, 0, sizeof(env));
env.Version = 3;
env.Size = sizeof(env);
env.Pool = queue->pool;
env.CleanupGroup = CreateThreadpoolCleanupGroup();
env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
{
queue->envs[i] = env;
queue->envs[i].CallbackPriority = priorities[i];
}
list_init(&queue->pending_items);
InitializeCriticalSection(&queue->cs);
max_thread = (queue_type == RTWQ_STANDARD_WORKQUEUE || queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
SetThreadpoolThreadMinimum(queue->pool, 1);
SetThreadpoolThreadMaximum(queue->pool, max_thread);
if (queue_type == RTWQ_WINDOW_WORKQUEUE)
FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
}
struct async_result
{
RTWQASYNCRESULT result;
@ -217,3 +314,79 @@ HRESULT WINAPI RtwqUnlockPlatform(void)
return S_OK;
}
static void init_system_queues(void)
{
/* Always initialize standard queue, keep the rest lazy. */
EnterCriticalSection(&queues_section);
if (system_queues[SYS_QUEUE_STANDARD].pool)
{
LeaveCriticalSection(&queues_section);
return;
}
init_work_queue(RTWQ_STANDARD_WORKQUEUE, &system_queues[SYS_QUEUE_STANDARD]);
LeaveCriticalSection(&queues_section);
}
HRESULT WINAPI RtwqStartup(void)
{
if (InterlockedIncrement(&platform_lock) == 1)
{
init_system_queues();
}
return S_OK;
}
static void shutdown_queue(struct queue *queue)
{
struct work_item *item, *item2;
if (!queue->pool)
return;
CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
CloseThreadpool(queue->pool);
queue->pool = NULL;
EnterCriticalSection(&queue->cs);
LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
{
list_remove(&item->entry);
release_work_item(item);
}
LeaveCriticalSection(&queue->cs);
DeleteCriticalSection(&queue->cs);
}
static void shutdown_system_queues(void)
{
unsigned int i;
EnterCriticalSection(&queues_section);
for (i = 0; i < ARRAY_SIZE(system_queues); ++i)
{
shutdown_queue(&system_queues[i]);
}
LeaveCriticalSection(&queues_section);
}
HRESULT WINAPI RtwqShutdown(void)
{
if (platform_lock <= 0)
return S_OK;
if (InterlockedExchangeAdd(&platform_lock, -1) == 1)
{
shutdown_system_queues();
}
return S_OK;
}

View File

@ -28,8 +28,8 @@
@ stub RtwqSetDeadline
@ stub RtwqSetDeadline2
@ stub RtwqSetLongRunning
@ stub RtwqShutdown
@ stub RtwqStartup
@ stdcall RtwqShutdown()
@ stdcall RtwqStartup()
@ stub RtwqUnjoinWorkQueue
@ stdcall RtwqUnlockPlatform()
@ stub RtwqUnlockWorkQueue

View File

@ -18,6 +18,15 @@
import "unknwn.idl";
typedef enum
{
RTWQ_STANDARD_WORKQUEUE = 0,
RTWQ_WINDOW_WORKQUEUE = 1,
RTWQ_MULTITHREADED_WORKQUEUE = 2,
} RTWQ_WORKQUEUE_TYPE;
typedef unsigned __int64 RTWQWORKITEM_KEY;
[
object,
uuid(ac6b7889-0740-4d51-8619-905994a55cc6),
@ -59,4 +68,6 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);")
cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);")
cpp_quote("HRESULT WINAPI RtwqShutdown(void);")
cpp_quote("HRESULT WINAPI RtwqStartup(void);")
cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")