26.4.2. 查询参数(102 参数)


Name描述默认类型

additionalProperties (common)

为 kafka consumer 或 kafka producer 设置额外的属性,以防它们无法直接在 camel 配置中设置(例如:尚未在 Camel 配置中反映的新 Kafka 属性),其属性必须带有 additionalProperies 前缀。e.g: additionalProperties.transactional.id=12345&additionalProperties.schema.url=http://localhost:8811/avro。

 

map

brokers (common)

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理的子集,也可以是指向代理子集的 VIP。这个选项称为 Kafka 文档中的 bootstrap.servers。

 

字符串

clientId (common)

客户端 ID 是在每个请求中发送的用户指定字符串,以帮助追踪调用。它应该以逻辑方式标识发出请求的应用程序。

 

字符串

headerFilterStrategy (common)

使用自定义 HeaderFilterStrategy 过滤到 Camel 消息的标头。

 

HeaderFilterStrategy

reconnectBackoffMaxMs (common)

重新连接到重复连接失败的代理时要等待的最大时间(毫秒)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,最高最高。在计算了 backoff 增长后,会添加 20% 的随机 jitter 来避免连接停滞。

1000

整数

shutdownTimeout (common)

毫秒的超时,以等待使用者或制作者正常地关闭并终止其 worker 线程。

30000

int

allowManualCommit (consumer)

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange 消息标头中,它允许最终用户通过 Kafka 使用者访问此 API 并执行手动偏移提交。

false

布尔值

autoCommitEnable (consumer)

如果为 true,请定期提交到由消费者获取的消息偏移。当进程作为新消费者开始的位置失败时,将使用这个提交的误差。

true

布尔值

autoCommitIntervalMs (consumer)

consumer 偏移量被提交到 zookeeper 的频率。

5000

整数

autoCommitOnStop (consumer)

在消费者停止时,是否执行明确的自动提交,以确保代理具有最近使用的消息的提交。这要求打开选项 autoCommitEnable。可能的值有: sync、sync 或 none。sync 是默认值。

枚举值:

  • sync
  • async
  • none

sync

字符串

autoOffsetReset (consumer)

当 ZooKeeper 中没有初始偏移,或者一个偏移范围不足时,应该怎么办:最早的 : 会自动将偏移重置为最早的偏移 latest:自动将偏移重置为最新误差失败:将异常分配给消费者。

枚举值:

  • latest
  • earliest
  • none

latest

字符串

breakOnFirstError (consumer)

该选项控制消费者处理交换时会发生的情况,而且它失败。如果 选项为 false,则使用者将继续显示下一个消息并进行处理。如果 选项为 true,则使用者中断,并将返回到消息偏移导致失败,然后重新尝试处理此消息。但是,如果每次绑定失败,这将导致无量处理相同的消息,例如:因此,建议您使用 Camel 错误处理程序来处理示例。

false

布尔值

bridgeErrorHandler (consumer)

允许将消费者桥接到 Camel 路由 Error Handler,这意味着在消费者尝试获取传入的消息时发生任何异常,或像这样一样处理,消息现在将被作为消息进行处理,并由路由 Error Handler 处理。默认情况下,使用者将使用 org.apache.camel.spi.ExceptionHandler 处理异常,该处理程序将记录在 WARN 或 ERROR 级别并忽略。

false

布尔值

checkCrcs (consumer)

自动检查所消耗的记录的 CRC32。这样可确保不会对消息发生任何有线或磁盘上的崩溃。这个检查增加了一些开销,因此在寻求性能时可能会禁用它。

true

布尔值

commitTimeoutMs (consumer)

代码将等待同步提交完成的最长时间(以毫秒为单位)。

5000

Long

consumerRequestTimeoutMs (consumer)

配置控制客户端等待请求响应的最大时间。如果在超时前没有收到响应,客户端会在需要时重新发送请求,或者在重试耗尽时失败请求。

40000

整数

consumersCount (consumer)

连接到 kafka 服务器的用户数量。每个消费者都在单独的线程中运行,该线程检索并处理传入数据。

1

int

