34.3. 组件选项


Kafka 组件支持 104 选项,如下所列。

Name描述默认值类型

additionalProperties (common)

如果无法直接在 camel 配置(例如:新的 Kafka 属性没有反映在 Camel 配置中),则必须为 kafka consumer 或 kafka producer 设置额外的属性,属性必须使用 additionalProperties 前缀。例如: additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro。

 

Map

brokers (common)

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理的子集,也可以是指向代理子集的 VIP。这个选项在 Kafka 文档中称为 bootstrap.servers。

 

字符串

clientId (common)

客户端 ID 是每个请求中发送的用户指定的字符串,以帮助追踪调用。它应该以逻辑方式识别发出请求的应用程序。

 

字符串

configuration (common)

允许使用端点将重复使用的通用选项预配置 Kafka 组件。

 

KafkaConfiguration

HeaderFilterStrategy (common)

使用自定义 HeaderFilterStrategy 将标头过滤到或从 Camel 消息过滤。

 

HeaderFilterStrategy

reconnectBackoffMaxMs (common)

当重新连接到重复无法连接的代理时,等待的最大时间(毫秒)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,直到最高值。计算 backoff 后,会添加 20% 随机 jitter 以避免连接停滞。

1000

整数

shutdownTimeout (common)

超时时间(毫秒)以毫秒为单位等待消费者或生成者关闭并终止其 worker 线程。

30000

int

allowManualCommit (consumer)

是否允许通过 KafkaManualCommit 手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange 消息标头中,这将允许最终用户访问这个 API,并通过 Kafka 使用者执行手动偏移提交。

false

布尔值

autoCommitEnable (consumer)

如果为 true,请定期提交到 ZooKeeper,以偏移已由消费者获取的信息。当进程失败时,将使用此提交偏移,作为新消费者开始的位置。

true

布尔值

autoCommitIntervalMs (consumer)

消费者偏移提交到 zookeeper 的频率。

5000

整数

autoCommitOnStop (consumer)

消费者停止时是否执行显式自动提交,以确保代理有来自最近使用的消息的提交。这需要打开选项 autoCommitEnable。可能的值有: sync、syncsync 或 none。sync 是默认值。

Enum 值:

  • 同步
  • async
  • none

同步

字符串

autoOffsetReset (consumer)

当 ZooKeeper 中没有初始偏移量时,或者偏移没有范围: earliest : 自动将偏移重置为最早的偏移 latest:自动将偏移重置为最新的偏移失败:抛出异常。

Enum 值:

  • 最新
  • earliest
  • none

最新

字符串

breakOnFirstError (consumer)

此选项控制消费者处理交换且失败时会发生什么。如果选项为 false,则消费者将继续到下一个消息并处理它。如果选项为 true,则消费者会发现出导致故障的消息的偏移,然后重新尝试处理此消息。但是,如果每次绑定都失败,这可能会导致意外处理同一消息,例如一个 poison 消息。因此,建议处理这一点,例如使用 Camel 的错误处理程序。

false

布尔值

bridgeErrorHandler (consumer)

允许将消费者桥接到 Camel 路由错误处理程序,这意味着当消费者试图选择传入消息或类似信息时发生异常,现在将作为消息处理并由路由 Error Handler 处理。默认情况下,使用者将使用 org.apache.camel.spi.ExceptionHandler 来处理例外情况,该处理程序将被记录在 WARN 或 ERROR 级别,并忽略。

false

布尔值

checkCrcs (consumer)

自动检查所消耗的记录的 CRC32。这样可确保不会发生在线或磁盘崩溃信息。此检查增加了一些开销,因此在出现极端性能的情况下可能会禁用它。

true

布尔值

commitTimeoutMs (consumer)

代码将等待同步提交完成的最长时间(以毫秒为单位)。

5000

Long

consumerRequestTimeoutMs (consumer)

配置控制客户端等待请求响应的最长时间。如果在超时超时前未收到响应,如果需要,或者如果重试耗尽,则请求将重新发送。

40000

整数

consumersCount (consumer)

