Chapter 12. Threading and scheduling

AMQ C++ supports full multithreading with C++11 and later. Limited multithreading is possible with older versions of C++. See Section 12.6, “Using older versions of C++”.

12.1. The threading model

The container object can handle multiple connections concurrently. When AMQP events occur on connections, the container calls messaging_handler callback functions. Callbacks for any one connection are serialized (not called concurrently), but callbacks for different connections can be safely executed in parallel.

You can assign a handler to a connection in container::connect() or listen_handler::on_accept() using the handler connection option. Create a separate handler for each connection so that the handler needs no locks or other synchronization to protect it against concurrent use by library threads. If any non-library threads use the handler concurrently, you still need synchronization.

12.2. Thread-safety rules

The connection, session, sender, receiver, tracker, and delivery objects are not thread-safe and are subject to the following rules.

  1. You must use them only from a messaging_handler callback or a work_queue function.
  2. You must not use objects belonging to one connection from a callback for another connection.
  3. You can store AMQ C++ objects in member variables for use in a later callback, provided you respect rule two.

The message object is a value type with the same threading constraints as a standard C++ built-in type. It must not be modified while another thread is reading or modifying it.

12.3. Work queues

The work_queue interface provides a safe way to communicate between different connection handlers or between non-library threads and connection handlers.

  • Each connection has an associated work_queue.
  • The work queue is thread-safe (C++11 or greater). Any thread can add work.
  • A work item is a std::function with bound arguments. The library calls it like an event callback.

When the library calls the work function, it is serialized safely so that you can treat the work function like an event callback and safely access the handler and AMQ C++ objects stored on it.
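
The contract described above (any thread may add work, but work runs one item at a time in the connection's context) can be sketched with a toy model. This is a minimal illustration built only on the C++ standard library, not the AMQ C++ implementation; toy_work_queue, toy_handler, and demo_work_queue are hypothetical names invented for this sketch.

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Toy stand-in for a per-connection work queue. NOT the AMQ C++ implementation.
class toy_work_queue {
  public:
    // Safe to call from any thread: the mutex protects the pending list.
    void add(std::function<void()> work) {
        std::lock_guard<std::mutex> guard(lock_);
        pending_.push_back(std::move(work));
    }

    // Called only by the single thread that owns the connection.
    // Runs the queued items one at a time and returns how many were run.
    int run_pending() {
        std::deque<std::function<void()>> batch;
        {
            std::lock_guard<std::mutex> guard(lock_);
            batch.swap(pending_);
        }
        // The lock is released here, so a work item may call add() itself.
        for (auto& work : batch) work();
        return static_cast<int>(batch.size());
    }

  private:
    std::mutex lock_;
    std::deque<std::function<void()>> pending_;
};

// Hypothetical handler state, touched only from work items.
struct toy_handler {
    int sent = 0;  // no lock needed: only the owning thread modifies it
};

// Four producer threads each queue `per_thread` work items, then the
// owning thread runs them all. Returns the final value of handler.sent.
int demo_work_queue(int per_thread) {
    toy_work_queue queue;
    toy_handler handler;
    std::vector<std::thread> producers;
    for (int t = 0; t < 4; ++t) {
        producers.emplace_back([&queue, &handler, per_thread] {
            for (int i = 0; i < per_thread; ++i)
                queue.add([&handler] { ++handler.sent; });
        });
    }
    for (auto& p : producers) p.join();
    queue.run_pending();
    return handler.sent;
}
```

In this model the producers never touch handler.sent directly. They only enqueue closures, and the owning thread is the sole writer, which is why the handler state needs no lock of its own.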

12.4. The wake primitive

The connection::wake() method allows any thread to prompt activity on a connection by triggering an on_connection_wake() callback. This is the only thread-safe method on connection.

wake() is a lightweight, low-level primitive for signaling between threads.

  • It does not carry any code or data, unlike work_queue.
  • Multiple calls to wake() might be coalesced into a single on_connection_wake().
  • Calls to on_connection_wake() can occur without any application call to wake() since the library uses wake() internally.

The semantics of wake() are similar to std::condition_variable::notify_one(). There will be a wakeup, but there must be some shared application state to determine why the wakeup occurred and what, if anything, to do about it.

Work queues are easier to use in many instances, but wake() may be useful if you already have your own external thread-safe queues and need an efficient way to wake a connection to check them for data.
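
The pattern in the last paragraph, an application-owned thread-safe queue plus a data-free wakeup, can be sketched as follows. This is a toy model built only on the C++ standard library, assuming a single owning thread that polls for wakeups. The names inbox, toy_connection, poll, and demo_wake are hypothetical and are not part of AMQ C++. In a real handler the draining logic would live in on_connection_wake().

```cpp
#include <atomic>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Application-owned thread-safe queue: the shared state that explains a wakeup.
class inbox {
  public:
    void push(std::string item) {
        std::lock_guard<std::mutex> guard(lock_);
        items_.push(std::move(item));
    }

    // Moves out everything queued so far.
    std::vector<std::string> drain() {
        std::lock_guard<std::mutex> guard(lock_);
        std::vector<std::string> out;
        while (!items_.empty()) {
            out.push_back(std::move(items_.front()));
            items_.pop();
        }
        return out;
    }

  private:
    std::mutex lock_;
    std::queue<std::string> items_;
};

// Toy stand-in for a connection. wake() only sets a flag, so several calls
// made before the next poll() collapse into a single on_wake() call.
class toy_connection {
  public:
    explicit toy_connection(inbox& in) : inbox_(in) {}

    void wake() { woken_.store(true); }  // safe from any thread

    // Run by the single thread that owns the connection.
    void poll() {
        if (woken_.exchange(false)) on_wake();
    }

    int wake_callbacks = 0;
    std::vector<std::string> received;

  private:
    // A wakeup carries no data: check the shared queue, and tolerate
    // finding nothing there.
    void on_wake() {
        ++wake_callbacks;
        for (auto& item : inbox_.drain()) received.push_back(std::move(item));
    }

    inbox& inbox_;
    std::atomic<bool> woken_{false};
};

struct wake_result {
    int callbacks;
    std::size_t items;
};

// Three producers each publish one item and then call wake().
// The owner polls only after all of them have finished.
wake_result demo_wake() {
    inbox in;
    toy_connection conn(in);
    std::vector<std::thread> producers;
    for (int i = 0; i < 3; ++i) {
        producers.emplace_back([&in, &conn, i] {
            in.push("item " + std::to_string(i));  // publish the data first
            conn.wake();                           // then signal
        });
    }
    for (auto& p : producers) p.join();
    conn.poll();  // the three wake() calls are seen as one wakeup
    conn.poll();  // nothing pending, so no callback
    return {conn.wake_callbacks, conn.received.size()};
}
```

Because the owner drains the whole queue on each wakeup, coalesced wake() calls lose no data, and a wakeup that finds the queue empty is harmless.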

12.5. Scheduling deferred work

AMQ C++ can execute code after a delay. You can use this to implement time-based behaviors in your application, such as periodically scheduled work or timeouts.

To defer work for a fixed amount of time, use the schedule method to set the delay and register a function defining the work.

Example: Sending a message after a delay

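// Called by the library when the sender opens.
void on_sender_open(proton::sender& snd) override {
    proton::duration interval {5 * proton::duration::SECOND};
    // Capture snd by value: the reference may not outlive this callback.
    snd.work_queue().schedule(interval, [this, snd] { send(snd); });
}

// Runs on the sender's work queue, so it is safe to use snd here.
void send(proton::sender snd) {
    // Send only if the receiver has granted credit.
    if (snd.credit() > 0) {
        proton::message msg {"hello"};
        snd.send(msg);
    }
}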

This example calls schedule on the sender's work queue, which makes that work queue the execution context for the deferred work.
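
Periodic work, mentioned at the start of this section, is commonly built from one-shot delays by having each run schedule the next one. The following toy model sketches that re-arming pattern using only the C++ standard library and a virtual clock, so that it runs without a broker. toy_scheduler, ticker, and demo_periodic are hypothetical names, and the model is not the AMQ C++ implementation.

```cpp
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Toy scheduler with a virtual clock in milliseconds. NOT the AMQ C++ implementation.
class toy_scheduler {
  public:
    using work = std::function<void()>;

    // Run `w` once, `delay_ms` after the current virtual time.
    void schedule(long delay_ms, work w) {
        timers_.push(timer{now_ms_ + delay_ms, next_seq_++, std::move(w)});
    }

    // Advance the clock to `until_ms`, running every timer that falls due.
    void run_until(long until_ms) {
        while (!timers_.empty() && timers_.top().due_ms <= until_ms) {
            timer t = timers_.top();
            timers_.pop();
            now_ms_ = t.due_ms;
            t.fn();  // may call schedule() again
        }
        now_ms_ = until_ms;
    }

    long now() const { return now_ms_; }

  private:
    struct timer {
        long due_ms;
        long seq;  // keeps insertion order for equal due times
        work fn;
        bool operator>(const timer& other) const {
            return due_ms != other.due_ms ? due_ms > other.due_ms
                                          : seq > other.seq;
        }
    };

    // Min-heap: the timer with the earliest due time is on top.
    std::priority_queue<timer, std::vector<timer>, std::greater<timer>> timers_;
    long now_ms_ = 0;
    long next_seq_ = 0;
};

// Periodic work: each run records the time and schedules the next run.
struct ticker {
    toy_scheduler& sched;
    long interval_ms;
    std::vector<long> fired_at;

    void start() { sched.schedule(interval_ms, [this] { tick(); }); }

    void tick() {
        fired_at.push_back(sched.now());
        sched.schedule(interval_ms, [this] { tick(); });  // re-arm
    }
};

// Runs a 5-second ticker for 16 virtual seconds and returns the firing times.
std::vector<long> demo_periodic() {
    toy_scheduler sched;
    ticker t{sched, 5000, {}};
    t.start();
    sched.run_until(16000);  // ticks fall due at 5 s, 10 s, and 15 s
    return t.fired_at;
}
```

Applied to the example above, the equivalent step would presumably be to call schedule on the sender's work queue again at the end of send(), so that each send arms the next one.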

12.6. Using older versions of C++

Before C++11, there was no standard support for threading in C++. You can still use AMQ C++ with threads, but with the following limitations.

  • The container does not create threads. It only uses the single thread that calls container::run().
  • None of the AMQ C++ library classes are thread-safe, including container and work_queue. You need an external lock to use container in multiple threads. The only exception is connection::wake(). It is thread-safe even in older C++.

The container::schedule() and work_queue APIs accept C++11 lambda functions to define units of work. If you are using a version of C++ that does not support lambdas, you must use the make_work() function instead.

© 2024 Red Hat, Inc.