2.5. I/O Multiplexing with Reactor

For an in-depth discussion of the Reactor pattern, refer to the article "Reactor - An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events" by Douglas C. Schmidt, published in [MartinBuschmannRiehle97].

2.5.1. Requirements

Our goal is to provide a framework for building highly efficient UNIX servers and application programs. To achieve this, our programs have to meet the following requirements:

  1. The ability to serve multiple communication streams concurrently.

  2. No I/O operation should block the application program code at any time.

  3. The duration of every I/O operation should be bounded by a configurable timeout.

  4. An application program should process UNIX signals concurrently with its other activities.

  5. No data processing activity should take long enough to starve other concurrent communication activities.

It should also be noted that most of the requirements above can be satisfied by using multithreading. However, the limited portability of thread libraries and, more importantly, the programming complexity preclude us from writing stable and robust code that way. Past experience has shown that multithreaded applications eventually run into wicked concurrency issues that lead to process deadlocks, with no hope of investigating the cause of the problem and fixing it effectively.

2.5.2. Overview

The Reactor pattern integrates the synchronous demultiplexing of events with the dispatching of their corresponding event handlers. The classes of events serviced by class Reactor are socket I/O events, timer events, and UNIX signal events (see Table 2-1).

Socket I/O events are further differentiated as read, write, and except events.

The underlying common demultiplexer used for I/O events is select(2).
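
To make the role of select(2) concrete, here is a minimal sketch of waiting on a single descriptor with a timeout. The helper name wait_readable() is hypothetical and is not part of libassa; it only illustrates the system call Reactor is built on. The sketch does not retry when select(2) is interrupted by a signal.

```cpp
#include <cassert>
#include <cstddef>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

// Hypothetical helper (not a libassa function): wait until fd_ becomes
// readable or until timeout_ms_ milliseconds have passed.
// Returns 1 if readable, 0 on timeout, -1 if select(2) failed.
int wait_readable (int fd_, long timeout_ms_)
{
    assert (fd_ >= 0 && fd_ < FD_SETSIZE);

    fd_set rset;
    FD_ZERO (&rset);
    FD_SET (fd_, &rset);

    struct timeval tv;
    tv.tv_sec  = timeout_ms_ / 1000;
    tv.tv_usec = (timeout_ms_ % 1000) * 1000;

    int n = select (fd_ + 1, &rset, NULL, NULL, &tv);
    if (n < 0) { return -1; }
    return (n > 0 && FD_ISSET (fd_, &rset)) ? 1 : 0;
}
```

Because the call returns after the timeout even when no data arrives, a single-threaded program can bound the duration of every wait, which is what requirement 3 asks for.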

2.5.3. Class EventHandler

The EventHandler base class provides a standard interface for dispatching handlers that have registered with Reactor to process events.

Example 2-6. Class EventHandler definition


class EventHandler
{
public:
      virtual int handle_read (int fd);
      virtual int handle_write (int fd);
      virtual int handle_except (int fd);

      virtual int handle_timeout (TimerId tid); 
      virtual int handle_signal (int signum_); 

      virtual int handle_close (int fd);
};
		

A class derived from EventHandler overrides the interface method(s) to process events in an application-specific manner. A derived class should only implement the methods for the events it is interested in.

Note

Reactor is the only caller of the member functions of class EventHandler or of their overriding equivalents in derived classes. Remembering this simple rule takes a lot of confusion out of the whole picture.
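
The following sketch shows what "implement only the methods you are interested in" might look like. The EventHandler below is a simplified stand-in, not the libassa class, and the default bodies returning -1 are an assumption made for the illustration. ReadCounter and the dispatch_*() functions are hypothetical names; the latter play the role of Reactor.

```cpp
#include <cassert>

// Simplified stand-in for EventHandler. The default implementations
// returning -1 are an assumption made for this sketch.
class EventHandler
{
public:
    virtual ~EventHandler () { }
    virtual int handle_read  (int /* fd */) { return -1; }
    virtual int handle_write (int /* fd */) { return -1; }
    virtual int handle_close (int /* fd */) { return 0; }
};

// Hypothetical handler that is interested in read events only.
class ReadCounter : public EventHandler
{
public:
    ReadCounter () : m_reads (0) { }

    virtual int handle_read (int fd_)
    {
        assert (fd_ >= 0);
        ++m_reads;          // application-specific processing goes here
        return 0;
    }

    int reads () const { return m_reads; }

private:
    int m_reads;
};

// These functions play the role of Reactor, the only caller of the
// handle_*() member functions.
int dispatch_read  (EventHandler& eh_, int fd_) { return eh_.handle_read (fd_); }
int dispatch_write (EventHandler& eh_, int fd_) { return eh_.handle_write (fd_); }
```

A read event reaches ReadCounter::handle_read() through the virtual call, while a write event falls through to the base class default because ReadCounter never overrides it.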

A class derived from EventHandler selects the type of event it wants to receive when it registers with Reactor. The event type is one of the values of enum ASSA::EventType.

Table 2-1. Events handled by the Reactor

Event           Description
READ_EVENT      Notify when at least 1 byte can be read from the I/O channel without blocking.
WRITE_EVENT     Notify when there is room to write at least 1 byte to the I/O channel without blocking.
EXCEPT_EVENT    Notify when an exception condition (out-of-band data) is detected in the TCP layer.
RWE_EVENTS      READ/WRITE/EXCEPT mask.
TIMEOUT_EVENT   Notify about an expired timer.
SIGNAL_EVENT    Notify when a UNIX signal is delivered by the operating system.
ALL_EVENTS      Mask of all events.
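
The two mask entries in the table suggest that event types combine as bit flags. The sketch below shows how such masks could be built and tested. The numeric values are illustrative only and are not necessarily those of ASSA::EventType; has_event() is a hypothetical helper.

```cpp
#include <cassert>

// Illustrative values only; the real ASSA::EventType constants may differ.
enum EventType {
    READ_EVENT    = 0x01,
    WRITE_EVENT   = 0x02,
    EXCEPT_EVENT  = 0x04,
    TIMEOUT_EVENT = 0x10,
    SIGNAL_EVENT  = 0x20,
    RWE_EVENTS    = READ_EVENT | WRITE_EVENT | EXCEPT_EVENT,
    ALL_EVENTS    = RWE_EVENTS | TIMEOUT_EVENT | SIGNAL_EVENT
};

// Hypothetical helper: true if every bit of wanted_ is set in mask_.
inline bool has_event (int mask_, EventType wanted_)
{
    assert (wanted_ != 0);
    return (mask_ & wanted_) == wanted_;
}
```

With this layout, a handler registered with RWE_EVENTS matches READ_EVENT, WRITE_EVENT, and EXCEPT_EVENT, but not TIMEOUT_EVENT or SIGNAL_EVENT.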

2.5.4. Registering EventHandler

Reactor provides the following methods for registering and removing EventHandler objects:

Example 2-7. Class Reactor definition


class Reactor 
{
public:

    bool registerIOHandler (EventHandler* eh_, 
                            int fd_, 
                            EventType et_ = RWE_EVENTS);

    TimerId registerTimerHandler (EventHandler* eh_, TimeVal& tv_);

    bool removeIOHandler (int fd_);    
    bool removeHandler (EventHandler* eh_, EventType et_ = ALL_EVENTS);
    bool removeTimerHandler (TimerId id_);

    // other member functions ...
};
		

We have already seen an example of registering a class that implements EventHandler with Reactor (see Example 2-2).

2.5.5. Processing Events

Reactor combines all registered EventHandlers and, on each iteration of its event loop, waits for any of their events to occur using the synchronous event demultiplexer, select(2).

When an event occurs, Reactor responds by calling the corresponding virtual member function of the appropriate EventHandler.
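
One iteration of such an event loop can be sketched as follows. This is a toy reactor written for illustration, not the libassa implementation: MiniReactor, ByteCounter, and the simplified EventHandler are hypothetical, only read events are handled, and we assume that a negative return value from handle_read() asks the reactor to remove the handler and call handle_close().

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

// Simplified stand-in for EventHandler (read events only).
class EventHandler
{
public:
    virtual ~EventHandler () { }
    virtual int handle_read  (int fd) = 0;
    virtual int handle_close (int /* fd */) { return 0; }
};

// Toy reactor for illustration; not the libassa Reactor.
class MiniReactor
{
public:
    void registerIOHandler (EventHandler* eh_, int fd_)
    {
        assert (eh_ != NULL && fd_ >= 0 && fd_ < FD_SETSIZE);
        m_readers [fd_] = eh_;
    }

    // One iteration of the event loop. Returns the number of handlers
    // dispatched, 0 on timeout, or -1 if select(2) failed.
    int waitForEvents (long timeout_ms_)
    {
        fd_set rset;
        FD_ZERO (&rset);
        int maxfd = -1;
        std::map<int, EventHandler*>::iterator it;
        for (it = m_readers.begin (); it != m_readers.end (); ++it) {
            FD_SET (it->first, &rset);
            if (it->first > maxfd) { maxfd = it->first; }
        }

        struct timeval tv;
        tv.tv_sec  = timeout_ms_ / 1000;
        tv.tv_usec = (timeout_ms_ % 1000) * 1000;
        if (select (maxfd + 1, &rset, NULL, NULL, &tv) < 0) { return -1; }

        int dispatched = 0;
        for (it = m_readers.begin (); it != m_readers.end (); ) {
            int fd = it->first;
            EventHandler* eh = it->second;
            ++it;                 // advance first: the entry may be erased
            if (!FD_ISSET (fd, &rset)) { continue; }
            ++dispatched;
            if (eh->handle_read (fd) < 0) {   // handler asked to be removed
                m_readers.erase (fd);
                eh->handle_close (fd);
            }
        }
        return dispatched;
    }

    std::size_t size () const { return m_readers.size (); }

private:
    std::map<int, EventHandler*> m_readers;   // fd -> handler
};

// Hypothetical handler: counts bytes and asks for removal on EOF or error.
class ByteCounter : public EventHandler
{
public:
    ByteCounter () : m_bytes (0), m_closed (false) { }
    virtual int handle_read (int fd_)
    {
        char buf [64];
        ssize_t n = read (fd_, buf, sizeof (buf));
        if (n <= 0) { return -1; }
        m_bytes += n;
        return 0;
    }
    virtual int handle_close (int) { m_closed = true; return 0; }
    long bytes () const { return m_bytes; }
    bool closed () const { return m_closed; }
private:
    long m_bytes;
    bool m_closed;
};
```

A real reactor would build write and except sets the same way and would also service timers and signals in the same loop. The point of the sketch is only the order of operations: collect descriptors, block in select(2), then dispatch through the virtual interface.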

2.5.5.1. Record-Oriented Data Stream

A record-oriented data stream is a communication protocol in which each record is of unknown, variable length and a character or a sequence of characters denotes the end of a record, thus separating one record from the next.

In our assa-logd server, the MonitorConn class represents the connection with a monitoring client, assa-logmon. We might want to implement the communication protocol between the two peers in terms of ASCII strings terminated by a carriage return (CR) followed immediately by a line feed (LF). This way, we can use a telnet session instead of assa-logmon.

Our communication protocol thus becomes record-oriented: we don't know the length of a message in advance, and the boundary of each message is marked by the [CR][LF] control characters. It is worth mentioning that our choice of record-terminating characters is largely dictated by the fact that no control characters are expected in the message strings themselves.

This gives us an opportunity to illustrate a typical technique for reading a record-oriented data stream with libassa. We are going to modify the original code (see Example 2-4) of the MonitorConn class to process each record-oriented ASCII message individually.


// logserver/server/MonitorConn.h

0029 class MonitorConn : public ServiceHandler<IPv4Socket>
0030 {
0031 public:
...
0039 
0040 private:
0041     static const char CR;             // ASCII CR (carriage return)
0042     static const char LF;             // ASCII LF (linefeed)
0043 
0044     static const size_t MAXMSGLEN = 128;    // Maximum message length
0045 
0046     void parse_record ();
0047 
0048 private:
0049     u_int m_iolen;              // Number of bytes in I/O buffer so far
0050     char  m_iobuf [MAXMSGLEN];  // Message buffer
0051 };
		

  • Lines 41-44 declare constants. If a message from a client exceeds MAXMSGLEN, we log the error and disconnect from the offending client.

  • Line 46 declares a private function that processes commands from the client.

  • Lines 49-50 declare the buffer that holds the message. We might receive a message in several chunks; m_iobuf[] collects the entire message.


// logserver/server/MonitorConn.cpp

0023 const char MonitorConn::CR = 0xD;
0024 const char MonitorConn::LF = 0xA;
...
0026 MonitorConn::
0027 MonitorConn (IPv4Socket* stream_)
0028     : ServiceHandler<IPv4Socket> (stream_),
0029       m_iolen (0)
0030 {   
0031     trace ("MonitorConn::MonitorConn");
0032     /* no-op */
0033 }
		

  • Lines 23-24 initialize the carriage return and line feed constants.

  • Line 29 initializes the number of bytes in the buffer to zero.


// logserver/server/MonitorConn.cpp

...
0096 void MonitorConn::parse_record ()
0097 {
0098     trace("MonitorConn::parse_record");
0099 
0100     DL((ASSA::APP,"=> Message from client:\n\n%s\n\n", m_iobuf));
0101     m_iolen = 0;
0102 }
		

  • Lines 96-102 process a message by logging it and then reset the buffer to accept the next message.

Example 2-8. Record-Oriented Data Transfer


// logserver/server/MonitorConn.cpp

0059 int MonitorConn::handle_read (int fd_)
0060 {   
0061     trace("MonitorConn::handle_read");
0062 
0063     ASSA::IPv4Socket& s = *this;
0064     if (s.getHandler () != fd_) { return (-1); }
0065 
0066     char c = 0;
0067     int ret = 0;
0068     bool seen_eor = false;      // have we seen end-of-record?
0069     
0070     while ((ret = s.read (&c, 1)) == 1 && m_iolen < MAXMSGLEN) {
0071         if (c == LF) {
0072             continue;
0073         }
0074         if (c == CR) {
0075             seen_eor = true;
0076             m_iobuf [m_iolen] = '\0';
0077             break;
0078         }
0079         m_iobuf [m_iolen++] = c;
0080     }
0081
0082     if (m_iolen >= MAXMSGLEN) {
0083         DL((ASSA::ERROR,"Message length exceeded %d bytes!\n",MAXMSGLEN));
0084         return -1;
0085     }
0086 
0087     if (seen_eor) {
0088         parse_record ();
0089     }
0090 
0091     return s.eof () ? -1 : s.in_avail ();
0092 }
		  

  • Lines 70-80 read data one byte at a time. LF characters are skipped, and CR marks the end of a record: when we see it, we terminate the string and break out of the loop.

  • Lines 82-85 handle an attempt by the client program to overflow the buffer. We disconnect from the offending client.

  • Lines 87-89 call the record processing procedure.
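
The framing logic of Example 2-8 can also be studied in isolation from the socket. The sketch below is a hypothetical RecordFramer class, not part of libassa, that applies the same rules to chunks of bytes: LF is skipped, CR ends a record, and an oversized record is rejected. Unlike Example 2-8, it extracts every complete record found in a chunk rather than one record per call.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: splits a byte stream into CR/LF-terminated records.
class RecordFramer
{
public:
    explicit RecordFramer (std::size_t maxlen_) : m_maxlen (maxlen_)
    {
        assert (maxlen_ > 0);
    }

    // Feed one chunk of data. Complete records are appended to out_.
    // Returns false if a record grows beyond the configured limit.
    bool feed (const char* data_, std::size_t len_,
               std::vector<std::string>& out_)
    {
        for (std::size_t i = 0; i < len_; ++i) {
            char c = data_ [i];
            if (c == '\n') { continue; }       // LF carries no information
            if (c == '\r') {                   // CR marks the end of a record
                out_.push_back (m_partial);
                m_partial.clear ();
                continue;
            }
            if (m_partial.size () >= m_maxlen) { return false; }
            m_partial += c;
        }
        return true;
    }

    // Number of bytes of the unfinished record collected so far.
    std::size_t pending () const { return m_partial.size (); }

private:
    std::size_t m_maxlen;
    std::string m_partial;
};
```

Feeding "hel" and then "lo\r\nbye\r\n" yields the two records "hello" and "bye", which shows why the partial record has to survive between calls: a record can arrive split across several reads.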

2.5.5.2. Binary Data Stream

The trouble with a record-oriented data stream is that it becomes inefficient under heavy data transfer. When we move big chunks of scientific data, we want the information to be compact and encoded in a network-independent format (e.g., XDR).

A binary data stream protocol assumes that each message carries a constant-size header that, at a minimum, conveys the type and the size of the message that follows. Thus, both the header and the body are of a known size.

Below is a skeleton of DataProcessor that reads and accumulates the anticipated number of bytes for further data processing. Note that the raw data transferred this way can later be interpreted as a particular binary format.


0001 class DataProcessor : public ServiceHandler<IPv4Socket>
0002 {
0003 public:
0004     DataProcessor(IPv4Socket* stream_) : 
0005         ServiceHandler<IPv4Socket> (stream_) { /* no-op */ }
0006
0007     ~DataProcessor () { /* no-op */ }
0008 
0009     virtual int open ();
0010     virtual int handle_read (int fd_);
0011     virtual int handle_close (int fd_);
0012     void process_message ();
0013 
0014 private:
0015     enum state_t { waiting, complete };
0016
0017     std::string m_body;      // Message so far
0018     u_int       m_len;       // Expected message length
0019     state_t     m_state;     // Are we done with transfer?
0020 };
0021
0022 int DataProcessor::open ()
0023 {
0024     m_len = NUM;    // Number of bytes expected
0025     m_state = waiting;
0026     return 0;
0027 }
		

  • Line 15 defines the two states DataProcessor can be in: 1) waiting for a complete packet and 2) having received a complete packet.

  • Lines 17-18 declare the character bucket, m_body, and the total message length, m_len.

Example 2-9. Binary Data Transfer


0022 int DataProcessor::handle_read (int fd_)
0023 {
0024     trace("DataProcessor::handle_read");
0025
0026     ASSA::IPv4Socket& s = *this;
0027
0028     if (m_state == complete) { return 0; }
0029     if (s.getHandler () != fd_) { return (-1); }
0030
0031     int rcvd = 0;
0032     u_int expd = m_len - m_body.size ();
0033     char* buf = new char [expd + 1];
0034
0035     while ((rcvd = s.read (buf, expd)) > 0) {
0036         // append() copies rcvd bytes, keeping embedded NUL bytes
0037         m_body.append (buf, rcvd);
0038         expd = m_len - m_body.size ();
0039         if (expd == 0) {
0040             m_state = complete;
0041             break;
0042         }
0043     }
0044     delete [] buf;
0045     return s.eof () ? -1 : s.in_avail ();
0046 }
0047 
0048 void DataProcessor::process_message ()
0049 {
0050     // do data processing
0051     
0052     // Re-initialize character bucket
0053     m_len = NUM;
0054     m_body = "";
0055     m_state = waiting;
0056 }
		  

  • Line 28 is a short-circuit that avoids reading more data from the socket than required.

  • Line 32 sets expd to the number of bytes left to complete the message transfer.

  • Lines 35-43 try to read as much as possible and append the data to m_body. We break out of the loop when the last byte of the message has been read or when there is no more data in the socket buffer.

  • Line 50 would be replaced with message processing code.

  • Lines 53-55 prepare the character bucket for the new message.
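
The skeleton above takes the expected length from the placeholder NUM. In a protocol with the constant-size header described at the start of this section, that length would come from the header instead. The sketch below shows one possible header layout and its encoding. The MsgHeader structure, its two 32-bit fields, and the helper names are assumptions made for illustration; they are not a libassa or XDR definition.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <stdint.h>
#include <arpa/inet.h>

// Hypothetical wire header: two 32-bit fields in network byte order.
struct MsgHeader
{
    uint32_t type;       // application-defined message type
    uint32_t length;     // size of the body that follows, in bytes
};

const std::size_t HEADER_SIZE = 8;

// Serialize a header into exactly HEADER_SIZE bytes.
std::string encode_header (const MsgHeader& h_)
{
    uint32_t wire [2] = { htonl (h_.type), htonl (h_.length) };
    return std::string (reinterpret_cast<const char*> (wire), HEADER_SIZE);
}

// Decode a header from the first HEADER_SIZE bytes of buf_.
MsgHeader decode_header (const std::string& buf_)
{
    assert (buf_.size () >= HEADER_SIZE);
    uint32_t wire [2];
    std::memcpy (wire, buf_.data (), HEADER_SIZE);

    MsgHeader h;
    h.type   = ntohl (wire [0]);
    h.length = ntohl (wire [1]);
    return h;
}
```

A handler built on this layout would first accumulate HEADER_SIZE bytes, decode them, set its expected length from the length field, and only then start collecting the body.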

2.5.6. Removing EventHandler

You can remove an EventHandler from Reactor's processing queue in any of the following ways:


void MyEventHandler::foo ()
{
    Socket& s = *this;
    /**
     * Remove handler registered for particular file descriptor.
     * Socket::getHandler () returns socket file descriptor.
     */
    REACTOR->removeIOHandler (s.getHandler ());

    /**
     * Remove 'this' event handler for one particular event 
     */
    REACTOR->removeHandler (this, READ_EVENT);

    /**
     * Remove timer identified with m_tid 
     */
    REACTOR->removeTimerHandler (m_tid);
    m_tid = 0;
}