libASSA Programmer's Manual
Chapter 2. Writing Network Applications
For an in-depth discussion of the Connector pattern, refer to the article "Acceptor and Connector - A family of Object Creational Patterns for Initializing Communication Services", which appeared in the book [MartinBuschmannRiehle97].
The Connector class establishes a socket connection with a remote peer. Here are some of its features:
Because it is a parameterized class, it cleanly separates data communication code from connection establishment code.
Connector implements a default connection strategy. The strategy can be modified through inheritance.
Because it can be configured to establish connections asynchronously, it lets an application program establish a very large number of connections with remote peers without disturbing the main data processing flow.
The Connector class is a template parameterized by SERVICE_HANDLER and PEER_CONNECTOR.
Example 2-3. Class Connector definition
    template<class SERVICE_HANDLER, class PEER_CONNECTOR>
    class Connector : public EventHandler
    {
    public:
        virtual int open ();
        virtual int close ();
        virtual int connect ();
        // ... other methods
    };
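SERVICE_HANDLER is a parameterized class derived from the ServiceHandler class (see Section 2.2.1 for details). PEER_CONNECTOR is the transport class used to reach the peer, such as IPv4Socket in the examples that follow.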
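The separation between connection establishment and data communication can be illustrated with a toy model. The sketch below does not use libASSA: ToyConnector, EchoHandler, FakeStream, peer() and do_connect() are hypothetical names invented for this illustration, and only the two template parameter names are borrowed from the text above.

```cpp
#include <string>

// Hypothetical stand-in for a transport class such as ASSA::IPv4Socket.
struct FakeStream {
    bool connected = false;
    // Pretend that any non-empty address is reachable.
    bool connect(const std::string& addr) {
        connected = !addr.empty();
        return connected;
    }
};

// Hypothetical service handler: it knows how to talk to the peer,
// but nothing about how the connection is established.
struct EchoHandler {
    FakeStream stream;
    int opened = 0;                  // number of times open() was called
    FakeStream& peer() { return stream; }
    int open() { ++opened; return 0; }
};

// Toy connector: it knows how to establish the connection,
// but nothing about the data exchanged afterwards.
template <class SERVICE_HANDLER, class PEER_CONNECTOR>
class ToyConnector {
public:
    virtual ~ToyConnector() {}

    // Connect the handler's stream, then hand control to the handler.
    int connect(SERVICE_HANDLER* sh, const std::string& addr) {
        if (do_connect(sh->peer(), addr) < 0) {
            return -1;
        }
        return sh->open();
    }

protected:
    // Default connection strategy; a subclass may override it.
    virtual int do_connect(PEER_CONNECTOR& s, const std::string& addr) {
        return s.connect(addr) ? 0 : -1;
    }
};
```

A caller would instantiate ToyConnector<EchoHandler, FakeStream> and pass it a handler and an address. Changing the connection strategy then means overriding do_connect() in a subclass, without touching the handler.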
To illustrate the Connector class in action, we will modify assa-logmon to connect to assa-logd, first synchronously and then asynchronously. Once the connection is established, we will send an arbitrary string as a test.
Synchronous connection establishment blocks the requesting application code until either 1) the connection is established or 2) the connection attempt times out. This is the default behavior of the Connector class.
We are going to modify assa-logmon by adding a Connector object to the LogMon class and introducing the LogConn class to encapsulate a connection with assa-logd.
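At the operating system level, a blocking connect with a timeout is commonly built from a non-blocking connect() followed by a bounded wait for the socket to become writable. The following is a minimal POSIX sketch of that idea, not libASSA's actual implementation; connect_with_timeout() is a hypothetical helper name, and an interrupted poll() is simply treated as a failure.

```cpp
#include <cerrno>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Block the caller until the connection is established, fails,
// or timeout_ms elapses. Returns a connected descriptor or -1.
int connect_with_timeout(const sockaddr_in& addr, int timeout_ms)
{
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        return -1;
    }

    // Switch to non-blocking mode so that connect() returns at once.
    int flags = ::fcntl(fd, F_GETFL, 0);
    ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);

    int rc = ::connect(fd, reinterpret_cast<const sockaddr*>(&addr),
                       sizeof addr);
    if (rc < 0 && errno != EINPROGRESS) {
        ::close(fd);                 // immediate failure
        return -1;
    }

    if (rc < 0) {
        // Connection is in progress: wait for writability or timeout.
        pollfd pfd;
        pfd.fd = fd;
        pfd.events = POLLOUT;
        pfd.revents = 0;
        if (::poll(&pfd, 1, timeout_ms) <= 0) {
            ::close(fd);             // timed out or poll() failed
            return -1;
        }
        // Writable does not mean connected: check the pending error.
        int err = 0;
        socklen_t len = sizeof err;
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0
            || err != 0) {
            ::close(fd);
            return -1;
        }
    }

    ::fcntl(fd, F_SETFL, flags);     // restore the original mode
    return fd;
}
```

From the caller's point of view this behaves like the synchronous mode described above: the call returns only when one of the two conditions has occurred.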
    // logserver/monitor/LogMon.h
    ...
    0033 #include <assa/Connector.h>
    0034 #include <assa/IPv4Socket.h>
    0035
    0036 #include "LogConn.h"
    0037
    0038 class LogMon :
    0039     public ASSA::GenServer,
    0040     public ASSA::Singleton<LogMon>
    0041 {
    0042 public:
    0043     ~LogMon () { if (m_peer) { delete m_peer; m_peer = NULL; } }
    ...
    0051 private:
    0052     typedef ASSA::Connector<LogConn, ASSA::IPv4Socket> log_connector_t;
    0053
    0054     int             m_exit_value;    // Return status of the process
    0055     log_connector_t m_connector;
    0056     LogConn*        m_peer;
    };
Lines 33-36 include the necessary header files.
Line 43 defines the destructor, which deletes the m_peer object if one was allocated.
Line 52 defines a shorthand for the concrete Connector class.
Line 55 adds the connector that will connect LogConn with assa-logd.
Line 56 declares a pointer to our proxy class, LogConn, which encapsulates a connection to the assa-logd server.
    // logserver/monitor/LogMon.cpp
    0028 #include <assa/INETAddress.h>
    ...
    0031 LogMon::LogMon () : m_exit_value (0), m_peer (NULL)
    0032 {
    0033     // ---Configuration---
    0034     rm_opt ('f', "config-file"  );
    0035     rm_opt ('n', "instance"     );
    0036
    0037     // ---Process bookkeeping---
    0038     rm_opt ('b', "daemon"       );
    0039     rm_opt ('l', "pidfile"      );
    0040     rm_opt ('L', "ommit-pidfile");
    0041     m_ommit_pidflock_flag = true;
    0042
    0043     /*---
    0044      * By default disable all debugging
    0045      *---*/
    0046     m_debug_mask = ASSA::APP | ASSA::ERROR;
    0047     m_log_file = "LogMon.log";
    0048
    0049     setPortName ("assalmon");   // ASSA logging server monitoring port
    0050 }
Lines 34-41 trim the command-line argument list of assa-logmon by removing all irrelevant options. Line 49 sets the port name to "assalmon" because that is where assa-logd listens for incoming requests from monitors.
We establish the connection with the server in the initialization phase. If we fail, we set the StopServer flag to abort the data processing phase.
    // logserver/monitor/LogMon.cpp
    0053 void
    0054 LogMon::
    0055 initServer ()
    0056 {
    0057     trace("LogMon::initServer");
    0058
    0059     m_peer = new LogConn;
    0060     ASSA::INETAddress lmon_addr (getPortName ().c_str ());
    0061
    0062     m_connector.open ();
    0063     if (m_connector.connect (m_peer, lmon_addr) < 0) {
    0064         DL((ASSA::ERROR,"Failed to connect to server\n"));
    0065         setStopServerFlag ();
    0066         return;
    0067     }
    0068
    0069     DL((ASSA::APP,"Service has been initialized\n"));
    0070 }
On line 59 we instantiate a LogConn object.
On line 60 we create the address of the assa-logd server's monitoring port.
On line 62 we configure the Connector to connect synchronously with the default timeout of 5 seconds.
On line 63 we establish the connection. If it fails, we set the StopServer flag and return.
We are ready to test our modifications. Recompile assa-logmon and try to run it. We expect it to fail because assa-logd does not yet contain code to listen for connection requests on the assalmon port.
    % assa-logmon -d
    ...
    [GenServer::initInternals]
    | [Connector::connect] Bad address (errno 14)
    | [Connector::connect] errno: 14 "Bad address"
    [LogMon::initServer] Failed to connect to server
As mentioned before, we encapsulate the connection to assa-logd in the LogConn class. We would also like this class to take responsibility for monitoring console input from the user. For now, whatever you type in the console is sent to the server. Later, we will add user commands to browse the list of available logging clients and select one of them to receive its messages.
LogConn is a child of ServiceHandler because Connector can only connect classes derived from ServiceHandler.
    // logserver/monitor/LogConn.h
    #ifndef LOGCONN_H
    #define LOGCONN_H

    #include <assa/ServiceHandler.h>
    #include <assa/IPv4Socket.h>

    class LogConn : public ASSA::ServiceHandler<ASSA::IPv4Socket>
    {
    public:
        LogConn ();
        ~LogConn () { trace ("LogConn::~LogConn"); }

        virtual int open ();
        virtual int handle_read (int fd_);
        virtual int handle_close (int fd_);

    private:
        int process_user_request (ASSA::IPv4Socket& s_);

    private:
        char m_eor [2];    // end-of-record
    };

    // A constructor has no return type.
    inline LogConn::LogConn ()
    {
        trace ("LogConn::LogConn");
        m_eor [0] = 0xD;    // ASCII CR (carriage return)
        m_eor [1] = 0xA;    // ASCII LF (line feed)
    }

    #endif /* LOGCONN_H */
Data member m_eor is an end-of-record marker that separates one message from another in the communication protocol between assa-logmon and assa-logd. It consists of two ASCII characters: carriage return followed by line feed. This way, you can communicate with assa-logd from a telnet session as well as with assa-logmon.
When the connection with the peer is established, we subscribe to READ_EVENT events from two sources: 1) the assa-logd daemon and 2) console input from the user.
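To make the role of the end-of-record marker concrete, here is a small standard-library sketch of CR LF framing. The helper names frame() and extract_records() are hypothetical and are not part of libASSA or of the assa-logd sources; they only show how a sender appends the marker and how a receiver could split its buffer on it.

```cpp
#include <string>
#include <vector>

// End-of-record marker: ASCII CR followed by ASCII LF.
const std::string EOR = "\r\n";

// Sender side: terminate a message with the marker.
std::string frame(const std::string& msg)
{
    return msg + EOR;
}

// Receiver side: remove every complete record from the front of `buf`
// and return them. An incomplete trailing record stays in `buf`
// until more data arrives.
std::vector<std::string> extract_records(std::string& buf)
{
    std::vector<std::string> records;
    std::string::size_type pos;
    while ((pos = buf.find(EOR)) != std::string::npos) {
        records.push_back(buf.substr(0, pos));
        buf.erase(0, pos + EOR.size());
    }
    return records;
}
```

For example, framing the 13-character string "Hey, there..." yields 15 bytes on the wire, and a receive buffer holding "one\r\ntwo\r\nthr" yields the records "one" and "two" while "thr" waits for the rest of its record.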
    // logserver/monitor/LogConn.cpp
    #include <unistd.h>
    #include <iostream>

    #include "LogConn.h"
    #include "LogMon.h"    // REACTOR declaration

    int
    LogConn::
    open ()
    {
        trace ("LogConn::open");

        ASSA::IPv4Socket& s = *this;
        REACTOR->registerIOHandler (this, s.getHandler (), ASSA::READ_EVENT);
        REACTOR->registerIOHandler (this, STDIN_FILENO, ASSA::READ_EVENT);

        DL((ASSA::APP,"Connected to assa-logd\n"));
        return 0;
    }
Typically, classes derived from ServiceHandler are self-managed. They are created on the heap and destroy themselves. However, in this case, LogConn is fully managed by LogMon (allocated on the heap in initServer and deallocated by the destructor).
    // logserver/monitor/LogConn.cpp
    int
    LogConn::handle_close (int fd_)
    {
        trace ("LogConn::handle_close");

        if (fd_ != STDIN_FILENO) {
            DL((ASSA::APP,"Disconnected from assa-logd\n"));
        }
        return 0;
    }
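The difference between a self-managed handler and one managed by its owner can be shown without libASSA. In the sketch below, Handler, SelfManaged, OwnerManaged, Owner and the g_live_handlers counter are all hypothetical names made up for the illustration; only the two ownership policies come from the text.

```cpp
// Counts live handler objects so the effect of each policy is visible.
static int g_live_handlers = 0;

// Hypothetical base class standing in for a ServiceHandler.
class Handler {
public:
    Handler() { ++g_live_handlers; }
    virtual ~Handler() { --g_live_handlers; }
    virtual int handle_close(int fd) = 0;
};

// Self-managed: created on the heap, destroys itself on close.
class SelfManaged : public Handler {
public:
    virtual int handle_close(int) {
        delete this;    // must be the last thing the object does
        return 0;
    }
};

// Owner-managed: handle_close() leaves the object alone,
// and the owner deletes it later.
class OwnerManaged : public Handler {
public:
    virtual int handle_close(int) { return 0; }
};

// Hypothetical owner, playing the role LogMon plays for LogConn.
class Owner {
public:
    Owner() : m_peer(new OwnerManaged) {}
    ~Owner() { delete m_peer; m_peer = 0; }
    Handler* peer() { return m_peer; }
private:
    Owner(const Owner&);               // non-copyable
    Owner& operator=(const Owner&);
    Handler* m_peer;
};
```

With the self-managed policy nobody may touch the pointer after handle_close() returns. With the owner-managed policy the handler stays valid until its owner is destroyed, which is why LogConn::handle_close() above does not delete anything.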
We differentiate the input from STDIN and assa-logd by comparing the file descriptors. User input commands are handled in a separate function.
    // logserver/monitor/LogConn.cpp
    int
    LogConn::handle_read (int fd_)
    {
        trace ("LogConn::handle_read");

        ASSA::IPv4Socket& s = *this;

        if (fd_ == STDIN_FILENO) {
            return process_user_request (s);
        }

        int ret = 0;
        char buf [256];

        if (s.getHandler () != fd_) {
            return -1;
        }
        if ((ret = s.read (buf, 256)) < 0) {
            DL((ASSA::ERROR,"Error reading from socket\n"));
            return (-1);
        }
        return s.good () ? s.in_avail () : -1;
    }
Finally, we take input from the user and send it to assa-logd, terminating the message with the [CR][LF] combination just as a telnet session would.
    // logserver/monitor/LogConn.cpp
    int
    LogConn::process_user_request (ASSA::IPv4Socket& s_)
    {
        trace("LogConn::process_user_request");

        std::string input;
        getline (std::cin, input);

        s_.write (input.c_str (), input.length ());
        s_.write (m_eor, 2);
        s_ << ASSA::flush;

        return 0;
    }
Note that we have to flush the output socket stream at the end, because IPv4Socket is a buffered stream.
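Why the flush matters can be seen in a toy model of a buffered stream. BufferedWriter below is a hypothetical class written for this illustration, with a std::string standing in for the network; it says nothing about how IPv4Socket buffers data internally.

```cpp
#include <cstddef>
#include <string>

// Hypothetical buffered writer; `wire` stands in for the network.
class BufferedWriter {
public:
    explicit BufferedWriter(std::string& wire) : m_wire(wire) {}

    // Data is only copied into the local buffer.
    void write(const char* data, std::size_t len) {
        m_buf.append(data, len);
    }

    // Push everything buffered so far onto the wire.
    void flush() {
        m_wire += m_buf;
        m_buf.clear();
    }

    // Number of bytes written but not yet flushed.
    std::size_t pending() const { return m_buf.size(); }

private:
    std::string  m_buf;
    std::string& m_wire;
};
```

In this model, writing the message and the two-byte marker leaves the wire empty and 15 bytes pending. The peer sees nothing until flush() is called.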
The following class diagram depicts class association in assa-logmon.
For assa-logmon to connect to assa-logd, the latter needs to support this type of connection. We add the code for connection establishment here; later on, we will add the communication protocol between the log server and its monitors.
We modify LogServer.h by adding acceptor for monitor clients:
    // logserver/server/LogServer.h
    class MonitorConn;

    class LogServer ...
    {
        ...
    private:
        ASSA::Acceptor<MonitorConn, ASSA::IPv4Socket>* m_monitor_acceptor;
    };
Initializing the listening point for monitors is similar to initializing the one for logging clients:
    // logserver/server/LogServer.cpp
    0021 ...
    0022 #include "MonitorConn.h"
    ...
    0033 LogServer::
    0034 LogServer () :
    0035     m_exit_value (0),
    0036     m_acceptor (NULL),
    0037     m_monitor_acceptor (NULL)
    0038 {
    ...
    0070 }
    ...
    0075 void LogServer::initServer ()
    0076 {
    0077     trace("LogServer::initServer");
    0078
    0079     m_acceptor = new ASSA::Acceptor<Conn, ASSA::IPv4Socket> (getReactor ());
    0080
    0081     ASSA::INETAddress lport ("assalogd");
    0082     Assert_exit (!lport.bad ());
    0083     Assert_exit (m_acceptor->open (lport) == 0);
    0084
    0085     m_monitor_acceptor =
    0086         new ASSA::Acceptor<MonitorConn, ASSA::IPv4Socket> (getReactor ());
    0087
    0088     ASSA::INETAddress lmport ("assalmon");
    0089     Assert_exit (!lmport.bad ());
    0090     Assert_exit (m_monitor_acceptor->open (lmport) == 0);
    0091
    0092     DL((ASSA::APP,"Service has been initialized\n"));
    0093 }
Line 22 includes the declaration of the MonitorConn class.
Line 37 initializes the pointer to the monitor Acceptor to NULL.
Lines 85-90 initialize the listener endpoint on port assalmon.
The declaration and definition of class MonitorConn are similar to those of class Conn. The two classes, however, have different functionality and will later be modified to recognize and drive different communication protocols with their peers.
    // logserver/server/MonitorConn.h
    #ifndef MONITOR_CONN_H
    #define MONITOR_CONN_H

    #include <assa/ServiceHandler.h>
    #include <assa/IPv4Socket.h>

    using ASSA::ServiceHandler;
    using ASSA::IPv4Socket;

    class MonitorConn : public ServiceHandler<IPv4Socket>
    {
    public:
        MonitorConn (IPv4Socket* stream_);
        ~MonitorConn ();

        virtual int open ();

        virtual int handle_read (int fd_);
        virtual int handle_close (int /* fd */);
    };

    #endif /* MONITOR_CONN_H */
Example 2-4. Simple implementation of MonitorConn class
    // logserver/server/MonitorConn.cpp
    #include "MonitorConn.h"
    #include "LogServer-main.h"
    #include "LogServer.h"

    MonitorConn::MonitorConn (IPv4Socket* stream_)
        : ServiceHandler<IPv4Socket> (stream_)
    {
        trace ("MonitorConn::MonitorConn");
    }

    MonitorConn::~MonitorConn ()
    {
        trace ("MonitorConn::~MonitorConn");
    }

    int
    MonitorConn::open ()
    {
        trace("MonitorConn::open");

        ASSA::IPv4Socket& s = *this;
        REACTOR->registerIOHandler (this, s.getHandler (), ASSA::READ_EVENT);
        return 0;
    }

    int
    MonitorConn::handle_read (int fd_)
    {
        trace("MonitorConn::handle_read");

        ASSA::IPv4Socket& s = *this;
        if (s.getHandler () != fd_) {
            return (-1);
        }

        int ret = 0;
        char buf [256];

        if ((ret = s.read (buf, 256)) < 0) {
            DL((ASSA::ERROR,"Error reading from socket\n"));
            return (-1);
        }

        if (ret > 0) {
            /* Process data received */
            ASSA::MemDump::dump_to_log (ASSA::APP, "=> Got new message", buf, ret);
        }
        return s.eof () ? -1 : s.in_avail ();
    }

    int
    MonitorConn::handle_close (int /* fd */)
    {
        trace("MonitorConn::handle_close");
        delete (this);
        return 0;
    }
Here is the modified class diagram for the assa-logd server. We added the MonitorConn class and noted that the multiplicity of the Acceptor class is now 2.
We are ready to test our monitor client. In one window, we start the daemon process and monitor its log file:
    % assa-logd --daemon --port=assalogd
    % tail -f LogServer.log
    ...
    [LogServer::initServer] Service has been initialized
From another terminal window, connect to the server with assa-logmon and type in an ASCII string to send to the server:
    % assa-logmon
    Hey, there ...
Back in the server's window, we should observe a trace indicating that a new message has been received on a monitoring connection (MonitorConn):
    ...
    [LogServer::initServer] Service has been initialized
    ...
    | | | | [MonitorConn::handle_read] (15 bytes) => Got new message
    | | | | [MonitorConn::handle_read] 4865 792c 2074 6865 7265 2e2e 2e0d 0a Hey, there...\r\n
An asynchronous connect operation does not block the requesting application code. Busy application programs that maintain multiple connections to multiple services may perform better by establishing connections with their peers asynchronously. GUI client applications also fall into this category. They usually need the ability to monitor and control connection progress.
The Connector class allows us to establish asynchronous connections. Application code has to take some extra steps to use this capability, namely:
Provide an external event Reactor object.
Implement connection timeout event handling.
Implement successful connection event handling.
After calling connect() and checking for immediate errors, application code can continue attending to other activities. When either the connection completes or the connection attempt times out, some mechanism must notify the application code of the event. The Reactor class provides this mechanism: a Connector configured for asynchronous connection expects an object of type Reactor.
We will now modify assa-logmon to connect asynchronously to the assa-logd server. We issue the connection request and then wait in the main event loop for either the connection establishment notification or the expiration of the connection timer. LogConn::open() is called if we are connected, and LogConn::close() is called if the connection timer expires.
    // logserver/monitor/LogMon.cpp
    0057 void LogMon::initServer ()
    0058 {
    0059     trace("LogMon::initServer");
    0060
    0061     m_peer = new LogConn;
    0062     ASSA::INETAddress lmon_addr (getPortName ().c_str ());
    0063
    0064     ASSA::TimeVal timeout (5.0);
    0065     m_connector.open (timeout,
    0066                       ASSA::Connector<LogConn, ASSA::IPv4Socket>::async,
    0067                       REACTOR);
    0068
    0069     if (m_connector.connect (m_peer, lmon_addr) < 0) {
    0070         DL((ASSA::ERROR,"Failed to connect to server\n"));
    0071         setStopServerFlag ();
    0072         return;
    0073     }
    0074
    0075     DL((ASSA::APP,"Service has been initialized\n"));
    0076 }
Line 64 instantiates a TimeVal object holding the 5-second connection timeout.
On line 65, open() configures the Connector object to connect asynchronously, using our REACTOR as the external event processing engine.
On line 69, if connect() fails right away, we stop the service and return.
If all goes well and the connection is established some time later, the Reactor object sends the Connector object a WRITE event notification. The Connector object then notifies the ServiceHandler object of the successfully completed connection by calling its open() member function.
The other possibility is an error or a timeout while connecting. Either way, the Connector object reports the connection establishment error by calling the ServiceHandler's virtual function close(). We have to add an implementation of this function to the LogConn class:
    // logserver/monitor/LogConn.h
    class LogConn ...
    {
    public:
        ...
        virtual void close ();
        ...
    };
We implement this function by stopping the service:
    // logserver/monitor/LogConn.cpp
    void
    LogConn::close ()
    {
        trace ("LogConn::close");
        DL((ASSA::APP, "Failed to connection to assa-logd!\n"));
        LOGMON->setStopServerFlag ();
    }
If we compile this modification and run assa-logmon without the assa-logd server running, we get the following output:
    % assa-logmon -d
    ...
    [GenServer::initInternals]
    | | | [IPv4Socket::connect()] FD: 3 OS::connect() error
    | | | [IPv4Socket::connect()] errno: 115 "Operation now in progress"
    [LogMon::initServer] Service has been initialized
    | | | | [Connector::handle_write] Socket pending error: 111
    | | | | [Connector::handle_write] errno: 0 "Success"
    | | | | [Connector::handle_write] Nonblocking connect (2) failed
    | | | | [Connector::handle_write] errno: 111 "Connection refused"
    | | | | [Connector::handle_write] Try to compare port numbers on client and service hosts.
    | | | | [Connector::handle_write] errno: 111 "Connection refused"
    | | | | | [LogConn::close] Failed to connection to assa-logd!
    [LogMon::processServer] Service stopped!
As can be seen, the initial connect() call returned with the error "Operation now in progress" (errno 115), which is expected for a non-blocking connect. Later, we are notified of the failure to connect through a call to close().