- Add critsec debugging info.

- Move the modal loop called during RPCs into CoWaitForMultipleHandles.
- Use a mutex for long remoting calls to IRemUnknown methods.
- Remove locking in apartment_disconnectproxies as it is not needed.
- Use PostMessage instead of SendMessage so we can run the message
  loop or not as appropriate.
oldstable
Robert Shearman 2005-03-17 10:26:20 +00:00 committed by Alexandre Julliard
parent 40df53aae2
commit 1b5ebabdce
6 changed files with 216 additions and 87 deletions

View File

@ -232,6 +232,7 @@ static APARTMENT *apartment_construct(DWORD model)
apt->remunk_exported = FALSE;
apt->oidc = 1;
InitializeCriticalSection(&apt->cs);
DEBUG_SET_CRITSEC_NAME(&apt->cs, "apartment");
apt->model = model;
@ -329,6 +330,9 @@ DWORD apartment_release(struct apartment *apt)
TRACE("destroying apartment %p, oxid %s\n", apt, wine_dbgstr_longlong(apt->oxid));
/* no locking is needed for this apartment, because no other thread
* can access it at this point */
apartment_disconnectproxies(apt);
if (apt->win) DestroyWindow(apt->win);
@ -351,8 +355,10 @@ DWORD apartment_release(struct apartment *apt)
if (apt->filter) IUnknown_Release(apt->filter);
DEBUG_CLEAR_CRITSEC_NAME(&apt->cs);
DeleteCriticalSection(&apt->cs);
CloseHandle(apt->thread);
HeapFree(GetProcessHeap(), 0, apt);
}
@ -414,7 +420,7 @@ static LRESULT CALLBACK apartment_wndproc(HWND hWnd, UINT msg, WPARAM wParam, LP
switch (msg)
{
case DM_EXECUTERPC:
return RPC_ExecuteCall((RPCOLEMESSAGE *)wParam, (IRpcStubBuffer *)lParam);
return RPC_ExecuteCall((struct dispatch_params *)lParam);
default:
return DefWindowProcW(hWnd, msg, wParam, lParam);
}
@ -2532,3 +2538,90 @@ HRESULT WINAPI CoCopyProxy(IUnknown *pProxy, IUnknown **ppCopy)
if (FAILED(hr)) ERR("-- failed with 0x%08lx\n", hr);
return hr;
}
/***********************************************************************
* CoWaitForMultipleHandles [OLE32.@]
*
* Waits for one or more handles to become signaled.
*
* PARAMS
* dwFlags [I] Flags. See notes.
* dwTimeout [I] Timeout in milliseconds.
* cHandles [I] Number of handles pointed to by pHandles.
* pHandles [I] Handles to wait for.
* lpdwindex [O] Index of handle that was signaled.
*
* RETURNS
* Success: S_OK.
* Failure: RPC_S_CALLPENDING on timeout.
*
* NOTES
*
* The dwFlags parameter can be zero or more of the following:
*| COWAIT_WAITALL - Wait for all of the handles to become signaled.
*| COWAIT_ALERTABLE - Allows a queued APC to run during the wait.
*
* SEE ALSO
* MsgWaitForMultipleObjects, WaitForMultipleObjects.
*/
HRESULT WINAPI CoWaitForMultipleHandles(DWORD dwFlags, DWORD dwTimeout,
ULONG cHandles, const HANDLE* pHandles, LPDWORD lpdwindex)
{
HRESULT hr = S_OK;
DWORD wait_flags = (dwFlags & COWAIT_WAITALL) ? MWMO_WAITALL : 0 |
(dwFlags & COWAIT_ALERTABLE ) ? MWMO_ALERTABLE : 0;
DWORD start_time = GetTickCount();
TRACE("(0x%08lx, 0x%08lx, %ld, %p, %p)\n", dwFlags, dwTimeout, cHandles,
pHandles, lpdwindex);
while (TRUE)
{
DWORD now = GetTickCount();
DWORD res;
if ((dwTimeout != INFINITE) && (start_time + dwTimeout >= now))
{
hr = RPC_S_CALLPENDING;
break;
}
TRACE("waiting for rpc completion or window message\n");
res = MsgWaitForMultipleObjectsEx(cHandles, pHandles,
(dwTimeout == INFINITE) ? INFINITE : start_time + dwTimeout - now,
QS_ALLINPUT, wait_flags);
if (res == WAIT_OBJECT_0 + cHandles) /* messages available */
{
MSG msg;
while (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE))
{
/* FIXME: filter the messages here */
TRACE("received message whilst waiting for RPC: 0x%04x\n", msg.message);
TranslateMessage(&msg);
DispatchMessageW(&msg);
}
}
else if ((res >= WAIT_OBJECT_0) && (res < WAIT_OBJECT_0 + cHandles))
{
/* handle signaled, store index */
*lpdwindex = (res - WAIT_OBJECT_0);
break;
}
else if (res == WAIT_TIMEOUT)
{
hr = RPC_S_CALLPENDING;
break;
}
else
{
ERR("Unexpected wait termination: %ld, %ld\n", res, GetLastError());
hr = E_UNEXPECTED;
break;
}
}
TRACE("-- 0x%08lx\n", hr);
return hr;
}

