190.2.2. 查询参数(93 参数):


Expand
名称描述默认类型

brokers (common)

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理子集或指向代理子集的 VIP。这个选项在 Kafka 文档中称为 bootstrap.servers。

 

字符串

clientId (common)

客户端 ID 是在每次请求中发送的用户指定的字符串,以帮助跟踪调用。它应该以逻辑方式识别发出请求的应用程序。

 

字符串

headerFilterStrategy (common)

使用自定义 HeaderFilterStrategy 过滤来自 Camel 消息的标头。

 

HeaderFilterStrategy

reconnectBackoffMaxMs (common)

重新连接到重复连接失败的代理时要等待的最大时间,以毫秒为单位。如果提供,每个主机的 backoff 将根据连续连接失败增加每个连续连接失败,最多增加这个最大值。在计算后,添加 20% 的随机 jitter 以避免连接停滞。

1000

整数

allowManualCommit (consumer)

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange message 标头上,这允许最终用户访问这个 API,并通过 Kafka consumer 执行手动偏移提交。

false

布尔值

autoCommitEnable (consumer)

如果为 true,请定期提交 ZooKeeper 被消费者获取的消息偏移。当进程失败时,将使用提交的偏移量,作为新消费者开始的位置。

true

布尔值

autoCommitIntervalMs (consumer)

消费者偏移量的频率被提交到 zookeeper。

5000

整数

autoCommitOnStop (consumer)

当消费者停止时,是否执行显式自动提交,以确保代理有来自上一次消耗的消息的提交。这需要打开 autoCommitEnable 选项。可能的值有:sync、sync 或 none。sync 是默认值。

sync

字符串

autoOffsetReset (consumer)

ZooKeeper 中没有初始偏移时做什么:如果偏移偏移范围不足: 最早的 : 自动将偏移重置为最新的偏移:自动将偏移重置为最新偏移失败:将异常设置为使用者

latest

字符串

breakOnFirstError (consumer)

该选项控制在消费者处理交换时所发生的情况,它会失败。如果 选项为 false,则使用者将继续进行下一个消息并进行处理。如果 选项为 true,则使用者将中断,并且会重新查找导致失败的消息偏移,然后重新尝试处理此消息。但是,如果其绑定每次都失败,则可能导致完全无法处理同一消息,例如投毒消息。因此,建议通过使用 Camel 的错误处理程序来处理该示例。

false

布尔值

bridgeErrorHandler (consumer)

允许将消费者桥接到 Camel 路由 Error Handler,这意味着使用者试图获取传入消息或类似信息时出现任何异常,现在将作为一个消息进行处理,并由路由 Error Handler 处理。默认情况下,使用者将使用 org.apache.camel.spi.Exception 处理程序处理异常,该处理程序将记录在 WARN 或 ERROR 级别,并忽略。

false

布尔值

checkCrcs (consumer)

自动检查消耗的记录的 CRC32。这可确保没有发生消息的在线或磁盘损坏。这个检查会增加一些开销,因此在寻求极端性能的情况下可能会禁用它。

true

布尔值

consumerRequestTimeoutMs (consumer)

配置控制客户端等待请求响应的最长时间。如果在超时前未收到响应,如果请求用尽,则客户端将重新发送请求,或者请求失败。

40000

整数

consumersCount (consumer)

连接到 kafka 服务器的消费者数量

1

int

consumerStreams (consumer)

消费者上的并发用户数量

10

int

fetchMaxBytes (consumer)

如果 get 的第一个非空分区中的第一条大于这个值大于这个值,则服务器应返回最多的数据量应返回这个值,以确保使用者可以进行进展。代理接受的最大消息大小通过 message.max.bytes (broker config)或 max.message.bytes (主题配置)来定义。请注意,使用者执行多个并行获取。

52428800

整数

fetchMinBytes (consumer)

服务器应返回获取请求的最小数据量。如果没有足够的数据可用,请求会等待该数据在回答请求前累积累积。

1

整数

fetchWaitMaxMs (consumer)

如果没有足够的数据立即满足 fetch.min.bytes,则服务器将在应答获取请求前阻止的最大时间。

