Self-interface: the poller drives its own loop by queueing its event (_myevent) back to itself via the Event Manager.
229 {
230 int rc;
231 LOG_DEBUG (( "Poll_select::Deliver ()"));
232
233 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
234 MSG_IOevent_Pollthread_LOG (rc);
235 _manager->QueueAt (&_myevent, 10000);
236 _queued = true;
237 return;
238 }
239
240 if ( _terminating ) {
241 pthread_mutex_unlock(&_mutex);
242 return;
243 }
244
245 if ( msg->terminate ) {
246
247
248
249 _queued = true;
250
251 _terminating = true;
252
253 tellTerminating();
254
255 bool wake = false;
256 if ( _inpoll && !_wakepending ) {
257 wake = true;
258 _wakepending = true;
259 }
260
261 pthread_mutex_unlock(&_mutex);
262
263 if ( wake ) {
264 char bytev[1];
265 bytev[0] = 2;
266 if ( sendto (_wakefds[1], bytev, 1, 0,
267 reinterpret_cast<sockaddr *>(&_addr), sizeof _addr) < 1 )
268 MSG_IOevent_Pollerror_LOG ("write");
269 }
270 return;
271 }
272
273 if ( _wakefds[0] == INVALID_ENDPOINT && setup_wakeup() != OK ) {
274 _manager->QueueAt (&_myevent, 1000);
275 _queued = true;
276 pthread_mutex_unlock(&_mutex);
277 return;
278 }
279
280 isode_fd_set rfds (_nfds);
281 isode_fd_set wfds (_nfds);
282 isode_fd_set efds (_nfds);
283
284
285 endpoint_t maxfd = _wakefds[0];
286
287 rfds.set (_wakefds[0]);
288 efds.set (_wakefds[0]);
289
290
291 int nfds = 1;
292 polldatamap_t::const_iterator scan;
293 for ( scan = _internal.begin(); scan != _internal.end(); ++scan ) {
294 if ( scan->first == INVALID_ENDPOINT || scan->second->_pevents == 0 ) continue;
295
296 if ( scan->second->_pevents & POLLIN )
297 rfds.set (scan->first);
298
299 if ( scan->second->_pevents & POLLOUT )
300 wfds.set (scan->first);
301
302 efds.set (scan->first);
303
304 if ( scan->first > maxfd )
305 maxfd = scan->first;
306
307 nfds++;
308 }
309
310 int timeout = -1;
311
312 const timespec *wakeat = _manager->NextEventDue ();
313
314 if ( wakeat ) {
315 struct timespec now;
317
318
320 }
321
322 _inpoll = true;
323
324 pthread_mutex_unlock(&_mutex);
325
326
327 _manager->Blocking();
328
329 struct timeval timev, *tp;
330
331 if ( timeout >= 0 ) {
332
333 tp = &timev;
334 tp->tv_sec = timeout / 1000;
335 tp->tv_usec = (timeout % 1000) * 1000;
336 } else {
337 tp = 0;
338 }
339
340 LOG_DEBUG (("Poll_select::Wait: select nfds=%d", nfds));
341 rc = select (maxfd+1, rfds.fds(), wfds.fds(), efds.fds(), tp);
342 LOG_DEBUG (("Poll_select::Wait select()=%d", rc));
343 int eno = errno;
344
345
346 _manager->Unblocked();
347
348 if ( rc < 0 ) {
349 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
350 MSG_IOevent_Pollthread_LOG (rc);
351 _manager->QueueAt (&_myevent, 10000);
352 _queued = true;
353 _inpoll = false;
354 return;
355 }
356
357 _inpoll = false;
358
359#ifdef IC_WIN32
360 int lasterr = WSAGetLastError();
361 LOG_DEBUG (("Poll_select::Wait: select error %d", lasterr));
362 switch ( lasterr ) {
363 case WSANOTINITIALISED:
364 case WSAENETDOWN:
365
366 MSG_IOevent_Pollerror_LOG ("select");
367 _queued = true;
368 _terminating = true;
369 tellTerminating();
370 return;
371
372 case WSAEINTR:
373 LOG_DEBUG (( "Poll_select::Wait: select EINTR"));
374
375 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
376 break;
377
378 default:
379 MSG_IOevent_Pollerror_LOG ("select");
380
381 _manager->QueueAt (&_myevent, 1000);
382 break;
383 }
384#else
385 if ( eno == EINTR ) {
386 LOG_DEBUG (( "Poll_select::Wait: select EINTR"));
387
388 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
389
390 } else {
391 MSG_IOevent_Pollerror_LOG ("select");
392
393 _manager->QueueAt (&_myevent, 1000);
394 }
395#endif
396
397 _queued = true;
398 pthread_mutex_unlock(&_mutex);
399 return;
400 }
401
402
403 if ( rfds.is_set (_wakefds[0]) ) {
404 char buffer[128];
405 struct sockaddr_in addr;
406#ifdef RECVFROM_NEEDS_INTP
407 int alen;
408#else
409 socklen_t alen;
410#endif
411 alen = sizeof addr;
412 if ( recvfrom (_wakefds[0], buffer, sizeof buffer, 0,
413 reinterpret_cast<sockaddr *>(&addr), &alen) < 0 )
414 MSG_IOevent_Pollerror_LOG ("recvfrom");
415 rc--;
416 }
417
418 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
419 MSG_IOevent_Pollthread_LOG (rc);
420 _manager->QueueAt (&_myevent, 10000);
421 _queued = true;
422 _inpoll = false;
423 _wakepending = false;
424 return;
425 }
426
427 _inpoll = false;
428 _wakepending = false;
429
430 if ( _terminating )
431 return;
432
433 pollmsg doevents;
434
435
436 int waitingfds = 0;
437
438
439 polldatamap_t::iterator it1, it2;
440 for ( it1 = _internal.begin(); it1 != _internal.end(); ++it1 ) {
441 if ( it1->first == INVALID_ENDPOINT || it1->second->_pevents == 0 ) continue;
442
443 pollmsg these_events;
444
445 these_events.events = 0;
446
447 if ( efds.is_set (it1->first) )
448 these_events.events |= Event_Err;
449
450 if ( rfds.is_set (it1->first) )
451 these_events.events |= Event_In;
452
453 if ( wfds.is_set (it1->first) )
455
456 if ( these_events.events != 0 ) {
457 it1->second->_pevents = 0;
458 it1->second->_event.Send(these_events);
459 }
460
461
462 if ( it1->second->_pevents != 0 )
463 waitingfds++;
464 }
465
466
467
468
469 for ( it1 = _internal.begin() ; it1 != _internal.end() ; ) {
470 it2 = it1;
471 it1++;
472
473 if ( efds.is_set (it2->first) ) {
474
475 _internal.erase (it2);
476 _nfds--;
477 }
478 }
479
480
481 LOG_DEBUG (("Poll_select: waitingfds=%d", waitingfds));
482 if ( waitingfds ) {
483 _queued = true;
484 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO, true);
485 } else {
486 _queued = false;
487 }
488
489 pthread_mutex_unlock(&_mutex);
490 }
@ Event_Out
There is urgent data to read.
void getNow(struct timespec &now)
Return the current time.
unsigned long timespec_diff(const struct timespec *time1, const struct timespec *time2)
Returns the difference between the two given times.