Poll::Poll_select Class Reference

Poll provider using select(). More...

Inheritance diagram for Poll::Poll_select:
Poll::Provider

Public Member Functions

 Poll_select ()
 Tell users we are stopping.
 
virtual int Register (User *user)
 Register an endpoint.
 
virtual void Deregister (User *user)
 Deregister endpoint.
 
virtual void Control (User *user)
 Set the events which are interesting for end point.
 
virtual void Deliver (pollAction *msg)
 Interface to itself, via Event Manager.
 
- Public Member Functions inherited from Poll::Provider
virtual ~Provider ()
 Virtual destructor.
 

Additional Inherited Members

- Static Public Member Functions inherited from Poll::Provider
static Provider * GetPollService ()
 Implement as a singleton.
 
static int ProviderFactory (const char *type, MSGstruct *msg)
 Factory method for stream provider.
 

Detailed Description

Poll provider using select().

Definition at line 88 of file Poll_select.C.

Constructor & Destructor Documentation

◆ Poll_select()

Poll::Poll_select::Poll_select ( )

Tell users we are stopping.

Definition at line 127 of file Poll_select.C.

128 {
129 pthread_mutex_init (&_mutex, NULL);
130
131 pollAction action;
132
133 action.terminate = false;
134 _myevent.SetUserData (this, action);
135
136 action.terminate = true;
137 _termevent.SetUserData (this, action);
138
139 _nfds = 1; // Count pipe, although not yet open
140
141 _inpoll = false;
142 _wakepending = false;
143
144 _terminating = false;
145 _queued = false;
146
147 // Leave creating sockets until we can report error
148 _wakefds[0] = _wakefds[1] = INVALID_ENDPOINT;
149
150 _manager = ::Event::Manager::GetManager();
151
152 _manager->QueueAtExit (&_termevent);
153 }

◆ ~Poll_select()

Poll::Poll_select::~Poll_select ( )
virtual

Definition at line 155 of file Poll_select.C.

156 {
157 pthread_mutex_destroy (&_mutex);
158 if ( _wakefds[0] != INVALID_ENDPOINT) closesocket (_wakefds[0]);
159 if ( _wakefds[1] != INVALID_ENDPOINT) closesocket (_wakefds[1]);
160 }

Member Function Documentation

◆ Register()

int Poll::Poll_select::Register ( User * user)
virtual

Register an endpoint.

Returns
zero on success, -1 if already registered or other system errno value

Implements Poll::Provider.

Definition at line 492 of file Poll_select.C.

493 {
494 int rc;
495 endpoint_t fd = user->_fd;
496
497 LOG_DEBUG (("Poll_select: Register: " ENDPOINT_FMT, fd));
498
499 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 )
500 return rc;
501
502 if ( _terminating ) {
503 rc = 0;
504 } else {
505
506 // Check to see if fd already registered
507 polldatamap_t::iterator entry = _internal.find (fd);
508
509 if ( entry == _internal.end() ) {
510 _internal[fd] = user;
511 _nfds++;
512
513 rc = 0;
514
515 } else {
516 rc = -1;
517 }
518 }
519
520 pthread_mutex_unlock (&_mutex);
521 return rc;
522 }

References Poll::User::_fd.

◆ Deregister()

void Poll::Poll_select::Deregister ( User * user)
virtual

Deregister endpoint.

Implements Poll::Provider.

Definition at line 524 of file Poll_select.C.

525 {
526 endpoint_t fd = user->_fd;
527
528 LOG_DEBUG (("Poll_select: Deregister: " ENDPOINT_FMT, fd));
529
530 if ( (pthread_mutex_lock (&_mutex)) != 0 )
531 return;
532
533 polldatamap_t::iterator entry = _internal.find (fd);
534
535 if ( entry != _internal.end() ) {
536
537 // Do NOT cancel the event, as the poll user should be the
538 // only user of this.
539
540 _internal.erase (entry);
541 _nfds--;
542
543 // Do not wake poll(). If the end point were in use, any events
544 // will be cancelled.
545 }
546
547 pthread_mutex_unlock (&_mutex);
548 }

References Poll::User::_fd.

◆ Control()

void Poll::Poll_select::Control ( User * user)
virtual

Set the events which are interesting for end point.

Implements Poll::Provider.

Definition at line 550 of file Poll_select.C.

