diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index 10bda485b6f..15726bb7863 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -18,7 +18,11 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include "config.h" +#include "wine/port.h" + #include +#include #define NONAMELESSUNION #include "ntstatus.h" @@ -26,35 +30,123 @@ #include "winternl.h" #include "wine/debug.h" +#include "wine/list.h" + #include "ntdll_misc.h" WINE_DEFAULT_DEBUG_CHANNEL(threadpool); +#define WORKER_TIMEOUT 30000 /* 30 seconds */ + +static LONG num_workers; +static LONG num_work_items; +static LONG num_busy_workers; + +static struct list work_item_list = LIST_INIT(work_item_list); +static HANDLE work_item_event; + +static CRITICAL_SECTION threadpool_cs; +static CRITICAL_SECTION_DEBUG critsect_debug = +{ + 0, 0, &threadpool_cs, + { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList }, + 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") } +}; +static CRITICAL_SECTION threadpool_cs = { &critsect_debug, -1, 0, 0, 0, 0 }; + struct work_item { + struct list entry; PRTL_WORK_ITEM_ROUTINE function; PVOID context; }; +inline static LONG interlocked_inc( PLONG dest ) +{ + return interlocked_xchg_add( (int *)dest, 1 ) + 1; +} + +inline static LONG interlocked_dec( PLONG dest ) +{ + return interlocked_xchg_add( (int *)dest, -1 ) - 1; +} + static void WINAPI worker_thread_proc(void * param) { - struct work_item *work_item_ptr = (struct work_item *)param; - struct work_item work_item; + interlocked_inc(&num_workers); /* free the work item memory sooner to reduce memory usage */ - work_item = *work_item_ptr; - RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr); + while (TRUE) + { + if (num_work_items > 0) + { + struct list *item; + RtlEnterCriticalSection(&threadpool_cs); + item = list_head(&work_item_list); + if (item) + { + struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry); + struct work_item work_item; + list_remove(&work_item_ptr->entry); + interlocked_dec(&num_work_items); - TRACE("executing %p(%p)\n", work_item.function, work_item.context); + RtlLeaveCriticalSection(&threadpool_cs); - /* do the work */ - work_item.function(work_item.context); + work_item = *work_item_ptr; + RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr); + + TRACE("executing %p(%p)\n", work_item.function, work_item.context); + + interlocked_inc(&num_busy_workers); + + /* do the work */ + work_item.function(work_item.context); + + interlocked_dec(&num_busy_workers); + } + else + RtlLeaveCriticalSection(&threadpool_cs); + } + else + { + NTSTATUS status; + LARGE_INTEGER timeout; + timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000); + status = NtWaitForSingleObject(work_item_event, FALSE, &timeout); + if (status != STATUS_WAIT_0) + break; + } + } + + interlocked_dec(&num_workers); RtlExitUserThread(0); /* never reached */ } +static NTSTATUS add_work_item_to_queue(struct work_item *work_item) +{ + NTSTATUS status; + + RtlEnterCriticalSection(&threadpool_cs); + list_add_tail(&work_item_list, &work_item->entry); + num_work_items++; + RtlLeaveCriticalSection(&threadpool_cs); + + if (!work_item_event) + { + HANDLE sem; + status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, LONG_MAX); + if (interlocked_cmpxchg_ptr( (PVOID *)&work_item_event, (PVOID)sem, 0 )) + NtClose(sem); /* somebody beat us to it */ + } + else + status = NtReleaseSemaphore(work_item_event, 1, NULL); + + return status; +} + /*********************************************************************** * RtlQueueWorkItem (NTDLL.@) * @@ -92,16 +184,35 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, if (Flags != WT_EXECUTEDEFAULT) FIXME("Flags 0x%lx not supported\n", Flags); - /* FIXME: very crude implementation that doesn't support pooling at all */ - status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, - NULL, 0, 0, - worker_thread_proc, work_item, &thread, NULL ); + status = add_work_item_to_queue(work_item); + + if ((status == STATUS_SUCCESS) && + ((num_workers == 0) || (num_workers == num_busy_workers))) + { + status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, + NULL, 0, 0, + worker_thread_proc, NULL, &thread, NULL ); + if (status == STATUS_SUCCESS) + NtClose( thread ); + + /* NOTE: we don't care if we couldn't create the thread if there is at + * least one other available to process the request */ + if ((num_workers > 0) && (status != STATUS_SUCCESS)) + status = STATUS_SUCCESS; + } + if (status != STATUS_SUCCESS) { + RtlEnterCriticalSection(&threadpool_cs); + + interlocked_dec(&num_work_items); + list_remove(&work_item->entry); RtlFreeHeap(GetProcessHeap(), 0, work_item); + + RtlLeaveCriticalSection(&threadpool_cs); + return status; } - NtClose( thread ); return STATUS_SUCCESS; }