Event::ThreadManager Class Reference

Event manager which is multi-threaded. More...

Inheritance diagram for Event::ThreadManager:

Public Member Functions

void Manager ()
 
void Timer ()
 
virtual bool IsThreaded ()
 
virtual void Run (bool isthread)
 
virtual bool Poll (const timespec *when, bool immediate, pthread_mutex_t *mutex, pthread_cond_t *cond)
 
virtual void Terminate (unsigned int millisecs)
 
virtual void Blocking ()
 
virtual void Unblocked ()
 
virtual bool LongLived ()
 
virtual void SetThreads (int activethreads, int maxthreads)
 
virtual int GetActiveLimit ()
 
virtual void Queue (AsyncEvent *event, unsigned priority, bool nostart=false)
 
virtual void QueueAt (AsyncEvent *event, const struct timespec *abstime)
 
virtual void QueueAt (AsyncEvent *event, unsigned int millisec)
 
virtual void Dequeue (AsyncEvent *event)
 
virtual void QueueAtExit (AsyncEvent *event)
 
virtual const struct timespec * NextEventDue ()
 

Static Public Member Functions

static Manager * Create ()
 

Detailed Description

Event manager which is multi-threaded.

Definition at line 36 of file ThreadManager.C.

Constructor & Destructor Documentation

◆ ThreadManager()

Event::ThreadManager::ThreadManager ( )

Definition at line 198 of file ThreadManager.C.

199 {
200 pthread_mutex_init (&_mutex, NULL);
201 pthread_cond_init (&_waitQ, NULL);
202 pthread_cond_init (&_timeQ, NULL);
203 pthread_cond_init (&_termwait, NULL);
204
205 pthread_key_create (&_llkey, NULL);
206
207 pthread_attr_init (&_threadattrs);
208
209 const size_t TWOMEG = (2*1024*1024);
210 size_t default_stacksize = 0;
211
212 if ( pthread_attr_getstacksize (&_threadattrs,
213 &default_stacksize) != 0 )
214 default_stacksize = TWOMEG;
215
216 if ( thread_stacksize != 0 ) {
217 default_stacksize = thread_stacksize;
218 } else if ( default_stacksize < TWOMEG ) {
219 default_stacksize = TWOMEG;
220 }
221
222 if ( default_stacksize < PTHREAD_STACK_MIN )
223 default_stacksize = PTHREAD_STACK_MIN;
224
225 pthread_attr_setstacksize (&_threadattrs, default_stacksize);
226
227 pthread_attr_setdetachstate (&_threadattrs, PTHREAD_CREATE_DETACHED);
228
229 _running = false;
230 _terminating = false;
231
232 _nthreads = 0;
233 _nactive = 0;
234 _nblocked = 0;
235 _ntimer = 0;
236 _nwait = 0;
237
238 _duetime.tv_sec = 0;
239 _duetime.tv_nsec = 0;
240
241 long config_cpu, online_cpu;
242
243 isode_cpucount (&config_cpu, &online_cpu, 0, 0);
244
245 if ( num_threads != 0 )
246 online_cpu = num_threads;
247
248 _activelimit = online_cpu > 2 ? online_cpu : 2;
249 _threadlimit = _activelimit > 10 ? 2*_activelimit : 20;
250
251 if ( _threadlimit < thread_limit )
252 _threadlimit = thread_limit;
253
254 _mainQ = new AsyncEventPriorityQueue;
255 _timerQ = new AsyncEventTimerQueue;
256 _atExitQ = new AsyncEventPlainQueue;
257
258 LOG_DEBUG (("ThreadManager created"));
259 }

Member Function Documentation

◆ Create()

Manager * Event::ThreadManager::Create ( )
static

Definition at line 261 of file ThreadManager.C.

261 {
262 if ( threadinstance != NULL )
263 return threadinstance;
264
265 threadinstance = new ThreadManager ();
266
267 return threadinstance;
268 }

◆ Manager()

void Event::ThreadManager::Manager ( )

Definition at line 335 of file ThreadManager.C.