551 {
552 endpoint_t fd = user->_fd;
553
554 LOG_DEBUG (("Poll_poll: Control: " ENDPOINT_FMT " %x", fd, user->_uevents));
555
556 if ( (pthread_mutex_lock (&_mutex)) != 0 )
557 return;
558
559 if ( _terminating ) {
560 pthread_mutex_unlock (&_mutex);
561 return;
562 }
563
564 bool wake = false;
565 unsigned old_pevents = user->_pevents;
566
567 LOG_DEBUG (("Control: queued=%d inpoll=%d wakepending=%d uevents=%x pevents=%x",
568 _queued, _inpoll, _wakepending,
569 user->_uevents, old_pevents));
570
571 user->_pevents = user->_uevents;
572
573 // Check for events set in new mask which are NOT in old
574 // If set need to wake poll unless wake pending or not in poll
575 if ( _inpoll && !_wakepending &&
576 (user->_uevents & (~old_pevents)) != 0 ) {
577 wake = true;
578
579 // Set here, in critical section, even though it will
580 // not actually be true until later
581 _wakepending = true;
582
583 LOG_DEBUG (("Control: waking polling thread"));
584
585 } else if ( !_inpoll && !_queued && user->_uevents != 0 ) {
586 // If there is no event queued on event manager, and there
587 // are events here to be waited for, then need to queue
588
589 LOG_DEBUG (("Control: adding Poll event"));
590 _queued = true;
591 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
592 }
593
594 pthread_mutex_unlock (&_mutex);
595
596 if ( wake ) {
597 // Then wake by writing byte
598 char bytev[1];
599 bytev[0] = 1;
600 if ( sendto (_wakefds[1], bytev, 1, 0,
601 reinterpret_cast<sockaddr *>(&_addr), sizeof _addr) < 1 )
602 MSG_IOevent_Pollerror_LOG ("write");
603 }
604 }

References Poll::User::_fd, Poll::User::_pevents, and Poll::User::_uevents.

◆ Deliver()

void Poll::Poll_select::Deliver ( pollAction * msg)
virtual

Interface to itself, via Event Manager.

Implements Poll::Provider.

Definition at line 228 of file Poll_select.C.

