21#include <isode/base/logging.h>
22#include <isode/base/util.h>
23#include <isode/compat/tailor.h>
25#include "AsyncEventRef.h"
// Forward declarations of the two file-local thread start routines. Each takes
// a single (unused) context pointer and returns void *, which is the signature
// pthread_create() requires; they are passed to pthread_create() further down
// in this file.
28 static void * Event_fnx (
void *context ARGNOTUSED);
29 static void * Timer_fnx (
void *context ARGNOTUSED);
// Out-of-class definition of the static member AsyncEventPlainQueue::_freelist.
// NOTE(review): by its name this is presumably a free list of recycled queue
// objects; its use is not visible in this listing — confirm in the header.
33 AsyncEventPlainQueue *AsyncEventPlainQueue::_freelist;
// Mutex locked at the start of, and unlocked at the end of, the manager
// methods defined in this file.
38 pthread_mutex_t _mutex;
// Condition that Manager() waits on when it has nothing to deliver; signalled
// by startThread() and broadcast by Terminate().
39 pthread_cond_t _waitQ;
// Condition that Timer() waits on with a deadline; signalled by QueueAt() and
// broadcast by Terminate().
40 pthread_cond_t _timeQ;
// Condition signalled by Manager() on its way out; Terminate() waits on it
// while _nthreads is still positive.
41 pthread_cond_t _termwait;
// Thread attributes (stack size, detached state) set up in the constructor and
// passed to every pthread_create() call in this file.
43 pthread_attr_t _threadattrs;
// Thread counters. _nthreads is compared against _threadlimit and _nactive
// against _activelimit elsewhere in this file.
// NOTE(review): volatile does not provide synchronisation on its own; the
// visible accesses sit in methods that take _mutex, but confirm that every
// access is made with the mutex held.
49 volatile int _nthreads;
50 volatile int _nactive;
// NOTE(review): no use of _nblocked is visible in this listing; presumably
// maintained by Blocking()/Unblocked() — confirm.
51 volatile int _nblocked;
// Declaration only; the definition (ThreadManager::startThread) appears later
// in this file and either signals _waitQ or creates a new Event_fnx thread.
63 inline void startThread ();
// Helper taking an event that is to be removed from a queue.
// NOTE(review): this listing omits several lines of the body (including the
// closing brace), so only the visible steps are described: it fetches the
// event's AsyncEventRef via Reference(), fetches the event's Receiver via
// GetReceiver(), and calls Extract(event) on that receiver. Any conditions
// guarding these steps are not visible here.
66 inline void _deQueue (AsyncEvent *event) {
67 AsyncEventRef *eventref =
event->Reference();
73 Receiver *rcvr =
event->GetReceiver();
78 rcvr->Extract (event);
// Hands an event to its receiver.
// NOTE(review): the body is only partially visible. Two paths can be seen, one
// logged as "queue" and one logged as "deliver", and both release _mutex.
// Callers in this file (Manager(), Terminate()) re-lock _mutex right after
// calling eventDeliver(), so the contract appears to be "enter with _mutex
// held, return with it released" — confirm against the full source.
86 inline void eventDeliver (AsyncEvent *event) {
87 Receiver *rcvr =
event->GetReceiver();
95 LOG_DEBUG ((
"eventDeliver: queue %p", event));
98 pthread_mutex_unlock (&_mutex);
104 pthread_mutex_unlock (&_mutex);
105 LOG_DEBUG ((
"eventDeliver: deliver %p", event));
// Static factory; the definition later in this file returns the existing
// threadinstance if there is one and otherwise constructs a ThreadManager.
114 static Manager *Create ();
// Always reports true for this manager implementation.
123 virtual bool IsThreaded () {
return true; }
// Declaration only; defined later in this file as ThreadManager::Run.
// NOTE(review): the meaning of isthread is not visible in this listing.
130 virtual void Run (
bool isthread);
// Waits on a caller-supplied condition variable / mutex pair.
// Returns false straight away when `immediate` is set or no mutex is given.
// NOTE(review): the parameter line declaring `immediate` and the branch that
// chooses between the timed and untimed wait are elided from this listing;
// presumably pthread_cond_timedwait is used when `when` is non-null and
// pthread_cond_wait otherwise — confirm. The return value after waiting is
// also not visible.
133 virtual bool Poll (
const timespec *when,
135 pthread_mutex_t *mutex,
136 pthread_cond_t *cond) {
137 if ( immediate || mutex == 0 )
return false;
142 pthread_cond_timedwait (cond, mutex, when);
144 pthread_cond_wait (cond, mutex);
// Declarations of the control operations; each is defined later in this file.
// Terminate: shuts the manager down; millisecs is the value logged as the
// termination delay.
152 virtual void Terminate(
unsigned int millisecs);
// Blocking / Unblocked: paired notifications.
// NOTE(review): their bodies are almost entirely elided in this listing;
// presumably they bracket a blocking call made by an event handler — confirm.
155 virtual void Blocking();
158 virtual void Unblocked();
// LongLived: marks the calling thread through the _llkey thread-specific key
// (see the definition).
162 virtual bool LongLived();
// SetThreads: updates _activelimit and _threadlimit under _mutex.
167 virtual void SetThreads (
int activethreads,
int maxthreads);
// Returns the current value of _activelimit. The read is not performed under
// _mutex.
170 virtual int GetActiveLimit () {
return _activelimit; }
// Declarations of the queueing operations; each is defined later in this file.
// Queue: inserts the event into the main queue unless terminating. nostart
// defaults to false.
// NOTE(review): one parameter line between `event` and `nostart` is elided
// here; the definition logs a `priority` value, so it is presumably the
// priority — confirm.
174 virtual void Queue (AsyncEvent *event,
176 bool nostart =
false);
// QueueAt (absolute time): inserts the event into the timer queue with the
// given due time.
179 virtual void QueueAt (AsyncEvent *event,
const struct timespec *abstime);
// QueueAt (relative): forwards to the absolute-time overload.
182 virtual void QueueAt (AsyncEvent *event,
unsigned int millisec);
// Dequeue: removes a previously queued event (body largely elided below).
185 virtual void Dequeue (AsyncEvent *event);
// QueueAtExit: registers an event for termination time; Terminate() delivers
// the events held in _atExitQ.
188 virtual void QueueAtExit (AsyncEvent *event);
// Always returns a null pointer in this implementation.
193 virtual const struct timespec *NextEventDue () {
return 0; }
// Constructor: initialises the synchronisation objects, the thread attributes,
// the thread limits and the three event queues.
// NOTE(review): this listing omits some lines (braces and a number of member
// initialisations), so only the visible statements are described.
198 ThreadManager::ThreadManager ()
// Mutex and condition variables, all with default attributes.
200 pthread_mutex_init (&_mutex, NULL);
201 pthread_cond_init (&_waitQ, NULL);
202 pthread_cond_init (&_timeQ, NULL);
203 pthread_cond_init (&_termwait, NULL);
// Thread-specific key used by LongLived()/Manager(); no destructor function.
205 pthread_key_create (&_llkey, NULL);
207 pthread_attr_init (&_threadattrs);
// Stack size selection: start from the platform default (or 2 MiB if it cannot
// be read); a non-zero thread_stacksize overrides it outright, otherwise the
// default is raised to at least 2 MiB; finally it is never allowed below
// PTHREAD_STACK_MIN.
// NOTE(review): thread_stacksize (like num_threads and thread_limit below) is
// declared outside this listing; presumably a configuration value — confirm.
209 const size_t TWOMEG = (2*1024*1024);
210 size_t default_stacksize = 0;
212 if ( pthread_attr_getstacksize (&_threadattrs,
213 &default_stacksize) != 0 )
214 default_stacksize = TWOMEG;
216 if ( thread_stacksize != 0 ) {
217 default_stacksize = thread_stacksize;
218 }
else if ( default_stacksize < TWOMEG ) {
219 default_stacksize = TWOMEG;
222 if ( default_stacksize < PTHREAD_STACK_MIN )
223 default_stacksize = PTHREAD_STACK_MIN;
225 pthread_attr_setstacksize (&_threadattrs, default_stacksize);
// Threads are created detached, so they are never joined.
227 pthread_attr_setdetachstate (&_threadattrs, PTHREAD_CREATE_DETACHED);
230 _terminating =
false;
239 _duetime.tv_nsec = 0;
// Active-thread limit: num_threads if non-zero, otherwise the online CPU
// count, with a floor of 2.
241 long config_cpu, online_cpu;
243 isode_cpucount (&config_cpu, &online_cpu, 0, 0);
245 if ( num_threads != 0 )
246 online_cpu = num_threads;
248 _activelimit = online_cpu > 2 ? online_cpu : 2;
// Total-thread limit: twice the active limit when that limit exceeds 10,
// otherwise 20; then raised to thread_limit if that is larger.
249 _threadlimit = _activelimit > 10 ? 2*_activelimit : 20;
251 if ( _threadlimit < thread_limit )
252 _threadlimit = thread_limit;
// Queues: priority queue for runnable events, timer queue for deferred events,
// plain queue for at-exit events.
254 _mainQ =
new AsyncEventPriorityQueue;
255 _timerQ =
new AsyncEventTimerQueue;
256 _atExitQ =
new AsyncEventPlainQueue;
258 LOG_DEBUG ((
"ThreadManager created"));
// Returns the single ThreadManager instance, constructing it on first use and
// storing it in threadinstance.
// NOTE(review): no locking is visible around the check-then-create sequence,
// so concurrent first calls could race — confirm that callers serialise this.
261 Manager *ThreadManager::Create () {
262 if ( threadinstance != NULL )
263 return threadinstance;
265 threadinstance =
new ThreadManager ();
267 return threadinstance;
// Makes sure a worker is available to process the main queue: either wakes a
// waiting worker through _waitQ or, if the thread count is still below
// _threadlimit, creates a new detached thread running Event_fnx.
// NOTE(review): several lines are elided from this listing. The statement
// guarded by the first `if` (presumably an early return when not running, when
// the active limit is reached, or when the main queue is empty) and the
// condition selecting the signal branch (presumably "some worker is waiting")
// are not visible — confirm.
276 inline void ThreadManager::startThread()
278 LOG_DEBUG ((
"startThread: active=%d limit=%d events=%d",
279 _nactive, _activelimit, (
int) _mainQ->HasEvents()));
282 if ( !_running || _nactive >= _activelimit ||
283 !_mainQ->HasEvents() )
286 LOG_DEBUG ((
"startThread: timer=%d nwait=%d threads=%d limit=%d",
287 _ntimer, _nwait, _nthreads, _threadlimit));
291 pthread_cond_signal (&_waitQ);
293 }
else if ( _nthreads < _threadlimit ) {
// The thread receives a null context; Event_fnx reaches the manager through
// threadinstance instead.
298 if ( pthread_create (&tid, &_threadattrs, Event_fnx, 0) == 0 ) {
// Starts the manager running.
// NOTE(review): only the mutex lock and unlock are visible in this listing;
// the work done in between (and the handling of a failed lock) is elided.
310 void ThreadManager::Run (
bool isthread)
312 if ( pthread_mutex_lock (&_mutex) != 0 ) {
329 pthread_mutex_unlock (&_mutex);
// Worker-thread main loop (entered from Event_fnx). While the manager is not
// terminating and the thread count does not exceed _threadlimit, it takes the
// first event off the main queue and delivers it, or waits on _waitQ.
// NOTE(review): many lines are elided from this listing (counter updates,
// error handling, loop/branch closers); comments describe visible lines only.
335 void ThreadManager::Manager ()
337 if ( pthread_mutex_lock (&_mutex) != 0 ) {
343 while ( !_terminating && _nthreads <= _threadlimit ) {
345 LOG_DEBUG ((
"Manager: active=(%d:%d) events=%d",
346 _nactive, _activelimit, _mainQ->HasEvents()));
// NOTE(review): this test uses `<=` whereas startThread() and Timer() test
// `_nactive < _activelimit`; confirm the difference is intentional.
348 if ( _nactive <= _activelimit && _mainQ->HasEvents() ) {
350 AsyncEvent *
event = _mainQ->First();
// eventDeliver() releases _mutex on its visible paths, hence the re-lock below.
353 eventDeliver (event);
355 if ( pthread_mutex_lock(&_mutex) != 0 ) {
// A non-null _llkey value means LongLived() was called on this thread during
// delivery: clear the flag and lower _activelimit again (never below 1).
362 if ( pthread_getspecific (_llkey) != NULL ) {
363 LOG_DEBUG ((
"Manager: %p longlived (%d)",
364 event, _activelimit));
366 pthread_setspecific (_llkey, NULL);
367 if ( _activelimit > 1 ) _activelimit--;
// Nothing deliverable: sleep until startThread() or Terminate() wakes us.
377 LOG_DEBUG ((
"Manager: wait on queue"));
379 int ret = pthread_cond_wait (&_waitQ, &_mutex);
// Leaving: wake Terminate(), which waits on _termwait for the workers to go.
393 pthread_cond_signal (&_termwait);
395 pthread_mutex_unlock (&_mutex);
// NOTE(review): _nthreads and _nactive are read here after _mutex is released.
397 LOG_DEBUG ((
"Manager: thread exiting (n=%d, active=%d)",
398 _nthreads, _nactive));
// Adjusts the thread limits under _mutex.
//   activethreads: new _activelimit; applied only when greater than 1.
//   maxthreads:    new _threadlimit when it exceeds _activelimit; when it is 0
//                  the limit is recomputed from _activelimit.
// Afterwards _threadlimit is raised to thread_limit if it is lower.
// NOTE(review): the statement taken when the lock fails, and the branch for
// `_activelimit <= 10` in the recompute path, are elided from this listing
// (the constructor uses 20 in the corresponding case — confirm).
402 void ThreadManager::SetThreads (
int activethreads,
int maxthreads)
404 LOG_DEBUG ((
"SetThreads: %d %d",activethreads, maxthreads));
406 if ( pthread_mutex_lock (&_mutex) != 0 )
409 if ( activethreads > 1 )
410 _activelimit = activethreads;
412 if ( maxthreads > _activelimit ) {
413 _threadlimit = maxthreads;
414 }
else if ( maxthreads == 0 ) {
415 if ( _activelimit > 10 )
416 _threadlimit = 2*_activelimit;
420 if ( _threadlimit < thread_limit )
421 _threadlimit = thread_limit;
426 pthread_mutex_unlock (&_mutex);
// Timer-thread main loop (entered from Timer_fnx). Until the manager is
// terminating it moves due events from the timer queue to the main queue and
// then sleeps on _timeQ until the next due time.
// NOTE(review): several lines are elided from this listing (how `now` is
// obtained, the statement guarded by the `_running` test, the due time used
// when the timer queue is empty, and error handling).
429 void ThreadManager::Timer()
431 LOG_DEBUG ((
"Timer() starting"));
433 if ( pthread_mutex_lock (&_mutex) != 0 ) {
439 while ( !_terminating ) {
443 LOG_DEBUG ((
"Timer: timer events: %d", _timerQ->HasEvents ()));
444 if ( _timerQ->HasEvents () ) {
// Drain every event First(now) yields into the main queue at priority 0.
448 while ( (tevent = _timerQ->First (now)) != 0 ) {
449 LOG_DEBUG ((
"timerQ event %p due", tevent));
450 _mainQ->Insert (tevent, 0);
// NOTE(review): the guarded statement is elided; presumably it starts or wakes
// a worker when there is spare active capacity — confirm.
451 if ( _running && _nactive < _activelimit )
// Remaining events: the next wake-up time is taken from the timer queue.
456 if ( _timerQ->HasEvents () ) {
457 _duetime = *_timerQ->NextPrio();
465 LOG_DEBUG ((
"Timer: waiting until %ld.%ld",
466 (
long)_duetime.tv_sec, (
long)_duetime.tv_nsec));
// Sleeps until _duetime or until QueueAt()/Terminate() signals _timeQ.
468 int ret = pthread_cond_timedwait (&_timeQ, &_mutex, &_duetime);
// A timeout is the normal wake-up; any other non-zero result is handled in the
// (elided) body of this branch.
470 if ( ret != 0 && ret != ETIMEDOUT ) {
475 LOG_DEBUG ((
"Timer() stopping"));
479 pthread_mutex_unlock (&_mutex);
// Notification hook; logs "Blocking" and does its work under _mutex.
// NOTE(review): the statements between lock and unlock are elided from this
// listing; presumably they adjust the active/blocked thread counters so that
// another worker can run — confirm.
482 void ThreadManager::Blocking ()
484 LOG_DEBUG ((
"Blocking"));
486 if ( pthread_mutex_lock (&_mutex) != 0 ) {
499 pthread_mutex_unlock (&_mutex);
// Notification hook; logs "Unblocked" and does its work under _mutex.
// NOTE(review): the statements between lock and unlock are elided from this
// listing; presumably the counterpart of Blocking() — confirm.
502 void ThreadManager::Unblocked ()
504 LOG_DEBUG ((
"Unblocked"));
506 if ( pthread_mutex_lock (&_mutex) != 0 ) {
515 pthread_mutex_unlock (&_mutex);
// Flags the calling thread as long-lived through the _llkey thread-specific
// key. Manager() checks that key after delivering an event, clears it and
// decrements _activelimit.
// NOTE(review): return statements and the body executed once the key has been
// set are elided from this listing; presumably _activelimit is incremented
// there to compensate for the occupied worker — confirm.
518 bool ThreadManager::LongLived()
520 LOG_DEBUG ((
"LongLived"));
522 if ( pthread_mutex_lock (&_mutex) != 0 ) {
// While terminating the mutex is released immediately.
529 if ( _terminating ) {
530 pthread_mutex_unlock (&_mutex);
// Only act when there is headroom below _threadlimit and this thread has not
// already been flagged.
537 if ( _activelimit < _threadlimit &&
538 pthread_getspecific (_llkey) == NULL ) {
// Any non-null value serves as the flag; (void *)1 is used.
543 if (pthread_setspecific (_llkey, (
void *)1) == 0) {
549 pthread_mutex_unlock (&_mutex);
// Inserts an event into the main queue at the given priority, under _mutex,
// unless the manager is terminating.
// NOTE(review): the rest of the parameter list and the handling of `nostart`
// (presumably a startThread() call when it is false) are elided from this
// listing, as is what happens to the event while terminating — confirm.
553 void ThreadManager::Queue (AsyncEvent *event,
557 LOG_DEBUG ((
"Queue: %p prio=%u nostart=%d", event, priority, nostart));
559 if ( pthread_mutex_lock (&_mutex) != 0 ) {
567 if ( !_terminating ) {
568 _mainQ->Insert (event, priority);
574 pthread_mutex_unlock (&_mutex);
// Registers an event to be delivered at termination. If the manager is
// already terminating, the event is delivered immediately via eventDeliver().
// NOTE(review): the non-terminating branch (presumably an insert into
// _atExitQ) is elided from this listing. eventDeliver() releases _mutex on its
// visible paths, so confirm the trailing unlock is not reached on that path.
577 void ThreadManager::QueueAtExit (AsyncEvent *event)
579 LOG_DEBUG ((
"QueueAtExit: %p", event));
581 if ( pthread_mutex_lock (&_mutex) != 0 ) {
589 if ( _terminating ) {
591 eventDeliver (event);
595 pthread_mutex_unlock (&_mutex);
// Queues an event for delivery at an absolute time. Under _mutex, and only if
// the manager is not terminating, the event is inserted into the timer queue;
// a timer thread is created when none is recorded in _ntimer, and _timeQ is
// signalled.
// NOTE(review): duetime is dereferenced in the log call before any check, so
// it must be non-null. Lines between the visible ones are elided, so it cannot
// be seen whether the signal is sent only when a timer thread already exists.
601 void ThreadManager::QueueAt (AsyncEvent *event,
const struct timespec *duetime)
603 LOG_DEBUG ((
"Queue: %p @ %d.%9.9ld",
604 event, (
int)duetime->tv_sec, duetime->tv_nsec));
606 if ( pthread_mutex_lock (&_mutex) != 0 ) {
614 if (!_terminating ) {
615 _timerQ->Insert (event, *duetime);
// No timer thread yet: start one running Timer_fnx.
617 if ( _ntimer <= 0 ) {
622 if ( pthread_create (&tid, &_threadattrs, Timer_fnx, 0) == 0 ) {
// Wake the timer thread so it can pick up the new entry.
632 pthread_cond_signal (&_timeQ);
636 pthread_mutex_unlock (&_mutex);
// Relative-time overload: forwards to the absolute-time QueueAt() with `now`.
// NOTE(review): the lines computing `now` are elided from this listing;
// presumably the current time plus millisecs — confirm.
// NOTE(review): millisecs is unsigned but is logged with %d.
639 void ThreadManager::QueueAt (AsyncEvent *event,
unsigned millisecs)
641 LOG_DEBUG ((
"Queue: %p in %dms", event, millisecs));
649 QueueAt (event, &now);
// Removes a queued event, under _mutex.
// NOTE(review): the statements between lock and unlock are elided from this
// listing; presumably a call to the _deQueue() helper — confirm.
652 void ThreadManager::Dequeue (AsyncEvent *event)
654 if ( pthread_mutex_lock (&_mutex) != 0 ) {
662 pthread_mutex_unlock (&_mutex);
// Shuts the manager down: wakes all worker and timer threads, empties the main
// and timer queues, delivers the at-exit events, then waits for the worker
// threads to finish.
// NOTE(review): several lines are elided from this listing (setting
// _terminating, computing `now` from millisecs, and the bodies of the two
// drain loops); comments describe the visible lines only.
// NOTE(review): millisecs is unsigned but is logged with %d.
665 void ThreadManager::Terminate (
unsigned int millisecs)
667 LOG_DEBUG ((
"Manager: Terminate in %dms", millisecs));
675 if ( pthread_mutex_lock (&_mutex) != 0 ) {
// Wake every thread blocked in Manager() or Timer().
682 pthread_cond_broadcast (&_waitQ);
683 pthread_cond_broadcast (&_timeQ);
// Remove all pending events from both queues.
688 while ( (event = _mainQ->First()) != 0 )
691 while ( (event = _timerQ->First()) != 0 )
// At-exit events are taken from the tail of the queue, i.e. in reverse order
// of insertion. eventDeliver() releases _mutex, so it is re-acquired after
// each delivery.
696 while ( (event = _atExitQ->
Last()) != 0 ) {
698 eventDeliver (event);
700 if ( pthread_mutex_lock (&_mutex) != 0 ) {
// Wait for the workers to exit; the loop ends when none are left or when the
// timed wait returns non-zero (for example on timeout).
706 while ( _nthreads > 0 &&
707 pthread_cond_timedwait (&_termwait, &_mutex, &now) == 0 )
710 pthread_mutex_unlock (&_mutex);
// Free-function factory: returns whatever ThreadManager::Create() returns.
713 Manager *CreateThreadManager ()
715 return ThreadManager::Create ();
// Start routine for worker threads (passed to pthread_create in
// startThread()). Ignores its context argument and runs the Manager() loop of
// the global threadinstance.
// NOTE(review): the return statement and closing brace are elided here.
719static void * Event_fnx (
void *context ARGNOTUSED) {
720 ::Event::threadinstance->Manager ();
// Start routine for the timer thread (passed to pthread_create in QueueAt()).
// Ignores its context argument and runs the Timer() loop of the global
// threadinstance.
// NOTE(review): the return statement and closing brace are elided here.
724static void * Timer_fnx (
void *context ARGNOTUSED) {
725 ::Event::threadinstance->Timer ();
A simple FIFO queue for events.
void Insert(AsyncEvent *event)
Insert in queue.
AsyncEvent * Last()
Remove last element of queue.
Event manager which is multi-threaded.
bool timespec_cmp(const struct timespec *time1, const struct timespec *time2)
Function for comparing two times.
void addMillisecs(struct timespec &now, unsigned millisecs)
Add millisecs to a time.
void getNow(struct timespec &now)
Return the current time.