2.3. Create a Consumer Client


Overview

This section describes how to create a consumer client of the notification broker. The consumer client subscribes to a particular topic and creates a callback service, which is capable of receiving messages directly from the broker.

Sample consumer client code

Example 2.2, “Consumer Client Code” shows the code for a sample consumer client that subscribes to messages published on the MyTopic topic.

Example 2.2. Consumer Client Code

// Java
package org.jboss.fuse.example.wsn.consumer.client;

import javax.xml.bind.JAXBElement;
import javax.xml.namespace.QName;

import org.w3c.dom.Element;

import org.apache.cxf.wsn.client.Consumer;
import org.apache.cxf.wsn.client.NotificationBroker;
import org.apache.cxf.wsn.client.Subscription;
import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;

/**
 * 
 */
public final class Client {
    private Client() {
        //not constructed
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        String wsnPort = "8182";
        if (args.length > 0) {
            wsnPort = args[0];
        }
        
        // Start a consumer that will listen for notification messages
        // We'll just print the text content out for now.
        Consumer consumer = new Consumer(new Consumer.Callback() {
            public void notify(NotificationMessageHolderType message) {
                Object o = message.getMessage().getAny();
                System.out.println(message.getMessage().getAny());
                if (o instanceof Element) {
                    System.out.println(((Element)o).getTextContent());
                }
            }
        }, "http://localhost:9001/MyConsumer");
        
        
        // Create a subscription for a Topic on the broker
        NotificationBroker notificationBroker 
            = new NotificationBroker("http://localhost:" + wsnPort + "/wsn/NotificationBroker");
        Subscription subscription = notificationBroker.subscribe(consumer, "MyTopic");


        // Just sleep for a bit to pick up some incoming messages
        Thread.sleep(60000);
        
        // Cleanup and exit
        subscription.unsubscribe();
        consumer.stop();
        System.exit(0);
    }

}

Creating a consumer callback object

In order to receive notification messages from the notification broker, you must create a consumer callback object to receive the messages. The consumer callback object is in fact a Web service which is embedded in your client. The easiest way to create the consumer callback is to use the org.apache.cxf.wsn.client.Consumer class from the simplified client API, which enables you to define a callback as follows:
        // Start a consumer that will listen for notification messages
        // We'll just print the text content out for now.
        Consumer consumer = new Consumer(new Consumer.Callback() {
            public void notify(NotificationMessageHolderType message) {
                Object o = message.getMessage().getAny();
                System.out.println(message.getMessage().getAny());
                if (o instanceof Element) {
                    System.out.println(((Element)o).getTextContent());
                }
            }
        }, "http://localhost:9001/MyConsumer");
The first argument to the Consumer constructor is a reference to the consumer callback object, which is defined inline. The second argument specifies the URL of the consumer callback endpoint, which can receive messages from the notification broker.

Subscribing to a topic

To start receiving messages, you must subscribe the consumer to a topic in the notification broker. To create a subscription, invoke the following subscribe method on the NotificationBroker proxy object:
Subscription subscribe(Referencable consumer, String topic)
The first argument is a reference to a Consumer object (which is capable of returning a WS-Addressing endpoint reference to the consumer callback through the Referencable.getEpr() method). The second argument is the name of the topic you want to subscribe to.
The return value is a reference to a Subscription object, which you can use to manage the subscription (for example, pause, resume, or unsubscribe).

Threading in the consumer client

Because the consumer client has an embedded Web service (the consumer callback object), which automatically starts in a background thread, it is necessary to manage threading in this sample client. In particular, after creating the subscription, you need to put the main thread to sleep (by calling Thread.sleep(60000)), so that the thread context can switch to the background thread, where the callback Web service is running. This makes it possible for the consumer callback to receive some messages.

Steps to create a consumer client

