StdScheduler: Split into StdSchedulerWin32/StdSchedulerPoll/StdSchedulerMac

stable-5.4
Martin Plicht 2014-01-12 21:05:35 +01:00
parent 57463b61c4
commit d504626a40
12 changed files with 771 additions and 459 deletions

View File

@ -977,6 +977,9 @@ add_executable(c4group
add_executable(netpuncher EXCLUDE_FROM_ALL
src/platform/StdScheduler.cpp
src/platform/StdSchedulerWin32.cpp
src/platform/StdSchedulerPoll.cpp
src/platform/StdSchedulerMac.mm
src/netpuncher/main.cpp
)
@ -1006,6 +1009,9 @@ src/platform/StdFile.h
src/platform/StdRegistry.cpp
src/platform/StdRegistry.h
src/platform/StdScheduler.cpp
src/platform/StdSchedulerWin32.cpp
src/platform/StdSchedulerPoll.cpp
src/platform/StdSchedulerMac.mm
src/platform/StdScheduler.h
src/platform/C4TimeMilliseconds.cpp
src/platform/C4TimeMilliseconds.h

View File

@ -927,6 +927,8 @@ C4NetIOTCP::Peer *C4NetIOTCP::Accept(SOCKET nsock, const addr_t &ConnectAddr) //
// clear add-lock
PeerListAddLock.Clear();
Changed();
// ask callback if connection should be permitted
if (pCB && !pCB->OnConn(addr, caddr, NULL, this))
@ -989,6 +991,7 @@ bool C4NetIOTCP::Listen(uint16_t inListenPort)
// ok
iListenPort = inListenPort;
Changed();
return true;
}
@ -1063,6 +1066,7 @@ void C4NetIOTCP::AddConnectWait(SOCKET sock, const addr_t &addr) // (mt-safe)
// unblock, so new FD can be realized
UnBlock();
#endif
Changed();
}
C4NetIOTCP::ConnectWait *C4NetIOTCP::GetConnectWait(const addr_t &addr) // (mt-safe)
@ -3067,6 +3071,7 @@ void C4NetIOUDP::AddPeer(Peer *pPeer)
// add
pPeer->Next = pPeerList;
pPeerList = pPeer;
Changed();
}
void C4NetIOUDP::OnShareFree(CStdCSecEx *pCSec)

View File

@ -114,6 +114,7 @@ public:
virtual bool CloseBroadcast() = 0;
virtual bool Execute(int iTimeout = -1, pollfd * = 0) = 0; // (for StdSchedulerProc)
virtual bool IsNotify() { return true; }
// * multithreading safe
virtual bool Connect(const addr_t &addr) = 0; // async!

View File

@ -97,7 +97,7 @@ public:
// Gamma
virtual bool ApplyGammaRamp(struct _GAMMARAMP &ramp, bool fForce);
virtual bool SaveDefaultGammaRamp(struct _GAMMARAMP &ramp);
bool ScheduleProcs(int iTimeout = -1);
virtual bool ScheduleProcs(int iTimeout = -1);
bool FlushMessages();
C4Window * pWindow;
bool fQuitMsgReceived; // if true, a quit message has been received and the application should terminate

View File

@ -125,14 +125,14 @@
{
NSLog(@"Game running, only simulating Esc key");
[self simulateKeyPressed:K_ESCAPE];
return;
}
else
{
if (Application.isEditor)
Console.FileClose();
Application.fQuitMsgReceived = true;
return;
Application.ScheduleProcs();
Application.Quit();
}
}

View File