View File

@ -43,12 +43,15 @@ typedef struct apartment APARTMENT;
/* Thread-safety Annotation Legend:
*
* RO - The value is read only. It never changes after creation, so no
* locking is required.
* LOCK - The value is written to only using Interlocked* functions.
* CS - The value is read or written to with a critical section held.
* The identifier following "CS" is the specific critical section that
* must be used.
* RO - The value is read only. It never changes after creation, so no
* locking is required.
* LOCK - The value is written to only using Interlocked* functions.
* CS - The value is read or written to inside a critical section.
* The identifier following "CS" is the specific critical setion that
* must be used.
* MUTEX - The value is read or written to with a mutex held.
* The identifier following "MUTEX" is the specific mutex that
* must be used.
*/
typedef enum ifstub_state
@ -96,7 +99,7 @@ struct ifproxy
IID iid; /* interface ID (RO) */
IPID ipid; /* imported interface ID (RO) */
LPRPCPROXYBUFFER proxy; /* interface proxy (RO) */
DWORD refs; /* imported (public) references (CS parent->cs) */
DWORD refs; /* imported (public) references (MUTEX parent->remoting_mutex) */
IRpcChannelBuffer *chan; /* channel to object (CS parent->cs) */
};
@ -113,6 +116,7 @@ struct proxy_manager
CRITICAL_SECTION cs; /* thread safety for this object and children */
ULONG sorflags; /* STDOBJREF flags (RO) */
IRemUnknown *remunk; /* proxy to IRemUnknown used for lifecycle management (CS cs) */
HANDLE remoting_mutex; /* mutex used for synchronizing access to IRemUnknown */
};
/* this needs to become a COM object that implements IRemUnknown */
@ -182,9 +186,11 @@ HRESULT marshal_object(APARTMENT *apt, STDOBJREF *stdobjref, REFIID riid, IUnkno
/* RPC Backend */
struct dispatch_params;
void RPC_StartRemoting(struct apartment *apt);
HRESULT RPC_CreateClientChannel(const OXID *oxid, const IPID *ipid, IRpcChannelBuffer **pipebuf);
HRESULT RPC_ExecuteCall(RPCOLEMESSAGE *msg, IRpcStubBuffer *stub);
HRESULT RPC_ExecuteCall(struct dispatch_params *params);
HRESULT RPC_RegisterInterface(REFIID riid);
void RPC_UnregisterInterface(REFIID riid);
void RPC_StartLocalServer(REFCLSID clsid, IStream *stream);
@ -214,8 +220,9 @@ static inline HRESULT apartment_getoxid(struct apartment *apt, OXID *oxid)
return S_OK;
}
/* messages used by the apartment window (not compatible with native) */
#define DM_EXECUTERPC (WM_USER + 0) /* WPARAM = (RPCOLEMESSAGE *), LPARAM = (IRpcStubBuffer *) */
/* DCOM messages used by the apartment window (not compatible with native) */
#define DM_EXECUTERPC (WM_USER + 0) /* WPARAM = 0, LPARAM = (struct dispatch_params *) */
/*
* Per-thread values are stored in the TEB on offset 0xF80,
@ -238,4 +245,13 @@ static inline APARTMENT* COM_CurrentApt(void)
#define ICOM_THIS_MULTI(impl,field,iface) impl* const This=(impl*)((char*)(iface) - offsetof(impl,field))
/* helpers for debugging */
#ifdef __i386__
# define DEBUG_SET_CRITSEC_NAME(cs, name) (cs)->DebugInfo->Spare[1] = (DWORD)(__FILE__ ": " name)
# define DEBUG_CLEAR_CRITSEC_NAME(cs) (cs)->DebugInfo->Spare[1] = 0
#else
# define DEBUG_SET_CRITSEC_NAME(cs, name)
# define DEBUG_CLEAR_CRITSEC_NAME(cs)
#endif
#endif /* __WINE_OLE_COMPOBJ_H */

