2.3. Connecting with Connector

For in-depth discussion of Connector pattern, please, refer to the article "Acceptor and Connector - A family of Object Creational Patterns for Initializing Communication Services" that appeared in the book [MartinBuschmannRiehle97].

Connector class allows to establish socket connection with a remote peer. Here are some of its features:

  1. Because it is a parameterized class, it separates data communication code from connection establishing code cleanly.

  2. Connector implements default connection strategy. It also permits to modify connection strategy through inheritance.

  3. Because it can be configured to establish connection asynchronously, it permits an application program to establish very large number of connections with remote peers without disturbing main data processing flow.

2.3.1. Initialization Phase

Connector class is defined as a parameterized class of SERVICE_HANDLER and PEER_CONNECTOR.

Example 2-3. Class Connector definition


template<class SERVICE_HANDLER, class PEER_STREAM>
class Connector : public EventHandler
{
public:
    virtual int open ();
    virtual int close ();
    virtual int connect ();

    // ... other methods
};
		

SERVICE_HANDLER is a parameterized class derived from ServiceHandler class (see Section 2.2.1 for details).

To illustrate Connector class in action, we are going to modify assa-logmon to connect to assa-logd first synchronously and then asynchronously. Upon establishing the connection, we will send an arbitrary string for a test.

2.3.2. Connecting Synchronously

A synchronous connection establishment causes the requested application code to block until either the 1) connection is established or 2) connection attempt times out. This is the default behavior for Connector class. Your application code blocks until either condition occurs.

We are going to modify assa-logmon by adding Connector object to LogMon class, and introducing class LogConn to encapsulate a connection with assa-logd.


// logserver/monitor/LogMon.h

...
0033 #include <assa/Connector.h>
0034 #include <assa/IPv4Socket.h>
0035 
0036 #include "LogConn.h"
0037 
0038 class LogMon :
0039     public ASSA::GenServer,
0040     public ASSA::Singleton<LogMon>
0041 {
0042 public:
0043     ~LogMon () { if (m_peer) { delete m_peer; m_peer = NULL; } }
...
0051 private:
0052     typedef ASSA::Connector<LogConn, ASSA::IPv4Socket> log_connector_t;
0053 
0054     int m_exit_value;     // Return status of the process
0055     log_connector_t m_connector;
0056     LogConn* m_peer;
};
	  


// logserver/monitor/LogMon.cpp

0028 #include <assa/INETAddress.h>
...
0031 LogMon::LogMon () : m_exit_value (0), m_peer (NULL)
0032 {
0033     // ---Configuration---
0034     rm_opt ('f', "config-file"  );
0035     rm_opt ('n', "instance"     );
0036 
0037     // ---Process bookkeeping---
0038     rm_opt ('b', "daemon"       );
0039     rm_opt ('l', "pidfile"      );
0040     rm_opt ('L', "ommit-pidfile");
0041     m_ommit_pidflock_flag = true;
0042 
0043     /*---
0044      * By defauil disable all debugging
0045      *---*/
0046     m_debug_mask = ASSA::APP | ASSA::ERROR;
0047     m_log_file = "LogMon.log";
0048 
0049     setPortName ("assalmon");  // ASSA logging server monitoring port
0050 }
	  

We trim (lines 34-41) command-line arguments list of assa-logmon by removing all irrelevant options. We also set (line 49) port to "assalmon" because that is where assa-logd is listening for incoming requests from monitors.

We establish connection with the server in initialization phase. If we fail, we set StopServer flag to abord data processing phase.


// logserver/monitor/LogMon.cpp

0053 void
0054 LogMon::
0055 initServer ()
0056 {
0057     trace("LogMon::initServer");
0058 
0059     m_peer = new LogConn;
0060     ASSA::INETAddress lmon_addr (getPortName ().c_str ());
0061 
0062     m_connector.open ();
0063     if (m_connector.connect (m_peer, lmon_addr) < 0) {
0064         DL((ASSA::ERROR,"Failed to connect to server\n"));
0065         setStopServerFlag ();
0066         return;
0067     }
0068 
0069     DL((ASSA::APP,"Service has been initialized\n"));
0070 }
	  

We are ready to test our modifications. Recompile assa-logmon and try to run it. We expect it to fail because assa-logd does not have code in it to listen for connection requests on assalmon port yet.


% assa-logmon -d

