#include <Reactor.h>
Collaboration diagram for ASSA::Reactor:
Public Member Functions | |
Reactor () | |
Constructor. | |
~Reactor () | |
Destructor. | |
TimerId | registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>") |
Register Timer Event handler with Reactor. | |
bool | registerIOHandler (EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS) |
Register I/O Event handler with Reactor. | |
bool | removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS) |
Remove Event handler from reactor for either all I/O events or timeout event or both. | |
bool | removeTimerHandler (TimerId id_) |
Remove Timer event from the queue. | |
bool | removeIOHandler (handler_t fd_) |
Remove IO Event handler from reactor. | |
void | waitForEvents (void) |
Main waiting loop that blocks indefinitely processing events. | |
void | waitForEvents (TimeVal *tv_) |
Wait for events for time specified. | |
void | stopReactor (void) |
Stop Reactor's activity. | |
void | deactivate (void) |
Deactivate Reactor. | |
Private Types | |
typedef std::map< u_int, EventHandler * > | Fd2Eh_Map_Type |
no cloning | |
typedef Fd2Eh_Map_Type::iterator | Fd2Eh_Map_Iter |
Private Member Functions | |
Reactor (const Reactor &) | |
Reactor & | operator= (const Reactor &) |
no cloning | |
void | adjust_maxfdp1 (handler_t fd_, handler_t rmax_, handler_t wmax_, handler_t emax_) |
Adjust maxfdp1 in a portable way (win32 ignores masfd alltogether). | |
bool | handleError (void) |
Handle error in select(2) loop appropriately. | |
bool | dispatch (int minimum_) |
Notify all EventHandlers registered on respecful events occured. | |
int | isAnyReady (void) |
Return number of file descriptors ready accross all sets. | |
bool | checkFDs (void) |
Check mask for bad file descriptors. | |
void | dispatchHandler (FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_) |
Call handler's callback and, if callback returns negative value, remove it from the Reactor. | |
void | calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_) |
Calculate closest timeout. | |
Private Attributes | |
int | m_fd_setsize |
Max number of open files per process. | |
handler_t | m_maxfd_plus1 |
Max file descriptor number (in all sets) plus 1. | |
bool | m_active |
Flag that indicates whether Reactor is active or had been stopped. | |
Fd2Eh_Map_Type | m_readSet |
Event handlers awaiting on READ_EVENT. | |
Fd2Eh_Map_Type | m_writeSet |
Event handlers awaiting on WRITE_EVENT. | |
Fd2Eh_Map_Type | m_exceptSet |
Event handlers awaiting on EXCEPT_EVENT. | |
MaskSet | m_waitSet |
Handlers to wait for event on. | |
MaskSet | m_readySet |
Handlers that are ready for processing. | |
TimerQueue | m_tqueue |
The queue of Timers. |
Definition at line 57 of file Reactor.h.
|
|
|
no cloning
|
|
Constructor. Maximum number of sockets supported (per process) Win32 defines it to 64 in winsock2.h. Initialize winsock2 library Definition at line 24 of file Reactor.cpp. References m_fd_setsize, ASSA::REACTTRACE, and trace_with_mask. 00024 : 00025 m_fd_setsize (1024), 00026 m_maxfd_plus1 (0), 00027 m_active (true) 00028 { 00029 trace_with_mask("Reactor::Reactor",REACTTRACE); 00030 00034 #if defined(WIN32) 00035 m_fd_setsize = FD_SETSIZE; 00036 00037 #else // POSIX 00038 struct rlimit rlim; 00039 rlim.rlim_max = 0; 00040 00041 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) { 00042 m_fd_setsize = rlim.rlim_cur; 00043 } 00044 #endif 00045 00048 #if defined (WIN32) 00049 WSADATA data; 00050 WSAStartup (MAKEWORD (2, 2), &data); 00051 #endif 00052 }
|
|
Destructor. Otherwise, a race condition between Logger (singleton) and GenServer (singleton) might yield core dump if Reactor has been destroyed before Logger. Since Reactor is *attached* to the Logger with Logger::log_open () for the assa-logd connection, it is Reactor's responsibility to *detach* first. Definition at line 63 of file Reactor.cpp. References deactivate(), ASSA::Log::log_close(), m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask. 00064 { 00065 trace_with_mask("Reactor::~Reactor",REACTTRACE); 00066 00067 m_readSet.clear (); 00068 m_writeSet.clear (); 00069 m_exceptSet.clear (); 00070 Log::log_close (); 00071 deactivate (); 00072 }
|
|
|
|
Adjust maxfdp1 in a portable way (win32 ignores masfd alltogether). Win32 implementation of select() ignores this value altogether. Definition at line 729 of file Reactor.cpp. References DL, m_maxfd_plus1, ASSA::REACTTRACE, and trace_with_mask. 00733 { 00734 #if !defined (WIN32) /* POSIX */ 00735 00736 trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE); 00737 00738 if (m_maxfd_plus1 == fd_ + 1) { 00739 m_maxfd_plus1 = std::max (rmax_, std::max (wmax_, emax_)); 00740 00741 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); 00742 } 00743 #endif 00744 }
|
|
Calculate closest timeout. If TimerQueue is not empty, then return smallest of maxtimeout and first in the queue. Otherwise, return maxtimeout.
Definition at line 449 of file Reactor.cpp. References ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACTTRACE, and trace_with_mask. Referenced by waitForEvents(). 00450 { 00451 trace_with_mask("Reactor::calculateTimeout",REACTTRACE); 00452 00453 TimeVal now; 00454 TimeVal tv; 00455 00456 if (m_tqueue.isEmpty () ) { 00457 howlong_ = maxwait_; 00458 goto done; 00459 } 00460 now = TimeVal::gettimeofday (); 00461 tv = m_tqueue.top (); 00462 00463 if (tv < now) { 00464 /*--- 00465 It took too long to get here (fraction of a millisecond), 00466 and top timer had already expired. In this case, 00467 perform non-blocking select in order to drain the timer queue. 00468 ---*/ 00469 *howlong_ = 0; 00470 } 00471 else { 00472 DL((REACT,"--------- Timer Queue ----------\n")); 00473 m_tqueue.dump(); 00474 DL((REACT,"--------------------------------\n")); 00475 00476 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) { 00477 *howlong_ = tv - now; 00478 } 00479 else { 00480 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now; 00481 } 00482 } 00483 00484 done: 00485 if (howlong_ != NULL) { 00486 DL((REACT,"delay (%f)\n", double (*howlong_) )); 00487 } 00488 else { 00489 DL((REACT,"delay (forever)\n")); 00490 } 00491 }
|
|
Check mask for bad file descriptors.
Definition at line 345 of file Reactor.cpp. References ASSA::FdSet::clear(), DL, m_fd_setsize, m_readSet, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask. Referenced by handleError(). 00346 { 00347 trace_with_mask("Reactor::checkFDs",REACTTRACE); 00348 00349 bool num_removed = false; 00350 FdSet mask; 00351 timeval poll = { 0, 0 }; 00352 00353 for (handler_t fd = 0; fd < m_fd_setsize; fd++) { 00354 if ( m_readSet[fd] != NULL ) { 00355 mask.setFd (fd); 00356 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) { 00357 removeIOHandler (fd); 00358 num_removed = true; 00359 DL((REACT,"Detected BAD FD: %d\n", fd )); 00360 } 00361 mask.clear (fd); 00362 } 00363 } 00364 return (num_removed); 00365 }
|
|
Deactivate Reactor. This function sets internal flag which notifies Reactor's internal event handling loop to abort its activity. It is mostly used when a *slow* system call is interrupted by the signal handler. The system call will be restarted by OS after control returns from the signal handler. Signal handler (GenServer::handle_signal()) should call this method to delay Reactor's deactivation. Definition at line 237 of file Reactor.h. References m_active. Referenced by ASSA::GenServer::handle_signal(), ASSA::GenServer::stop_service(), and ~Reactor(). 00237 { m_active = false; }
|
|
Notify all EventHandlers registered on respecful events occured.
Definition at line 654 of file Reactor.cpp. References ASSA::ASSAERR, dispatchHandler(), DL, ASSA::MaskSet::dump(), EL, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACTTRACE, and trace_with_mask. Referenced by waitForEvents(). 00655 { 00656 trace_with_mask("Reactor::dispatch", REACTTRACE); 00657 00658 m_tqueue.expire (TimeVal::gettimeofday ()); 00659 00660 if ( ready_ < 0 ) 00661 { 00662 #if !defined (WIN32) 00663 EL((ASSAERR,"::select(3) error\n")); 00664 #endif 00665 return (false); 00666 } 00667 if ( ready_ == 0 ) { 00668 return (true); 00669 } 00670 00671 DL((REACT,"Dispatching %d FDs.\n",ready_)); 00672 DL((REACT,"m_readySet:\n")); 00673 m_readySet.dump (); 00674 00675 /*--- Writes first ---*/ 00676 dispatchHandler (m_readySet.m_wset, 00677 m_writeSet, 00678 &EventHandler::handle_write); 00679 00680 /*--- Exceptions next ---*/ 00681 dispatchHandler (m_readySet.m_eset, 00682 m_exceptSet, 00683 &EventHandler::handle_except); 00684 00685 /*--- Finally, the Reads ---*/ 00686 dispatchHandler (m_readySet.m_rset, 00687 m_readSet, 00688 &EventHandler::handle_read); 00689 00690 return (true); 00691 }
|
|
Call handler's callback and, if callback returns negative value, remove it from the Reactor. When you have several high data-rate connections sending data at the same time, the one that had connected first would get lower FD number and would get data transfer preference over everybody else who has connected later on. WIN32 HACK: Without having restarted scan from the beginning, this causes crash due to the fact that firing a callback of EventHandler might have invalidated the iterator (happens with Connector's in a sync mode). Definition at line 596 of file Reactor.cpp. References ASSA::FdSet::clear(), DL, ASSA::EventHandler::get_id(), ASSA::FdSet::isSet(), ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask. Referenced by dispatch(). 00597 { 00598 trace_with_mask("Reactor::dispatchHandler",REACTTRACE); 00599 00600 int ret = 0; 00601 handler_t fd; 00602 EventHandler* ehp = NULL; 00603 std::string eh_id; 00604 00605 Fd2Eh_Map_Iter iter = fdSet_.begin (); 00606 00607 while (iter != fdSet_.end ()) 00608 { 00609 fd = (*iter).first; 00610 ehp = (*iter).second; 00611 00612 if (mask_.isSet (fd) && ehp != NULL) 00613 { 00614 eh_id = ehp->get_id (); 00615 DL((REACT,"Data detected from \"%s\"(fd=%d)\n", 00616 eh_id.c_str (), fd)); 00617 00618 ret = (ehp->*callback_) (fd); /* Fire up a callback */ 00619 00620 if (ret == -1) { 00621 removeIOHandler (fd); 00622 } 00623 else if (ret > 0) { 00624 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n", 00625 ret, fd, eh_id.c_str ())); 00626 //return; <-- would starve other connections 00627 } 00628 else { 00629 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", 00630 eh_id.c_str (), fd)); 00631 mask_.clear (fd); 00632 } 00639 iter = fdSet_.begin (); 00640 } 00641 else { 00642 iter++; 00643 } 00644 } 00645 }
|
|
Handle error in select(2) loop appropriately. If commanded to stop, do so Definition at line 369 of file Reactor.cpp. References ASSA::ASSAERR, checkFDs(), DL, EL, m_active, ASSA::REACTTRACE, and trace_with_mask. Referenced by waitForEvents(). 00370 { 00371 trace_with_mask("Reactor::handleError",REACTTRACE); 00372 00375 if ( !m_active ) { 00376 DL((REACT,"Received cmd to stop Reactor\n")); 00377 return (false); 00378 } 00379 00380 /*--- 00381 TODO: If select(2) returns before time expires, with 00382 a descriptor ready or with EINTR, timeval is not 00383 going to be updated with number of seconds remaining. 00384 This is true for all systems except Linux, which will 00385 do so. Therefore, to restart correctly in case of 00386 EINTR, we ought to take time measurement before and 00387 after select, and try to select() for remaining time. 00388 00389 For now, we restart with the initial timing value. 00390 ---*/ 00391 /*--- 00392 BSD kernel never restarts select(2). SVR4 will restart if 00393 the SA_RESTART flag is specified when the signal handler 00394 for the signal delivered is installed. This means taht for 00395 portability, we must handle signal interrupts. 00396 ---*/ 00397 00398 if ( errno == EINTR ) { 00399 EL((REACT,"EINTR: interrupted select(2)\n")); 00400 /* 00401 If I was sitting in select(2) and received SIGTERM, 00402 the signal handler would have set m_active to 'false', 00403 and this function would have returned 'false' as above. 00404 For any other non-critical signals (USR1,...), 00405 we retry select. 00406 */ 00407 return (true); 00408 } 00409 /* 00410 EBADF - bad file number. One of the file descriptors does 00411 not reference an open file to open(), close(), ioctl(). 00412 This can happen if user closed fd and forgot to remove 00413 handler from Reactor. 00414 */ 00415 if ( errno == EBADF ) { 00416 DL((REACT,"EBADF: bad file descriptor\n")); 00417 return (checkFDs ()); 00418 } 00419 /* 00420 Any other error from select 00421 */ 00422 #if defined (WIN32) 00423 DL ((REACT,"select(3) error = %d\n", WSAGetLastError())); 00424 #else 00425 EL((ASSAERR,"select(3) error\n")); 00426 #endif 00427 return (false); 00428 }
|
|
Return number of file descriptors ready accross all sets.
Definition at line 432 of file Reactor.cpp. References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACTTRACE, and trace_with_mask. Referenced by waitForEvents(). 00433 { 00434 trace_with_mask("Reactor::isAnyReady",REACTTRACE); 00435 00436 int n = m_readySet.m_rset.numSet () + 00437 m_readySet.m_wset.numSet () + 00438 m_readySet.m_eset.numSet (); 00439 00440 if ( n > 0 ) { 00441 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n)); 00442 m_readySet.dump (); 00443 } 00444 return (n); 00445 }
|
|
no cloning
|
|
Register I/O Event handler with Reactor. Reactor will dispatch appropriate callback when event of EventType is received.
Definition at line 102 of file Reactor.cpp. References ASSA::ASSAERR, Assure_return, DL, ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), m_readSet, ASSA::MaskSet::m_rset, m_waitSet, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), and ASSA::RemoteLogger::log_open(). 00103 { 00104 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE); 00105 00106 std::ostringstream msg; 00107 Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_)); 00108 00109 if (isReadEvent (et_)) 00110 { 00111 if (!m_waitSet.m_rset.setFd (fd_)) 00112 { 00113 DL((ASSAERR,"readset: fd %d out of range\n", fd_)); 00114 return (false); 00115 } 00116 m_readSet[fd_] = eh_; 00117 msg << "READ_EVENT"; 00118 } 00119 00120 if (isWriteEvent (et_)) 00121 { 00122 if (!m_waitSet.m_wset.setFd (fd_)) 00123 { 00124 DL((ASSAERR,"writeset: fd %d out of range\n", fd_)); 00125 return (false); 00126 } 00127 m_writeSet[fd_] = eh_; 00128 msg << " WRITE_EVENT"; 00129 } 00130 00131 if (isExceptEvent (et_)) 00132 { 00133 if (!m_waitSet.m_eset.setFd (fd_)) 00134 { 00135 DL((ASSAERR,"exceptset: fd %d out of range\n", fd_)); 00136 return (false); 00137 } 00138 m_exceptSet[fd_] = eh_; 00139 msg << " EXCEPT_EVENT"; 00140 } 00141 msg << std::ends; 00142 00143 DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", 00144 eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () )); 00145 00146 #if !defined (WIN32) 00147 if (m_maxfd_plus1 < fd_+1) { 00148 m_maxfd_plus1 = fd_+1; 00149 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); 00150 } 00151 #endif 00152 00153 DL((REACT,"Modified waitSet:\n")); 00154 m_waitSet.dump (); 00155 00156 return (true); 00157 }
|
|
Register Timer Event handler with Reactor. Reactor will dispatch appropriate callback when event of EventType is received.
Definition at line 76 of file Reactor.cpp. References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), and ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(). 00079 { 00080 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE); 00081 Assure_return (eh_); 00082 00083 TimeVal now (TimeVal::gettimeofday()); 00084 TimeVal t (now + timeout_); 00085 00086 DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n", 00087 timeout_.sec(),timeout_.msec())); 00088 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() )); 00089 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() )); 00090 00091 TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_); 00092 00093 DL((REACT,"---Modified Timer Queue----\n")); 00094 m_tqueue.dump(); 00095 DL((REACT,"---------------------------\n")); 00096 00097 return (tid); 00098 }
|
|
Remove Event handler from reactor for either all I/O events or timeout event or both. If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.
Definition at line 182 of file Reactor.cpp. References ASSA::FdSet::clear(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), m_readSet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_timeout(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(), ASSA::RemoteLogger::log_close(), and stopReactor(). 00183 { 00184 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE); 00185 00186 bool ret = false; 00187 handler_t fd; 00188 handler_t rfdmax; 00189 handler_t wfdmax; 00190 handler_t efdmax; 00191 Fd2Eh_Map_Iter iter; 00192 00193 rfdmax = wfdmax = efdmax = 0; 00194 00195 if (eh_ == NULL) { 00196 return false; 00197 } 00198 00199 if (isTimeoutEvent (event_)) { 00200 ret = m_tqueue.remove (eh_); 00201 ret = true; 00202 } 00203 00204 if (isReadEvent (event_)) { 00205 iter = m_readSet.begin (); 00206 while (iter != m_readSet.end ()) { 00207 if ((*iter).second == eh_) { 00208 fd = (*iter).first; 00209 m_readSet.erase (iter); 00210 m_waitSet.m_rset.clear (fd); 00211 ret = true; 00212 break; 00213 } 00214 rfdmax = fd; 00215 iter++; 00216 } 00217 } 00218 00219 if (isWriteEvent (event_)) { 00220 iter = m_writeSet.begin (); 00221 while (iter != m_writeSet.end ()) { 00222 if ((*iter).second == eh_) { 00223 fd = (*iter).first; 00224 m_writeSet.erase (iter); 00225 m_waitSet.m_wset.clear (fd); 00226 ret = true; 00227 break; 00228 } 00229 wfdmax = fd; 00230 iter++; 00231 } 00232 } 00233 00234 if (isExceptEvent (event_)) { 00235 iter = m_exceptSet.begin (); 00236 while (iter != m_exceptSet.end ()) { 00237 if ((*iter).second == eh_) { 00238 fd = (*iter).first; 00239 m_exceptSet.erase (iter); 00240 m_waitSet.m_eset.clear (fd); 00241 ret = true; 00242 break; 00243 } 00244 efdmax = fd; 00245 iter++; 00246 } 00247 } 00248 00249 if (ret == true) { 00250 DL((REACT,"Found EvtH \"%s\"(0x%X)\n", 00251 eh_->get_id ().c_str (),int(eh_))); 00252 eh_->handle_close (fd); 00253 } 00254 00255 adjust_maxfdp1 (fd, rfdmax, wfdmax, efdmax); 00256 00257 DL((REACT,"Modifies waitSet:\n")); 00258 m_waitSet.dump (); 00259 00260 return (ret); 00261 }
|
|
Remove IO Event handler from reactor. This will remove handler from receiving all I/O events.
Definition at line 265 of file Reactor.cpp. References Assure_return, ASSA::FdSet::clear(), DL, ASSA::is_valid_handler(), m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, ASSA::REACTTRACE, and trace_with_mask. Referenced by checkFDs(), and dispatchHandler(). 00266 { 00267 trace_with_mask("Reactor::removeIOHandler",REACTTRACE); 00268 00269 bool ret = false; 00270 EventHandler* ehp = NULL; 00271 Fd2Eh_Map_Iter iter; 00272 00273 handler_t rfdmax; 00274 handler_t wfdmax; 00275 handler_t efdmax; 00276 00277 rfdmax = wfdmax = efdmax = 0; 00278 00279 Assure_return (ASSA::is_valid_handler (fd_)); 00280 00281 DL((REACT,"Removing handler for fd=%d\n",fd_)); 00282 00287 if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) 00288 { 00289 ehp = (*iter).second; 00290 m_readSet.erase (iter); 00291 m_waitSet.m_rset.clear (fd_); 00292 m_readySet.m_rset.clear (fd_); 00293 if (m_readSet.size () > 0) { 00294 iter = m_readSet.end (); 00295 iter--; 00296 rfdmax = (*iter).first; 00297 } 00298 ret = true; 00299 } 00300 00301 if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) 00302 { 00303 ehp = (*iter).second; 00304 m_writeSet.erase (iter); 00305 m_waitSet.m_wset.clear (fd_); 00306 m_readySet.m_wset.clear (fd_); 00307 if (m_writeSet.size () > 0) { 00308 iter = m_writeSet.end (); 00309 iter--; 00310 wfdmax = (*iter).first; 00311 } 00312 ret = true; 00313 } 00314 00315 if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) 00316 { 00317 ehp = (*iter).second; 00318 m_exceptSet.erase (iter); 00319 m_waitSet.m_eset.clear (fd_); 00320 m_readySet.m_eset.clear (fd_); 00321 if (m_exceptSet.size () > 0) { 00322 iter = m_exceptSet.end (); 00323 iter--; 00324 efdmax = (*iter).first; 00325 } 00326 ret = true; 00327 } 00328 00329 if (ret == true && ehp != NULL) { 00330 DL((REACT,"Removed EvtH \"%s\"(0x%X)\n", 00331 ehp->get_id ().c_str (), int(ehp))); 00332 ehp->handle_close (fd_); 00333 } 00334 00335 adjust_maxfdp1 (fd_, rfdmax, wfdmax, efdmax); 00336 00337 DL((REACT,"Modifies waitSet:\n")); 00338 m_waitSet.dump (); 00339 00340 return (ret); 00341 }
|
|
Remove Timer event from the queue. This removes particular event.
Definition at line 161 of file Reactor.cpp. References ASSA::ASSAERR, DL, ASSA::TimerQueue::dump(), EL, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(). 00162 { 00163 trace_with_mask("Reactor::removeTimer",REACTTRACE); 00164 bool ret; 00165 00166 if ((ret = m_tqueue.remove (tid_))) { 00167 DL((REACT,"---Modified Timer Queue----\n")); 00168 m_tqueue.dump(); 00169 DL((REACT,"---------------------------\n")); 00170 } 00171 else { 00172 EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ )); 00173 } 00174 return (ret); 00175 }
|
|
Stop Reactor's activity. This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from method other then EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls. Definition at line 695 of file Reactor.cpp. References m_active, m_readSet, ASSA::REACTTRACE, removeHandler(), and trace_with_mask. 00696 { 00697 trace_with_mask("Reactor::stopReactor", REACTTRACE); 00698 00699 m_active = false; 00700 00701 Fd2Eh_Map_Iter iter; 00702 EventHandler* ehp; 00703 00704 while (m_readSet.size () > 0) { 00705 iter = m_readSet.begin (); 00706 ehp = (*iter).second; 00707 removeHandler (ehp); 00708 } 00709 00710 while (m_writeSet.size () > 0) { 00711 iter = m_writeSet.begin (); 00712 ehp = (*iter).second; 00713 removeHandler (ehp); 00714 } 00715 00716 while (m_exceptSet.size () > 0) { 00717 iter = m_exceptSet.begin (); 00718 ehp = (*iter).second; 00719 removeHandler (ehp); 00720 } 00721 }
|
|
Wait for events for time specified. Passing NULL replicates behavior of waitForEvents(void). Passing tv_ {0, 0} will cause non-blocking polling for all events. This method blocks up to tv_ time interval processing event. If an event occurs, it will process event(s) and return. tv_ time is adjusted by substracting time spent in event processing.
Definition at line 523 of file Reactor.cpp. References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd_plus1, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACTTRACE, ASSA::MaskSet::reset(), ASSA::MaskSet::sync(), and trace_with_mask. 00524 { 00525 trace_with_mask("Reactor::waitForEvents",REACTTRACE); 00526 00527 TimerCountdown traceTime (tv_); 00528 DL((REACT,"======================================\n")); 00529 00530 /*--- Expire all stale Timers ---*/ 00531 m_tqueue.expire (TimeVal::gettimeofday ()); 00532 00533 /* Test to see if Reactor has been deactivated as a result 00534 * of processing done by any TimerHandlers. 00535 */ 00536 if (!m_active) { 00537 return; 00538 } 00539 00540 int nReady; 00541 TimeVal delay; 00542 TimeVal* dlp = &delay; 00543 00544 /*--- 00545 In case if not all data have been processed by the EventHandler, 00546 and EventHandler stated so in its callback's return value 00547 to dispatcher (), it will be called again. This way 00548 underlying file/socket stream can efficiently utilize its 00549 buffering mechaninsm. 00550 ---*/ 00551 if ((nReady = isAnyReady ())) { 00552 DL((REACT,"isAnyReady returned: %d\n",nReady)); 00553 dispatch (nReady); 00554 return; 00555 } 00556 00557 DL((REACT,"=== m_waitSet ===\n")); 00558 m_waitSet.dump (); 00559 00560 do { 00561 m_readySet.reset (); 00562 DL ((REACT,"m_readySet after reset():\n")); 00563 m_readySet.dump (); 00564 00565 m_readySet = m_waitSet; 00566 DL ((REACT,"m_readySet after assign:\n")); 00567 m_readySet.dump (); 00568 00569 calculateTimeout (dlp, tv_); 00570 00571 nReady = ::select (m_maxfd_plus1, 00572 &m_readySet.m_rset, 00573 &m_readySet.m_wset, 00574 &m_readySet.m_eset, 00575 dlp); 00576 DL((REACT,"::select() returned: %d\n",nReady)); 00577 00578 m_readySet.sync (); 00579 DL ((REACT,"m_readySet after select:\n")); 00580 m_readySet.dump (); 00581 00582 } 00583 while (nReady < 0 && handleError ()); 00584 00585 dispatch (nReady); 00586 }
|
|
Main waiting loop that blocks indefinitely processing events.
Definition at line 498 of file Reactor.cpp. References m_active. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(). 00499 { 00500 while ( m_active ) { 00501 waitForEvents ((TimeVal*) NULL); 00502 } 00503 }
|
|
Flag that indicates whether Reactor is active or had been stopped.
Definition at line 212 of file Reactor.h. Referenced by deactivate(), handleError(), stopReactor(), and waitForEvents(). |
|
Event handlers awaiting on EXCEPT_EVENT.
Definition at line 221 of file Reactor.h. Referenced by dispatch(), and ~Reactor(). |
|
Max number of open files per process. This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit. Definition at line 203 of file Reactor.h. Referenced by checkFDs(), and Reactor(). |
|
Max file descriptor number (in all sets) plus 1. This value is ignored by WIN32 implementation of select() Definition at line 209 of file Reactor.h. Referenced by adjust_maxfdp1(), and waitForEvents(). |
|
Event handlers awaiting on READ_EVENT.
Definition at line 215 of file Reactor.h. Referenced by checkFDs(), dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor(). |
|
Handlers that are ready for processing.
Definition at line 227 of file Reactor.h. Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents(). |
|
The queue of Timers.
Definition at line 230 of file Reactor.h. Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents(). |
|
Handlers to wait for event on.
Definition at line 224 of file Reactor.h. Referenced by registerIOHandler(), removeHandler(), removeIOHandler(), and waitForEvents(). |
|
Event handlers awaiting on WRITE_EVENT.
Definition at line 218 of file Reactor.h. Referenced by dispatch(), and ~Reactor(). |