连接到 kafka 服务器的消费者数量。每个消费者都在一个单独的线程上运行,用于检索和处理传入的数据。

1

int

fetchMaxBytes (consumer)

如果获取的第一个非空分区中的第一个消息大于这个值,则服务器为 fetch 请求返回的最大数据量并非绝对的最大值。代理接受的最大消息大小通过 message.max.bytes (broker config)或 max.message.bytes (topic config)定义。请注意,使用者并行执行多个获取。

52428800

整数

fetchMinBytes (consumer)

服务器为获取请求返回的最小数据量。如果数据不足,请求将在回答请求前等待该数量的数据累积。

1

整数

fetchWaitMaxMs (consumer)

如果没有足够的数据立即满足 fetch.min.bytes,服务器将在回答获取请求前阻止的最大时间。

500

整数

GroupId (consumer)

标识此消费者所属的消费者进程组的字符串。通过设置相同的组 id 多个进程表示它们都是同一消费者组的一部分。消费者需要这个选项。

 

字符串

groupInstanceId (consumer)

最终用户提供的消费者实例的唯一标识符。只允许非空字符串。如果设置,则消费者被视为静态成员,这意味着在任何消费者组中都只允许具有此 ID 的实例。这可以与更大的会话超时结合使用,以避免因为临时不可用(如进程重启)导致组重新平衡。如果没有设置,则消费者将作为动态成员加入组,这是传统行为。

 

字符串

headerDeserializer (consumer)

使用自定义 KafkaHeaderDeserializer 来反序列化 kafka 标头值。

 

KafkaHeaderDeserializer

heartbeatIntervalMs (consumer)

在使用 Kafka 的组管理功能时,心跳到消费者协调器的预期时间。心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开组时促进重新平衡。该值必须小于 session.timeout.ms,但通常不应设置高于该值的 1/3。可以调整它,以控制正常重新平衡的预期时间。

3000

整数

keyDeserializer (consumer)

实施 Deserializer 接口的密钥反序列化类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

maxPartitionFetchBytes (consumer)

服务器将返回的最大每个分区的数据量。用于请求的最大内存总量为 192.168.1.0/24partitions max.partition.fetch.bytes。这个大小必须至少与服务器允许的最大消息大小相同,否则生成者可以发送大于消费者的消息。如果发生这种情况,使用者可能会卡住尝试在某个分区中获取大量消息。

1048576

整数

maxPollIntervalMs (consumer)

使用消费者组管理时调用 poll ()的最大延迟。这会在获取更多记录前,在消费者闲置的时间上放置上限。如果在超时过期前没有调用 poll (),则消费者被视为失败,组将重新平衡,以便将分区重新分配给另一个成员。

 

Long

maxPollRecords (consumer)

单个调用中返回到 poll ()中返回的最大记录数。

500

整数

offsetRepository (consumer)

用于本地存储主题的每个分区的偏移程序库。定义一个将禁用自动提交。

 

StateRepository

partitionAssignor (consumer)

使用组管理时,客户端将使用分区分配策略的类名称,在消费者实例之间分发分区所有权。

org.apache.kafka.clients.consumer.RangeAssignor

字符串

pollOnError (consumer)

如果 kafka 在轮询新消息时异常,则该怎么办。除非在端点级别上配置了显式值,否则默认使用组件配置中的值。DISCARD 将丢弃消息并继续轮询下一个消息。ERROR_HANDLER 将使用 Camel 的错误处理程序来处理异常,然后继续轮询下一个消息。RECONNECT 将重新连接消费者,并尝试再次轮询 RETRY,以便消费者再次重试同一消息,STOP 将停止消费者(如果消费者应该再次消耗消息,则应手动启动/重新启动)。

Enum 值:

  • 丢弃
  • ERROR_HANDLER
  • RECONNECT
  • RETRY
  • STOP

ERROR_HANDLER

PollOnError

pollTimeoutMs (consumer)

轮询 KafkaConsumer 时使用的超时。

5000

Long

resumeStrategy (consumer)

