2.3. Create a Consumer Client
Overview
This section describes how to create a consumer client of the notification broker. The consumer client subscribes to a particular topic and creates a callback service, which is capable of receiving messages directly from the broker.
Sample consumer client code
Example 2.2, “Consumer Client Code” shows the code for a sample consumer client that subscribes to messages published on the MyTopic topic.
Example 2.2. Consumer Client Code
// Java
package org.jboss.fuse.example.wsn.consumer.client;

import javax.xml.bind.JAXBElement;
import javax.xml.namespace.QName;

import org.w3c.dom.Element;

import org.apache.cxf.wsn.client.Consumer;
import org.apache.cxf.wsn.client.NotificationBroker;
import org.apache.cxf.wsn.client.Subscription;
import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;

/**
 *
 */
public final class Client {
    private Client() {
        //not constructed
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        String wsnPort = "8182";
        if (args.length > 0) {
            wsnPort = args[0];
        }

        // Start a consumer that will listen for notification messages
        // We'll just print the text content out for now.
        Consumer consumer = new Consumer(new Consumer.Callback() {
            public void notify(NotificationMessageHolderType message) {
                Object o = message.getMessage().getAny();
                System.out.println(message.getMessage().getAny());
                if (o instanceof Element) {
                    System.out.println(((Element)o).getTextContent());
                }
            }
        }, "http://localhost:9001/MyConsumer");

        // Create a subscription for a Topic on the broker
        NotificationBroker notificationBroker
            = new NotificationBroker("http://localhost:" + wsnPort + "/wsn/NotificationBroker");
        Subscription subscription = notificationBroker.subscribe(consumer, "MyTopic");

        // Just sleep for a bit to pick up some incoming messages
        Thread.sleep(60000);

        // Cleanup and exit
        subscription.unsubscribe();
        consumer.stop();
        System.exit(0);
    }
}
Creating a consumer callback object
To receive notification messages from the notification broker, you must create a consumer callback object. The consumer callback object is in fact a Web service that is embedded in your client. The easiest way to create the consumer callback is to use the org.apache.cxf.wsn.client.Consumer class from the simplified client API, which enables you to define a callback as follows:
// Start a consumer that will listen for notification messages
// We'll just print the text content out for now.
Consumer consumer = new Consumer(new Consumer.Callback() {
    public void notify(NotificationMessageHolderType message) {
        Object o = message.getMessage().getAny();
        System.out.println(message.getMessage().getAny());
        if (o instanceof Element) {
            System.out.println(((Element)o).getTextContent());
        }
    }
}, "http://localhost:9001/MyConsumer");
The first argument to the Consumer constructor is a reference to the consumer callback object, which is defined inline as an anonymous implementation of Consumer.Callback. The second argument specifies the URL of the consumer callback endpoint, where the embedded Web service receives messages from the notification broker.
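The callback prints the payload twice: once as an object and once as text, if the payload is a DOM element. The following minimal sketch uses only JDK classes to show the second step in isolation. The class name PayloadTextDemo, the helper methods, and the sample XML (including its namespace URI) are illustrative assumptions, not part of the WS-Notification client API.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;

final class PayloadTextDemo {

    private PayloadTextDemo() {
        // not constructed
    }

    /** Mirrors the callback logic: return the text content only for DOM elements. */
    static String textOf(Object payload) {
        if (payload instanceof Element) {
            return ((Element) payload).getTextContent();
        }
        return null;
    }

    /** Parses an XML string into its root element (a stand-in for a received payload). */
    static Element parse(String xml) throws Exception {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setNamespaceAware(true);
        DocumentBuilder builder = factory.newDocumentBuilder();
        return builder
            .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
            .getDocumentElement();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical payload: the element name and namespace URI are made up.
        Element payload = parse("<ns8:foo xmlns:ns8=\"urn:example:wsn\">Hello World!</ns8:foo>");
        System.out.println(textOf(payload));
        // A payload that is not a DOM element yields no text.
        System.out.println(textOf("not a DOM element"));
    }
}
```

In the real client, the object passed to textOf() would be the value returned by message.getMessage().getAny().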
Subscribing to a topic
To start receiving messages, you must subscribe the consumer to a topic in the notification broker. To create a subscription, invoke the following subscribe method on the NotificationBroker proxy object:
Subscription subscribe(Referencable consumer, String topic)
The first argument is a reference to a Consumer object (which is capable of returning a WS-Addressing endpoint reference to the consumer callback through the Referencable.getEpr() method). The second argument is the name of the topic you want to subscribe to.
The return value is a reference to a Subscription object, which you can use to manage the subscription (for example, pause, resume, or unsubscribe).
Threading in the consumer client
Because the consumer client has an embedded Web service (the consumer callback object), which automatically starts in a background thread, the main thread must stay alive long enough for messages to arrive. After creating the subscription, the sample client therefore puts the main thread to sleep by calling Thread.sleep(60000). During this 60-second interval, the callback Web service running in the background thread can receive messages. When the sleep ends, the client unsubscribes, stops the consumer, and exits.
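A fixed sleep is the simplest way to keep the main thread alive, but it always waits the full interval. As an alternative, the main thread can wait on a java.util.concurrent.CountDownLatch that the callback decrements for each message. The following is a minimal sketch of that pattern using only JDK classes. The class name LatchWaitDemo, the expected message count, and the stand-in delivery thread are assumptions for illustration; in the real client the countDown() call would go inside the callback's notify() method.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchWaitDemo {

    private LatchWaitDemo() {
        // not constructed
    }

    /**
     * Blocks the calling thread until the latch reaches zero or the timeout expires.
     * Returns true if every expected message arrived in time.
     */
    static boolean awaitMessages(CountDownLatch latch, long timeoutMillis)
            throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        final int expected = 3; // hypothetical number of messages to wait for
        final CountDownLatch received = new CountDownLatch(expected);

        // Stand-in for the background thread of the callback Web service.
        Thread delivery = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < expected; i++) {
                    System.out.println("message " + i);
                    received.countDown();
                }
            }
        });
        delivery.setDaemon(true);
        delivery.start();

        // Wait at most 60 seconds, the same upper bound as the sample client.
        boolean complete = awaitMessages(received, 60000);
        System.out.println(complete ? "all messages received" : "timed out");
    }
}
```

With this approach the client can proceed to unsubscribe as soon as the expected messages have arrived, while the timeout still bounds the wait if the publisher sends nothing.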
Steps to create a consumer client
Perform the following steps to create a consumer client:
- Use the archetype:generate goal to invoke the servicemix-cxf-code-first-osgi-bundle archetype. Under the wsn directory, invoke the Maven archetype as follows:
  mvn archetype:generate \
    -DarchetypeGroupId=org.apache.servicemix.tooling \
    -DarchetypeArtifactId=servicemix-cxf-code-first-osgi-bundle \
    -DarchetypeVersion=2013.01.0.redhat-610379 \
    -DgroupId=org.jboss.fuse.example \
    -DartifactId=wsn-consumer \
    -Dversion=1.0-SNAPSHOT \
    -Dpackage=org.jboss.fuse.example.wsn.consumer
  Note: The backslash characters at the end of each line are effective as line-continuation characters on UNIX and Linux platforms. If you are entering the command on a Windows platform, however, you must enter the entire command on a single line.
  You will be prompted to confirm the project settings, with a message similar to this one:
  Confirm properties configuration:
  groupId: org.jboss.fuse.example
  artifactId: wsn-consumer
  version: 1.0-SNAPSHOT
  package: org.jboss.fuse.example.wsn.consumer
   Y: :
  Press Return to accept the settings and generate the project. When the command finishes, you should find a new Maven project in the wsn/wsn-consumer directory.
- Some of the generated project files are not needed for this tutorial. Under the wsn/wsn-consumer directory, delete the following files and directories:
  src/main/resources/META-INF/spring/beans.xml
  src/main/java/org/jboss/fuse/example/wsn/consumer/Person.java
  src/main/java/org/jboss/fuse/example/wsn/consumer/PersonImpl.java
  src/main/java/org/jboss/fuse/example/wsn/consumer/UnknownPersonFault.java
  src/main/java/org/jboss/fuse/example/wsn/consumer/types
- Edit the pom.xml file in the wsn-consumer directory, and add the following dependencies, as required by the consumer client:
  <?xml version="1.0" encoding="UTF-8"?>
  <project ...>
    ...
    <dependencies>
      ...
      <!-- Needed for HTTP callback endpoint -->
      <dependency>
        <groupId>org.apache.cxf</groupId>
        <artifactId>cxf-rt-transports-http-jetty</artifactId>
        <version>2.7.0.redhat-610379</version>
      </dependency>
      <!-- Needed for WS-Notification -->
      <dependency>
        <groupId>org.apache.cxf.services.wsn</groupId>
        <artifactId>cxf-services-wsn-api</artifactId>
        <version>2.7.0.redhat-610379</version>
      </dependency>
    </dependencies>
    ...
  </project>
- Edit the Client.java file in the wsn-consumer/src/main/java/org/jboss/fuse/example/wsn/consumer/client/ directory, remove the existing content, and replace it with the code from Example 2.2, “Consumer Client Code”.
Test the consumer client
Test the consumer client as follows:
- If the JBoss A-MQ container is not already running (with the notification broker installed), start it up now:
./amq
- Run the publisher client at the command line. Open a new command prompt, and enter the following commands:
  cd wsn/wsn-publisher
  mvn -Pclient
  In the command window, you should see some output like the following:
  ...
  INFO: Creating Service {http://cxf.apache.org/wsn/jaxws}NotificationBrokerService from WSDL:
  jar:file:/Users/fbolton/.m2/repository/org/apache/cxf/services/wsn/
  cxf-services-wsn-api/2.6.0.redhat-60024/cxf-services-wsn-api-2.6.0.redhat-60024.jar
  !/org/apache/cxf/wsn/wsdl/wsn.wsdl
  You now have approximately two minutes before the publisher client times out.
- Run the consumer client at the command line. Open a new command prompt and enter the following commands:
  cd wsn/wsn-consumer
  mvn -Pclient
  In the command window, you should see some output like the following:
  ...
  Jun 25, 2013 4:13:47 PM org.apache.cxf.service.factory.ReflectionServiceFactoryBean buildServiceFromWSDL
  INFO: Creating Service {http://cxf.apache.org/wsn/jaxws}PausableSubscriptionManagerService from WSDL:
  jar:file:/Users/fbolton/.m2/repository/org/apache/cxf/services/wsn
  /cxf-services-wsn-api/2.6.0.redhat-60024/cxf-services-wsn-api-2.6.0.redhat-60024.jar
  !/org/apache/cxf/wsn/wsdl/wsn.wsdl
  [ns8:foo: null]
  Hello World!
  ...
- To inspect the state of the notification broker, you can connect to the JMX port of the ActiveMQ broker. Start up a JMX console by entering the following command at the command line:
  jconsole
  In the JConsole: New Connection dialog, select Remote Process and enter the following URL in the accompanying text field:
  service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root
  In the Username and Password fields, enter the credentials of one of the users you created at the start of this tutorial. When you are connected to the JMX port, you can inspect the state of the broker by clicking the MBeans tab and drilling down the object tree in JConsole.
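The same JMX endpoint can also be reached from code through the standard javax.management.remote API, which is convenient for scripted checks. The following is a minimal sketch, not part of the tutorial project: the class name JmxProbe and the credentials() helper are illustrative assumptions, and the user name and password must be supplied as command-line arguments. It lists only the MBean domain names, because the exact object names registered by the broker are not covered in this section.

```java
import java.util.HashMap;
import java.util.Map;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

final class JmxProbe {

    /** The same JMX service URL that is entered in the JConsole dialog. */
    static final String URL = "service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root";

    private JmxProbe() {
        // not constructed
    }

    /** Builds the environment map that carries the JMX user name and password. */
    static Map<String, Object> credentials(String user, String password) {
        Map<String, Object> env = new HashMap<String, Object>();
        env.put(JMXConnector.CREDENTIALS, new String[] {user, password});
        return env;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: JmxProbe <username> <password>");
            return;
        }
        JMXConnector connector =
            JMXConnectorFactory.connect(new JMXServiceURL(URL), credentials(args[0], args[1]));
        try {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            // Print each MBean domain visible through this connection.
            for (String domain : mbeans.getDomains()) {
                System.out.println(domain);
            }
        } finally {
            connector.close();
        }
    }
}
```

The container must be running for the connection to succeed; otherwise JMXConnectorFactory.connect() throws an IOException.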