Poll_epoll.C
1// Copyright (c) 2005-2010, Isode Limited, London, England.
2// All rights reserved.
3//
4// Acquisition and use of this software and related materials for any
5// purpose requires a written licence agreement from Isode Limited,
6// or a written licence from an organisation licenced by Isode Limited
7// to grant such a licence.
8//
9
10
11//
12//
13// Poll_epoll.C
14//
15// Poll provider using epoll
16//
17// @VERSION@
18
19#include <errno.h>
20#include <set>
21
22#include "Event_p.h"
23
24#ifdef HAVE_SYS_EPOLL_H
25#include <sys/epoll.h>
26#endif
27
28#ifdef HAVE_EPOLL_WAIT
29
30namespace Poll {
31
    // Set of registered poll users; membership == registration.
    typedef std::set<User *> polldatamap_t;

    // Poll provider implemented on Linux epoll(7).
    //
    // Each registered User's fd lives in a single epoll instance and is
    // armed with EPOLLONESHOT, so after every delivery the fd is dead
    // until explicitly re-armed via Control().  A self-pipe (_wakefds)
    // exists to wake a thread blocked in epoll_wait() at termination.
    class Poll_epoll : public Provider {
    private:
        // Max events fetched by a single epoll_wait() call
        static const int epoll_maxevents = 256;

        // Guards _internal and the state flags below
        pthread_mutex_t _mutex;

        // Currently registered users
        polldatamap_t _internal;

        int _epfd;         // epoll instance fd; < 0 if creation failed
        int _wakefds[2];   // self-pipe: [0] read end (in epoll set), [1] write end
        bool _inpoll;      // a thread is (about to be) blocked in epoll_wait()
        bool _wakepending; // wake byte written but not yet drained
        bool _terminating; // shutdown seen; refuse further work
        bool _queued;      // _myevent is currently queued with the manager
        ::Event::Manager *_manager;
        ProviderEvent _myevent;   // self-event: run a normal poll pass
        ProviderEvent _termevent; // self-event: queued at manager exit

        // (Re-)arm the epoll registration for user->_fd from the mask in
        // user->_pevents.  Uses EPOLL_CTL_MOD since Register() already
        // added the fd (with no events); EPOLLONESHOT makes the
        // registration disarm itself after one delivery.
        // Returns OK on success, NOTOK on epoll_ctl failure (logged).
        int EpollControl (User *user) {
            epoll_event event;

            event.events = 0;

            if ( user->_pevents & Event_In )
                event.events |= (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP);

            if ( user->_pevents & Event_Out )
                event.events |= (EPOLLOUT | EPOLLERR | EPOLLHUP);

            event.events |= EPOLLONESHOT;

            event.data.ptr = static_cast<void *>(user);

            if ( epoll_ctl (_epfd, EPOLL_CTL_MOD, user->_fd, &event) != 0 ) {
                MSG_IOevent_Pollerror_LOG ("epoll_ctl mod");
                return NOTOK;
            } else {
                return OK;
            }
        }

    public:
        Poll_epoll ();

        virtual ~Poll_epoll ();

        // Add user to the poll set; the fd is added with no events
        // armed (arming happens later via Control()).
        virtual int Register (User *user);

        // Remove user from the poll set.
        virtual void Deregister (User *user);

        // Arm the events the user has requested (_uevents).
        virtual void Control (User *user);

        // Event-manager callback: run one poll pass, or terminate.
        virtual void Deliver (pollAction *msg);
    };
94
#if defined(__clang__)
    // Out-of-class definition for the static const member: pre-C++17,
    // an odr-used static const data member needs exactly one
    // definition; clang diagnoses the missing one at link time.
    const int Poll_epoll::epoll_maxevents;
#endif
98
99 Poll_epoll::Poll_epoll ()
100 {
101 pthread_mutex_init (&_mutex, NULL);
102
103 pollAction action;
104
105 action.terminate = false;
106 _myevent.SetUserData (this, action);
107
108 action.terminate = true;
109 _termevent.SetUserData (this, action);
110
111 _inpoll = false;
112 _wakepending = false;
113
114 _terminating = false;
115 _queued = false;
116
117 _epfd = epoll_create (100);
118 if ( _epfd < 0 )
119 MSG_IOevent_Pollerror_LOG ("epoll_create");
120
121 // Leave creating pipe etc. until we can report error
122 _wakefds[0] = _wakefds[1] = -1;
123
124 _manager = ::Event::Manager::GetManager();
125
126 _manager->QueueAtExit (&_termevent);
127 }
128
129 Poll_epoll::~Poll_epoll ()
130 {
131 pthread_mutex_destroy (&_mutex);
132 if ( _wakefds[0] >= 0 ) close (_wakefds[0]);
133 if ( _wakefds[1] >= 0 ) close (_wakefds[1]);
134 if ( _epfd >= 0 ) close (_epfd);
135 }
136
137 void Poll_epoll::Deliver (pollAction *msg)
138 {
139 int rc;
140 LOG_DEBUG (( "Poll_epoll::Deliver ()"));
141
142 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
143 MSG_IOevent_Pollthread_LOG (rc);
144 _manager->QueueAt (&_myevent, 10000);
145 _queued = true;
146 return;
147 }
148
149 if ( _terminating ) {
150 pthread_mutex_unlock(&_mutex);
151 return;
152 }
153
154 if ( msg->terminate ) {
155 // System is shutting down
156
157 // Prevents queuing
158 _queued = true;
159
160 _terminating = true;
161
162 // Tell users that we are terminating
163 // There is an implied removal of registrations
164 // The PollUser MUST NOT refer to the EventSource again
165 // Note: this is in critical section, so would get deadlock
166 // if the PollUser called back to Deregister.
167
168 ::Poll::pollmsg cancelmsg;
169 cancelmsg.events = Poll::Event_Terminate;
170
171 polldatamap_t::iterator scan;
172 for ( scan = _internal.begin(); scan != _internal.end(); ++scan )
173 (*scan)->_event.SendAtExit (cancelmsg);
174
175 // (It is a pity we need to use the wakup mechanism, as it is
176 // only needed for termination)
177 bool wake = false;
178 if ( _inpoll && !_wakepending ) {
179 wake = true;
180 _wakepending = true;
181 }
182
183 pthread_mutex_unlock(&_mutex);
184
185 if ( wake ) {
186 char bytev[1];
187 bytev[0] = 2;
188 if ( write (_wakefds[1], bytev, 1) < 1 )
189 MSG_IOevent_Pollerror_LOG ("write");
190 }
191
192 close (_epfd);
193 return;
194 } // end of handling termination
195
196 if ( _wakefds[0] == -1 ) {
197 if ( pipe (_wakefds) != 0 ) {
198 MSG_IOevent_Pollerror_LOG ("pipe");
199 _manager->QueueAt (&_myevent, 1000);
200 _queued = true;
201 pthread_mutex_unlock(&_mutex);
202 return;
203 }
204
205 epoll_event wevent;
206
207 wevent.events = EPOLLIN;
208 wevent.data.ptr = 0;
209
210 if ( epoll_ctl (_epfd, EPOLL_CTL_ADD, _wakefds[0], &wevent) != 0 ){
211 MSG_IOevent_Pollerror_LOG ("epoll_ctl");
212 _manager->QueueAt (&_myevent, 1000);
213 _queued = true;
214 pthread_mutex_unlock(&_mutex);
215 return;
216 }
217 }
218
219 int timeout = -1;
220
221 const timespec *wakeat = _manager->NextEventDue ();
222
223 if ( wakeat ) {
224 struct timespec now;
225 getNow(now);
226 // timespec_diff returns difference in microseconds.
227 // returns 0 if delay preceeds now.
228 timeout = timespec_diff (wakeat, &now)/1000;
229 LOG_DEBUG (("epoll timeout=%d", timeout));
230 }
231
232 _inpoll = true; // about to be true
233
234 pthread_mutex_unlock(&_mutex);
235
236 // Tell event manager we might go away for a while
237 _manager->Blocking();
238
239 epoll_event events[epoll_maxevents];
240
241 LOG_DEBUG (("Poll_epoll::Wait: epoll_wait"));
242 int nfds = epoll_wait (_epfd, events, epoll_maxevents, timeout);
243 LOG_DEBUG (("Poll_epoll::Wait poll()=%d", nfds));
244 int eno = errno; // Preserve errno
245
246 // Tell event manager we are back
247 _manager->Unblocked();
248
249 if ( nfds < 0 ) {
250 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
251 MSG_IOevent_Pollthread_LOG (rc);
252 _manager->QueueAt (&_myevent, 10000);
253 _queued = true;
254 _inpoll = false;
255 return;
256 }
257
258 _inpoll = false;
259
260 if ( eno == EINTR ) {
261 LOG_DEBUG (( "Poll_epoll::Wait: epoll_wait EINTR"));
262 // Queue event to self again.
263 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
264
265 } else {
266 MSG_IOevent_Pollerror_LOG ("epoll_wait");
267 // Try again in one second
268 _manager->QueueAt (&_myevent, 1000);
269 }
270
271 _queued = true;
272 pthread_mutex_unlock(&_mutex);
273 return;
274 }
275
276 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
277 MSG_IOevent_Pollthread_LOG (rc);
278 _manager->QueueAt (&_myevent, 10000);
279 _queued = true;
280 _inpoll = false;
281 _wakepending = false;
282 return;
283 }
284
285 _inpoll = false;
286 _wakepending = false;
287
288 if ( _terminating )
289 return;
290
291 pollmsg doevents;
292
293 for ( int i = 0; i < nfds; i++ ) {
294 if ( events[i].data.ptr == 0 ) {
295 char buffer[128];
296 if ( read (_wakefds[0], buffer, sizeof buffer) < 0 )
297 MSG_IOevent_Pollerror_LOG ("read");
298
299 } else if ( events[i].events != 0 ) {
300 User *user = static_cast<User *>(events[i].data.ptr);
301
302 polldatamap_t::iterator entry =
303 _internal.find (user);
304
305 // Check was not unregistered in the interim
306 if ( entry == _internal.end() ) continue;
307
308 int fd = user->_fd;
309
310 // Translate events into external mask
311 // If error, clear all events
312 pollmsg these_events;
313 if ( events[i].events & EPOLLIN )
314 these_events.events |= Event_In;
315
316 if ( events[i].events & EPOLLOUT )
317 these_events.events |= Event_Out;
318
319 if ( events[i].events & EPOLLERR )
320 these_events.events |= Event_Err;
321
322 if ( events[i].events & EPOLLHUP )
323 these_events.events |= Event_Hup;
324
325 // Under normal circumstances expect a PollAsyncEvent to
326 // be queued to the event manager
327 // But if it returns a non-zero result, then
328 // we deregister the PollUser
329
330 LOG_DEBUG (("Poll_epoll: send %d events %x",
331 fd, these_events.events));
332
333 user->_event.Send(these_events);
334
335 // If the events include an error then the endpoint
336 // is automatically deregistered
337 if ( these_events.events & (Event_Err|Event_Hup|Event_Nval) ){
338 // This is needed to keep valgrind quiet
339 struct epoll_event dummy;
340 memset (&dummy,0,sizeof dummy);
341
342 if ( epoll_ctl (_epfd, EPOLL_CTL_DEL, fd, &dummy)
343 != 0 )
344 MSG_IOevent_Pollerror_LOG ("epoll_ctl del");
345 _internal.erase (entry);
346 }
347 }
348 }
349
350 // Only queue event for processing with epoll if items remain
351 if ( !_internal.empty() ) {
352 _queued = true;
353 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO, true);
354 } else {
355 _queued = false;
356 }
357
358 LOG_DEBUG(("Ending processing, queued=%d", _queued));
359 pthread_mutex_unlock(&_mutex);
360 }
361
362 int Poll_epoll::Register (User *user)
363 {
364 int rc;
365
366 LOG_DEBUG (("Poll_epoll: Register: %d", user->_fd));
367
368 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 )
369 return rc;
370
371 if ( _terminating ) {
372 rc = 0;
373 } else if ( _epfd < 0 ) {
374 rc = -1;
375 } else {
376
377 // Check to see if fd already registered
378 polldatamap_t::iterator entry = _internal.find (user);
379
380 if ( entry == _internal.end() ) {
381 _internal.insert (user);
382
383 epoll_event event;
384
385 event.events = 0;
386 event.data.ptr = static_cast<void *>(user);
387
388 rc = epoll_ctl (_epfd, EPOLL_CTL_ADD, user->_fd, &event);
389
390 if ( rc < 0 ) {
391 MSG_IOevent_Pollerror_LOG ("epoll_ctl add");
392
393 entry = _internal.find (user);
394 if ( entry != _internal.end() )
395 _internal.erase (entry);
396 }
397
398 } else {
399 rc = -1;
400 }
401 }
402
403 // Have a poll event if there are any registered FDs
404 if ( !_inpoll && !_queued ) {
405 LOG_DEBUG (("Control: adding Poll event"));
406 _queued = true;
407 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
408 }
409
410 pthread_mutex_unlock (&_mutex);
411 return rc;
412 }
413
414 void Poll_epoll::Deregister (User *user)
415 {
416 LOG_DEBUG (("Poll_epoll: Deregister: %d", user->_fd));
417
418 if ( (pthread_mutex_lock (&_mutex)) != 0 )
419 return;
420
421 polldatamap_t::iterator entry = _internal.find (user);
422
423 if ( entry != _internal.end() ) {
424
425 // Do NOT cancel the event, as the poll user should be the
426 // only user of this.
427
428 _internal.erase (entry);
429
430 // This is needed to keep valgrind quiet
431 struct epoll_event dummy;
432 memset (&dummy,0,sizeof dummy);
433
434 // Don't log the EBADF case as the fd may have already been closed
435 if ( epoll_ctl (_epfd, EPOLL_CTL_DEL, user->_fd, &dummy) != 0 &&
436 errno != EBADF )
437 MSG_IOevent_Pollerror_LOG ("epoll_ctl del");
438
439 // Do not wake poll(). If the end point were in use, any events
440 // will be cancelled.
441 }
442
443 pthread_mutex_unlock (&_mutex);
444 }
445
    // Arm (or re-arm) the epoll registration for this user with the
    // events it has requested: copies _uevents into _pevents and
    // pushes the mask to epoll via EpollControl().
    //
    // NOTE(review): unlike Register()/Deregister() this runs without
    // taking _mutex — presumably safe because epoll_ctl() is itself
    // thread-safe and EpollControl() does not touch _internal, but
    // the _terminating read is unsynchronized; confirm intent.
    void Poll_epoll::Control (User *user)
    {
        int fd = user->_fd;

        LOG_DEBUG (("Poll_epoll: Control: %d %x", fd, user->_uevents));

        // Registrations are being torn down; nothing to arm.
        if ( _terminating )
            return;

        user->_pevents = user->_uevents;
        // Failure is logged inside EpollControl; callers cannot act on it
        (void) EpollControl (user);
    }
