Chapter 3. Implementing Communication Protocols

Table of Contents
3.1. Binary Data Stream Protocol
3.2. Record-Oriented Stream Protocol
3.3. Summary

A client/server application framework is meaningless without some sort of communication protocol between the client and the server. Many such protocols exist, both public and proprietary. This chapter considers one way of implementing them. We are going to implement 1) a binary data stream protocol between libassa-based applications that log their messages to the assa-logd server and the server itself; and 2) a record-oriented data stream protocol between the log monitor, assa-logmon, and the log server, assa-logd.

3.1. Binary Data Stream Protocol

3.1.1. Communication Protocol

We begin by defining a communication protocol between the Logger class on the client side and the Conn class on the assa-logd server side. Logger class methods are called when an application program invokes the DL((MASK,"format", arglist)) macro to log a message. When the application is configured to send all its logging traffic to the assa-logd server, each individual message is sent to the server as a binary data stream packet.

Before we can send a message, we have to establish a connection with assa-logd. Once the connection is established, we send a SIGN_ON message to identify our application and tell the server how to log our messages. Then, each time the application writes a log message, the message is formatted, XDR-encoded along with its attributes, and sent to the server. The server XDR-decodes the message and writes it to the log file. When an application exits, it sends a SIGN_OFF message to the server and closes its side of the connection. The server receives the SIGN_OFF message, closes the log file, and destroys the Conn object.

3.1.1.1. Message Header

Each message we send to assa-logd starts with a 12-byte header:

Table 3-1. Header

Offset (in bytes)  Data          Type  Length   Description
0                  1234567890    int   4 bytes  Message Start Signature
4                  Message type  int   4 bytes  0 : SIGN_ON; 1 : SIGN_OFF; 2 : LOG_MSG
8                  Message size  int   4 bytes  Length of the message that follows (in bytes)

We need the Message Start Signature for a quick check that our communication link stays in sync.
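
To make the layout in Table 3-1 concrete, here is a minimal stand-alone sketch that packs and unpacks the 12-byte header. It assumes that each field travels as a 4-byte big-endian integer, which is how XDR encodes an int. The Header struct and the helper names (put_u32, get_u32, encode_header, decode_header) are illustrative only and are not part of libassa.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Values taken from Table 3-1.
constexpr std::uint32_t kSignature  = 1234567890u;
constexpr std::size_t   kHeaderSize = 12;

// Hypothetical plain struct; the library does not define it.
struct Header {
    std::uint32_t signature;
    std::uint32_t type;   // 0: SIGN_ON, 1: SIGN_OFF, 2: LOG_MSG
    std::uint32_t size;   // length of the body that follows, in bytes
};

// Store a 32-bit value in big-endian order (assumed wire format).
inline void put_u32(std::uint8_t* p, std::uint32_t v) {
    p[0] = static_cast<std::uint8_t>(v >> 24);
    p[1] = static_cast<std::uint8_t>(v >> 16);
    p[2] = static_cast<std::uint8_t>(v >> 8);
    p[3] = static_cast<std::uint8_t>(v);
}

// Read a 32-bit big-endian value.
inline std::uint32_t get_u32(const std::uint8_t* p) {
    return (std::uint32_t{p[0]} << 24) | (std::uint32_t{p[1]} << 16) |
           (std::uint32_t{p[2]} << 8)  |  std::uint32_t{p[3]};
}

inline std::array<std::uint8_t, kHeaderSize>
encode_header(std::uint32_t type, std::uint32_t size) {
    std::array<std::uint8_t, kHeaderSize> buf{};
    put_u32(buf.data(),     kSignature);
    put_u32(buf.data() + 4, type);
    put_u32(buf.data() + 8, size);
    return buf;
}

// Returns false when the signature does not match, i.e. the
// stream is out of sync and the other fields cannot be trusted.
inline bool decode_header(const std::array<std::uint8_t, kHeaderSize>& buf,
                          Header& out) {
    out.signature = get_u32(buf.data());
    out.type      = get_u32(buf.data() + 4);
    out.size      = get_u32(buf.data() + 8);
    return out.signature == kSignature;
}
```

A receiver that sees decode_header() fail has no reliable way to find the next message boundary, which is why dropping the connection is a reasonable reaction.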

3.1.1.2. SIGN_ON Message

We choose the following format for the SIGN_ON message, which is 4+N+M bytes long:

Table 3-2. SIGN_ON Message

Offset (in bytes)  Data         Type         Length   Description
0                  MaxLogSize   int          4 bytes  Maximum size of log file
4                  AppName      std::string  N bytes  Application name (XDR-encoded)
4+N                LogFileName  std::string  M bytes  LogFile name (XDR-encoded)

Note

An STL string occupies more space when it is XDR-encoded. To find out the length of your XDR-encoded string, call the static method Socket::xdr_length(const string&).
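
As a rough illustration of where the extra space comes from, the sketch below computes the encoded size of a string under the XDR standard (RFC 4506): a 4-byte length word followed by the characters, zero-padded to a multiple of four bytes. Whether Socket::xdr_length() returns exactly this value is an assumption; both function names here are hypothetical.

```cpp
#include <cstddef>
#include <string>

// Encoded size of a string per RFC 4506: 4-byte length word plus
// the characters rounded up to the next multiple of four.
inline std::size_t xdr_string_length(const std::string& s) {
    const std::size_t padded = (s.size() + 3) / 4 * 4;
    return 4 + padded;
}

// Body size of a SIGN_ON message as laid out in Table 3-2: 4 + N + M,
// where N and M are the encoded sizes of the two strings.
inline std::size_t sign_on_body_size(const std::string& app_name,
                                     const std::string& log_file_name) {
    return 4 + xdr_string_length(app_name) + xdr_string_length(log_file_name);
}
```

Under these assumptions, an application named "log-client" that logs to "LogClient.log" would produce a SIGN_ON body of 4 + 16 + 20 = 40 bytes.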

3.1.1.3. SIGN_OFF Message

The SIGN_OFF message has no body. The Message size field in the header is set to 0.

3.1.1.4. LOG_MSG Message

The LOG_MSG message carries the timestamped and formatted message, ready to be written to the log file. It is transmitted as an XDR-encoded STL string.

Table 3-3. LOG_MSG Message

Offset (in bytes)  Data     Type         Length   Description
0                  Message  std::string  N bytes  Log message (XDR-encoded)
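
The body of a LOG_MSG is therefore a single XDR string. The following sketch shows what that looks like byte by byte, assuming the standard XDR string layout (big-endian length word, characters, zero padding to a 4-byte boundary). The functions append_xdr_string and read_xdr_string are hypothetical stand-ins for what the library's stream operators do internally.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Append 's' in XDR form: 4-byte big-endian length, the characters,
// then zero bytes up to the next 4-byte boundary.
inline void append_xdr_string(std::vector<std::uint8_t>& out,
                              const std::string& s) {
    const std::uint32_t n = static_cast<std::uint32_t>(s.size());
    out.push_back(static_cast<std::uint8_t>(n >> 24));
    out.push_back(static_cast<std::uint8_t>(n >> 16));
    out.push_back(static_cast<std::uint8_t>(n >> 8));
    out.push_back(static_cast<std::uint8_t>(n));
    for (char c : s) {
        out.push_back(static_cast<std::uint8_t>(c));
    }
    const std::size_t pad = (4 - s.size() % 4) % 4;
    out.insert(out.end(), pad, 0);
}

// Decode a string starting at 'pos' and advance 'pos' past the padding.
// Returns false if the buffer does not hold a complete string.
inline bool read_xdr_string(const std::vector<std::uint8_t>& in,
                            std::size_t& pos, std::string& s) {
    if (pos > in.size() || in.size() - pos < 4) return false;
    const std::uint64_t n = (std::uint64_t{in[pos]} << 24) |
                            (std::uint64_t{in[pos + 1]} << 16) |
                            (std::uint64_t{in[pos + 2]} << 8) |
                             std::uint64_t{in[pos + 3]};
    const std::uint64_t padded = (n + 3) / 4 * 4;
    if (in.size() - pos - 4 < padded) return false;
    const std::size_t start = pos + 4;
    s.assign(in.begin() + static_cast<std::ptrdiff_t>(start),
             in.begin() + static_cast<std::ptrdiff_t>(start + n));
    pos = start + static_cast<std::size_t>(padded);
    return true;
}
```

With this layout, the five-character message "hello" occupies 12 bytes on the wire: 4 for the length, 5 for the text, and 3 for padding.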

3.1.2. Client Side Modifications

The client side is already implemented in the library. For testing purposes, we make log-client log every line of an ASCII input file. We are also going to examine snippets of code from the library to illustrate how the protocol is actually implemented.

3.1.2.1. log-client Modifications

We add a command-line option { --input-file NAME} to supply the data file.


// File: logserver/client/LogClient.h

0033 class LogClient :
0034     public ASSA::GenServer,
0035     public ASSA::Singleton<LogClient>
0036 {
0037 public:
...
0046 private:
...
0048     std::string m_input_file;
0050 };

		

  • Line 48 declares a new data member that holds the input file name.


// File: logserver/client/LogClient.cpp

0030 enum { LC = ASSA::APP };
0031 
0032 LogClient::
0033 LogClient () :
0034       m_exit_value (0)
0035 {
0036     add_opt (0, "input-file", &m_input_file);
0037     
0038 
0039     // ---Configuration---
0040     rm_opt ('f', "config-file"  );
0041     rm_opt ('p', "port"         );
0042 
0043     // ---Process bookkeeping---
0044     rm_opt ('b', "daemon"       );
0045 
0046     /*---
0047      * By default disable all debugging
0048      *---*/
0049     m_debug_mask = LC;
0050     m_log_file = "LogClient.log";
0051 }
		

  • Line 36 adds the new command-line argument.

  • Lines 40-44 remove unnecessary arguments from the list.

  • Line 49 sets the debug mask to LC, which is defined on line 30.

  • Line 50 sets log file to LogClient.log. Our log server writes messages from log-client into this file.


// File: logserver/client/LogClient.cpp

0062 void
0063 LogClient::
0064 processServer ()
0065 {
0066     trace("LogClient::processServer");
0067 
0068     const int size = 256;
0069     char line [size];
0070     std::ifstream in_file;
0071 
0072     in_file.open (m_input_file.c_str (), std::ios::in);
0073     if (!in_file) {
0074         std::cerr << "Failed to open input file \""
0075                   << m_input_file << "\"\n"
0076                   << "Option \"--input-file=NAME\" is required\n";
0077     }
0078     else {
0079         while (in_file) {
0080             in_file.getline (line, size, '\n');
0081             DL((LC,"%s\n", line));
0082         }
0083         in_file.close ();
0084     }
0085 
0086     // Shut the service down
0087     m_reactor.stopReactor ();
0088 }
		

There is nothing special going on here. We open the input file, read it line by line, and log each line as a debug message as fast as we can.
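
One detail of the loop is worth a closer look. Because it tests the stream before reading, the body runs one more time after the final read has failed, and istream::getline() with a fixed 256-byte buffer sets the fail bit on any longer line. A common alternative, sketched here outside of libassa with a hypothetical read_lines() helper, is to test the result of std::getline() itself and read into a std::string:

```cpp
#include <istream>
#include <sstream>   // std::istringstream, handy for trying the loop without a file
#include <string>
#include <vector>

// Collect every line of the input. The loop condition is the read
// itself, so the body runs only after a successful read and no extra
// line is produced when end-of-file is reached.
inline std::vector<std::string> read_lines(std::istream& in) {
    std::vector<std::string> lines;
    std::string line;                 // grows as needed, no fixed limit
    while (std::getline(in, line)) {
        lines.push_back(line);
    }
    return lines;
}
```

In log-client the body of the loop would call DL((LC,"%s\n", line.c_str ())) instead of storing the line.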

At some point we are going to overflow the outgoing socket buffer. When that happens, we block on line 81, waiting for the log server to receive and log messages, which makes more room available for us to write.

Note

Keep in mind that if we log excessively to the logging server, our own process might slow down because of the blocking I/O we perform.

3.1.2.2. RemoteLogger Sending Code

The code that takes a log message from the application and sends it to the log server is encapsulated in the RemoteLogger class of the library.

3.1.2.2.1. Signing On

At some point during initialization, GenServer calls the log_open() function of class Logger to establish a connection with the log server:


// Name: assa/Logger.cpp

int Logger::
log_open (const std::string& logsvraddr_,
          const char*          logfname_,
          u_long                 groups_,
          u_long                maxsize_,
          ASSA::Reactor*        reactor_)
{
    {
        ASSA::TimeVal tv (10.0);
        ASSA::INETAddress addr (logsvraddr_.c_str ());
        if (addr.bad ()) {
            errno = EPERM;
            return -1;
        }
        ASSA::Connector <RemoteLogger, ASSA::IPv4Socket> log_connector;
        AutoPtr<RemoteLogger> lsp (new RemoteLogger);
        log_connector.open (tv);
        if (log_connector.connect (lsp.get (), addr) < 0) {
            delete m_impl;
            m_impl = NULL;
            return -1;
        }
        m_impl = lsp.release ();
    }
    int ret =  m_impl->log_open (m_app_name.c_str (), logfname_,
                                 groups_, maxsize_, reactor_);
    return ret;
}
		  

We establish a connection with the server and then call the log_open() function of the RemoteLogger class:


// Name: assa/RemoteLogger.cpp

0045 int RemoteLogger::
0046 log_open (const char*  appname_,
0047           const char* logfname_,
0048           u_long        groups_,
0049           u_long       maxsize_,
0050           Reactor*     reactor_)
0051 {
0052     if (m_recursive_call) {
0053         return 0;
0054     }
0055 
0056     if (m_state == opened) {
0057         return 0;
0058     }
0059     m_recursive_call = true;
0060     m_logfname = logfname_;
0061     m_groups   = groups_;
0062     m_reactor  = reactor_;
0063 
0064     m_reactor->registerIOHandler (this, get_stream ().getHandler(),
0065                                   ASSA::READ_EVENT);
0066 
0067     /** Put stream in a blocking mode. Otherwise, fast clients can
0068         overwhelm the log server.
0069      */
0070     get_stream ().turnOptionOn (Socket::blocking);
0071     
0072     /** Send SIGN_ON message to the log server.
0073      */
0074     size_t len = sizeof (maxsize_) +
0075         Socket::xdr_length (appname_) +
0076         Socket::xdr_length (logfname_);
0077     
0078     /** Header + Body
0079      */
0080     get_stream () << 1234567890 << SIGN_ON << len
0081                   << maxsize_ << appname_ << logfname_ << ASSA::flush;
0082     m_state = opened;
0083     m_recursive_call = false;
0084     return 0;
0085 }
		  

  • Lines 52-54 guard us against recursive calls. These would otherwise occur because we use parts of the library to establish the connection and send data to the server, and these parts can themselves log trace messages.

  • Line 70 puts the socket into blocking mode. Otherwise, a fast client could overflow the outgoing buffer.

  • Lines 74-76 calculate the length of the message body that follows. This value goes into the protocol header.

  • Lines 80-81 send the header of the SIGN_ON message and then the message itself. The stream is flushed so that the server can get it right away.
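
The recursion guard relies on a flag that has to be cleared by hand on every exit path. As a hedged alternative, the flag can be wrapped in a small scope guard so that the compiler does the bookkeeping. The sketch below is not taken from the library: ReentryGuard and DemoLogger are hypothetical, and DemoLogger merely imitates a logging method whose inner code logs again.

```cpp
// Hypothetical helper: raises a flag for the lifetime of the guard and
// lowers it on every exit path, including early returns.
class ReentryGuard {
public:
    explicit ReentryGuard(bool& flag) : m_flag(flag), m_owner(!flag) {
        m_flag = true;
    }
    ~ReentryGuard() {
        if (m_owner) m_flag = false;   // only the outermost guard resets
    }
    ReentryGuard(const ReentryGuard&) = delete;
    ReentryGuard& operator=(const ReentryGuard&) = delete;

    // True when this guard is the one that raised the flag.
    bool entered() const { return m_owner; }

private:
    bool& m_flag;
    bool  m_owner;
};

// Stand-in for a logger; 'depth_' makes the demo re-enter itself.
struct DemoLogger {
    bool m_recursive_call = false;
    int  m_sent = 0;

    int log_msg(int depth_) {
        ReentryGuard guard(m_recursive_call);
        if (!guard.entered()) {
            return 0;                  // nested call: silently ignored
        }
        if (depth_ > 0) {
            log_msg(depth_ - 1);       // simulates library code logging a trace
        }
        ++m_sent;                      // "send" exactly one message
        return 0;
    }
};
```

However deep the nesting, only the outermost call sends a message, and the flag is clear again once that call returns.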

3.1.2.2.2. Sending A Message


// Name: assa/RemoteLogger.cpp

0024 int RemoteLogger::
0025 log_msg (Group               groups_,
0026          size_t        indent_level_,
0027          const string&    func_name_,
0028          const char*            fmt_,
0029          va_list           msg_list_)
0030 {
0031     if (m_recursive_call) {
0032         return 0;
0033     }
...
0055     /** Header + body (preamble;LOG_MSG;length;msg)
0056      */
0057     if (get_stream ()) {
0058         m_recursive_call = true;
0059         Assert_exit (os.str ().length () != 0);
0060         get_stream () << 1234567890 << LOG_MSG << Socket::xdr_length (os.str ())
0061                       << os.str () << ASSA::flush;
0062         m_recursive_call = false;
0063     }
0064     else {
0065         m_state = closed;
0066     }
0067     return 0;
0068 }
		  

  • Line 57 tests whether the stream is open. If it is, we send the message, which has already been formatted at this point.

  • Lines 60-61 send the header and then the message. Because ASCII strings are XDR-encoded, their transmission format adds housekeeping information to the string. We find out the encoded length with the help of the static member function xdr_length() of class Socket.

3.1.2.2.3. Signing Off

We close the connection with the server by sending a SIGN_OFF message. The message size is 0.


// Name: assa/RemoteLogger.cpp

int RemoteLogger::
log_close (void)
{
    /** Send SIGN_OFF message to the log server and stop data processing
     *  We are managed by Logger class.
     */
    if (m_state == opened) {
        m_recursive_call = true;
        get_stream () << 1234567890 << SIGN_OFF << 0 << ASSA::flush;
        m_reactor->removeHandler (this, READ_EVENT);
        m_recursive_call = false;
    }
    return 0;
}
		  

3.1.3. Server Side Modifications

Our changes to the server are isolated to class Conn because it is responsible for handling communication with clients.

3.1.3.1. Conn Class Declaration


// name: logserver/server/Conn.h

0031 class Conn : public ServiceHandler<IPv4Socket>
0032 {
0033 public:
0034     Conn (IPv4Socket* stream_);
0035     ~Conn ();
0036 
0037     virtual int open ();
0038 
0039     virtual int handle_read (int fd_);
0040     virtual int handle_close (int /* fd */);
0041 
0042 private:
0043     std::string wstate () const;
0044     void shift_logfile ();
0045 
0046 private:
0047     enum msg_t {
0048         SIGN_ON,                /**< Login message  */
0049         SIGN_OFF,               /**< Logout message */
0050         LOG_MSG                 /**< Payload data   */
0051     };
0052     enum wstate_t {
0053         wait_for_header,        /**< Waiting for the message header */
0054         wait_for_signon,        /**< Waiting for SIGN_ON message    */
0055         wait_for_signoff,       /**< Waiting for SIGN_OFF message   */
0056         wait_for_logmsg         /**< Waiting for LOG_MSG message    */
0057     };
0058     enum state_t {
0059         opened,
0060         closed
0061     };
0062 
0063     wstate_t m_wstate;
0064     state_t m_state;
0065 
0066     /// Incoming message size
0067     int m_msg_size;
0068 
0069     /// Incoming message type
0070     int m_msg_type;
0071 
0072     /// Maximum logfile size can reach
0073     int m_maxsize;
0074 
0075     /// Logging application name
0076     std::string m_app_name;
0077 
0078     /// Logfile name
0079     std::string m_logfname;
0080 
0081     /// Logfile stream
0082     std::ofstream m_sink;
0083 
0084     /// Bytes written to the sink so far.
0085     u_long m_bytecount;
0086 };
		

  • Line 44 declares the shift_logfile() member function, which is responsible for renaming the old log file and creating a new one.

  • Lines 47-51 define the message types. This is a clear candidate for generalization: we would probably want to isolate this information in a class LogHeader and share it between the server and the core library (client side).

  • Lines 52-61 declare a rudimentary notion of state so that we know what to expect next.
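
The LogHeader generalization mentioned above does not exist in the code shown here. Purely as a sketch of what such a shared definition might look like, the following header-only fragment keeps the signature, the message types, and a validity check in one place. The namespace logproto and every name in it are assumptions made for illustration.

```cpp
#include <cstdint>
#include <string>

namespace logproto {   // hypothetical namespace shared by client and server

constexpr std::int32_t kSignature = 1234567890;

// Numeric values follow Table 3-1.
enum class MsgType : std::int32_t { SIGN_ON = 0, SIGN_OFF = 1, LOG_MSG = 2 };

struct LogHeader {
    std::int32_t signature = kSignature;
    MsgType      type      = MsgType::LOG_MSG;
    std::int32_t size      = 0;    // body length in bytes

    bool in_sync() const { return signature == kSignature; }
};

// Convert the raw type field read from the wire; rejects unknown values
// instead of silently leaving the receiver in its previous state.
inline bool to_msg_type(std::int32_t raw, MsgType& out) {
    switch (raw) {
    case 0:  out = MsgType::SIGN_ON;  return true;
    case 1:  out = MsgType::SIGN_OFF; return true;
    case 2:  out = MsgType::LOG_MSG;  return true;
    default: return false;
    }
}

// Human-readable name for trace output.
inline std::string name(MsgType t) {
    switch (t) {
    case MsgType::SIGN_ON:  return "SIGN_ON";
    case MsgType::SIGN_OFF: return "SIGN_OFF";
    case MsgType::LOG_MSG:  return "LOG_MSG";
    }
    return "UNKNOWN";
}

} // namespace logproto
```

With a shared definition like this, RemoteLogger and Conn could no longer drift apart on the numeric values of the message types.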

3.1.3.2. Conn Class Constructor

		  
// name: logserver/server/Conn.h

0092 inline
0093 Conn::
0094 Conn (IPv4Socket* stream_) : 
0095     ServiceHandler<IPv4Socket> (stream_),
0096     m_maxsize (1048576),
0097     m_bytecount (0),
0098     m_wstate (wait_for_header),
0099     m_state (closed)
0100 {   
0101     trace_with_mask ("Conn::Conn",LSVRTRACE);
0102     /* no-op */
0103 }
...
0112 inline std::string
0113 Conn:: 
0114 wstate () const
0115 {   
0116     return (m_wstate == wait_for_header  ? "wait_for_header"  :
0117             m_wstate == wait_for_signon  ? "wait_for_signon"  :
0118             m_wstate == wait_for_signoff ? "wait_for_signoff" : "wait_for_logmsg");
0119 }
		

  • Lines 94-99 initialize the data members in the constructor. Initially we expect a header, and our state is closed.

  • Lines 114-119 return the current wait state as a human-readable string.

3.1.3.3. Conn::open() Modifications


// name: logserver/server/Conn.cpp

0020 int
0021 Conn::
0022 open ()
0023 {
0024     trace_with_mask("Conn::open",LSVRTRACE);
0025 
0026     REACTOR->registerIOHandler (this, get_stream ().getHandler (),
0027                                 ASSA::READ_EVENT);
0028     DL((LSVR,"+--------------------------------+\n"));
0029     DL((LSVR,"| Accepted new client connection |\n"));
0030     DL((LSVR,"+--------------------------------+\n"));
0031     return 0;
0032 }
		

  • Lines 28-30 add a "session start" marker that helps us identify in the log file when a new connection was accepted.

3.1.3.4. Conn::handle_read() Modifications


// name: logserver/server/Conn.cpp

0036 int
0037 Conn::
0038 handle_read (int fd_)
0039 {
0040     trace_with_mask ("Conn::handle_read", LSVRTRACE);
0041     if (get_stream ().getHandler () != fd_) {
0042         return (-1);
0043     }
0044 
0045     if (m_wstate == wait_for_header) {
0046         DL((LSVR,"=> Detected Header\n"));
0047         int preamble = 0;
0048         m_msg_type = m_msg_size = 0;
0049 
0050         get_stream () >> preamble >> m_msg_type >> m_msg_size;
0051 
0052         if (preamble != 1234567890) {
0053             DL((LSVRERROR,"Message stream is out of sync - Abort!\n"));
0054             return -1;
0055         }
0056 
0057         DL((LSVR,"rcvd: Preamble = %d, Type = %d\n", preamble, m_msg_type));
0058         DL((LSVR,"rcvd: Size = %d\n", m_msg_size));
0059 
0060         switch (m_msg_type)
0061         {
0062         case SIGN_ON:  m_wstate = wait_for_signon;  break;
0063         case SIGN_OFF: m_wstate = wait_for_signoff; break;
0064         case LOG_MSG:  m_wstate = wait_for_logmsg;  break;
0065         }
0066     }
0067 
0068     if (m_wstate == wait_for_signon) {
0069         m_maxsize = 0;
0070         DL((LSVR,"=> Incoming SIGN_ON message\n"));
0071         Assert_exit (m_state == closed);
0072         get_stream () >> m_maxsize >> m_app_name >> m_logfname;
0073         DL((LSVR,"rcvd: MaxLogSize = %d, AppName = \"%s\"\n",
0074             m_maxsize, m_app_name.c_str ()));
0075         DL((LSVR,"rcvd: LogFileName = \"%s\"\n", m_logfname.c_str ()));
0076 
0077         m_sink.open (m_logfname.c_str (), std::ios::out | std::ios::app);
0078         if (!m_sink) {
0079             DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n", m_logfname.c_str ()));
0080             return -1;
0081         }
0082         m_state = opened;
0083         m_wstate = wait_for_header;
0084     }
0085     else if (m_wstate == wait_for_signoff) {
0086         DL((LSVR,"=> Incoming SIGN_OFF message\n"));
0087         Assert_exit (m_state == opened);
0088         m_sink << std::flush;
0089         m_sink.close ();
0090         m_state = closed;
0091         return -1;
0092     }
0093     else if (m_wstate == wait_for_logmsg) {
0094         DL((LSVR,"=> Incoming LOG_MSG message\n"));
0095         Assert_exit (m_state == opened);
0096         std::string msg;
0097         if (get_stream () >> msg) {
0098             if (msg.length () != 0) {
0099                 DL((LSVR,"rcvd message:\n%s\n%s%s\n", sep, msg.c_str (), sep));
0100                 m_bytecount += msg.length ();
0101                 if (m_bytecount > m_maxsize) {
0102                     shift_logfile ();
0103                 }
0104             }
0105             else {
0106                 DL((LSVR,"rcvd EMPTY message!\n"));
0107                 Assert_exit (false);
0108             }
0109             m_sink << msg << std::flush;
0110         }
0111         else {
0112             DL((LSVRERROR,"Peer dropped connection!\n"));
0113             m_sink << std::flush;
0114             m_sink.close ();
0115             DL((LSVR,"m_bytecount = %lu\n", m_bytecount));
0116             DL((LSVR,"m_maxsize   = %d\n", m_maxsize));
0117         }
0118         m_wstate = wait_for_header;
0119     }
0120 
0121     return get_stream ().eof () ? -1 : get_stream ().in_avail ();
0122 }
		

  • Line 45 tests whether the state is wait_for_header. In this state we handle the header.

  • Line 50 reads the header from the stream and XDR-decodes it. Line 52 checks that the preamble matches the expected value.

  • Lines 60-65 switch the state engine into the next state, depending on the message type we got from the header.

  • Line 72 reads in and XDR-decodes the SIGN_ON message. At this point, we open the log file (line 77) and change our state back to wait_for_header (line 83).

  • Line 85 handles the SIGN_OFF message. Remember that this message has no body. We close the log file, set our state to closed, and terminate the data processing for this client by returning -1 to the Reactor.

  • Line 93 starts processing actual log messages. We read and XDR-decode the std::string message on line 97.

  • Lines 100-103 count the number of characters received and, once the maximum allowed size is exceeded, switch to a fresh log file.

  • Line 109 writes the message to the log file.

  • Lines 111-117 handle the case when the connection between the server and the client has been broken.

  • Line 118 switches the state engine back to waiting for the next header.
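
The header-then-body state engine described above can also be studied in isolation. The sketch below is a simplified, hypothetical Framer that works on an in-memory byte buffer instead of a socket stream. It assumes 4-byte big-endian header fields, collapses the three body states into a single wait_for_body state, and leaves the body undecoded.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

constexpr std::size_t   kHeaderSize = 12;
constexpr std::uint32_t kSignature  = 1234567890u;

// Hypothetical stand-alone framer, not part of assa-logd.
class Framer {
public:
    struct Message {
        std::uint32_t type;
        std::vector<std::uint8_t> body;    // still XDR-encoded
    };

    // Append received bytes and extract every complete message.
    // Returns false once the stream is out of sync.
    bool feed(const std::uint8_t* data, std::size_t n) {
        if (m_broken) return false;
        m_buf.insert(m_buf.end(), data, data + n);
        for (;;) {
            if (m_state == State::wait_for_header) {
                if (m_buf.size() < kHeaderSize) break;   // need more bytes
                if (u32(0) != kSignature) {
                    m_broken = true;
                    return false;
                }
                m_type = u32(4);
                m_size = u32(8);
                drop(kHeaderSize);
                m_state = State::wait_for_body;
            }
            if (m_buf.size() < m_size) break;            // body incomplete
            Message msg;
            msg.type = m_type;
            msg.body.assign(m_buf.begin(),
                            m_buf.begin() + static_cast<std::ptrdiff_t>(m_size));
            m_done.push_back(std::move(msg));
            drop(m_size);
            m_state = State::wait_for_header;
        }
        return true;
    }

    const std::vector<Message>& messages() const { return m_done; }

private:
    enum class State { wait_for_header, wait_for_body };

    std::uint32_t u32(std::size_t off) const {
        return (std::uint32_t{m_buf[off]} << 24) |
               (std::uint32_t{m_buf[off + 1]} << 16) |
               (std::uint32_t{m_buf[off + 2]} << 8) |
                std::uint32_t{m_buf[off + 3]};
    }
    void drop(std::size_t n) {
        m_buf.erase(m_buf.begin(),
                    m_buf.begin() + static_cast<std::ptrdiff_t>(n));
    }

    State m_state = State::wait_for_header;
    bool m_broken = false;
    std::uint32_t m_type = 0;
    std::uint32_t m_size = 0;
    std::vector<std::uint8_t> m_buf;
    std::vector<Message> m_done;
};
```

Unlike Conn::handle_read(), this variant never blocks: when a header or body is incomplete it simply waits for the next call to feed(). That is the main design difference to keep in mind when comparing the two.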

3.1.3.5. Conn::handle_close() Modifications


// name: logserver/server/Conn.cpp

0124 int
0125 Conn::
0126 handle_close (int /* fd */)
0127 {   
0128     trace_with_mask("Conn::handle_close", LSVRTRACE);
0129     
0130     DL((LSVR,"+---------------------+\n"));
0131     DL((LSVR,"| Client disconnected |\n"));
0132     DL((LSVR,"+---------------------+\n"));
0133     delete (this);
0134     return 0;
0135 }
		

  • Lines 130-132 mark the end of the session with the client (for debugging purposes).

3.1.3.6. Adding Utility Functions


// name: logserver/server/Conn.cpp

0137 void
0138 Conn::
0139 shift_logfile ()
0140 {
0141     trace_with_mask("Conn::shift_logfile", LSVRTRACE);
0142 
0143     m_sink << std::flush;
0144     m_sink.close ();
0145     m_bytecount = 0;
0146     std::string oldfile = m_logfname + ".0";
0147     ::unlink (oldfile.c_str ());
0148     ::rename (m_logfname.c_str (), oldfile.c_str ());
0149 
0150     m_sink.open (m_logfname.c_str (), std::ios::out | std::ios::app);
0151     if (!m_sink) {
0152         DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n", m_logfname.c_str ()));
0153     }
0154 }
		

  • Lines 143-148 close the current log file and rename it by appending the .0 extension.

  • Line 150 opens a fresh log file.
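
The same rotation scheme can be tried outside the server with nothing but the standard library. The sketch below uses std::remove() and std::rename() from <cstdio> in place of ::unlink() and ::rename(); rotate_log() is a hypothetical free function, not a member of Conn.

```cpp
#include <cstdio>
#include <fstream>
#include <string>

// Move 'name' to 'name.0' (replacing any previous backup) and reopen
// 'sink' on 'name'. Returns false if the rename fails or the file
// cannot be reopened.
inline bool rotate_log(std::ofstream& sink, const std::string& name) {
    sink.flush();
    sink.close();

    const std::string old_name = name + ".0";
    std::remove(old_name.c_str());    // fine if no backup exists yet
    const bool renamed =
        std::rename(name.c_str(), old_name.c_str()) == 0;

    sink.clear();                     // reset any error bits before reuse
    sink.open(name.c_str(), std::ios::out | std::ios::app);
    return renamed && sink.is_open();
}
```

Only one generation of backup is kept, so the disk space used by a single client is bounded by roughly twice the maximum log size.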

3.1.4. Testing Modifications

We are ready to test the modifications. Open an xterm window and go to the directory where the server code is. Start the assa-logd server:

% cd $srcdir/examples/logserver/server
% ./assa-logd --daemon --mask=0
	  

Go to the directory where the client code is. For the tests, you need an ASCII data file. You can generate one with the make-data utility, which produces an ASCII file of the desired size (in megabytes) by repeatedly copying one of the source files found in the current directory. See make-data --help for details. Once the ASCII file is generated, we log it to the log server with log-client and then compare source and destination with the log-diff.sh shell script.

% cd $srcdir/examples/logserver/client
% make-data --output-file 1Mb -n 1
% ./log-client --with-log-server --input-file=1Mb --mask=0x7fffffff
% ./log-diff.sh 1Mb
Files are the same