/*
 * Thread pooling
 *
 * Copyright (c) 2006 Robert Shearman
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

#include "config.h"
#include "wine/port.h"

#include <stdarg.h>
#include <limits.h>

#define NONAMELESSUNION
#include "ntstatus.h"
#define WIN32_NO_STATUS
#include "winternl.h"

#include "wine/debug.h"
#include "wine/list.h"

#include "ntdll_misc.h"

WINE_DEFAULT_DEBUG_CHANNEL(threadpool);

#define WORKER_TIMEOUT 30000 /* 30 seconds */

/* Counters shared between queuing threads and workers. They are read without
 * holding threadpool_cs, so every update goes through the interlocked helpers
 * below. */
static LONG num_workers;       /* worker threads currently running */
static LONG num_work_items;    /* entries currently in work_item_list */
static LONG num_busy_workers;  /* workers currently inside a callback */

/* Queue of pending struct work_item; protected by threadpool_cs. */
static struct list work_item_list = LIST_INIT(work_item_list);

/* Semaphore that idle workers wait on; created lazily under threadpool_cs
 * when the first item is queued. */
static HANDLE work_item_event;

static CRITICAL_SECTION threadpool_cs;
static CRITICAL_SECTION_DEBUG critsect_debug =
{
    0, 0, &threadpool_cs,
    { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
    0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
};
static CRITICAL_SECTION threadpool_cs = { &critsect_debug, -1, 0, 0, 0, 0 };

/* One queued request: the callback and the argument to hand to it. */
struct work_item
{
    struct list entry;
    PRTL_WORK_ITEM_ROUTINE function;
    PVOID context;
};

/* Atomically increment *dest and return the new value. */
static inline LONG interlocked_inc( PLONG dest )
{
    return interlocked_xchg_add( (int *)dest, 1 ) + 1;
}

/* Atomically decrement *dest and return the new value. */
static inline LONG interlocked_dec( PLONG dest )
{
    return interlocked_xchg_add( (int *)dest, -1 ) - 1;
}

/* Worker thread entry point.
 *
 * Repeatedly takes the head of work_item_list and runs it. When the queue
 * looks empty it waits on work_item_event; if that wait returns anything
 * other than STATUS_WAIT_0 (e.g. the WORKER_TIMEOUT expired) the thread
 * unregisters itself and exits. The param argument is unused. */
static void WINAPI worker_thread_proc(void * param)
{
    interlocked_inc(&num_workers);

    while (TRUE)
    {
        if (num_work_items > 0)
        {
            struct list *item;

            RtlEnterCriticalSection(&threadpool_cs);
            item = list_head(&work_item_list);
            if (item)
            {
                struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
                struct work_item work_item;

                list_remove(&work_item_ptr->entry);
                interlocked_dec(&num_work_items);

                RtlLeaveCriticalSection(&threadpool_cs);

                /* copy the item and free the work item memory sooner (before
                 * running the callback) to reduce memory usage */
                work_item = *work_item_ptr;
                RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);

                TRACE("executing %p(%p)\n", work_item.function, work_item.context);

                interlocked_inc(&num_busy_workers);

                /* do the work */
                work_item.function(work_item.context);

                interlocked_dec(&num_busy_workers);
            }
            else
                /* the unlocked num_work_items check raced with another
                 * thread that emptied the queue first */
                RtlLeaveCriticalSection(&threadpool_cs);
        }
        else
        {
            NTSTATUS status;
            LARGE_INTEGER timeout;

            /* negative value = relative timeout, in 100ns units */
            timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
            status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
            if (status != STATUS_WAIT_0)
                break;
        }
    }

    interlocked_dec(&num_workers);

    RtlExitUserThread(0);

    /* never reached */
}

/* Append work_item to the queue and signal the worker semaphore.
 *
 * Both steps happen under threadpool_cs, and the enqueue is rolled back if
 * the semaphore could not be created or released. Workers need the same lock
 * to dequeue, so they can never observe an item whose signalling failed.
 *
 * Returns STATUS_SUCCESS with the item queued (it may then be dequeued and
 * freed by a worker at any time), or a failure status with the item NOT
 * queued and still owned by the caller. */