500

整数

groupId (consumer)

唯一标识此消费者所属消费者的消费者进程组的字符串。通过设置同一个组 id 多个进程,表示它们都是同一消费者组的所有部分。消费者需要此选项。

 

字符串

heartbeatIntervalMs (consumer)

在使用 Kafka 的组管理设施时,心跳到消费者协调器之间预期的时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置小于 session.timeout.ms,但通常不应设置超过该值的 1/3。甚至可以调整它,以控制正常重新平衡的预期时间。

3000

整数

kafkaHeaderDeserializer (consumer)

为 deserialization kafka 标头值设置自定义 KafkaHeaderDeserializer 以 camel headers 值。

 

KafkaHeaderDeserializer

keyDeserializer (consumer)

用于实现 Deserializer 接口的密钥的 Deserializer 类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

maxPartitionFetchBytes (consumer)

服务器将返回每个分区的最大数据量。用于请求的最大内存总量为 #partitions max.partition.fetch.bytes。这种大小必须至少与服务器允许的最大消息大小相同,或者生产者能够发送大于消费者可获取的消息。如果出现这种情况,消费者会卡住在特定分区上获取大量信息。

1048576

整数

maxPollIntervalMs (consumer)

在使用消费者组管理时,poll ()的调用之间的最大延迟。这会在获取更多记录前处于闲置时间的上限。如果在这个超时过期前不调用 poll (),则消费者被视为失败,并且组将重新平衡,从而将分区重新分配给其他成员。

 

Long

maxPollRecords (consumer)

单个调用中返回的最大记录数,用于 poll ()

500

整数

offsetRepository (consumer)

要使用的偏移存储库,在本地存储每个主题分区的误差。定义一个将禁用 autocommit。

 

StateRepository

partitionAssignor (consumer)

使用组管理时,客户端将使用的分区分配策略在原始消费者实例之间分发分区所有权

org.apache.kafka.clients.consumer.RangeAssignor

字符串

pollTimeoutMs (consumer)

轮询 KafkaConsumer 时使用的超时。

5000

Long

seekTo (consumer)

设定如果 KafkaConsumer 将在启动时读取或结束:从开头读取:从结尾读取,这将替换掉前面的属性 seekToBeginning

 

字符串

sessionTimeoutMs (consumer)

使用 Kafka 的组管理功能时检测故障的超时时间。

10000

整数

topicIsPattern (consumer)

主题是模式(正则表达式)。这可用于订阅与模式匹配的动态主题。

false

布尔值

valueDeserializer (consumer)

对实现 Deserializer 接口的值进行反序列化器类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

exceptionHandler (consumer)

要让使用者使用自定义 ExceptionHandler。请注意,如果启用了选项 bridgeErrorHandler,则不使用这个选项。默认情况下,消费者处理异常,这将在 WARN 或 ERROR 级别记录,并忽略。

 

ExceptionHandler

exchangePattern (consumer)

在使用者创建交换时设置交换模式。

 

ExchangePattern

bridgeEndpoint (producer)

如果选项为 true,则 KafkaProducer 将忽略入站消息的 KafkaConstants.TOPIC 标头设置。

false

布尔值

bufferMemorySize (producer)

制作者可用于缓冲记录的内存总量,等待发送到服务器。如果记录的发送速度快于服务器,则生产者不会根据 block.on.buffer.full 指定的首选项进行阻止或抛出异常。此设置应该大致对应于制作者将使用的总内存,而不是硬绑定,因为生产者不使用所有内存。一些额外的内存用于压缩(如果启用了压缩)以及维护航班请求。

33554432

整数

circularTopicDetection (producer)

如果 选项为 true,则 KafkaProducer 将检测到消息是否试图将消息发送到它所来自的同一主题(如果从 kafka consumer 原始)。如果 KafkaConstants.TOPIC 标头与原始 kafka consumer 主题相同,则忽略标头设置,并使用制作者端点的主题。换句话说,这可避免将相同的消息发送回来自哪里。如果选项 bridgeEndpoint 设为 true,则使用这个选项。

