// Copyright (c) 2005-2010, Isode Limited, London, England.
// All rights reserved.
//
// Acquisition and use of this software and related materials for any
// purpose requires a written licence agreement from Isode Limited,
// or a written licence from an organisation licenced by Isode Limited
// to grant such a licence.
//


//
//
// Poll_poll.C
//
// Poll provider using poll
//
// @VERSION@

#include <errno.h>

#include <map>
#include <utility>

#include "Event_p.h"

#ifdef HAVE_POLL

namespace Poll {

28 typedef std::map<int, User *> polldatamap_t;
29
31 class Poll_poll : public Provider {
32 private:
33 pthread_mutex_t _mutex;
34
35 polldatamap_t _internal;
36 struct pollfd *_ufds;
37 int _nfds;
38 int _sfds;
39
40 int _wakefds[2];
41 bool _inpoll;
42 bool _wakepending;
43 bool _terminating;
44 bool _queued;
45 ::Event::Manager *_manager;
46 ProviderEvent _myevent;
47 ProviderEvent _termevent;
48
49 public:
50 Poll_poll ();
51
52 virtual ~Poll_poll ();
53
56 virtual int Register (User *user);
57
59 virtual void Deregister (User *user);
60
62 virtual void Control (User *user);
63
65 virtual void Deliver (pollAction *msg);
66 };
67
68 Poll_poll::Poll_poll ()
69 {
70 pthread_mutex_init (&_mutex, NULL);
71
72 pollAction action;
73
74 action.terminate = false;
75 _myevent.SetUserData (this, action);
76
77 action.terminate = true;
78 _termevent.SetUserData (this, action);
79
80 _ufds = 0;
81 _nfds = 1; // Count pipe, although not yet open
82 _sfds = 0;
83
84 _inpoll = false;
85 _wakepending = false;
86
87 _terminating = false;
88 _queued = false;
89
90 // Leave creating pipe until we can report error
91 _wakefds[0] = _wakefds[1] = -1;
92
93 _manager = ::Event::Manager::GetManager();
94
95 _manager->QueueAtExit (&_termevent);
96 }
97
98 Poll_poll::~Poll_poll ()
99 {
100 pthread_mutex_destroy (&_mutex);
101 if ( _ufds ) delete[] _ufds;
102 if ( _wakefds[0] >= 0 ) close (_wakefds[0]);
103 if ( _wakefds[1] >= 0 ) close (_wakefds[1]);
104 }
105
106 void Poll_poll::Deliver (pollAction *msg)
107 {
108 int rc;
109 LOG_DEBUG (( "Poll_poll::Deliver ()"));
110
111 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
112 MSG_IOevent_Pollthread_LOG (rc);
113 _manager->QueueAt (&_myevent, 10000);
114 _queued = true;
115 return;
116 }
117
118 if ( _terminating ) {
119 pthread_mutex_unlock(&_mutex);
120 return;
121 }
122
123 if ( msg->terminate ) {
124 // System is shutting down
125
126 // Prevents queuing
127 _queued = true;
128
129 _terminating = true;
130
131 // Tell users that we are terminating
132 // There is an implied removal of registrations
133 // The PollUser MUST NOT refer to the EventSource again
134 // Note: this is in critical section, so would get deadlock
135 // if the PollUser called back to Deregister.
136
137 ::Poll::pollmsg cancelmsg;
138 cancelmsg.events = Poll::Event_Terminate;
139
140 polldatamap_t::iterator scan;
141 for ( scan = _internal.begin(); scan != _internal.end(); ++scan )
142 scan->second->_event.SendAtExit (cancelmsg);
143
144 bool wake = false;
145 if ( _inpoll && !_wakepending ) {
146 wake = true;
147 _wakepending = true;
148 }
149
150 pthread_mutex_unlock(&_mutex);
151
152 if ( wake ) {
153 char bytev[1];
154 bytev[0] = 2;
155 if ( write (_wakefds[1], bytev, 1) < 1 )
156 MSG_IOevent_Pollerror_LOG ("write");
157 }
158 return;
159 } // end of handling termination
160
161 if ( _wakefds[0] == -1 && pipe (_wakefds) != 0 ) {
162 MSG_IOevent_Pollerror_LOG ("pipe");
163 _manager->QueueAt (&_myevent, 1000);
164 _queued = true;
165 pthread_mutex_unlock(&_mutex);
166 return;
167 }
168
169 if ( _ufds == 0 ) {
170 _sfds = _nfds > 1024 ? _nfds : 1024;
171
172 _ufds = new pollfd[_sfds];
173 } else if ( _nfds > _sfds ) {
174
175 delete[] _ufds;
176
177 while ( _sfds < _nfds ) _sfds += 1024;
178
179 _ufds = new pollfd[_sfds];
180 }
181
182 // First entry is always for wake-up FD
183 _ufds[0].fd = _wakefds[0];
184 _ufds[0].events = POLLIN;
185 _ufds[0].revents = 0;
186
187 // scan registered FDs for those which are needing polling
188 int nfds = 1;
189 polldatamap_t::const_iterator scan;
190 for ( scan = _internal.begin(); scan != _internal.end(); ++scan ) {
191 if ( scan->first < 0 || scan->second->_pevents == 0 ) continue;
192
193 _ufds[nfds].fd = scan->first;
194 _ufds[nfds].events = scan->second->_pevents;
195 _ufds[nfds].revents = 0;
196
197 nfds++;
198 }
199
200 int timeout = -1;
201
202 const timespec *wakeat = _manager->NextEventDue ();
203
204 if ( wakeat ) {
205 struct timespec now;
206 getNow(now);
207 // timespec_diff returns difference in microseconds.
208 // returns 0 if delay preceeds now.
209 timeout = timespec_diff (wakeat, &now)/1000;
210 }
211
212 _inpoll = true; // about to be true
213
214 pthread_mutex_unlock(&_mutex);
215
216 // Tell event manager we might go away for a while
217 _manager->Blocking();
218
219 LOG_DEBUG (("Poll_poll::Wait: poll nfds=%d", nfds));
220 rc = poll (_ufds, nfds, timeout);
221 LOG_DEBUG (("Poll_poll::Wait poll()=%d", rc));
222 int eno = errno; // Preserve errno
223
224 // Tell event manager we are back
225 _manager->Unblocked();
226
227 if ( rc < 0 ) {
228 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
229 MSG_IOevent_Pollthread_LOG (rc);
230 _manager->QueueAt (&_myevent, 10000);
231 _queued = true;
232 _inpoll = false;
233 return;
234 }
235
236 _inpoll = false;
237
238 if ( eno == EINTR ) {
239 LOG_DEBUG (( "Poll_poll::Wait: poll EINTR"));
240 // Queue event to self again.
241 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
242
243 } else {
244 MSG_IOevent_Pollerror_LOG ("poll");
245 // Try again in one second
246 _manager->QueueAt (&_myevent, 1000);
247 }
248
249 _queued = true;
250 pthread_mutex_unlock(&_mutex);
251 return;
252 }
253
254 // Soak up wakeup bytes outside critical section
255 if ( _ufds[0].revents & POLLIN ) {
256 char buffer[128];
257 if ( read (_wakefds[0], buffer, sizeof buffer) < 0 )
258 MSG_IOevent_Pollerror_LOG ("read");
259 rc--;
260 }
261
262 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
263 MSG_IOevent_Pollthread_LOG (rc);
264 _manager->QueueAt (&_myevent, 10000);
265 _queued = true;
266 _inpoll = false;
267 _wakepending = false;
268 return;
269 }
270
271 _inpoll = false;
272 _wakepending = false;
273
274 if ( _terminating )
275 return;
276
277 pollmsg doevents;
278
279 for ( int i = 1; i < nfds; i++ )
280 if ( _ufds[i].revents ) {
281 polldatamap_t::iterator entry = _internal.find (_ufds[i].fd);
282
283 // Check was not unregistered in the interim
284 if ( entry == _internal.end() ) continue;
285
286 // Clear requested events
287 // Do this in case the event reaches the user after
288 // the poller event gets reexcuted.
289 entry->second->_pevents = 0;
290
291 // Translate events into external mask
292 pollmsg these_events;
293 these_events.events = _ufds[i].revents;
294
295 // Under normal circumstances expect a PollAsyncEvent to
296 // be queued to the event manager
297 // But if it returns a non-zero result, then
298 // we deregister the PollUser
299
300 LOG_DEBUG (("Poll_poll: send %d events %x",
301 entry->first, these_events.events));
302
303 entry->second->_event.Send(these_events);
304
305 // If the events include an error then the endpoint
306 // is automatically deregistered
307 if ( these_events.events & (POLLERR|POLLHUP|POLLNVAL) ) {
308 _internal.erase (entry);
309 _nfds--;
310 }
311 }
312
313 // Count FDs which remain
314 // Need to find if still waiting FDs from the real data
315 // in case this was updated while in poll
316
317 unsigned waitingfds = 0;
318
319 for ( scan = _internal.begin(); scan != _internal.end(); ++scan )
320 if ( scan->first >= 0 && scan->second->_pevents != 0 )
321 waitingfds++;
322
323 // Only queue event for processing with poll if items remain
324 LOG_DEBUG (("Poll_poll: waitingfds=%d", waitingfds));
325 if ( waitingfds ) {
326 _queued = true;
327 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO, true);
328 } else {
329 _queued = false;
330 }
331
332 pthread_mutex_unlock(&_mutex);
333 }
334
335 int Poll_poll::Register (User *user)
336 {
337 int rc;
338 int fd = user->_fd;
339
340 LOG_DEBUG (("Poll_poll: Register: %d", fd));
341
342 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 )
343 return rc;
344
345 if ( _terminating ) {
346 rc = 0;
347 } else {
348
349 // Check to see if fd already registered
350 polldatamap_t::iterator entry = _internal.find (fd);
351
352 if ( entry == _internal.end() ) {
353 _internal[fd] = user;
354 _nfds++;
355
356 rc = 0;
357
358 } else {
359 rc = -1;
360 }
361 }
362
363 pthread_mutex_unlock (&_mutex);
364 return rc;
365 }
366
367 void Poll_poll::Deregister (User *user)
368 {
369 int fd = user->_fd;
370
371 LOG_DEBUG (("Poll_poll: Deregister: %d", fd));
372
373 if ( (pthread_mutex_lock (&_mutex)) != 0 )
374 return;
375
376 polldatamap_t::iterator entry = _internal.find (fd);
377
378 if ( entry != _internal.end() ) {
379
380 // Do NOT cancel the event, as the poll user should be the
381 // only user of this.
382
383 _internal.erase (entry);
384 _nfds--;
385
386 // Do not wake poll(). If the end point were in use, any events
387 // will be cancelled.
388 }
389
390 pthread_mutex_unlock (&_mutex);
391 }
392
393 void Poll_poll::Control (User *user)
394 {
395 int fd = user->_fd;
396
397 LOG_DEBUG (("Poll_poll: Control: %d %x", fd, user->_uevents));
398
399 if ( (pthread_mutex_lock (&_mutex)) != 0 )
400 return;
401
402 if ( _terminating ) {
403 pthread_mutex_unlock (&_mutex);
404 return;
405 }
406
407 bool wake = false;
408 unsigned old_pevents = user->_pevents;
409
410 LOG_DEBUG (("Control: queued=%d inpoll=%d wakepending=%d uevents=%x pevents=%x",
411 _queued, _inpoll, _wakepending,
412 user->_uevents, old_pevents));
413
414 user->_pevents = user->_uevents;
415
416 // Check for events set in new mask which are NOT in old
417 // If set need to wake poll unless wake pending or not in poll
418 if ( _inpoll && !_wakepending &&
419 (user->_uevents & (~old_pevents)) != 0 ) {
420 wake = true;
421
422 // Set here, in critical section, even though it will
423 // not actually be true until later
424 _wakepending = true;
425
426 LOG_DEBUG (("Control: waking polling thread"));
427
428 } else if ( !_inpoll && !_queued && user->_uevents != 0 ) {
429 // If there is no event queued on event manager, and there
430 // are events here to be waited for, then need to queue
431
432 LOG_DEBUG (("Control: adding Poll event"));
433 _queued = true;
434 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
435 }
436
437 pthread_mutex_unlock (&_mutex);
438
439 if ( wake ) {
440 // Then wake by writing byte
441 char bytev[1];
442 bytev[0] = 1;
443 if ( write (_wakefds[1], bytev, 1) < 0 )
444 MSG_IOevent_Pollerror_LOG ("write");
445 }
446 }
447
448 Provider *CreatePollpollProvider ()
449 {
450 return new Poll_poll();
451 }
}

#else

456namespace Poll {
457
458 // Don't have poll() therefore cannot have this kind of provider
459 Provider *CreatePollpollProvider ()
460 {
461 return 0;
462 }
463}
#endif
// --- Reference notes (recovered from documentation tooltips) ---
// Event_Terminate: invalid request; fd not open.
// ProviderEvent: ::Event::AsyncEventAux< Provider, pollAction > -- type for
//   delivering events to the provider; carries the events on the appropriate
//   FD, or that the end point should terminate.
// getNow(struct timespec &now): return the current time (timeutil.h:60).
// timespec_diff(const struct timespec *time1, const struct timespec *time2):
//   difference between two times, in microseconds (timeutil.h:42).
//
// All rights reserved (c) 2002 - 2024 Isode Ltd.