/home/vlg/develop/libASSA/libassa/assa/Reactor.cpp

Go to the documentation of this file.
00001 // -*- c++ -*-
00002 //------------------------------------------------------------------------------
00003 //                          Reactor.cpp
00004 //------------------------------------------------------------------------------
00005 //  Copyright (C) 1997-2002,2005,2006  Vladislav Grinchenko
00006 //
00007 //  This library is free software; you can redistribute it and/or
00008 //  modify it under the terms of the GNU Library General Public
00009 //  License as published by the Free Software Foundation; either
00010 //  version 2 of the License, or (at your option) any later version.
00011 //----------------------------------------------------------------------------- 
00012 //  Created: 05/25/1999
00013 //----------------------------------------------------------------------------- 
00014 #include <iostream>
00015 #include <sstream>
00016 #include <string>
00017 
00018 #include "assa/Reactor.h"
00019 #include "assa/Logger.h"
00020 
00021 using namespace ASSA;
00022 
00023 Reactor::
00024 Reactor () : 
00025     m_fd_setsize  (1024), 
00026     m_maxfd_plus1 (0), 
00027     m_active      (true)
00028 {
00029     trace_with_mask("Reactor::Reactor",REACTTRACE);
00030 
00034 #if defined(WIN32)
00035     m_fd_setsize = FD_SETSIZE;
00036 
00037 #else  // POSIX
00038     struct rlimit rlim;
00039     rlim.rlim_max = 0;
00040 
00041     if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00042         m_fd_setsize = rlim.rlim_cur;
00043     }
00044 #endif
00045 
00048 #if defined (WIN32)             
00049     WSADATA data;
00050     WSAStartup (MAKEWORD (2, 2), &data);
00051 #endif
00052 }
00053 
00062 Reactor::
00063 ~Reactor()
00064 {   
00065     trace_with_mask("Reactor::~Reactor",REACTTRACE);
00066 
00067     m_readSet.clear   ();
00068     m_writeSet.clear  ();
00069     m_exceptSet.clear ();
00070     Log::log_close ();
00071     deactivate ();
00072 }
00073 
00074 TimerId
00075 Reactor::
00076 registerTimerHandler (EventHandler*      eh_, 
00077                       const TimeVal&     timeout_,
00078                       const std::string& name_)
00079 {
00080     trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00081     Assure_return (eh_);
00082 
00083     TimeVal now (TimeVal::gettimeofday());
00084     TimeVal t (now + timeout_);
00085 
00086     DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",  
00087         timeout_.sec(),timeout_.msec()));
00088     DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00089     DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00090 
00091     TimerId tid =  m_tqueue.insert (eh_, t, timeout_, name_);
00092 
00093     DL((REACT,"---Modified Timer Queue----\n"));
00094     m_tqueue.dump();
00095     DL((REACT,"---------------------------\n"));
00096 
00097     return (tid);
00098 }
00099 
00100 bool 
00101 Reactor::
00102 registerIOHandler (EventHandler* eh_, handler_t fd_, EventType et_)
00103 {
00104     trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00105 
00106     std::ostringstream msg;
00107     Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00108 
00109     if (isReadEvent (et_)) 
00110     {
00111         if (!m_waitSet.m_rset.setFd (fd_)) 
00112         {
00113             DL((ASSAERR,"readset: fd %d out of range\n", fd_));
00114             return (false);
00115         }
00116         m_readSet[fd_] = eh_;
00117         msg << "READ_EVENT";
00118     }
00119 
00120     if (isWriteEvent (et_)) 
00121     {
00122         if (!m_waitSet.m_wset.setFd (fd_)) 
00123         {
00124             DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
00125             return (false);
00126         }
00127         m_writeSet[fd_] = eh_;
00128         msg << " WRITE_EVENT";
00129     }
00130 
00131     if (isExceptEvent (et_)) 
00132     {
00133         if (!m_waitSet.m_eset.setFd (fd_)) 
00134         {
00135             DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
00136             return (false);
00137         }
00138         m_exceptSet[fd_] = eh_;
00139         msg << " EXCEPT_EVENT";
00140     }
00141     msg << std::ends;
00142 
00143     DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", 
00144         eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
00145 
00146 #if !defined (WIN32)
00147     if (m_maxfd_plus1 < fd_+1) {
00148         m_maxfd_plus1 = fd_+1;
00149         DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00150     }
00151 #endif
00152 
00153     DL((REACT,"Modified waitSet:\n"));
00154     m_waitSet.dump ();
00155 
00156     return (true);
00157 }
00158 
00159 bool 
00160 Reactor::
00161 removeTimerHandler (TimerId tid_)
00162 {
00163     trace_with_mask("Reactor::removeTimer",REACTTRACE);
00164     bool ret;
00165 
00166     if ((ret = m_tqueue.remove (tid_))) {
00167         DL((REACT,"---Modified Timer Queue----\n"));
00168         m_tqueue.dump();
00169         DL((REACT,"---------------------------\n"));
00170     }
00171     else {
00172         EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00173     }
00174     return (ret);
00175 }
00176 
00180 bool 
00181 Reactor::
00182 removeHandler (EventHandler* eh_, EventType event_)
00183 {
00184     trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00185 
00186     bool ret = false;
00187     handler_t fd;
00188     handler_t rfdmax;
00189     handler_t wfdmax;
00190     handler_t efdmax;
00191     Fd2Eh_Map_Iter iter;
00192 
00193     rfdmax = wfdmax = efdmax = 0;
00194 
00195     if (eh_ == NULL) {
00196         return false;
00197     }
00198 
00199     if (isTimeoutEvent (event_)) {
00200         ret = m_tqueue.remove (eh_);
00201         ret = true;
00202     }
00203 
00204     if (isReadEvent (event_)) {
00205         iter = m_readSet.begin ();
00206         while (iter != m_readSet.end ()) {
00207             if ((*iter).second == eh_) {
00208                 fd = (*iter).first;
00209                 m_readSet.erase (iter);
00210                 m_waitSet.m_rset.clear (fd);
00211                 ret = true;
00212                 break;
00213             }
00214             rfdmax = fd;
00215             iter++;
00216         }
00217     } 
00218     
00219     if (isWriteEvent (event_)) {
00220         iter = m_writeSet.begin ();
00221         while (iter != m_writeSet.end ()) {
00222             if ((*iter).second == eh_) {
00223                 fd = (*iter).first;
00224                 m_writeSet.erase (iter);
00225                 m_waitSet.m_wset.clear (fd);
00226                 ret = true;
00227                 break;
00228             }
00229             wfdmax = fd;
00230             iter++;
00231         }
00232     }
00233 
00234     if (isExceptEvent (event_)) {
00235         iter = m_exceptSet.begin ();
00236         while (iter != m_exceptSet.end ()) {
00237             if ((*iter).second == eh_) {
00238                 fd = (*iter).first;
00239                 m_exceptSet.erase (iter);
00240                 m_waitSet.m_eset.clear (fd);
00241                 ret = true;
00242                 break;
00243             }
00244             efdmax = fd;
00245             iter++;
00246         }
00247     }
00248 
00249     if (ret == true) {
00250         DL((REACT,"Found EvtH \"%s\"(0x%X)\n", 
00251             eh_->get_id ().c_str (),int(eh_)));
00252         eh_->handle_close (fd);
00253     }
00254 
00255     adjust_maxfdp1 (fd, rfdmax, wfdmax, efdmax);
00256 
00257     DL((REACT,"Modifies waitSet:\n"));
00258     m_waitSet.dump ();
00259 
00260     return (ret);
00261 }
00262 
00263 bool
00264 Reactor::
00265 removeIOHandler (handler_t fd_)
00266 {
00267     trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00268 
00269     bool ret = false;
00270     EventHandler*  ehp = NULL;
00271     Fd2Eh_Map_Iter iter;
00272 
00273     handler_t      rfdmax;
00274     handler_t      wfdmax;
00275     handler_t      efdmax;
00276 
00277     rfdmax = wfdmax = efdmax = 0;
00278 
00279     Assure_return (ASSA::is_valid_handler (fd_));
00280 
00281     DL((REACT,"Removing handler for fd=%d\n",fd_));
00282 
00287     if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) 
00288     {
00289         ehp = (*iter).second;
00290         m_readSet.erase (iter);
00291         m_waitSet.m_rset.clear (fd_);
00292         m_readySet.m_rset.clear (fd_);
00293         if (m_readSet.size () > 0) {
00294             iter = m_readSet.end ();
00295             iter--;
00296             rfdmax = (*iter).first;
00297         }
00298         ret = true;
00299     }
00300 
00301     if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) 
00302     {
00303         ehp = (*iter).second;
00304         m_writeSet.erase (iter);
00305         m_waitSet.m_wset.clear (fd_);
00306         m_readySet.m_wset.clear (fd_);
00307         if (m_writeSet.size () > 0) {
00308             iter = m_writeSet.end ();
00309             iter--;
00310             wfdmax = (*iter).first;
00311         }
00312         ret = true;
00313     }
00314 
00315     if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) 
00316     {
00317         ehp = (*iter).second;
00318         m_exceptSet.erase (iter);
00319         m_waitSet.m_eset.clear (fd_);
00320         m_readySet.m_eset.clear (fd_);
00321         if (m_exceptSet.size () > 0) {
00322             iter = m_exceptSet.end ();
00323             iter--;
00324             efdmax = (*iter).first;
00325         }
00326         ret = true;
00327     }
00328 
00329     if (ret == true && ehp != NULL) {
00330         DL((REACT,"Removed EvtH \"%s\"(0x%X)\n", 
00331             ehp->get_id ().c_str (), int(ehp)));
00332         ehp->handle_close (fd_);
00333     }
00334 
00335     adjust_maxfdp1 (fd_, rfdmax, wfdmax, efdmax);
00336 
00337     DL((REACT,"Modifies waitSet:\n"));
00338     m_waitSet.dump ();
00339 
00340     return (ret);
00341 }
00342 
00343 bool
00344 Reactor::
00345 checkFDs (void)
00346 {
00347     trace_with_mask("Reactor::checkFDs",REACTTRACE);
00348     
00349     bool num_removed = false;
00350     FdSet mask;
00351     timeval poll = { 0, 0 };
00352 
00353     for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
00354         if ( m_readSet[fd] != NULL ) {
00355             mask.setFd (fd);
00356             if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00357                 removeIOHandler (fd);
00358                 num_removed = true;
00359                 DL((REACT,"Detected BAD FD: %d\n", fd ));
00360             }
00361             mask.clear (fd);
00362         }
00363     }
00364     return (num_removed);
00365 }
00366 
00367 bool
00368 Reactor::
00369 handleError (void)
00370 {
00371     trace_with_mask("Reactor::handleError",REACTTRACE);
00372 
00375     if ( !m_active ) {
00376         DL((REACT,"Received cmd to stop Reactor\n"));
00377         return (false);
00378     }
00379 
00380     /*---
00381       TODO: If select(2) returns before time expires, with
00382       a descriptor ready or with EINTR, timeval is not
00383       going to be updated with number of seconds remaining.
00384       This is true for all systems except Linux, which will
00385       do so. Therefore, to restart correctly in case of
00386       EINTR, we ought to take time measurement before and
00387       after select, and try to select() for remaining time.
00388     
00389       For now, we restart with the initial timing value.
00390       ---*/
00391     /*---
00392       BSD kernel never restarts select(2). SVR4 will restart if
00393       the SA_RESTART flag is specified when the signal handler
00394       for the signal delivered is installed. This means taht for
00395       portability, we must handle signal interrupts.
00396       ---*/
00397 
00398     if ( errno == EINTR ) {
00399         EL((REACT,"EINTR: interrupted select(2)\n"));
00400         /*
00401           If I was sitting in select(2) and received SIGTERM,
00402           the signal handler would have set m_active to 'false',
00403           and this function would have returned 'false' as above.
00404           For any other non-critical signals (USR1,...),
00405           we retry select.
00406         */
00407         return (true);
00408     }
00409     /*
00410       EBADF - bad file number. One of the file descriptors does
00411       not reference an open file to open(), close(), ioctl().
00412       This can happen if user closed fd and forgot to remove
00413       handler from Reactor.
00414     */
00415     if ( errno == EBADF ) {
00416         DL((REACT,"EBADF: bad file descriptor\n"));
00417         return (checkFDs ());
00418     }
00419     /*
00420       Any other error from select
00421     */
00422 #if defined (WIN32) 
00423     DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
00424 #else
00425     EL((ASSAERR,"select(3) error\n"));
00426 #endif
00427     return (false);
00428 }
00429 
00430 int
00431 Reactor::
00432 isAnyReady (void)
00433 {
00434     trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00435 
00436     int n = m_readySet.m_rset.numSet () +
00437         m_readySet.m_wset.numSet () +
00438         m_readySet.m_eset.numSet ();
00439 
00440     if ( n > 0 ) {
00441         DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00442         m_readySet.dump ();
00443     }
00444     return (n);
00445 }
00446 
00447 void 
00448 Reactor::
00449 calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_)
00450 {
00451     trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00452 
00453     TimeVal now;
00454     TimeVal tv;
00455 
00456     if (m_tqueue.isEmpty () ) {
00457         howlong_ = maxwait_;
00458         goto done;
00459     }
00460     now = TimeVal::gettimeofday ();
00461     tv = m_tqueue.top ();
00462     
00463     if (tv < now) {
00464         /*--- 
00465           It took too long to get here (fraction of a millisecond), 
00466           and top timer had already expired. In this case,
00467           perform non-blocking select in order to drain the timer queue.
00468           ---*/
00469         *howlong_ = 0;
00470     }
00471     else {  
00472         DL((REACT,"--------- Timer Queue ----------\n"));
00473         m_tqueue.dump();
00474         DL((REACT,"--------------------------------\n"));
00475 
00476         if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00477             *howlong_ = tv - now;
00478         }
00479         else {
00480             *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00481         }
00482     }
00483 
00484  done:
00485     if (howlong_ != NULL) {
00486         DL((REACT,"delay (%f)\n", double (*howlong_) ));
00487     }
00488     else {
00489         DL((REACT,"delay (forever)\n"));
00490     }
00491 }
00492 
00496 void
00497 Reactor::
00498 waitForEvents (void)
00499 {
00500     while ( m_active ) {
00501         waitForEvents ((TimeVal*) NULL);
00502     }
00503 }
00504 
00521 void
00522 Reactor::
00523 waitForEvents (TimeVal* tv_)
00524 {
00525     trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00526 
00527     TimerCountdown traceTime (tv_);
00528     DL((REACT,"======================================\n"));
00529 
00530     /*--- Expire all stale Timers ---*/
00531     m_tqueue.expire (TimeVal::gettimeofday ());
00532 
00533     /* Test to see if Reactor has been deactivated as a result
00534      * of processing done by any TimerHandlers.
00535      */
00536     if (!m_active) {
00537         return;
00538     }
00539 
00540     int      nReady;
00541     TimeVal  delay;
00542     TimeVal* dlp = &delay;
00543 
00544     /*---
00545       In case if not all data have been processed by the EventHandler,
00546       and EventHandler stated so in its callback's return value
00547       to dispatcher (), it will be called again. This way 
00548       underlying file/socket stream can efficiently utilize its
00549       buffering mechaninsm.
00550       ---*/
00551     if ((nReady = isAnyReady ())) {
00552         DL((REACT,"isAnyReady returned: %d\n",nReady));
00553         dispatch (nReady);
00554         return;
00555     }
00556 
00557     DL((REACT,"=== m_waitSet ===\n"));
00558     m_waitSet.dump ();
00559 
00560     do {
00561         m_readySet.reset ();
00562         DL ((REACT,"m_readySet after reset():\n"));
00563         m_readySet.dump ();
00564 
00565         m_readySet = m_waitSet;
00566         DL ((REACT,"m_readySet after assign:\n"));
00567         m_readySet.dump ();
00568 
00569         calculateTimeout (dlp, tv_);
00570 
00571         nReady = ::select (m_maxfd_plus1, 
00572                            &m_readySet.m_rset,
00573                            &m_readySet.m_wset, 
00574                            &m_readySet.m_eset, 
00575                            dlp);
00576         DL((REACT,"::select() returned: %d\n",nReady));
00577 
00578         m_readySet.sync ();
00579         DL ((REACT,"m_readySet after select:\n"));
00580         m_readySet.dump ();
00581 
00582     } 
00583     while (nReady < 0 && handleError ());
00584 
00585     dispatch (nReady);
00586 }
00587 
00594 void 
00595 Reactor::
00596 dispatchHandler (FdSet& mask_, Fd2Eh_Map_Type& fdSet_, EH_IO_Callback callback_)
00597 {
00598     trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00599 
00600     int ret = 0;
00601     handler_t fd;
00602     EventHandler* ehp = NULL;
00603     std::string eh_id;
00604 
00605     Fd2Eh_Map_Iter iter = fdSet_.begin ();
00606 
00607     while (iter != fdSet_.end ()) 
00608     {
00609         fd  = (*iter).first;
00610         ehp = (*iter).second;
00611 
00612         if (mask_.isSet (fd) && ehp != NULL) 
00613         {
00614             eh_id = ehp->get_id ();
00615             DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
00616                 eh_id.c_str (), fd));
00617 
00618             ret = (ehp->*callback_) (fd); /* Fire up a callback */
00619 
00620             if (ret == -1) {
00621                 removeIOHandler (fd);
00622             }
00623             else if (ret > 0) {
00624                 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
00625                     ret, fd, eh_id.c_str ()));
00626                 //return;   <-- would starve other connections
00627             }
00628             else {
00629                 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", 
00630                     eh_id.c_str (), fd));
00631                 mask_.clear (fd);
00632             }
00639             iter = fdSet_.begin ();
00640         }
00641         else {
00642             iter++;
00643         }
00644     }
00645 }
00646 
00652 bool
00653 Reactor::
00654 dispatch (int ready_)
00655 {
00656     trace_with_mask("Reactor::dispatch", REACTTRACE);
00657 
00658     m_tqueue.expire (TimeVal::gettimeofday ());
00659 
00660     if ( ready_ < 0 ) 
00661     {
00662 #if !defined (WIN32)
00663         EL((ASSAERR,"::select(3) error\n"));
00664 #endif
00665         return (false);
00666     }
00667     if ( ready_ == 0 ) {
00668         return (true);
00669     }
00670 
00671     DL((REACT,"Dispatching %d FDs.\n",ready_));
00672     DL((REACT,"m_readySet:\n"));
00673     m_readySet.dump ();
00674 
00675     /*--- Writes first ---*/
00676     dispatchHandler (m_readySet.m_wset, 
00677                      m_writeSet, 
00678                      &EventHandler::handle_write);
00679 
00680     /*--- Exceptions next ---*/
00681     dispatchHandler (m_readySet.m_eset, 
00682                      m_exceptSet, 
00683                      &EventHandler::handle_except);
00684 
00685     /*--- Finally, the Reads ---*/
00686     dispatchHandler (m_readySet.m_rset, 
00687                      m_readSet, 
00688                      &EventHandler::handle_read);
00689 
00690     return (true);
00691 }
00692 
00693 void 
00694 Reactor::
00695 stopReactor (void) 
00696 { 
00697     trace_with_mask("Reactor::stopReactor", REACTTRACE);
00698 
00699     m_active = false; 
00700 
00701     Fd2Eh_Map_Iter iter;
00702     EventHandler* ehp;
00703 
00704     while (m_readSet.size () > 0) {
00705         iter = m_readSet.begin ();
00706         ehp = (*iter).second;
00707         removeHandler (ehp);
00708     }
00709 
00710     while (m_writeSet.size () > 0) {
00711         iter = m_writeSet.begin ();
00712         ehp = (*iter).second;
00713         removeHandler (ehp);
00714     }
00715 
00716     while (m_exceptSet.size () > 0) {
00717         iter = m_exceptSet.begin ();
00718         ehp = (*iter).second;
00719         removeHandler (ehp);
00720     }
00721 }
00722 
00727 void
00728 Reactor::
00729 adjust_maxfdp1 (handler_t fd_, 
00730                 handler_t rmax_, 
00731                 handler_t wmax_, 
00732                 handler_t emax_)
00733 {
00734 #if !defined (WIN32)  /* POSIX */
00735 
00736     trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
00737 
00738     if (m_maxfd_plus1 == fd_ + 1) {
00739         m_maxfd_plus1 = std::max (rmax_, std::max (wmax_, emax_));
00740 
00741         DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00742     }
00743 #endif
00744 }

Generated on Sun Aug 13 15:08:00 2006 for libassa by  doxygen 1.4.6