fetchMaxBytes (consumer)

服务器应返回的最大数据量为获取请求,如果获取的第一个非空分区中的第一个消息大于这个值,则信息仍会返回,以确保使用者可以进行进度。代理接受的最大消息大小通过 message.max.bytes(broker config)或 max.message.bytes(topic 配置)定义。请注意,使用者并行执行多个获取。

52428800

整数

fetchMinBytes (consumer)

服务器应返回获取请求的最小数据量。如果数据不足,请求会等待大量数据在回答请求前等待累积数据。

1

整数

fetchWaitMaxMs (consumer)

如果没有足够的数据立即满足 fetch.min.bytes,则服务器在回答获取请求前将阻断的最大时间。

500

整数

groupId (consumer)

唯一标识此使用者所属的消费者流程组的字符串。通过设置同一组 id 多个进程,表示它们都是同一消费者组的所有部分。消费者需要这个选项。

 

字符串

groupInstanceId (consumer)

由最终用户提供的消费者实例的唯一标识符。只允许非空字符串。如果设置,使用者被视为静态成员,这意味着任何时候都只允许具有此 ID 的实例。这可与较大的会话超时结合使用,以避免出现临时不可用(如进程重启)导致的重新平衡问题。如果没有设置,消费者会将该组加入为动态成员,这是传统行为。

 

字符串

headerDeserializer (consumer)

使用自定义 KafkaHeaderDeserializer 来 deserialize kafka 标头值。

 

KafkaHeaderDeserializer

heartbeatIntervalMs (consumer)

使用 Kafka 的组群管理工具时,心跳到消费者协调器之间的预期时间。心跳用于确保消费者的会话保持活跃状态,并在新消费者加入或离开该组时促进重新平衡。该值必须小于 session.timeout.ms,但通常设置的值通常不应超过 1/3。它可以被调整,以便控制正常重新平衡的预期时间。

3000

整数

keyDeserializer (consumer)

为实现 Deserializer 接口的密钥的 Deserializer 类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

maxPartitionFetchBytes (consumer)

服务器的每个分区中要返回的最大数据量。用于请求的最大总内存为 #partitions max.partition.fetch.bytes。这个大小必须至少是服务器允许的最大消息大小,或生产者可以发送大于消费者可以获取的消息。如果发生这种情况,消费者可能会卡住在某个分区上获取大量消息。

1048576

整数

maxPollIntervalMs (consumer)

使用消费者组管理时调用 poll()之间的最大延迟。这样一来,在获取更多记录前,可以闲置这个时间。如果在此超时的过期前没有调用 poll(),则消费者被视为失败,组将重新平衡,以将分区重新分配给另一个成员。

 

Long

maxPollRecords (consumer)

单个调用中返回到 poll()中的最大记录数。

500

整数

offsetRepository (consumer)

用于本地存储主题每个分区的偏移库。定义一个将禁用 autocommit。

 

StateRepository

partitionAssignor (consumer)

使用组管理时,客户端将使用分区分配策略在消费者实例之间分发分区所有权。

org.apache.kafka.clients.consumer.RangeAssignor

字符串

pollOnError (consumer)

如果 kafka 异常在轮询新消息时要做什么。除非已在端点级别上配置显式值,否则在默认情况下将使用组件配置中的值。DISCARD 将丢弃消息,并继续轮询下一个消息。ERROR_HANDLER 将使用 Camel 的错误处理程序来处理异常,之后再继续轮询下一个消息。RECONNECT 将重新连接消费者,然后尝试重新轮询消息 RETRY 将让消费者再次轮询相同的消息(如果消费者应该能再次使用消息,则必须手动启动/重新启动)。

枚举值:

  • DISCARD
  • ERROR_HANDLER
  • RECONNECT
  • RETRY
  • STOP

ERROR_HANDLER

PollOnError

pollTimeoutMs (consumer)

轮询 KafkaConsumer 时使用的超时。

5000

Long

resumeStrategy (consumer)

