libASSA Programmer's Manual | ||
---|---|---|
<<< Previous | Next >>> |
A client/server application framework is meaningless without some sort of communication protocol between a client and a server. There is a wide range of communication protocols, both public and proprietary, that we have to deal with. This chapter considers one way of implementing such protocols. We are going to implement: 1) a binary data stream protocol between libassa-based applications that log their messages to the assa-logd server and the server itself; 2) a record-oriented data stream protocol between log monitor, assa-logmon, and log server, assa-logd.
We begin with defining a communication protocol between Logger class on the client side and Conn class on the assa-logd server side. Logger class methods are called when an application program makes a call to DL((MASK,"format", arglist)) macro to log a message. When application program is configured to send all its logging traffic to the assa-logd server, each individual message is sent to the server as a binary data stream packet.
Before we can send a message, we have to establish connection with assa-logd. When the connection is established, we send a SIGN_ON message to identify our application and tell the server how to log our messages. Then, each time the application writes a log message, the message is formatted, XDR-encoded along with its attributes and is sent over to the server. The server XDR-decodes the message and writes it out to the log file. When an application exits, it sends a SIGN_OFF message to the server and closes its side of the connection. The server receives the SIGN_OFF message, closes the log file, and destroys the Conn object.
Each message we send to assa-logd starts with the HEADER of 12 bytes long:
Table 3-1. Header
Offset (in bytes) | Data | Type | Length | Description |
---|---|---|---|---|
0 | 1234567890 | int | 4 bytes | Message Start Signature |
4 | Message type | int | 4 bytes | 0 : SIGN_ON; 1 : SIGN_OFF; 2 : LOG_MSG |
8 | Message size | int | 4 bytes | Lenght of the message that follows (in bytes) |
We need Message Start Signature for a quick check to make sure our communication link stays in sync.
We choose the following format for the SIGN_ON message of 4+N+M bytes long:
Table 3-2. SIGN_ON Message
Offset (in bytes) | Data | Type | Length | Description |
---|---|---|---|---|
0 | MaxLogSize | int | 4 bytes | Maximum size of log file |
4 | AppName | std::string | N bytes | Application name (XDR-encoded) |
4+N | LogFileName | std::string | M bytes | LogFile name (XDR-encoded) |
STL string when XDR-encoded occupies more space. To find out the leght of your XDR-encoded string, call static method Socket::xdr_length(const string&). |
The client side is already implemented in the library. For testing purposes, we make log-client log every line of the ASCII input file. We are also going to examine the snippets of the code from the library to illustrate the way protocol is actually implemented.
We add a command-line option { --input-file NAME} to supply data file.
// File: logserver/client/LogClient.h 0033 class LogClient : 0034 public ASSA::GenServer, 0035 public ASSA::Singleton<LogClient> 0036 { 0037 public: ... 0046 private: ... 0048 std::string m_input_file; 0050 }; |
Line 48 declares new data member that holds file name.
// File: logserver/client/LogClient.cpp 0030 enum { LC = ASSA::APP }; 0031 0032 LogClient:: 0033 LogClient () : 0034 m_exit_value (0) 0035 { 0036 add_opt (0, "input-file", &m_input_file); 0037 0038 0039 // ---Configuration--- 0040 rm_opt ('f', "config-file" ); 0041 rm_opt ('p', "port" ); 0042 0043 // ---Process bookkeeping--- 0044 rm_opt ('b', "daemon" ); 0045 0046 /*--- 0047 * By defauil disable all debugging 0048 *---*/ 0049 m_debug_mask = LC; 0050 m_log_file = "LogClient.log"; 0051 } |
Line 36 adds new command-line argument
Lines 40-44 remove unnecessary arguments from the list.
Lines 49 sets debug mask to LC which is defined on line 30.
Line 50 sets log file to LogClient.log. Our log server writes messages from log-client into this file.
// File: logserver/client/LogClient.cpp 0062 void 0063 LogClient:: 0064 processServer () 0065 { 0066 trace("LogClient::processServer"); 0067 0068 const int size = 256; 0069 char line [size]; 0070 std::ifstream in_file; 0071 0072 in_file.open (m_input_file.c_str (), std::ios::in); 0073 if (!in_file) { 0074 std::cerr << "Failed to open input file \"" 0075 << m_input_file << "\"\n" 0076 << "Option \"--input-file=NAME\" is required\n"; 0077 } 0078 else { 0079 while (in_file) { 0080 in_file.getline (line, size, '\n'); 0081 DL((LC,"%s\n", line)); 0082 } 0083 in_file.close (); 0084 } 0085 0086 // Shut the service down 0087 m_reactor.stopReactor (); 0088 } |
There is nothing special going on here. We open an input file, read it line by line, and log each line as debug message as fast as we can.
At some point we are going to overflow the outgoing socket buffer. When that happens, we block in line 81, waiting for the log server to receive and log messages thus making more room available for us to write.
The point here to keep in mind is that if we log excessively to the logging server, our own process might slow down due to the blocking I/O we perform. |
The code that takes a log message from the application and sends it to the log server is encapsulated in RemoteLogger class of the library.
At some point during initialization, GenServer calls log_open() function of class Logger to establish connection with the log server:
// Name: assa/Logger.cpp 0077 int Logger:: 0078 log_open (const std::string& logsvraddr_, 0079 const char* logfname_, 0080 u_long groups_, 0081 u_long maxsize_, 0082 ASSA::Reactor* reactor_) 0083 { 0084 { 0085 ASSA::TimeVal tv (10.0); 0086 ASSA::INETAddress addr (logsvraddr_.c_str ()); 0087 if (addr.bad ()) { 0088 errno = EPERM; 0089 return -1; 0090 } 0091 ASSA::Connector <RemoteLogger, ASSA::IPv4Socket> log_connector; 0092 AutoPtr<RemoteLogger> lsp (new RemoteLogger); 0094 log_connector.open (tv); 0095 if (log_connector.connect (lsp.get (), addr) < 0) { 0096 delete m_impl; 0097 m_impl = NULL; 0098 return -1; 0099 } 0093 m_impl = lsp.release (); 0000 } 0001 int ret = m_impl->log_open (m_app_name.c_str (), logfname_, 0002 groups_, maxsize_, reactor_); 0003 return ret; 0004 } |
We establish connection with the server and then call log_open() function of RemoteLogger class:
// Name: assa/RemoteLogger.cpp 0045 int RemoteLogger:: 0046 log_open (const char* appname_, 0047 const char* logfname_, 0048 u_long groups_, 0049 u_long maxsize_, 0050 Reactor* reactor_) 0051 { 0052 if (m_recursive_call) { 0053 return 0; 0054 } 0055 m_recursive_call = true; 0056 0057 if (m_state == opened) { 0058 return 0; 0059 } 0060 m_logfname = logfname_; 0061 m_groups = groups_; 0062 m_reactor = reactor_; 0063 0064 m_reactor->registerIOHandler (this, get_stream ().getHandler(), 0065 ASSA::READ_EVENT); 0066 0067 /** Put stream in a blocking mode. Otherwise, fast clients can 0068 override log server. 0069 */ 0070 get_stream ().turnOptionOn (Socket::blocking); 0071 0072 /** Send SIGN_ON message to the log server. 0073 */ 0074 size_t len = sizeof (maxsize_) + 0075 Socket::xdr_length (appname_) + 0076 Socket::xdr_length (logfname_); 0077 0078 /** Header + Body 0079 */ 0080 get_stream () << 1234567890 << SIGN_ON << len 0081 << maxsize_ << appname_ << logfname_ << ASSA::flush; 0082 m_state = opened; 0083 m_recursive_call = false; 0084 return 0; 0085 } |
Lines 52-54 guard us against reccursive calls that otherwise will occur for we use parts of the library to establish connection and send data to the server. These parts themselves can log trace messages.
Line 70 puts the Socket in the blocking mode. Otherwise, we are going to overflow the outgoing buffer.
Lines 74-76 calculate length of the message that follows. This value goes into the protocol's header.
Lines 80-81 send the header of the SIGN_ON message and then the message itself. The stream is flushed so that the server can get it right away.
// Name: assa/RemoteLogger.cpp 0024 int RemoteLogger:: 0025 log_msg (Group groups_, 0026 size_t indent_level_, 0027 const string& func_name_, 0028 const char* fmt_, 0029 va_list msg_list_) 0030 { 0031 if (m_recursive_call) { 0032 return 0; 0033 } ... 0055 /** Header + body (preamble;LOG_MSG;length;msg) 0056 */ 0057 if (get_stream ()) { 0058 m_recursive_call = true; 0059 Assert_exit (os.str ().length () != 0); 0060 get_stream () << 1234567890 << LOG_MSG << Socket::xdr_length (os) 0061 << os.str () << ASSA::flush; 0062 m_recursive_call = false; 0063 } 0064 else { 0065 m_state = closed; 0066 } 0067 return 0; 0068 } |
Line 57 tests to see if stream is open. If it is, we are going to send the message which has already been formatted at this point.
Lines 60-61 send the header and then the message. Because ASCII strings are XDR-encoded, their transmission format adds housekeeping information to the string. We find out its encoded length with the help of static member function xdr_length() of class Socket.
We close connection with the server by sending SIGN_OFF message. The message size is 0.
// Name: assa/RemoteLogger.cpp 0088 int RemoteLogger:: 0089 log_close (void) 0090 { 0091 /** Send SIGN_OFF message to the log server and stop data processing 0092 * We are managed by Logger class. 0093 */ 0094 if (m_state == opened) { 0095 m_recursive_call = true; 0096 get_stream () << 1234567890 << SIGN_OFF << 0 << ASSA::flush; 0097 m_reactor->removeHandler (this, READ_EVENT); 0098 m_recursive_call = false; 0099 } 0000 return 0; 0001 } |
Our changes to the server are isolated to class Conn for it is in its responsibility to handle communication with clients.
// name: logserver/server/Conn.h 0031 class Conn : public ServiceHandler<IPv4Socket> 0032 { 0033 public: 0034 Conn (IPv4Socket* stream_); 0035 ~Conn (); 0036 0037 virtual int open (); 0038 0039 virtual int handle_read (int fd_); 0040 virtual int handle_close (int /* fd */); 0041 0042 private: 0043 std::string wstate () const; 0044 void shift_logfile (); 0045 0046 private: 0047 enum msg_t { 0048 SIGN_ON, /**< Login message */ 0049 SIGN_OFF, /**< Logout message */ 0050 LOG_MSG /**< Payload data */ 0051 }; 0052 enum wstate_t { 0053 wait_for_header, /**< Waiting for the message header */ 0054 wait_for_signon, /**< Waiting for SIGN_ON message */ 0055 wait_for_signoff, /**< Waiting for SIGN_OFF message */ 0056 wait_for_logmsg /**< Waiting for LOG_MSG message */ 0057 }; 0058 enum state_t { 0059 opened, 0060 closed 0061 }; 0062 0063 wstate_t m_wstate; 0064 state_t m_state; 0065 0066 /// Incoming message size 0067 int m_msg_size; 0068 0069 /// Incoming message type 0070 int m_msg_type; 0071 0072 /// Maximum logfile size can reach 0073 int m_maxsize; 0074 0075 /// Logging application name 0076 std::string m_app_name; 0077 0078 /// Logfile name 0079 std::string m_logfname; 0080 0081 /// Logfile stream 0082 std::ofstream m_sink; 0083 0084 /// Bytes written to the sink so far. 0085 u_long m_bytecount; 0086 }; |
Line 44 declares shift_logfile() member function responsible for renaming old log file and creating the new one.
Line 47-50 define message type. This is a clear candidate for generalization. We would probably want to isolate this information into a class LogHeader and share it between server and core library (client-side).
Lines 53-60 declare some rudimentary notion of state so that we know what to expect next.
// name: logserver/server/Conn.h 0092 inline 0093 Conn:: 0094 Conn (IPv4Socket* stream_) : 0095 ServiceHandler<IPv4Socket> (stream_), 0096 m_maxsize (1048576), 0097 m_bytecount (0), 0098 m_wstate (wait_for_header), 0099 m_state (closed) 0100 { 0101 trace_with_mask ("Conn::Conn",LSVRTRACE); 0102 /* no-op */ 0103 } ... 0112 inline std::string 0113 Conn:: 0114 wstate () const 0115 { 0116 return (m_wstate == wait_for_signon ? "wait_for_signon" : 0117 m_wstate == wait_for_signoff ? "wait_for_signoff" : 0118 "wait_for_logmsg"); 0119 } |
Lines 94-99 initialize data members in constructor. First we expect to the header; our state is closed.
Lines 114-119 verbally interpret object's state.
// name: logserver/server/Conn.cpp 0020 int 0021 Conn:: 0022 open () 0023 { 0024 trace_with_mask("Conn::open",LSVRTRACE); 0025 0026 REACTOR->registerIOHandler (this, get_stream ().getHandler (), 0027 ASSA::READ_EVENT); 0028 DL((LSVR,"+--------------------------------+\n")); 0029 DL((LSVR,"| Accepted new client connection |\n")); 0030 DL((LSVR,"+--------------------------------+\n")); 0031 return 0; 0032 } |
Lines 28-30 add a "session start" marker to help us identify in the log file when new connection was detected.
// name: logserver/server/Conn.cpp 0036 int 0037 Conn:: 0038 handle_read (int fd_) 0039 { 0040 trace_with_mask ("Conn::handle_read", LSVRTRACE); 0041 if (get_stream ().getHandler () != fd_) { 0042 return (-1); 0043 } 0044 0045 if (m_wstate == wait_for_header) { 0046 DL((LSVR,"=> Detected Header\n")); 0047 int preamble = 0; 0048 m_msg_type = m_msg_size = 0; 0049 0050 get_stream () >> preamble >> m_msg_type >> m_msg_size; 0051 0052 if (preamble != 1234567890) { 0053 DL((LSVRERROR,"Message stream is out of sync - Abort!\n")); 0054 return -1; 0055 } 0056 0057 DL((LSVR,"rcvd: Preamble = %d, Type = %d\n", preamble, m_msg_type)); 0058 DL((LSVR,"rcvd: Size = %d\n", m_msg_size)); 0059 0060 switch (m_msg_type) 0061 { 0062 case SIGN_ON: m_wstate = wait_for_signon; break; 0063 case SIGN_OFF: m_wstate = wait_for_signoff; break; 0064 case LOG_MSG: m_wstate = wait_for_logmsg; break; 0065 } 0066 } 0067 0068 if (m_wstate == wait_for_signon) { 0069 m_maxsize = 0; 0070 DL((LSVR,"=> Incoming SIGN_ON message\n")); 0071 Assert_exit (m_state == closed); 0072 get_stream () >> m_maxsize >> m_app_name >> m_logfname; 0073 DL((LSVR,"rcvd: MaxLogSize = %d, AppName = \"%s\"\n", 0074 m_maxsize, m_app_name.c_str ())); 0075 DL((LSVR,"rcvd: LogFileName = \"%s\"\n", m_logfname.c_str ())); 0076 0077 m_sink.open (m_logfname.c_str (), std::ios::out | std::ios::app); 0078 if (!m_sink) { 0079 DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n")); 0080 return -1; 0081 } 0082 m_state = opened; 0083 m_wstate = wait_for_header; 0084 } 0085 else if (m_wstate == wait_for_signoff) { 0086 DL((LSVR,"=> Incoming SIGN_OFF message\n")); 0087 Assert_exit (m_state == opened); 0088 m_sink << std::flush; 0089 m_sink.close (); 0090 m_state = closed; 0091 return -1; 0092 } 0093 else if (m_wstate == wait_for_logmsg) { 0094 DL((LSVR,"=> Incoming LOG_MSG message\n")); 0095 Assert_exit (m_state == opened); 0096 std::string msg; 0097 if (get_stream () >> msg) { 0098 if (msg.length () != 0) { 0099 DL((LSVR,"rcvs message:\n%s\n%s%s\n", sep, msg.c_str (), sep)); 0100 m_bytecount += msg.length (); 0101 if (m_bytecount > m_maxsize) { 0102 shift_logfile (); 0103 } 0104 } 0105 else { 0106 DL((LSVR,"rcvs EMPTY message!\n")); 0107 Assert_exit (false); 0108 } 0109 m_sink << msg << std::flush; 0110 } 0111 else { 0112 DL((LSVRERROR,"Peer dropped connection!\n")); 0113 m_sink << std::flush; 0114 m_sink.close (); 0115 DL((LSVR,"m_bytecount = %d\n", m_bytecount)); 0116 DL((LSVR,"m_maxsize = %d\n", m_maxsize)); 0117 } 0118 m_wstate = wait_for_header; 0119 } 0120 0121 return get_stream ().eof () ? -1 : get_stream ().in_avail (); 0122 } |
Line 45 tests for the state to be wait_for_header. In this state we handle the Header.
Line 50 reads the header from the stream. The header is read in and XDR-decoded. We check for preamble to match an expected value in line 52.
Lines 60-64 switch the state engine into the next state depending on the message type we got from the header.
Line 72 reads in and XDR-decodes the SIGN_ON message. At this point, we open the log file (line 77) and change our state to wait_for_header.
Line 85 handles the SIGN_OFF message. Remember that this message has no body. We close the log file, set our state to closed, and terminate the data processing for this client by returning -1 to the Reactor.
Line 93 starts processing actual log messages. We read and XDR-decode std::string message on line 97.
Lines 100-103 count the number of characters received and when the maximum allowed size is reached, open a fresh log file.
Line 109 writes the message to the log file.
Lines 111-117 handle the case when connection between the server and the client has been broken.
Line 118 switches the state engine back to waiting for the next header.
// name: logserver/server/Conn.cpp 0124 int 0125 Conn:: 0126 handle_close (int /* fd */) 0127 { 0128 trace_with_mask("Conn::handle_close", LSVRTRACE); 0129 0130 DL((LSVR,"+---------------------+\n")); 0131 DL((LSVR,"| Client disconnected |\n")); 0132 DL((LSVR,"+---------------------+\n")); 0133 delete (this); 0134 return 0; 0135 } |
Lines 130-132 mark the end of the session with the client (for debugging purposes).
// name: logserver/server/Conn.cpp 0137 void 0138 Conn:: 0139 shift_logfile () 0140 { 0141 trace_with_mask("Conn::shift_logfile", LSVRTRACE); 0142 0143 m_sink << std::flush; 0144 m_sink.close (); 0145 m_bytecount = 0; 0146 std::string oldfile = m_logfname + ".0"; 0147 ::unlink (oldfile.c_str ()); 0148 ::rename (m_logfname.c_str (), oldfile.c_str ()); 0149 0150 m_sink.open (m_logfname.c_str (), std::ios::out | std::ios::app); 0151 if (!m_sink) { 0152 DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n", m_logfname.c_str ())); 0153 } 0154 } |
Lines 143-148 close and rename current log file to the one with .0 extension.
Line 150 opens fresh log file.
We are ready to test the modifications. Open an xterm window and go to the directory where the server code is. Start assalogd server:
% cd $srcdir/examples/logserver/server % ./assa-logd --daemon --mask=0 |
Go to the directory where client code is. For the tests, you would need an ASCII data file. You can generate one with make-data utility. It generates ASCII data files by repetetively copying one of the source files found in current directory to achieve desirable size (in megabytes). See % make-data --help for details. When ASCII file is generated, we log it with log-client to the log server and then compare source and destination with log-diff.sh shell script.
% cd $srcdir/examples/logserver/client % make-data --output-file 1Mb -n 1 % ./log-client --with-log-server --input-file=1Mb --mask=0x7fffffff % ./log-diff.sh 1Mb Files are the same |
<<< Previous | Home | Next >>> |
Reactor Event Loop | Record-Oriented Stream Protocol |