229 {
230 int rc;
231 LOG_DEBUG (( "Poll_select::Deliver ()"));
232
233 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
234 MSG_IOevent_Pollthread_LOG (rc);
235 _manager->QueueAt (&_myevent, 10000);
236 _queued = true;
237 return;
238 }
239
240 if ( _terminating ) {
241 pthread_mutex_unlock(&_mutex);
242 return;
243 }
244
245 if ( msg->terminate ) {
246 // System is shutting down
247
248 // Prevents queuing
249 _queued = true;
250
251 _terminating = true;
252
253 tellTerminating();
254
255 bool wake = false;
256 if ( _inpoll && !_wakepending ) {
257 wake = true;
258 _wakepending = true;
259 }
260
261 pthread_mutex_unlock(&_mutex);
262
263 if ( wake ) {
264 char bytev[1];
265 bytev[0] = 2;
266 if ( sendto (_wakefds[1], bytev, 1, 0,
267 reinterpret_cast<sockaddr *>(&_addr), sizeof _addr) < 1 )
268 MSG_IOevent_Pollerror_LOG ("write");
269 }
270 return;
271 } // end of handling termination
272
273 if ( _wakefds[0] == INVALID_ENDPOINT && setup_wakeup() != OK ) {
274 _manager->QueueAt (&_myevent, 1000);
275 _queued = true;
276 pthread_mutex_unlock(&_mutex);
277 return;
278 }
279
280 isode_fd_set rfds (_nfds);
281 isode_fd_set wfds (_nfds);
282 isode_fd_set efds (_nfds);
283
284 // First entry is always for wake-up FD
285 endpoint_t maxfd = _wakefds[0];
286
287 rfds.set (_wakefds[0]);
288 efds.set (_wakefds[0]);
289
290 // scan registered FDs for those which are needing polling
291 int nfds = 1;
292 polldatamap_t::const_iterator scan;
293 for ( scan = _internal.begin(); scan != _internal.end(); ++scan ) {
294 if ( scan->first == INVALID_ENDPOINT || scan->second->_pevents == 0 ) continue;
295
296 if ( scan->second->_pevents & POLLIN )
297 rfds.set (scan->first);
298
299 if ( scan->second->_pevents & POLLOUT )
300 wfds.set (scan->first);
301
302 efds.set (scan->first);
303
304 if ( scan->first > maxfd )
305 maxfd = scan->first;
306
307 nfds++;
308 }
309
310 int timeout = -1;
311
312 const timespec *wakeat = _manager->NextEventDue ();
313
314 if ( wakeat ) {
315 struct timespec now;
316 getNow(now);
317 // timespec_diff returns difference in microseconds.
318 // returns 0 if delay precedes now.
319 timeout = timespec_diff (wakeat, &now)/1000;
320 }
321
322 _inpoll = true; // about to be true
323
324 pthread_mutex_unlock(&_mutex);
325
326 // Tell event manager we might go away for a while
327 _manager->Blocking();
328
329 struct timeval timev, *tp;
330
331 if ( timeout >= 0 ) {
332 // timeout is in millisecs
333 tp = &timev;
334 tp->tv_sec = timeout / 1000;
335 tp->tv_usec = (timeout % 1000) * 1000;
336 } else {
337 tp = 0;
338 }
339
340 LOG_DEBUG (("Poll_select::Wait: select nfds=%d", nfds));
341 rc = select (maxfd+1, rfds.fds(), wfds.fds(), efds.fds(), tp);
342 LOG_DEBUG (("Poll_select::Wait select()=%d", rc));
343 int eno = errno; // Preserve errno
344
345 // Tell event manager we are back
346 _manager->Unblocked();
347
348 if ( rc < 0 ) {
349 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
350 MSG_IOevent_Pollthread_LOG (rc);
351 _manager->QueueAt (&_myevent, 10000);
352 _queued = true;
353 _inpoll = false;
354 return;
355 }
356
357 _inpoll = false;
358
359#ifdef IC_WIN32
360 int lasterr = WSAGetLastError();
361 LOG_DEBUG (("Poll_select::Wait: select error %d", lasterr));
362 switch ( lasterr ) {
363 case WSANOTINITIALISED:
364 case WSAENETDOWN:
365 // No point in carrying on, terminate
366 MSG_IOevent_Pollerror_LOG ("select");
367 _queued = true;
368 _terminating = true;
369 tellTerminating();
370 return;
371
372 case WSAEINTR:
373 LOG_DEBUG (( "Poll_select::Wait: select EINTR"));
374 // Queue event to self again.
375 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
376 break;
377
378 default:
379 MSG_IOevent_Pollerror_LOG ("select");
380 // Try again in one second
381 _manager->QueueAt (&_myevent, 1000);
382 break;
383 }
384#else
385 if ( eno == EINTR ) {
386 LOG_DEBUG (( "Poll_select::Wait: select EINTR"));
387 // Queue event to self again.
388 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
389
390 } else {
391 MSG_IOevent_Pollerror_LOG ("select");
392 // Try again in one second
393 _manager->QueueAt (&_myevent, 1000);
394 }
395#endif
396
397 _queued = true;
398 pthread_mutex_unlock(&_mutex);
399 return;
400 }
401
402 // Soak up wakeup bytes outside critical section
403 if ( rfds.is_set (_wakefds[0]) ) {
404 char buffer[128];
405 struct sockaddr_in addr;
406#ifdef RECVFROM_NEEDS_INTP
407 int alen;
408#else
409 socklen_t alen;
410#endif
411 alen = sizeof addr;
412 if ( recvfrom (_wakefds[0], buffer, sizeof buffer, 0,
413 reinterpret_cast<sockaddr *>(&addr), &alen) < 0 )
414 MSG_IOevent_Pollerror_LOG ("recvfrom");
415 rc--;
416 }
417
418 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
419 MSG_IOevent_Pollthread_LOG (rc);
420 _manager->QueueAt (&_myevent, 10000);
421 _queued = true;
422 _inpoll = false;
423 _wakepending = false;
424 return;
425 }
426
427 _inpoll = false;
428 _wakepending = false;
429
430 if ( _terminating )
431 return;
432
433 pollmsg doevents;
434
435 // Count FDs which remain
436 int waitingfds = 0;
437
438 // With select, we scan all the endpoints again.
439 polldatamap_t::iterator it1, it2;
440 for ( it1 = _internal.begin(); it1 != _internal.end(); ++it1 ) {
441 if ( it1->first == INVALID_ENDPOINT || it1->second->_pevents == 0 ) continue;
442
443 pollmsg these_events;
444
445 these_events.events = 0;
446
447 if ( efds.is_set (it1->first) )
448 these_events.events |= Event_Err;
449
450 if ( rfds.is_set (it1->first) )
451 these_events.events |= Event_In;
452
453 if ( wfds.is_set (it1->first) )
454 these_events.events |= Event_Out;
455
456 if ( these_events.events != 0 ) {
457 it1->second->_pevents = 0;
458 it1->second->_event.Send(these_events);
459 }
460
461 // If events remain, then still need to wait
462 if ( it1->second->_pevents != 0 )
463 waitingfds++;
464 }
465
466 // Need a second scan, as we need to deregister all entries
467 // which had errors
468
469 for ( it1 = _internal.begin() ; it1 != _internal.end() ; ) {
470 it2 = it1;
471 it1++;
472
473 if ( efds.is_set (it2->first) ) {
474 // I hope this does not invalidate it1
475 _internal.erase (it2);
476 _nfds--;
477 }
478 }
479
480 // Only queue event for processing with poll if items remain
481 LOG_DEBUG (("Poll_select: waitingfds=%d", waitingfds));
482 if ( waitingfds ) {
483 _queued = true;
484 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO, true);
485 } else {
486 _queued = false;
487 }
488
489 pthread_mutex_unlock(&_mutex);
490 }
Event_Out
Writing now will not block.
void getNow(struct timespec &now)
Return the current time.
Definition timeutil.h:60
unsigned long timespec_diff(const struct timespec *time1, const struct timespec *time2)
Function for getting the difference between two times.
Definition timeutil.h:42

References Poll::Event_Err, Poll::Event_Out, getNow(), and timespec_diff().


The documentation for this class was generated from the following file:

Poll_select.C
All rights reserved © 2002 - 2024 Isode Ltd.