LOG_DEBUG (("Poll_select::Deliver ()"));
if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
    MSG_IOevent_Pollthread_LOG (rc);
    _manager->QueueAt (&_myevent, 10000);
    return;
}
if ( _terminating ) {
    pthread_mutex_unlock(&_mutex);
    return;
}

if ( msg->terminate ) {
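// If the polling thread is currently blocked in select() and no wakeup is
// already pending, mark one pending, drop the mutex, and poke the wakeup
// socket so the blocked call returns.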
if ( _inpoll && !_wakepending ) {
    _wakepending = true;
    pthread_mutex_unlock(&_mutex);

    if ( sendto (_wakefds[1], bytev, 1, 0,
                 reinterpret_cast<sockaddr *>(&_addr), sizeof _addr) < 1 )
        MSG_IOevent_Pollerror_LOG ("write");
    return;
}
if ( _wakefds[0] == INVALID_ENDPOINT && setup_wakeup() != OK ) {
    _manager->QueueAt (&_myevent, 1000);
    pthread_mutex_unlock(&_mutex);
    return;
}
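// Build the descriptor sets for select(): the wakeup endpoint is always
// watched for readability and errors.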
endpoint_t maxfd = _wakefds[0];
rfds.set (_wakefds[0]);
efds.set (_wakefds[0]);
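// Add every registered endpoint to the read/write/error sets according to
// the events it is waiting for, tracking the highest descriptor for
// select()'s first argument.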
polldatamap_t::const_iterator scan;
for ( scan = _internal.begin(); scan != _internal.end(); ++scan ) {
    if ( scan->first == INVALID_ENDPOINT || scan->second->_pevents == 0 )
        continue;

    if ( scan->second->_pevents & POLLIN )
        rfds.set (scan->first);

    if ( scan->second->_pevents & POLLOUT )
        wfds.set (scan->first);

    efds.set (scan->first);

    if ( scan->first > maxfd )
        maxfd = scan->first;
}
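// Determine how long select() may block: the next internally scheduled
// event, if any, bounds the wait.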
const timespec *wakeat = _manager->NextEventDue ();
pthread_mutex_unlock(&_mutex);

_manager->Blocking();
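// Convert the millisecond timeout into a struct timeval; a null pointer
// makes select() block indefinitely.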
struct timeval timev, *tp;
if ( timeout >= 0 ) {
    tp = &timev;
    tp->tv_sec = timeout / 1000;
    tp->tv_usec = (timeout % 1000) * 1000;
} else
    tp = 0;
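// Issue select() itself, outside the mutex.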
LOG_DEBUG (("Poll_select::Wait: select nfds=%d", nfds));
rc = select (maxfd+1, rfds.fds(), wfds.fds(), efds.fds(), tp);
LOG_DEBUG (("Poll_select::Wait select()=%d", rc));
_manager->Unblocked();
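// Re-acquire the mutex before looking at the result; as above, a lock
// failure logs the error and reschedules the poll event.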
if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
    MSG_IOevent_Pollthread_LOG (rc);
    _manager->QueueAt (&_myevent, 10000);
    return;
}
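// select() error handling. The Windows-specific branch inspects
// WSAGetLastError(), the POSIX branch checks errno: an interrupted call is
// simply re-queued at top priority, any other error is logged and the poll
// retried after a delay.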
int lasterr = WSAGetLastError();
LOG_DEBUG (("Poll_select::Wait: select error %d", lasterr));

case WSANOTINITIALISED:
    MSG_IOevent_Pollerror_LOG ("select");

LOG_DEBUG (("Poll_select::Wait: select EINTR"));
_manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);

MSG_IOevent_Pollerror_LOG ("select");
_manager->QueueAt (&_myevent, 1000);
if ( eno == EINTR ) {
    LOG_DEBUG (("Poll_select::Wait: select EINTR"));
    _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
}
else {
    MSG_IOevent_Pollerror_LOG ("select");
    _manager->QueueAt (&_myevent, 1000);
}
pthread_mutex_unlock(&_mutex);
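// If the wakeup endpoint is readable, drain the byte that was sent to wake
// the poller (the RECVFROM_NEEDS_INTP variant presumably differs only in
// the type of the address-length argument), then clear _wakepending again
// under the mutex.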
if ( rfds.is_set (_wakefds[0]) ) {
    struct sockaddr_in addr;
#ifdef RECVFROM_NEEDS_INTP

    if ( recvfrom (_wakefds[0], buffer, sizeof buffer, 0,
                   reinterpret_cast<sockaddr *>(&addr), &alen) < 0 )
        MSG_IOevent_Pollerror_LOG ("recvfrom");
if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
    MSG_IOevent_Pollthread_LOG (rc);
    _manager->QueueAt (&_myevent, 10000);
    _wakepending = false;
    return;
}

_wakepending = false;
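// Deliver results: for each endpoint that was armed for polling, translate
// the fd_set results into event flags, send them to the endpoint's owner,
// and clear the armed events; endpoints still waiting for something are
// counted in waitingfds.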
polldatamap_t::iterator it1, it2;
for ( it1 = _internal.begin(); it1 != _internal.end(); ++it1 ) {
    if ( it1->first == INVALID_ENDPOINT || it1->second->_pevents == 0 )
        continue;

    these_events.events = 0;
    if ( efds.is_set (it1->first) )

    if ( rfds.is_set (it1->first) )
        these_events.events |= Event_In;
    if ( wfds.is_set (it1->first) )

    if ( these_events.events != 0 ) {
        it1->second->_pevents = 0;
        it1->second->_event.Send(these_events);
    }

    if ( it1->second->_pevents != 0 )
        waitingfds++;
}
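// Second pass: endpoints that reported an exceptional condition are dropped
// from the internal map; a separate iterator (it2) is erased so the loop
// iterator stays valid.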
for ( it1 = _internal.begin() ; it1 != _internal.end() ; ) {

    if ( efds.is_set (it2->first) ) {

        _internal.erase (it2);
LOG_DEBUG (("Poll_select: waitingfds=%d", waitingfds));

_manager->Queue (&_myevent, ::Event::Manager::MAXPRIO, true);

pthread_mutex_unlock(&_mutex);
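// Control: called when a caller changes the events of interest for an
// endpoint. Depending on the poller's state it either wakes the polling
// thread or queues the poll event.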
endpoint_t fd = user->_fd;

LOG_DEBUG (("Poll_poll: Control: " ENDPOINT_FMT " %x", fd, user->_uevents));
if ( (pthread_mutex_lock (&_mutex)) != 0 )
    return;

if ( _terminating ) {
    pthread_mutex_unlock (&_mutex);
    return;
}
unsigned old_pevents = user->_pevents;

LOG_DEBUG (("Control: queued=%d inpoll=%d wakepending=%d uevents=%x pevents=%x",
            _queued, _inpoll, _wakepending, user->_uevents, old_pevents));
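// If a select() is in progress, no wakeup is pending yet, and the new
// interest set adds events the poller is not currently watching, wake the
// polling thread; otherwise, if no poll is in progress or queued and events
// are requested, queue the poll event.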
if ( _inpoll && !_wakepending &&
     (user->_uevents & (~old_pevents)) != 0 ) {

    LOG_DEBUG (("Control: waking polling thread"));

}
else if ( !_inpoll && !_queued && user->_uevents != 0 ) {

    LOG_DEBUG (("Control: adding Poll event"));

    _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
}

pthread_mutex_unlock (&_mutex);
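// The actual wakeup: a single byte sent to the wakeup socket once the
// mutex has been released.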
if ( sendto (_wakefds[1], bytev, 1, 0,
             reinterpret_cast<sockaddr *>(&_addr), sizeof _addr) < 1 )
    MSG_IOevent_Pollerror_LOG ("write");