View File

@ -314,10 +314,13 @@ static const IMultiQIVtbl ClientIdentity_Vtbl =
static HRESULT ifproxy_get_public_ref(struct ifproxy * This)
{
HRESULT hr = S_OK;
/* FIXME: as this call could possibly be going over the network, we
* are going to spend a long time in this CS. We might want to replace
* this with a mutex */
EnterCriticalSection(&This->parent->cs);
if (WAIT_OBJECT_0 != WaitForSingleObject(This->parent->remoting_mutex, INFINITE))
{
ERR("Wait failed for ifproxy %p\n", This);
return E_UNEXPECTED;
}
if (This->refs == 0)
{
IRemUnknown *remunk = NULL;
@ -339,7 +342,7 @@ static HRESULT ifproxy_get_public_ref(struct ifproxy * This)
ERR("IRemUnknown_RemAddRef returned with 0x%08lx, hrref = 0x%08lx\n", hr, hrref);
}
}
LeaveCriticalSection(&This->parent->cs);
ReleaseMutex(This->parent->remoting_mutex);
return hr;
}
@ -348,10 +351,12 @@ static HRESULT ifproxy_release_public_refs(struct ifproxy * This)
{
HRESULT hr = S_OK;
/* FIXME: as this call could possibly be going over the network, we
* are going to spend a long time in this CS. We might want to replace
* this with a mutex */
EnterCriticalSection(&This->parent->cs);
if (WAIT_OBJECT_0 != WaitForSingleObject(This->parent->remoting_mutex, INFINITE))
{
ERR("Wait failed for ifproxy %p\n", This);
return E_UNEXPECTED;
}
if (This->refs > 0)
{
IRemUnknown *remunk = NULL;
@ -377,7 +382,7 @@ static HRESULT ifproxy_release_public_refs(struct ifproxy * This)
ERR("IRemUnknown_RemRelease failed with error 0x%08lx\n", hr);
}
}
LeaveCriticalSection(&This->parent->cs);
ReleaseMutex(This->parent->remoting_mutex);
return hr;
}
@ -423,12 +428,20 @@ static HRESULT proxy_manager_construct(
struct proxy_manager * This = HeapAlloc(GetProcessHeap(), 0, sizeof(*This));
if (!This) return E_OUTOFMEMORY;
This->remoting_mutex = CreateMutexW(NULL, FALSE, NULL);
if (!This->remoting_mutex)
{
HeapFree(GetProcessHeap(), 0, This);
return HRESULT_FROM_WIN32(GetLastError());
}
This->lpVtbl = &ClientIdentity_Vtbl;
list_init(&This->entry);
list_init(&This->interfaces);
InitializeCriticalSection(&This->cs);
DEBUG_SET_CRITSEC_NAME(&This->cs, "proxy_manager");
/* the apartment the object was unmarshaled into */
This->parent = apt;
@ -441,7 +454,7 @@ static HRESULT proxy_manager_construct(
/* the DCOM draft specification states that the SORF_NOPING flag is
* proxy manager specific, not ifproxy specific, so this implies that we
* should store the STDOBJREF flags in the proxy manager. */
* should store the STDOBJREF flags here in the proxy manager. */
This->sorflags = sorflags;
/* we create the IRemUnknown proxy on demand */
@ -688,8 +701,11 @@ static void proxy_manager_destroy(struct proxy_manager * This)
if (This->remunk) IRemUnknown_Release(This->remunk);
DEBUG_CLEAR_CRITSEC_NAME(&This->cs);
DeleteCriticalSection(&This->cs);
CloseHandle(This->remoting_mutex);
HeapFree(GetProcessHeap(), 0, This);
}
@ -721,13 +737,11 @@ HRESULT apartment_disconnectproxies(struct apartment *apt)
{
struct list * cursor;
EnterCriticalSection(&apt->cs);
LIST_FOR_EACH(cursor, &apt->proxies)
{
struct proxy_manager * proxy = LIST_ENTRY(cursor, struct proxy_manager, entry);
proxy_manager_disconnect(proxy);
}
LeaveCriticalSection(&apt->cs);
return S_OK;
}

View File

@ -1,5 +1,5 @@
/*
* (Local) RPC Stuff
* RPC Manager
*
* Copyright 2001 Ove Kåven, TransGaming Technologies
* Copyright 2002 Marcus Meissner
@ -98,6 +98,15 @@ typedef struct
RPC_BINDING_HANDLE bind; /* handle to the remote server */
} ClientRpcChannelBuffer;
struct dispatch_params
{
RPCOLEMESSAGE *msg; /* message */
IRpcStubBuffer *stub; /* stub buffer, if applicable */
IRpcChannelBuffer *chan; /* server channel buffer, if applicable */
HANDLE handle; /* handle that will become signaled when call finishes */
RPC_STATUS status; /* status (out) */
};
static HRESULT WINAPI RpcChannelBuffer_QueryInterface(LPRPCCHANNELBUFFER iface, REFIID riid, LPVOID *ppv)
{
*ppv = NULL;
@ -149,7 +158,7 @@ static HRESULT WINAPI ServerRpcChannelBuffer_GetBuffer(LPRPCCHANNELBUFFER iface,
RPC_MESSAGE *msg = (RPC_MESSAGE *)olemsg;
RPC_STATUS status;
TRACE("(%p)->(%p,%p)\n", This, olemsg, riid);
TRACE("(%p)->(%p,%s)\n", This, olemsg, debugstr_guid(riid));
status = I_RpcGetBuffer(msg);
@ -165,7 +174,7 @@ static HRESULT WINAPI ClientRpcChannelBuffer_GetBuffer(LPRPCCHANNELBUFFER iface,
RPC_CLIENT_INTERFACE *cif;
RPC_STATUS status;
TRACE("(%p)->(%p,%p)\n", This, olemsg, riid);
TRACE("(%p)->(%p,%s)\n", This, olemsg, debugstr_guid(riid));
cif = HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(RPC_CLIENT_INTERFACE));
if (!cif)
@ -187,21 +196,13 @@ static HRESULT WINAPI ClientRpcChannelBuffer_GetBuffer(LPRPCCHANNELBUFFER iface,
return HRESULT_FROM_WIN32(status);
}
struct rpc_sendreceive_params
{
RPC_MESSAGE *msg;
RPC_STATUS status;
};
/* this thread runs an outgoing RPC */
static DWORD WINAPI rpc_sendreceive_thread(LPVOID param)
{
struct rpc_sendreceive_params *data = (struct rpc_sendreceive_params *) param;
struct dispatch_params *data = (struct dispatch_params *) param;
TRACE("starting up\n");
/* FIXME: trap and rethrow RPC exceptions in app thread */
data->status = I_RpcSendReceive(data->msg);
data->status = I_RpcSendReceive((RPC_MESSAGE *)data->msg);
TRACE("completed with status 0x%lx\n", data->status);
@ -210,19 +211,19 @@ static DWORD WINAPI rpc_sendreceive_thread(LPVOID param)
static HRESULT WINAPI RpcChannelBuffer_SendReceive(LPRPCCHANNELBUFFER iface, RPCOLEMESSAGE *olemsg, ULONG *pstatus)
{
RPC_MESSAGE *msg = (RPC_MESSAGE *)olemsg;
HRESULT hr = S_OK;
HANDLE thread;
struct rpc_sendreceive_params *params;
DWORD tid, res;
RPC_STATUS status;
DWORD index;
struct dispatch_params *params;
DWORD tid;
TRACE("(%p)\n", msg);
TRACE("(%p) iMethod=%ld\n", olemsg, olemsg->iMethod);
params = HeapAlloc(GetProcessHeap(), 0, sizeof(*params));
params = HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*params));
if (!params) return E_OUTOFMEMORY;
params->msg = msg;
params->msg = olemsg;
params->status = RPC_S_OK;
/* we use a separate thread here because we need to be able to
* pump the message loop in the application thread: if we do not,
@ -230,50 +231,21 @@ static HRESULT WINAPI RpcChannelBuffer_SendReceive(LPRPCCHANNELBUFFER iface, RPC
* and re-enter this STA from an incoming server thread will
* deadlock. InstallShield is an example of that.
*/
thread = CreateThread(NULL, 0, rpc_sendreceive_thread, params, 0, &tid);
if (!thread)
params->handle = CreateThread(NULL, 0, rpc_sendreceive_thread, params, 0, &tid);
if (!params->handle)
{
ERR("Could not create RpcSendReceive thread, error %lx\n", GetLastError());
return E_UNEXPECTED;
hr = E_UNEXPECTED;
}
while (TRUE)
{
TRACE("waiting for rpc completion or window message\n");
res = MsgWaitForMultipleObjectsEx(1, &thread, INFINITE, QS_ALLINPUT, 0);
if (res == WAIT_OBJECT_0 + 1) /* messages available */
{
MSG message;
while (PeekMessageW(&message, NULL, 0, 0, PM_REMOVE))
{
/* FIXME: filter the messages here */
if (message.message == DM_EXECUTERPC)
TRACE("received DM_EXECUTRPC dispatch request, re-entering ...\n");
else
TRACE("received message whilst waiting for RPC: 0x%x\n", message.message);
TranslateMessage(&message);
DispatchMessageW(&message);
}
}
else if (res == WAIT_OBJECT_0)
{
break; /* RPC is completed */
}
else
{
ERR("Unexpected wait termination: %ld, %ld\n", res, GetLastError());
hr = E_UNEXPECTED;
break;
}
}
CloseHandle(thread);
if (hr == S_OK)
hr = CoWaitForMultipleHandles(0, INFINITE, 1, &params->handle, &index);
CloseHandle(params->handle);
status = params->status;
HeapFree(GetProcessHeap(), 0, params);
params = NULL;
if (hr) return hr;
if (pstatus) *pstatus = status;
@ -282,7 +254,7 @@ static HRESULT WINAPI RpcChannelBuffer_SendReceive(LPRPCCHANNELBUFFER iface, RPC
if (status == RPC_S_OK)
hr = S_OK;
else if (status == RPC_S_CALL_FAILED)
hr = *(HRESULT *)msg->Buffer;
hr = *(HRESULT *)olemsg->Buffer;
else
hr = HRESULT_FROM_WIN32(status);
@ -433,20 +405,28 @@ HRESULT RPC_CreateServerChannel(IRpcChannelBuffer **chan)
}
HRESULT RPC_ExecuteCall(RPCOLEMESSAGE *msg, IRpcStubBuffer *stub)
HRESULT RPC_ExecuteCall(struct dispatch_params *params)
{
/* FIXME: pass server channel buffer, but don't create it every time */
return IRpcStubBuffer_Invoke(stub, msg, NULL);
HRESULT hr = IRpcStubBuffer_Invoke(params->stub, params->msg, params->chan);
IRpcStubBuffer_Release(params->stub);
if (params->handle) SetEvent(params->handle);
return hr;
}
static void __RPC_STUB dispatch_rpc(RPC_MESSAGE *msg)
{
struct dispatch_params *params;
IRpcStubBuffer *stub;
APARTMENT *apt;
IPID ipid;
RpcBindingInqObject(msg->Handle, &ipid);
TRACE("ipid = %s, iMethod = %d\n", debugstr_guid(&ipid), msg->ProcNum);
params = HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*params));
if (!params) return RpcRaiseException(E_OUTOFMEMORY);
stub = ipid_to_apt_and_stubbuffer(&ipid, &apt);
if (!apt || !stub)
{
@ -455,16 +435,30 @@ static void __RPC_STUB dispatch_rpc(RPC_MESSAGE *msg)
return RpcRaiseException(RPC_E_DISCONNECTED);
}
params->msg = (RPCOLEMESSAGE *)msg;
params->stub = stub;
params->chan = NULL; /* FIXME: pass server channel */
params->status = RPC_S_OK;
/* Note: this is the important difference between STAs and MTAs - we
* always execute RPCs to STAs in the thread that originally created the
* apartment (i.e. the one that pumps messages to the window) */
if (apt->model & COINIT_APARTMENTTHREADED)
SendMessageW(apt->win, DM_EXECUTERPC, (WPARAM)msg, (LPARAM)stub);
{
params->handle = CreateEventW(NULL, FALSE, FALSE, NULL);
TRACE("Calling apartment thread 0x%08lx...\n", apt->tid);
PostMessageW(apt->win, DM_EXECUTERPC, 0, (LPARAM)params);
WaitForSingleObject(params->handle, INFINITE);
CloseHandle(params->handle);
}
else
RPC_ExecuteCall((RPCOLEMESSAGE *)msg, stub);
RPC_ExecuteCall(params);
HeapFree(GetProcessHeap(), 0, params);
apartment_release(apt);
IRpcStubBuffer_Release(stub);
}
/* stub registration */