这个选项允许用户设置自定义恢复策略。当分区被分配(例如:连接或重新连接时)时会执行 resume 策略。它允许实施自定义如何恢复操作,并提供 seekTo 和 offsetRepository 机制更灵活的替代方案。有关实现详情,请参阅 KafkaConsumerResumeStrategy。此选项不会影响自动提交设置。使用此设置的实施很可能希望使用手动提交选项进行评估。

 

KafkaConsumerResumeStrategy

seekTo (consumer)

设置 KafkaConsumer 将从启动开始读取或结束:从一开始开始读取:从末尾开始阅读:从末尾开始,这正在替换之前属性 seekToBeginning。

枚举值:

  • 开始
  • end
 

字符串

sessionTimeoutMs (consumer)

在使用 Kafka 的组群管理工具时,用于检测失败的超时时间。

10000

整数

specificAvroReader (consumer)

这可让特定 Avro 读取器用于 Confluent Platform schema registry 和 io.confluent.kafka.serializers.KafkaAvroDeserializer。这个选项仅适用于 Confluent Platform(不是标准 Apache Kafka)。

false

布尔值

topicIsPattern (consumer)

主题是模式(正则表达式)。这可以用于订阅与模式匹配的主题数量。

false

布尔值

valueDeserializer (consumer)

实施 Deserializer 接口的值的 Deserializer 类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

exceptionHandler (consumer (advanced))

要让消费者使用自定义 ExceptionHandler。请注意,如果选项 bridgeErrorHandler 已启用,则不会使用这个选项。默认情况下,消费者处理异常,该消费者在 WARN 或 ERROR 级别上记录并忽略。

 

ExceptionHandler

exchangePattern (consumer (advanced))

在消费者创建交换时设置交换模式。

枚举值:

  • InOnly
  • InOut
  • InOptionalOut
 

ExchangePattern

kafkaManualCommitFactory (consumer (advanced))

创建 KafkaManualCommit 实例所使用的因素。这样,在手动提交来自开箱即用的默认实现时,需要插入自定义 KafkaManualCommit 实例来创建一个自定义 KafkaManualCommit 实例。

 

KafkaManualCommitFactory

bufferMemorySize (producer)

制作者可用于将等待发送到服务器的内存总量为:如果记录的发送速度快于生产者将会被发送到服务器,或根据 block.on.buffer.full 指定的优先级而引发异常。此设置应当与制作者所使用的总内存对应,但不是硬绑定,因为生产者所使用的所有内存都不会被使用。有些额外的内存将用于压缩(如果启用压缩),以及维护动态请求。

33554432

整数

compressionCodec (producer)

此参数允许您为这个制作者生成的所有数据指定压缩代码。有效值为 none、gzip 和 snappy。

枚举值:

  • none
  • gzip
  • snappy
  • lz4

none

字符串

connectionMaxIdleMs (producer)

在此配置指定的毫秒数后关闭闲置连接。

540000

整数

deliveryTimeoutMs (producer)

在调用 send()返回后报告成功或失败的时间上有上限。这会限制在发送前会延迟记录的总时间、从代理等待确认的时间(如果预期),以及可重试发送失败的时间。

120000

整数

enableIdempotence (producer)

如果设置为 'true',则制作者将确保在流中写入每个消息的一个副本。如果 'false',则制作者重试可能会在流中写入重试消息的重复数据。如果设置为 true,这个选项则需要将 max.in.flight.requests.per.connection 设置为 1,且重试无法为零,另外 acks 必须设置为"all"。

false

布尔值

headerSerializer (producer)

使用自定义 KafkaHeaderSerializer 来序列化 kafka 标头值。

 

KafkaHeaderSerializer

key (producer)

记录密钥(如果未指定密钥,则为 null)。如果已经配置了这个选项,它将优先于标头 KafkaConstants#KEY。

 

字符串

keySerializer (producer)

键的序列化类(如果没有指定,则为与消息相同)。

org.apache.kafka.common.serialization.StringSerializer

字符串

lazyStartProducer (producer)

制作者是否应该启动 lazy(在第一个消息上)。通过启动 lazy,您可以使用它来允许 CamelContext 和路由在启动期间启动,否则在启动期间出现问题,并导致路由启动失败。通过将这个启动延迟到 lazy 后,可以在通过 Camel 的路由错误处理程序路由消息期间处理启动失败。注意在处理第一个消息时,创建并启动制作者可能花费较少的时间,从而延长处理的总处理时间。

false

布尔值

lingerMs (producer)

制作者将到达请求传输之间的所有记录分组到一个批处理请求中。通常,只有在记录到达的速度快于发送日志时,才会被加载。然而,在某些情况下,客户端可能希望降低请求数量,甚至处于中等负载。此设置通过添加少量的资料延迟来实现这个值,而不是立即发送记录者将等待发送到给定延迟,以便发送其他记录以便可以组合使用发送发送其他记录。这可以被视为 TCP 中的 Nagle 的算法相似。此设置达到批处理延迟的上限:一旦我们获得批处理.size 值得记录,无论无论此设置如何,任何设置都会立即发送。但是,如果我们小于这个分区,我们会为指定时间"闲置者"显示,等待更多记录显示。此设置默认为 0(即无延迟)。例如,设置 linger.ms=5 将对发送的请求数量产生影响,但会将最多 5ms 延迟添加到没有负载时发送的记录。

0

整数

maxBlockMs (producer)

配置控制发送到 kafka 将阻断的时长。这些方法可能会因多种原因被阻止。例如:缓冲区 full,元数据不可用。此配置会对获取元数据、密钥和值序列化、分区和分配缓冲区内存的总时间实施最大限制。如果是 partitionsFor(),这个配置会在等待元数据时产生最大时间阈值。

60000

整数

maxInFlightRequest (producer)

在阻塞之前,客户端将在单个连接上发送的最大未确认请求数。请注意,如果此设置大于 1 且有失败发送,则因为重试(如果启用了重试,则出现消息重新排序风险)。

5

整数

maxRequestSize (producer)

请求的最大值。这也实际上是记录最大记录大小的最大值。请注意,服务器在记录大小中有自己的 cap,该大小可能与此不同。此设置将限制制作者将在单个请求中发送的记录批处理数量,以避免发送大量请求。

1048576

整数

metadataMaxAgeMs (producer)

毫秒内的时间(以毫秒为单位),即使在我们未看到任何分区领导更改的情况下,也会强制更新元数据,以便主动发现任何新的代理或分区。

300000

整数

metricReporters (producer)

用作指标报告器的类列表。通过实施 MetricReporter 接口,可以插入新指标创建通知的类。JmxReporter 始终包含在内以注册 JMX 统计。

 

字符串

metricsSampleWindowMs (producer)

为计算指标维护的示例数量。

30000

整数

noOfMetricsSample (producer)

为计算指标维护的示例数量。

2

整数

partitioner (producer)

在子主题间为分区消息的 partitioner 类。默认分区程序基于密钥的哈希。

org.apache.kafka.clients.producer.internals.DefaultPartitioner

字符串

partitionKey (producer)

记录要发送到的分区(如果未指定分区,则为 null)。如果已经配置了这个选项,它将优先于标头 KafkaConstants#PARTITION_KEY。

 

整数

producerBatchSize (producer)

当多个记录发送到同一分区时,生产者将尝试将记录组合成更少的请求。这有助于客户端和服务器上的性能。此配置以字节为单位控制默认的批处理大小。发送到该代理的批处理记录不会包括超过这个大小的批处理。请求会包含多个批处理,每个分区对应一个可用数据的分区。小批量大小会使批处理较小,且可能降低吞吐量(一个批处理大小将完全禁用批处理)。非常大的批处理大小可能会更严重地使用内存,因为我们总是会在额外记录中为指定批处理大小分配一个缓冲区。

16384

整数

queueBufferingMaxMessages (producer)

在使用 async 模式时可以放入制作者时可以排队的最大未发送消息数量,然后才能丢弃制作者或数据。

10000

整数

receiveBufferBytes (producer)

在读取数据时要使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。

65536

整数

reconnectBackoffMs (producer)

尝试重新连接给定主机前等待的时间。这可避免在紧张循环中重复连接到主机。这个 backoff 适用于消费者向代理发送的所有请求。

50

整数

recordMetadata (producer)

制作者是否应将 RecordMetadata 结果从发送到 Kafka。结果存储在包含 RecordMetadata 元数据的列表中。这个列表保存在带有键 KafkaConstants#KAFKA_RECORDMETA 的标头中。

true

布尔值

requestRequiredAcks (producer)

致谢的数量要求在考虑请求完成前收到领导机。这控制发送的记录的持久性。以下设置很常见:acks=0 If 设为零,那么制作者不会等待服务器出现任何确认。记录将立即添加到套接字缓冲区并被视为已发送。无法保证服务器在这个示例中收到记录,并且重试配置不会生效(因为客户端通常不知道任何失败)。每个记录返回的偏移将始终设置为 -1. acks=1。这意味着领导记录会将记录写入其本地日志,但不会向所有后续记录做出完全确认。在这种情况下,在确认记录后,领导者应立即失败,但随后其复制后将会丢失该记录。acks=all 意味着领导机将等待整套 in-sync 副本确认记录。这保证记录不会丢失,只要至少一个 in-sync 副本仍保持活动。这是最强大的可用保证。

枚举值:

  • -1
  • 0
  • 1
  • all

1

字符串

requestTimeoutMs (producer)

代理在向客户端发送错误前等待尝试满足 request.required.acks 要求的时间长度。

30000

整数

retries (producer)

设置大于零的值将导致客户端重新发送发送失败的记录,并显示可能存在瞬时错误。请注意,如果客户端在收到错误时重新输入记录,这个重试不相同。允许重试可能会更改记录顺序,因为当两个记录发送到单个分区,且第一个失败,且重试,但第二次成功,那么可能会先出现第二个记录。

0

整数

retryBackoffMs (producer)

在每次重试前,制作者会刷新相关主题的元数据,查看是否选择了新的领导。由于领导选举机制需要一定时间,因此此属性指定了在刷新元数据前,制作者在刷新元数据前等待的时间。

100

整数

sendBufferBytes (producer)

套接字写入缓冲区大小。

131072

整数

valueSerializer (producer)

消息的序列化器类。

org.apache.kafka.common.serialization.StringSerializer

字符串

workerPool (producer)

为了使用自定义 worker 池在 kafka 服务器后继续路由 Exchange,请确认使用异步非阻塞处理从 KafkaProducer 发送到它的消息。如果使用这个选项,则必须处理线程池的生命周期,以便在不再需要时关闭池。

 

ExecutorService

workerPoolCoreSize (producer)

在 kafka 服务器确认使用异步非阻塞处理从 KafkaProducer 发送到 KafkaProducer 后 worker 池的核心线程数量。

10

整数

workerPoolMaxSize (producer)

在 kafka 服务器确认使用异步非阻塞处理从 KafkaProducer 发送到 KafkaProducer 后,用于继续路由 Exchange 的最大线程数量。

20

整数

kafkaClientFactory (advanced)

用于创建 org.apache.kafka.clients.consumer.KafkaConsumer 和 org.apache.kafka.clients.producer.KafkaProducer 实例的工厂。这允许配置自定义工厂来创建带有扩展 vanilla Kafka 客户端的逻辑的实例。

 

KafkaClientFactory

同步 (高级)

设定同步处理是否应严格使用。

false

布尔值

schemaRegistryURL (confluent)

要使用的 Confluent Platform schema 注册表服务器的 URL。格式为 host1:port1,host2:port2。在 Confluent Platform 文档中,这称为 schema.registry.url。这个选项仅适用于 Confluent Platform(不是标准 Apache Kafka)。

 

字符串

interceptorClasses (monitoring)

为制作者或消费者设置拦截器。制作者拦截器必须是实施 org.apache.kafka.clients.producer.producer.ProducerInterceptor 拦截器的类,则必须为实施 org.apache.kafka.clients.consumer.ConsumerInterceptor.consumer.

 

字符串

kerberosBeforeReloginMinTime (security)