true

布尔值

compressionCodec (producer)

此参数允许您指定此制作者生成的所有数据的压缩代码。有效值为 none、gzip 和 snappy。

none

字符串

connectionMaxIdleMs (producer)

在此配置指定的毫秒数后关闭闲置连接。

540000

整数

enableIdempotence (producer)

如果设置为 'true',则制作者将确保流中写入每条消息的一个副本。如果 'false',则生产者重试可能会在流中写入重试消息的副本。如果设置为 true,这个选项需要 max.in.flight.requests.per.connection 设置为 1,且重试无法为零,另外的攻击必须设置为 'all'。

false

布尔值

kafkaHeaderSerializer (producer)

为序列化 camel 标头值设置自定义 KafkaHeaderDeserializer 到 kafka 标头值。

 

KafkaHeaderSerializer

key (producer)

记录密钥(如果没有指定密钥,则为 null)。如果配置了这个选项,则优先于标头 KafkaConstants#KEY

 

字符串

keySerializerClass (producer)

密钥的 serializer 类(如果未指定任何信息,默认为与消息相同)。

org.apache.kafka.common.serialization.StringSerializer

字符串

lingerMs (producer)

制作者将请求传输到单一批处理请求之间的任何记录分组在一起。通常,仅当记录到达的速度比发送的速度快时,这才会发生。但是在某些情形中,客户端可能也会减少请求的数量,即使在中等负载下也是如此。此设置通过添加少量延迟来达到此目的,而不是立即发送记录,以便生产者最多等待给定延迟发送其他记录,从而使发送其他记录可以合并在一起。这可能被视为与在 TCP 中 Nagle 的算法类似。此设置提供批量延迟的上限:一旦我们获得批次,每个分区的相应记录会立即发送,而无论此设置如何,我们将立即发送超过此分区的字节数,那么我们将会"linger' 以便等待更多记录显示。此设置默认为 0 (即没有延迟)。例如,设置 linger.ms=5 将对发送的请求数量产生影响,但会最多为加载加载时发送的记录添加 5ms 的延迟。

0

整数

maxBlockMs (producer)

配置控制 kafka 发送到 kafka 的时长。由于多个原因,这些方法可以被阻断。例如: buffer full,metadata 不可用。这种配置会对获取元数据的总时间、键和值序列化、在执行 send ()时对缓冲区内存进行分区和分配进行最大的限制。对于 partitionsFor (),此配置会在等待元数据时实施最长时间阈值

60000

整数

maxInFlightRequest (producer)

在阻断前,客户端将在单一连接上发送的最大未确认请求数。请注意,如果此设置设定为大于 1 且存在失败发送,则因为重试导致消息重新排序的风险(例如,如果启用了重试)。

5

整数

maxRequestSize (producer)

请求的最大大小。这也是最高记录大小的上限。请注意,该服务器具有自己的记录大小,这可能与此大小不同。此设置将限制制作者将在单一请求中发送的记录数量,以避免发送大量请求。

1048576

整数

metadataMaxAgeMs (producer)

在毫秒内,我们强制刷新元数据,即使我们没有看到任何分区领导更改来主动发现任何新的代理或分区。

300000

整数

metricReporters (producer)

用作指标报告者的类列表。实施MetricReporter 接口允许在创建新指标的类中插入内容。JmxReporter 始终包含在内来注册 JMX 统计数据。

 

字符串

metricsSampleWindowMs (producer)

为计算指标保留的样本数量。

30000

整数

noOfMetricsSample (producer)

为计算指标保留的样本数量。

2

整数

partitioner (producer)

partitioner 类,用于对st 子主题之间的消息进行分区。默认分区器基于密钥的哈希。

org.apache.kafka.clients.producer.internals.DefaultPartitioner

字符串

partitionKey (producer)

记录将被发送的分区(如果没有指定分区,则为 null)。如果配置了这个选项,则优先于标头 KafkaConstants#PARTITION_KEY

 

整数

producerBatchSize (producer)