View File

@ -59,7 +59,10 @@ struct stub_manager *new_stub_manager(APARTMENT *apt, IUnknown *object, MSHLFLAG
if (!sm) return NULL;
list_init(&sm->ifstubs);
InitializeCriticalSection(&sm->lock);
DEBUG_SET_CRITSEC_NAME(&sm->lock, "stub_manager");
IUnknown_AddRef(object);
sm->object = object;
sm->apt = apt;
@ -111,6 +114,7 @@ static void stub_manager_delete(struct stub_manager *m)
IUnknown_Release(m->object);
DEBUG_CLEAR_CRITSEC_NAME(&m->lock);
DeleteCriticalSection(&m->lock);
HeapFree(GetProcessHeap(), 0, m);

View File

@ -392,6 +392,14 @@ BOOL WINAPI CoFileTimeToDosDateTime(FILETIME* lpFileTime, WORD* lpDosDate, WORD*
HRESULT WINAPI CoFileTimeNow(FILETIME* lpFileTime);
HRESULT WINAPI CoRegisterMessageFilter(LPMESSAGEFILTER lpMessageFilter,LPMESSAGEFILTER *lplpMessageFilter);
typedef enum tagCOWAIT_FLAGS
{
COWAIT_WAITALL = 0x00000001,
COWAIT_ALERTABLE = 0x00000002
} COWAIT_FLAGS;
HRESULT WINAPI CoWaitForMultipleHandles(DWORD dwFlags,DWORD dwTimeout,ULONG cHandles,const HANDLE* pHandles,LPDWORD lpdwindex);
/*****************************************************************************
* GUID API
*/