00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include <iostream>
00015 #include <sstream>
00016 #include <string>
00017
00018 #include "assa/Reactor.h"
00019 #include "assa/Logger.h"
00020
00021 using namespace ASSA;
00022
00023 Reactor::
00024 Reactor () :
00025 m_fd_setsize (1024),
00026 m_maxfd_plus1 (0),
00027 m_active (true)
00028 {
00029 trace_with_mask("Reactor::Reactor",REACTTRACE);
00030
00034 #if defined(WIN32)
00035 m_fd_setsize = FD_SETSIZE;
00036
00037 #else // POSIX
00038 struct rlimit rlim;
00039 rlim.rlim_max = 0;
00040
00041 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00042 m_fd_setsize = rlim.rlim_cur;
00043 }
00044 #endif
00045
00048 #if defined (WIN32)
00049 WSADATA data;
00050 WSAStartup (MAKEWORD (2, 2), &data);
00051 #endif
00052 }
00053
00062 Reactor::
00063 ~Reactor()
00064 {
00065 trace_with_mask("Reactor::~Reactor",REACTTRACE);
00066
00067 m_readSet.clear ();
00068 m_writeSet.clear ();
00069 m_exceptSet.clear ();
00070 Log::log_close ();
00071 deactivate ();
00072 }
00073
00074 TimerId
00075 Reactor::
00076 registerTimerHandler (EventHandler* eh_,
00077 const TimeVal& timeout_,
00078 const std::string& name_)
00079 {
00080 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00081 Assure_return (eh_);
00082
00083 TimeVal now (TimeVal::gettimeofday());
00084 TimeVal t (now + timeout_);
00085
00086 DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",
00087 timeout_.sec(),timeout_.msec()));
00088 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00089 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00090
00091 TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_);
00092
00093 DL((REACT,"---Modified Timer Queue----\n"));
00094 m_tqueue.dump();
00095 DL((REACT,"---------------------------\n"));
00096
00097 return (tid);
00098 }
00099
00100 bool
00101 Reactor::
00102 registerIOHandler (EventHandler* eh_, handler_t fd_, EventType et_)
00103 {
00104 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00105
00106 std::ostringstream msg;
00107 Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00108
00109 if (isReadEvent (et_))
00110 {
00111 if (!m_waitSet.m_rset.setFd (fd_))
00112 {
00113 DL((ASSAERR,"readset: fd %d out of range\n", fd_));
00114 return (false);
00115 }
00116 m_readSet[fd_] = eh_;
00117 msg << "READ_EVENT";
00118 }
00119
00120 if (isWriteEvent (et_))
00121 {
00122 if (!m_waitSet.m_wset.setFd (fd_))
00123 {
00124 DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
00125 return (false);
00126 }
00127 m_writeSet[fd_] = eh_;
00128 msg << " WRITE_EVENT";
00129 }
00130
00131 if (isExceptEvent (et_))
00132 {
00133 if (!m_waitSet.m_eset.setFd (fd_))
00134 {
00135 DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
00136 return (false);
00137 }
00138 m_exceptSet[fd_] = eh_;
00139 msg << " EXCEPT_EVENT";
00140 }
00141 msg << std::ends;
00142
00143 DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
00144 eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
00145
00146 #if !defined (WIN32)
00147 if (m_maxfd_plus1 < fd_+1) {
00148 m_maxfd_plus1 = fd_+1;
00149 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00150 }
00151 #endif
00152
00153 DL((REACT,"Modified waitSet:\n"));
00154 m_waitSet.dump ();
00155
00156 return (true);
00157 }
00158
00159 bool
00160 Reactor::
00161 removeTimerHandler (TimerId tid_)
00162 {
00163 trace_with_mask("Reactor::removeTimer",REACTTRACE);
00164 bool ret;
00165
00166 if ((ret = m_tqueue.remove (tid_))) {
00167 DL((REACT,"---Modified Timer Queue----\n"));
00168 m_tqueue.dump();
00169 DL((REACT,"---------------------------\n"));
00170 }
00171 else {
00172 EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00173 }
00174 return (ret);
00175 }
00176
00180 bool
00181 Reactor::
00182 removeHandler (EventHandler* eh_, EventType event_)
00183 {
00184 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00185
00186 bool ret = false;
00187 handler_t fd;
00188 handler_t rfdmax;
00189 handler_t wfdmax;
00190 handler_t efdmax;
00191 Fd2Eh_Map_Iter iter;
00192
00193 rfdmax = wfdmax = efdmax = 0;
00194
00195 if (eh_ == NULL) {
00196 return false;
00197 }
00198
00199 if (isTimeoutEvent (event_)) {
00200 ret = m_tqueue.remove (eh_);
00201 ret = true;
00202 }
00203
00204 if (isReadEvent (event_)) {
00205 iter = m_readSet.begin ();
00206 while (iter != m_readSet.end ()) {
00207 if ((*iter).second == eh_) {
00208 fd = (*iter).first;
00209 m_readSet.erase (iter);
00210 m_waitSet.m_rset.clear (fd);
00211 ret = true;
00212 break;
00213 }
00214 rfdmax = fd;
00215 iter++;
00216 }
00217 }
00218
00219 if (isWriteEvent (event_)) {
00220 iter = m_writeSet.begin ();
00221 while (iter != m_writeSet.end ()) {
00222 if ((*iter).second == eh_) {
00223 fd = (*iter).first;
00224 m_writeSet.erase (iter);
00225 m_waitSet.m_wset.clear (fd);
00226 ret = true;
00227 break;
00228 }
00229 wfdmax = fd;
00230 iter++;
00231 }
00232 }
00233
00234 if (isExceptEvent (event_)) {
00235 iter = m_exceptSet.begin ();
00236 while (iter != m_exceptSet.end ()) {
00237 if ((*iter).second == eh_) {
00238 fd = (*iter).first;
00239 m_exceptSet.erase (iter);
00240 m_waitSet.m_eset.clear (fd);
00241 ret = true;
00242 break;
00243 }
00244 efdmax = fd;
00245 iter++;
00246 }
00247 }
00248
00249 if (ret == true) {
00250 DL((REACT,"Found EvtH \"%s\"(0x%X)\n",
00251 eh_->get_id ().c_str (),int(eh_)));
00252 eh_->handle_close (fd);
00253 }
00254
00255 adjust_maxfdp1 (fd, rfdmax, wfdmax, efdmax);
00256
00257 DL((REACT,"Modifies waitSet:\n"));
00258 m_waitSet.dump ();
00259
00260 return (ret);
00261 }
00262
00263 bool
00264 Reactor::
00265 removeIOHandler (handler_t fd_)
00266 {
00267 trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00268
00269 bool ret = false;
00270 EventHandler* ehp = NULL;
00271 Fd2Eh_Map_Iter iter;
00272
00273 handler_t rfdmax;
00274 handler_t wfdmax;
00275 handler_t efdmax;
00276
00277 rfdmax = wfdmax = efdmax = 0;
00278
00279 Assure_return (ASSA::is_valid_handler (fd_));
00280
00281 DL((REACT,"Removing handler for fd=%d\n",fd_));
00282
00287 if ((iter = m_readSet.find (fd_)) != m_readSet.end ())
00288 {
00289 ehp = (*iter).second;
00290 m_readSet.erase (iter);
00291 m_waitSet.m_rset.clear (fd_);
00292 m_readySet.m_rset.clear (fd_);
00293 if (m_readSet.size () > 0) {
00294 iter = m_readSet.end ();
00295 iter--;
00296 rfdmax = (*iter).first;
00297 }
00298 ret = true;
00299 }
00300
00301 if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ())
00302 {
00303 ehp = (*iter).second;
00304 m_writeSet.erase (iter);
00305 m_waitSet.m_wset.clear (fd_);
00306 m_readySet.m_wset.clear (fd_);
00307 if (m_writeSet.size () > 0) {
00308 iter = m_writeSet.end ();
00309 iter--;
00310 wfdmax = (*iter).first;
00311 }
00312 ret = true;
00313 }
00314
00315 if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ())
00316 {
00317 ehp = (*iter).second;
00318 m_exceptSet.erase (iter);
00319 m_waitSet.m_eset.clear (fd_);
00320 m_readySet.m_eset.clear (fd_);
00321 if (m_exceptSet.size () > 0) {
00322 iter = m_exceptSet.end ();
00323 iter--;
00324 efdmax = (*iter).first;
00325 }
00326 ret = true;
00327 }
00328
00329 if (ret == true && ehp != NULL) {
00330 DL((REACT,"Removed EvtH \"%s\"(0x%X)\n",
00331 ehp->get_id ().c_str (), int(ehp)));
00332 ehp->handle_close (fd_);
00333 }
00334
00335 adjust_maxfdp1 (fd_, rfdmax, wfdmax, efdmax);
00336
00337 DL((REACT,"Modifies waitSet:\n"));
00338 m_waitSet.dump ();
00339
00340 return (ret);
00341 }
00342
00343 bool
00344 Reactor::
00345 checkFDs (void)
00346 {
00347 trace_with_mask("Reactor::checkFDs",REACTTRACE);
00348
00349 bool num_removed = false;
00350 FdSet mask;
00351 timeval poll = { 0, 0 };
00352
00353 for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
00354 if ( m_readSet[fd] != NULL ) {
00355 mask.setFd (fd);
00356 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00357 removeIOHandler (fd);
00358 num_removed = true;
00359 DL((REACT,"Detected BAD FD: %d\n", fd ));
00360 }
00361 mask.clear (fd);
00362 }
00363 }
00364 return (num_removed);
00365 }
00366
00367 bool
00368 Reactor::
00369 handleError (void)
00370 {
00371 trace_with_mask("Reactor::handleError",REACTTRACE);
00372
00375 if ( !m_active ) {
00376 DL((REACT,"Received cmd to stop Reactor\n"));
00377 return (false);
00378 }
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398 if ( errno == EINTR ) {
00399 EL((REACT,"EINTR: interrupted select(2)\n"));
00400
00401
00402
00403
00404
00405
00406
00407 return (true);
00408 }
00409
00410
00411
00412
00413
00414
00415 if ( errno == EBADF ) {
00416 DL((REACT,"EBADF: bad file descriptor\n"));
00417 return (checkFDs ());
00418 }
00419
00420
00421
00422 #if defined (WIN32)
00423 DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
00424 #else
00425 EL((ASSAERR,"select(3) error\n"));
00426 #endif
00427 return (false);
00428 }
00429
00430 int
00431 Reactor::
00432 isAnyReady (void)
00433 {
00434 trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00435
00436 int n = m_readySet.m_rset.numSet () +
00437 m_readySet.m_wset.numSet () +
00438 m_readySet.m_eset.numSet ();
00439
00440 if ( n > 0 ) {
00441 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00442 m_readySet.dump ();
00443 }
00444 return (n);
00445 }
00446
00447 void
00448 Reactor::
00449 calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_)
00450 {
00451 trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00452
00453 TimeVal now;
00454 TimeVal tv;
00455
00456 if (m_tqueue.isEmpty () ) {
00457 howlong_ = maxwait_;
00458 goto done;
00459 }
00460 now = TimeVal::gettimeofday ();
00461 tv = m_tqueue.top ();
00462
00463 if (tv < now) {
00464
00465
00466
00467
00468
00469 *howlong_ = 0;
00470 }
00471 else {
00472 DL((REACT,"--------- Timer Queue ----------\n"));
00473 m_tqueue.dump();
00474 DL((REACT,"--------------------------------\n"));
00475
00476 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00477 *howlong_ = tv - now;
00478 }
00479 else {
00480 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00481 }
00482 }
00483
00484 done:
00485 if (howlong_ != NULL) {
00486 DL((REACT,"delay (%f)\n", double (*howlong_) ));
00487 }
00488 else {
00489 DL((REACT,"delay (forever)\n"));
00490 }
00491 }
00492
00496 void
00497 Reactor::
00498 waitForEvents (void)
00499 {
00500 while ( m_active ) {
00501 waitForEvents ((TimeVal*) NULL);
00502 }
00503 }
00504
00521 void
00522 Reactor::
00523 waitForEvents (TimeVal* tv_)
00524 {
00525 trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00526
00527 TimerCountdown traceTime (tv_);
00528 DL((REACT,"======================================\n"));
00529
00530
00531 m_tqueue.expire (TimeVal::gettimeofday ());
00532
00533
00534
00535
00536 if (!m_active) {
00537 return;
00538 }
00539
00540 int nReady;
00541 TimeVal delay;
00542 TimeVal* dlp = &delay;
00543
00544
00545
00546
00547
00548
00549
00550
00551 if ((nReady = isAnyReady ())) {
00552 DL((REACT,"isAnyReady returned: %d\n",nReady));
00553 dispatch (nReady);
00554 return;
00555 }
00556
00557 DL((REACT,"=== m_waitSet ===\n"));
00558 m_waitSet.dump ();
00559
00560 do {
00561 m_readySet.reset ();
00562 DL ((REACT,"m_readySet after reset():\n"));
00563 m_readySet.dump ();
00564
00565 m_readySet = m_waitSet;
00566 DL ((REACT,"m_readySet after assign:\n"));
00567 m_readySet.dump ();
00568
00569 calculateTimeout (dlp, tv_);
00570
00571 nReady = ::select (m_maxfd_plus1,
00572 &m_readySet.m_rset,
00573 &m_readySet.m_wset,
00574 &m_readySet.m_eset,
00575 dlp);
00576 DL((REACT,"::select() returned: %d\n",nReady));
00577
00578 m_readySet.sync ();
00579 DL ((REACT,"m_readySet after select:\n"));
00580 m_readySet.dump ();
00581
00582 }
00583 while (nReady < 0 && handleError ());
00584
00585 dispatch (nReady);
00586 }
00587
00594 void
00595 Reactor::
00596 dispatchHandler (FdSet& mask_, Fd2Eh_Map_Type& fdSet_, EH_IO_Callback callback_)
00597 {
00598 trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00599
00600 int ret = 0;
00601 handler_t fd;
00602 EventHandler* ehp = NULL;
00603 std::string eh_id;
00604
00605 Fd2Eh_Map_Iter iter = fdSet_.begin ();
00606
00607 while (iter != fdSet_.end ())
00608 {
00609 fd = (*iter).first;
00610 ehp = (*iter).second;
00611
00612 if (mask_.isSet (fd) && ehp != NULL)
00613 {
00614 eh_id = ehp->get_id ();
00615 DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
00616 eh_id.c_str (), fd));
00617
00618 ret = (ehp->*callback_) (fd);
00619
00620 if (ret == -1) {
00621 removeIOHandler (fd);
00622 }
00623 else if (ret > 0) {
00624 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
00625 ret, fd, eh_id.c_str ()));
00626
00627 }
00628 else {
00629 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n",
00630 eh_id.c_str (), fd));
00631 mask_.clear (fd);
00632 }
00639 iter = fdSet_.begin ();
00640 }
00641 else {
00642 iter++;
00643 }
00644 }
00645 }
00646
00652 bool
00653 Reactor::
00654 dispatch (int ready_)
00655 {
00656 trace_with_mask("Reactor::dispatch", REACTTRACE);
00657
00658 m_tqueue.expire (TimeVal::gettimeofday ());
00659
00660 if ( ready_ < 0 )
00661 {
00662 #if !defined (WIN32)
00663 EL((ASSAERR,"::select(3) error\n"));
00664 #endif
00665 return (false);
00666 }
00667 if ( ready_ == 0 ) {
00668 return (true);
00669 }
00670
00671 DL((REACT,"Dispatching %d FDs.\n",ready_));
00672 DL((REACT,"m_readySet:\n"));
00673 m_readySet.dump ();
00674
00675
00676 dispatchHandler (m_readySet.m_wset,
00677 m_writeSet,
00678 &EventHandler::handle_write);
00679
00680
00681 dispatchHandler (m_readySet.m_eset,
00682 m_exceptSet,
00683 &EventHandler::handle_except);
00684
00685
00686 dispatchHandler (m_readySet.m_rset,
00687 m_readSet,
00688 &EventHandler::handle_read);
00689
00690 return (true);
00691 }
00692
00693 void
00694 Reactor::
00695 stopReactor (void)
00696 {
00697 trace_with_mask("Reactor::stopReactor", REACTTRACE);
00698
00699 m_active = false;
00700
00701 Fd2Eh_Map_Iter iter;
00702 EventHandler* ehp;
00703
00704 while (m_readSet.size () > 0) {
00705 iter = m_readSet.begin ();
00706 ehp = (*iter).second;
00707 removeHandler (ehp);
00708 }
00709
00710 while (m_writeSet.size () > 0) {
00711 iter = m_writeSet.begin ();
00712 ehp = (*iter).second;
00713 removeHandler (ehp);
00714 }
00715
00716 while (m_exceptSet.size () > 0) {
00717 iter = m_exceptSet.begin ();
00718 ehp = (*iter).second;
00719 removeHandler (ehp);
00720 }
00721 }
00722
00727 void
00728 Reactor::
00729 adjust_maxfdp1 (handler_t fd_,
00730 handler_t rmax_,
00731 handler_t wmax_,
00732 handler_t emax_)
00733 {
00734 #if !defined (WIN32)
00735
00736 trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
00737
00738 if (m_maxfd_plus1 == fd_ + 1) {
00739 m_maxfd_plus1 = std::max (rmax_, std::max (wmax_, emax_));
00740
00741 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00742 }
00743 #endif
00744 }