@ -76,7 +76,7 @@
{
NSString* pathExtension = [[filename pathExtension] lowercaseString];
NSArray* clonkFileNameExtensions = [NSArray arrayWithObjects:@"ocd", @"ocs", @"ocf", @"ocg", nil];
NSArray* clonkFileNameExtensions = @[@"ocd", @"ocs", @"ocf", @"ocg"];
if ([clonkFileNameExtensions containsObject:pathExtension])
{
// later decide whether to install or run
@ -150,19 +150,20 @@
- (void) delayedRun:(id)sender
{
running = YES;
while (!Application.fQuitMsgReceived)
Application.ScheduleProcs();
[NSApp replyToApplicationShouldTerminate:YES];
running = NO;
[self quitAndMaybeRestart];
[NSApp terminate:self];
//while (!Application.fQuitMsgReceived)
// Application.ScheduleProcs();
//[NSApp replyToApplicationShouldTerminate:YES];
//running = NO;
//[self quitAndMaybeRestart];
//[NSApp terminate:self];
}
#endif
- (NSApplicationTerminateReply)applicationShouldTerminate:(NSApplication*)application
{
[self suggestQuitting:self];
return running ? NSTerminateCancel : NSTerminateNow;
if (!Application.fQuitMsgReceived)
[self suggestQuitting:self];
return NSTerminateNow;
}
- (void)terminate:(NSApplication*)sender

View File

@ -94,7 +94,7 @@ void C4AbstractApp::Clear() {}
void C4AbstractApp::Quit()
{
fQuitMsgReceived = true;
[NSApp terminate:[NSApp delegate]];
}
bool C4AbstractApp::FlushMessages()

View File

@ -30,23 +30,16 @@
#ifdef HAVE_SHARE_H
#include <share.h>
#endif
#ifdef _WIN32
#include <process.h>
#include <mmsystem.h>
static int pipe(int *phandles)
{
// This doesn't work with select(), rendering the non-event-solution
// unusable for Win32. Oh well, it isn't desirable performance-wise, anyway.
return _pipe(phandles, 10, O_BINARY);
}
#endif
#ifdef HAVE_UNISTD_H
// For pipe()
#include <unistd.h>
#endif
#ifdef _WIN32
#include <process.h>
#endif
// *** StdSchedulerProc
// Keep calling Execute until timeout has elapsed
@ -74,32 +67,10 @@ bool StdSchedulerProc::ExecuteUntil(int iTimeout)
return true;
}
// Is this process currently signaled?
bool StdSchedulerProc::IsSignaled()
{
#ifdef STDSCHEDULER_USE_EVENTS
return GetEvent() && WaitForSingleObject(GetEvent(), 0) == WAIT_OBJECT_0;
#else
// Initialize file descriptor sets
std::vector<struct pollfd> fds;
// Get file descriptors
GetFDs(fds);
// Test
return poll(&fds[0], fds.size(), 0) > 0;
#endif
}
// *** StdScheduler
StdScheduler::StdScheduler()
: ppProcs(NULL), iProcCnt(0), iProcCapacity(0)
StdScheduler::StdScheduler() : isInManualLoop(false)
{
#ifdef STDSCHEDULER_USE_EVENTS
pEventHandles = NULL;
ppEventProcs = NULL;
#endif
Add(&Unblocker);
}
@ -108,208 +79,79 @@ StdScheduler::~StdScheduler()
Clear();
}
int StdScheduler::getProc(StdSchedulerProc *pProc)
{
for (int i = 0; i < iProcCnt; i++)
if (ppProcs[i] == pProc)
return i;
return -1;
}
void StdScheduler::Clear()
{
delete[] ppProcs; ppProcs = NULL;
#ifdef STDSCHEDULER_USE_EVENTS
delete[] pEventHandles; pEventHandles = NULL;
delete[] ppEventProcs; ppEventProcs = NULL;
#endif
iProcCnt = iProcCapacity = 0;
while (procs.size() > 0)
Remove(procs[procs.size()-1]);
}
void StdScheduler::Set(StdSchedulerProc **ppnProcs, int inProcCnt)
{
// Remove previous data
Clear();
// Set size
Enlarge(inProcCnt - iProcCapacity);
// Copy new
iProcCnt = inProcCnt;
for (int i = 0; i < iProcCnt; i++)
ppProcs[i] = ppnProcs[i];
for (int i = 0; i < inProcCnt; i++)
Add(ppnProcs[i]);
}
void StdScheduler::Add(StdSchedulerProc *pProc)
{
// Alrady in list?
if (hasProc(pProc)) return;
// Enlarge
if (iProcCnt >= iProcCapacity) Enlarge(10);
// Already added to some scheduler
if (pProc->scheduler)
return;
// Add
ppProcs[iProcCnt] = pProc;
iProcCnt++;
procs.push_back(pProc);
pProc->scheduler = this;
Added(pProc);
}
void StdScheduler::Remove(StdSchedulerProc *pProc)
{
// Search
int iPos = getProc(pProc);
// Not found?
if (iPos < 0 || iPos >= iProcCnt) return;
// Remove
for (int i = iPos + 1; i < iProcCnt; i++)
ppProcs[i-1] = ppProcs[i];
iProcCnt--;
// :o ?
if (pProc->scheduler != this)
return;
Removing(pProc);
pProc->scheduler = NULL;
auto pos = std::find(procs.begin(), procs.end(), pProc);
if (pos != procs.end())
procs.erase(pos);
}
void StdSchedulerProc::Changed()
{
auto s = scheduler;
if (s)
s->Changed(this);
}
C4TimeMilliseconds StdSchedulerProc::GetNextTick(C4TimeMilliseconds tNow)
{
return C4TimeMilliseconds::PositiveInfinity;
}
bool StdScheduler::ScheduleProcs(int iTimeout)
{
isInManualLoop = true;
// Needs at least one process to work properly
if (!iProcCnt) return false;
if (!procs.size()) return false;
// Get timeout
int i;
C4TimeMilliseconds tProcTick;
C4TimeMilliseconds tNow = C4TimeMilliseconds::Now();
for (i = 0; i < iProcCnt; i++)
for (auto i = 0; i < procs.size(); i++)
{
tProcTick = ppProcs[i]->GetNextTick(tNow);
auto proc = procs[i];
tProcTick = proc->GetNextTick(tNow);
if (iTimeout == -1 || tNow + iTimeout > tProcTick)
{
iTimeout = Max<uint32_t>(tProcTick - tNow, 0);
iTimeout = Max<decltype(iTimeout)>(tProcTick - tNow, 0);
}
}
#ifdef STDSCHEDULER_USE_EVENTS
// Collect event handles
int iEventCnt = 0; HANDLE hEvent;
StdSchedulerProc *pMessageProc = NULL;
for (i = 0; i < iProcCnt; i++)
if ( (hEvent = ppProcs[i]->GetEvent()) )
{
if (hEvent == STDSCHEDULER_EVENT_MESSAGE)
pMessageProc = ppProcs[i];
else
{
pEventHandles[iEventCnt] = hEvent;
ppEventProcs[iEventCnt] = ppProcs[i];
iEventCnt++;
}
}
// Wait for something to happen
DWORD ret; DWORD dwMsec = iTimeout < 0 ? INFINITE : iTimeout;
if (pMessageProc)
ret = MsgWaitForMultipleObjects(iEventCnt, pEventHandles, false, dwMsec, QS_ALLINPUT);
else
ret = WaitForMultipleObjects(iEventCnt, pEventHandles, false, dwMsec);
bool fSuccess = true;
// Event?
if (ret != WAIT_TIMEOUT)
{
// Which event?
int iEventNr = ret - WAIT_OBJECT_0;
// Execute the signaled process
StdSchedulerProc *pProc = iEventNr < iEventCnt ? ppEventProcs[iEventNr] : pMessageProc;
if (!pProc->Execute(0))
{
OnError(pProc);
fSuccess = false;
}
}
// Execute all processes with timeout
tNow = C4TimeMilliseconds::Now();
for (i = 0; i < iProcCnt; i++)
{
tProcTick = ppProcs[i]->GetNextTick(tNow);
if (tProcTick <= tNow)
if (!ppProcs[i]->Execute(0))
{
OnError(ppProcs[i]);
fSuccess = false;
}
}
#else
// Initialize file descriptor sets
std::vector<struct pollfd> fds;
std::map<StdSchedulerProc *, std::pair<unsigned int, unsigned int> > fds_for_proc;
// Collect file descriptors
for (i = 0; i < iProcCnt; i++)
{
unsigned int os = fds.size();
ppProcs[i]->GetFDs(fds);
if (os != fds.size())
fds_for_proc[ppProcs[i]] = std::pair<unsigned int, unsigned int>(os, fds.size());
}
// Wait for something to happen
int cnt = poll(&fds[0], fds.size(), iTimeout);
bool fSuccess = true;
if (cnt >= 0)
{
bool any_executed = false;
tNow = C4TimeMilliseconds::Now();
// Which process?
for (i = 0; i < iProcCnt; i++)
{
tProcTick = ppProcs[i]->GetNextTick(tNow);
if (tProcTick <= tNow)
{
struct pollfd * pfd = 0;
if (fds_for_proc.find(ppProcs[i]) != fds_for_proc.end())
pfd = &fds[fds_for_proc[ppProcs[i]].first];
if (!ppProcs[i]->Execute(0, pfd))
{
OnError(ppProcs[i]);
fSuccess = false;
}
any_executed = true;
continue;
}
// no fds?
if (fds_for_proc.find(ppProcs[i]) == fds_for_proc.end())
continue;
// Check intersection
unsigned int begin = fds_for_proc[ppProcs[i]].first;
unsigned int end = fds_for_proc[ppProcs[i]].second;
for (unsigned int j = begin; j < end; ++j)
{
if (fds[j].events & fds[j].revents)
{
if (any_executed && ppProcs[i]->IsLowPriority())
break;
if (!ppProcs[i]->Execute(0, &fds[begin]))
{
OnError(ppProcs[i]);
fSuccess = false;
}
any_executed = true;
// the list of procs might have been changed, but procs must be in both ppProcs and
// fds_for_proc to be executed, which prevents execution of any proc not polled this round
// or deleted. Some procs might be skipped or executed twice, but that should be save.
break;
}
}
}
}
else if (cnt < 0)
{
printf("StdScheduler::Execute: poll failed: %s\n",strerror(errno));
}
#endif
return fSuccess;
bool res = DoScheduleProcs(iTimeout);
isInManualLoop = false;
return res;
}
void StdScheduler::UnBlock()
@ -317,23 +159,6 @@ void StdScheduler::UnBlock()
Unblocker.Notify();
}
void StdScheduler::Enlarge(int iBy)
{
iProcCapacity += iBy;
// Realloc
StdSchedulerProc **ppnProcs = new StdSchedulerProc *[iProcCapacity];
// Set data
for (int i = 0; i < iProcCnt; i++)
ppnProcs[i] = ppProcs[i];
delete[] ppProcs;
ppProcs = ppnProcs;
#ifdef STDSCHEDULER_USE_EVENTS
// Allocate dummy arrays (one handle neede for unlocker!)
delete[] pEventHandles; pEventHandles = new HANDLE[iProcCapacity + 1];
delete[] ppEventProcs; ppEventProcs = new StdSchedulerProc *[iProcCapacity];
#endif
}
// *** StdSchedulerThread
StdSchedulerThread::StdSchedulerThread()
@ -446,7 +271,7 @@ unsigned int StdSchedulerThread::ThreadFunc()
{
// Keep calling Execute until someone gets fed up and calls StopThread()
while (fRunThreadRun)
ScheduleProcs();
ScheduleProcs(1000);
return(0);
}
@ -491,7 +316,7 @@ void StdThread::Stop()
// Wait for thread to terminate itself
HANDLE hThread = reinterpret_cast<HANDLE>(iThread);
if (WaitForSingleObject(hThread, 10000) == WAIT_TIMEOUT)
// ... or kill him in case he refuses to do so
// ... or kill it in case it refuses to do so
TerminateThread(hThread, -1);
#elif defined(HAVE_PTHREAD)
// wait for thread to terminate itself
@ -503,20 +328,6 @@ void StdThread::Stop()
return;
}
#ifdef HAVE_WINTHREAD
void __cdecl StdThread::_ThreadFunc(void *pPar)
{
StdThread *pThread = reinterpret_cast<StdThread *>(pPar);
_endthreadex(pThread->ThreadFunc());
}
#elif defined(HAVE_PTHREAD)
void *StdThread::_ThreadFunc(void *pPar)
{
StdThread *pThread = reinterpret_cast<StdThread *>(pPar);
return reinterpret_cast<void *>(pThread->ThreadFunc());
}
#endif
unsigned int StdThread::ThreadFunc()
{
// Keep calling Execute until someone gets fed up and calls Stop()
@ -529,199 +340,3 @@ bool StdThread::IsStopSignaled()
{
return fStopSignaled;
}
namespace
{
void Fail(const char* msg)
{
Log(msg);
}
}
#ifdef STDSCHEDULER_USE_EVENTS
CStdNotifyProc::CStdNotifyProc() : Event(true) {}
void CStdNotifyProc::Notify() { Event.Set(); }
bool CStdNotifyProc::CheckAndReset()
{
if (!Event.WaitFor(0)) return false;
Event.Reset();
return true;
}
#else // STDSCHEDULER_USE_EVENTS
#ifdef HAVE_SYS_EVENTFD_H
#include <sys/eventfd.h>
CStdNotifyProc::CStdNotifyProc()
{
fds[0] = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (fds[0] == -1)
Fail("eventfd failed");
}
CStdNotifyProc::~CStdNotifyProc()
{
close(fds[0]);
}
void CStdNotifyProc::Notify()
{
uint64_t n = 1;
if (write(fds[0], &n, 8) == -1)
Fail("write failed");
}
bool CStdNotifyProc::CheckAndReset()
{
uint64_t n;
return (read(fds[0], &n, 8) != -1);
}
#else
CStdNotifyProc::CStdNotifyProc()
{
if (pipe(fds) == -1)
Fail("pipe failed");
fcntl(fds[0], F_SETFL, fcntl(fds[0], F_GETFL) | O_NONBLOCK);
fcntl(fds[0], F_SETFD, FD_CLOEXEC);
fcntl(fds[1], F_SETFD, FD_CLOEXEC);
}
CStdNotifyProc::~CStdNotifyProc()
{
close(fds[0]);
close(fds[1]);
}
void CStdNotifyProc::Notify()
{
char c = 42;
if (write(fds[1], &c, 1) == -1)
Fail("write failed");
}
bool CStdNotifyProc::CheckAndReset()
{
bool r = false;
while (1)
{
char c;
if (read(fds[0], &c, 1) <= 0)
break;
else
r = true;
}
return r;
}
#endif
void CStdNotifyProc::GetFDs(std::vector<struct pollfd> & checkfds)
{
pollfd pfd = { fds[0], POLLIN, 0 };
checkfds.push_back(pfd);
}
#endif
/* CStdMultimediaTimerProc */
#ifdef STDSCHEDULER_USE_EVENTS
int CStdMultimediaTimerProc::iTimePeriod = 0;
CStdMultimediaTimerProc::CStdMultimediaTimerProc(uint32_t iDelay) :
uCriticalTimerDelay(28),
idCriticalTimer(0),
uCriticalTimerResolution(5),
Event(true)
{
if (!iTimePeriod)
{
// Get resolution caps
TIMECAPS tc;
timeGetDevCaps(&tc, sizeof(tc));
// Establish minimum resolution
uCriticalTimerResolution = BoundBy(uCriticalTimerResolution, tc.wPeriodMin, tc.wPeriodMax);
timeBeginPeriod(uCriticalTimerResolution);
}
iTimePeriod++;
SetDelay(iDelay);
}
CStdMultimediaTimerProc::~CStdMultimediaTimerProc()
{
if (idCriticalTimer)
{
timeKillEvent(idCriticalTimer);
idCriticalTimer = 0;
iTimePeriod--;
if (!iTimePeriod)
timeEndPeriod(uCriticalTimerResolution);
}
}
void CStdMultimediaTimerProc::SetDelay(uint32_t iDelay)
{
// Kill old timer (of any)
if (idCriticalTimer)
timeKillEvent(idCriticalTimer);
// Set new delay
uCriticalTimerDelay = iDelay;
// Set critical timer
idCriticalTimer=timeSetEvent(
uCriticalTimerDelay,uCriticalTimerResolution,
(LPTIMECALLBACK) Event.GetEvent(),0,TIME_PERIODIC | TIME_CALLBACK_EVENT_SET);
if(idCriticalTimer == 0)
DebugLogF("Creating Critical Timer failed: %d", GetLastError());
}
bool CStdMultimediaTimerProc::CheckAndReset()
{
if (!Check()) return false;
Event.Reset();
return true;
}
#elif defined(HAVE_SYS_TIMERFD_H)
#include <sys/timerfd.h>
#include <unistd.h>
#include <fcntl.h>
CStdMultimediaTimerProc::CStdMultimediaTimerProc(uint32_t iDelay)
{
fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (fd == -1)
Log("timerfd_create failed");
SetDelay(iDelay);
}
CStdMultimediaTimerProc::~CStdMultimediaTimerProc()
{
close(fd);
}
void CStdMultimediaTimerProc::SetDelay(uint32_t inDelay)
{
struct itimerspec nv, ov;
nv.it_interval.tv_sec = inDelay / 1000;
nv.it_interval.tv_nsec = (inDelay % 1000) * 1000000;
nv.it_value = nv.it_interval;
timerfd_settime(fd, 0, &nv, &ov);
}
void CStdMultimediaTimerProc::Set()
{
struct itimerspec nv, ov;
timerfd_gettime(fd, &nv);
nv.it_value.tv_sec = 0;
nv.it_value.tv_nsec = 1;
timerfd_settime(fd, 0, &nv, &ov);
}
bool CStdMultimediaTimerProc::CheckAndReset()
{
uint64_t n;
return read(fd, &n, 8) != -1;
}
void CStdMultimediaTimerProc::GetFDs(std::vector<struct pollfd> & checkfds)
{
pollfd pfd = { fd, POLLIN, 0 };
checkfds.push_back(pfd);
}
#endif

View File

@ -42,12 +42,21 @@ struct pollfd;
#endif // HAVE_PTHREAD
#endif // _WIN32
#include <vector>
typedef struct _GMainLoop GMainLoop;
// Abstract class for a process
class StdSchedulerProc
{
private:
class StdScheduler *scheduler;
protected:
void Changed();
public:
StdSchedulerProc(): scheduler(NULL) {}
virtual ~StdSchedulerProc() { }
// Do whatever the process wishes to do. Should not block longer than the timeout value.
@ -65,13 +74,17 @@ public:
#endif
// Call Execute() after this time has elapsed
virtual C4TimeMilliseconds GetNextTick(C4TimeMilliseconds tNow) { return C4TimeMilliseconds::PositiveInfinity; };
virtual C4TimeMilliseconds GetNextTick(C4TimeMilliseconds tNow);
// Is the process signal currently set?
bool IsSignaled();
// Is this the expensive game tick?
virtual bool IsLowPriority() { return false; }
virtual bool IsNotify() { return false; }
virtual uint32_t TimerInterval() { return 0; }
friend class StdScheduler;
};
@ -91,7 +104,7 @@ public:
{
tLastTimer = C4TimeMilliseconds::NegativeInfinity;
}
void SetDelay(uint32_t inDelay) { iDelay = inDelay; }
void SetDelay(uint32_t inDelay) { iDelay = inDelay; Changed(); }
bool CheckAndReset()
{
C4TimeMilliseconds tTime = C4TimeMilliseconds::Now();
@ -107,6 +120,7 @@ public:
{
return tLastTimer + iDelay;
}
virtual uint32_t TimerInterval() { return iDelay; }
};
// A simple alertable proc
@ -117,6 +131,7 @@ public:
void Notify();
bool CheckAndReset();
virtual bool IsNotify() { return true; }
#ifdef STDSCHEDULER_USE_EVENTS
~CStdNotifyProc() { }
@ -197,8 +212,8 @@ public:
private:
// Process list
StdSchedulerProc **ppProcs;
int iProcCnt, iProcCapacity;
std::vector<StdSchedulerProc*> procs;
bool isInManualLoop;
// Unblocker
class NoopNotifyProc : public CStdNotifyProc
@ -209,21 +224,25 @@ private:
// Dummy lists (preserved to reduce allocs)
#ifdef STDSCHEDULER_USE_EVENTS
HANDLE *pEventHandles;
StdSchedulerProc **ppEventProcs;
std::vector<HANDLE> eventHandles;
std::vector<StdSchedulerProc*> eventProcs;
#endif
public:
int getProcCnt() const { return iProcCnt-1; } // ignore internal NoopNotifyProc
int getProc(StdSchedulerProc *pProc);
bool hasProc(StdSchedulerProc *pProc) { return getProc(pProc) >= 0; }
int getProcCnt() const { return procs.size()-1; } // ignore internal NoopNotifyProc
bool hasProc(StdSchedulerProc *pProc) { return std::find(procs.begin(), procs.end(), pProc) != procs.end(); }
bool IsInManualLoop() { return isInManualLoop; }
void Clear();
void Set(StdSchedulerProc **ppProcs, int iProcCnt);
void Add(StdSchedulerProc *pProc);
void Remove(StdSchedulerProc *pProc);
void Added(StdSchedulerProc *pProc);
void Removing(StdSchedulerProc *pProc);
void Changed(StdSchedulerProc *pProc);
bool ScheduleProcs(int iTimeout = -1);
virtual bool ScheduleProcs(int iTimeout = -1);
void UnBlock();
protected:
@ -231,8 +250,7 @@ protected:
virtual void OnError(StdSchedulerProc *) { }
private:
void Enlarge(int iBy);
bool DoScheduleProcs(int iTimeout);
};
// A simple process scheduler thread
@ -245,7 +263,7 @@ public:
private:
// thread control
bool fRunThreadRun, fWait;
bool fRunThreadRun;
bool fThread;
#ifdef HAVE_WINTHREAD