...
[GenServer::initInternals] 
|  [Connector::connect] Bad address (errno 14)
|  [Connector::connect] errno: 14 "Bad address"
[LogMon::initServer] Failed to connect to server

	  

2.3.2.1. Adding LogConn Proxy Class

As has been mentioned before, we encapsulate connection to assa-logd with LogConn class. We would also like this class to take the responsibility of monitoring console input from the user. For now, whatever you type in the console will be sent to the server. Later, we will add user commands to browse the list of available logging clients and select one of them to receive its messages.

LogConn is a child of ServiceHandler because Connector can only connect classes derived from ServiceHandler.


// logserver/monitor/LogConn.h

#ifndef LOGCONN_H
#define LOGCONN_H

#include <assa/ServiceHandler.h>
#include <assa/IPv4Socket.h>

class LogConn : public ASSA::ServiceHandler<ASSA::IPv4Socket>
{
public:
	LogConn ();
	~LogConn () { trace ("LogConn::~LogConn"); }

	virtual int  open ();

	virtual int handle_read (int fd_);
	virtual int handle_close (int fd_);

private:
	int process_user_request (ASSA::IPv4Socket& s_);

private:
	char m_eor [2];	// end-of-record
};

inline int LogConn::LogConn ()
{
    trace ("LogConn::LogConn");

    m_eor [0] = 0xD;    // ASCII CR (carrige return)
    m_eor [1] = 0xA;    // ASCII LF (line feed)
}
		

Data member m_eor is an end-of-record marker that separates one message from another in the communication protocol between assa-logmon and assa-logd. It consists of two ASCII characters: carrige return followed by line feed. This way, you can communicate with assa-logd from telnet session as well as with assa-logmon.

When connection with the peer is established, we subscribe for READ_EVENT events from two sources: 1) assa-logd daemon and 2) console input from the user.


// logserver/monitor/LogConn.cpp

#include <unistd.h>
#include <iostream>

#include "LogConn.h"
#include "LogMon.h"				// REACTOR declaration

int
LogConn::
open ()
{
    trace ("LogConn::open");

    ASSA::IPv4Socket& s = *this;

    REACTOR->registerIOHandler (this, s.getHandler (), ASSA::READ_EVENT);
    REACTOR->registerIOHandler (this, STDIN_FILENO, ASSA::READ_EVENT);

    DL((ASSA::APP,"Connected to assa-logd\n"));
    return 0;
}
		

Typically, classes derived from ServiceHandler are self-managed. They are created on the heap and destroy themselves. However, in this case, LogConn is fully managed by LogMon (allocated on the heap in initServer and deallocated by the destructor).


// logserver/monitor/LogConn.cpp

int LogConn::handle_close (int fd_)
{
    trace ("LogConn::handle_close");

    if (fd_ != STDIN_FILENO) { 
    	DL((ASSA::APP,"Disconnected from assa-logd\n"));
    }
    return 0;
}
		

We differentiate the input from STDIN and assa-logd by comparing the file descriptors. User input commands are handled in a separate function.


// logserver/monitor/LogConn.cpp

int LogConn::handle_read (int fd_)
{
    trace ("LogConn::handle_read");
    ASSA::IPv4Socket& s = *this;

    if (fd_ == STDIN_FILENO) {
        return process_user_request (s);
    }

    int ret = 0;
    char buf [256];
    if (s.getHandler () != fd_) { return -1; }

    if ((ret = s.read (buf, 256)) < 0) {
        DL((ASSA::ERROR,"Error reading from socket\n"));
        return (-1);
    }

    return s.good () ? s.in_avail () : -1;
}
		

And finally, we take an input from the user and send it over to assa-logd, terminating our message with [CR][LF] combination just like any telnet session would have done it.


// logserver/monitor/LogConn.cpp

int LogConn::process_user_request (ASSA::IPv4Socket& s_)
{
	trace("LogConn::process_user_request");

	std::string input;
	getline (std::cin, input);
	s_.write (input.c_str (), input.length ());
	s_.write (m_eor, 2);
	s_ << ASSA::flush;
	return 0;
}
		

Note that we have to flush the output socket stream at the end, because IPv4Socket is a buffered stream.

The following class diagram depicts class association in assa-logmon.

Figure 2-7. assa-logmon Synchronous Connection

2.3.2.2. Adding Monitoring Support to assa-logd