336 {
337 if ( pthread_mutex_lock (&_mutex) != 0 ) {
338 _nactive--;
339 _nthreads--;
340 return;
341 }
342
343 while ( !_terminating && _nthreads <= _threadlimit ) {
344
345 LOG_DEBUG (("Manager: active=(%d:%d) events=%d",
346 _nactive, _activelimit, _mainQ->HasEvents()));
347
348 if ( _nactive <= _activelimit && _mainQ->HasEvents() ) {
349
350 AsyncEvent *event = _mainQ->First();
351
352 // Unlocks mutex
353 eventDeliver (event);
354
355 if ( pthread_mutex_lock(&_mutex) != 0 ) {
356 _nactive--;
357 _nthreads--;
358 return;
359 }
360
361 // Check to see if thread was executing long lived event
362 if ( pthread_getspecific (_llkey) != NULL ) {
363 LOG_DEBUG (("Manager: %p longlived (%d)",
364 event, _activelimit));
365
366 pthread_setspecific (_llkey, NULL);
367 if ( _activelimit > 1 ) _activelimit--;
368 }
369
370 } else {
371
372 // Wait for something to do
373
374 _nactive--;
375 _nwait++;
376
377 LOG_DEBUG (("Manager: wait on queue"));
378
379 int ret = pthread_cond_wait (&_waitQ, &_mutex);
380
381 _nwait--;
382 _nactive++;
383
384 if ( ret != 0 )
385 break;
386 }
387 }
388
389 _nthreads--;
390 _nactive--;
391
392 if ( _terminating )
393 pthread_cond_signal (&_termwait);
394
395 pthread_mutex_unlock (&_mutex);
396
397 LOG_DEBUG (("Manager: thread exiting (n=%d, active=%d)",
398 _nthreads, _nactive));
399
400 }

◆ Timer()

void Event::ThreadManager::Timer ( )

Definition at line 429 of file ThreadManager.C.

430 {
431 LOG_DEBUG (("Timer() starting"));
432
433 if ( pthread_mutex_lock (&_mutex) != 0 ) {
434 _ntimer--;
435 _nthreads--;
436 return;
437 }
438
439 while ( !_terminating ) {
440 struct timespec now;
441 getNow (now);
442
443 LOG_DEBUG (("Timer: timer events: %d", _timerQ->HasEvents ()));
444 if ( _timerQ->HasEvents () ) {
445
446 AsyncEvent *tevent;
447
448 while ( (tevent = _timerQ->First (now)) != 0 ) {
449 LOG_DEBUG (("timerQ event %p due", tevent));
450 _mainQ->Insert (tevent, 0);
451 if ( _running && _nactive < _activelimit )
452 startThread();
453 }
454 }
455
456 if ( _timerQ->HasEvents () ) {
457 _duetime = *_timerQ->NextPrio();
458
459 } else {
460 // Wake up in 10 secs
461 now.tv_sec += 10;
462 _duetime = now;
463 }
464
465 LOG_DEBUG (("Timer: waiting until %ld.%ld",
466 (long)_duetime.tv_sec, (long)_duetime.tv_nsec));
467
468 int ret = pthread_cond_timedwait (&_timeQ, &_mutex, &_duetime);
469
470 if ( ret != 0 && ret != ETIMEDOUT ) {
471 break;
472 }
473 }
474
475 LOG_DEBUG (("Timer() stopping"));
476
477 _ntimer--;
478 _nthreads--;
479 pthread_mutex_unlock (&_mutex);
480 }
void getNow(struct timespec &now)
Return the current time.
Definition timeutil.h:60

◆ IsThreaded()

virtual bool Event::ThreadManager::IsThreaded ( )
inlinevirtual

Definition at line 123 of file ThreadManager.C.

123{ return true; }

◆ Run()

void Event::ThreadManager::Run ( bool  isthread)
virtual

Definition at line 310 of file ThreadManager.C.

311 {
312 if ( pthread_mutex_lock (&_mutex) != 0 ) {
313 return;
314 }
315
316 // Start Manager
317 _running = true;
318
319 if ( isthread ) {
320 // This thread becomes one of the threads
321 _nthreads++;
322 _nactive++;
323
324 } else {
325 // May need to start a thread
326 startThread ();
327 }
328
329 pthread_mutex_unlock (&_mutex);
330
331 if ( isthread )
332 Manager ();
333 }

◆ Poll()

virtual bool Event::ThreadManager::Poll ( const timespec *  when,
bool  immediate,
pthread_mutex_t *  mutex,
pthread_cond_t *  cond 
)
inlinevirtual

Definition at line 133 of file ThreadManager.C.

136 {
137 if ( immediate || mutex == 0 ) return false;
138
139 Blocking();
140
141 if ( when )
142 pthread_cond_timedwait (cond, mutex, when);
143 else
144 pthread_cond_wait (cond, mutex);
145
146 Unblocked();
147
148 return false;
149 }