View File

@ -0,0 +1,246 @@
#include <C4Include.h>
#include <StdScheduler.h>
#ifdef USE_COCOA
#import <Cocoa/Cocoa.h>
using namespace std;
@class SCHAdditions;
@interface SCHAddition : NSObject
{
@protected
__weak SCHAdditions* schedulerAdditions;
StdSchedulerProc* proc;
}
- (id) initWithProc:(StdSchedulerProc*)_proc;
- (void) registerAt:(SCHAdditions*) _additions;
- (void) unregisterFrom:(SCHAdditions*) _additions;
- (bool) shouldExecuteProc;
- (void) changed;
@end
@interface SCHNotify : SCHAddition
{
list<CFRunLoopSourceRef> socketSources;
}
- (void) registerAt:(SCHAdditions*) _additions;
- (void) unregisterFrom:(SCHAdditions*) _additions;
@end
@interface SCHTimer : SCHAddition
{
@private
NSTimer* timer;
}
- (void) registerAt:(SCHAdditions*) _additions;
- (void) unregisterFrom:(SCHAdditions*) _additions;
@end
@interface SCHAdditions : NSObject
{
NSMutableDictionary* procAdditions;
}
- (id) initWithScheduler:(StdScheduler*) scheduler;
- (SCHAddition*) additionForProc:(StdSchedulerProc*) proc;
- (SCHAddition*) assignAdditionForProc:(StdSchedulerProc*) proc;
+ (SCHAdditions*) requestAdditionForScheduler:(StdScheduler*) scheduler;
- (BOOL) removeAdditionForProc:(StdSchedulerProc*) proc;
@property(readonly) NSRunLoop* runLoop;
@property(readonly) StdScheduler* scheduler;
@end
@implementation SCHAdditions
static NSMutableDictionary* additionsDictionary;
- (id) initWithScheduler:(StdScheduler*) scheduler
{
if (self = [super init])
{
_scheduler = scheduler;
_runLoop = [NSRunLoop currentRunLoop];
procAdditions = [NSMutableDictionary new];
return self;
} else
return nil;
}
- (SCHAddition*) additionForProc:(StdSchedulerProc*) proc
{
return [procAdditions objectForKey:[NSNumber valueWithPointer:proc]];
}
- (BOOL) removeAdditionForProc:(StdSchedulerProc*) proc
{
auto key = [NSNumber valueWithPointer:proc];
SCHAddition* x = [procAdditions objectForKey:key];
if (x)
{
[x unregisterFrom:self];
[procAdditions removeObjectForKey:key];
return YES;
}
else
return NO;
}
- (SCHAddition*) assignAdditionForProc:(StdSchedulerProc*) proc
{
auto timerInterval = proc->TimerInterval();
auto addition =
timerInterval ? [[SCHTimer alloc] initWithProc:proc] :
proc->IsNotify() ? [[SCHNotify alloc] initWithProc:proc] :
nullptr;
if (addition)
{
[procAdditions setObject:addition forKey:[NSNumber valueWithPointer:proc]];
return addition;
} else
return nullptr;
}
+ (SCHAdditions*) requestAdditionForScheduler:(StdScheduler *)scheduler
{
static dispatch_once_t onceToken;
dispatch_once(&onceToken,
^{ additionsDictionary = [NSMutableDictionary new]; });
auto key = [NSNumber valueWithPointer:scheduler];
@synchronized (additionsDictionary)
{
SCHAdditions* additions = [additionsDictionary objectForKey:key];
if (!additions)
{
additions = [[SCHAdditions alloc] initWithScheduler:scheduler];
[additionsDictionary setObject:additions forKey:key];
}
return additions;
}
}
@end
@implementation SCHAddition
- (id) initWithProc:(StdSchedulerProc *) _proc
{
if (self = [super init])
{
proc = _proc;
return self;
} else
return nil;
}
- (bool) shouldExecuteProc
{
auto s = schedulerAdditions;
return s && !s.scheduler->IsInManualLoop();
}
- (void) registerAt:(SCHAdditions*) _additions
{
schedulerAdditions = _additions;
}
- (void) unregisterFrom:(SCHAdditions*) _additions
{
schedulerAdditions = nil;
}
- (void) changed
{
auto s = schedulerAdditions;
[self unregisterFrom:s];
[self registerAt:s];
}
@end
@implementation SCHTimer
- (id) initWithProc:(StdSchedulerProc *) _proc
{
if (self = [super init])
{
proc = _proc;
return self;
} else
return nil;
}
- (void) run:(id) sender
{
auto i = timer;
if (i && [self shouldExecuteProc])
proc->Execute();
}
- (void) registerAt:(SCHAdditions*) _additions
{
[super registerAt:_additions];
auto loop = _additions.runLoop;
timer = [NSTimer timerWithTimeInterval:proc->TimerInterval()/1000.0 target:self selector:@selector(run:) userInfo:nil repeats:YES];
if ([timer respondsToSelector:@selector(setTolerance:)])
[timer setTolerance:0.0];
[loop addTimer:timer forMode:NSDefaultRunLoopMode];
}
- (void) unregisterFrom:(SCHAdditions*) _additions
{
[timer invalidate];
timer = nil;
[super unregisterFrom:_additions];
}
@end
@implementation SCHNotify
void callback (CFSocketRef s, CFSocketCallBackType type, CFDataRef address, const void *data, void *info)
{
auto notify = (__bridge SCHNotify*)info;
pollfd p = {.fd=CFSocketGetNative(s)};
if ([notify shouldExecuteProc])
notify->proc->Execute(-1, &p);
}
- (void) registerAt:(SCHAdditions*) _additions
{
[super registerAt:_additions];
vector<struct pollfd> vecs;
proc->GetFDs(vecs);
CFSocketContext ctx = {};
ctx.info = (__bridge void*)self;
for (auto p : vecs)
{
auto socket = CFSocketCreateWithNative(NULL,
p.fd, kCFSocketReadCallBack,
callback, &ctx
);
auto runLoopSource = CFSocketCreateRunLoopSource(NULL, socket, 0);
CFRunLoopAddSource([_additions.runLoop getCFRunLoop], runLoopSource, kCFRunLoopDefaultMode);
socketSources.push_back(runLoopSource);
}
}
- (void) unregisterFrom:(SCHAdditions*) _additions
{
for (auto r : socketSources)
{
CFRunLoopSourceInvalidate(r);
CFRelease(r);
}
socketSources.clear();
[super unregisterFrom:_additions];
}
@end
void StdScheduler::Added(StdSchedulerProc *pProc)
{
auto x = [SCHAdditions requestAdditionForScheduler:this];
auto addition = [x assignAdditionForProc:pProc];
if (addition)
[addition registerAt:x];
}
void StdScheduler::Removing(StdSchedulerProc *pProc)
{
auto x = [SCHAdditions requestAdditionForScheduler:this];
[x removeAdditionForProc:pProc];
}
void StdScheduler::Changed(StdSchedulerProc* pProc)
{
auto x = [SCHAdditions requestAdditionForScheduler:this];
auto addition = [x additionForProc:pProc];
if (addition)
[addition changed];
}
#endif

