
48.2. Implementing the Producer Interface


Alternative ways of implementing a producer

You can implement a producer in one of the following ways:

  • How to implement a synchronous producer
  • How to implement an asynchronous producer

How to implement a synchronous producer

Example 48.4, “DefaultProducer Implementation” outlines how to implement a synchronous producer. In this case, a call to Producer.process() blocks until a reply is received.

Example 48.4. DefaultProducer Implementation

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer { // (1)

    public CustomProducer(Endpoint endpoint) { // (2)
        super(endpoint);
        // Perform other initialization tasks...
    }

    @Override
    public void process(Exchange exchange) throws Exception { // (3)
        // Process exchange synchronously.
        // ...
    }
}
(1) Implement a custom synchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class.
(2) Implement a constructor that takes a reference to the parent endpoint.
(3) The process() method is the core of the producer code. Its implementation depends entirely on the type of component that you are implementing. In outline, the process() method is normally implemented as follows:
  • If the exchange contains an In message, and if this is consistent with the specified exchange pattern, then send the In message to the designated endpoint.
  • If the exchange pattern anticipates the receipt of an Out message, then wait until the Out message has been received. This typically causes the process() method to block for a significant length of time.
  • When a reply is received, call exchange.setOut() to attach the reply to the exchange object. If the reply contains a fault message, set the fault flag on the Out message using Message.setFault(true).
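
The three steps in the outline above can be modelled as a small, self-contained sketch. To keep it runnable without Camel on the classpath, it uses hypothetical stand-in types (SketchMessage, SketchExchange) and a hypothetical blocking transport function; none of these are Camel classes. In a real producer the corresponding operations would be the ones named in the text: reading the In message, calling exchange.setOut(), and calling Message.setFault(true). The way a fault is detected here (the transport returns an Exception object) is purely an assumption made for illustration.

```java
import java.util.function.Function;

// Hypothetical stand-in for a message; NOT the Camel Message interface.
class SketchMessage {
    final Object body;
    boolean fault; // models the fault flag on an Out message

    SketchMessage(Object body) {
        this.body = body;
    }
}

// Hypothetical stand-in for an exchange; NOT the Camel Exchange interface.
class SketchExchange {
    final SketchMessage in;
    final boolean expectsReply; // models a request/reply exchange pattern
    SketchMessage out;

    SketchExchange(SketchMessage in, boolean expectsReply) {
        this.in = in;
        this.expectsReply = expectsReply;
    }
}

class SyncProducerSketch {
    // Hypothetical blocking transport: takes a request body, returns a reply body.
    private final Function<Object, Object> transport;

    SyncProducerSketch(Function<Object, Object> transport) {
        this.transport = transport;
    }

    void process(SketchExchange exchange) {
        if (exchange.in == null) {
            return; // nothing to send
        }
        // Step 1: send the In message. For a request/reply pattern this call
        // also covers step 2, because it blocks until the reply is available.
        Object reply = transport.apply(exchange.in.body);
        if (!exchange.expectsReply) {
            return; // one-way pattern: no Out message is attached
        }
        // Step 3: attach the reply and flag it if it represents a fault.
        SketchMessage out = new SketchMessage(reply);
        out.fault = reply instanceof Exception; // assumption: faults arrive as Exception objects
        exchange.out = out;
    }

    public static void main(String[] args) {
        SketchExchange exchange = new SketchExchange(new SketchMessage("ping"), true);
        new SyncProducerSketch(body -> "reply to " + body).process(exchange);
        System.out.println(exchange.out.body + " (fault=" + exchange.out.fault + ")");
    }
}
```

Because the transport call blocks, the thread that calls process() is held until the reply arrives, which is the cost that the asynchronous variant is meant to avoid.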

How to implement an asynchronous producer

Example 48.5, “CollectionProducer Implementation” outlines how to implement an asynchronous producer. In this case, you must implement both a synchronous process() method and an asynchronous process() method (which takes an additional AsyncCallback argument).

Example 48.5. CollectionProducer Implementation

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer implements AsyncProcessor { // (1)

    public CustomProducer(Endpoint endpoint) { // (2)
        super(endpoint);
        // ...
    }

    @Override
    public void process(Exchange exchange) throws Exception { // (3)
        // Process exchange synchronously.
        // ...
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) { // (4)
        // Process exchange asynchronously.
        CustomProducerTask task = new CustomProducerTask(exchange, callback);
        // Process 'task' in a separate thread...
        // ...
        return false; // (5)
    }
}

public class CustomProducerTask implements Runnable { // (6)
    private final Exchange exchange;
    private final AsyncCallback callback;

    public CustomProducerTask(Exchange exchange, AsyncCallback callback) {
        this.exchange = exchange;
        this.callback = callback;
    }

    @Override
    public void run() { // (7)
        // Process exchange.
        // ...
        // Notify the caller; false means the exchange completed asynchronously.
        callback.done(false);
    }
}
(1) Implement a custom asynchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class and implementing the AsyncProcessor interface.
(2) Implement a constructor that takes a reference to the parent endpoint.
(3) Implement the synchronous process() method.
(4) Implement the asynchronous process() method. You can implement the asynchronous method in several ways. The approach shown here is to create a java.lang.Runnable instance, task, that represents the code that runs in a separate thread. You then use the Java threading API to run the task in that thread (for example, by creating a new thread or by submitting the task to an existing thread pool).
(5) Normally, you return false from the asynchronous process() method, to indicate that the exchange was processed asynchronously.
(6) The CustomProducerTask class encapsulates the processing code that runs in a separate thread. This class must store references to the Exchange object, exchange, and the AsyncCallback object, callback, as private member variables.
(7) The run() method contains the code that sends the In message to the producer endpoint and waits to receive the reply, if any. After receiving the reply (Out message or Fault message) and inserting it into the exchange object, you must call callback.done() to notify the caller that processing is complete. Pass false to indicate that the exchange completed asynchronously.
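
The hand-off to a worker thread and the final callback can be sketched with the standard java.util.concurrent API. This is a minimal illustration under stated assumptions, not the Camel implementation: DoneCallbackSketch is a hypothetical stand-in with the same single done(boolean) method as the callback used above, an AtomicReference stands in for the exchange, and the "send and wait" step is replaced by a placeholder string operation. The sketch shows one possible way to run the task on an existing thread pool, return false immediately, and guarantee that the callback is invoked even if the task fails.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in with the same shape as the callback used in the
// example above (a single done(boolean) method). NOT the Camel interface.
interface DoneCallbackSketch {
    void done(boolean doneSync);
}

class AsyncProducerSketch {
    private final ExecutorService pool;

    AsyncProducerSketch(ExecutorService pool) {
        this.pool = pool;
    }

    // 'exchange' is a stand-in holder: it carries the request on entry and
    // the reply (or the failure) once the task has finished.
    boolean process(AtomicReference<Object> exchange, DoneCallbackSketch callback) {
        pool.execute(() -> {
            try {
                // Placeholder for "send the request and wait for the reply".
                exchange.set("reply to " + exchange.get());
            } catch (RuntimeException e) {
                // Record the failure instead of losing it on the worker thread.
                exchange.set(e);
            } finally {
                // Always notify the caller; false = completed asynchronously.
                callback.done(false);
            }
        });
        return false; // the work continues on the pool thread
    }
}

class AsyncProducerDemo {
    // Sends one request and blocks the calling thread until the callback fires.
    static Object sendAndWait(String request) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            AtomicReference<Object> exchange = new AtomicReference<>(request);
            CountDownLatch finished = new CountDownLatch(1);
            boolean sync = new AsyncProducerSketch(pool)
                    .process(exchange, doneSync -> finished.countDown());
            if (sync || !finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was not invoked asynchronously");
            }
            return exchange.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("ping"));
    }
}
```

Placing the callback in a finally block is a design choice of this sketch: a caller that is waiting on the callback would otherwise hang if the task threw an exception before reaching it.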