每当多个记录被发送到同一分区时,生产者将尝试将记录集中到更少的请求。这有助于在客户端和服务器中性能。此配置以字节为单位控制默认批处理大小。将不尝试批量记录,而不是发送到代理的批处理。请求将包含多个批处理,每个分区都有多个批处理,每个分区都可用于发送数据。一个小批处理大小将降低批处理,并可以降低吞吐量(零批处理大小完全将完全禁用批处理)。非常大的批量大小可能会更加严重地使用内存,因为我们将始终在指定批处理大小中分配指定批处理大小的缓冲区。

16384

整数

queueBufferingMaxMessages (producer)

在使用 async 模式时可排队制作者的最大未处理消息数量,然后才能阻止制作者或必须丢弃数据。

10000

整数

receiveBufferBytes (producer)

TCP 接收缓冲区的大小(SO_RCVBUF),用于读取数据。

65536

整数

reconnectBackoffMs (producer)

尝试重新连接给定主机前等待的时间。这可避免在紧张循环中重复连接到主机。这个 backoff 适用于由消费者发送到代理的所有请求。

50

整数

recordMetadata (producer)

制作者应该存储 RecordMetadata 结果是否应该发送到 Kafka。结果存储在包含 RecordMetadata 元数据的列表中。列表存储在带有键 KafkaConstants#KAFKA_RECORDMETA 的标头中

true

布尔值

requestRequiredAcks (producer)

生产者数量要求领导人在考虑请求完成前收到。这会控制发送的记录的持久性。以下设置是 common: acks=0 如果设为零,则制作者不会等待全部来自服务器的任何致谢。记录将立即添加到套接字缓冲区并被视为发送。在这种情况下,无法保证服务器收到记录,并且重试配置不会生效(因为客户端通常不会知道任何故障)。每个记录给出的误差将始终设置为 -1. acks=1 意味着领导机会将记录写入其本地日志,但是不会对所有后续者等待完全确认。在这种情况下,领导机应该立即在记录后立即失败,但后续人已复制记录将会丢失。acks=all 意味着领导机将等待完全同步副本组确认记录。这样可保证,只要至少有一个 in-sync 副本仍保持上线,则记录将不会丢失。这是可用最强的保证。

1

字符串

requestTimeoutMs (producer)

在向客户端发送错误前,代理会等待满足 request.required.acks 要求的时间。

305000

整数

retries (producer)

设置大于零的值将导致客户端重新发送发送失败并显示可能瞬态错误的任何记录。请注意,这个重试与客户端在收到错误时重新输入记录不同。允许重试可能会更改记录顺序,因为如果将两个记录发送到单个分区,则第一个记录失败并重试,但第二个记录可能会首先出现。

0

整数

retryBackoffMs (producer)

在重试前,生产者都会刷新相关主题的元数据,以查看是否已选定新的领导人。由于领导选举需要一定的时间,因此此属性指定制作者在刷新元数据前等待的时间。

100

整数

sendBufferBytes (producer)

套接字写入缓冲区大小

131072

整数

serializerClass (producer)

消息的 serializer 类。

org.apache.kafka.common.serialization.StringSerializer

字符串

workerPool (producer)

使用自定义 worker 池在 kafka 服务器后继续路由 Exchange,已确认通过异步非阻塞处理从 KafkaProducer 发送到它的消息。

 

ExecutorService

workerPoolCoreSize (producer)

kafka 服务器后用于继续路由 Exchange 的 worker 池的核心线程数量,确认已使用异步非阻塞处理从 KafkaProducer 发送到它的消息。

10

整数

workerPoolMaxSize (producer)

kafka 服务器后用于继续路由 Exchange 的 worker 池的最大线程数量已被确认使用异步非阻塞处理从 KafkaProducer 发送到它的消息。

20

整数

同步 (高级)

设置同步处理是否应当严格使用,还是允许 Camel 使用异步处理(如果受支持)。

false

布尔值

interceptorClasses (monitoring)

为制作者或消费者设置拦截器。制作者拦截器必须是实施 org.apache.kafka.clients.producer.ProducerInterceptor 拦截器的类,必须实施 org.apache.kafka.clients.consumer.ConsumerInterceptors.consumer.ConsumerInterceptoror,它将在运行时引发一个类广播异常。

 