View File

@ -0,0 +1,247 @@
#include "C4Include.h"
#include "StdScheduler.h"
#ifdef HAVE_POLL_H
#include <stdio.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <vector>
#ifdef HAVE_IO_H
#include <io.h>
#endif
#ifdef HAVE_SHARE_H
#include <share.h>
#endif
#ifdef HAVE_UNISTD_H
// For pipe()
#include <unistd.h>
#endif
#include <map>
// Is this process currently signaled?
bool StdSchedulerProc::IsSignaled()
{
// Initialize file descriptor sets
std::vector<struct pollfd> fds;
// Get file descriptors
GetFDs(fds);
// Test
return poll(&fds[0], fds.size(), 0) > 0;
}
namespace
{
void Fail(const char* msg)
{
Log(msg);
}
}
#ifdef HAVE_SYS_EVENTFD_H
#include <sys/eventfd.h>
CStdNotifyProc::CStdNotifyProc()
{
fds[0] = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (fds[0] == -1)
Fail("eventfd failed");
}
CStdNotifyProc::~CStdNotifyProc()
{
close(fds[0]);
}
void CStdNotifyProc::Notify()
{
uint64_t n = 1;
if (write(fds[0], &n, 8) == -1)
Fail("write failed");
}
bool CStdNotifyProc::CheckAndReset()
{
uint64_t n;
return (read(fds[0], &n, 8) != -1);
}
#else
CStdNotifyProc::CStdNotifyProc()
{
if (pipe(fds) == -1)
Fail("pipe failed");
fcntl(fds[0], F_SETFL, fcntl(fds[0], F_GETFL) | O_NONBLOCK);
fcntl(fds[0], F_SETFD, FD_CLOEXEC);
fcntl(fds[1], F_SETFD, FD_CLOEXEC);
}
CStdNotifyProc::~CStdNotifyProc()
{
close(fds[0]);
close(fds[1]);
}
void CStdNotifyProc::Notify()
{
char c = 42;
if (write(fds[1], &c, 1) == -1)
Fail("write failed");
}
bool CStdNotifyProc::CheckAndReset()
{
bool r = false;
while (1)
{
char c;
if (read(fds[0], &c, 1) <= 0)
break;
else
r = true;
}
return r;
}
#endif
void CStdNotifyProc::GetFDs(std::vector<struct pollfd> & checkfds)
{
pollfd pfd = { fds[0], POLLIN, 0 };
checkfds.push_back(pfd);
}
bool StdScheduler::DoScheduleProcs(int iTimeout)
{
// Initialize file descriptor sets
std::vector<struct pollfd> fds;
std::map<StdSchedulerProc *, std::pair<unsigned int, unsigned int> > fds_for_proc;
// Collect file descriptors
for (auto proc : procs)
{
unsigned int os = fds.size();
proc->GetFDs(fds);
if (os != fds.size())
fds_for_proc[proc] = std::pair<unsigned int, unsigned int>(os, fds.size());
}
// Wait for something to happen
int cnt = poll(&fds[0], fds.size(), iTimeout);
bool fSuccess = true;
if (cnt >= 0)
{
bool any_executed = false;
auto tNow = C4TimeMilliseconds::Now();
// Which process?
for (auto i = 0; i < procs.size(); i++)
{
auto proc = procs[i];
auto tProcTick = proc->GetNextTick(tNow);
if (tProcTick <= tNow)
{
struct pollfd * pfd = 0;
if (fds_for_proc.find(proc) != fds_for_proc.end())
pfd = &fds[fds_for_proc[proc].first];
if (!proc->Execute(0, pfd))
{
OnError(proc);
fSuccess = false;
}
any_executed = true;
continue;
}
// no fds?
if (fds_for_proc.find(proc) == fds_for_proc.end())
continue;
// Check intersection
unsigned int begin = fds_for_proc[proc].first;
unsigned int end = fds_for_proc[proc].second;
for (unsigned int j = begin; j < end; ++j)
{
if (fds[j].events & fds[j].revents)
{
if (any_executed && proc->IsLowPriority())
break;
if (!proc->Execute(0, &fds[begin]))
{
OnError(proc);
fSuccess = false;
}
any_executed = true;
// the list of procs might have been changed, but procs must be in both ppProcs and
// fds_for_proc to be executed, which prevents execution of any proc not polled this round
// or deleted. Some procs might be skipped or executed twice, but that should be save.
break;
}
}
}
}
else if (cnt < 0)
{
printf("StdScheduler::Execute: poll failed: %s\n",strerror(errno));
}
return fSuccess;
}
#if defined(HAVE_SYS_TIMERFD_H)
#include <sys/timerfd.h>
#include <unistd.h>
#include <fcntl.h>
CStdMultimediaTimerProc::CStdMultimediaTimerProc(uint32_t iDelay)
{
fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (fd == -1)
Log("timerfd_create failed");
SetDelay(iDelay);
}
CStdMultimediaTimerProc::~CStdMultimediaTimerProc()
{
close(fd);
}
void CStdMultimediaTimerProc::SetDelay(uint32_t inDelay)
{
struct itimerspec nv, ov;
nv.it_interval.tv_sec = inDelay / 1000;
nv.it_interval.tv_nsec = (inDelay % 1000) * 1000000;
nv.it_value = nv.it_interval;
timerfd_settime(fd, 0, &nv, &ov);
}
void CStdMultimediaTimerProc::Set()
{
struct itimerspec nv, ov;
timerfd_gettime(fd, &nv);
nv.it_value.tv_sec = 0;
nv.it_value.tv_nsec = 1;
timerfd_settime(fd, 0, &nv, &ov);
}
bool CStdMultimediaTimerProc::CheckAndReset()
{
uint64_t n;
return read(fd, &n, 8) != -1;
}
void CStdMultimediaTimerProc::GetFDs(std::vector<struct pollfd> & checkfds)
{
pollfd pfd = { fd, POLLIN, 0 };
checkfds.push_back(pfd);
}
#endif
#if !defined(USE_COCOA)
void StdScheduler::Added(StdSchedulerProc *pProc) {}
void StdScheduler::Removing(StdSchedulerProc *pProc) {}
void StdScheduler::Changed(StdSchedulerProc* pProc) {}
#endif
void *StdThread::_ThreadFunc(void *pPar)
{
StdThread *pThread = reinterpret_cast<StdThread *>(pPar);
return reinterpret_cast<void *>(pThread->ThreadFunc());
}
#endif