◆ Terminate()

void Event::ThreadManager::Terminate ( unsigned int  millisecs)
virtual

Definition at line 665 of file ThreadManager.C.

666 {
667 LOG_DEBUG (("Manager: Terminate in %dms", millisecs));
668
669 struct timespec now;
670
671 getNow (now);
672
673 addMillisecs (now, millisecs);
674
675 if ( pthread_mutex_lock (&_mutex) != 0 ) {
676 pthread_exit(0);
677 }
678
679 _terminating = true;
680
681 // Wake up all waiters
682 pthread_cond_broadcast (&_waitQ);
683 pthread_cond_broadcast (&_timeQ);
684
685 // Cancel all queued events
686 AsyncEvent *event;
687
688 while ( (event = _mainQ->First()) != 0 )
689 _deQueue (event);
690
691 while ( (event = _timerQ->First()) != 0 )
692 _deQueue (event);
693
694 // Deliver all at exit events
695 // in the reverse order to their being added
696 while ( (event = _atExitQ->Last()) != 0 ) {
697 // Unlocks mutex
698 eventDeliver (event);
699
700 if ( pthread_mutex_lock (&_mutex) != 0 ) {
701 pthread_exit(0);
702 }
703 }
704
705 // Wait for all threads to terminate
706 while ( _nthreads > 0 &&
707 pthread_cond_timedwait (&_termwait, &_mutex, &now) == 0 )
708 /* empty */;
709
710 pthread_mutex_unlock (&_mutex);
711 }
AsyncEvent * Last()
Remove last element of queue.
void addMillisecs(struct timespec &now, unsigned millisecs)
Add millisecs to a time.
Definition timeutil.h:70

◆ Blocking()

void Event::ThreadManager::Blocking ( )
virtual

Definition at line 482 of file ThreadManager.C.

483 {
484 LOG_DEBUG (("Blocking"));
485
486 if ( pthread_mutex_lock (&_mutex) != 0 ) {
487 _nthreads--;
488 _nactive--;
489 pthread_exit(0);
490 }
491
492 _nblocked++;
493 _nactive--;
494
495 // Since _nactive has fallen, another thread might be able to start
496 // or may need to be created.
497 startThread ();
498
499 pthread_mutex_unlock (&_mutex);
500 }

◆ Unblocked()

void Event::ThreadManager::Unblocked ( )
virtual

Definition at line 502 of file ThreadManager.C.

503 {
504 LOG_DEBUG (("Unblocked"));
505
506 if ( pthread_mutex_lock (&_mutex) != 0 ) {
507 _nthreads--;
508 _nblocked--;
509 pthread_exit(0);
510 }
511
512 _nblocked--;
513 _nactive++;
514
515 pthread_mutex_unlock (&_mutex);
516 }

◆ LongLived()

bool Event::ThreadManager::LongLived ( )
virtual

Definition at line 518 of file ThreadManager.C.

519 {
520 LOG_DEBUG (("LongLived"));
521
522 if ( pthread_mutex_lock (&_mutex) != 0 ) {
523 _nactive--;
524 _nthreads--;
525 _nblocked--;
526 pthread_exit(0);
527 }
528
529 if ( _terminating ) {
530 pthread_mutex_unlock (&_mutex);
531 return true;
532 }
533
534 // If the total thread limit has been reached, then do nothing.
535 // Otherwise, so long as we have not already marked the thread as
536 // long lived, then do so and start it.
537 if ( _activelimit < _threadlimit &&
538 pthread_getspecific (_llkey) == NULL ) {
539 // This is a long-lived thread possibly blocking other events
540 // We have not previously marked it as such
541
542 // mark this thread as long-lived
543 if (pthread_setspecific (_llkey, (void *)1) == 0) {
544 _activelimit++;
545 startThread ();
546 }
547 }
548
549 pthread_mutex_unlock (&_mutex);
550 return false;
551 }

◆ SetThreads()

void Event::ThreadManager::SetThreads ( int  activethreads,
int  maxthreads 
)
virtual

Definition at line 402 of file ThreadManager.C.

