Poll_select.C
1// Copyright (c) 2005-2010, Isode Limited, London, England.
2// All rights reserved.
3//
4// Acquisition and use of this software and related materials for any
5// purpose requires a written licence agreement from Isode Limited,
6// or a written licence from an organisation licenced by Isode Limited
7// to grant such a licence.
8//
9
10
11//
12//
13// Poll_select.C
14//
15// Poll provider using select
16//
17// @VERSION@
18
19// Define this here before the inclusion of the Winsock header files
20// but only redefine on Windows
21#include <isode/config.h>
22#ifdef IC_WIN32
23#define FD_SETSIZE 4096
24#endif
25
26#include <errno.h>
27#include <map>
28
29#include "Event_p.h"
30
31#include <isode/ll/internet.h>
32
33#ifdef HAVE_SYS_SELECT_H
34#include <sys/select.h>
35#endif
36
37namespace Poll {
38
39 // Wrap up an FD set implementation, so that we can have an extensible
40 // set on Windows.
41
42#ifdef IC_WIN32
43 class isode_fd_set {
44 private:
45 fd_set _ifds;
46 fd_set *_fdsp;
47
48 public:
49 isode_fd_set (int nfds) {
50 if ( nfds > FD_SETSIZE ) {
51 _fdsp = (fd_set *)
52 smalloc (sizeof (*_fdsp)+(nfds-FD_SETSIZE)*sizeof(endpoint_t));
53 _fdsp->fd_count = 0;
54
55 } else {
56 _fdsp = &_ifds;
57 FD_ZERO (_fdsp);
58 }
59 }
60
61 ~isode_fd_set () { if ( _fdsp != &_ifds ) free (_fdsp); }
62
63 void set (endpoint_t fd) { FD_SET (fd, _fdsp); }
64
65 bool is_set (endpoint_t fd) { return 0 != FD_ISSET(fd, _fdsp); }
66
67 fd_set *fds () { return _fdsp; }
68 };
69#else
71 private:
72 fd_set _fds;
73
74 public:
75 isode_fd_set (int nfds ARGNOTUSED) { FD_ZERO (&_fds); }
76
77 void set (endpoint_t fd) { FD_SET (fd, &_fds); }
78
79 bool is_set (endpoint_t fd) { return FD_ISSET (fd, &_fds); }
80
81 fd_set *fds () { return &_fds; }
82 };
83#endif
84
85 typedef std::map<int, User *> polldatamap_t;
86
88 class Poll_select : public Provider {
89 private:
90 pthread_mutex_t _mutex;
91
92 polldatamap_t _internal;
93 int _nfds;
94
95 struct sockaddr_in _addr;
96 endpoint_t _wakefds[2];
97 bool _inpoll;
98 bool _wakepending;
99 bool _terminating;
100 bool _queued;
101 ::Event::Manager *_manager;
102 ProviderEvent _myevent;
103 ProviderEvent _termevent;
104
105 int setup_wakeup ();
106 void tellTerminating();
107
108 public:
109 Poll_select ();
110
111 virtual ~Poll_select ();
112
115 virtual int Register (User *user);
116
118 virtual void Deregister (User *user);
119
121 virtual void Control (User *user);
122
124 virtual void Deliver (pollAction *msg);
125 };
126
128 {
129 pthread_mutex_init (&_mutex, NULL);
130
131 pollAction action;
132
133 action.terminate = false;
134 _myevent.SetUserData (this, action);
135
136 action.terminate = true;
137 _termevent.SetUserData (this, action);
138
139 _nfds = 1; // Count pipe, although not yet open
140
141 _inpoll = false;
142 _wakepending = false;
143
144 _terminating = false;
145 _queued = false;
146
147 // Leave creating sockets until we can report error
148 _wakefds[0] = _wakefds[1] = INVALID_ENDPOINT;
149
150 _manager = ::Event::Manager::GetManager();
151
152 _manager->QueueAtExit (&_termevent);
153 }
154
155 Poll_select::~Poll_select ()
156 {
157 pthread_mutex_destroy (&_mutex);
158 if ( _wakefds[0] != INVALID_ENDPOINT) closesocket (_wakefds[0]);
159 if ( _wakefds[1] != INVALID_ENDPOINT) closesocket (_wakefds[1]);
160 }
161
162 int Poll_select::setup_wakeup () {
163
164 if ( (_wakefds[1] = socket (AF_INET, SOCK_DGRAM, 0)) == INVALID_ENDPOINT ) {
165 MSG_IOevent_Pollerror_LOG ("socket");
166 return NOTOK;
167 }
168
169 _addr.sin_family = AF_INET;
170
171 isode_inet_aton ("127.0.0.1", &_addr.sin_addr);
172
173 u_short port;
174 for ( port = 0xFFFF; port > 0; port-- ) {
175 _addr.sin_port = htons (port);
176
177 if ( (_wakefds[0] = socket (AF_INET, SOCK_DGRAM, 0)) == INVALID_ENDPOINT ) {
178 MSG_IOevent_Pollerror_LOG ("socket");
179 closesocket (_wakefds[1]);
180 _wakefds[1] = INVALID_ENDPOINT;
181 return NOTOK;
182 }
183
184 if ( bind (_wakefds[0], reinterpret_cast<sockaddr *>(&_addr),
185 sizeof (_addr)) == 0 )
186 break;
187
188 isode_set_errno ();
189
190 // Always need to close port, even if we are trying again
191 closesocket (_wakefds[0]);
192 _wakefds[0] = INVALID_ENDPOINT;
193
194 if ( errno == EADDRINUSE ) {
195 LOG_DEBUG (("Failed to bind to port %d - in use", port));
196 } else {
197 MSG_IOevent_Pollerror_LOG ("bind");
198 closesocket (_wakefds[1]);
199 _wakefds[1] = INVALID_ENDPOINT;
200 return NOTOK;
201 }
202 }
203
204 if ( port == 0 ) {
205 MSG_IOevent_Pollerror_LOG ("bind");
206 return NOTOK;
207 }
208
209 return OK;
210 }
211
212 void Poll_select::tellTerminating ()
213 {
214 // Tell users that we are terminating
215 // There is an implied removal of registrations
216 // The PollUser MUST NOT refer to the EventSource again
217 // Note: this is in critical section, so would get deadlock
218 // if the PollUser called back to Deregister.
219
220 ::Poll::pollmsg cancelmsg;
221 cancelmsg.events = Poll::Event_Terminate;
222
223 polldatamap_t::iterator scan;
224 for ( scan = _internal.begin(); scan != _internal.end(); ++scan )
225 scan->second->_event.SendAtExit (cancelmsg);
226 }
227
229 {
230 int rc;
231 LOG_DEBUG (( "Poll_select::Deliver ()"));
232
233 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
234 MSG_IOevent_Pollthread_LOG (rc);
235 _manager->QueueAt (&_myevent, 10000);
236 _queued = true;
237 return;
238 }
239
240 if ( _terminating ) {
241 pthread_mutex_unlock(&_mutex);
242 return;
243 }
244
245 if ( msg->terminate ) {
246 // System is shutting down
247
248 // Prevents queuing
249 _queued = true;
250
251 _terminating = true;
252
253 tellTerminating();
254
255 bool wake = false;
256 if ( _inpoll && !_wakepending ) {
257 wake = true;
258 _wakepending = true;
259 }
260
261 pthread_mutex_unlock(&_mutex);
262
263 if ( wake ) {
264 char bytev[1];
265 bytev[0] = 2;
266 if ( sendto (_wakefds[1], bytev, 1, 0,
267 reinterpret_cast<sockaddr *>(&_addr), sizeof _addr) < 1 )
268 MSG_IOevent_Pollerror_LOG ("write");
269 }
270 return;
271 } // end of handling termination
272
273 if ( _wakefds[0] == INVALID_ENDPOINT && setup_wakeup() != OK ) {
274 _manager->QueueAt (&_myevent, 1000);
275 _queued = true;
276 pthread_mutex_unlock(&_mutex);
277 return;
278 }
279
280 isode_fd_set rfds (_nfds);
281 isode_fd_set wfds (_nfds);
282 isode_fd_set efds (_nfds);
283
284 // First entry is always for wake-up FD
285 endpoint_t maxfd = _wakefds[0];
286
287 rfds.set (_wakefds[0]);
288 efds.set (_wakefds[0]);
289
290 // scan registered FDs for those which are needing polling
291 int nfds = 1;
292 polldatamap_t::const_iterator scan;
293 for ( scan = _internal.begin(); scan != _internal.end(); ++scan ) {
294 if ( scan->first == INVALID_ENDPOINT || scan->second->_pevents == 0 ) continue;
295
296 if ( scan->second->_pevents & POLLIN )
297 rfds.set (scan->first);
298
299 if ( scan->second->_pevents & POLLOUT )
300 wfds.set (scan->first);
301
302 efds.set (scan->first);
303
304 if ( scan->first > maxfd )
305 maxfd = scan->first;
306
307 nfds++;
308 }
309
310 int timeout = -1;
311
312 const timespec *wakeat = _manager->NextEventDue ();
313
314 if ( wakeat ) {
315 struct timespec now;
316 getNow(now);
317 // timespec_diff returns difference in microseconds.
318 // returns 0 if delay preceeds now.
319 timeout = timespec_diff (wakeat, &now)/1000;
320 }
321
322 _inpoll = true; // about to be true
323
324 pthread_mutex_unlock(&_mutex);
325
326 // Tell event manager we might go away for a while
327 _manager->Blocking();
328
329 struct timeval timev, *tp;
330
331 if ( timeout >= 0 ) {
332 // timeout is in millisecs
333 tp = &timev;
334 tp->tv_sec = timeout / 1000;
335 tp->tv_usec = (timeout % 1000) * 1000;
336 } else {
337 tp = 0;
338 }
339
340 LOG_DEBUG (("Poll_select::Wait: select nfds=%d", nfds));
341 rc = select (maxfd+1, rfds.fds(), wfds.fds(), efds.fds(), tp);
342 LOG_DEBUG (("Poll_select::Wait select()=%d", rc));
343 int eno = errno; // Preserve errno
344
345 // Tell event manager we are back
346 _manager->Unblocked();
347
348 if ( rc < 0 ) {
349 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
350 MSG_IOevent_Pollthread_LOG (rc);
351 _manager->QueueAt (&_myevent, 10000);
352 _queued = true;
353 _inpoll = false;
354 return;
355 }
356
357 _inpoll = false;
358
359#ifdef IC_WIN32
360 int lasterr = WSAGetLastError();
361 LOG_DEBUG (("Poll_select::Wait: select error %d", lasterr));
362 switch ( lasterr ) {
363 case WSANOTINITIALISED:
364 case WSAENETDOWN:
365 // No point in carrying on, terminate
366 MSG_IOevent_Pollerror_LOG ("select");
367 _queued = true;
368 _terminating = true;
369 tellTerminating();
370 return;
371
372 case WSAEINTR:
373 LOG_DEBUG (( "Poll_select::Wait: select EINTR"));
374 // Queue event to self again.
375 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
376 break;
377
378 default:
379 MSG_IOevent_Pollerror_LOG ("select");
380 // Try again in one second
381 _manager->QueueAt (&_myevent, 1000);
382 break;
383 }
384#else
385 if ( eno == EINTR ) {
386 LOG_DEBUG (( "Poll_select::Wait: select EINTR"));
387 // Queue event to self again.
388 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
389
390 } else {
391 MSG_IOevent_Pollerror_LOG ("select");
392 // Try again in one second
393 _manager->QueueAt (&_myevent, 1000);
394 }
395#endif
396
397 _queued = true;
398 pthread_mutex_unlock(&_mutex);
399 return;
400 }
401
402 // Soak up wakeup bytes outside critical section
403 if ( rfds.is_set (_wakefds[0]) ) {
404 char buffer[128];
405 struct sockaddr_in addr;
406#ifdef RECVFROM_NEEDS_INTP
407 int alen;
408#else
409 socklen_t alen;
410#endif
411 alen = sizeof addr;
412 if ( recvfrom (_wakefds[0], buffer, sizeof buffer, 0,
413 reinterpret_cast<sockaddr *>(&addr), &alen) < 0 )
414 MSG_IOevent_Pollerror_LOG ("recvfrom");
415 rc--;
416 }
417
418 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
419 MSG_IOevent_Pollthread_LOG (rc);
420 _manager->QueueAt (&_myevent, 10000);
421 _queued = true;
422 _inpoll = false;
423 _wakepending = false;
424 return;
425 }
426
427 _inpoll = false;
428 _wakepending = false;
429
430 if ( _terminating )
431 return;
432
433 pollmsg doevents;
434
435 // Count FDs which remain
436 int waitingfds = 0;
437
438 // With select, we scan all the endpoints again.
439 polldatamap_t::iterator it1, it2;
440 for ( it1 = _internal.begin(); it1 != _internal.end(); ++it1 ) {
441 if ( it1->first == INVALID_ENDPOINT || it1->second->_pevents == 0 ) continue;
442
443 pollmsg these_events;
444
445 these_events.events = 0;
446
447 if ( efds.is_set (it1->first) )
448 these_events.events |= Event_Err;
449
450 if ( rfds.is_set (it1->first) )
451 these_events.events |= Event_In;
452
453 if ( wfds.is_set (it1->first) )
454 these_events.events |= Event_Out;
455
456 if ( these_events.events != 0 ) {
457 it1->second->_pevents = 0;
458 it1->second->_event.Send(these_events);
459 }
460
461 // If events remain, then still need to wait
462 if ( it1->second->_pevents != 0 )
463 waitingfds++;
464 }
465
466 // Need a second scan, as we need to deregister all entries
467 // which had errors
468
469 for ( it1 = _internal.begin() ; it1 != _internal.end() ; ) {
470 it2 = it1;
471 it1++;
472
473 if ( efds.is_set (it2->first) ) {
474 // I hope this does not invalidate it1
475 _internal.erase (it2);
476 _nfds--;
477 }
478 }
479
480 // Only queue event for processing with poll if items remain
481 LOG_DEBUG (("Poll_select: waitingfds=%d", waitingfds));
482 if ( waitingfds ) {
483 _queued = true;
484 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO, true);
485 } else {
486 _queued = false;
487 }
488
489 pthread_mutex_unlock(&_mutex);
490 }
491
493 {
494 int rc;
495 endpoint_t fd = user->_fd;
496
497 LOG_DEBUG (("Poll_select: Register: " ENDPOINT_FMT, fd));
498
499 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 )
500 return rc;
501
502 if ( _terminating ) {
503 rc = 0;
504 } else {
505
506 // Check to see if fd already registered
507 polldatamap_t::iterator entry = _internal.find (fd);
508
509 if ( entry == _internal.end() ) {
510 _internal[fd] = user;
511 _nfds++;
512
513 rc = 0;
514
515 } else {
516 rc = -1;
517 }
518 }
519
520 pthread_mutex_unlock (&_mutex);
521 return rc;
522 }
523
525 {
526 endpoint_t fd = user->_fd;
527
528 LOG_DEBUG (("Poll_select: Deregister: " ENDPOINT_FMT, fd));
529
530 if ( (pthread_mutex_lock (&_mutex)) != 0 )
531 return;
532
533 polldatamap_t::iterator entry = _internal.find (fd);
534
535 if ( entry != _internal.end() ) {
536
537 // Do NOT cancel the event, as the poll user should be the
538 // only user of this.
539
540 _internal.erase (entry);
541 _nfds--;
542
543 // Do not wake poll(). If the end point were in use, any events
544 // will be cancelled.
545 }
546
547 pthread_mutex_unlock (&_mutex);
548 }
549
551 {
552 endpoint_t fd = user->_fd;
553
554 LOG_DEBUG (("Poll_poll: Control: " ENDPOINT_FMT " %x", fd, user->_uevents));
555
556 if ( (pthread_mutex_lock (&_mutex)) != 0 )
557 return;
558
559 if ( _terminating ) {
560 pthread_mutex_unlock (&_mutex);
561 return;
562 }
563
564 bool wake = false;
565 unsigned old_pevents = user->_pevents;
566
567 LOG_DEBUG (("Control: queued=%d inpoll=%d wakepending=%d uevents=%x pevents=%x",
568 _queued, _inpoll, _wakepending,
569 user->_uevents, old_pevents));
570
571 user->_pevents = user->_uevents;
572
573 // Check for events set in new mask which are NOT in old
574 // If set need to wake poll unless wake pending or not in poll
575 if ( _inpoll && !_wakepending &&
576 (user->_uevents & (~old_pevents)) != 0 ) {
577 wake = true;
578
579 // Set here, in critical section, even though it will
580 // not actually be true until later
581 _wakepending = true;
582
583 LOG_DEBUG (("Control: waking polling thread"));
584
585 } else if ( !_inpoll && !_queued && user->_uevents != 0 ) {
586 // If there is no event queued on event manager, and there
587 // are events here to be waited for, then need to queue
588
589 LOG_DEBUG (("Control: adding Poll event"));
590 _queued = true;
591 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
592 }
593
594 pthread_mutex_unlock (&_mutex);
595
596 if ( wake ) {
597 // Then wake by writing byte
598 char bytev[1];
599 bytev[0] = 1;
600 if ( sendto (_wakefds[1], bytev, 1, 0,
601 reinterpret_cast<sockaddr *>(&_addr), sizeof _addr) < 1 )
602 MSG_IOevent_Pollerror_LOG ("write");
603 }
604 }
605
606 Provider *CreatePollselectProvider ()
607 {
608 return new Poll_select();
609 }
610}
@ Event_Terminate
Invalid request: fd not open.
@ Event_Err
An error condition has occurred on the end point.
@ Event_Out
Writing now will not block.
::Event::AsyncEventAux< Provider, pollAction > ProviderEvent
Type for delivering events to the provider.
! Poll provider using poll() itself.
Definition Poll_select.C:88
virtual void Control(User *user)
Set the events which are interesting for end point.
virtual void Deregister(User *user)
Deregister endpoint.
Poll_select()
Tell users we are stopping.
virtual void Deliver(pollAction *msg)
Interface to itself, via Event Manager.
virtual int Register(User *user)
Register an endpoint.
Poll provider interface.
Poll user object interface.
unsigned _pevents
FD for this user.
endpoint_t _fd
Event to User.
unsigned _uevents
Provider's view of events.
Messages from Poll Provider to self, via event manager.
Carries the events on the appropriate FD, or that the end point should terminate.
void getNow(struct timespec &now)
Return the current time.
Definition timeutil.h:60
unsigned long timespec_diff(const struct timespec *time1, const struct timespec *time2)
Function for getting the difference between two times.
Definition timeutil.h:42

All rights reserved © 2002 - 2024 Isode Ltd.