libASSA Programmer's Manual
Chapter 2. Writing Network Applications
For an in-depth discussion of the Reactor pattern, refer to the article "Reactor - An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events" by Douglas C. Schmidt, published in [MartinBuschmannRiehle97].
Our goal is to provide a framework for building high-efficiency UNIX servers and application programs. To achieve this objective, our programs have to meet the following requirements:
The ability to serve multiple communication streams concurrently.
No I/O operation should ever block the application code.
The duration of every I/O operation should be bounded by a configurable timeout.
An application program should process UNIX signals in a concurrent manner.
No data processing activity should take long enough to starve other concurrent communication activities.
It should also be noted that most of the requirements above could be satisfied by using multithreading. However, the limited portability of thread libraries and, more importantly, the added programming complexity make it hard to write stable and robust code. Past experience has shown that multithreaded applications eventually run into subtle concurrency issues that lead to process deadlocks, with little hope of investigating the cause of the problem and fixing it effectively.
The Reactor pattern integrates the synchronous demultiplexing of events with the dispatching of their corresponding event handlers. The classes of events serviced by class Reactor are:
Socket (or file descriptor) I/O events.
UNIX signals.
Timers.
Socket I/O events are further divided into read, write, and except events.
The underlying synchronous demultiplexer used for all I/O events is select(2).
The EventHandler base class provides the standard interface through which Reactor dispatches events to the handlers registered with it.
Example 2-6. Class EventHandler definition
```cpp
class EventHandler
{
public:
    virtual int handle_read    (int fd);
    virtual int handle_write   (int fd);
    virtual int handle_except  (int fd);
    virtual int handle_timeout (TimerId tid);
    virtual int handle_signal  (int signum_);
    virtual int handle_close   (int fd);
};
```
A class derived from EventHandler overrides the interface method(s) that process events in an application-specific manner. A derived class needs to implement only the methods for the events it is interested in.
Reactor is the only caller of the member functions of class EventHandler and of their overridden equivalents in derived classes. Remembering this simple rule takes a lot of confusion out of the whole picture.
A class derived from EventHandler selects the type of event it wants to receive when it registers with Reactor. The event type is one of the values of enum ASSA::EventType.
Table 2-1. Events handled by the Reactor
Event | Description |
---|---|
READ_EVENT | Notify when at least 1 byte can be read from the I/O channel without blocking. |
WRITE_EVENT | Notify when there is room to write at least 1 byte to the I/O channel without blocking. |
EXCEPT_EVENT | Notify when an exception condition (out-of-band data) is detected in the TCP layer. |
RWE_EVENTS | Mask combining READ_EVENT, WRITE_EVENT, and EXCEPT_EVENT. |
TIMEOUT_EVENT | Notify when a timer expires. |
SIGNAL_EVENT | Notify when a UNIX signal is delivered by the operating system. |
ALL_EVENTS | Mask combining all events. |
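Because RWE_EVENTS and ALL_EVENTS are masks, event types can be combined and tested with bitwise operators. The enum below is only a stand-in for ASSA::EventType: its numeric values are assumptions chosen for illustration, and isSet() and wantsIO() are hypothetical helpers, not part of libassa.

```cpp
#include <cassert>

// Stand-in for ASSA::EventType. The bit values are ASSUMED for
// illustration; consult the libassa headers for the real ones.
enum EventType {
    READ_EVENT    = 0x01,
    WRITE_EVENT   = 0x02,
    EXCEPT_EVENT  = 0x04,
    TIMEOUT_EVENT = 0x10,
    SIGNAL_EVENT  = 0x20,
    RWE_EVENTS    = READ_EVENT | WRITE_EVENT | EXCEPT_EVENT,
    ALL_EVENTS    = 0xFF
};

// Hypothetical helper: true if mask_ contains the single event type et_.
inline bool isSet (int mask_, EventType et_)
{
    assert (et_ != 0);                 // an empty event type is meaningless
    return (mask_ & et_) == et_;
}

// Hypothetical helper: true if mask_ asks for any read/write/except event.
inline bool wantsIO (int mask_)
{
    return (mask_ & RWE_EVENTS) != 0;
}
```

For example, a handler interested in incoming data and out-of-band data would use the mask READ_EVENT | EXCEPT_EVENT; isSet() then reports READ_EVENT but not WRITE_EVENT, while a timer-only mask yields false from wantsIO().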
Reactor provides the following methods for registering and removing EventHandler objects:
Example 2-7. Class Reactor definition
```cpp
class Reactor
{
public:
    bool    registerIOHandler    (EventHandler* eh_, int fd_, EventType et_ = RWE_EVENTS);
    TimerId registerTimerHandler (EventHandler* eh_, TimeVal& tv_);
    bool    removeIOHandler      (int fd_);
    bool    removeHandler        (EventHandler* eh_, EventType et_ = ALL_EVENTS);
    bool    removeTimerHandler   (TimerId id_);
    // other member functions ...
};
```
We have already seen an example of registering a class that implements EventHandler with Reactor (see Example 2-2).
Reactor keeps track of all registered EventHandlers and, on each iteration of its event loop, waits for any of their events to occur using the synchronous event demultiplexer, select(2).
When an event occurs, Reactor dispatches it by calling the corresponding virtual member function of the EventHandler registered for that event.
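To make the demultiplex-then-dispatch cycle concrete, here is a minimal sketch of a select(2)-based loop. It is not libassa's Reactor: MiniReactor and handleEvents() are hypothetical names, the EventHandler shown is a stripped-down stand-in with only handle_read(), and only READ events are modeled (no timers, signals, or write/except sets). Unregistering a handler whose handle_read() returns a negative value is an assumption of this sketch, loosely following the examples in this chapter, which return -1 on error.

```cpp
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>
#include <map>
#include <vector>

// Stand-in for EventHandler: only the read hook (hypothetical default).
class EventHandler {
public:
    virtual ~EventHandler () {}
    virtual int handle_read (int /* fd_ */) { return -1; }
};

// Hypothetical, much-simplified reactor: READ events only.
class MiniReactor {
public:
    bool registerIOHandler (EventHandler* eh_, int fd_) {
        if (eh_ == nullptr || fd_ < 0 || fd_ >= FD_SETSIZE) { return false; }
        m_readers [fd_] = eh_;
        return true;
    }

    bool removeIOHandler (int fd_) { return m_readers.erase (fd_) == 1; }

    // One event-loop iteration: wait up to timeout_ for a registered
    // descriptor to become readable, then call its handler.
    // Returns the number of handlers called, 0 on timeout, -1 on error.
    int handleEvents (struct timeval* timeout_) {
        fd_set rset;
        FD_ZERO (&rset);
        int maxfd = -1;
        for (const auto& entry : m_readers) {
            FD_SET (entry.first, &rset);
            if (entry.first > maxfd) { maxfd = entry.first; }
        }
        int n = select (maxfd + 1, &rset, nullptr, nullptr, timeout_);
        if (n <= 0) { return n; }

        // Snapshot the ready descriptors: handlers may unregister during dispatch.
        std::vector<int> ready;
        for (const auto& entry : m_readers) {
            if (FD_ISSET (entry.first, &rset)) { ready.push_back (entry.first); }
        }

        int dispatched = 0;
        for (int fd : ready) {
            auto it = m_readers.find (fd);
            if (it == m_readers.end ()) { continue; }   // removed meanwhile
            ++dispatched;
            if (it->second->handle_read (fd) < 0) {
                removeIOHandler (fd);   // assumed convention: <0 means "drop me"
            }
        }
        return dispatched;
    }

private:
    std::map<int, EventHandler*> m_readers;   // fd -> registered handler
};
```

An application would call handleEvents() in a loop, one call per event-loop iteration. Collecting the ready descriptors before dispatching lets a handler remove itself, or another handler, without invalidating the iteration.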
A record-oriented data stream is a communication protocol in which each record has a variable length that is not known in advance, and a character or a sequence of characters marks the end of each record, thus separating one record from the next.
In our assa-logd server, the MonitorConn class represents the connection with a monitoring client, assa-logmon. We might want to implement the communication protocol between the two peers in terms of ASCII strings terminated by a carriage return (CR) immediately followed by a line feed (LF). This way, we can use a telnet session instead of assa-logmon.
Our communication protocol thus becomes record-oriented, because we don't know the length of a message in advance and the boundary of each message is marked by the [CR][LF] control characters. It is worth mentioning that our choice of record-terminating characters is largely dictated by the fact that no control characters are expected to appear in the message strings themselves.
This gives us an opportunity to illustrate a typical technique for reading a record-oriented data stream with libassa. We are going to modify the original code of the MonitorConn class (see Example 2-4) to process each record-oriented ASCII message individually.
```cpp
// logserver/server/MonitorConn.h
0029 class MonitorConn : public ServiceHandler<IPv4Socket>
0030 {
0031 public:
...
0039
0040 private:
0041     static const char CR;                  // ASCII CR (carriage return)
0042     static const char LF;                  // ASCII LF (line feed)
0043
0044     static const size_t MAXMSGLEN = 128;   // Maximum message length
0045
0046     void parse_record ();
0047
0048 private:
0049     u_int m_iolen;                // Number of bytes in I/O buffer so far
0050     char  m_iobuf [MAXMSGLEN];    // Message buffer
0051 };
```
Lines 41-44 declare constants. If a message from a client exceeds MAXMSGLEN bytes, we log the error and disconnect from the offending client.
Line 46 declares a private function that processes commands from the client.
Lines 49-50 declare the buffer that holds the message and the count of bytes collected so far. Because a message may arrive in several chunks, m_iobuf[] accumulates the data until the entire message has been received.
```cpp
// logserver/server/MonitorConn.cpp
0023 const char MonitorConn::CR = 0xD;
0024 const char MonitorConn::LF = 0xA;
...
0026 MonitorConn::
0027 MonitorConn (IPv4Socket* stream_)
0028     : ServiceHandler<IPv4Socket> (stream_),
0029       m_iolen (0)
0030 {
0031     trace ("MonitorConn::MonitorConn");
0032     /* no-op */
0033 }
```
Lines 23-24 initialize the carriage return and line feed constants.
Line 29 initializes the number of bytes in the buffer to zero.
```cpp
// logserver/server/MonitorConn.cpp
...
0096 void MonitorConn::parse_record ()
0097 {
0098     trace("MonitorConn::parse_record");
0099
0100     DL((ASSA::APP,"=> Message from client:\n\n%s\n\n", m_iobuf));
0101     m_iolen = 0;
0102 }
```
Lines 96-102 process the message by logging it and then reset the buffer to accept a new message.
Example 2-8. Record-Oriented Data Transfer
```cpp
// logserver/server/MonitorConn.cpp
0059 int MonitorConn::handle_read (int fd_)
0060 {
0061     trace("MonitorConn::handle_read");
0062
0063     ASSA::IPv4Socket& s = *this;
0064     if (s.getHandler () != fd_) { return (-1); }
0065
0066     char c = 0;
0067     int ret = 0;
0068     bool seen_eor = false;          // have we seen end-of-record?
0069
0070     while (m_iolen < MAXMSGLEN && (ret = s.read (&c, 1)) == 1) {
0071         if (c == LF) {
0072             continue;               // LF carries no data: skip it
0073         }
0074         if (c == CR) {
0075             seen_eor = true;        // CR terminates the record
0076             m_iobuf [m_iolen] = '\0';
0077             break;
0078         }
0079         m_iobuf [m_iolen++] = c;
0080     }
0081
0082     if (m_iolen >= MAXMSGLEN) {
0083         DL((ASSA::ERROR,"Message length exceeded %d bytes!\n", int (MAXMSGLEN)));
0084         return -1;
0085     }
0086
0087     if (seen_eor) {
0088         parse_record ();
0089     }
0090
0091     return s.eof () ? -1 : s.in_avail ();
0092 }
```
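Lines 70-80 read data one byte at a time as long as there is room in the buffer. An LF is discarded; a CR marks the end of the record, so we terminate the string and break out of the loop. The LF that follows the CR is skipped when reading resumes.
Lines 82-85 handle a client's attempt to overflow the buffer: we log the error and disconnect from the offending client.
Lines 87-89 pass a complete record to parse_record() for processing.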
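The same CR/LF technique can be exercised without a socket. The sketch below is a hypothetical RecordReader (not a libassa class) that applies the rules of Example 2-8 to data delivered in arbitrary chunks: LF is ignored, CR ends a record, and a record that would not fit in maxlen_ bytes, including the terminating '\0', is flagged as an overflow. Using std::string instead of a fixed char array is a choice of this sketch only.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical socket-free model of Example 2-8 (not part of libassa).
class RecordReader {
public:
    explicit RecordReader (std::size_t maxlen_ = 128)
        : m_maxlen (maxlen_), m_overflow (false)
    {
        assert (maxlen_ > 1);   // room for at least one byte plus '\0'
    }

    // Consume one chunk; return every record it completes (CR/LF removed).
    std::vector<std::string> feed (const char* data_, std::size_t len_)
    {
        std::vector<std::string> records;
        for (std::size_t i = 0; i < len_ && !m_overflow; ++i) {
            char c = data_ [i];
            if (c == '\n') { continue; }        // LF: skipped
            if (c == '\r') {                    // CR: end of record
                records.push_back (m_partial);
                m_partial.clear ();
                continue;
            }
            if (m_partial.size () + 1 >= m_maxlen) {
                m_overflow = true;              // no room left for '\0'
                break;
            }
            m_partial += c;                     // keep accumulating
        }
        return records;
    }

    // True once a record has exceeded the limit; the caller should disconnect.
    bool overflow () const { return m_overflow; }

private:
    std::size_t m_maxlen;     // like MAXMSGLEN: includes the '\0'
    bool        m_overflow;
    std::string m_partial;    // bytes of the current record so far
};
```

Feeding "hel" and then "lo\r\nab\r\n" produces no record from the first call and the records "hello" and "ab" from the second, which is exactly the case of a message arriving split across two handle_read() calls.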
The trouble with a record-oriented data stream is that it becomes inefficient under heavy data transfer, because every byte has to be examined for the end-of-record marker. When we move big chunks of scientific data, we want the information to be compact and encoded in a machine-independent format (e.g. XDR).
A binary data stream protocol assumes that each message carries a fixed-size header that, at a minimum, conveys the type and the size of the message body that follows. Thus, both the header and the body are of known size.
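A fixed-size header can be sketched as follows, assuming a hypothetical 8-byte layout of two 32-bit unsigned integers, the message type followed by the body length, both in network (big-endian) byte order, which is also how XDR encodes an unsigned int. MsgHeader, encodeHeader(), and decodeHeader() are illustrative names, not libassa API; htonl() and ntohl() are the standard POSIX byte-order conversions.

```cpp
#include <arpa/inet.h>   // htonl, ntohl
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical 8-byte header: type and body length, big-endian on the wire.
struct MsgHeader {
    std::uint32_t type;
    std::uint32_t length;   // size of the body that follows, in bytes
};

const std::size_t HEADER_SIZE = 8;

// Serialize hdr_ into exactly HEADER_SIZE bytes starting at out_.
void encodeHeader (const MsgHeader& hdr_, unsigned char* out_)
{
    assert (out_ != nullptr);
    std::uint32_t t = htonl (hdr_.type);
    std::uint32_t l = htonl (hdr_.length);
    std::memcpy (out_,     &t, 4);
    std::memcpy (out_ + 4, &l, 4);
}

// Parse HEADER_SIZE bytes starting at in_ back into host byte order.
MsgHeader decodeHeader (const unsigned char* in_)
{
    std::uint32_t t, l;
    std::memcpy (&t, in_,     4);
    std::memcpy (&l, in_ + 4, 4);
    MsgHeader hdr;
    hdr.type   = ntohl (t);
    hdr.length = ntohl (l);
    return hdr;
}
```

A receiver would first collect HEADER_SIZE bytes, decode them, and then wait for exactly hdr.length bytes of body. memcpy() is used instead of casting the buffer to a uint32_t pointer so that unaligned buffers are handled safely.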
Below is a skeleton of a DataProcessor class that reads and accumulates the anticipated number of bytes for further processing. Note that the raw data transferred this way can later be interpreted according to a particular binary format.
```cpp
0001 class DataProcessor : public ServiceHandler<IPv4Socket>
0002 {
0003 public:
0004     DataProcessor(IPv4Socket* stream_) :
0005         ServiceHandler<IPv4Socket> (stream_) { /* no-op */ }
0006
0007     ~DataProcessor () { /* no-op */ }
0008
0009     virtual int open ();
0010     virtual int handle_read (int fd_);
0011     virtual int handle_close (int fd_);
0012     void process_message ();
0013
0014 private:
0015     enum state_t { waiting, complete };
0016
0017     std::string m_body;    // Message so far
0018     u_int       m_len;     // Expected message length
0019     state_t     m_state;   // Are we done with transfer?
0020 };
0021
0022 int DataProcessor::open ()
0023 {
0024     m_len   = NUM;         // Number of bytes expected
0025     m_state = waiting;
0026     return 0;
0027 }
```
Line 15 defines the two states a DataProcessor can be in: waiting for the complete message, and complete (the entire message has been received).
Lines 17-18 declare the character bucket, m_body, and the expected total message length, m_len.
Example 2-9. Binary Data Transfer
```cpp
0022 int DataProcessor::handle_read (int fd_)
0023 {
0024     trace("DataProcessor::handle_read");
0025
0026     ASSA::IPv4Socket& s = *this;
0027
0028     if (m_state == complete) { return 0; }
0029     if (s.getHandler () != fd_) { return (-1); }
0030
0031     int rcvd = 0;
0032     u_int expd = m_len - m_body.size ();
0033     char* buf = new char [expd];
0034
0035     while ((rcvd = s.read (buf, expd)) > 0) {
0036         m_body.append (buf, rcvd);  // length-based append keeps '\0' bytes
0037
0038         expd = m_len - m_body.size ();
0039         if (expd == 0) {
0040             m_state = complete;
0041             break;
0042         }
0043     }
0044     delete [] buf;
0045     return s.eof () ? -1 : s.in_avail ();
0046 }
0047
0048 void DataProcessor::process_message ()
0049 {
0050     // do data processing
0051
0052     // Re-initialize character bucket
0053     m_len   = NUM;
0054     m_body.clear ();
0055     m_state = waiting;
0056 }
```
Line 28 is a short circuit that avoids reading more data from the socket than required once the message is complete.
Line 32 sets expd to the number of bytes left to complete the message transfer.
Lines 35-43 read as much data as is available, up to the number of bytes still expected, and append it to m_body. We break out of the loop when the last byte of the message has been read or when there is no more data in the socket buffer.
Line 50 is a placeholder for the actual message processing code.
Lines 53-55 prepare the character bucket for the new message.
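The accumulation logic of DataProcessor can be modeled and tested without a socket. The hypothetical BodyCollector below (not a libassa class) collects exactly the expected number of bytes from chunks of any size. It appends with std::string::append(const char*, size_t), because operator+= with a plain char* stops at the first zero byte, which binary data may well contain.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical, socket-free model of DataProcessor's accumulation logic.
class BodyCollector {
public:
    explicit BodyCollector (std::size_t len_) : m_len (len_) {}

    // Take at most the number of bytes still expected from data_.
    // Returns how many bytes were consumed; the rest belongs to the next message.
    std::size_t feed (const char* data_, std::size_t n_)
    {
        assert (m_body.size () <= m_len);
        std::size_t want = m_len - m_body.size ();
        std::size_t take = n_ < want ? n_ : want;
        m_body.append (data_, take);   // binary-safe: keeps '\0' bytes
        return take;
    }

    bool complete () const { return m_body.size () == m_len; }
    const std::string& body () const { return m_body; }

    // Prepare the bucket for the next message, as process_message() does.
    void reset (std::size_t len_) { m_len = len_; m_body.clear (); }

private:
    std::size_t m_len;    // expected message length
    std::string m_body;   // message so far
};
```

Because feed() reports how many bytes it consumed, a caller that receives the tail of one message and the start of the next in a single chunk can hand the surplus to the following message.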
You can remove an EventHandler from Reactor's processing queue by specifying one of the following:
A file descriptor, fd.
A TimerId, the timer identification number.
A pointer to the EventHandler and an event type.
```cpp
void MyEventHandler::foo ()
{
    Socket& s = *this;

    /**
     * Remove handler registered for particular file descriptor.
     * Socket::getHandler () returns socket file descriptor.
     */
    REACTOR->removeIOHandler (s.getHandler ());

    /**
     * Remove 'this' event handler for one particular event
     */
    REACTOR->removeHandler (this, READ_EVENT);

    /**
     * Remove timer identified with m_tid
     */
    REACTOR->removeTimerHandler (m_tid);
    m_tid = 0;
}
```
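Resetting m_tid to 0 after removal keeps a stale identifier from being used again. The sketch below illustrates that idea with a hypothetical TimerRegistry (not libassa's timer queue); it assumes that TimerId is an unsigned integer and that 0 never identifies a live timer, as the m_tid = 0 idiom suggests.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

typedef unsigned long TimerId;   // assumed representation; 0 means "no timer"

// Hypothetical registry illustrating removal of timers by identifier.
class TimerRegistry {
public:
    TimerRegistry () : m_next (1) {}

    // Ids start at 1, so 0 can safely serve as the "no timer" value.
    TimerId add (const std::string& name_)
    {
        assert (m_next != 0);          // guard against id wrap-around
        TimerId id = m_next++;
        m_timers [id] = name_;
        return id;
    }

    // Returns false if id_ is unknown, e.g. already removed or 0.
    bool remove (TimerId id_) { return m_timers.erase (id_) == 1; }

    std::size_t size () const { return m_timers.size (); }

private:
    TimerId m_next;
    std::map<TimerId, std::string> m_timers;
};
```

After a successful remove(), setting the stored id to 0 makes any accidental second removal a harmless no-op that returns false.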