字符串

kerberosBeforeReloginMin Time (security)

刷新尝试之间的登录线程睡眠时间。

60000

整数

kerberosInitCmd (security)

Kerberos kinit 命令路径.默认为 /usr/bin/kinit

/usr/bin/kinit

字符串

kerberosPrincipalToLocal 规则 (安全性)

用于从主体名称映射到短名称的规则列表(通常是操作系统用户名)。规则按顺序进行评估,第一个与主体名称匹配的规则用于将它映射到短名称。之后的任何规则都将被忽略。默认情况下,格式为 username/hostnameREALM 的主体名称映射到 username。有关格式的详情,请查看安全授权和 acl。可以使用逗号分隔多个值

DEFAULT

字符串

kerberosRenewJitter (security)

添加至续订时间的随机危害百分比。

0.05

kerberosRenewWindowFactor (security)

登录线程会休眠,直到指定的窗口因时间从上一次刷新到 ticket 的到期时间才会处于睡眠状态,此时它将尝试更新票据。

0.8

saslJaasConfig (security)

公开 kafka sasl.jaas.config 参数示例: org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;

 

字符串

saslKerberosServiceName (security)

Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。

 

字符串

saslMechanism (security)

使用简单验证和安全层(SASL)机制。如需有效值,请参阅 http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml

GSSAPI

字符串

securityProtocol (security)

用于与代理通信的协议。目前只支持 PLAINTEXT 和 SSL。

PLAINTEXT

字符串

sslCipherSuites (security)

密码套件列表。这是身份验证、加密、MAC 和密钥交换算法的命名组合,用于使用 TLS 或 SSL 网络协议来协商网络连接的安全设置。By 默认为所有可用的加密套件。

 

字符串

sslContextParameters (security)

使用 Camel SSLContextParameters 对象的 SSL 配置。如果在其他 SSL 端点参数之前应用了它。

 

SSLContextParameters

sslEnabledProtocols (security)

为 SSL 连接启用的协议列表。TLSv1.2、TLSv1.1 和 TLSv1 默认启用。

TLSv1.2,TLSv1.1,TLSv1

字符串

sslEndpointAlgorithm (security)

使用服务器证书验证服务器主机名的端点标识算法。

 

字符串

sslKeymanagerAlgorithm (security)

密钥管理器工厂用于 SSL 连接的算法。默认值是 Java 虚拟机配置的主要管理器工厂算法。

SunX509

字符串

sslKeyPassword (security)

密钥存储文件中私钥的密码。对于客户端,这是可选的。

 

字符串

sslKeystoreLocation (security)

密钥存储文件的位置。这对于客户端来说是可选的,可用于客户端的双向身份验证。

 

字符串

sslKeystorePassword (security)

密钥存储文件的存储密码。对于客户端是可选的,只有配置了 ssl.keystore.location 时才需要。

 

字符串

sslKeystoreType (security)

密钥存储文件的文件格式。对于客户端,这是可选的。默认值为 JKS

JKS

字符串

sslProtocol (security)

用于生成 SSLContext 的 SSL 协议。默认设置为 TLS,这适用于大多数情况。最近的 JVM 中允许的值是 TLS、TLSv1.1 和 TLSv1.2。较旧 JVM 中可能会支持 SSLv2 和 SSLv3,但由于已知安全漏洞,不建议使用它们。

TLS

字符串

sslProvider (security)

用于 SSL 连接的安全提供商的名称。默认值是 JVM 的默认安全提供程序。

 

字符串

sslTrustmanagerAlgorithm (security)

信任管理器工厂用于 SSL 连接的算法。默认值是 Java 虚拟机配置的信任管理器工厂算法。

PKIX

字符串

sslTruststoreLocation (security)

信任存储文件的位置。

 

字符串

sslTruststorePassword (security)

信任存储文件的密码。

 

字符串

sslTruststoreType (security)

信任存储文件的文件格式。默认值为 JKS。

JKS

字符串

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat