libASSA Programmer's Manual
Chapter 3. Implementing Communication Protocols
In this section we describe and implement a record-oriented stream protocol between our monitor application, assa-logmon, and the server, assa-logd. The monitor application requests a list of client applications connected to the server, and the server replies with the list. The monitor application then selects a client, and the server starts sending all logging messages of the selected client to the monitor program. At any time, assa-logmon can cancel the last command and select another client.
Each record (request or response) is an ASCII string terminated by an ASCII CR (carriage return) character followed by an ASCII LF (line feed) character.
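The framing rule above can be sketched as a small helper that pulls complete records out of a receive buffer. This is only an illustration: `extract_records` is a hypothetical name and is not part of libASSA or of the example server.

```cpp
#include <string>
#include <vector>

// Hypothetical helper (not part of libASSA): removes every complete
// CR LF terminated record from `buffer` and returns them without the
// terminator. A trailing partial record stays in `buffer` so that the
// next read can complete it.
std::vector<std::string> extract_records(std::string& buffer)
{
    std::vector<std::string> records;
    std::string::size_type pos;
    while ((pos = buffer.find("\r\n")) != std::string::npos) {
        records.push_back(buffer.substr(0, pos)); // record body only
        buffer.erase(0, pos + 2);                 // drop body and CR LF
    }
    return records;
}
```

Because a stream socket may deliver a record in several pieces, the caller would append each chunk it reads to the buffer and call the helper again.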
The client can send the following commands:
LIST - request for the list of connected logging clients.
GET <client_name> - request logging messages of a particular client.
STOP - terminate last GET command.
The server responds with the following messages:
In response to the LIST command, the server sends a list of connected logging clients separated by colons. If no clients are connected at present, an empty list terminated by [cr][lf] is returned instead.
CLIENT1:CLIENT2:CLIENT3:...:CLIENTn[cr][lf]
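On the monitor side, a LIST reply in this format could be decoded roughly as follows. The function name `parse_client_list` is an assumption made for this sketch, and it expects the record with its [cr][lf] terminator already removed.

```cpp
#include <string>
#include <vector>

// Hypothetical monitor-side helper: splits the body of a LIST reply
// (terminator already stripped) into individual client names.
std::vector<std::string> parse_client_list(const std::string& record)
{
    std::vector<std::string> names;
    if (record.empty()) {
        return names;               // empty list: no clients connected
    }
    std::string::size_type start = 0;
    for (;;) {
        std::string::size_type colon = record.find(':', start);
        if (colon == std::string::npos) {
            names.push_back(record.substr(start)); // last name
            break;
        }
        names.push_back(record.substr(start, colon - start));
        start = colon + 1;
    }
    return names;
}
```

Note that this simple format assumes client names never contain a colon.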
When a logging client disconnects, assa-logmon is notified with a disconnect message:
DISCONNECTED[cr][lf]
Each logging message is relayed to assa-logmon as is. Logging messages are typically already terminated by a line feed. For example:
[LogClient::processServer] // This program is free software; you can[lf]
Here is the message flow between assa-logmon and assa-logd:
assa-logmon                  assa-logd
-----------                  ---------
LIST          ---------->
              <----------    CLIENT1:CLIENT2:CLIENT3:...:CLIENTn[cr][lf]

GET CLIENTi   ---------->
              <----------    CLIENTi's log message: MSG[cr][lf]

STOP          ---------->    Stop sending CLIENTi's log messages

                             Event: logging client disconnected
...           <----------    DISCONNECTED[cr][lf]
For the server to answer the LIST request, it must be able to query all connected logging clients. When a new logging client connects to the server, its Conn::open() method is called. Here we register our new connection with the connection repository. MonitorConn uses this repository to list all available connections.
// File: logserver/server/LogServer.h

0034 #include <assa/Repository.h>
...
0040 class LogServer :
0041     public ASSA::GenServer,
0042     public ASSA::Singleton<LogServer>
0043 {
0044 public:
0045     typedef ASSA::Repository<Conn> repo_type;
0046     typedef ASSA::Acceptor<Conn, ASSA::IPv4Socket> conn_accpt_type;
0047     typedef ASSA::Acceptor<MonitorConn, ASSA::IPv4Socket> mon_accpt_type;
0048
0049 public:
...
0057     repo_type* get_repository () { return &m_repo; }
0058
0059 private:
0060     int m_exit_value;
0061     repo_type m_repo;
0062     conn_accpt_type* m_acceptor;
0063     mon_accpt_type* m_monitor_acceptor;
0064 };
...
0070 #define REPO LOGSERVER->get_repository()
Line 57 returns a pointer to the repository.
Line 61 declares the repository data member.
Line 70 defines a convenience macro to get hold of the repository from anywhere in the code.
The Conn class keeps a list of all MonitorConn objects connected to it as observers. Observers subscribe to the logging messages.
// File: logserver/server/Conn.h

0022 #include <assa/Repository.h>
...
0034 class Conn : public ServiceHandler<IPv4Socket>
0035 {
0036 public:
...
0046     void subscribe (MonitorConn* mc_) { m_observers.push_back (mc_); }
0047     void unsubscribe (MonitorConn* mc_) { m_observers.erase (mc_); }
...
0053 private:
...
0094     /// Repository of all connected MonitorConn observers
0095     ASSA::Repository<MonitorConn> m_observers;
0096 };
// File: logserver/server/Conn.cpp

0036 int
0037 Conn::
0038 handle_close (int /* fd */)
0039 {
0040     trace_with_mask("Conn::handle_close", LSVRTRACE);
0041
...
0045     /** Disconnect from observers
0046      */
0047     if (m_observers.size ()) {
0048         ASSA::Repository<MonitorConn>::const_iterator cit;
0049         cit = m_observers.begin ();
0050         while (cit != m_observers.end ()) {
0051             (*cit++)->notify (NULL);
0052         }
0053         m_observers.clear ();
0054     }
0055     REPO->erase (this);
0056     delete (this);
0057     return 0;
0058 }

0060 int
0061 Conn::
0062 handle_read (int fd_)
0063 {
...
0083     if (m_wstate == wait_for_signon) {
...
0097         m_state = opened;
0098         m_wstate = wait_for_header;
0099         REPO->push_back (this);
0100     }
...
0121     else if (m_wstate == wait_for_logmsg) {
0122         DL((LSVR,"=> Incoming LOG_MSG message\n"));
0123         Assert_exit (m_state == opened);
0124         std::string msg;
0125         if (get_stream () >> msg) {
...
0137             m_sink << msg << std::flush;
0138             if (m_observers.size ()) {
0139                 ASSA::Repository<MonitorConn>::const_iterator cit;
0140                 cit = m_observers.begin ();
0141                 while (cit != m_observers.end ()) {
0142                     (*cit++)->notify (msg.c_str ());
0143                 }
0144             }
0145         }
...
0154     }
0155
0156     return get_stream ().eof () ? -1 : get_stream ().in_avail ();
0157 }
Lines 47-54 disconnect the Conn object from all of its observers.
Line 55 removes the Conn object from the global repository of all logging client connections.
On line 99, the Conn object signs up with the repository when the connection is established.
When a logging message arrives, it is written to the log file (line 137). If there are any observers connected to the Conn object, each of them is notified with the logging message (lines 138-144).
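The subscribe/notify relationship between Conn and MonitorConn can be sketched in isolation with standard containers. The `Subject` and `Observer` classes below are simplified stand-ins invented for this illustration; they use std::vector rather than ASSA::Repository and do no socket I/O.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Stand-in for MonitorConn: records what it is told.
struct Observer {
    std::vector<std::string> received;
    bool disconnected;

    Observer() : disconnected(false) {}

    // A null message means the subject is going away,
    // mirroring notify (NULL) in the listing above.
    void notify(const char* msg) {
        if (msg == 0) {
            disconnected = true;
        } else {
            received.push_back(msg);
        }
    }
};

// Stand-in for Conn: keeps its observers and fans messages out.
class Subject {
public:
    void subscribe(Observer* o) { m_observers.push_back(o); }

    void unsubscribe(Observer* o) {
        m_observers.erase(
            std::remove(m_observers.begin(), m_observers.end(), o),
            m_observers.end());
    }

    // Forward one log message to every current observer.
    void publish(const std::string& msg) {
        for (std::size_t i = 0; i < m_observers.size(); ++i) {
            m_observers[i]->notify(msg.c_str());
        }
    }

    // Tell every observer that the subject is closing, then forget them.
    void close() {
        for (std::size_t i = 0; i < m_observers.size(); ++i) {
            m_observers[i]->notify(0);
        }
        m_observers.clear();
    }

private:
    std::vector<Observer*> m_observers;
};
```

The subject stores raw, non-owning pointers, so an observer has to unsubscribe before it is destroyed; the MonitorConn destructor shown later in this section does exactly that.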
We also have to modify the MonitorConn class implementation to support commands from the monitoring client. Command processing takes place in the parse_record() member function.
// File: logserver/server/MonitorConn.h

0031 class MonitorConn : public ServiceHandler<IPv4Socket>
0032 {
0033 public:
...
0041     void notify (const char* msg_);
...
0043 private:
...
0046     void process_get_cmd (const std::string& token_);
0047     void process_list_cmd ();
...
0052     Conn* m_current_conn;
0053     char m_eor [2];      // End-of-Record
0054 };
Line 41 - method notify() is used by Conn object to notify about logging events.
Lines 46-47 declare two new methods for processing commands.
Line 52 declares a pointer to the Conn object we would like to observe.
Line 53 declares end-of-record characters.
// File: logserver/server/MonitorConn.cpp

0017 #include <assa/CommonUtils.h>
0018 #include "MonitorConn.h"
...
0023 /** Convert user input to upper case
0024  */
0025 template <class T>
0026 class ToUpper {
0027 public:
0028     void operator ()(T& elem_) const { elem_ = ::toupper (elem_); }
0029 };
0030
0031 MonitorConn::
0032 MonitorConn (IPv4Socket* stream_) :
0033     ServiceHandler<IPv4Socket> (stream_),
0034     m_iolen (0),
0035     m_current_conn (NULL)
0036 {
0037     trace ("MonitorConn::MonitorConn");
0038
0039     m_eor [0] = 0xD;     // ASCII CR (carriage return)
0040     m_eor [1] = 0xA;     // ASCII LF (line feed)
0041 }

0043 MonitorConn::
0044 ~MonitorConn ()
0045 {
0046     trace ("MonitorConn::~MonitorConn");
0047     if (m_current_conn) {
0048         m_current_conn->unsubscribe (this);
0049     }
0050     /* no-op */
0051 }
Lines 25-29 define a functor to convert STL string to upper case.
Lines 39-40 initialize End-Of-Record marker.
Lines 47-49 disconnect this observer from Conn object upon destruction, if we are connected to one.
notify() is called by the Conn object when it has a new log message for us to process. It is also called, with a NULL argument, from Conn::handle_close() when the logging client closes its connection with the server.
// File: logserver/server/MonitorConn.cpp

0076 void
0077 MonitorConn::
0078 notify (const char* msg_)
0079 {
0080     trace("MonitorConn::notify");
0081
0082     ASSA::IPv4Socket& s = *this;
0083     static const char abort_msg [] = "DISCONNECTED\r\n";
0084
0085     if (msg_ == NULL) {
0086         s.write (abort_msg, strlen (abort_msg));
0087         m_current_conn = NULL;
0088     }
0089     else {
0090         s.write (msg_, ::strlen (msg_));
0091     }
0092     s << ASSA::flush;
0093 }
Lines 85-87 take care of the closure case.
Otherwise, on line 90 we send message to the assa-logmon.
// File: logserver/server/MonitorConn.cpp

0032 void
0033 MonitorConn::
0034 parse_record ()
0035 {
0036     trace("MonitorConn::parse_record");
0037
0038     DL((ASSA::APP,"=> Message from client:\n\n%s\n\n", m_iobuf));
0039     std::vector<std::string> tokens;
0040     ASSA::Utils::split (m_iobuf, tokens);
0041     for_each (tokens [0].begin (), tokens [0].end (), ToUpper<char> ());
0042
0043     if (tokens [0] == "LIST") {
0044         process_list_cmd ();
0045     }
0046     else if (tokens [0] == "STOP") {
0047         m_current_conn->unsubscribe (this);
0048     }
0049     else if (tokens [0] == "GET") {
0050         process_get_cmd (tokens [1]);
0051     }
0052     else {
0053         DL((ASSA::APP,"Unknown command \"%s\"\n", m_iobuf));
0054     }
0055     m_iolen = 0;
0056 }
We parse incoming command by splitting it into tokens and examining the first token. At the end (line 55), we "rewind" the receiving buffer.
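As a standalone illustration of this token-based dispatch, the sketch below parses one record into a command value. It uses only the standard library, assumes tokens are separated by whitespace, and additionally guards against an empty record and a GET without a client name. `Command`, `CommandKind` and `parse_command` are names made up for this example.

```cpp
#include <cctype>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

enum CommandKind { CMD_LIST, CMD_GET, CMD_STOP, CMD_UNKNOWN };

struct Command {
    CommandKind kind;
    std::string argument;   // client name for GET, empty otherwise
};

// Hypothetical parser: `record` is one request with CR LF removed.
Command parse_command(const std::string& record)
{
    // Split the record on whitespace.
    std::istringstream in(record);
    std::vector<std::string> tokens;
    std::string tok;
    while (in >> tok) {
        tokens.push_back(tok);
    }

    Command cmd;
    cmd.kind = CMD_UNKNOWN;
    if (tokens.empty()) {
        return cmd;             // empty record: nothing to dispatch
    }

    // Upper-case only the verb, so client names keep their case.
    std::string verb = tokens[0];
    for (std::size_t i = 0; i < verb.size(); ++i) {
        verb[i] = static_cast<char>(
            std::toupper(static_cast<unsigned char>(verb[i])));
    }

    if (verb == "LIST") {
        cmd.kind = CMD_LIST;
    } else if (verb == "STOP") {
        cmd.kind = CMD_STOP;
    } else if (verb == "GET" && tokens.size() > 1) {
        cmd.kind = CMD_GET;
        cmd.argument = tokens[1];
    }
    return cmd;
}
```

Separating parsing from dispatch in this way makes the command grammar easy to test without a socket.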
// File: logserver/server/MonitorConn.cpp

0058 void
0059 MonitorConn::
0060 process_list_cmd ()
0061 {
0062     trace("MonitorConn::process_list_cmd");
0063
0064     ASSA::IPv4Socket& s = *this;
0065     ASSA::Repository<Conn>::const_iterator cit = REPO->begin ();
0066     if (cit != REPO->end ()) {
0067         s.write ((*cit)->get_app_name().c_str(),
0068                  (*cit)->get_app_name().size ());
0069         cit++;
0070     }
0071     while (cit != REPO->end ()) {
0072         s.write (":", 1);
0073         s.write ((*cit)->get_app_name().c_str(),
0074                  (*cit)->get_app_name().size ());
0075         cit++;
0076     }
0077     s.write (m_eor, 2);
0078     s << ASSA::flush;
0079 }
LIST command is processed by walking through repository of Conn objects and sending back a list of their names.
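The same reply can be built as a single string before it is written to the socket, which keeps the formatting logic testable on its own. This is a sketch under the assumption that the names have already been collected into a std::vector; `format_client_list` is a hypothetical helper, not part of the server code.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: joins client names with ':' and appends the
// CR LF end-of-record marker. An empty vector yields just "\r\n".
std::string format_client_list(const std::vector<std::string>& names)
{
    std::string record;
    for (std::vector<std::string>::size_type i = 0; i < names.size(); ++i) {
        if (i > 0) {
            record += ':';      // separator goes between names only
        }
        record += names[i];
    }
    record += "\r\n";
    return record;
}
```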
// File: logserver/server/MonitorConn.cpp

0081 void
0082 MonitorConn::
0083 process_get_cmd (const std::string& name_)
0084 {
0085     trace("MonitorConn::process_get_cmd");
0086
0087     ASSA::Repository<Conn>::const_iterator cit = REPO->begin ();
0088     while (cit != REPO->end ()) {
0089         if ((*cit)->get_app_name() == name_) {
0090             m_current_conn = *cit;
0091             m_current_conn->subscribe (this);
0092             break;
0093         }
0094         cit++;
0095     }
0096 }
When we receive GET command, first we determine if there is such a Conn object present in the repository (lines 87-95) and if so, subscribe to its logging messages traffic (line 91).
We are ready to test the modifications. Open an xterm window and go to the directory where the server code is. Start the assa-logd server:
% cd $srcdir/examples/logserver/server
% ./assa-logd --daemon --mask=0
Go to the directory where the client code is. If needed, generate a test file with the make-data utility. This time, we would like to have a 10-second delay between logging messages to make the test more realistic.
% cd $srcdir/examples/logserver/client
% make-data --output-file 1Mb -n 1
% ./log-client --with-log-server --input-file=1Mb --mask=0x7fffffff --delay=10
Go to the directory where the monitor client code is. Start the monitor, issue LIST command followed by GET command and verify that you receive logging messages from the client.
% cd $srcdir/examples/logserver/monitor
% assa-logmon
LIST
lt-log-client
GET lt-log-client
[LogClient::processServer] // modify it under the terms of the GNU General
[LogClient::processServer] // as published by the Free Software Foundation;
[LogClient::processServer] // 2 of the License, or (at your option) any later
[LogClient::processServer] //-------------------------------------------------
[LogClient::processServer] //
[LogClient::processServer] // Date : Fri Jul 4 13:55:47 2003
[LogClient::processServer] //
....
STOP
CTRL-C
%