刷新尝试之间的登录线程睡眠时间。

60000

整数

kerberosInitCmd (security)

Kerberos kinit 命令路径.默认为 /usr/bin/kinit。

/usr/bin/kinit

字符串

kerberosPrincipalToLocalRules (security)

用于从主体名称映射到短名称(通常是操作系统用户名)的规则列表。规则按照顺序评估,第一个与主体名称匹配的规则被用来将其映射到一个短名称。之后的所有规则将被忽略。默认情况下,格式为 {username}/{hostname}{REALM} 的主体名称被映射到 {username}。有关格式的更多详情,请参阅安全授权和 acls 文档。可以使用逗号分隔多个值。

DEFAULT

字符串

kerberosRenewJitter (security)

添加至续订时间的随机jitter 的百分比。

0.05

kerberosRenewWindowFactor (security)

登录线程将休眠,直到达到从最后刷新到 ticket 的过期时间,此时它将尝试续订票据。

0.8

saslJaasConfig (security)

公开 kafka sasl.jaas.config 参数示例: org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;.

 

字符串

saslKerberosServiceName (security)

Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 配置中定义。

 

字符串

saslMechanism (security)

使用简单身份验证和安全层(SASL)机制。如需有效的值,请参阅。

GSSAPI

字符串

securityProtocol (security)

用于与代理通信的协议。支持 SASL_PLAINTEXT、PLAINTEXT 和 SSL。

PLAINTEXT

字符串

sslCipherSuites (security)

密码套件列表。这是用于使用 TLS 或 SSL 网络协议协商网络连接的安全设置的验证、加密、MAC 和密钥交换算法的命名组合。通过默认值,支持所有可用的密码套件。

 

字符串

sslContextParameters (security)

使用 Camel SSLContextParameters 对象进行 SSL 配置。如果配置,则在其他 SSL 端点参数之前应用。注意: Kafka 仅支持从文件位置加载密钥存储,因此在 KeyStoreParameters.resource 选项中使用 file: 前缀。

 

SSLContextParameters

sslEnabledProtocols (security)

为 SSL 连接启用的协议列表。TLSv1.2、TLSv1.1 和 TLSv1 默认启用。

 

字符串

sslEndpointAlgorithm (security)

使用服务器证书验证服务器主机名的端点标识算法。

https

字符串

sslKeymanagerAlgorithm (security)

密钥管理器工厂用于 SSL 连接的算法。默认值为 Java 虚拟机配置的主要管理器工厂算法。

SunX509

字符串

sslKeyPassword (security)

密钥存储文件中的私钥密码。对于客户端,这是可选的。

 

字符串

sslKeystoreLocation (security)

密钥存储文件的位置。对于客户端,这是可选的,可用于客户端双向身份验证。

 

字符串

sslKeystorePassword (security)

键存储文件的存储密码。这是可选的,并且只有在配置了 ssl.keystore.location 时才需要。

 

字符串

sslKeystoreType (security)

密钥存储文件的文件格式。对于客户端,这是可选的。默认值为 JKS。

JKS

字符串

sslProtocol (security)

用于生成 SSLContext 的 SSL 协议。默认设置为 TLS,对于大多数情况来说是正常的。最近 JVM 中允许的值是 TLS、TLSv1.1 和 TLSv1.2。在较早的 JVM 中可能会支持 SSL、SSLv2 和 SSLv3,但由于已知安全漏洞,不建议采用它们的使用。

 

字符串

sslProvider (security)

用于 SSL 连接的安全供应商名称。默认值是 JVM 的默认安全供应商。

 

字符串

sslTrustmanagerAlgorithm (security)

信任管理器工厂用于 SSL 连接的算法。默认值为 Java 虚拟机配置的信任管理器工厂算法。

PKIX

字符串

sslTruststoreLocation (security)

信任存储文件的位置。

 

字符串

sslTruststorePassword (security)

信任存储文件的密码。

 

字符串

sslTruststoreType (security)

信任存储文件的文件格式。默认值为 JKS。

JKS

字符串

有关 Producer/Consumer 配置的更多信息,请参阅:

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.