65.17. 例子
65.17.1. 消耗 Kafka 的消息 复制链接链接已复制到粘贴板!
复制链接链接已复制到粘贴板!
以下是从 Kafka 读取信息所需的最小路由。
from("kafka:test?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
如果您需要消耗来自多个主题的消息,您可以使用以逗号分隔的主题名称列表。
from("kafka:test,test1,test2?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
也可以订阅多个主题,以主题名称形式提供模式,并使用 topicIsPattern 选项。
from("kafka:test*?brokers=localhost:9092&topicIsPattern=true")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}")
当消耗来自 Kafka 的消息时,您可以使用自己的偏移管理,而不将这个管理委派给 Kafka。为了保持偏移量,组件需要 StateRepository 实施,如 FileStateRepository。此 bean 应在 registry 中提供。在这里如何使用它:
// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
// Bind this repository into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("offsetRepo", repository);
// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
// Setup the topic and broker address
"&groupId=A" +
// The consumer processor group ID
"&autoOffsetReset=earliest" +
// Ask to start from the beginning if we have unknown offset
"&offsetRepository=#offsetRepo")
// Keep the offsets in the previously configured repository
.to("mock:result");
}
});
65.17.2. 将消息生成到 Kafka 复制链接链接已复制到粘贴板!
复制链接链接已复制到粘贴板!
以下是将信息写入 Kafka 所需的最小路由。
from("direct:start")
.setBody(constant("Message from Camel")) // Message to send
.setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
.to("kafka:test?brokers=localhost:9092");
65.17.3. SSL 配置 复制链接链接已复制到粘贴板!
复制链接链接已复制到粘贴板!
您可以使用两种不同的方法在 Kafka 组件中配置 SSL 通信。
第一种方法是通过多个 SSL 端点参数
from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
"&groupId=A" +
"&sslKeystoreLocation=/path/to/keystore.jks" +
"&sslKeystorePassword=changeit" +
"&sslKeyPassword=changeit" +
"&securityProtocol=SSL")
.to("mock:result");
第二种方法是使用 sslContextParameters 端点参数。
// Configure the SSLContextParameters object
KeyStoreParameters ksp = new KeyStoreParameters();
ksp.setResource("/path/to/keystore.jks");
ksp.setPassword("changeit");
KeyManagersParameters kmp = new KeyManagersParameters();
kmp.setKeyStore(ksp);
kmp.setKeyPassword("changeit");
SSLContextParameters scp = new SSLContextParameters();
scp.setKeyManagers(kmp);
// Bind this SSLContextParameters into the Camel registry
Registry registry = createCamelRegistry();
registry.bind("ssl", scp);
// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
// Setup the topic and broker address
"&groupId=A" +
// The consumer processor group ID
"&sslContextParameters=#ssl" +
// The security protocol
"&securityProtocol=SSL)
// Reference the SSL configuration
.to("mock:result");
}
});