28 typedef std::map<int, User *> polldatamap_t;
31 class Poll_poll :
public Provider {
33 pthread_mutex_t _mutex;
35 polldatamap_t _internal;
45 ::Event::Manager *_manager;
52 virtual ~Poll_poll ();
56 virtual int Register (User *user);
59 virtual void Deregister (User *user);
62 virtual void Control (User *user);
65 virtual void Deliver (pollAction *msg);
68 Poll_poll::Poll_poll ()
70 pthread_mutex_init (&_mutex, NULL);
74 action.terminate =
false;
75 _myevent.SetUserData (
this, action);
77 action.terminate =
true;
78 _termevent.SetUserData (
this, action);
91 _wakefds[0] = _wakefds[1] = -1;
93 _manager = ::Event::Manager::GetManager();
95 _manager->QueueAtExit (&_termevent);
98 Poll_poll::~Poll_poll ()
100 pthread_mutex_destroy (&_mutex);
101 if ( _ufds )
delete[] _ufds;
102 if ( _wakefds[0] >= 0 ) close (_wakefds[0]);
103 if ( _wakefds[1] >= 0 ) close (_wakefds[1]);
106 void Poll_poll::Deliver (pollAction *msg)
109 LOG_DEBUG ((
"Poll_poll::Deliver ()"));
111 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
112 MSG_IOevent_Pollthread_LOG (rc);
113 _manager->QueueAt (&_myevent, 10000);
118 if ( _terminating ) {
119 pthread_mutex_unlock(&_mutex);
123 if ( msg->terminate ) {
140 polldatamap_t::iterator scan;
141 for ( scan = _internal.begin(); scan != _internal.end(); ++scan )
142 scan->second->_event.SendAtExit (cancelmsg);
145 if ( _inpoll && !_wakepending ) {
150 pthread_mutex_unlock(&_mutex);
155 if ( write (_wakefds[1], bytev, 1) < 1 )
156 MSG_IOevent_Pollerror_LOG (
"write");
161 if ( _wakefds[0] == -1 && pipe (_wakefds) != 0 ) {
162 MSG_IOevent_Pollerror_LOG (
"pipe");
163 _manager->QueueAt (&_myevent, 1000);
165 pthread_mutex_unlock(&_mutex);
170 _sfds = _nfds > 1024 ? _nfds : 1024;
172 _ufds =
new pollfd[_sfds];
173 }
else if ( _nfds > _sfds ) {
177 while ( _sfds < _nfds ) _sfds += 1024;
179 _ufds =
new pollfd[_sfds];
183 _ufds[0].fd = _wakefds[0];
184 _ufds[0].events = POLLIN;
185 _ufds[0].revents = 0;
189 polldatamap_t::const_iterator scan;
190 for ( scan = _internal.begin(); scan != _internal.end(); ++scan ) {
191 if ( scan->first < 0 || scan->second->_pevents == 0 )
continue;
193 _ufds[nfds].fd = scan->first;
194 _ufds[nfds].events = scan->second->_pevents;
195 _ufds[nfds].revents = 0;
202 const timespec *wakeat = _manager->NextEventDue ();
214 pthread_mutex_unlock(&_mutex);
217 _manager->Blocking();
219 LOG_DEBUG ((
"Poll_poll::Wait: poll nfds=%d", nfds));
220 rc = poll (_ufds, nfds, timeout);
221 LOG_DEBUG ((
"Poll_poll::Wait poll()=%d", rc));
225 _manager->Unblocked();
228 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
229 MSG_IOevent_Pollthread_LOG (rc);
230 _manager->QueueAt (&_myevent, 10000);
238 if ( eno == EINTR ) {
239 LOG_DEBUG ((
"Poll_poll::Wait: poll EINTR"));
241 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
244 MSG_IOevent_Pollerror_LOG (
"poll");
246 _manager->QueueAt (&_myevent, 1000);
250 pthread_mutex_unlock(&_mutex);
255 if ( _ufds[0].revents & POLLIN ) {
257 if ( read (_wakefds[0], buffer,
sizeof buffer) < 0 )
258 MSG_IOevent_Pollerror_LOG (
"read");
262 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
263 MSG_IOevent_Pollthread_LOG (rc);
264 _manager->QueueAt (&_myevent, 10000);
267 _wakepending =
false;
272 _wakepending =
false;
279 for (
int i = 1; i < nfds; i++ )
280 if ( _ufds[i].revents ) {
281 polldatamap_t::iterator entry = _internal.find (_ufds[i].fd);
284 if ( entry == _internal.end() )
continue;
289 entry->second->_pevents = 0;
292 pollmsg these_events;
293 these_events.events = _ufds[i].revents;
300 LOG_DEBUG ((
"Poll_poll: send %d events %x",
301 entry->first, these_events.events));
303 entry->second->_event.Send(these_events);
307 if ( these_events.events & (POLLERR|POLLHUP|POLLNVAL) ) {
308 _internal.erase (entry);
317 unsigned waitingfds = 0;
319 for ( scan = _internal.begin(); scan != _internal.end(); ++scan )
320 if ( scan->first >= 0 && scan->second->_pevents != 0 )
324 LOG_DEBUG ((
"Poll_poll: waitingfds=%d", waitingfds));
327 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO,
true);
332 pthread_mutex_unlock(&_mutex);
335 int Poll_poll::Register (User *user)
340 LOG_DEBUG ((
"Poll_poll: Register: %d", fd));
342 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 )
345 if ( _terminating ) {
350 polldatamap_t::iterator entry = _internal.find (fd);
352 if ( entry == _internal.end() ) {
353 _internal[fd] = user;
363 pthread_mutex_unlock (&_mutex);
367 void Poll_poll::Deregister (User *user)
371 LOG_DEBUG ((
"Poll_poll: Deregister: %d", fd));
373 if ( (pthread_mutex_lock (&_mutex)) != 0 )
376 polldatamap_t::iterator entry = _internal.find (fd);
378 if ( entry != _internal.end() ) {
383 _internal.erase (entry);
390 pthread_mutex_unlock (&_mutex);
393 void Poll_poll::Control (User *user)
397 LOG_DEBUG ((
"Poll_poll: Control: %d %x", fd, user->_uevents));
399 if ( (pthread_mutex_lock (&_mutex)) != 0 )
402 if ( _terminating ) {
403 pthread_mutex_unlock (&_mutex);
408 unsigned old_pevents = user->_pevents;
410 LOG_DEBUG ((
"Control: queued=%d inpoll=%d wakepending=%d uevents=%x pevents=%x",
411 _queued, _inpoll, _wakepending,
412 user->_uevents, old_pevents));
414 user->_pevents = user->_uevents;
418 if ( _inpoll && !_wakepending &&
419 (user->_uevents & (~old_pevents)) != 0 ) {
426 LOG_DEBUG ((
"Control: waking polling thread"));
428 }
else if ( !_inpoll && !_queued && user->_uevents != 0 ) {
432 LOG_DEBUG ((
"Control: adding Poll event"));
434 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
437 pthread_mutex_unlock (&_mutex);
443 if ( write (_wakefds[1], bytev, 1) < 0 )
444 MSG_IOevent_Pollerror_LOG (
"write");
448 Provider *CreatePollpollProvider ()
450 return new Poll_poll();
459 Provider *CreatePollpollProvider ()
@ Event_Terminate
Invalid request: fd not open.
::Event::AsyncEventAux< Provider, pollAction > ProviderEvent
Type for delivering events to the provider.
Carries the events on the appropriate FD, or that the end point should terminate.
void getNow(struct timespec &now)
Return the current time.
unsigned long timespec_diff(const struct timespec *time1, const struct timespec *time2)
Function for getting the difference between two times.