Perform the following steps to create a consumer client:
  1. Use the archetype:generate goal to invoke the servicemix-cxf-code-first-osgi-bundle archetype. Under the wsn directory, invoke the Maven archetype as follows:
    mvn archetype:generate \
      -DarchetypeGroupId=org.apache.servicemix.tooling \
      -DarchetypeArtifactId=servicemix-cxf-code-first-osgi-bundle \
      -DarchetypeVersion=2013.01.0.redhat-610379 \
      -DgroupId=org.jboss.fuse.example \
      -DartifactId=wsn-consumer \
      -Dversion=1.0-SNAPSHOT \
      -Dpackage=org.jboss.fuse.example.wsn.consumer
    Note
    The backslash characters at the end of each line are effective as line-continuation characters on UNIX and LINUX platforms. If you are entering the command on a Windows platform, however, you must enter the entire command on a single line.
    You will be prompted to confirm the project settings, with a message similar to this one:
    Confirm properties configuration:
    groupId: org.jboss.fuse.example
    artifactId: wsn-consumer
    version: 1.0-SNAPSHOT
    package: org.jboss.fuse.example.wsn.consumer
     Y: :
    Type Return to accept the settings and generate the project. When the command finishes, you should find a new Maven project in the wsn/wsn-consumer directory.
  2. Some of the generated project files are not needed for this tutorial. Under the wsn/wsn-consumer directory, delete the following files and directories:
    src/main/resources/META-INF/spring/beans.xml
    src/main/java/org/jboss/fuse/example/wsn/consumer/Person.java
    src/main/java/org/jboss/fuse/example/wsn/consumer/PersonImpl.java
    src/main/java/org/jboss/fuse/example/wsn/consumer/UnknownPersonFault.java
    src/main/java/org/jboss/fuse/example/wsn/consumer/types
  3. Edit the pom.xml file in the wsn-consumer directory, and add the following dependencies, as required by the consumer client:
    <?xml version="1.0" encoding="UTF-8"?>
    <project ...>
        ...
        <dependencies>
            ...
            <!-- Needed for HTTP callback endpoint -->
            <dependency>
                <groupId>org.apache.cxf</groupId>
                <artifactId>cxf-rt-transports-http-jetty</artifactId>
                <version>2.7.0.redhat-610379</version>
            </dependency>
    
            <!-- Needed for WS-Notification -->
            <dependency>
                <groupId>org.apache.cxf.services.wsn</groupId>
                <artifactId>cxf-services-wsn-api</artifactId>
                <version>2.7.0.redhat-610379</version>
            </dependency>
        </dependencies>
        ...
    </project>
  4. Edit the Client.java file in the wsn-consumer/src/main/java/org/jboss/fuse/example/wsn/consumer/client/ directory, remove the existing content, and replace it with the code from Example 2.2, “Consumer Client Code”.

Test the consumer client

Test the consumer client as follows:
  1. If the JBoss A-MQ container is not already running (with the notification broker installed), start it up now:
    ./amq
  2. Run the publisher client at the command line. Open a new command prompt, and enter the following commands:
    cd wsn/wsn-publisher
    mvn -Pclient
    In the command window, you should see some output like the following:
    ...
    INFO: Creating Service {http://cxf.apache.org/wsn/jaxws}NotificationBrokerService
    from WSDL: jar:file:/Users/fbolton/.m2/repository/org/apache/cxf/services/wsn/
    cxf-services-wsn-api/2.6.0.redhat-60024/cxf-services-wsn-api-2.6.0.redhat-60024.jar
    !/org/apache/cxf/wsn/wsdl/wsn.wsdl
    You now have approximately two minutes before the publisher client times out.
  3. Run the consumer client at the command line. Open a new command prompt and enter the following commands:
    cd wsn/wsn-consumer
    mvn -Pclient
    In the command window, you should see some output like the following:
    ...
    Jun 25, 2013 4:13:47 PM org.apache.cxf.service.factory.ReflectionServiceFactoryBean
    buildServiceFromWSDL
    INFO: Creating Service {http://cxf.apache.org/wsn/jaxws}PausableSubscriptionManagerService
    from WSDL: jar:file:/Users/fbolton/.m2/repository/org/apache/cxf/services/wsn
    /cxf-services-wsn-api/2.6.0.redhat-60024/cxf-services-wsn-api-2.6.0.redhat-60024.jar
    !/org/apache/cxf/wsn/wsdl/wsn.wsdl
    [ns8:foo: null]
    Hello World!
    ...
  4. To inspect the state of the notification broker, you can connect to the JMX port of the ActiveMQ broker. Start up a JMX console by entering the following command at the command line:
    jconsole
    In the JConsole: New Connection dialog, select Remote Process and enter the following URL in the accompanying text field:
    service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root
    In the Username and Password fields, enter one of the user credentials you created at the start of this tutorial. When you are connected to the JMX port, you can inspect the state of the broker by clicking on the MBeans tab and drilling down the object tree in the JConsole.
Red Hat logoGithubRedditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

© 2024 Red Hat, Inc.