27.4.2. 查询参数(102 参数)


Expand
Name描述默认类型

additionalProperties (通用)

如果无法直接在 camel 配置上设置 kafka consumer 或 kafka producer 的额外属性(例如:未反映在 Camel 配置中的新 Kafka 属性),则属性必须使用 additionalProperties 作为前缀。e.g: additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro。

 

map

代理 (通用)

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理子集或指向代理子集的 VIP。这个选项在 Kafka 文档中称为 bootstrap.servers。

 

字符串

clientID ( 通用)

客户端 ID 是在每次请求中发送的用户指定的字符串,以帮助跟踪调用。它应该以逻辑方式识别发出请求的应用程序。

 

字符串

headerFilterStrategy (common)

使用自定义 HeaderFilterStrategy 过滤来自 Camel 消息的标头。

 

HeaderFilterStrategy

reconnectBackoffMaxMs (通用)

重新连接到重复连接失败的代理时要等待的最大时间,以毫秒为单位。如果提供,每个主机的 backoff 将根据连续连接失败增加每个连续连接失败,最多增加这个最大值。在计算后,添加 20% 的随机 jitter 以避免连接停滞。

1000

整数

shutdownTimeout (common)

以毫秒为单位的超时,以正常等待使用者或制作者关闭和终止其 worker 线程。

30000

int

allowManualCommit (consumer)

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange message 标头上,这允许最终用户访问这个 API,并通过 Kafka consumer 执行手动偏移提交。

false

布尔值

autoCommitEnable (consumer)

如果为 true,请定期提交 ZooKeeper 被消费者获取的消息偏移。当进程失败时,将使用提交的偏移量,作为新消费者开始的位置。

true

布尔值

autoCommitIntervalMs (consumer)

消费者偏移量的频率被提交到 zookeeper。

5000

整数

autoCommitOnStop (consumer)

当消费者停止时,是否执行显式自动提交,以确保代理有来自上一次消耗的消息的提交。这需要打开 autoCommitEnable 选项。可能的值有:sync、sync 或 none。sync 是默认值。

枚举值:

  • sync
  • async
  • none

sync

字符串

autoOffsetReset (consumer)

ZooKeeper 中没有初始偏移时怎么办,或者有误差范围:最早的 : 自动将偏移重置为最早的偏移:自动将偏移重置为最新偏移失败:对使用者抛出异常。

枚举值:

  • latest
  • 最早
  • none

latest

字符串

breakOnFirstError (consumer)

该选项控制在消费者处理交换时所发生的情况,它会失败。如果 选项为 false,则使用者将继续进行下一个消息并进行处理。如果 选项为 true,则使用者将中断,并且会重新查找导致失败的消息偏移,然后重新尝试处理此消息。但是,如果其绑定每次都失败,则可能导致完全无法处理同一消息,例如投毒消息。因此,建议通过使用 Camel 的错误处理程序来处理该示例。

false

布尔值

bridgeErrorHandler (consumer)

允许将消费者桥接到 Camel 路由 Error Handler,这意味着使用者试图获取传入消息或类似信息时出现任何异常,现在将作为一个消息进行处理,并由路由 Error Handler 处理。默认情况下,使用者将使用 org.apache.camel.spi.Exception 处理程序处理异常,该处理程序将记录在 WARN 或 ERROR 级别,并忽略。

false

布尔值

checkCrcs (consumer)

自动检查消耗的记录的 CRC32。这可确保没有发生消息的在线或磁盘损坏。这个检查会增加一些开销,因此在寻求极端性能的情况下可能会禁用它。

true

布尔值

commitTimeoutMs (consumer)

代码将等待同步提交完成的最长时间,以毫秒为单位。

5000

Long

consumerRequestTimeoutMs (consumer)

配置控制客户端等待请求响应的最长时间。如果在超时前未收到响应,如果请求用尽,则客户端将重新发送请求,或者请求失败。

40000

整数

consumersCount (消费者)

连接到 kafka 服务器的消费者数量。每个使用者都在单独的线程上运行,用于检索和处理传入的数据。

1

int

fetchMaxBytes (consumer)

如果 get 的第一个非空分区中的第一条大于这个值大于这个值,则服务器应返回最多的数据量应返回这个值,以确保使用者可以进行进展。代理接受的最大消息大小通过 message.max.bytes (broker config)或 max.message.bytes (主题配置)来定义。请注意,使用者执行多个并行获取。

52428800

整数

fetchMinBytes (consumer)

服务器应返回获取请求的最小数据量。如果没有足够的数据可用,请求会等待该数据在回答请求前累积累积。

1

整数

fetchWaitMaxMs (consumer)

如果没有足够的数据立即满足 fetch.min.bytes,则服务器将在应答获取请求前阻止的最大时间。

500

整数

groupId (consumer)

唯一标识此消费者所属消费者的消费者进程组的字符串。通过设置同一个组 id 多个进程,表示它们都是同一消费者组的所有部分。消费者需要此选项。

 

字符串

groupInstanceId (consumer)

最终用户提供的使用者实例的唯一标识符。只允许非空字符串。如果设置,使用者将被视为静态成员,这意味着任何时候只允许具有此 ID 中的一个实例。这可与较大的会话超时结合使用,以避免临时不可用造成的组重新平衡(如进程重启)。如果没有设置,使用者将作为动态成员加入组,即传统行为。

 

字符串

headerDeserializer (consumer)

使用自定义 KafkaHeaderDeserializer 来反序列化 kafka 标头值。

 

KafkaHeaderDeserializer

heartbeatIntervalM (consumer)

在使用 Kafka 的组管理设施时,心跳到消费者协调器之间预期的时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置小于 session.timeout.ms,但通常不应设置超过该值的 1/3。甚至可以调整它,以控制正常重新平衡的预期时间。

3000

整数

keyDeserializer (消费者)

用于实现 Deserializer 接口的密钥的 Deserializer 类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

maxPartitionFetchBytes (consumer)

服务器将返回每个分区的最大数据量。用于请求的最大内存总量为 #partitions max.partition.fetch.bytes。这种大小必须至少与服务器允许的最大消息大小相同,或者生产者能够发送大于消费者可获取的消息。如果出现这种情况,消费者会卡住在特定分区上获取大量信息。

1048576

整数

maxPollIntervalM (使用者)

在使用消费者组管理时,poll ()的调用之间的最大延迟。这会在获取更多记录前处于闲置时间的上限。如果在这个超时过期前不调用 poll (),则消费者被视为失败,并且组将重新平衡,从而将分区重新分配给其他成员。

 

Long

maxPollRecords (consumer)

单个调用中返回的最大记录数,用于 poll ()。

500

整数

offsetRepository (consumer)

要使用的偏移存储库,在本地存储每个主题分区的误差。定义一个将禁用 autocommit。

 

StateRepository

partitionAssignor (consumer)

使用组管理时,客户端将使用的分区分配策略在原始消费者实例之间分发分区所有权。

org.apache.kafka.clients.consumer.RangeAssignor

字符串

pollOnError (consumer)

如果 kafka threw 在轮询新消息时出现异常,该怎么办。默认情况下,将使用组件配置中的值,除非端点级别上配置了显式值。DISCARD 将丢弃消息,并继续轮询下一个消息。ERROR_HANDLER 将使用 Camel 的错误处理程序来处理异常,之后继续轮询下一消息。RECONNECT 将重新连接使用者,并尝试对消息进行重新轮询,让使用者重新尝试对相同消息进行重试轮询同一消息(如果使用者应该能够再次使用消息,则需要手动启动/重新启动)。

枚举值:

  • DISCARD
  • ERROR_HANDLER
  • RECONNECT
  • RETRY
  • STOP

ERROR_HANDLER

PollOnError

pollTimeoutMs (consumer)

轮询 KafkaConsumer 时使用的超时。

5000

Long

resumeStrategy (consumer)