static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
{
    NTSTATUS status;

    RtlEnterCriticalSection(&threadpool_cs);

    /* publish the item before signalling, so that a worker woken by the
     * semaphore already sees num_work_items > 0 */
    list_add_tail(&work_item_list, &work_item->entry);
    interlocked_inc(&num_work_items);

    if (!work_item_event)
    {
        HANDLE sem;

        /* the initial count of 1 accounts for the item queued just above */
        status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, LONG_MAX);
        if (status == STATUS_SUCCESS)
            work_item_event = sem; /* only ever publish a valid handle */
    }
    else
        status = NtReleaseSemaphore(work_item_event, 1, NULL);

    if (status != STATUS_SUCCESS)
    {
        /* still holding the lock, so no worker can have taken the item */
        list_remove(&work_item->entry);
        interlocked_dec(&num_work_items);
    }

    RtlLeaveCriticalSection(&threadpool_cs);

    return status;
}

/* Take work_item back out of the queue if it is still there.
 *
 * The pointer is only compared against the queued entries and never
 * dereferenced, because a worker may already have dequeued and freed it.
 *
 * Returns TRUE if the item was found and unlinked (the caller owns it again),
 * FALSE if it was no longer in the queue. */
static BOOL withdraw_work_item(struct work_item *work_item)
{
    struct list *cursor;
    BOOL found = FALSE;

    RtlEnterCriticalSection(&threadpool_cs);
    LIST_FOR_EACH(cursor, &work_item_list)
    {
        if (cursor == &work_item->entry)
        {
            list_remove(cursor);
            interlocked_dec(&num_work_items);
            found = TRUE;
            break;
        }
    }
    RtlLeaveCriticalSection(&threadpool_cs);

    return found;
}

/***********************************************************************
 *              RtlQueueWorkItem   (NTDLL.@)
 *
 * Queues a work item into a thread in the thread pool.
 *
 * PARAMS
 *  Function [I] Work function to execute.
 *  Context  [I] Context to pass to the work function when it is executed.
 *  Flags    [I] Flags. See notes.
 *
 * RETURNS
 *  Success: STATUS_SUCCESS.
 *  Failure: Any NTSTATUS code. The work function will not be executed.
 *
 * NOTES
 *  Flags can be one or more of the following:
 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
 *
 *  Only WT_EXECUTEDEFAULT is implemented; any other value is reported with a
 *  FIXME and the item is queued as if WT_EXECUTEDEFAULT had been passed.
 */
NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ULONG Flags)
{
    HANDLE thread;
    NTSTATUS status;
    struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(*work_item));

    if (!work_item)
        return STATUS_NO_MEMORY;

    work_item->function = Function;
    work_item->context = Context;

    if (Flags != WT_EXECUTEDEFAULT)
        FIXME("Flags 0x%lx not supported\n", Flags);

    status = add_work_item_to_queue(work_item);
    if (status != STATUS_SUCCESS)
    {
        /* the item was never left in the queue, so it is still ours */
        RtlFreeHeap(GetProcessHeap(), 0, work_item);
        return status;
    }

    /* From here on a worker may dequeue and free work_item at any moment;
     * it must not be dereferenced any more. */

    /* an idle worker exists and will pick the item up */
    if ((num_workers != 0) && (num_workers != num_busy_workers))
        return STATUS_SUCCESS;

    status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
                                  NULL, 0, 0,
                                  worker_thread_proc, NULL, &thread, NULL );
    if (status == STATUS_SUCCESS)
    {
        NtClose( thread );
        return STATUS_SUCCESS;
    }

    /* NOTE: we don't care if we couldn't create the thread if there is at
     * least one other available to process the request */
    if (num_workers > 0)
        return STATUS_SUCCESS;

    /* No worker is registered. Take the item back, unless a worker that was
     * just starting up has already consumed it, in which case the request
     * has been (or is being) executed and must be reported as queued. */
    if (!withdraw_work_item(work_item))
        return STATUS_SUCCESS;

    RtlFreeHeap(GetProcessHeap(), 0, work_item);
    return status;
}