So - you want to build an ObjectQ application. Wise choice of platform!
You probably already know that ObjectQ provides you not only with the mechanism for moving information around, but also with a framework for building applications.
(You didn't? Here's some background information.)
You also know that there have to be two sides to a conversation, even though you're most likely only involved with one side. The two sides, of course, are the client and the server.
The client contains one or more managers, each of which will communicate with its alter ego, an agent, which lives within the server.
A simple client
Here's an example of the simplest possible client, which simply sends a request, and receives a response.
SimClient_c.c
#include <Args.h>
#include <stdlib.h>
#include "cpDSAP.h"
#include "cpTransMgr.h"
#include "cpInstId.h"
#include "cpMessage.h"
#include "cpGenMgr.h"
void
error(const String& errMsg, cpStatus ret)
{cerr << errMsg << " [" << (long)ret << "]" << endl;}
void
usage(const String& prog)
{
cerr << "Usage: " << prog
<< " -s <service-name> -d <definition-directory>" << endl;
}
main(int argc, char **argv)
{
cpStatus ret;
cpManagerResource resource;
cpHandle handle;
cpVendor vendor;
String serviceName;
String defDir;
int trLevel = -1;
// Get command line arguments
Args args(argc, argv, "s:d:x:");
if (args.isset('s')) serviceName = args.value('s');
if (args.isset('d')) defDir = args.value('d');
if (args.isset('x')) trLevel = atoi(args.value('x'));
if (serviceName.length() == 0) {
usage(argv[0]);
return 1;
}
// Initialize ObjectQ
if ((ret = cpInit(trLevel, defDir)) != cpSUCCESS) {
error("Initialization failed", ret);
return 2;
}
// Connect to the queueing engine
vendor = cpXportMgr->vendor(serviceName);
if ((ret = cpXportMgr->registerPrimary(vendor, handle)) != cpSUCCESS) {
error("Connection failed", ret);
return 3;
}
// Instantiate a manager for a synchronous request
resource.synchronous(cpTRUE);
cpGenericMgr mgr(resource, serviceName);
// Issue the request
List<cpAttribute> attrList;
cpInstanceId id("Hello world!");
if ((ret = mgr.get("Simple.demoClass", attrList, id)) != cpSUCCESS) {
error("Get failed", ret);
return 4;
}
else {
// Display the responses
const List<cpMessage*> *mlist = mgr.messages();
Const_listiter<cpMessage*> msgIter(*mlist);
cpMessage* respMsg;
while (msgIter.next(respMsg))
cout << "Response: <" << respMsg->instanceId() << "> " << endl;
}
// Disconnect from the queueing engine
cpXportMgr->unregisterPrimary(vendor);
return 0;
}
You wouldn't write code like this, of course. For one thing, there's no error checking. You can see a more complete version of the same thing, complete with all the appropriate error checks here.
But this is good enough for a simple example. The code is using a generic manager, which is a plain vanilla manager that has all the basic functionality of a customized manager, but none of the frills. We'll see later on how to extend this by subclassing.
A simple server
To complete the picture, we should look at the server, on the other side. Here it is, again stripped of error checking.
simServer_c.c
#include <Args.h>
#include <stdlib.h>
#include "cpTransMgr.h"
#include "cpEnvelope.h"
#include "simAgent.h"
void
error(const String& errMsg, cpStatus ret)
{
cerr << errMsg << " [" << (long)ret << "]" << endl;
}
void
usage(const String& prog)
{
cerr << "Usage: " << prog
<< " -s <service-name> -d <definition-directory>" << endl;
}
main(int argc, char **argv)
{
cpEnvelope env;
cpStatus ret;
cpResource res;
cpTransport *t;
String serviceName;
String defDir;
cpHandle handle;
int trLevel = -1;
// Get command line arguments
Args args(argc, argv, "s:d:x:");
if (args.isset('s')) serviceName = args.value('s');
if (args.isset('d')) defDir = args.value('d');
if (args.isset('x')) trLevel = atoi(args.value('x'));
if (serviceName.length() == 0) {
usage(argv[0]);
return 1;
}
// Initialize ObjectQ
if ((ret = cpInit(trLevel, defDir)) != cpSUCCESS) {
error("Initialization failed", ret);
return 2;
}
// Instantiate an agent
simAgent agent(serviceName);
// Connect to the queueing engine
if ((ret = cpXportMgr->registerPrimary(serviceName, handle)) != cpSUCCESS) {
error("Connection failed", ret);
return 3;
}
// Handle requests - wait for a message and dispatch it
for (;;) {
if ((t = cpXportMgr->transport(serviceName)) == 0) {
error("Get transport failed", cpFAIL);
return 4;
}
if ((ret = t->receive(cpWAIT, env)) != cpSUCCESS) {
error("Receive failed", ret);
return 5;
}
if ((ret = cpDispatch->dispatch(&env)) != cpSUCCESS) {
error("Dispatch failed", ret);
return 6;
}
}
}
The complete version, if you're interested, is here.
But hold on a minute - this code doesn't seem to do anything! It reads an incoming request, and then (we would guess) it dispatches it. But where to? A simple agent
No mystery here - it's dispatched to an agent. There's no such thing as a generic agent, because the kind of thing an agent does is too specialized. So we have to write our own (we didn't have to do that on the client side, because the kind ObjectQ folks did it for us).
Here's the agent:
simAgent_c.c
#include "cpTransMgr.h"
#include "simAgent.h"
extern void error(const String& errMsg, cpStatus ret);
// Constructors
simAgent::simAgent()
{
cpStatus ret;
if ((ret = cpDispatch->registerServer(AserviceName, this)) != cpSUCCESS)
error("Server registration failed", ret);
}
simAgent::simAgent(const String& s) : cpAgent()
{
AserviceName = s;
cpStatus ret;
if ((ret = cpDispatch->registerServer(AserviceName, this)) != cpSUCCESS)
error("Server registration failed", ret);
}
// Destructor
simAgent::~simAgent()
{
cpDispatch->unregisterServer(AserviceName, this);
}
// Mark the agent as available to handle the next request
cpStatus
simAgent::makeAvailable()
{
cpStatus ret;
if ((ret = cpDispatch->serverAvailable(AserviceName, this)) != cpSUCCESS)
error("Server availability failed", ret);
return ret;
}
// This function is invoked by the dispatcher when
// an incoming request is received
cpStatus
simAgent::requestHandler(cpEnvelope *env)
{
cpStatus ret;
cpResource res;
cpTransport *t;
cpGetReqMessage reqMsg;
// Pull out the next message
if ((ret = env->nextMessage(reqMsg)) != cpSUCCESS) {
error("Message retrieval failed", ret);
return ret;
}
// Display the instance name
cout << "Request: <" << reqMsg.instanceId() << "> " << endl;
// Construct a response message
cpGetRespMessage respMsg;
respMsg.instanceId("Goodbye World!");
// Put it into an envelope
env->clearMessages();
if ((ret = env->addMessage(respMsg)) != cpSUCCESS) {
(void)makeAvailable();
error("Message add failed", ret);
return ret;
}
env->last();
// Send it back
if ((t = cpXportMgr->transport(AserviceName)) == 0) {
(void)makeAvailable();
error("Get transport failed", cpFAIL);
return ret;
}
if ((ret = t->reply(*env)) != cpSUCCESS) {
(void)makeAvailable();
error("Reply failed", ret);
return ret;
}
// We're available again
(void)makeAvailable();
return cpSUCCESS;
}
And here's the real thing.
Asynchronous communications
The client above performs communications synchronously. That is, the call to get blocks until a response is received. While synchronous behavior is normal for a server, it is unusual for a client, which typically needs to service a user interface, and periodically check for incoming messages.
So what do we need to change to make our client behave asynchronously? Well, first note that the server stays exactly the same - it doesn't care whether the client is synchronous or asynchronous. Now, since the client needs to check for messages every now and then, it needs a polling routine. Here's an example of one:
cpStatus
poll(cpVendor vendor)
{
cpStatus ret;
cpTransport *xport;
cpEnvelope env;
// Get transport from Transport Manager
xport = cpXportMgr->transport(vendor);
// Process all messages on queue
while (1) {
ret = xport->receive(cpNOWAIT, env);
if (ret == cpFAIL) {
// Error
return cpFAIL;
} else if ((ret == cpSUCCESS) &&
(xport->lastRetCode() == cpNO_MESSAGES)) {
// No messages on the queue
return cpSUCCESS;
} else {
// Process message
ret = cpDispatch->dispatch(&env);
if (ret != cpSUCCESS)
return ret;
}
}
}
This function processes every message on the queue until either it hits an error condition, or there are no messages left to read. The "processing" is handled by a call to the Dispatcher, which is responsible for routing the response message to the manager that sent out the original request.
In order for the manager to communicate the results back the the client, an intermediate object, called a requestor object is used. The requestor object is subclassed from a special class cpObjectNotify that contains a virtual function called notifyComplete, and it is this function that is invoked by the manager when responses are received. So, when it wants to send out a request, the client no longer instantiates a manager; instead, it instantiates a requestor object (written by the application programmer) which instantiates the manager, and which also contains the notifyComplete function. Here's what the requestor object might look like:
#include <Map.h>
#include "cpGenMgr.h"
#include "cpMsgUtil.h"
#include "cpResource.h"
#include "simRequest.h"
Requestor::Requestor()
{
}
Requestor::Requestor(const cpManagerResource &mRes, const String &service) :
_resource(mRes), _service(service)
{
}
Requestor::~Requestor()
{
}
cpStatus
Requestor::get(const String &className,
const List<cpAttribute> &attrList,
const cpInstanceId &id)
{
cpStatus ret;
// Create a manager to handle the request/response
cpGenericMgr* mgr = new cpGenericMgr(_resource, this, _service);
// Issue the get asynchronously (ie, don't wait for response)
ret = mgr->get("Simple.demoClass", attrList, id);
return ret;
}
void
Requestor::notifyComplete(cpManager *mgr)
{
cpGenericMgr* gMgr = (cpGenericMgr*)mgr;
const List<cpMessage*>* mlist;
// The manager called us - retrieve messages
mlist = gMgr->messages();
if (mlist) {
Const_listiter<cpMessage*> msgIter(*mlist);
cpMessage* respMsg;
// Display the instance name for each response
while (msgIter.next(respMsg))
cout << "Response: <" << respMsg->instanceId() << "> " << endl;
}
// We don't need the manager any more
delete gMgr;
}
And the full (with error checking) version is here. The client that goes along with it looks like this (with differences from the synchronous client in red:
#include <Args.h>
#include <stdlib.h>
#include "cpDSAP.h"
#include "cpTransMgr.h"
#include "cpInstId.h"
#include "cpMessage.h"
#include "cpDispatch.h"
#include "simRequest.h"
cpStatus
poll(cpVendor vendor)
{
cpStatus ret;
cpTransport *xport;
cpEnvelope env;
// Get transport from Transport Manager
xport = cpXportMgr->transport(vendor);
// Process all messages on queue
while (1) {
ret = xport->receive(cpNOWAIT, env);
if (ret == cpFAIL) {
// Error
return cpFAIL;
} else if ((ret == cpSUCCESS) &&
(xport->lastRetCode() == cpNO_MESSAGES)) {
// No messages on the queue
return cpSUCCESS;
} else {
// Process message
ret = cpDispatch->dispatch(&env);
if (ret != cpSUCCESS)
return ret;
}
}
}
main(int argc, char **argv)
{
cpManagerResource resource;
cpHandle handle;
cpVendor vendor;
String serviceName;
String defDir;
int trLevel = -1;
// Get command line arguments
Args args(argc, argv, "s:d:x:");
if (args.isset('s')) serviceName = args.value('s');
if (args.isset('d')) defDir = args.value('d');
if (args.isset('x')) trLevel = atoi(args.value('x'));
// Initialize ObjectQ
cpInit(trLevel, defDir);
// Connect to the queueing engine
vendor = cpXportMgr->vendor(serviceName);
cpXportMgr->registerPrimary(vendor, handle);
// Instantiate a requestor for an asynchronous request
Requestor req(resource, serviceName);
// Issue the request
List<cpAttribute> attrList;
cpInstanceId id("Hello world!");
req.get("Simple.demoClass", attrList, id);
// Wait a while, then poll
sleep(10);
poll(vendor);
// Disconnect from the queueing engine
cpXportMgr->unregisterPrimary(vendor);
return 0;
}
And the full (with error checking) version is here.
Callbacks
If the idea of using functions with well-known names (like notifyComplete) offends you, you can call your "callback" functions by any name you choose. Of course, you have to communicate this information to the objects that need to use it. In the case of notifyComplete, suppose we wanted to call our function "foo" instead. The only changes necessary (shown in red below are in the requestor header:
simRequestCB_h.h
#ifndef simRequest_h
#define simRequest_h
#include <List.h>
#include "cpCommon.h"
#include "cpMsgUtil.h"
#include "cpResource.h"
#include "cpCallBack.h"
class Requestor: public cpObjectNotify{
public:
Requestor();
Requestor(const cpManagerResource &mRes, const String &service);
~Requestor();
void notifyComplete(cpManager *mgr);
cpStatus foo(cpManager *mgr);
cpStatus get(const String &className,
const List<cpAttribute> &attrList,
const cpInstanceId &id);
private:
cpManagerResource _resource;
String _service;
cpCallBackBaseP<cpManager>* _cb;
};
#endif
and body:
#include <Map.h>
#include "cpGenMgr.h"
#include "cpMsgUtil.h"
#include "cpResource.h"
#include "cpCallBack.h"
#include "simRequest.h"
Requestor::Requestor()
{
_cb = 0;
}
Requestor::Requestor(const cpManagerResource &mRes, const String &service) :
_resource(mRes), _service(service)
{
_cb = new cpCallBackP<Requestor,cpManager>(this, &Requestor::foo);
}
Requestor::~Requestor()
{
if (_cb)
delete _cb;
}
cpStatus
Requestor::get(const String &className,
const List<cpAttribute> &attrList,
const cpInstanceId &id)
{
cpStatus ret;
// Create a manager to handle the request/response
cpGenericMgr* mgr = new cpGenericMgr(_resource, this, _service);
// Issue the get asynchronously (ie, don't wait for response)
ret = mgr->get("Simple.demoClass", attrList, id);
return ret;
}
void
Requestor::notifyComplete(cpManager *mgr)
{
cpGenericMgr* gMgr = (cpGenericMgr*)mgr;
const List<cpMessage*>* mlist;
// The manager called us - retrieve messages
mlist = gMgr->messages();
if (mlist) {
Const_listiter<cpMessage*> msgIter(*mlist);
cpMessage* respMsg;
// Display the instance name for each response
while (msgIter.next(respMsg))
cout << "Response: <" << respMsg->instanceId() << "> " << endl;
}
// We don't need the manager any more
delete gMgr;
}
cpStatus
Requestor::foo(cpManager *mgr)
{
notifyComplete(mgr);
return cpSUCCESS;
}
(Note that our function foo just calls notifyComplete - yours can do anything it wants).
|