In order for assa-logmon to connect to assa-logd, the latter needs to support this type of connections. We are going to add the code to support connection establishment here and later on, we will add communication protocol between log server and its monitors.

We modify LogServer.h by adding acceptor for monitor clients:


// logserver/server/LogServer.h

class MonitorConn;

class LogServer ...
{
    ...

private:
    ASSA::Acceptor<MonitorConn, ASSA::IPv4Socket>* m_monitor_acceptor;
};
		

Initalizing listening point for monitors is similar to that one of logging clients:


// logserver/server/LogServer.cpp

0021 ...
0022 #include "MonitorConn.h"
...

0033 LogServer::
0034 LogServer () : 
0035	m_exit_value (0),
0036	m_acceptor (NULL),
0037	m_monitor_acceptor (NULL)
0038 {
...
0070 }
...
0075 void LogServer::initServer ()
0076 {
0077     trace("LogServer::initServer");
0078 
0079     m_acceptor = 
               new ASSA::Acceptor<Conn, ASSA::IPv4Socket> (getReactor ());
0080 
0081     ASSA::INETAddress lport ("assalogd");
0082     Assert_exit (!lport.bad ());
0083     Assert_exit (m_acceptor->open (lport) == 0);
0084 
0085     m_monitor_acceptor =
0086         new ASSA::Acceptor<Conn, ASSA::IPv4Socket> (getReactor ());
0087 
0088     ASSA::INETAddress lmport ("assalmon");
0089     Assert_exit (!lmport.bad ());
0090     Assert_exit (m_monitor_acceptor->open (lport) == 0);
0091 
0092     DL((ASSA::APP,"Service has been initialized\n"));
0093 }
		

  • Line 22 includes declaration of MonitorConn class.

  • Line 37 initializes pointer to Acceptor to NULL.

  • Lines 85-90 initialize listener endpoint on port assalmon.

Declaration and definition of class MonitorConn is similar to that of the class Conn. They, however have different functionality and later will be modified to recognize and drive different communication protocols with its peers.


// logserver/server/MonitorConn.h

#ifndef MONITOR_CONN_H
#define MONITOR_CONN_H

#include <assa/ServiceHandler.h>
#include <assa/IPv4Socket.h>

using ASSA::ServiceHandler;
using ASSA::IPv4Socket;

class MonitorConn : public ServiceHandler<IPv4Socket>
{
public:
    MonitorConn (IPv4Socket* stream_);
    ~MonitorConn ();

    virtual int open ();

    virtual int handle_read (int fd_);
    virtual int handle_close (int /* fd */);
};

#endif /* MONITOR_CONN_H */
		

Example 2-4. Simple implementation of MonitorConn class


// logserver/server/MonitorConn.cpp

#include "MonitorConn.h"
#include "LogServer-main.h"
#include "LogServer.h"

MonitorConn::MonitorConn (IPv4Socket* stream_)
    : ServiceHandler<IPv4Socket> (stream_) 
{
    trace ("MonitorConn::MonitorConn");
}

MonitorConn::~MonitorConn ()
{
    trace ("MonitorConn::~MonitorConn");
}

int MonitorConn::open ()
{
    trace("MonitorConn::open");

    ASSA::IPv4Socket& s = *this;
    REACTOR->registerIOHandler (this, 
                                s.getHandler (), 
                                ASSA::READ_EVENT);
    return 0;
}

int MonitorConn::handle_read (int fd_)
{
    trace("MonitorConn::handle_read");

    ASSA::IPv4Socket& s = *this;
    if (s.getHandler () != fd_) { return (-1); }

    int ret = 0;
    char buf [256];

    if ((ret = s.read (buf, 256)) < 0) {
        DL((ASSA::ERROR,"Error reading from socket\n"));
        return (-1);
    }

    if (ret > 0) {
        /* Process data received */
        ASSA::MemDump::dump_to_log (ASSA::APP, "=> Got new message", buf, ret);
    }
    return s.eof () ? -1 : s.in_avail ();
}

int MonitorConn::handle_close (int /* fd */)
{
    trace("MonitorConn::handle_close");

    delete (this);
    return 0;
}
		  

Here is modified class diagram for assa-logd server. We added MonitorConn class and pointed out the multiplicity of Acceptor class that became 2.

Figure 2-8. assa-logd Class Diagram

2.3.2.3. Testing Synchronous Connection

We are ready to test our monitor client. In one window, we start the daemon process and monitor its log file:


% assa-logd --daemon --port=assalogd
% tail -f LogServer.log

...
[LogServer::initServer] Service has been initialized
		

From another terminal window, connect to the server with assa-logmon and type in an ASCII string to send to the server:


% assa-logmon

Hey, there ...
		

We should observer back in the server's window a trace indicating that a new message has been received on a monitoring connection (MonitorConn):


...
[LogServer::initServer] Service has been initialized
...
|  |  |  |  [MonitorConn::handle_read] (15 bytes) => Got new message
|  |  |  |  [MonitorConn::handle_read] 

4865 792c 2074 6865 7265 2e2e 2e0d 0a    Hey, there...\r\n
		

2.3.3. Connecting Asynchronously

An asynchronous connectect operation does not block the requesting application code. Busy application programs that maintain multiple connections to multiple services might be better off performance-wise by establishing connections with their peers asynchronously. GUI client applications also fall in this category. They usually require the ability to monitor and control connection progress.

Connector class allows us to establish asynchronous connections. There are some extra steps an application code has to take in order to take advantage of this capability, namely:

  1. Provide external event Reactor object.

  2. Implement connection time out event handling.

  3. Implement successful connection event handling.

After calling connect() method and checking for immediate errors, an application code can continue attending to other activities. When either connection is completed or an attempt to connect times out, there should be some mechanism to notify application code about an event in question. This is accomplished by using Reactor class. An object of type Reactor is expected by Connector configured to establish asynchronous connection.

We are going to modify assa-logmon to connect asynchronously to assa-logd server. We issue the connection request and then wait in the main event loop for either connection establishment notification or for connection timer to expire. LogConn::open() is called if we are connected, and LogConn::close() is called if connection timer expires.


// logserver/monitor/LogMon.cpp

0057 void LogMon::initServer ()
0058 {
0059     trace("LogMon::initServer");
0060 
0061     m_peer = new LogConn;
0062     ASSA::INETAddress lmon_addr (getPortName ().c_str ());
0063 
0064     ASSA::TimeVal timeout (5.0);
0065     m_connector.open (timeout,
0066                       ASSA::Connector<LogConn, ASSA::IPv4Socket>::async,
0067                       REACTOR);
0068 
0069     if (m_connector.connect (m_peer, lmon_addr) < 0) {
0070         DL((ASSA::ERROR,"Failed to connect to server\n"));
0071         setStopServerFlag ();
0072         return;
0073     }
0074 
0075     DL((ASSA::APP,"Service has been initialized\n"));
0076 }
	  

If all goes well and connection is established some time later, Reactor object sends Connector object WRITE event notification. Connector object notifies ServiceHandler object about successfully completed connection by calling its open() member function.

Another possibility would be receiving an error or timeout while connecting. Either way, Connector object reports connection establishment error by calling ServiceHandler's virtual function close(). We have to add implementation of this function to LogConn class:


// logserver/monitor/LogConn.h

class LogConn ...
{
public:
    ...
    virtual void close ();
    ...
};
	  

We implement this function by stopping the service:


// logserver/monitor/LogConn.cpp

void LogConn::close ()
{
    trace ("LogConn::close");
    DL((ASSA::APP, "Failed to connection to assa-logd!\n"));

    LOGMON->setStopServerFlag ();
}

	  

If we compiled this modification and try to run assa-logmon without running assa-logd server, we would get the following error:


% assa-logmon -d

...
[GenServer::initInternals] 
|  |  |  [IPv4Socket::connect()] FD: 3 OS::connect() error
|  |  |  [IPv4Socket::connect()] errno: 115 "Operation now in progress"
[LogMon::initServer] Service has been initialized
|  |  |  |  [Connector::handle_write] Socket pending error: 111
|  |  |  |  [Connector::handle_write] errno: 0 "Success"
|  |  |  |  [Connector::handle_write] Nonblocking connect (2) failed
|  |  |  |  [Connector::handle_write] errno: 111 "Connection refused"
|  |  |  |  [Connector::handle_write] Try to compare port numbers on client and service hosts.
|  |  |  |  [Connector::handle_write] errno: 111 "Connection refused"
|  |  |  |  |  [LogConn::close] Failed to connection to assa-logd!
[LogMon::processServer] Service stopped!

	  

As can be seen, initial connect failed with the error "Operation now in progress". Later we get notification of failure to connect via call to close().