View File

@ -0,0 +1,173 @@
// Events are Windows-specific
#include <C4Include.h>
#include <StdScheduler.h>
#ifdef USE_WIN32_WINDOWS
#include <mmsystem.h>
#include <process.h>
bool StdSchedulerProc::IsSignaled()
{
return GetEvent() && WaitForSingleObject(GetEvent(), 0) == WAIT_OBJECT_0;
}
CStdNotifyProc::CStdNotifyProc() : Event(true) {}
void CStdNotifyProc::Notify() { Event.Set(); }
bool CStdNotifyProc::CheckAndReset()
{
if (!Event.WaitFor(0)) return false;
Event.Reset();
return true;
}
bool StdScheduler::DoScheduleProcs(int iTimeout)
{
int i;
// Collect event handles
int iEventCnt = 0; HANDLE hEvent;
StdSchedulerProc *pMessageProc = NULL;
for (i = 0; i < procs.size(); i++)
{
auto proc = procs[i];
if ( (hEvent = procs[i]->GetEvent()) )
{
if (hEvent == STDSCHEDULER_EVENT_MESSAGE)
pMessageProc = procs[i];
else
{
eventHandles[iEventCnt] = hEvent;
eventProcs[iEventCnt] = procs[i];
iEventCnt++;
}
}
}
// Wait for something to happen
DWORD ret; DWORD dwMsec = iTimeout < 0 ? INFINITE : iTimeout;
if (pMessageProc)
ret = MsgWaitForMultipleObjects(iEventCnt, eventHandles.data(), false, dwMsec, QS_ALLINPUT);
else
ret = WaitForMultipleObjects(iEventCnt, eventHandles.data(), false, dwMsec);
bool fSuccess = true;
// Event?
if (ret != WAIT_TIMEOUT)
{
// Which event?
int iEventNr = ret - WAIT_OBJECT_0;
// Execute the signaled process
StdSchedulerProc *pProc = iEventNr < iEventCnt ? eventProcs[iEventNr] : pMessageProc;
if (!pProc->Execute(0))
{
OnError(pProc);
fSuccess = false;
}
}
// Execute all processes with timeout
auto tNow = C4TimeMilliseconds::Now();
for (auto proc : procs)
{
auto tProcTick = proc->GetNextTick(tNow);
if (tProcTick <= tNow)
if (!proc->Execute(0))
{
OnError(proc);
fSuccess = false;
}
}
return fSuccess;
}
/* CStdMultimediaTimerProc */
int CStdMultimediaTimerProc::iTimePeriod = 0;
CStdMultimediaTimerProc::CStdMultimediaTimerProc(uint32_t iDelay) :
uCriticalTimerDelay(28),
idCriticalTimer(0),
uCriticalTimerResolution(5),
Event(true)
{
if (!iTimePeriod)
{
// Get resolution caps
TIMECAPS tc;
timeGetDevCaps(&tc, sizeof(tc));
// Establish minimum resolution
uCriticalTimerResolution = BoundBy(uCriticalTimerResolution, tc.wPeriodMin, tc.wPeriodMax);
timeBeginPeriod(uCriticalTimerResolution);
}
iTimePeriod++;
SetDelay(iDelay);
}
CStdMultimediaTimerProc::~CStdMultimediaTimerProc()
{
if (idCriticalTimer)
{
timeKillEvent(idCriticalTimer);
idCriticalTimer = 0;
iTimePeriod--;
if (!iTimePeriod)
timeEndPeriod(uCriticalTimerResolution);
}
}
void CStdMultimediaTimerProc::SetDelay(uint32_t iDelay)
{
// Kill old timer (of any)
if (idCriticalTimer)
timeKillEvent(idCriticalTimer);
// Set new delay
uCriticalTimerDelay = iDelay;
// Set critical timer
idCriticalTimer=timeSetEvent(
uCriticalTimerDelay,uCriticalTimerResolution,
(LPTIMECALLBACK) Event.GetEvent(),0,TIME_PERIODIC | TIME_CALLBACK_EVENT_SET);
if(idCriticalTimer == 0)
DebugLogF("Creating Critical Timer failed: %d", GetLastError());
}
void StdScheduler::Added(StdSchedulerProc *pProc)
{
if (procs.size() > eventProcs.size())
{
eventProcs.resize(procs.size());
eventHandles.resize(procs.size()+1);
}
}
void StdScheduler::Removing(StdSchedulerProc *pProc)
{
}
void StdScheduler::Changed(StdSchedulerProc* pProc)
{
}
bool CStdMultimediaTimerProc::CheckAndReset()
{
if (!Check()) return false;
Event.Reset();
return true;
}
void __cdecl StdThread::_ThreadFunc(void *pPar)
{
StdThread *pThread = reinterpret_cast<StdThread *>(pPar);
_endthreadex(pThread->ThreadFunc());
}
#endif