PollManager.C
1// -*- C++ -*-
2
3// Copyright (c) 2006-2010, Isode Limited, London, England.
4// All rights reserved.
5//
6// Acquisition and use of this software and related materials for any
7// purpose requires a written licence agreement from Isode Limited,
8// or a written licence from an organisation licenced by Isode Limited
9// to grant such a licence.
10
11//
12//
13// PollManager.C
14//
15// Event Service Manager used in a polling fashion
16//
17// @VERSION@
18
19#include <isode/base/logging.h>
20
21#include "AsyncEventRef.h"
22
23namespace Event {
25 class PollManager : public Manager {
26 private:
27 pthread_mutex_t _mutex;
28 pthread_cond_t _cond;
29
30 const timespec *_duetime;
31
33 AsyncEventTimerQueue *_timerQ;
34 AsyncEventPlainQueue *_atExitQ;
35
36 pthread_t _thread;
37 bool _busy;
38
39 // Internal dequeue of event
40 inline void _deQueue (AsyncEvent *event) {
41 AsyncEventRef *eventref = event->Reference();
42
43 if ( eventref == 0 )
44 // Not queued
45 return;
46
47 Receiver *rcvr = event->GetReceiver();
48
49 if ( rcvr != 0 ) {
50 // Might be on the receiver's internal queue
51 // So need to dequeue with that locked
52 rcvr->Extract (event);
53 } else {
54 eventref->Extract ();
55 }
56 }
57
58 public:
59 PollManager ();
60
61 // Create a instance
62 static Manager *Create ();
63
64 // Is threading
65 virtual bool IsThreaded () { return false; }
66
67 // Run a thread - does nothing
68 void Run (bool ) {}
69
70 // Poll the event manager
71 bool Poll (const timespec *when,
72 bool immediate,
73 pthread_mutex_t *mutex,
74 pthread_cond_t *cond);
75
76 // Stop the event service.
77 virtual void Terminate(unsigned int millisecs);
78
79 // Signal that the thread is about to block for some time
80 virtual void Blocking() {}
81
82 // Signal that the thread is no longer blocking
83 virtual void Unblocked() {}
84
85 // Say a long-lived event
86 // Returns true if terminating
87 virtual bool LongLived() { return false; }
88
89 // Set the thread limits
90 // activethreads - number of threads 'active' at once
91 // maxthreads - limit on the total number of threads
92 virtual void SetThreads (int , int ) {}
93
94 // Get the allowed active thread limit
95 virtual int GetActiveLimit () { return 0; }
96
97 // Queue an event for immediate execution with priority
98 // Lower numerical values indicates higher priority
99 virtual void Queue (AsyncEvent *event,
100 unsigned priority,
101 bool nostart = false);
102
103 // Queue an event for execution at a specific time in the future
104 virtual void QueueAt (AsyncEvent *event, const struct timespec *abstime);
105
106 // Queue an event for execution after some interval
107 virtual void QueueAt (AsyncEvent *event, unsigned int millisec);
108
109 // Remove an event from its queue via its reference
110 virtual void Dequeue (AsyncEvent *event);
111
112 // Queue an event to be triggered when terminating
113 virtual void QueueAtExit (AsyncEvent *event);
114
115 // Return the time at which the next timed event is due or NULL
116 // In a multithreaded environment this is not used as
117 // We use a thread for the timer, rather than poll() or whatever.
118 virtual const struct timespec *NextEventDue () { return _duetime; }
119 };
120
121 static PollManager *threadinstance;
122
123 PollManager::PollManager () : _busy(false)
124 {
125 pthread_mutex_init (&_mutex, NULL);
126 pthread_cond_init (&_cond, NULL);
127
128 _mainQ = new AsyncEventPriorityQueue;
129 _timerQ = new AsyncEventTimerQueue;
130 _atExitQ = new AsyncEventPlainQueue;
131
132 LOG_DEBUG (("PollManager created"));
133 }
134
135 Manager *PollManager::Create () {
136 if ( threadinstance != NULL )
137 return threadinstance;
138
139 threadinstance = new PollManager ();
140
141 return threadinstance;
142 }
143
144 bool PollManager::Poll (const timespec *when,
145 bool immediate,
146 pthread_mutex_t *clientmutex,
147 pthread_cond_t *)
148 {
149 if ( pthread_mutex_lock (&_mutex) != 0 )
150 return false;
151
152 // Unlock the client mutex for the duration of this call
153 // So that stuff can be delivered to it via events
154 if ( clientmutex ) pthread_mutex_unlock (clientmutex);
155
156 // If the current thread is the busy thread, then recursing,
157 // which is allowed.
158 if ( _busy && !pthread_equal(_thread, pthread_self()) ) {
159 // Someone else is executing the manager
160 if ( !immediate ) {
161 if ( when != 0 )
162 pthread_cond_timedwait (&_cond, &_mutex, when);
163 else
164 pthread_cond_wait (&_cond, &_mutex);
165 }
166
167 } else {
168
169 _busy = true;
170 _thread = pthread_self();
171
172 struct timespec now;
173 getNow (now);
174
175 if ( _timerQ->HasEvents () ) {
176
177 AsyncEvent *tevent;
178
179 // Get any due events onto the main event queue
180 while ( (tevent = _timerQ->First (now)) != 0 ) {
181 LOG_DEBUG (("timerQ event %p due", tevent));
182 _mainQ->Insert (tevent, 0);
183 }
184 }
185
186 _duetime = 0;
187
188 const struct timespec *nextdue = _timerQ->NextPrio();
189
190 if ( nextdue != 0 )
191 _duetime = nextdue;
192
193 if ( when != 0 ) {
194 if ( _duetime == 0 || timespec_cmp (when, _duetime) )
195 _duetime = when;
196 }
197
198 LOG_DEBUG (("PollManager: events=%d",_mainQ->HasEvents()));
199
200 if ( _mainQ->HasEvents() ) {
201
202 AsyncEvent *event = _mainQ->First();
203
204 Receiver *rcvr = event ? event->GetReceiver() : 0;
205
206 // Client mutex is unlocked while events are delivered
207 if ( rcvr != 0 ) {
208 // Queue event to receiver object prior to processing
209 // This avoids the race condition where the event
210 // object is being handled by one thread, and so cannot
211 // be cancelled, but another thread wants to delete it.
212
213 LOG_DEBUG (("Manager: queue %p", event));
214 rcvr->Queue (event);
215
216 pthread_mutex_unlock (&_mutex);
217
218 rcvr->Process ();
219
220 } else {
221
222 pthread_mutex_unlock (&_mutex);
223
224 LOG_DEBUG (("Manager: deliver %p", event));
225
226 event->Deliver();
227 }
228
229 if ( pthread_mutex_lock(&_mutex) != 0 ) {
230 return false;
231 }
232
233 } else if ( _duetime != 0 ) {
234 // Nothing to process until duetime, so wait
235 (void) pthread_cond_timedwait (&_cond, &_mutex, _duetime);
236 }
237
238 _busy = false;
239 }
240
241 bool retval = ( _mainQ->HasEvents() &&
242 *(_mainQ->NextPrio()) < (unsigned) MAXPRIO );
243
244 // Tell any other threads that something has happened
245 pthread_cond_broadcast (&_cond);
246
247 pthread_mutex_unlock (&_mutex);
248
249 if ( clientmutex ) pthread_mutex_lock (clientmutex);
250
251 return retval;
252 }
253
254 void PollManager::Queue (AsyncEvent *event,
255 unsigned priority,
256 bool)
257 {
258 LOG_DEBUG (("Queue: %p prio=%u", event, priority));
259
260 if ( pthread_mutex_lock (&_mutex) != 0 )
261 return;
262
263 _deQueue (event);
264
265 _mainQ->Insert (event, priority);
266
267 pthread_mutex_unlock (&_mutex);
268
269 }
270
271 void PollManager::QueueAtExit (AsyncEvent *event)
272 {
273 LOG_DEBUG (("QueueAtExit: %p", event));
274
275 if ( pthread_mutex_lock (&_mutex) != 0 )
276 return;
277
278 _deQueue (event);
279
280 _atExitQ->Insert (event);
281
282 pthread_mutex_unlock (&_mutex);
283 }
284
285 void PollManager::QueueAt (AsyncEvent *event, const struct timespec *duetime)
286 {
287 LOG_DEBUG (("Queue: %p @ %d.%9.9ld",
288 event, (int)duetime->tv_sec, duetime->tv_nsec));
289
290 if ( pthread_mutex_lock (&_mutex) != 0 )
291 return;
292
293 _deQueue (event);
294
295 _timerQ->Insert (event, *duetime);
296
297 pthread_mutex_unlock (&_mutex);
298 }
299
300 void PollManager::QueueAt (AsyncEvent *event, unsigned millisecs)
301 {
302 LOG_DEBUG (("Queue: %p in %dms", event, millisecs));
303
304 struct timespec now;
305
306 getNow (now);
307
308 addMillisecs (now, millisecs);
309
310 QueueAt (event, &now);
311 }
312
313 void PollManager::Dequeue (AsyncEvent *event)
314 {
315 if ( pthread_mutex_lock (&_mutex) != 0 )
316 return;
317
318 _deQueue (event);
319
320 pthread_mutex_unlock (&_mutex);
321 }
322
323 void PollManager::Terminate (unsigned int millisecs)
324 {
325 LOG_DEBUG (("Manager: Terminate in %dms", millisecs));
326
327 struct timespec now;
328
329 getNow (now);
330
331 addMillisecs (now, millisecs);
332
333 if ( pthread_mutex_lock (&_mutex) != 0 ) {
334 pthread_exit(0);
335 }
336
337 // Cancel all queued events
338 AsyncEvent *event;
339
340 while ( (event = _mainQ->First()) != 0 )
341 _deQueue (event);
342
343 while ( (event = _timerQ->First()) != 0 )
344 _deQueue (event);
345
346 // Deliver all at exit events
347 // in the reverse order to their being added
348 while ( (event = _atExitQ->Last()) != 0 ) {
349 // Need to unlock during delivery
350 pthread_mutex_unlock (&_mutex);
351
352 event->Deliver ();
353
354 if ( pthread_mutex_lock (&_mutex) != 0 ) {
355 pthread_exit(0);
356 }
357 }
358
359 pthread_mutex_unlock (&_mutex);
360 }
361
362 Manager *CreatePollManager ()
363 {
364 return PollManager::Create ();
365 }
366}
// ---------------------------------------------------------------------
// Reference notes for names used above (from the generated
// documentation):
//
// A simple FIFO queue for events.
//
// void Insert(AsyncEvent *event)
//     Insert in queue.
//
// AsyncEvent * Last()
//     Remove last element of queue.
//
// Priority queue template.
//
// PollManager
//     Event manager used in polling fashion.
//     Definition: PollManager.C:25
//
// bool timespec_cmp(const struct timespec *time1, const struct timespec *time2)
//     Function for comparing two times.
//     Definition: timeutil.h:27
//
// void addMillisecs(struct timespec &now, unsigned millisecs)
//     Add millisecs to a time.
//     Definition: timeutil.h:70
//
// void getNow(struct timespec &now)
//     Return the current time.
//     Definition: timeutil.h:60
//
// All rights reserved © 2002 - 2024 Isode Ltd.