此选项允许用户设置自定义恢复策略。当分区被分配时(例如:进行连接或重新连接时),将执行恢复策略。它允许实施自定义如何恢复操作并充当 seekTo 和 offsetRepository 机制更灵活的替代方案。有关实现详情,请参阅 KafkaConsumerResumeStrategy。这个选项不会影响自动提交设置。使用此设置的实现可能还希望与手动提交选项一起评估。

 

KafkaConsumerResumeStrategy

seekTo (消费者)

设定如果 KafkaConsumer 将在启动时读取或结束:从开头读取:从结尾读取,这将替换掉前面的属性 seekToBeginning。

枚举值:

  • 开始
  • end
 

字符串

sessionTimeoutMs (consumer)

使用 Kafka 的组管理功能时检测故障的超时时间。

10000

整数

specificAvroReader (consumer)

这可让特定 Avro reader 与 Confluent Platform schema registry 和 io.confluent.kafka.serializers.KafkaAvroDeserializer 搭配使用。这个选项只在 Confluent Platform (不是标准 Apache Kafka)中可用。

false

布尔值

topicIsPattern (消费者)

主题是模式(正则表达式)。这可用于订阅与模式匹配的动态主题。

false

布尔值

valueDeserializer (consumer)

对实现 Deserializer 接口的值进行反序列化器类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