这个选项允许用户设置自定义恢复策略。恢复策略会在分配分区时执行(例如:连接或重新连接)。它允许实现自定义如何恢复操作,并更灵活替代 seekTo 和 offsetRepository 机制。有关实现详情,请参阅 KafkaConsumerResumeStrategy。此选项不会影响自动提交设置。使用此设置的实现可能还希望使用手动提交选项进行评估。

 

KafkaConsumerResumeStrategy

seekTo (consumer)

设置 KafkaConsumer 将在启动时从开始或结束读取:start : read from end : read from end this is replace the previous attributes seekToBeginning。

Enum 值:

  • 开始
  • end
 

字符串

sessionTimeoutMs (consumer)

使用 Kafka 组管理功能时检测失败的超时。

10000

整数

specificAvroReader (consumer)

这可让特定的 Avro reader 与 Confluent Platform schema registry 和 io.confluent.kafka.serializers.KafkaAvroDeserializer 搭配使用。这个选项仅适用于 Confluent Platform (不适用于标准 Apache Kafka)。

false

布尔值

topicIsPattern (consumer)

主题是否为模式(正则表达式)。这可用于订阅与模式匹配的动态主题数量。

false

布尔值

valueDeserializer (consumer)

用于实现 Deserializer 接口的值反序列化类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

kafkaManualCommitFactory (consumer (advanced))

用于创建 KafkaManualCommit 实例的 Autowired Factory。当执行与开箱即用的默认实现中分离的手动提交时,如果需要插入自定义 KafkaManualCommit 实例。

 

KafkaManualCommitFactory

pollExceptionStrategy (consumer (advanced))

Autowired 使用带有消费者的自定义策略来控制如何在池消息时处理从 Kafka 代理抛出的异常。

 

PollExceptionStrategy

bufferMemorySize (producer)

生成者可用于缓冲区等待发送到服务器的内存总量字节。如果发送记录比服务器发送的速度快,则生成者会根据 block.on.buffer.full 指定的首选项阻止或抛出异常。此设置应该与制作者将使用的总内存对应,而不是硬绑定,因为不是生成者使用的所有内存进行缓冲。一些额外的内存将用于压缩(如果启用了压缩),以及维护动态请求。

33554432

整数

compressionCodec (producer)

这个参数允许您为这个制作者生成的所有数据指定压缩 codec。有效值为 none、gzip 和 snappy。

Enum 值:

  • none
  • gzip
  • snappy
  • lz4

none

字符串

connectionMaxIdleMs (producer)

在此配置指定的毫秒数后关闭闲置连接。

540000

整数

deliveryTimeoutMs (producer)

在调用 send ()后报告成功或失败的时间上限。这限制了在发送前记录延迟、从代理等待确认的时间(如预期)以及可重新检索失败所需的时间。

120000

整数

enableIdempotence (producer)

如果设置为 'true',则制作者将确保在流中写入每个消息的一个副本。如果 'false',则制作者重试可能会在流中写入重试的消息的副本。如果设置为 true,则此选项需要 max.in.flight.requests.per.connection 设置为 1,并且重试不能为零,且其他 acks 必须设为 'all'。

false

布尔值

headerSerializer (producer)

使用自定义 KafkaHeaderSerializer 来序列化 kafka 标头值。

 

KafkaHeaderSerializer

key (producer)

记录键(如果没有指定密钥,则为 null)。如果配置了这个选项,它将优先于标头 KafkaConstants114KEY。

 

字符串

keySerializer (producer)

键的序列化类(如果没有给出任何信息,则默认为消息)。

org.apache.kafka.common.serialization.StringSerializer

字符串

lazyStartProducer (producer)

生成者是否应懒惰启动 (在第一个消息中)。通过懒惰启动,您可以使用此选项来允许 CamelContext 和路由在生成者启动期间启动,并导致路由启动失败。通过懒惰启动,启动失败可以在路由信息时通过 Camel 的路由错误处理程序进行处理。请注意,在处理第一个消息时,创建并启动生成者可能需要稍等时间,并延长处理的总处理时间。

false

布尔值

lingerMs (producer)