458
459 Provider *CreatePollepollProvider ()
460 {
461 // Can be in glibc but not in the kernel
462 int fd = epoll_create (1);
463
464 if ( fd < 0 ) {
465 LOG_DEBUG (("epoll_create() failed"));
466 return 0;
467 }
468
469 close (fd);
470
471 return new Poll_epoll();
472 }
473}
474
475#else
476
namespace Poll {

    // Don't have epoll() therefore cannot have this kind of provider.
    // Returning 0 tells the caller to fall back to another provider
    // implementation (matching the runtime-probe failure path of the
    // epoll-enabled build).
    Provider *CreatePollepollProvider ()
    {
        return 0;
    }
}
485#endif
@ Event_Terminate
The poll provider is terminating; registrations are implicitly removed.
@ Event_Nval
Invalid request: fd not open.
@ Event_Hup
Hang up.
@ Event_Err
Error condition.
@ Event_Out
Writing now will not block.
@ Event_In
There is data (possibly urgent) to read.
::Event::AsyncEventAux< Provider, pollAction > ProviderEvent
Type for delivering events to the provider.
Carries the events on the appropriate FD, or that the end point should terminate.
void getNow(struct timespec &now)
Return the current time.
Definition timeutil.h:60
unsigned long timespec_diff(const struct timespec *time1, const struct timespec *time2)
Function for getting the difference between two times.
Definition timeutil.h:42

All rights reserved © 2002 - 2024 Isode Ltd.