403 {
404 LOG_DEBUG (("SetThreads: %d %d",activethreads, maxthreads));
405
406 if ( pthread_mutex_lock (&_mutex) != 0 )
407 return;
408
409 if ( activethreads > 1 )
410 _activelimit = activethreads;
411
412 if ( maxthreads > _activelimit ) {
413 _threadlimit = maxthreads;
414 } else if ( maxthreads == 0 ) {
415 if ( _activelimit > 10 )
416 _threadlimit = 2*_activelimit;
417 else
418 _threadlimit = 20;
419
420 if ( _threadlimit < thread_limit )
421 _threadlimit = thread_limit;
422 }
423
424 startThread ();
425
426 pthread_mutex_unlock (&_mutex);
427 }

◆ GetActiveLimit()

virtual int Event::ThreadManager::GetActiveLimit ( )
inlinevirtual

Definition at line 170 of file ThreadManager.C.

170{ return _activelimit; }

◆ Queue()

void Event::ThreadManager::Queue ( AsyncEvent *  event,
unsigned  priority,
bool  nostart = false 
)
virtual

Definition at line 553 of file ThreadManager.C.

556 {
557 LOG_DEBUG (("Queue: %p prio=%u nostart=%d", event, priority, nostart));
558
559 if ( pthread_mutex_lock (&_mutex) != 0 ) {
560 _nthreads--;
561 _nblocked--;
562 pthread_exit(0);
563 }
564
565 _deQueue (event);
566
567 if ( !_terminating ) {
568 _mainQ->Insert (event, priority);
569
570 if ( !nostart )
571 startThread ();
572 }
573
574 pthread_mutex_unlock (&_mutex);
575 }

◆ QueueAt()

void Event::ThreadManager::QueueAt ( AsyncEvent *  event,
const struct timespec *  abstime 
)
virtual

Definition at line 601 of file ThreadManager.C.

602 {
603 LOG_DEBUG (("Queue: %p @ %d.%9.9ld",
604 event, (int)duetime->tv_sec, duetime->tv_nsec));
605
606 if ( pthread_mutex_lock (&_mutex) != 0 ) {
607 _nthreads--;
608 _nblocked--;
609 pthread_exit(0);
610 }
611
612 _deQueue (event);
613
614 if (!_terminating ) {
615 _timerQ->Insert (event, *duetime);
616
617 if ( _ntimer <= 0 ) {
618 // Need to start a thread to handle the timer
619 pthread_t tid;
620
621 // What should we do with errors here?
622 if ( pthread_create (&tid, &_threadattrs, Timer_fnx, 0) == 0 ) {
623 // Counters are incremented here as it may be while
624 // until the actual thread gets a go.
625 _ntimer++;
626 _nthreads++;
627 }
628
629 } else if ( timespec_cmp (duetime, &_duetime) ) {
630 // This event due before timer thread wakes up,
631 // so wake it up now to recalculate
632 pthread_cond_signal (&_timeQ);
633 }
634 }
635
636 pthread_mutex_unlock (&_mutex);
637 }
bool timespec_cmp(const struct timespec *time1, const struct timespec *time2)
Function for comparing two times.
Definition timeutil.h:27

◆ Dequeue()

void Event::ThreadManager::Dequeue ( AsyncEvent *  event)
virtual

Definition at line 652 of file ThreadManager.C.

653 {
654 if ( pthread_mutex_lock (&_mutex) != 0 ) {
655 _nthreads--;
656 _nblocked--;
657 pthread_exit(0);
658 }
659
660 _deQueue (event);
661
662 pthread_mutex_unlock (&_mutex);
663 }

◆ QueueAtExit()

void Event::ThreadManager::QueueAtExit ( AsyncEvent *  event)
virtual

Definition at line 577 of file ThreadManager.C.

578 {
579 LOG_DEBUG (("QueueAtExit: %p", event));
580
581 if ( pthread_mutex_lock (&_mutex) != 0 ) {
582 _nthreads--;
583 _nblocked--;
584 pthread_exit(0);
585 }
586
587 _deQueue (event);
588
589 if ( _terminating ) {
590 // Unlocks mutex
591 eventDeliver (event);
592
593 } else {
594 _atExitQ->Insert (event);
595 pthread_mutex_unlock (&_mutex);
596
597 }
598
599 }
void Insert(AsyncEvent *event)
Insert in queue.

◆ NextEventDue()

virtual const struct timespec * Event::ThreadManager::NextEventDue ( )
inlinevirtual

Definition at line 193 of file ThreadManager.C.

193{ return 0; }

The documentation for this class was generated from the following file:

All rights reserved © 2002 - 2024 Isode Ltd.