生产者将请求传输之间到达的任何记录分组到单个批处理请求中。通常,只有在记录到达比发送的速度相比,才会在负载下发生。然而,在某些情况下,客户端可能希望减少请求数,即使在负载下也是如此。此设置通过添加少量人类延迟来实现这一目的,而不是立即发送记录,让生成者最多等待给定延迟,以允许将发送其他记录,以便将发送发送。这可以象 TCP 中的 Nagle 算法类似。此设置在批处理延迟上提供了上限:一旦为分区获取 batch.size,无论此设置如何,都会立即发送它。但是,如果我们对这个分区的总字节少于这个分区,我们将"闲置"等待更多记录显示。此设置默认为 0 (例如,无延迟)。例如,设置 linger.ms=5 可减少发送的请求数量,但对负载中发送的记录增加最多 5ms 的延迟数。

0

整数

maxBlockMs (producer)

配置控制发送到 kafka 的时长将阻断。这些方法可能会因为多个原因而被阻止。例如:缓冲区已满,元数据不可用。此配置对获取元数据、键和值序列化、在执行 send ()时分区和分配缓冲区内存的总时间实施最大限制。如果是 partitionsFor (),此配置在等待元数据时强制实施最长时间阈值。

60000

整数

maxInFlightRequest (producer)

客户端在阻止前在单个连接上发送的最大未确认请求数。请注意,如果此设置设定为大于 1,且有失败,则可能会因为重试重试而重新排序消息(例如,如果启用了重试)。

5

整数

maxRequestSize (producer)

请求的最大大小。这也在最大记录大小上有效上限。请注意,服务器对记录大小有自己的上限,它们可能与这个值不同。此设置将限制生成者将在单个请求中发送的记录数量,以避免发送大量请求。

1048576

整数

metadataMaxAgeMs (producer)

我们强制刷新元数据的时间(以毫秒为单位),即使我们没有看到任何分区领导更改来主动发现任何新的代理或分区。

300000

整数

metricReporters (producer)

用作指标报告器的类列表。实施 MetricReporter 接口允许插入将创建新指标创建通知的类。JmxReporter 始终被包含以注册 JMX 统计信息。

 

字符串

metricsSampleWindowMs (producer)

为计算指标维护的示例数量。

30000

整数

noOfMetricsSample (producer)

为计算指标维护的示例数量。

2

整数

Partitioner (producer)

在子主题间分区消息的 partitioner 类。默认分区程序基于密钥的哈希。

org.apache.kafka.clients.producer.internals.DefaultPartitioner

字符串

partitionKey (producer)

将记录发送到的分区(如果没有指定分区,则为 null)。如果配置了这个选项,它将优先于标头 KafkaConstants114PARTITION_KEY。

 

整数

producerBatchSize (producer)

每当将多个记录发送到同一分区时,生产者会尝试将记录批处理到较少的请求中。这有助于客户端和服务器的性能。此配置以字节为单位控制默认批处理大小。不尝试批处理大于这个大小的批处理记录。发送到代理的请求将包含多个批处理,每个带有可用数据的分区都会进行批处理。小批处理大小会减少吞吐量,并可能会降低吞吐量(零的批处理大小将完全禁用批处理)。非常大的批处理大小可能会更严重地使用内存,因为我们始终以额外的记录为指定批处理大小分配缓冲区。

16384

整数

queueBufferingMaxMessages (producer)

在使用 async 模式时可以排队生成者的最大未消息数量,然后才能阻止生成者,或者必须丢弃数据。

10000

整数

receiveBufferBytes (producer)

读取数据时使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。

65536

整数

reconnectBackoffMs (producer)

尝试重新连接到给定主机前等待的时间。这可避免在严格的循环中重复连接到主机。此 backoff 适用于消费者向代理发送的所有请求。

50

整数

recordMetadata (producer)

生成者是否应该存储来自发送到 Kafka 的 RecordMetadata 结果。结果存储在包含 RecordMetadata 元数据的列表中。该列表存储在带有键 KafkaConstantsHQKAFKA_RECORDMETA 的标头中。

