3.2. Record-Oriented Stream Protocol

In this section we describe and implement a record-oriented stream protocol between our monitor application, assa-logmon, and the server, assa-logd. The monitor application requests a list of client applications connected to the server, and the server replies with the list. The monitor then selects a client, and the server starts sending all logging messages of the selected client to the monitor program. At any time, assa-logmon can cancel the last command and select another client.

3.2.1. Communication Protocol

Each record (request or response) is an ASCII string terminated by an ASCII CR (carriage return) character followed by an ASCII LF (line feed) character.
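Because the transport is a byte stream, a single read may deliver a partial record or several records at once. The following is a minimal sketch of how a receiver could cut a buffer into records. The function name extract_records is hypothetical and is not part of ASSA; the sketch assumes the received bytes are accumulated in a std::string.

```cpp
#include <string>
#include <vector>

// Hypothetical helper (not part of ASSA): remove every complete
// CR LF-terminated record from `buffer` and return the payloads.
// A trailing partial record stays in `buffer` for the next read.
std::vector<std::string> extract_records (std::string& buffer)
{
    std::vector<std::string> records;
    std::string::size_type pos;

    while ((pos = buffer.find ("\r\n")) != std::string::npos) {
        records.push_back (buffer.substr (0, pos));  // payload without CR LF
        buffer.erase (0, pos + 2);                   // drop payload and terminator
    }
    return records;
}
```

Calling extract_records() on a buffer holding "LIST\r\nGET cli" returns the single record "LIST" and leaves "GET cli" in the buffer until the rest of that record arrives.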

The client can send the following commands:

  1. LIST - request for the list of connected logging clients.

  2. GET <client_name> - request logging messages of a particular client.

  3. STOP - terminate last GET command.
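As a rough illustration of the wire format, the three commands could be assembled on the monitor side as shown below. The helper names (make_list_cmd, make_get_cmd, make_stop_cmd) are hypothetical and do not come from assa-logmon; the only thing taken from the protocol description is the command text and the CR LF terminator.

```cpp
#include <string>

// End-of-record marker: ASCII CR followed by ASCII LF.
static const char EOR[] = "\r\n";

// Hypothetical helpers, one per command defined by the protocol.
std::string make_list_cmd ()
{
    return std::string ("LIST") + EOR;
}

std::string make_get_cmd (const std::string& client_name)
{
    return "GET " + client_name + EOR;
}

std::string make_stop_cmd ()
{
    return std::string ("STOP") + EOR;
}
```

For example, make_get_cmd ("lt-log-client") yields the 19-byte record "GET lt-log-client" followed by CR LF.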

The server responds with the following messages:

  1. In response to the LIST command, the server sends a list of connected logging clients separated by colons. If no clients are connected at present, an empty list terminated by [cr][lf] is returned instead.

    CLIENT1:CLIENT2:CLIENT3:...:CLIENTn[cr][lf]
    		  
  2. When a logging client disconnects, assa-logmon is notified with a disconnect message:

    DISCONNECTED[cr][lf]
    		  
  3. Each logging message is forwarded to assa-logmon unchanged. Logging messages are normally already terminated by a line feed. For example,

    [LogClient::processServer] //  This program is free software; you can[lf]
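On the monitor side, the reply to LIST has to be split back into individual client names. The sketch below shows one way to do that, assuming the CR LF terminator has already been stripped from the record. The function name parse_client_list is hypothetical.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: split a LIST reply (without its CR LF
// terminator) into client names. An empty record means that no
// logging clients are connected.
std::vector<std::string> parse_client_list (const std::string& record)
{
    std::vector<std::string> names;
    if (record.empty ()) {
        return names;
    }

    std::string::size_type start = 0;
    for (;;) {
        std::string::size_type pos = record.find (':', start);
        if (pos == std::string::npos) {
            names.push_back (record.substr (start));   // last name
            break;
        }
        names.push_back (record.substr (start, pos - start));
        start = pos + 1;                               // skip the colon
    }
    return names;
}
```

One consequence of this format is that a client name cannot itself contain a colon.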
    		  

Here is the message flow between assa-logmon and assa-logd:

assa-logmon               assa-logd
-----------               ---------

LIST         ---------->
             <---------   CLIENT1:CLIENT2:CLIENT3:...:CLIENTn[cr][lf]
	  

GET CLIENTi  ---------->
             <---------   CLIENTi's log message:  MSG[cr][lf]
	  

STOP         ---------->  Stop sending CLIENTi's log messages
	  

                          Event: logging client disconnected ...
             <---------   DISCONNECTED[cr][lf]
	  

3.2.2. Server-Side Modifications

For the server to answer the LIST request, it must be able to query all connected logging clients. When a new logging client connects to the server, its Conn::open() method is called. Once the client has signed on, Conn::handle_read() registers the new connection with the connection repository. MonitorConn uses this repository to list all available connections.

Figure 3-1. Repository to hold Conn references


// File: logserver/server/LogServer.h

0034 #include <assa/Repository.h>
...
0040 class LogServer :
0041     public ASSA::GenServer,
0042     public ASSA::Singleton<LogServer>
0043 {
0044 public:
0045     typedef ASSA::Repository<Conn> repo_type;
0046     typedef ASSA::Acceptor<Conn, ASSA::IPv4Socket> conn_accpt_type;
0047     typedef ASSA::Acceptor<MonitorConn, ASSA::IPv4Socket> mon_accpt_type;
0048 
0049 public:
...
0057     repo_type* get_repository () { return &m_repo; }
0058 
0059 private:
0060     int                    m_exit_value;
0061     repo_type                    m_repo;
0062     conn_accpt_type*         m_acceptor;
0063     mon_accpt_type*  m_monitor_acceptor;
0064 };
...
0070 #define REPO       LOGSERVER->get_repository()
	  

The Conn class keeps a list of all MonitorConn objects connected to it as observers. Observers subscribe to the logging messages.

// File: logserver/server/Conn.h

0022 #include <assa/Repository.h>
...
0034 class Conn : public ServiceHandler<IPv4Socket>
0035 {
0036 public:
...
0046     void subscribe (MonitorConn* mc_) { m_observers.push_back (mc_); }
0047     void unsubscribe (MonitorConn* mc_) { m_observers.erase (mc_); }
...
0053 private:
...
0094     /// Repository of all connected MonitorConn observers
0095     ASSA::Repository<MonitorConn> m_observers;
0096 };
	  

// File: logserver/server/Conn.cpp

0036 int
0037 Conn::
0038 handle_close (int /* fd */)
0039 {
0040     trace_with_mask("Conn::handle_close", LSVRTRACE);
0041 
...
0045     /** Disconnect from observers
0046      */
0047     if (m_observers.size ()) {
0048         ASSA::Repository<MonitorConn>::const_iterator cit;
0049         cit = m_observers.begin ();
0050         while (cit != m_observers.end ()) {
0051             (*cit++)->notify (NULL);
0052         }
0053         m_observers.clear ();
0054     }
0055     REPO->erase (this);
0056     delete (this);
0057     return 0;
0058 }


0060 int
0061 Conn::
0062 handle_read (int fd_)
0063 {
...
0083     if (m_wstate == wait_for_signon) {
...
0097         m_state = opened;
0098         m_wstate = wait_for_header;
0099         REPO->push_back (this); 
0100     }
...
0121     else if (m_wstate == wait_for_logmsg) {
0122         DL((LSVR,"=> Incoming LOG_MSG message\n"));
0123         Assert_exit (m_state == opened);
0124         std::string msg;
0125         if (get_stream () >> msg) {
...
0137             m_sink << msg << std::flush;
0138             if (m_observers.size ()) {
0139                 ASSA::Repository<MonitorConn>::const_iterator cit;
0140                 cit = m_observers.begin ();
0141                 while (cit != m_observers.end ()) {
0142                     (*cit++)->notify (msg.c_str ());
0143                 }
0144             }
0145         }
...
0154     }
0155 
0156     return get_stream ().eof () ? -1 : get_stream ().in_avail ();
0157 }
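Stripped of the socket and repository details, the subscribe/notify relationship between Conn and MonitorConn in the listings above is the classic observer pattern. The sketch below is a standalone illustration, not ASSA code: Source and Observer are hypothetical stand-ins for Conn and MonitorConn, and a std::vector takes the place of ASSA::Repository. As in the listings, a NULL message signals that the source has gone away.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for MonitorConn: it only records what it is told.
struct Observer {
    std::vector<std::string> received;
    bool disconnected;

    Observer () : disconnected (false) {}

    void notify (const char* msg) {
        if (msg == NULL) { disconnected = true; }      // source closed
        else             { received.push_back (msg); } // regular log message
    }
};

// Hypothetical stand-in for Conn.
class Source {
public:
    void subscribe (Observer* o) { m_observers.push_back (o); }

    void unsubscribe (Observer* o) {
        m_observers.erase (
            std::remove (m_observers.begin (), m_observers.end (), o),
            m_observers.end ());
    }

    // Forward one log message to every subscribed observer.
    void publish (const std::string& msg) {
        for (std::size_t i = 0; i < m_observers.size (); ++i) {
            m_observers[i]->notify (msg.c_str ());
        }
    }

    // Tell every observer that the source is gone, then forget them.
    void close () {
        for (std::size_t i = 0; i < m_observers.size (); ++i) {
            m_observers[i]->notify (NULL);
        }
        m_observers.clear ();
    }

private:
    std::vector<Observer*> m_observers;
};
```

An observer that unsubscribes before close() is called receives neither further messages nor the disconnect notification, which matches the intent of the STOP command.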
	  

We also have to modify the MonitorConn class implementation to support commands from the monitoring client. Command processing takes place in the parse_record() member function.


// File: logserver/server/MonitorConn.h

0031 class MonitorConn : public ServiceHandler<IPv4Socket>
0032 {
0033 public:
...
0041     void notify (const char* msg_);
...
0043 private:
...
0046     void process_get_cmd (const std::string& token_);
0047     void process_list_cmd ();
...
0052     Conn* m_current_conn;
0053     char m_eor [2];             // End-of-Record
0054 };
	  


// File: logserver/server/MonitorConn.cpp

0017 #include <assa/CommonUtils.h>
0018 #include "MonitorConn.h"
...
0023 /** Convert user input to upper case
0024  */
0025 template <class T>
0026 class ToUpper {
0027 public:
0028     void operator ()(T& elem_) const { elem_ = ::toupper (elem_); }
0029 };
0030 
0031 MonitorConn::
0032 MonitorConn (IPv4Socket* stream_) :
0033     ServiceHandler<IPv4Socket> (stream_),
0034     m_iolen (0),
0035     m_current_conn (NULL)
0036 {
0037     trace ("MonitorConn::MonitorConn");
0038 
0039     m_eor [0] = 0xD;            // ASCII CR (carriage return)
0040     m_eor [1] = 0xA;            // ASCII LF (line feed)
0041 }
0043 MonitorConn::
0044 ~MonitorConn ()
0045 {
0046     trace ("MonitorConn::~MonitorConn");
0047     if (m_current_conn) {
0048         m_current_conn->unsubscribe (this);
0049     }
0050     /* no-op */
0051 }
	  

notify() is called by the Conn object when it has a new log message for us to process. It is also called, with a NULL message, from Conn::handle_close() when the logging client closes its connection with the server.


// File: logserver/server/MonitorConn.cpp

0076 void
0077 MonitorConn::
0078 notify (const char* msg_)
0079 {
0080     trace("MonitorConn::notify");
0081 
0082     ASSA::IPv4Socket& s = *this;
0083     static const char abort_msg [] = "DISCONNECTED\r\n";
0084 
0085     if (msg_ == NULL) {
0086         s.write (abort_msg, strlen (abort_msg));
0087         m_current_conn = NULL;
0088     }
0089     else {
0090         s.write (msg_, ::strlen (msg_));
0091     }
0092     s << ASSA::flush;
0093 }
	  


// File: logserver/server/MonitorConn.cpp

0032 void
0033 MonitorConn::
0034 parse_record ()
0035 {
0036     trace("MonitorConn::parse_record");
0037 
0038     DL((ASSA::APP,"=> Message from client:\n\n%s\n\n", m_iobuf));
0039     std::vector<std::string> tokens;
0040     ASSA::Utils::split (m_iobuf, tokens);
0041     for_each (tokens [0].begin (), tokens [0].end (), ToUpper<char> ());
0042 
0043     if (tokens [0] == "LIST") {
0044         process_list_cmd ();
0045     }
0046     else if (tokens [0] == "STOP") {
0047         if (m_current_conn) { m_current_conn->unsubscribe (this); m_current_conn = NULL; }
0048     }
0049     else if (tokens [0] == "GET") {
0050         process_get_cmd (tokens [1]);
0051     }
0052     else {
0053         DL((ASSA::APP,"Unknown command \"%s\"\n", m_iobuf));
0054     }
0055     m_iolen = 0;
0056 }
	  

We parse the incoming command by splitting it into tokens and examining the first token. At the end (line 55), we "rewind" the receiving buffer by resetting its length to zero.
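The same tokenize-and-dispatch idea can be written with the standard library alone. The sketch below is an illustration under stated assumptions, not the ASSA implementation: it uses std::istringstream in place of ASSA::Utils::split, and the Command and CommandKind types are hypothetical. It also guards against an empty record and against a GET with no client name, both of which yield CMD_UNKNOWN.

```cpp
#include <algorithm>
#include <cctype>
#include <sstream>
#include <string>
#include <vector>

enum CommandKind { CMD_LIST, CMD_GET, CMD_STOP, CMD_UNKNOWN };

// Hypothetical result type: the recognized command and its argument.
struct Command {
    CommandKind kind;
    std::string arg;     // client name for GET, empty otherwise
};

static char to_upper_char (char c)
{
    return static_cast<char> (std::toupper (static_cast<unsigned char> (c)));
}

Command parse_command (const std::string& record)
{
    // Split on whitespace; this also swallows a trailing CR LF.
    std::istringstream in (record);
    std::vector<std::string> tokens;
    std::string tok;
    while (in >> tok) {
        tokens.push_back (tok);
    }

    Command cmd;
    cmd.kind = CMD_UNKNOWN;
    if (tokens.empty ()) {
        return cmd;
    }

    // Command names are case-insensitive; client names are not touched.
    std::transform (tokens[0].begin (), tokens[0].end (),
                    tokens[0].begin (), to_upper_char);

    if (tokens[0] == "LIST") {
        cmd.kind = CMD_LIST;
    }
    else if (tokens[0] == "STOP") {
        cmd.kind = CMD_STOP;
    }
    else if (tokens[0] == "GET" && tokens.size () > 1) {
        cmd.kind = CMD_GET;
        cmd.arg  = tokens[1];
    }
    return cmd;
}
```

With this sketch, parse_command ("get lt-log-client\r\n") yields CMD_GET with the argument "lt-log-client".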


// File: logserver/server/MonitorConn.cpp

0058 void
0059 MonitorConn::
0060 process_list_cmd ()
0061 {
0062     trace("MonitorConn::process_list_cmd");
0063 
0064     ASSA::IPv4Socket& s = *this;
0065     ASSA::Repository<Conn>::const_iterator cit = REPO->begin ();
0066     if (cit != REPO->end ()) {
0067         s.write ((*cit)->get_app_name().c_str(),
0068                  (*cit)->get_app_name().size ());
0069         cit++;
0070     }
0071     while (cit != REPO->end ()) {
0072         s.write (":", 1);
0073         s.write ((*cit)->get_app_name().c_str(),
0074                  (*cit)->get_app_name().size ());
0075         cit++;
0076     }
0077     s.write (m_eor, 2);
0078     s << ASSA::flush;
0079 }
	  

The LIST command is processed by walking through the repository of Conn objects and sending back a list of their names.
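The formatting rule used here (a separator before every name except the first, then the end-of-record marker) can be shown in isolation. The following sketch builds the reply in a std::string instead of writing to a socket; format_client_list is a hypothetical name.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: build the reply to LIST from a list of
// client names. An empty list produces just the CR LF terminator.
std::string format_client_list (const std::vector<std::string>& names)
{
    std::string record;
    for (std::size_t i = 0; i < names.size (); ++i) {
        if (i > 0) {
            record += ':';        // separator goes between names only
        }
        record += names[i];
    }
    record += "\r\n";             // end of record
    return record;
}
```

Building the whole record first and sending it with a single write is a design choice of this sketch; the listing above writes each piece to the socket separately and flushes at the end.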


// File: logserver/server/MonitorConn.cpp

0081 void
0082 MonitorConn::
0083 process_get_cmd (const std::string& name_)
0084 {
0085     trace("MonitorConn::process_get_cmd");
0086 
0087     ASSA::Repository<Conn>::const_iterator cit = REPO->begin ();
0088     while (cit != REPO->end ()) {
0089         if ((*cit)->get_app_name() == name_) {
0090             m_current_conn = *cit;
0091             m_current_conn->subscribe (this);
0092             break;
0093         }
0094         cit++;
0095     }
0096 }
	  

When we receive a GET command, we first determine whether such a Conn object is present in the repository (lines 87-95) and, if so, subscribe to its logging message traffic (line 91).
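The lookup-then-subscribe step can be sketched without any ASSA types. In the illustration below, Client and Monitor are hypothetical stand-ins for Conn and MonitorConn, and a subscriber counter replaces the real observer list. Unlike the listing above, the sketch drops the previous subscription when a different client is selected and reports whether the name was found; both behaviours are assumptions about what a caller might want, not something the original code does.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for Conn.
struct Client {
    std::string app_name;
    int subscribers;

    explicit Client (const std::string& name)
        : app_name (name), subscribers (0) {}
};

// Hypothetical stand-in for MonitorConn.
class Monitor {
public:
    Monitor () : m_current (NULL) {}

    // Subscribe to the client called `name`. Returns false and keeps
    // the current selection if no such client is registered.
    // Note: the pointer kept here refers into `repo`, so `repo` must
    // not be resized while a selection is active.
    bool select (std::vector<Client>& repo, const std::string& name) {
        for (std::size_t i = 0; i < repo.size (); ++i) {
            if (repo[i].app_name == name) {
                if (m_current != NULL) {
                    --m_current->subscribers;   // leave the old client
                }
                m_current = &repo[i];
                ++m_current->subscribers;       // join the new client
                return true;
            }
        }
        return false;
    }

    const Client* current () const { return m_current; }

private:
    Client* m_current;
};
```

Returning a status from select() would let the server answer an unknown client name with an error record, which the protocol as described does not define.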

3.2.3. Testing Modifications

We are ready to test the modifications. Open an xterm window and go to the directory where the server code is. Start the assa-logd server:

% cd $srcdir/examples/logserver/server
% ./assa-logd --daemon --mask=0
	  

Go to the directory where the client code is. If needed, generate a test file with the make-data utility. This time, we would like to have a 10-second delay between logging messages to make the test more realistic.

% cd $srcdir/examples/logserver/client
% make-data --output-file 1Mb -n 1
% ./log-client --with-log-server --input-file=1Mb \
    --mask=0x7fffffff --delay=10
	  

Go to the directory where the monitor client code is. Start the monitor, issue the LIST command followed by the GET command, and verify that you receive logging messages from the client.

% cd $srcdir/examples/logserver/monitor
% assa-logmon
LIST
lt-log-client
GET lt-log-client
[LogClient::processServer] //  modify it under the terms of the GNU General
[LogClient::processServer] //  as published by the Free Software Foundation; 
[LogClient::processServer] //  2 of the License, or (at your option) any later
[LogClient::processServer] //-------------------------------------------------
[LogClient::processServer] //
[LogClient::processServer] // Date   : Fri Jul  4 13:55:47 2003
[LogClient::processServer] //

....

STOP
CTRL-C
%