19#include <isode/base/logging.h>
21#include "AsyncEventRef.h"
// Mutex taken by Poll, Queue, QueueAt, QueueAtExit, Dequeue and Terminate
// before they touch the manager's queues.
27 pthread_mutex_t _mutex;
// Time returned by NextEventDue(). When non-null, Poll also passes it to
// pthread_cond_timedwait as the deadline for its wait on _cond.
30 const timespec *_duetime;
// Helper that takes an event out of its receiver.
// NOTE(review): only part of the body is visible in this listing. The visible
// statements fetch the event's reference object and its receiver, then ask
// the receiver to extract the event; what is done with `eventref` is not shown.
40 inline void _deQueue (AsyncEvent *event) {
41 AsyncEventRef *eventref =
event->Reference();
47 Receiver *rcvr =
event->GetReceiver();
// The receiver itself performs the extraction of the event.
52 rcvr->Extract (event);
// Factory: returns the cached `threadinstance`, constructing a PollManager
// the first time it is called (see the out-of-line definition).
62 static Manager *Create ();
65 virtual bool IsThreaded () {
return false; }
// Runs one polling pass of the manager.
//   when  - absolute time handed to pthread_cond_timedwait if the manager is
//           busy in another thread.
//   mutex - optional caller-held mutex; the definition unlocks it on entry
//           when non-null and locks it again before finishing.
//   cond  - caller's condition variable (its use is not visible in this listing).
// Returns a bool; the definition computes whether the main queue still holds
// an event whose priority is below MAXPRIO - presumably that is the result.
71 bool Poll (
const timespec *when,
73 pthread_mutex_t *mutex,
74 pthread_cond_t *cond);
// Shuts the manager down; the definition walks the main, timer and at-exit
// queues under _mutex. `millisecs` is logged as a delay in milliseconds.
77 virtual void Terminate(
unsigned int millisecs);
80 virtual void Blocking() {}
83 virtual void Unblocked() {}
87 virtual bool LongLived() {
return false; }
92 virtual void SetThreads (
int ,
int ) {}
95 virtual int GetActiveLimit () {
return 0; }
// Adds an event to the main queue (the definition inserts it into _mainQ
// under _mutex). `nostart` defaults to false; its effect is not visible in
// this listing.
99 virtual void Queue (AsyncEvent *event,
101 bool nostart =
false);
// Schedules an event for an absolute time: the definition inserts it into
// _timerQ keyed by *abstime.
104 virtual void QueueAt (AsyncEvent *event,
const struct timespec *abstime);
// Schedules an event by a millisecond count; the definition forwards to the
// absolute-time overload.
107 virtual void QueueAt (AsyncEvent *event,
unsigned int millisec);
// Removes a previously queued event (definition only partly visible here).
110 virtual void Dequeue (AsyncEvent *event);
// Registers an event for exit time; Terminate walks an _atExitQ queue,
// presumably the one this fills - confirm against the full definition.
113 virtual void QueueAtExit (AsyncEvent *event);
118 virtual const struct timespec *NextEventDue () {
return _duetime; }
// Constructor: starts with _busy cleared, initialises _mutex and _cond with
// default attributes, and logs that the manager was created.
// NOTE(review): lines between the visible statements are missing from this
// listing; further member initialisation may happen there.
123 PollManager::PollManager () : _busy(false)
125 pthread_mutex_init (&_mutex, NULL);
126 pthread_cond_init (&_cond, NULL);
132 LOG_DEBUG ((
"PollManager created"));
// Returns the manager held in `threadinstance`, allocating a new PollManager
// only when that pointer is still NULL.
// NOTE(review): the visible check-then-create sequence takes no lock; confirm
// whether concurrent first calls are possible.
135 Manager *PollManager::Create () {
// Already created: reuse the existing instance.
136 if ( threadinstance != NULL )
137 return threadinstance;
// First call: build the instance and remember it.
139 threadinstance =
new PollManager ();
141 return threadinstance;
// One polling pass: promotes due timer events to the main queue, takes the
// first main-queue event for delivery, or otherwise waits on _cond until
// _duetime.
// NOTE(review): many lines of this function are missing from the listing
// (error paths, the delivery call, closing braces), so the comments below
// describe only the statements that are visible.
144 bool PollManager::Poll (
const timespec *when,
146 pthread_mutex_t *clientmutex,
// Take the manager lock; the failure branch is not visible here.
149 if ( pthread_mutex_lock (&_mutex) != 0 )
// Release the caller's mutex (if one was supplied) for the duration of the pass.
154 if ( clientmutex ) pthread_mutex_unlock (clientmutex);
// Another thread is currently inside the manager: wait on _cond, either
// until `when` or indefinitely.
// NOTE(review): the wait is guarded by `if` rather than a loop in the
// visible lines; confirm that spurious wakeups are handled in the lines
// not shown.
158 if ( _busy && !pthread_equal(_thread, pthread_self()) ) {
162 pthread_cond_timedwait (&_cond, &_mutex, when);
164 pthread_cond_wait (&_cond, &_mutex);
// Record the calling thread as the one now running the manager.
170 _thread = pthread_self();
// Move every timer event that First(now) returns onto the main queue,
// inserting each with priority 0.
175 if ( _timerQ->HasEvents () ) {
180 while ( (tevent = _timerQ->First (now)) != 0 ) {
181 LOG_DEBUG ((
"timerQ event %p due", tevent));
182 _mainQ->Insert (tevent, 0);
// Next pending timer time; presumably stored into _duetime on lines not shown.
188 const struct timespec *nextdue = _timerQ->NextPrio();
198 LOG_DEBUG ((
"PollManager: events=%d",_mainQ->HasEvents()));
// Work is pending: take the first event and look up its receiver
// (the receiver is 0 when no event was returned).
200 if ( _mainQ->HasEvents() ) {
202 AsyncEvent *
event = _mainQ->First();
204 Receiver *rcvr =
event ?
event->GetReceiver() : 0;
213 LOG_DEBUG ((
"Manager: queue %p", event));
// _mutex is released on both visible paths before delivery is logged, and
// re-acquired afterwards.
216 pthread_mutex_unlock (&_mutex);
222 pthread_mutex_unlock (&_mutex);
224 LOG_DEBUG ((
"Manager: deliver %p", event));
229 if ( pthread_mutex_lock(&_mutex) != 0 ) {
233 }
// Otherwise, when a due time is known, wait on _cond until that time; the
// return value of the wait is deliberately discarded.
else if ( _duetime != 0 ) {
235 (void) pthread_cond_timedwait (&_cond, &_mutex, _duetime);
// True when the main queue still has an event whose next priority is below MAXPRIO.
241 bool retval = ( _mainQ->HasEvents() &&
242 *(_mainQ->NextPrio()) < (
unsigned) MAXPRIO );
// Wake any threads waiting on _cond, then drop the manager lock.
245 pthread_cond_broadcast (&_cond);
247 pthread_mutex_unlock (&_mutex);
// Give the caller its mutex back before finishing.
249 if ( clientmutex ) pthread_mutex_lock (clientmutex);
// Logs the event and its priority, then inserts the event into _mainQ while
// holding _mutex.
// NOTE(review): the rest of the signature, the origin of `priority` and the
// lock-failure branch are on lines missing from this listing.
254 void PollManager::Queue (AsyncEvent *event,
258 LOG_DEBUG ((
"Queue: %p prio=%u", event, priority));
260 if ( pthread_mutex_lock (&_mutex) != 0 )
265 _mainQ->Insert (event, priority);
267 pthread_mutex_unlock (&_mutex);
// Logs the event, then takes and releases _mutex.
// NOTE(review): the statement executed while the lock is held is not visible
// in this listing; presumably it stores the event in the at-exit queue that
// Terminate drains - confirm against the full source.
271 void PollManager::QueueAtExit (AsyncEvent *event)
273 LOG_DEBUG ((
"QueueAtExit: %p", event));
275 if ( pthread_mutex_lock (&_mutex) != 0 )
282 pthread_mutex_unlock (&_mutex);
// Schedules `event` for the absolute time `duetime`: logs the time as
// seconds and zero-padded nanoseconds, then inserts the event into _timerQ
// keyed by *duetime while holding _mutex.
// `duetime` is dereferenced without a null check, so it must be non-null.
// NOTE(review): the lock-failure branch is on lines missing from this listing.
285 void PollManager::QueueAt (AsyncEvent *event,
const struct timespec *duetime)
287 LOG_DEBUG ((
"Queue: %p @ %d.%9.9ld",
288 event, (
int)duetime->tv_sec, duetime->tv_nsec));
290 if ( pthread_mutex_lock (&_mutex) != 0 )
295 _timerQ->Insert (event, *duetime);
297 pthread_mutex_unlock (&_mutex);
// Millisecond overload: logs the requested delay and forwards to the
// absolute-time overload with a local `now`.
// NOTE(review): how `now` is computed is not visible here; presumably the
// current time plus `millisecs` - confirm.
// NOTE(review): `millisecs` is unsigned but is logged with "%d"; "%u" would
// match the argument type.
300 void PollManager::QueueAt (AsyncEvent *event,
unsigned millisecs)
302 LOG_DEBUG ((
"Queue: %p in %dms", event, millisecs));
310 QueueAt (event, &now);
// Takes and releases _mutex around the removal of `event`.
// NOTE(review): the removal statement itself is missing from this listing;
// presumably it calls the _deQueue helper - confirm against the full source.
313 void PollManager::Dequeue (AsyncEvent *event)
315 if ( pthread_mutex_lock (&_mutex) != 0 )
320 pthread_mutex_unlock (&_mutex);
// Shutdown: under _mutex, repeatedly takes events from the main queue, then
// from the timer queue, and finally from the at-exit queue via Last().
// NOTE(review): the loop bodies are on lines missing from this listing, so
// what happens to each event is not shown.
// NOTE(review): `millisecs` is unsigned but is logged with "%d"; "%u" would
// match the argument type.
323 void PollManager::Terminate (
unsigned int millisecs)
325 LOG_DEBUG ((
"Manager: Terminate in %dms", millisecs));
333 if ( pthread_mutex_lock (&_mutex) != 0 ) {
// Drain the main queue, then the timer queue.
340 while ( (event = _mainQ->First()) != 0 )
343 while ( (event = _timerQ->First()) != 0 )
// At-exit events are taken with Last(); the lock is dropped inside the
// loop and re-acquired afterwards.
348 while ( (event = _atExitQ->
Last()) != 0 ) {
350 pthread_mutex_unlock (&_mutex);
354 if ( pthread_mutex_lock (&_mutex) != 0 ) {
359 pthread_mutex_unlock (&_mutex);
// Free-function entry point that simply forwards to PollManager::Create().
362 Manager *CreatePollManager ()
364 return PollManager::Create ();
A simple FIFO queue for events.
void Insert(AsyncEvent *event)
Insert in queue.
AsyncEvent * Last()
Remove last element of queue.
Event manager used in polling fashion.
bool timespec_cmp(const struct timespec *time1, const struct timespec *time2)
Function for comparing two times.
void addMillisecs(struct timespec &now, unsigned millisecs)
Add millisecs to a time.
void getNow(struct timespec &now)
Return the current time.