ThreadManager.C
1// -*- C++ -*-
2
3// Copyright (c) 2005-2010, Isode Limited, London, England.
4// All rights reserved.
5//
6// Acquisition and use of this software and related materials for any
7// purpose requires a written licence agreement from Isode Limited,
8// or a written licence from an organisation licenced by Isode Limited
9// to grant such a licence.
10
11//
12//
13// Manager.C
14//
15// Event Service Manager
16//
17// @VERSION@
18
19#include <errno.h>
20
21#include <isode/base/logging.h>
22#include <isode/base/util.h>
23#include <isode/compat/tailor.h>
24
25#include "AsyncEventRef.h"
26
27extern "C" {
28 static void * Event_fnx (void *context ARGNOTUSED);
29 static void * Timer_fnx (void *context ARGNOTUSED);
30}
31
32namespace Event {
33 AsyncEventPlainQueue *AsyncEventPlainQueue::_freelist;
34
36 class ThreadManager : public Manager {
37 private:
38 pthread_mutex_t _mutex;
39 pthread_cond_t _waitQ;
40 pthread_cond_t _timeQ;
41 pthread_cond_t _termwait;
42
43 pthread_attr_t _threadattrs;
44 pthread_key_t _llkey;
45
46 timespec _duetime;
47 bool _running;
48 bool _terminating;
49 volatile int _nthreads;
50 volatile int _nactive;
51 volatile int _nblocked;
52 volatile int _ntimer;
53 volatile int _nwait;
54
55 int _activelimit;
56 int _threadlimit;
57
59 AsyncEventTimerQueue *_timerQ;
60 AsyncEventPlainQueue *_atExitQ;
61
62 // Wake or create a thread
63 inline void startThread ();
64
65 // Internal dequeue of event
66 inline void _deQueue (AsyncEvent *event) {
67 AsyncEventRef *eventref = event->Reference();
68
69 if ( eventref == 0 )
70 // Not queued
71 return;
72
73 Receiver *rcvr = event->GetReceiver();
74
75 if ( rcvr != 0 ) {
76 // Might be on the receiver's internal queue
77 // So need to dequeue with that locked
78 rcvr->Extract (event);
79 } else {
80 eventref->Extract ();
81 }
82 }
83
84 // Deliver an event
85 // This unlocks the mutex
86 inline void eventDeliver (AsyncEvent *event) {
87 Receiver *rcvr = event->GetReceiver();
88
89 if ( rcvr != 0 ) {
90 // Queue event to receiver object prior to processing
91 // This avoids the race condition where the event
92 // object is being handled by one thread, and so cannot
93 // be cancelled, but another thread wants to delete it.
94
95 LOG_DEBUG (("eventDeliver: queue %p", event));
96 rcvr->Queue (event);
97
98 pthread_mutex_unlock (&_mutex);
99
100 rcvr->Process ();
101
102 } else {
103
104 pthread_mutex_unlock (&_mutex);
105 LOG_DEBUG (("eventDeliver: deliver %p", event));
106 event->Deliver();
107 }
108 }
109
110 public:
111 ThreadManager ();
112
113 // Create a instance
114 static Manager *Create ();
115
116 // Run an event manager thread
117 void Manager ();
118
119 // Run a thread handing timed events
120 void Timer ();
121
122 // Is threading
123 virtual bool IsThreaded () { return true; }
124
125 // Start running the event service
126 // If isthread is true, the event service use the current thread as
127 // one of its threads.
128 // Unless external threads inject events after this point,
129 // this will never stop.
130 virtual void Run (bool isthread);
131
132 // Polling
133 virtual bool Poll (const timespec *when,
134 bool immediate,
135 pthread_mutex_t *mutex,
136 pthread_cond_t *cond) {
137 if ( immediate || mutex == 0 ) return false;
138
139 Blocking();
140
141 if ( when )
142 pthread_cond_timedwait (cond, mutex, when);
143 else
144 pthread_cond_wait (cond, mutex);
145
146 Unblocked();
147
148 return false;
149 }
150
151 // Stop the event service.
152 virtual void Terminate(unsigned int millisecs);
153
154 // Signal that the thread is about to block for some time
155 virtual void Blocking();
156
157 // Signal that the thread is no longer blocking
158 virtual void Unblocked();
159
160 // Say a long-lived event
161 // Returns true if terminating
162 virtual bool LongLived();
163
164 // Set the thread limits
165 // activethreads - number of threads 'active' at once
166 // maxthreads - limit on the total number of threads
167 virtual void SetThreads (int activethreads, int maxthreads);
168
169 // Get the allowed active thread limit
170 virtual int GetActiveLimit () { return _activelimit; }
171
172 // Queue an event for immediate execution with priority
173 // Lower numerical values indicates higher priority
174 virtual void Queue (AsyncEvent *event,
175 unsigned priority,
176 bool nostart = false);
177
178 // Queue an event for execution at a specific time in the future
179 virtual void QueueAt (AsyncEvent *event, const struct timespec *abstime);
180
181 // Queue an event for execution after some interval
182 virtual void QueueAt (AsyncEvent *event, unsigned int millisec);
183
184 // Remove an event from its queue via its reference
185 virtual void Dequeue (AsyncEvent *event);
186
187 // Queue an event to be triggered when terminating
188 virtual void QueueAtExit (AsyncEvent *event);
189
190 // Return the time at which the next timed event is due or NULL
191 // In a multithreaded environment this is not used as
192 // We use a thread for the timer, rather than poll() or whatever.
193 virtual const struct timespec *NextEventDue () { return 0; }
194 };
195
196 static ThreadManager *threadinstance;
197
198 ThreadManager::ThreadManager ()
199 {
200 pthread_mutex_init (&_mutex, NULL);
201 pthread_cond_init (&_waitQ, NULL);
202 pthread_cond_init (&_timeQ, NULL);
203 pthread_cond_init (&_termwait, NULL);
204
205 pthread_key_create (&_llkey, NULL);
206
207 pthread_attr_init (&_threadattrs);
208
209 const size_t TWOMEG = (2*1024*1024);
210 size_t default_stacksize = 0;
211
212 if ( pthread_attr_getstacksize (&_threadattrs,
213 &default_stacksize) != 0 )
214 default_stacksize = TWOMEG;
215
216 if ( thread_stacksize != 0 ) {
217 default_stacksize = thread_stacksize;
218 } else if ( default_stacksize < TWOMEG ) {
219 default_stacksize = TWOMEG;
220 }
221
222 if ( default_stacksize < PTHREAD_STACK_MIN )
223 default_stacksize = PTHREAD_STACK_MIN;
224
225 pthread_attr_setstacksize (&_threadattrs, default_stacksize);
226
227 pthread_attr_setdetachstate (&_threadattrs, PTHREAD_CREATE_DETACHED);
228
229 _running = false;
230 _terminating = false;
231
232 _nthreads = 0;
233 _nactive = 0;
234 _nblocked = 0;
235 _ntimer = 0;
236 _nwait = 0;
237
238 _duetime.tv_sec = 0;
239 _duetime.tv_nsec = 0;
240
241 long config_cpu, online_cpu;
242
243 isode_cpucount (&config_cpu, &online_cpu, 0, 0);
244
245 if ( num_threads != 0 )
246 online_cpu = num_threads;
247
248 _activelimit = online_cpu > 2 ? online_cpu : 2;
249 _threadlimit = _activelimit > 10 ? 2*_activelimit : 20;
250
251 if ( _threadlimit < thread_limit )
252 _threadlimit = thread_limit;
253
254 _mainQ = new AsyncEventPriorityQueue;
255 _timerQ = new AsyncEventTimerQueue;
256 _atExitQ = new AsyncEventPlainQueue;
257
258 LOG_DEBUG (("ThreadManager created"));
259 }
260
261 Manager *ThreadManager::Create () {
262 if ( threadinstance != NULL )
263 return threadinstance;
264
265 threadinstance = new ThreadManager ();
266
267 return threadinstance;
268 }
269
270 // The conditions under which a thread can be started are
271 // closely related to the conditions under which the main Manager loop
272 // will do something other than queue the thread on the default _waitQ
273 // Note that we will be making the started thread 'active',
274 // So this needs to be allowed for with in the use of _nactive
275
276 inline void ThreadManager::startThread()
277 {
278 LOG_DEBUG (("startThread: active=%d limit=%d events=%d",
279 _nactive, _activelimit, (int) _mainQ->HasEvents()));
280
281 // If started thread will exceed active limit or nothing to do
282 if ( !_running || _nactive >= _activelimit ||
283 !_mainQ->HasEvents() )
284 return;
285
286 LOG_DEBUG (("startThread: timer=%d nwait=%d threads=%d limit=%d",
287 _ntimer, _nwait, _nthreads, _threadlimit));
288
289 if ( _nwait > 0 ) {
290 // Wake one thread.
291 pthread_cond_signal (&_waitQ);
292
293 } else if ( _nthreads < _threadlimit ) {
294 // No threads to do the work; create another
295 pthread_t tid;
296
297 // What should we do with errors here?
298 if ( pthread_create (&tid, &_threadattrs, Event_fnx, 0) == 0 ) {
299 // Counters are incremented here as it may be while
300 // until the actual thread gets a go.
301 // Threads are created 'active' as they can be active
302 // as soon as they start up
303 _nactive++;
304 _nthreads++;
305
306 }
307 }
308 }
309
310 void ThreadManager::Run (bool isthread)
311 {
312 if ( pthread_mutex_lock (&_mutex) != 0 ) {
313 return;
314 }
315
316 // Start Manager
317 _running = true;
318
319 if ( isthread ) {
320 // This thread becomes one of the threads
321 _nthreads++;
322 _nactive++;
323
324 } else {
325 // May need to start a thread
326 startThread ();
327 }
328
329 pthread_mutex_unlock (&_mutex);
330
331 if ( isthread )
332 Manager ();
333 }
334
335 void ThreadManager::Manager ()
336 {
337 if ( pthread_mutex_lock (&_mutex) != 0 ) {
338 _nactive--;
339 _nthreads--;
340 return;
341 }
342
343 while ( !_terminating && _nthreads <= _threadlimit ) {
344
345 LOG_DEBUG (("Manager: active=(%d:%d) events=%d",
346 _nactive, _activelimit, _mainQ->HasEvents()));
347
348 if ( _nactive <= _activelimit && _mainQ->HasEvents() ) {
349
350 AsyncEvent *event = _mainQ->First();
351
352 // Unlocks mutex
353 eventDeliver (event);
354
355 if ( pthread_mutex_lock(&_mutex) != 0 ) {
356 _nactive--;
357 _nthreads--;
358 return;
359 }
360
361 // Check to see if thread was executing long lived event
362 if ( pthread_getspecific (_llkey) != NULL ) {
363 LOG_DEBUG (("Manager: %p longlived (%d)",
364 event, _activelimit));
365
366 pthread_setspecific (_llkey, NULL);
367 if ( _activelimit > 1 ) _activelimit--;
368 }
369
370 } else {
371
372 // Wait for something to do
373
374 _nactive--;
375 _nwait++;
376
377 LOG_DEBUG (("Manager: wait on queue"));
378
379 int ret = pthread_cond_wait (&_waitQ, &_mutex);
380
381 _nwait--;
382 _nactive++;
383
384 if ( ret != 0 )
385 break;
386 }
387 }
388
389 _nthreads--;
390 _nactive--;
391
392 if ( _terminating )
393 pthread_cond_signal (&_termwait);
394
395 pthread_mutex_unlock (&_mutex);
396
397 LOG_DEBUG (("Manager: thread exiting (n=%d, active=%d)",
398 _nthreads, _nactive));
399
400 }
401
402 void ThreadManager::SetThreads (int activethreads, int maxthreads)
403 {
404 LOG_DEBUG (("SetThreads: %d %d",activethreads, maxthreads));
405
406 if ( pthread_mutex_lock (&_mutex) != 0 )
407 return;
408
409 if ( activethreads > 1 )
410 _activelimit = activethreads;
411
412 if ( maxthreads > _activelimit ) {
413 _threadlimit = maxthreads;
414 } else if ( maxthreads == 0 ) {
415 if ( _activelimit > 10 )
416 _threadlimit = 2*_activelimit;
417 else
418 _threadlimit = 20;
419
420 if ( _threadlimit < thread_limit )
421 _threadlimit = thread_limit;
422 }
423
424 startThread ();
425
426 pthread_mutex_unlock (&_mutex);
427 }
428
429 void ThreadManager::Timer()
430 {
431 LOG_DEBUG (("Timer() starting"));
432
433 if ( pthread_mutex_lock (&_mutex) != 0 ) {
434 _ntimer--;
435 _nthreads--;
436 return;
437 }
438
439 while ( !_terminating ) {
440 struct timespec now;
441 getNow (now);
442
443 LOG_DEBUG (("Timer: timer events: %d", _timerQ->HasEvents ()));
444 if ( _timerQ->HasEvents () ) {
445
446 AsyncEvent *tevent;
447
448 while ( (tevent = _timerQ->First (now)) != 0 ) {
449 LOG_DEBUG (("timerQ event %p due", tevent));
450 _mainQ->Insert (tevent, 0);
451 if ( _running && _nactive < _activelimit )
452 startThread();
453 }
454 }
455
456 if ( _timerQ->HasEvents () ) {
457 _duetime = *_timerQ->NextPrio();
458
459 } else {
460 // Wake up in 10 secs
461 now.tv_sec += 10;
462 _duetime = now;
463 }
464
465 LOG_DEBUG (("Timer: waiting until %ld.%ld",
466 (long)_duetime.tv_sec, (long)_duetime.tv_nsec));
467
468 int ret = pthread_cond_timedwait (&_timeQ, &_mutex, &_duetime);
469
470 if ( ret != 0 && ret != ETIMEDOUT ) {
471 break;
472 }
473 }
474
475 LOG_DEBUG (("Timer() stopping"));
476
477 _ntimer--;
478 _nthreads--;
479 pthread_mutex_unlock (&_mutex);
480 }
481
482 void ThreadManager::Blocking ()
483 {
484 LOG_DEBUG (("Blocking"));
485
486 if ( pthread_mutex_lock (&_mutex) != 0 ) {
487 _nthreads--;
488 _nactive--;
489 pthread_exit(0);
490 }
491
492 _nblocked++;
493 _nactive--;
494
495 // Since _nactive has fallen, another thread might be able to start
496 // or may need to be created.
497 startThread ();
498
499 pthread_mutex_unlock (&_mutex);
500 }
501
502 void ThreadManager::Unblocked ()
503 {
504 LOG_DEBUG (("Unblocked"));
505
506 if ( pthread_mutex_lock (&_mutex) != 0 ) {
507 _nthreads--;
508 _nblocked--;
509 pthread_exit(0);
510 }
511
512 _nblocked--;
513 _nactive++;
514
515 pthread_mutex_unlock (&_mutex);
516 }
517
518 bool ThreadManager::LongLived()
519 {
520 LOG_DEBUG (("LongLived"));
521
522 if ( pthread_mutex_lock (&_mutex) != 0 ) {
523 _nactive--;
524 _nthreads--;
525 _nblocked--;
526 pthread_exit(0);
527 }
528
529 if ( _terminating ) {
530 pthread_mutex_unlock (&_mutex);
531 return true;
532 }
533
534 // If the total thread limit has been reached, then do nothing.
535 // Otherwise, so long as we have not already marked the thread as
536 // long lived, then do so and start it.
537 if ( _activelimit < _threadlimit &&
538 pthread_getspecific (_llkey) == NULL ) {
539 // This is a long-lived thread possibly blocking other events
540 // We have not previously marked it as such
541
542 // mark this thread as long-lived
543 if (pthread_setspecific (_llkey, (void *)1) == 0) {
544 _activelimit++;
545 startThread ();
546 }
547 }
548
549 pthread_mutex_unlock (&_mutex);
550 return false;
551 }
552
553 void ThreadManager::Queue (AsyncEvent *event,
554 unsigned priority,
555 bool nostart)
556 {
557 LOG_DEBUG (("Queue: %p prio=%u nostart=%d", event, priority, nostart));
558
559 if ( pthread_mutex_lock (&_mutex) != 0 ) {
560 _nthreads--;
561 _nblocked--;
562 pthread_exit(0);
563 }
564
565 _deQueue (event);
566
567 if ( !_terminating ) {
568 _mainQ->Insert (event, priority);
569
570 if ( !nostart )
571 startThread ();
572 }
573
574 pthread_mutex_unlock (&_mutex);
575 }
576
577 void ThreadManager::QueueAtExit (AsyncEvent *event)
578 {
579 LOG_DEBUG (("QueueAtExit: %p", event));
580
581 if ( pthread_mutex_lock (&_mutex) != 0 ) {
582 _nthreads--;
583 _nblocked--;
584 pthread_exit(0);
585 }
586
587 _deQueue (event);
588
589 if ( _terminating ) {
590 // Unlocks mutex
591 eventDeliver (event);
592
593 } else {
594 _atExitQ->Insert (event);
595 pthread_mutex_unlock (&_mutex);
596
597 }
598
599 }
600
601 void ThreadManager::QueueAt (AsyncEvent *event, const struct timespec *duetime)
602 {
603 LOG_DEBUG (("Queue: %p @ %d.%9.9ld",
604 event, (int)duetime->tv_sec, duetime->tv_nsec));
605
606 if ( pthread_mutex_lock (&_mutex) != 0 ) {
607 _nthreads--;
608 _nblocked--;
609 pthread_exit(0);
610 }
611
612 _deQueue (event);
613
614 if (!_terminating ) {
615 _timerQ->Insert (event, *duetime);
616
617 if ( _ntimer <= 0 ) {
618 // Need to start a thread to handle the timer
619 pthread_t tid;
620
621 // What should we do with errors here?
622 if ( pthread_create (&tid, &_threadattrs, Timer_fnx, 0) == 0 ) {
623 // Counters are incremented here as it may be while
624 // until the actual thread gets a go.
625 _ntimer++;
626 _nthreads++;
627 }
628
629 } else if ( timespec_cmp (duetime, &_duetime) ) {
630 // This event due before timer thread wakes up,
631 // so wake it up now to recalculate
632 pthread_cond_signal (&_timeQ);
633 }
634 }
635
636 pthread_mutex_unlock (&_mutex);
637 }
638
639 void ThreadManager::QueueAt (AsyncEvent *event, unsigned millisecs)
640 {
641 LOG_DEBUG (("Queue: %p in %dms", event, millisecs));
642
643 struct timespec now;
644
645 getNow (now);
646
647 addMillisecs (now, millisecs);
648
649 QueueAt (event, &now);
650 }
651
652 void ThreadManager::Dequeue (AsyncEvent *event)
653 {
654 if ( pthread_mutex_lock (&_mutex) != 0 ) {
655 _nthreads--;
656 _nblocked--;
657 pthread_exit(0);
658 }
659
660 _deQueue (event);
661
662 pthread_mutex_unlock (&_mutex);
663 }
664
665 void ThreadManager::Terminate (unsigned int millisecs)
666 {
667 LOG_DEBUG (("Manager: Terminate in %dms", millisecs));
668
669 struct timespec now;
670
671 getNow (now);
672
673 addMillisecs (now, millisecs);
674
675 if ( pthread_mutex_lock (&_mutex) != 0 ) {
676 pthread_exit(0);
677 }
678
679 _terminating = true;
680
681 // Wake up all waiters
682 pthread_cond_broadcast (&_waitQ);
683 pthread_cond_broadcast (&_timeQ);
684
685 // Cancel all queued events
686 AsyncEvent *event;
687
688 while ( (event = _mainQ->First()) != 0 )
689 _deQueue (event);
690
691 while ( (event = _timerQ->First()) != 0 )
692 _deQueue (event);
693
694 // Deliver all at exit events
695 // in the reverse order to their being added
696 while ( (event = _atExitQ->Last()) != 0 ) {
697 // Unlocks mutex
698 eventDeliver (event);
699
700 if ( pthread_mutex_lock (&_mutex) != 0 ) {
701 pthread_exit(0);
702 }
703 }
704
705 // Wait for all threads to terminate
706 while ( _nthreads > 0 &&
707 pthread_cond_timedwait (&_termwait, &_mutex, &now) == 0 )
708 /* empty */;
709
710 pthread_mutex_unlock (&_mutex);
711 }
712
713 Manager *CreateThreadManager ()
714 {
715 return ThreadManager::Create ();
716 }
717}
718
719static void * Event_fnx (void *context ARGNOTUSED) {
720 ::Event::threadinstance->Manager ();
721 return 0;
722}
723
724static void * Timer_fnx (void *context ARGNOTUSED) {
725 ::Event::threadinstance->Timer ();
726 return 0;
727}
728
// Referenced declarations (summaries from the generated documentation):
//
//   A simple FIFO queue for events.
//     void Insert(AsyncEvent *event)
//         Insert in queue.
//     AsyncEvent * Last()
//         Remove last element of queue.
//
//   Priority queue template.
//
//   Event manager which is multi-threaded.
//
//   bool timespec_cmp(const struct timespec *time1, const struct timespec *time2)
//       Function for comparing two times.
//       Definition: timeutil.h:27
//
//   void addMillisecs(struct timespec &now, unsigned millisecs)
//       Add millisecs to a time.
//       Definition: timeutil.h:70
//
//   void getNow(struct timespec &now)
//       Return the current time.
//       Definition: timeutil.h:60

All rights reserved © 2002 - 2024 Isode Ltd.