true

布尔值

requestRequiredAcks (producer)

确认生成者要求接收领导数量,然后才能考虑请求完成。这控制发送的记录的持久性。以下设置比较常见:acks=0 如果设为零,则制作者将根本不等待服务器的任何确认。记录将立即添加到套接字缓冲区中并被视为发送。不保证服务器在这种情况下收到记录,重试的配置不会生效(因为客户端通常不知道任何故障)。为每个记录的偏移量始终设置为 -1。这意味着领导会将记录写入本地日志,但不会等待所有后续者的确认。在这种情况下,领导会在确认记录后立即失败,但在后续者复制前它会丢失。acks=all 表示领导将等待完整的 in-sync 副本集确认记录。这样可保证,只要至少有一个同步副本保持活跃状态,就不会丢失记录。这是最强的保证。

Enum 值:

  • -1
  • 0
  • 1
  • all

1

字符串

requestTimeoutMs (producer)

在将错误发送到客户端前,代理将等待尝试满足 request.required.acks 要求的时间。

30000

整数

retries (producer)

设置大于零的值将导致客户端重新发送发送失败的任何记录,并显示潜在的临时错误。请注意,这个重试与客户端在收到错误时重新处理记录不同。允许重试可能会更改记录顺序,因为如果两个记录发送到单个分区,第一个失败且被重试,但第二个成功,则可能会首先出现第二个记录。

0

整数

retryBackoffMs (producer)

每次重试前,制作者会刷新相关主题的元数据,以查看是否已选择新的领导。由于领导选举机制需要一些时间,此属性指定制作者在刷新元数据前等待的时间。

100

整数

sendBufferBytes (producer)

套接字写入缓冲区大小。

131072

整数

valueSerializer (producer)

消息的序列化类。

org.apache.kafka.common.serialization.StringSerializer

字符串

workerpool (producer)

要在 kafka 服务器确认使用异步非阻塞处理从 KafkaProducer 发送的消息后,使用自定义 worker 池继续路由交换。如果使用这个选项,则必须处理线程池的生命周期,以便在不再需要时关闭池。

 

ExecutorService

workerPoolCoreSize (producer)

kafka 服务器后用于继续路由交换的 worker 池的核心线程数量确认使用异步非阻塞处理从 KafkaProducer 发送的消息。

10

整数

workerPoolMaxSize (producer)

kafka 服务器后,用于继续路由交换的 worker 池的最大线程数量确认使用异步非阻塞处理从 KafkaProducer 发送的消息。

20

整数

autowiredEnabled (advanced)

是否启用自动关闭。这用于自动关闭选项(选项必须标记为 autowired),方法是在 registry 中查找查找是否有单个匹配类型实例,然后在组件上配置。这可以用于自动配置 JDBC 数据源、JMS 连接工厂、AWS 客户端等。

true

布尔值

kafkaClientFactory (advanced)

Autowired Factory 用于创建 org.apache.kafka.clients.consumer.KafkaConsumer 和 org.apache.kafka.clients.producer.KafkaProducer 实例。这允许配置自定义工厂,以使用扩展 vanilla Kafka 客户端的逻辑创建实例。

 

KafkaClientFactory

Sync (advanced)

设置是否应严格使用同步处理。

false

布尔值

schemaRegistryURL (confluent)

要使用的 Confluent Platform 模式 registry 服务器的 URL。格式为 host1:port1,host2:port2。这在 Confluent Platform 文档中称为 schema.registry.url。这个选项仅适用于 Confluent Platform (不适用于标准 Apache Kafka)。

 

字符串

interceptorClasses (monitoring)

为制作者或消费者设置拦截器。制作者拦截器必须是实施 org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors 的类,则需要类实施 org.apache.kafka.clients.consumer.ConsumerInterceptor 请注意,如果您在消费者上使用 Producer 拦截器,它将在运行时抛出类多播异常。

 

字符串

kerberosBeforeReloginMinTime (security)

在刷新尝试之间登录线程睡眠时间。

60000

整数

kerberosInitCmd (security)

