20#ifdef HAVE_PORT_CREATE
// Set of User pointers. This is the type of Poll_port::_internal, which
// Register() inserts into and Deregister()/Deliver() erase from.
28 typedef std::set<User *> polldatamap_t;
// Provider implementation that uses event-port calls (port_create,
// port_associate, port_getn, port_alert, port_dissociate). It follows an
// #ifdef HAVE_PORT_CREATE guard.
// NOTE(review): this listing omits source lines, so several members used by
// the methods (_portfd, _myevent, _termevent, _inpoll, _queued, _wakepending,
// _terminating) have no visible declaration here.
31 class Poll_port :
public Provider {
// Size of the port_event_t array handed to port_getn() in Deliver().
33 static const int port_maxevents = 256;
// Locked by Deliver(), Register() and Deregister() around their work.
35 pthread_mutex_t _mutex;
// Users currently registered: Register() inserts, Deregister() and the
// error path in Deliver() erase.
37 polldatamap_t _internal;
// Set from ::Event::Manager::GetManager() in the constructor; used to queue
// _myevent / _termevent.
44 ::Event::Manager *_manager;
// Associates user->_fd with the port, passing the User pointer as the
// per-association user data, and logs "port_associate" when the call returns
// non-zero.
// NOTE(review): port_events is built from _pevents (Event_In -> POLLIN,
// Event_Out -> POLLOUT), yet the visible port_associate call passes
// user->_pevents rather than port_events. Lines are missing from this
// listing, so confirm against the full source whether that is intended.
48 int PortControl (User *user) {
51 if ( user->_pevents & Event_In )
52 port_events |= POLLIN;
54 if ( user->_pevents & Event_Out )
55 port_events |= POLLOUT;
59 port_associate (_portfd, PORT_SOURCE_FD,
60 user->_fd, user->_pevents,
61 static_cast<void *
>(user)) != 0 ) {
62 MSG_IOevent_Pollerror_LOG (
"port_associate");
// Destroys the mutex and closes the port descriptor (see definition).
72 virtual ~Poll_port ();
// Adds a user to _internal and makes sure the poll event is queued.
76 virtual int Register (User *user);
// Dissociates the user's fd from the port and removes it from _internal.
79 virtual void Deregister (User *user);
// Copies the user's _uevents into _pevents and calls PortControl().
82 virtual void Control (User *user);
// Handler for _myevent / _termevent; msg->terminate selects the shutdown path.
85 virtual void Deliver (pollAction *msg);
// Constructor. From the visible lines it:
//  - initialises _mutex with default attributes;
//  - attaches (this, action) to _myevent with action.terminate == false and
//    to _termevent with action.terminate == true;
//  - clears _wakepending and _terminating;
//  - creates the port and stores its descriptor in _portfd;
//  - fetches the event manager and registers _termevent via QueueAtExit().
// NOTE(review): the declaration of `action` and the condition guarding the
// "port_create" error log are on lines omitted from this listing.
88 Poll_port::Poll_port ()
90 pthread_mutex_init (&_mutex, NULL);
94 action.terminate =
false;
95 _myevent.SetUserData (
this, action);
97 action.terminate =
true;
98 _termevent.SetUserData (
this, action);
101 _wakepending =
false;
103 _terminating =
false;
106 _portfd = port_create ();
108 MSG_IOevent_Pollerror_LOG (
"port_create");
110 _manager = ::Event::Manager::GetManager();
112 _manager->QueueAtExit (&_termevent);
// Destructor: destroys _mutex and closes the port descriptor when one is
// held (_portfd >= 0).
115 Poll_port::~Poll_port ()
117 pthread_mutex_destroy (&_mutex);
118 if ( _portfd >= 0 ) close (_portfd);
// Handles a pollAction delivered to this provider (the constructor attaches
// one to _myevent with terminate == false and one to _termevent with
// terminate == true). From the visible lines it:
//  - takes _mutex, logging and requeueing _myevent if the lock fails;
//  - on msg->terminate, calls SendAtExit(cancelmsg) on every registered
//    user's event and may raise a port alert;
//  - otherwise calls port_getn(), maps the returned port events to Event_*
//    flags and sends them to the owning User;
//  - requeues _myevent while _internal is non-empty.
// NOTE(review): many lines are omitted from this listing (numbering gaps,
// unbalanced braces), so the exact nesting of the branches below cannot be
// confirmed from here.
121 void Poll_port::Deliver (pollAction *msg)
124 LOG_DEBUG ((
"Poll_port::Deliver ()"));
// Lock failure: log the pthread error code and requeue _myevent.
126 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
127 MSG_IOevent_Pollthread_LOG (rc);
128 _manager->QueueAt (&_myevent, 10000);
// Already terminating: release the mutex (the rest of this branch is omitted).
133 if ( _terminating ) {
134 pthread_mutex_unlock(&_mutex);
138 if ( msg->terminate ) {
// Terminate request: send cancelmsg to every registered user via SendAtExit().
155 polldatamap_t::iterator scan;
156 for ( scan = _internal.begin(); scan != _internal.end(); ++scan )
157 (*scan)->_event.SendAtExit (cancelmsg);
// Presumably decides whether a wake-up alert is needed (body omitted).
160 if ( _inpoll && !_wakepending ) {
165 pthread_mutex_unlock(&_mutex);
// The alert is only raised when `wake` is set; a non-zero return is logged.
167 if ( wake && port_alert (_portfd, PORT_ALERT_SET, -1, 0) != 0 )
168 MSG_IOevent_Pollerror_LOG (
"port_alert");
173 const timespec *wakeat = _manager->NextEventDue();
// timeout is split into whole seconds plus a nanosecond remainder, i.e. it
// is treated as a microsecond count.
182 now.tv_sec = timeout/1000000;
183 now.tv_nsec = (timeout%1000000)*1000;
184 LOG_DEBUG ((
" timeout=%d", timeout));
190 pthread_mutex_unlock(&_mutex);
// Blocking() / Unblocked() on the manager bracket the port_getn() call.
193 _manager->Blocking();
195 port_event_t events[port_maxevents];
199 LOG_DEBUG ((
"Poll_port::Wait: port_getn"));
200 rc = port_getn (_portfd, events,port_maxevents, &nfds, timep);
202 LOG_DEBUG ((
"Poll_port::Wait port_getn()=%d (%d) nfds=%u",
206 _manager->Unblocked();
// ETIME and EINTR are not treated as failures of port_getn().
// NOTE(review): the error log below is labelled "port_get" although the
// call above is port_getn.
209 if ( rc != 0 && eno != ETIME && eno != EINTR ) {
210 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
211 MSG_IOevent_Pollthread_LOG (rc);
212 _manager->QueueAt (&_myevent, 10000);
221 MSG_IOevent_Pollerror_LOG (
"port_get");
223 _manager->QueueAt (&_myevent, 1000);
226 pthread_mutex_unlock(&_mutex);
// Re-acquire the mutex before touching _wakepending / _internal below.
230 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
231 MSG_IOevent_Pollthread_LOG (rc);
232 _manager->QueueAt (&_myevent, 10000);
235 _wakepending =
false;
240 _wakepending =
false;
// Walk the events returned by port_getn(). Alert events take the first
// branch (body omitted). For fd events, portev_user is cast back to the
// User* that PortControl() passed to port_associate().
// NOTE(review): nfds is logged with %u above but compared with a signed
// int index here; its declaration is not visible.
246 for (
int i = 0; i < nfds; i++ ) {
247 if ( events[i].portev_source == PORT_SOURCE_ALERT ) {
250 }
else if ( events[i].portev_source == PORT_SOURCE_FD ) {
251 User *user =
static_cast<User *
>(events[i].portev_user);
253 polldatamap_t::iterator entry =
254 _internal.find (user);
// Skip events whose user is not in _internal (presumably deregistered while
// the event was pending; the pointer is not dereferenced in that case).
257 if ( entry == _internal.end() )
continue;
// Translate port event bits into this module's Event_* flags.
261 pollmsg these_events;
262 if ( events[i].portev_events & POLLIN )
263 these_events.events |= Event_In;
265 if ( events[i].portev_events & POLLOUT )
268 if ( events[i].portev_events & POLLERR )
271 if ( events[i].portev_events & POLLHUP )
279 LOG_DEBUG ((
"Poll_port: send %d events %x",
280 user->_fd, these_events.events));
282 user->_event.Send(these_events);
// Error, hang-up or invalid-fd events remove the user from the registered set.
286 if ( these_events.events & (Event_Err|Event_Hup|Event_Nval) ) {
288 _internal.erase (entry);
// Requeue the poll event while any users remain registered.
294 if ( !_internal.empty() ) {
296 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO,
true);
301 LOG_DEBUG((
"Ending processing, queued=%d", _queued));
302 pthread_mutex_unlock(&_mutex);
// Registers a user with the provider. From the visible lines it takes
// _mutex, has separate branches for _terminating and for an unusable port
// (_portfd < 0), inserts the user into _internal when it is not already
// there, and queues _myevent at MAXPRIO when the provider is neither in a
// poll (_inpoll) nor already queued (_queued).
// NOTE(review): the return statements and the branch bodies are on lines
// omitted from this listing. The debug string below says "Control:" even
// though this is Register.
305 int Poll_port::Register (User *user)
309 LOG_DEBUG ((
"Poll_port: Register: %d", user->_fd));
311 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 )
314 if ( _terminating ) {
316 }
else if ( _portfd < 0 ) {
321 polldatamap_t::iterator entry = _internal.find (user);
323 if ( entry == _internal.end() ) {
325 _internal.insert(user);
333 if ( !_inpoll && !_queued ) {
334 LOG_DEBUG ((
"Control: adding Poll event"));
336 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
339 pthread_mutex_unlock (&_mutex);
// Removes a user from the provider. Under _mutex, when the user is present
// in _internal: if user->_pevents is non-zero its fd is dissociated from the
// port, and a failure (combined with a further condition on an omitted line)
// is logged as "port_dissociate". The entry is then erased from _internal.
343 void Poll_port::Deregister (User *user)
345 LOG_DEBUG ((
"Poll_port: Deregister: %d", user->_fd));
347 if ( (pthread_mutex_lock (&_mutex)) != 0 )
350 polldatamap_t::iterator entry = _internal.find (user);
352 if ( entry != _internal.end() ) {
357 if (user->_pevents &&
358 port_dissociate (_portfd, PORT_SOURCE_FD, user->_fd) &&
360 MSG_IOevent_Pollerror_LOG (
"port_dissociate");
365 _internal.erase (entry);
368 pthread_mutex_unlock (&_mutex);
// Applies the user's requested event mask: copies _uevents into _pevents and
// calls PortControl(), explicitly discarding its return value. Returns right
// after the debug log once _terminating is set.
// NOTE(review): `fd` is declared on a line omitted from this listing, and no
// _mutex lock is visible here, unlike Register()/Deregister(). Confirm
// against the full source.
371 void Poll_port::Control (User *user)
375 LOG_DEBUG ((
"Poll_port: Control: %d %x", fd, user->_uevents));
377 if ( _terminating )
return;
379 user->_pevents = user->_uevents;
380 (void) PortControl (user);
// Factory function: returns a Poll_port allocated with `new`, typed as its
// Provider base. Nothing visible here states who is responsible for deleting it.
383 Provider *CreatePollportProvider ()
385 return new Poll_port();
394 Provider *CreatePollportProvider ()
@ Event_Terminate
Invalid request: fd not open.
@ Event_Hup
Error condition.
@ Event_Err
Writing now will not block.
@ Event_Out
There is urgent data to read.
::Event::AsyncEventAux< Provider, pollAction > ProviderEvent
Type for delivering events to the provider.
Carries the events raised on the relevant FD, or an indication that the end point should terminate.
void getNow(struct timespec &now)
Return the current time.
unsigned long timespec_diff(const struct timespec *time1, const struct timespec *time2)
Function for getting the difference between two times.