ASSA::Reactor Class Reference

#include <Reactor.h>


Public Member Functions

 Reactor ()
 Constructor.
 ~Reactor ()
 Destructor.
TimerId registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
 Register Timer Event handler with Reactor.
bool registerIOHandler (EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
 Register I/O Event handler with Reactor.
bool removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS)
 Remove Event handler from reactor for either all I/O events or timeout event or both.
bool removeTimerHandler (TimerId id_)
 Remove Timer event from the queue.
bool removeIOHandler (handler_t fd_)
 Remove IO Event handler from reactor.
void waitForEvents (void)
 Main waiting loop that blocks indefinitely processing events.
void waitForEvents (TimeVal *tv_)
 Wait for events for time specified.
void stopReactor (void)
 Stop Reactor's activity.
void deactivate (void)
 Deactivate Reactor.

Private Types

typedef std::map< u_int, EventHandler * > Fd2Eh_Map_Type
 Maps a file descriptor to the EventHandler registered for it.
typedef Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter

Private Member Functions

 Reactor (const Reactor &)
Reactor & operator= (const Reactor &)
 no cloning
void adjust_maxfdp1 (handler_t fd_, handler_t rmax_, handler_t wmax_, handler_t emax_)
 Adjust maxfdp1 in a portable way (win32 ignores maxfd altogether).
bool handleError (void)
 Handle error in select(2) loop appropriately.
bool dispatch (int minimum_)
 Notify all EventHandlers registered for the respective events that have occurred.
int isAnyReady (void)
 Return number of file descriptors ready across all sets.
bool checkFDs (void)
 Check mask for bad file descriptors.
void dispatchHandler (FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
 Call handler's callback and, if callback returns negative value, remove it from the Reactor.
void calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_)
 Calculate closest timeout.

Private Attributes

int m_fd_setsize
 Max number of open files per process.
handler_t m_maxfd_plus1
 Max file descriptor number (in all sets) plus 1.
bool m_active
 Flag that indicates whether Reactor is active or has been stopped.
Fd2Eh_Map_Type m_readSet
 Event handlers awaiting on READ_EVENT.
Fd2Eh_Map_Type m_writeSet
 Event handlers awaiting on WRITE_EVENT.
Fd2Eh_Map_Type m_exceptSet
 Event handlers awaiting on EXCEPT_EVENT.
MaskSet m_waitSet
 Handlers to wait for event on.
MaskSet m_readySet
 Handlers that are ready for processing.
TimerQueue m_tqueue
 The queue of Timers.

Detailed Description

Definition at line 57 of file Reactor.h.


Member Typedef Documentation

typedef Fd2Eh_Map_Type::iterator ASSA::Reactor::Fd2Eh_Map_Iter [private]
 

Definition at line 155 of file Reactor.h.

typedef std::map<u_int, EventHandler*> ASSA::Reactor::Fd2Eh_Map_Type [private]
 

Maps a file descriptor to the EventHandler registered for it.

Definition at line 154 of file Reactor.h.


Constructor & Destructor Documentation

Reactor::Reactor ()
 

Constructor.

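Sets the maximum number of sockets supported per process. Win32 defines it as 64 (FD_SETSIZE) in winsock2.h; on POSIX systems the soft RLIMIT_NOFILE limit is used.

On Win32, the constructor also initializes the winsock2 library.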

Definition at line 24 of file Reactor.cpp.

References m_fd_setsize, ASSA::REACTTRACE, and trace_with_mask.

Reactor::Reactor () :
    m_fd_setsize  (1024),
    m_maxfd_plus1 (0),
    m_active      (true)
{
    trace_with_mask("Reactor::Reactor",REACTTRACE);

#if defined(WIN32)
    m_fd_setsize = FD_SETSIZE;

#else  // POSIX
    struct rlimit rlim;
    rlim.rlim_max = 0;

    if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
        m_fd_setsize = rlim.rlim_cur;
    }
#endif

#if defined (WIN32)
    WSADATA data;
    WSAStartup (MAKEWORD (2, 2), &data);
#endif
}
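
The POSIX branch of the constructor can be tried in isolation. The following is a minimal sketch, not libassa code: soft_fd_limit() is a hypothetical helper that queries the soft RLIMIT_NOFILE limit with getrlimit(2) and falls back to a caller-supplied default (1024, as in the constructor) when the call fails or the limit is unlimited.

```cpp
#include <cassert>
#include <sys/resource.h>

// Hypothetical helper (not part of libassa): returns the soft limit on
// open files per process, or `fallback` if it cannot be determined.
long soft_fd_limit(long fallback = 1024)
{
    assert(fallback > 0);                // a non-positive default makes no sense

    struct rlimit rlim;
    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
        return fallback;                 // getrlimit(2) failed
    }
    if (rlim.rlim_cur == RLIM_INFINITY) {
        return fallback;                 // no finite soft limit is set
    }
    return static_cast<long>(rlim.rlim_cur);
}
```

The value returned should match what the shell reports with `ulimit -n` for the same process environment.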

Reactor::~Reactor ()
 

Destructor.

The destructor closes the log before deactivating the Reactor. Otherwise, a race condition between Logger (singleton) and GenServer (singleton) might cause a core dump if Reactor were destroyed before Logger. Because Reactor is *attached* to the Logger by Logger::log_open () for the assa-logd connection, it is Reactor's responsibility to *detach* first.

Definition at line 63 of file Reactor.cpp.

References deactivate(), ASSA::Log::log_close(), m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.

{
    trace_with_mask("Reactor::~Reactor",REACTTRACE);

    m_readSet.clear   ();
    m_writeSet.clear  ();
    m_exceptSet.clear ();
    Log::log_close ();
    deactivate ();
}

ASSA::Reactor::Reactor (const Reactor &) [private]
 


Member Function Documentation

void Reactor::adjust_maxfdp1 (handler_t fd_, handler_t rmax_, handler_t wmax_, handler_t emax_) [private]
 

Adjust maxfdp1 in a portable way (win32 ignores maxfd altogether).

Win32 implementation of select() ignores this value altogether.

Definition at line 729 of file Reactor.cpp.

References DL, m_maxfd_plus1, ASSA::REACTTRACE, and trace_with_mask.

{
#if !defined (WIN32)  /* POSIX */

    trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);

    if (m_maxfd_plus1 == fd_ + 1) {
        m_maxfd_plus1 = std::max (rmax_, std::max (wmax_, emax_));

        DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
    }
#endif
}
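
The idea behind the adjustment can be sketched outside the library. This is an illustration under assumptions, not the libassa implementation: FdMap is a hypothetical stand-in for Fd2Eh_Map_Type, and both function names are invented. Unlike the listing, which stores the largest of rmax_, wmax_ and emax_ as is, the sketch adds 1 to the highest remaining descriptor, because select(2) expects its first argument to be the highest descriptor plus one.

```cpp
#include <algorithm>
#include <map>

// Hypothetical stand-in for Fd2Eh_Map_Type; the mapped value is irrelevant here.
typedef std::map<unsigned int, void*> FdMap;

// Highest descriptor still registered in one map, plus 1 (0 if the map is empty).
unsigned int highest_plus1(const FdMap& m)
{
    return m.empty() ? 0u : m.rbegin()->first + 1u;
}

// Value to pass as the first argument of select(2) after a handler was removed.
unsigned int recompute_maxfd_plus1(const FdMap& r, const FdMap& w, const FdMap& e)
{
    return std::max(highest_plus1(r),
                    std::max(highest_plus1(w), highest_plus1(e)));
}
```

Because std::map keeps its keys sorted, the highest descriptor is always the last element, so the recomputation costs constant time per map.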

void Reactor::calculateTimeout (TimeVal *& howlong_, TimeVal * maxwait_) [private]
 

Calculate closest timeout.

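If the TimerQueue is not empty, the result is the smaller of maxwait_ and the time remaining until the first timer in the queue expires; a NULL or zero maxwait_ is treated as no upper bound in that case. Otherwise, the result is maxwait_.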

Parameters:
maxwait_ (in) how long we are expected to wait for event(s).
howlong_ (out) how long we are going to wait.

Definition at line 449 of file Reactor.cpp.

References ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

{
    trace_with_mask("Reactor::calculateTimeout",REACTTRACE);

    TimeVal now;
    TimeVal tv;

    if (m_tqueue.isEmpty () ) {
        howlong_ = maxwait_;
        goto done;
    }
    now = TimeVal::gettimeofday ();
    tv = m_tqueue.top ();

    if (tv < now) {
        /*---
          It took too long to get here (fraction of a millisecond),
          and top timer had already expired. In this case,
          perform non-blocking select in order to drain the timer queue.
          ---*/
        *howlong_ = 0;
    }
    else {
        DL((REACT,"--------- Timer Queue ----------\n"));
        m_tqueue.dump();
        DL((REACT,"--------------------------------\n"));

        if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
            *howlong_ = tv - now;
        }
        else {
            *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
        }
    }

 done:
    if (howlong_ != NULL) {
        DL((REACT,"delay (%f)\n", double (*howlong_) ));
    }
    else {
        DL((REACT,"delay (forever)\n"));
    }
}
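
The decision order in the listing above can be restated with standard types. This is a sketch under assumptions rather than libassa code: closest_timeout() and the Ms typedef are hypothetical, times are plain millisecond counts instead of TimeVal, and a false return value plays the role of the NULL howlong_ ("wait forever").

```cpp
#include <algorithm>
#include <chrono>

typedef std::chrono::milliseconds Ms;

// Hypothetical helper mirroring the decision order of calculateTimeout().
// Returns false when the caller should block forever (no timers, no maxwait);
// otherwise stores the delay in `howlong` and returns true.
bool closest_timeout(bool queue_empty, Ms now, Ms first_expiry,
                     const Ms* maxwait, Ms& howlong)
{
    if (queue_empty) {
        if (maxwait == nullptr) {
            return false;                       // block indefinitely
        }
        howlong = *maxwait;
        return true;
    }
    if (first_expiry < now) {
        howlong = Ms(0);                        // timer already expired: just poll
    }
    else if (maxwait == nullptr || *maxwait == Ms(0)) {
        howlong = first_expiry - now;           // only the timer bounds the wait
    }
    else {
        howlong = std::min(*maxwait, first_expiry - now);
    }
    return true;
}
```

For example, with now at 1000 ms, the first timer due at 1200 ms and a maximum wait of 500 ms, the delay is 200 ms; if the timer were due at 3000 ms instead, the delay would be the full 500 ms.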

bool Reactor::checkFDs (void) [private]
 

Check mask for bad file descriptors.

Returns:
true if any bad fd(s) were found and removed; false otherwise

Definition at line 345 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, m_fd_setsize, m_readSet, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by handleError().

{
    trace_with_mask("Reactor::checkFDs",REACTTRACE);

    bool num_removed = false;
    FdSet mask;
    timeval poll = { 0, 0 };

    for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
        if ( m_readSet[fd] != NULL ) {
            mask.setFd (fd);
            if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
                removeIOHandler (fd);
                num_removed = true;
                DL((REACT,"Detected BAD FD: %d\n", fd ));
            }
            mask.clear (fd);
        }
    }
    return (num_removed);
}

void ASSA::Reactor::deactivate (void) [inline]
 

Deactivate Reactor.

This function sets an internal flag that tells Reactor's internal event handling loop to abort its activity. It is mostly used when a *slow* system call is interrupted by a signal handler. The OS restarts the system call after control returns from the signal handler. The signal handler (GenServer::handle_signal()) should call this method to delay Reactor's deactivation.

Definition at line 237 of file Reactor.h.

References m_active.

Referenced by ASSA::GenServer::handle_signal(), ASSA::GenServer::stop_service(), and ~Reactor().

{  m_active = false; }
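
The flag-based shutdown described above follows a common pattern: the signal handler only records the request, and the event loop acts on it later. A minimal sketch, with all names hypothetical (g_active stands in for m_active, and a free function stands in for GenServer::handle_signal()):

```cpp
#include <csignal>

// Hypothetical stand-in for Reactor::m_active. A volatile sig_atomic_t
// can be written safely from inside a signal handler.
static volatile std::sig_atomic_t g_active = 1;

// Signal handler: only flips the flag; the event loop notices it later.
extern "C" void on_stop_signal(int)
{
    g_active = 0;
}

// Installs the handler for the given signal number.
void install_stop_handler(int signo)
{
    std::signal(signo, on_stop_signal);
}

// What the event loop would test before each pass.
bool reactor_active()
{
    return g_active != 0;
}
```

An event loop built on this would call install_stop_handler(SIGTERM) once at startup and test reactor_active() before every wait, much as waitForEvents(void) tests m_active.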

bool Reactor::dispatch (int minimum_) [private]
 

Notify all EventHandlers registered for the respective events that have occurred.

Parameters:
minimum_ number of file descriptors ready (the definition in Reactor.cpp names this parameter ready_).

Definition at line 654 of file Reactor.cpp.

References ASSA::ASSAERR, dispatchHandler(), DL, ASSA::MaskSet::dump(), EL, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

{
    trace_with_mask("Reactor::dispatch", REACTTRACE);

    m_tqueue.expire (TimeVal::gettimeofday ());

    if ( ready_ < 0 )
    {
#if !defined (WIN32)
        EL((ASSAERR,"::select(3) error\n"));
#endif
        return (false);
    }
    if ( ready_ == 0 ) {
        return (true);
    }

    DL((REACT,"Dispatching %d FDs.\n",ready_));
    DL((REACT,"m_readySet:\n"));
    m_readySet.dump ();

    /*--- Writes first ---*/
    dispatchHandler (m_readySet.m_wset,
                     m_writeSet,
                     &EventHandler::handle_write);

    /*--- Exceptions next ---*/
    dispatchHandler (m_readySet.m_eset,
                     m_exceptSet,
                     &EventHandler::handle_except);

    /*--- Finally, the Reads ---*/
    dispatchHandler (m_readySet.m_rset,
                     m_readSet,
                     &EventHandler::handle_read);

    return (true);
}

void Reactor::dispatchHandler (FdSet & mask_, Fd2Eh_Map_Type & fdSet_, EH_IO_Callback callback_) [private]
 

Call handler's callback and, if callback returns negative value, remove it from the Reactor.

When several high data-rate connections send data at the same time, the one that connected first gets the lowest FD number and therefore gets data transfer preference over every connection established later.

WIN32 HACK: The scan is restarted from the beginning after each callback. Without the restart, the loop crashes because firing an EventHandler callback might have invalidated the iterator (this happens with Connectors in sync mode).

Definition at line 596 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, ASSA::EventHandler::get_id(), ASSA::FdSet::isSet(), ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.

Referenced by dispatch().

{
    trace_with_mask("Reactor::dispatchHandler",REACTTRACE);

    int ret = 0;
    handler_t fd;
    EventHandler* ehp = NULL;
    std::string eh_id;

    Fd2Eh_Map_Iter iter = fdSet_.begin ();

    while (iter != fdSet_.end ())
    {
        fd  = (*iter).first;
        ehp = (*iter).second;

        if (mask_.isSet (fd) && ehp != NULL)
        {
            eh_id = ehp->get_id ();
            DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
                eh_id.c_str (), fd));

            ret = (ehp->*callback_) (fd); /* Fire up a callback */

            if (ret == -1) {
                removeIOHandler (fd);
            }
            else if (ret > 0) {
                DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
                    ret, fd, eh_id.c_str ()));
                //return;   <-- would starve other connections
            }
            else {
                DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n",
                    eh_id.c_str (), fd));
                mask_.clear (fd);
            }
            iter = fdSet_.begin ();
        }
        else {
            iter++;
        }
    }
}
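
The control flow of this loop (call through a pointer to member function, interpret the return value, restart the scan) can be reproduced with standard containers. The sketch below is an illustration, not the library's code: Handler is a hypothetical stand-in for EventHandler, a std::set<int> replaces FdSet, and erasing from the map replaces removeIOHandler().

```cpp
#include <cstddef>
#include <map>
#include <set>

// Hypothetical stand-in for EventHandler.
struct Handler {
    int pending;   // units of data still buffered; negative simulates a failure
    int calls;     // how many times the callback was fired

    explicit Handler(int pending_) : pending(pending_), calls(0) {}

    // Returns -1 on error, >0 while data is still pending, 0 when drained.
    int handle_read(int /*fd*/)
    {
        ++calls;
        if (pending < 0) return -1;
        return pending > 0 ? pending-- : 0;
    }
};

typedef int (Handler::*Callback)(int);

void dispatch_ready(std::set<int>& ready,
                    std::map<int, Handler*>& handlers,
                    Callback cb)
{
    std::map<int, Handler*>::iterator it = handlers.begin();

    while (it != handlers.end()) {
        int      fd = it->first;
        Handler* h  = it->second;

        if (ready.count(fd) != 0 && h != NULL) {
            int ret = (h->*cb)(fd);          // fire the callback

            if (ret == -1) {                 // handler failed: drop it
                handlers.erase(fd);
                ready.erase(fd);
            }
            else if (ret == 0) {             // all data consumed
                ready.erase(fd);
            }
            // ret > 0: fd stays in `ready` and is served again.
            it = handlers.begin();           // restart: the map may have changed
        }
        else {
            ++it;
        }
    }
}
```

Because the scan restarts at the lowest descriptor after every callback, a handler that keeps reporting pending data is called again before higher descriptors are visited, which matches the preference for low FD numbers described above.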

bool Reactor::handleError (void) [private]
 

Handle error in select(2) loop appropriately.

If commanded to stop, do so

Definition at line 369 of file Reactor.cpp.

References ASSA::ASSAERR, checkFDs(), DL, EL, m_active, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

{
    trace_with_mask("Reactor::handleError",REACTTRACE);

    if ( !m_active ) {
        DL((REACT,"Received cmd to stop Reactor\n"));
        return (false);
    }

    /*---
      TODO: If select(2) returns before time expires, with
      a descriptor ready or with EINTR, timeval is not
      going to be updated with number of seconds remaining.
      This is true for all systems except Linux, which will
      do so. Therefore, to restart correctly in case of
      EINTR, we ought to take time measurement before and
      after select, and try to select() for remaining time.

      For now, we restart with the initial timing value.
      ---*/
    /*---
      BSD kernel never restarts select(2). SVR4 will restart if
      the SA_RESTART flag is specified when the signal handler
      for the signal delivered is installed. This means that for
      portability, we must handle signal interrupts.
      ---*/

    if ( errno == EINTR ) {
        EL((REACT,"EINTR: interrupted select(2)\n"));
        /*
          If I was sitting in select(2) and received SIGTERM,
          the signal handler would have set m_active to 'false',
          and this function would have returned 'false' as above.
          For any other non-critical signals (USR1,...),
          we retry select.
        */
        return (true);
    }
    /*
      EBADF - bad file number. One of the file descriptors does
      not reference an open file to open(), close(), ioctl().
      This can happen if user closed fd and forgot to remove
      handler from Reactor.
    */
    if ( errno == EBADF ) {
        DL((REACT,"EBADF: bad file descriptor\n"));
        return (checkFDs ());
    }
    /*
      Any other error from select
    */
#if defined (WIN32)
    DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
#else
    EL((ASSAERR,"select(3) error\n"));
#endif
    return (false);
}
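
The order of the checks in the listing above can be summarized as a small decision function. This is a hypothetical restatement for illustration: classify_select_error() and the Action enum are not part of libassa, and the CHECK_FDS outcome corresponds to the point where the library calls checkFDs() and retries only if a bad descriptor was removed.

```cpp
#include <cerrno>

enum Action {
    STOP,        // give up: the loop around select() ends
    RETRY,       // call select() again
    CHECK_FDS    // look for closed descriptors, then decide
};

// Hypothetical helper: decide what to do after select() returned -1.
Action classify_select_error(bool active, int err)
{
    if (!active) {
        return STOP;          // a stop request wins over everything else
    }
    if (err == EINTR) {
        return RETRY;         // interrupted by a non-critical signal
    }
    if (err == EBADF) {
        return CHECK_FDS;     // some registered descriptor was closed
    }
    return STOP;              // any other error from select()
}
```

Checking the active flag first matters: a SIGTERM that interrupts select() produces EINTR as well, and only the flag distinguishes it from a harmless signal.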

int Reactor::isAnyReady (void) [private]
 

Return number of file descriptors ready across all sets.

Definition at line 432 of file Reactor.cpp.

References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

{
    trace_with_mask("Reactor::isAnyReady",REACTTRACE);

    int n = m_readySet.m_rset.numSet () +
        m_readySet.m_wset.numSet () +
        m_readySet.m_eset.numSet ();

    if ( n > 0 ) {
        DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
        m_readySet.dump ();
    }
    return (n);
}

Reactor& ASSA::Reactor::operator= (const Reactor &) [private]
 

no cloning

bool Reactor::registerIOHandler (EventHandler * eh_, handler_t fd_, EventType et_ = RWE_EVENTS)
 

Register I/O Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters:
eh_ Pointer to the EventHandler
fd_ File descriptor
et_ Event Type
Returns:
true if success, false if error

Definition at line 102 of file Reactor.cpp.

References ASSA::ASSAERR, Assure_return, DL, ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), m_readSet, ASSA::MaskSet::m_rset, m_waitSet, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), and ASSA::RemoteLogger::log_open().

{
    trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);

    std::ostringstream msg;
    Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));

    if (isReadEvent (et_))
    {
        if (!m_waitSet.m_rset.setFd (fd_))
        {
            DL((ASSAERR,"readset: fd %d out of range\n", fd_));
            return (false);
        }
        m_readSet[fd_] = eh_;
        msg << "READ_EVENT";
    }

    if (isWriteEvent (et_))
    {
        if (!m_waitSet.m_wset.setFd (fd_))
        {
            DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
            return (false);
        }
        m_writeSet[fd_] = eh_;
        msg << " WRITE_EVENT";
    }

    if (isExceptEvent (et_))
    {
        if (!m_waitSet.m_eset.setFd (fd_))
        {
            DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
            return (false);
        }
        m_exceptSet[fd_] = eh_;
        msg << " EXCEPT_EVENT";
    }
    msg << std::ends;

    DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
        eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));

#if !defined (WIN32)
    if (m_maxfd_plus1 < fd_+1) {
        m_maxfd_plus1 = fd_+1;
        DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
    }
#endif

    DL((REACT,"Modified waitSet:\n"));
    m_waitSet.dump ();

    return (true);
}
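
Registration amounts to three bookkeeping steps: mark the descriptor in the wait mask, remember which handler owns it, and raise maxfd+1 if needed. The sketch below shows those steps for read events only. MiniRegistry is a hypothetical structure, a handler is reduced to a name string, and the explicit range check stands in for the failure that FdSet::setFd() reports in the listing.

```cpp
#include <cstddef>
#include <map>
#include <sys/select.h>

// Hypothetical, read-events-only stand-in for the Reactor's bookkeeping.
struct MiniRegistry {
    fd_set                     wait_set;      // descriptors to pass to select()
    std::map<int, const char*> handlers;      // fd -> handler name
    int                        maxfd_plus1;   // first argument for select()

    MiniRegistry() : maxfd_plus1(0) { FD_ZERO(&wait_set); }

    // Returns false if the handler is missing or fd is out of range.
    bool register_read(int fd, const char* name)
    {
        if (name == NULL || fd < 0 || fd >= FD_SETSIZE) {
            return false;
        }
        FD_SET(fd, &wait_set);
        handlers[fd] = name;
        if (maxfd_plus1 < fd + 1) {
            maxfd_plus1 = fd + 1;
        }
        return true;
    }
};
```

Registering descriptors 4 and then 2 leaves maxfd_plus1 at 5, since only a larger descriptor raises it.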

TimerId Reactor::registerTimerHandler (EventHandler * eh_, const TimeVal & tv_, const std::string & name_ = "<unknown>")
 

Register Timer Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters:
eh_ Pointer to the EventHandler
tv_ Timeout value, relative to the current time (the definition in Reactor.cpp names this parameter timeout_)
name_ Name of the timer
Returns:
Timer ID that can be used to cancel timer and find out its name.

Definition at line 76 of file Reactor.cpp.

References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), and ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().

{
    trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
    Assure_return (eh_);

    TimeVal now (TimeVal::gettimeofday());
    TimeVal t (now + timeout_);

    DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",
        timeout_.sec(),timeout_.msec()));
    DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
    DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));

    TimerId tid =  m_tqueue.insert (eh_, t, timeout_, name_);

    DL((REACT,"---Modified Timer Queue----\n"));
    m_tqueue.dump();
    DL((REACT,"---------------------------\n"));

    return (tid);
}
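
The listing relies on a timer queue that is ordered by absolute expiration time and hands out an id per timer. How TimerQueue does this internally is not shown on this page, so the following is only a sketch of one possible design with hypothetical names: MiniTimerQueue keeps timers in a std::multimap keyed by expiration time in milliseconds and identifies them by a running counter.

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical timer queue; not the libassa TimerQueue.
class MiniTimerQueue {
public:
    typedef unsigned long Id;

    MiniTimerQueue() : m_next_id(1) {}

    // Schedule a timer at an absolute time; returns its id.
    Id insert(long expires_at_ms, const std::string& name)
    {
        Id id = m_next_id++;
        m_timers.insert(std::make_pair(expires_at_ms, Entry(id, name)));
        return id;
    }

    // Cancel one timer by id; returns false if it is not in the queue.
    bool remove(Id id)
    {
        for (Timers::iterator it = m_timers.begin(); it != m_timers.end(); ++it) {
            if (it->second.id == id) {
                m_timers.erase(it);
                return true;
            }
        }
        return false;
    }

    // Remove all timers due at or before now_ms; returns their names,
    // earliest first.
    std::vector<std::string> expire(long now_ms)
    {
        std::vector<std::string> fired;
        while (!m_timers.empty() && m_timers.begin()->first <= now_ms) {
            fired.push_back(m_timers.begin()->second.name);
            m_timers.erase(m_timers.begin());
        }
        return fired;
    }

private:
    struct Entry {
        Id          id;
        std::string name;
        Entry(Id id_, const std::string& name_) : id(id_), name(name_) {}
    };
    typedef std::multimap<long, Entry> Timers;

    Id     m_next_id;
    Timers m_timers;
};
```

Keeping the container sorted by expiration time makes the "first timer to expire" lookup that calculateTimeout() needs a constant-time operation, at the cost of a linear search when a timer is cancelled by id.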

bool Reactor::removeHandler (EventHandler * eh_, EventType et_ = ALL_EVENTS)
 

Remove Event handler from reactor for either all I/O events or timeout event or both.

If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.

Parameters:
eh_ Pointer to the EventHandler
et_ Event Type to remove (the definition in Reactor.cpp names this parameter event_). The default removes the Event Handler for all events.
Returns:
true if success, false if wasn't registered for any events.

Definition at line 182 of file Reactor.cpp.

References ASSA::FdSet::clear(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), m_readSet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_timeout(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(), ASSA::RemoteLogger::log_close(), and stopReactor().

{
    trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);

    bool ret = false;
    handler_t fd;
    handler_t rfdmax;
    handler_t wfdmax;
    handler_t efdmax;
    Fd2Eh_Map_Iter iter;

    rfdmax = wfdmax = efdmax = 0;

    if (eh_ == NULL) {
        return false;
    }

    if (isTimeoutEvent (event_)) {
        ret = m_tqueue.remove (eh_);
        ret = true;
    }

    if (isReadEvent (event_)) {
        iter = m_readSet.begin ();
        while (iter != m_readSet.end ()) {
            if ((*iter).second == eh_) {
                fd = (*iter).first;
                m_readSet.erase (iter);
                m_waitSet.m_rset.clear (fd);
                ret = true;
                break;
            }
            rfdmax = fd;
            iter++;
        }
    }

    if (isWriteEvent (event_)) {
        iter = m_writeSet.begin ();
        while (iter != m_writeSet.end ()) {
            if ((*iter).second == eh_) {
                fd = (*iter).first;
                m_writeSet.erase (iter);
                m_waitSet.m_wset.clear (fd);
                ret = true;
                break;
            }
            wfdmax = fd;
            iter++;
        }
    }

    if (isExceptEvent (event_)) {
        iter = m_exceptSet.begin ();
        while (iter != m_exceptSet.end ()) {
            if ((*iter).second == eh_) {
                fd = (*iter).first;
                m_exceptSet.erase (iter);
                m_waitSet.m_eset.clear (fd);
                ret = true;
                break;
            }
            efdmax = fd;
            iter++;
        }
    }

    if (ret == true) {
        DL((REACT,"Found EvtH \"%s\"(0x%X)\n",
            eh_->get_id ().c_str (),int(eh_)));
        eh_->handle_close (fd);
    }

    adjust_maxfdp1 (fd, rfdmax, wfdmax, efdmax);

    DL((REACT,"Modifies waitSet:\n"));
    m_waitSet.dump ();

    return (ret);
}

bool Reactor::removeIOHandler (handler_t fd_)
 

Remove IO Event handler from reactor.

This will remove handler from receiving all I/O events.

Parameters:
fd_ File descriptor
Returns:
true on success, false if fd_ is out of range

Definition at line 265 of file Reactor.cpp.

References Assure_return, ASSA::FdSet::clear(), DL, ASSA::is_valid_handler(), m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, ASSA::REACTTRACE, and trace_with_mask.

Referenced by checkFDs(), and dispatchHandler().

{
    trace_with_mask("Reactor::removeIOHandler",REACTTRACE);

    bool ret = false;
    EventHandler*  ehp = NULL;
    Fd2Eh_Map_Iter iter;

    handler_t      rfdmax;
    handler_t      wfdmax;
    handler_t      efdmax;

    rfdmax = wfdmax = efdmax = 0;

    Assure_return (ASSA::is_valid_handler (fd_));

    DL((REACT,"Removing handler for fd=%d\n",fd_));

    if ((iter = m_readSet.find (fd_)) != m_readSet.end ())
    {
        ehp = (*iter).second;
        m_readSet.erase (iter);
        m_waitSet.m_rset.clear (fd_);
        m_readySet.m_rset.clear (fd_);
        if (m_readSet.size () > 0) {
            iter = m_readSet.end ();
            iter--;
            rfdmax = (*iter).first;
        }
        ret = true;
    }

    if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ())
    {
        ehp = (*iter).second;
        m_writeSet.erase (iter);
        m_waitSet.m_wset.clear (fd_);
        m_readySet.m_wset.clear (fd_);
        if (m_writeSet.size () > 0) {
            iter = m_writeSet.end ();
            iter--;
            wfdmax = (*iter).first;
        }
        ret = true;
    }

    if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ())
    {
        ehp = (*iter).second;
        m_exceptSet.erase (iter);
        m_waitSet.m_eset.clear (fd_);
        m_readySet.m_eset.clear (fd_);
        if (m_exceptSet.size () > 0) {
            iter = m_exceptSet.end ();
            iter--;
            efdmax = (*iter).first;
        }
        ret = true;
    }

    if (ret == true && ehp != NULL) {
        DL((REACT,"Removed EvtH \"%s\"(0x%X)\n",
            ehp->get_id ().c_str (), int(ehp)));
        ehp->handle_close (fd_);
    }

    adjust_maxfdp1 (fd_, rfdmax, wfdmax, efdmax);

    DL((REACT,"Modifies waitSet:\n"));
    m_waitSet.dump ();

    return (ret);
}

bool Reactor::removeTimerHandler (TimerId id_)
 

Remove Timer event from the queue.

This removes one particular timer event.

Parameters:
id_ Timer Id returned by registerTimerHandler() (the definition in Reactor.cpp names this parameter tid_).
Returns:
true if timer found and removed; false otherwise

Definition at line 161 of file Reactor.cpp.

References ASSA::ASSAERR, DL, ASSA::TimerQueue::dump(), EL, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write().

{
    trace_with_mask("Reactor::removeTimer",REACTTRACE);
    bool ret;

    if ((ret = m_tqueue.remove (tid_))) {
        DL((REACT,"---Modified Timer Queue----\n"));
        m_tqueue.dump();
        DL((REACT,"---------------------------\n"));
    }
    else {
        EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
    }
    return (ret);
}

void Reactor::stopReactor (void)
 

Stop Reactor's activity.

This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from a method other than EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls.

Definition at line 695 of file Reactor.cpp.

References m_active, m_readSet, ASSA::REACTTRACE, removeHandler(), and trace_with_mask.

{
    trace_with_mask("Reactor::stopReactor", REACTTRACE);

    m_active = false;

    Fd2Eh_Map_Iter iter;
    EventHandler* ehp;

    while (m_readSet.size () > 0) {
        iter = m_readSet.begin ();
        ehp = (*iter).second;
        removeHandler (ehp);
    }

    while (m_writeSet.size () > 0) {
        iter = m_writeSet.begin ();
        ehp = (*iter).second;
        removeHandler (ehp);
    }

    while (m_exceptSet.size () > 0) {
        iter = m_exceptSet.begin ();
        ehp = (*iter).second;
        removeHandler (ehp);
    }
}

void Reactor::waitForEvents (TimeVal * tv_)
 

Wait for events for time specified.

Passing NULL replicates the behavior of waitForEvents(void). Passing tv_ {0, 0} causes non-blocking polling for all events. This method blocks for up to the tv_ time interval while waiting for events. If an event occurs, it processes the event(s) and returns. tv_ is adjusted by subtracting the time spent in event processing.

Parameters:
tv_ [RW] is the time to wait for.

Definition at line 523 of file Reactor.cpp.

References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd_plus1, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACTTRACE, ASSA::MaskSet::reset(), ASSA::MaskSet::sync(), and trace_with_mask.

{
    trace_with_mask("Reactor::waitForEvents",REACTTRACE);

    TimerCountdown traceTime (tv_);
    DL((REACT,"======================================\n"));

    /*--- Expire all stale Timers ---*/
    m_tqueue.expire (TimeVal::gettimeofday ());

    /* Test to see if Reactor has been deactivated as a result
     * of processing done by any TimerHandlers.
     */
    if (!m_active) {
        return;
    }

    int      nReady;
    TimeVal  delay;
    TimeVal* dlp = &delay;

    /*---
      If the EventHandler has not processed all of its data,
      and has stated so in its callback's return value
      to dispatcher (), it will be called again. This way the
      underlying file/socket stream can efficiently utilize its
      buffering mechanism.
      ---*/
    if ((nReady = isAnyReady ())) {
        DL((REACT,"isAnyReady returned: %d\n",nReady));
        dispatch (nReady);
        return;
    }

    DL((REACT,"=== m_waitSet ===\n"));
    m_waitSet.dump ();

    do {
        m_readySet.reset ();
        DL ((REACT,"m_readySet after reset():\n"));
        m_readySet.dump ();

        m_readySet = m_waitSet;
        DL ((REACT,"m_readySet after assign:\n"));
        m_readySet.dump ();

        calculateTimeout (dlp, tv_);

        nReady = ::select (m_maxfd_plus1,
                           &m_readySet.m_rset,
                           &m_readySet.m_wset,
                           &m_readySet.m_eset,
                           dlp);
        DL((REACT,"::select() returned: %d\n",nReady));

        m_readySet.sync ();
        DL ((REACT,"m_readySet after select:\n"));
        m_readySet.dump ();

    }
    while (nReady < 0 && handleError ());

    dispatch (nReady);
}

void Reactor::waitForEvents (void)
 

Main waiting loop that blocks indefinitely processing events.

Definition at line 498 of file Reactor.cpp.

References m_active.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().

{
    while ( m_active ) {
        waitForEvents ((TimeVal*) NULL);
    }
}


Member Data Documentation

bool ASSA::Reactor::m_active [private]
 

Flag that indicates whether Reactor is active or has been stopped.

Definition at line 212 of file Reactor.h.

Referenced by deactivate(), handleError(), stopReactor(), and waitForEvents().

Fd2Eh_Map_Type ASSA::Reactor::m_exceptSet [private]
 

Event handlers awaiting on EXCEPT_EVENT.

Definition at line 221 of file Reactor.h.

Referenced by dispatch(), and ~Reactor().

int ASSA::Reactor::m_fd_setsize [private]
 

Max number of open files per process.

This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit.

Definition at line 203 of file Reactor.h.

Referenced by checkFDs(), and Reactor().

handler_t ASSA::Reactor::m_maxfd_plus1 [private]
 

Max file descriptor number (in all sets) plus 1.

This value is ignored by the WIN32 implementation of select().

Definition at line 209 of file Reactor.h.

Referenced by adjust_maxfdp1(), and waitForEvents().

Fd2Eh_Map_Type ASSA::Reactor::m_readSet [private]
 

Event handlers awaiting on READ_EVENT.

Definition at line 215 of file Reactor.h.

Referenced by checkFDs(), dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

MaskSet ASSA::Reactor::m_readySet [private]
 

Handlers that are ready for processing.

Definition at line 227 of file Reactor.h.

Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents().

TimerQueue ASSA::Reactor::m_tqueue [private]
 

The queue of Timers.

Definition at line 230 of file Reactor.h.

Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents().

MaskSet ASSA::Reactor::m_waitSet [private]
 

Handlers to wait for event on.

Definition at line 224 of file Reactor.h.

Referenced by registerIOHandler(), removeHandler(), removeIOHandler(), and waitForEvents().

Fd2Eh_Map_Type ASSA::Reactor::m_writeSet [private]
 

Event handlers awaiting on WRITE_EVENT.

Definition at line 218 of file Reactor.h.

Referenced by dispatch(), and ~Reactor().


The documentation for this class was generated from the following files:
Reactor.h
Reactor.cpp
Generated on Sun Aug 13 15:08:21 2006 for libassa by  doxygen 1.4.6