Kerberos kinit 命令路径。默认为 /usr/bin/kinit。

/usr/bin/kinit

字符串

kerberosPrincipalToLocalRules (security)

从主体名称映射到短名称(通常是操作系统用户名)的规则列表。规则按顺序评估,第一个匹配主体名称的规则被用来将其映射到短名称。列表中的任何后续规则都会被忽略。默认情况下,形式为 {username}/{hostname}{REALM} 的主体名称映射到 {username}。有关格式的详情,请查看安全授权和 acls 文档。可以使用逗号分隔多个值。

DEFAULT

字符串

kerberosRenewJitter (security)

添加到续订时间的随机 jitter 百分比。

0.05

�

kerberosRenewWindowFactor (security)

登录线程将休眠,直到达到最后刷新到票据的过期时间的窗口因子,此时它将尝试续订票据。

0.8

�

saslJaasConfig (security)

公开 kafka sasl.jaas.config 参数示例: org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;。

 

字符串

saslKerberosServiceName (security)

Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 配置中定义。

 

字符串

saslMechanism (security)

使用简单验证和安全层(SASL)机制。有关有效值,请参阅。

GSSAPI

字符串

securityProtocol (security)

用于与代理通信的协议。支持 SASL_PLAINTEXT, PLAINTEXT 和 SSL。

明文

字符串

sslCipherSuites (security)

密码套件列表。这是用于使用 TLS 或 SSL 网络协议协商网络连接的安全设置的身份验证、加密、MAC 和密钥交换算法的命名组合。支持所有可用的密码套件。

 

字符串

sslContextParameters (security)

使用 Camel SSLContextParameters 对象的 SSL 配置。如果配置了,则在其他 SSL 端点参数之前应用它。注意: Kafka 只支持从文件位置加载密钥存储,因此在 KeyStoreParameters.resource 选项中使用 file: 前缀。

 

SSLContextParameters

sslEnabledProtocols (security)

为 SSL 连接启用的协议列表。TLSv1.2、TLSv1.1 和 TLSv1 会被默认启用。

 

字符串

sslEndpointAlgorithm (security)

端点识别算法,使用服务器证书验证服务器主机名。

https

字符串

sslKeymanagerAlgorithm (security)

SSL 连接的密钥管理器工厂使用的算法。默认值为为 Java 虚拟机配置的密钥管理器工厂算法。

SunX509

字符串

sslKeyPassword (security)

密钥存储文件中私钥的密码。对于客户端,这是可选的。

 

字符串

sslKeystoreLocation (security)

密钥存储文件的位置。这对客户端是可选的,可用于客户端的双向身份验证。

 

字符串

sslKeystorePassword (security)

密钥存储文件的存储密码。这对客户端是可选的,且仅在配置了 ssl.keystore.location 时才需要。

 

字符串

sslKeystoreType (security)

密钥存储文件的文件格式。对于客户端,这是可选的。默认值为 JKS。

JKS

字符串

SSLProtocol ( security)

用于生成 SSLContext 的 SSL 协议。默认设置为 TLS,对于大多数情况来说是理想的选择。最近的 JVM 中允许的值是 TLS、TLSv1.1 和 TLSv1.2。SSL、SSLv2 和 SSLv3 可能在较旧的 JVM 中被支持,但由于已知的安全漏洞,不建议使用它们的使用。

 

字符串

sslProvider (security)

用于 SSL 连接的安全提供程序的名称。默认值为 JVM 的默认安全提供程序。

 

字符串

sslTrustmanagerAlgorithm (security)

信任管理器工厂用于 SSL 连接的算法。默认值为为 Java 虚拟机配置的信任管理器工厂算法。

PKIX

字符串

sslTruststoreLocation (security)

信任存储文件的位置。

 

字符串

sslTruststorePassword (security)

信任存储文件的密码。

 

字符串

sslTruststoreType (security)

信任存储文件的文件格式。默认值为 JKS。

JKS

字符串

useGlobalSslContextParameters (security)

启用使用全局 SSL 上下文参数。

false

布尔值

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.