24#ifdef HAVE_SYS_EPOLL_H
32 typedef std::set<User *> polldatamap_t;
35 class Poll_epoll :
public Provider {
37 static const int epoll_maxevents = 256;
39 pthread_mutex_t _mutex;
41 polldatamap_t _internal;
49 ::Event::Manager *_manager;
53 int EpollControl (User *user) {
58 if ( user->_pevents & Event_In )
59 event.events |= (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP);
61 if ( user->_pevents & Event_Out )
62 event.events |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
64 event.events |= EPOLLONESHOT;
66 event.data.ptr =
static_cast<void *
>(user);
68 if ( epoll_ctl (_epfd, EPOLL_CTL_MOD, user->_fd, &event) != 0 ) {
69 MSG_IOevent_Pollerror_LOG (
"epoll_ctl mod");
79 virtual ~Poll_epoll ();
83 virtual int Register (User *user);
86 virtual void Deregister (User *user);
89 virtual void Control (User *user);
92 virtual void Deliver (pollAction *msg);
96 const int Poll_epoll::epoll_maxevents;
99 Poll_epoll::Poll_epoll ()
101 pthread_mutex_init (&_mutex, NULL);
105 action.terminate =
false;
106 _myevent.SetUserData (
this, action);
108 action.terminate =
true;
109 _termevent.SetUserData (
this, action);
112 _wakepending =
false;
114 _terminating =
false;
117 _epfd = epoll_create (100);
119 MSG_IOevent_Pollerror_LOG (
"epoll_create");
122 _wakefds[0] = _wakefds[1] = -1;
124 _manager = ::Event::Manager::GetManager();
126 _manager->QueueAtExit (&_termevent);
129 Poll_epoll::~Poll_epoll ()
131 pthread_mutex_destroy (&_mutex);
132 if ( _wakefds[0] >= 0 ) close (_wakefds[0]);
133 if ( _wakefds[1] >= 0 ) close (_wakefds[1]);
134 if ( _epfd >= 0 ) close (_epfd);
137 void Poll_epoll::Deliver (pollAction *msg)
140 LOG_DEBUG ((
"Poll_epoll::Deliver ()"));
142 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
143 MSG_IOevent_Pollthread_LOG (rc);
144 _manager->QueueAt (&_myevent, 10000);
149 if ( _terminating ) {
150 pthread_mutex_unlock(&_mutex);
154 if ( msg->terminate ) {
171 polldatamap_t::iterator scan;
172 for ( scan = _internal.begin(); scan != _internal.end(); ++scan )
173 (*scan)->_event.SendAtExit (cancelmsg);
178 if ( _inpoll && !_wakepending ) {
183 pthread_mutex_unlock(&_mutex);
188 if ( write (_wakefds[1], bytev, 1) < 1 )
189 MSG_IOevent_Pollerror_LOG (
"write");
196 if ( _wakefds[0] == -1 ) {
197 if ( pipe (_wakefds) != 0 ) {
198 MSG_IOevent_Pollerror_LOG (
"pipe");
199 _manager->QueueAt (&_myevent, 1000);
201 pthread_mutex_unlock(&_mutex);
207 wevent.events = EPOLLIN;
210 if ( epoll_ctl (_epfd, EPOLL_CTL_ADD, _wakefds[0], &wevent) != 0 ){
211 MSG_IOevent_Pollerror_LOG (
"epoll_ctl");
212 _manager->QueueAt (&_myevent, 1000);
214 pthread_mutex_unlock(&_mutex);
221 const timespec *wakeat = _manager->NextEventDue ();
229 LOG_DEBUG ((
"epoll timeout=%d", timeout));
234 pthread_mutex_unlock(&_mutex);
237 _manager->Blocking();
239 epoll_event events[epoll_maxevents];
241 LOG_DEBUG ((
"Poll_epoll::Wait: epoll_wait"));
242 int nfds = epoll_wait (_epfd, events, epoll_maxevents, timeout);
243 LOG_DEBUG ((
"Poll_epoll::Wait poll()=%d", nfds));
247 _manager->Unblocked();
250 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
251 MSG_IOevent_Pollthread_LOG (rc);
252 _manager->QueueAt (&_myevent, 10000);
260 if ( eno == EINTR ) {
261 LOG_DEBUG ((
"Poll_epoll::Wait: epoll_wait EINTR"));
263 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
266 MSG_IOevent_Pollerror_LOG (
"epoll_wait");
268 _manager->QueueAt (&_myevent, 1000);
272 pthread_mutex_unlock(&_mutex);
276 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
277 MSG_IOevent_Pollthread_LOG (rc);
278 _manager->QueueAt (&_myevent, 10000);
281 _wakepending =
false;
286 _wakepending =
false;
293 for (
int i = 0; i < nfds; i++ ) {
294 if ( events[i].data.ptr == 0 ) {
296 if ( read (_wakefds[0], buffer,
sizeof buffer) < 0 )
297 MSG_IOevent_Pollerror_LOG (
"read");
299 }
else if ( events[i].events != 0 ) {
300 User *user =
static_cast<User *
>(events[i].data.ptr);
302 polldatamap_t::iterator entry =
303 _internal.find (user);
306 if ( entry == _internal.end() )
continue;
312 pollmsg these_events;
313 if ( events[i].events & EPOLLIN )
314 these_events.events |= Event_In;
316 if ( events[i].events & EPOLLOUT )
319 if ( events[i].events & EPOLLERR )
322 if ( events[i].events & EPOLLHUP )
330 LOG_DEBUG ((
"Poll_epoll: send %d events %x",
331 fd, these_events.events));
333 user->_event.Send(these_events);
337 if ( these_events.events & (Event_Err|Event_Hup|Event_Nval) ){
339 struct epoll_event dummy;
340 memset (&dummy,0,
sizeof dummy);
342 if ( epoll_ctl (_epfd, EPOLL_CTL_DEL, fd, &dummy)
344 MSG_IOevent_Pollerror_LOG (
"epoll_ctl del");
345 _internal.erase (entry);
351 if ( !_internal.empty() ) {
353 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO,
true);
358 LOG_DEBUG((
"Ending processing, queued=%d", _queued));
359 pthread_mutex_unlock(&_mutex);
362 int Poll_epoll::Register (User *user)
366 LOG_DEBUG ((
"Poll_epoll: Register: %d", user->_fd));
368 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 )
371 if ( _terminating ) {
373 }
else if ( _epfd < 0 ) {
378 polldatamap_t::iterator entry = _internal.find (user);
380 if ( entry == _internal.end() ) {
381 _internal.insert (user);
386 event.data.ptr =
static_cast<void *
>(user);
388 rc = epoll_ctl (_epfd, EPOLL_CTL_ADD, user->_fd, &event);
391 MSG_IOevent_Pollerror_LOG (
"epoll_ctl add");
393 entry = _internal.find (user);
394 if ( entry != _internal.end() )
395 _internal.erase (entry);
404 if ( !_inpoll && !_queued ) {
405 LOG_DEBUG ((
"Control: adding Poll event"));
407 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
410 pthread_mutex_unlock (&_mutex);
414 void Poll_epoll::Deregister (User *user)
416 LOG_DEBUG ((
"Poll_epoll: Deregister: %d", user->_fd));
418 if ( (pthread_mutex_lock (&_mutex)) != 0 )
421 polldatamap_t::iterator entry = _internal.find (user);
423 if ( entry != _internal.end() ) {
428 _internal.erase (entry);
431 struct epoll_event dummy;
432 memset (&dummy,0,
sizeof dummy);
435 if ( epoll_ctl (_epfd, EPOLL_CTL_DEL, user->_fd, &dummy) != 0 &&
437 MSG_IOevent_Pollerror_LOG (
"epoll_ctl del");
443 pthread_mutex_unlock (&_mutex);
446 void Poll_epoll::Control (User *user)
450 LOG_DEBUG ((
"Poll_epoll: Control: %d %x", fd, user->_uevents));
455 user->_pevents = user->_uevents;
456 (void) EpollControl (user);
459 Provider *CreatePollepollProvider ()
462 int fd = epoll_create (1);
465 LOG_DEBUG ((
"epoll_create() failed"));
471 return new Poll_epoll();
480 Provider *CreatePollepollProvider ()
@ Event_Terminate
Invalid request: fd not open.
@ Event_Hup
Error condition.
@ Event_Err
Writing now will not block.
@ Event_Out
There is urgent data to read.
::Event::AsyncEventAux< Provider, pollAction > ProviderEvent
Type for delivering events to the provider.
Carries the events on the appropriate FD, or that the end point should terminate.
void getNow(struct timespec &now)
Return the current time.
unsigned long timespec_diff(const struct timespec *time1, const struct timespec *time2)
Function for getting the difference between two times.