exceptionHandler (消费者(高级)

要让使用者使用自定义 ExceptionHandler。请注意,如果启用了选项 bridgeErrorHandler,则不使用这个选项。默认情况下,消费者处理异常,这将在 WARN 或 ERROR 级别记录,并忽略。

 

ExceptionHandler

ExchangePattern (消费者(高级)

在使用者创建交换时设置交换模式。

枚举值:

  • InOnly
  • InOut
  • InOptionalOut
 

ExchangePattern

kafkaManualCommitFactory (consumer (advanced)

创建 KafkaManualCommit 实例使用的工厂。这样,在进行手动提交时,可以插入一个自定义 KafkaManualCommit 实例来创建自定义 KafkaManualCommit 实例。

 

KafkaManualCommitFactory

bufferMemorySize (producer)

制作者可用于缓冲记录的内存总量,等待发送到服务器。如果记录的发送速度快于服务器,则生产者不会根据 block.on.buffer.full 指定的首选项进行阻止或抛出异常。此设置应该大致对应于制作者将使用的总内存,而不是硬绑定,因为生产者不使用所有内存。一些额外的内存用于压缩(如果启用了压缩)以及维护航班请求。

33554432

整数

compressionCodec (producer)

此参数允许您指定此制作者生成的所有数据的压缩代码。有效值为 none、gzip 和 snappy。

枚举值:

  • none
  • gzip
  • snappy
  • lz4

none

字符串

connectionMaxIdleM (生产者)

在此配置指定的毫秒数后关闭闲置连接。

540000

整数

deliveryTimeoutMs (producer)

在调用 send ()返回后,对时间报告成功或失败的上限。这限制了在发送前记录的总时间、从代理到等待确认时间(如预期)以及允许重试发送失败的时间。

120000

整数

enableIdempotence (producer)

如果设置为 'true',则制作者将确保流中写入每条消息的一个副本。如果 'false',则生产者重试可能会在流中写入重试消息的副本。如果设置为 true,这个选项需要 max.in.flight.requests.per.connection 设置为 1,且重试无法为零,另外的攻击必须设置为 'all'。

false

布尔值

headerSerializer (producer)

使用自定义 KafkaHeaderSerializer 序列化 kafka 标头值。

 

KafkaHeaderSerializer

密钥 (生成器)

记录密钥(如果没有指定密钥,则为 null)。如果配置了这个选项,则优先于标头 KafkaConstants#KEY。

 

字符串

keySerializer (producer)

密钥的 serializer 类(如果未指定任何信息,默认为与消息相同)。

org.apache.kafka.common.serialization.StringSerializer

字符串

lazyStartProducer (producer)

制作者是否应启动 lazy (在第一条消息中)。通过开始 lazy,您可以使用此方法来允许 CamelContext 和路由在制作者无法启动时启动失败,并导致路由无法启动。通过将此启动推迟为 lazy,则启动失败可以通过 Camel 的路由错误处理程序在路由消息期间处理。请注意,当处理第一条消息时,创建和启动制作者可能需要花费一些时间,延长处理总处理时间。

false

布尔值

lingerM (生成器)

制作者将请求传输到单一批处理请求之间的任何记录分组在一起。通常,仅当记录到达的速度比发送的速度快时,这才会发生。但是在某些情形中,客户端可能也会减少请求的数量,即使在中等负载下也是如此。此设置通过添加少量的延迟来实现此目的,而不是立即发送该记录,以便生产者最多等待给定延迟发送其他记录,从而使发送其他记录可以一起批量。这可能被视为与在 TCP 中 Nagle 的算法类似。此设置提供批量延迟的上限:一旦我们获得批次,每个分区的相应记录会立即发送,而无论此设置如何,我们将立即发送超过此分区的字节数,那么我们将会"linger' 以便等待更多记录显示。此设置默认为 0 (即没有延迟)。例如,设置 linger.ms=5 将对发送的请求数量产生影响,但会最多为加载加载时发送的记录添加 5ms 的延迟。

0

整数

maxBlockMs (生成器)

配置控制 kafka 发送到 kafka 的时长。由于多个原因,这些方法可以被阻断。例如: buffer full,metadata 不可用。这种配置会对获取元数据的总时间、键和值序列化、在执行 send ()时对缓冲区内存进行分区和分配进行最大的限制。对于 partitionsFor (),此配置会在等待元数据时实施最长时间阈值。

60000

整数

maxInFlightRequest (producer)

在阻断前,客户端将在单一连接上发送的最大未确认请求数。请注意,如果此设置设定为大于 1 且存在失败发送,则因为重试导致消息重新排序的风险(例如,如果启用了重试)。

5

整数

maxRequestSize (producer)

请求的最大大小。这也是最高记录大小的上限。请注意,该服务器具有自己的记录大小,这可能与此大小不同。此设置将限制制作者将在单一请求中发送的记录数量,以避免发送大量请求。

1048576

整数

metadataMaxAgeM (生产者)

在毫秒内,我们强制刷新元数据,即使我们没有看到任何分区领导更改来主动发现任何新的代理或分区。

300000

整数

metricReporters (生成的器)

用作指标报告者的类列表。实施MetricReporter 接口允许在创建新指标的类中插入内容。JmxReporter 始终包含在内来注册 JMX 统计数据。

 

字符串

metricsSampleWindowMs (producer)

为计算指标保留的样本数量。

30000

整数

noOfMetricsSample (producer)

为计算指标保留的样本数量。

2

整数

分区程序 (生成器)

partitioner 类,用于对st 子主题之间的消息进行分区。默认分区器基于密钥的哈希。

org.apache.kafka.clients.producer.internals.DefaultPartitioner

字符串

partitionKey (producer)

记录将被发送的分区(如果没有指定分区,则为 null)。如果配置了这个选项,则优先于标头 KafkaConstants#PARTITION_KEY。

 

整数

producerBatchSize (producer)

每当多个记录被发送到同一分区时,生产者将尝试将记录集中到更少的请求。这有助于在客户端和服务器中性能。此配置以字节为单位控制默认批处理大小。将不尝试批量记录,而不是发送到代理的批处理。请求将包含多个批处理,每个分区都有多个批处理,每个分区都可用于发送数据。一个小批处理大小将降低批处理,并可以降低吞吐量(零批处理大小完全将完全禁用批处理)。非常大的批量大小可能会更加严重地使用内存,因为我们将始终在指定批处理大小中分配指定批处理大小的缓冲区。

16384

整数

queueBufferingMaxMessages (producer)

在使用 async 模式时可排队制作者的最大未处理消息数量,然后才能阻止制作者或必须丢弃数据。

10000

整数

receiveBufferBytes (producer)

TCP 接收缓冲区的大小(SO_RCVBUF),用于读取数据。

65536

整数

ReconnectBackoffMs (producer)

尝试重新连接给定主机前等待的时间。这可避免在紧张循环中重复连接到主机。这个 backoff 适用于由消费者发送到代理的所有请求。

50

整数

recordMetadata (producer)

制作者应该存储 RecordMetadata 结果是否应该发送到 Kafka。结果存储在包含 RecordMetadata 元数据的列表中。列表存储在带有键 KafkaConstants#KAFKA_RECORDMETA 的标头中。

true

布尔值

requestRequiredAcks (producer)

生产者数量要求领导人在考虑请求完成前收到。这会控制发送的记录的持久性。以下设置是 common: acks=0 如果设为零,则制作者不会等待全部来自服务器的任何致谢。记录将立即添加到套接字缓冲区并被视为发送。在这种情况下,无法保证服务器收到记录,并且重试配置不会生效(因为客户端通常不会知道任何故障)。每个记录给出的误差将始终设置为 -1. acks=1 意味着领导机会将记录写入其本地日志,但是不会对所有后续者等待完全确认。在这种情况下,领导机应该立即在记录后立即失败,但后续人已复制记录将会丢失。acks=all 意味着领导机将等待完全同步副本组确认记录。这样可保证,只要至少有一个 in-sync 副本仍保持上线,则记录将不会丢失。这是可用最强的保证。

枚举值:

  • -1
  • 0
  • 1
  • all

1

字符串

requestTimeoutMs (producer)

在向客户端发送错误前,代理会等待满足 request.required.acks 要求的时间。

30000

整数

重试 (重现)

设置大于零的值将导致客户端重新发送发送失败并显示可能瞬态错误的任何记录。请注意,这个重试与客户端在收到错误时重新输入记录不同。允许重试可能会更改记录顺序,因为如果将两个记录发送到单个分区,则第一个记录失败并重试,但第二个记录可能会首先出现。

0

整数

retryBackoffMs (producer)

在重试前,生产者都会刷新相关主题的元数据,以查看是否已选定新的领导人。由于领导选举需要一定的时间,因此此属性指定制作者在刷新元数据前等待的时间。

100

整数

sendBufferBytes (producer)

套接字写入缓冲区的大小。

131072

整数

valueSerializer (producer)

消息的 serializer 类。

org.apache.kafka.common.serialization.StringSerializer

字符串

workerPool (producer)

使用自定义 worker 池在 kafka 服务器后继续路由 Exchange,已确认通过异步非阻塞处理从 KafkaProducer 发送到它的消息。如果使用这个选项,则必须处理线程池的生命周期,以便在需要时关闭池。

 

ExecutorService

workerPoolCoreSize (producer)

kafka 服务器后用于继续路由 Exchange 的 worker 池的核心线程数量,确认已使用异步非阻塞处理从 KafkaProducer 发送到它的消息。

10

整数

workerPoolMaxSize (producer)

kafka 服务器后用于继续路由 Exchange 的 worker 池的最大线程数量已被确认使用异步非阻塞处理从 KafkaProducer 发送到它的消息。

20

整数

kafkaClientFactory (高级)

工厂用于创建 org.apache.kafka.clients.consumer.KafkaConsumer 和 org.apache.kafka.clients.producer.KafkaProducer 实例。这允许配置自定义工厂来创建具有扩展 vanilla Kafka 客户端的逻辑的实例。

 

KafkaClientFactory

同步 (高级)

设置同步处理是否应当严格使用。

false

布尔值

schemaRegistryURL (confluent)

要使用的 Confluent Platform schema registry 服务器的 URL。格式为 host1:port1,host2:port2。这被称为 Confluent Platform 文档中的 schema.registry.url。这个选项只在 Confluent Platform (不是标准 Apache Kafka)中可用。

 

字符串

拦截器( monitoring)

为制作者或消费者设置拦截器。制作者拦截器必须实施 org.apache.kafka.clients.producer.ProducerInterceptor interceptors 类,必须实施 org.apache.kafka.clients.consumer.ConsumerInterceptor 注意,如果您在使用者上使用了 Producer 拦截器,它将在运行时引发一个类广播异常。

 

字符串

kerberosBeforeReloginMinTime (security)

刷新尝试之间的登录线程睡眠时间。

60000

整数

kerberosInitCmd (security)

Kerberos kinit 命令路径.默认为 /usr/bin/kinit。

/usr/bin/kinit

字符串

kerberosPrincipalToLocalRules (security)

用于从主体名称映射到短名称的规则列表(通常是操作系统用户名)。规则按顺序进行评估,第一个与主体名称匹配的规则用于将它映射到短名称。之后的任何规则都将被忽略。默认情况下,格式为 {username}/{hostname}{REALM} 的主体名称映射到 {username}。有关格式的详情,请查看安全授权和 acl 文档。可以使用逗号分隔多个值。

DEFAULT

字符串

kerberosRenewJitter (security)

添加至续订时间的随机危害百分比。

0.05

kerberosRenewWindowFactor (security)

登录线程会休眠,直到指定的窗口因时间从上一次刷新到 ticket 的到期时间才会处于睡眠状态,此时它将尝试更新票据。

0.8

saslJaasConfig (security)

公开 kafka sasl.jaas.config 参数示例: org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;。

 

字符串

saslKerberosServiceName (security)

Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。

 

字符串

saslMechanism (security)

使用简单验证和安全层(SASL)机制。有关有效值,请参阅。

GSSAPI

字符串

securityProtocol (security)

用于与代理通信的协议。SASL_PLAINTEXT,支持 PLAINTEXT 和 SSL。

PLAINTEXT

字符串

sslCipherSuites (security)

密码套件列表。这是身份验证、加密、MAC 和密钥交换算法的命名组合,用于使用 TLS 或 SSL 网络协议来协商网络连接的安全设置。By 默认为所有可用的加密套件。

 

字符串

sslContextParameters (安全)

使用 Camel SSLContextParameters 对象的 SSL 配置。如果在其他 SSL 端点参数之前应用了它。注意: Kafka 仅支持从文件位置加载密钥存储,因此,在 KeyStoreParameters.resource 选项中以 file: 为位置加上前缀。

 

SSLContextParameters

sslEnabledProtocols (安全)

为 SSL 连接启用的协议列表。TLSv1.2、TLSv1.1 和 TLSv1 默认启用。

 

字符串

sslEndpointAlgorithm (security)

使用服务器证书验证服务器主机名的端点标识算法。

https

字符串

sslKeymanagerAlgorithm (安全)

密钥管理器工厂用于 SSL 连接的算法。默认值是 Java 虚拟机配置的主要管理器工厂算法。

SunX509

字符串

sslKeyPassword (security)

密钥存储文件中私钥的密码。对于客户端,这是可选的。

 

字符串

sslKeystoreLocation (security)

密钥存储文件的位置。这对于客户端来说是可选的,可用于客户端的双向身份验证。

 

字符串

sslKeystorePassword (security)

密钥存储文件的存储密码。对于客户端是可选的,只有配置了 ssl.keystore.location 时才需要。

 

字符串

sslKeystoreType (security)

密钥存储文件的文件格式。对于客户端,这是可选的。默认值为 JKS。

JKS

字符串

sslProtocol (安全)

用于生成 SSLContext 的 SSL 协议。默认设置为 TLS,这适用于大多数情况。最近的 JVM 中允许的值是 TLS、TLSv1.1 和 TLSv1.2。较旧 JVM 中可能会支持 SSLv2 和 SSLv3,但由于已知安全漏洞,不建议使用它们。

 

字符串

sslProvider (安全)

用于 SSL 连接的安全提供商的名称。默认值是 JVM 的默认安全提供程序。

 

字符串

sslTrustmanagerAlgorithm (security)

信任管理器工厂用于 SSL 连接的算法。默认值是 Java 虚拟机配置的信任管理器工厂算法。

PKIX

字符串

sslTruststoreLocation (security)

信任存储文件的位置。

 

字符串

sslTruststorePassword (security)

信任存储文件的密码。

 

字符串

sslTruststoreType (security)

信任存储文件的文件格式。默认值为 JKS。

JKS

字符串

有关 Producer/Consumer 配置的更多信息,请参阅:

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat