Poll_port.C
1// Copyright (c) 2010, Isode Limited, London, England.
2// All rights reserved.
3//
4// Acquisition and use of this software and related materials for any
5// purpose requires a written licence agreement from Isode Limited,
6// or a written licence from an organisation licenced by Isode Limited
7// to grant such a licence.
8//
9
10
11//
12//
13// Poll_port.C
14//
15// Poll provider using Solaris Ports
16
17
18#include "Event_p.h"
19
20#ifdef HAVE_PORT_CREATE
21
22#ifdef HAVE_PORT_H
23#include <port.h>
24#endif
25
26namespace Poll {
27
// Container type for the provider's registrations: a std::set of User pointers.
28 typedef std::set<User *> polldatamap_t;
29
31 class Poll_port : public Provider {
32 private:
33 static const int port_maxevents = 256;
34
35 pthread_mutex_t _mutex;
36
37 polldatamap_t _internal;
38
39 int _portfd;
40 bool _inpoll;
41 bool _wakepending;
42 bool _terminating;
43 bool _queued;
44 ::Event::Manager *_manager;
45 ProviderEvent _myevent;
46 ProviderEvent _termevent;
47
48 int PortControl (User *user) {
49 int port_events = 0;
50
51 if ( user->_pevents & Event_In )
52 port_events |= POLLIN;
53
54 if ( user->_pevents & Event_Out )
55 port_events |= POLLOUT;
56
57 // Only associate if events of interest
58 if ( port_events &&
59 port_associate (_portfd, PORT_SOURCE_FD,
60 user->_fd, user->_pevents,
61 static_cast<void *>(user)) != 0 ) {
62 MSG_IOevent_Pollerror_LOG ("port_associate");
63 return NOTOK;
64 }
65
66 return OK;
67 }
68
69 public:
70 Poll_port ();
71
72 virtual ~Poll_port ();
73
76 virtual int Register (User *user);
77
79 virtual void Deregister (User *user);
80
82 virtual void Control (User *user);
83
85 virtual void Deliver (pollAction *msg);
86 };
87
88 Poll_port::Poll_port ()
89 {
90 pthread_mutex_init (&_mutex, NULL);
91
92 pollAction action;
93
94 action.terminate = false;
95 _myevent.SetUserData (this, action);
96
97 action.terminate = true;
98 _termevent.SetUserData (this, action);
99
100 _inpoll = false;
101 _wakepending = false;
102
103 _terminating = false;
104 _queued = false;
105
106 _portfd = port_create ();
107 if ( _portfd < 0 )
108 MSG_IOevent_Pollerror_LOG ("port_create");
109
110 _manager = ::Event::Manager::GetManager();
111
112 _manager->QueueAtExit (&_termevent);
113 }
114
115 Poll_port::~Poll_port ()
116 {
117 pthread_mutex_destroy (&_mutex);
118 if ( _portfd >= 0 ) close (_portfd);
119 }
120
121 void Poll_port::Deliver (pollAction *msg)
122 {
123 int rc;
124 LOG_DEBUG (( "Poll_port::Deliver ()"));
125
126 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
127 MSG_IOevent_Pollthread_LOG (rc);
128 _manager->QueueAt (&_myevent, 10000);
129 _queued = true;
130 return;
131 }
132
133 if ( _terminating ) {
134 pthread_mutex_unlock(&_mutex);
135 return;
136 }
137
138 if ( msg->terminate ) {
139 // System is shutting down
140
141 // Prevents queuing
142 _queued = true;
143
144 _terminating = true;
145
146 // Tell users that we are terminating
147 // There is an implied removal of registrations
148 // The PollUser MUST NOT refer to the EventSource again
149 // Note: this is in critical section, so would get deadlock
150 // if the PollUser called back to Deregister.
151
152 ::Poll::pollmsg cancelmsg;
153 cancelmsg.events = Poll::Event_Terminate;
154
155 polldatamap_t::iterator scan;
156 for ( scan = _internal.begin(); scan != _internal.end(); ++scan )
157 (*scan)->_event.SendAtExit (cancelmsg);
158
159 bool wake = false;
160 if ( _inpoll && !_wakepending ) {
161 wake = true;
162 _wakepending = true;
163 }
164
165 pthread_mutex_unlock(&_mutex);
166
167 if ( wake && port_alert (_portfd, PORT_ALERT_SET, -1, 0) != 0 )
168 MSG_IOevent_Pollerror_LOG ("port_alert");
169
170 return;
171 } // end of handling termination
172
173 const timespec *wakeat = _manager->NextEventDue();
174 timespec *timep = 0;
175 timespec now;
176
177 if ( wakeat ) {
178 getNow(now);
179 // timespec_diff returns difference in microseconds.
180 // returns 0 if delay preceeds now.
181 int timeout = timespec_diff (wakeat, &now);
182 now.tv_sec = timeout/1000000;
183 now.tv_nsec = (timeout%1000000)*1000;
184 LOG_DEBUG ((" timeout=%d", timeout));
185 timep = &now;
186 }
187
188 _inpoll = true; // about to be true
189
190 pthread_mutex_unlock(&_mutex);
191
192 // Tell event manager we might go away for a while
193 _manager->Blocking();
194
195 port_event_t events[port_maxevents];
196
197 // Attempt wait for (at least) one event
198 unsigned nfds = 1;
199 LOG_DEBUG (("Poll_port::Wait: port_getn"));
200 rc = port_getn (_portfd, events,port_maxevents, &nfds, timep);
201 int eno = errno;
202 LOG_DEBUG (("Poll_port::Wait port_getn()=%d (%d) nfds=%u",
203 rc, errno, nfds));
204
205 // Tell event manager we are back
206 _manager->Unblocked();
207
208 // Timeout and EINTR are not treated as errors.
209 if ( rc != 0 && eno != ETIME && eno != EINTR ) {
210 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
211 MSG_IOevent_Pollthread_LOG (rc);
212 _manager->QueueAt (&_myevent, 10000);
213 _queued = true;
214 _inpoll = false;
215 return;
216 }
217
218 _inpoll = false;
219
220 errno = eno;
221 MSG_IOevent_Pollerror_LOG ("port_get");
222 // Try again in one second
223 _manager->QueueAt (&_myevent, 1000);
224
225 _queued = true;
226 pthread_mutex_unlock(&_mutex);
227 return;
228 }
229
230 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 ) {
231 MSG_IOevent_Pollthread_LOG (rc);
232 _manager->QueueAt (&_myevent, 10000);
233 _queued = true;
234 _inpoll = false;
235 _wakepending = false;
236 return;
237 }
238
239 _inpoll = false;
240 _wakepending = false;
241
242 if ( _terminating )
243 return;
244
245 pollmsg doevents;
246 for ( int i = 0; i < nfds; i++ ) {
247 if ( events[i].portev_source == PORT_SOURCE_ALERT ) {
248 // Must be terminating, no point in carrying on
249 break;
250 } else if ( events[i].portev_source == PORT_SOURCE_FD ) {
251 User *user = static_cast<User *>(events[i].portev_user);
252
253 polldatamap_t::iterator entry =
254 _internal.find (user);
255
256 // Check was not unregistered in the interim
257 if ( entry == _internal.end() ) continue;
258
259 // Translate events into external mask
260 // If error, clear all events
261 pollmsg these_events;
262 if ( events[i].portev_events & POLLIN )
263 these_events.events |= Event_In;
264
265 if ( events[i].portev_events & POLLOUT )
266 these_events.events |= Event_Out;
267
268 if ( events[i].portev_events & POLLERR )
269 these_events.events |= Event_Err;
270
271 if ( events[i].portev_events & POLLHUP )
272 these_events.events |= Event_Hup;
273
274 // Under normal circumstances expect a PollAsyncEvent to
275 // be queued to the event manager
276 // But if it returns a non-zero result, then
277 // we deregister the PollUser
278
279 LOG_DEBUG (("Poll_port: send %d events %x",
280 user->_fd, these_events.events));
281
282 user->_event.Send(these_events);
283
284 // If the events include an error then the endpoint
285 // is automatically deregistered
286 if ( these_events.events & (Event_Err|Event_Hup|Event_Nval) ) {
287 // No need to dissociate
288 _internal.erase (entry);
289 }
290 }
291 }
292
293 // Only queue event for processing with poll if items remain
294 if ( !_internal.empty() ) {
295 _queued = true;
296 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO, true);
297 } else {
298 _queued = false;
299 }
300
301 LOG_DEBUG(("Ending processing, queued=%d", _queued));
302 pthread_mutex_unlock(&_mutex);
303 }
304
305 int Poll_port::Register (User *user)
306 {
307 int rc;
308
309 LOG_DEBUG (("Poll_port: Register: %d", user->_fd));
310
311 if ( (rc = pthread_mutex_lock (&_mutex)) != 0 )
312 return rc;
313
314 if ( _terminating ) {
315 rc = 0;
316 } else if ( _portfd < 0 ) {
317 rc = -1;
318 } else {
319
320 // Check to see if fd already registered
321 polldatamap_t::iterator entry = _internal.find (user);
322
323 if ( entry == _internal.end() ) {
324
325 _internal.insert(user);
326
327 } else {
328 rc = -1;
329 }
330 }
331
332 // Have an poll event if there are any registered FDs
333 if ( !_inpoll && !_queued ) {
334 LOG_DEBUG (("Control: adding Poll event"));
335 _queued = true;
336 _manager->Queue (&_myevent, ::Event::Manager::MAXPRIO);
337 }
338
339 pthread_mutex_unlock (&_mutex);
340 return rc;
341 }
342
343 void Poll_port::Deregister (User *user)
344 {
345 LOG_DEBUG (("Poll_port: Deregister: %d", user->_fd));
346
347 if ( (pthread_mutex_lock (&_mutex)) != 0 )
348 return;
349
350 polldatamap_t::iterator entry = _internal.find (user);
351
352 if ( entry != _internal.end() ) {
353
354 // Disassociate if currently associated
355 // Note: ENOENT is not regarded as an error,
356 // It means the FD was not actually associated
357 if (user->_pevents &&
358 port_dissociate (_portfd, PORT_SOURCE_FD, user->_fd) &&
359 errno != ENOENT )
360 MSG_IOevent_Pollerror_LOG ("port_dissociate");
361
362 // Do NOT cancel the event, as the poll user should be the
363 // only user of this.
364
365 _internal.erase (entry);
366 }
367
368 pthread_mutex_unlock (&_mutex);
369 }
370
371 void Poll_port::Control (User *user)
372 {
373 int fd = user->_fd;
374
375 LOG_DEBUG (("Poll_port: Control: %d %x", fd, user->_uevents));
376
377 if ( _terminating ) return;
378
379 user->_pevents = user->_uevents;
380 (void) PortControl (user);
381 }
382
383 Provider *CreatePollportProvider ()
384 {
385 return new Poll_port();
386 }
387}
388
389#else
390
391namespace Poll {
392
393 // Don't have Solaris ports therefore cannot have this kind of provider
394 Provider *CreatePollportProvider ()
395 {
396 return 0;
397 }
398}
399#endif
@ Event_Terminate
Invalid request: fd not open.
@ Event_Hup
Error condition.
@ Event_Err
Writing now will not block.
@ Event_Out
There is urgent data to read.
::Event::AsyncEventAux< Provider, pollAction > ProviderEvent
Type for delivering events to the provider.
Carries the events on the appropriate FD, or that the end point should terminate.
void getNow(struct timespec &now)
Return the current time.
Definition timeutil.h:60
unsigned long timespec_diff(const struct timespec *time1, const struct timespec *time2)
Function for getting the difference between two times.
Definition timeutil.h:42

All rights reserved © 2002 - 2024 Isode Ltd.