191.2. 选项


Kafka 组件支持 9 个选项,如下所列。

Name描述默认值类型

configuration (common)

允许预先配置带有端点重复使用的通用选项的 Kafka 组件。

 

KafkaConfiguration

代理 (common)

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理的子集,也可以是指向代理子集的 VIP。这个选项在 Kafka 文档中被称为 bootstrap.servers。

 

字符串

workerpool (advanced)

要在 kafka 服务器确认通过异步非阻塞处理从 KafkaProducer 发送到它的消息后,使用共享自定义 worker 池继续路由 Exchange。如果使用这个选项,则必须处理线程池的生命周期,以便在不再需要时关闭池。

 

ExecutorService

useGlobalSslContext 参数 (security)

启用使用全局 SSL 上下文参数。

false

布尔值

breakOnFirstError (consumer)

这个选项控制消费者处理交换时会发生什么,它会失败。如果选项为 false,则使用者将继续进入下一个消息并处理它。如果选项为 true,则消费者将中断,并且将看到导致失败的消息偏移,然后重新尝试处理此消息。但是,如果绑定到每次都失败,则可能会导致意外处理同一消息,例如 poison 信息。因此,建议您处理这个情况,例如使用 Camel 的错误处理程序。

false

布尔值

allowManualCommit (consumer)

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange message 标头中,它允许最终用户访问此 API 并通过 Kafka 使用者执行手动偏移提交。

false

布尔值

kafkaManualCommit Factory (consumer)

用于创建 KafkaManualCommit 实例的工厂。这允许在进行手动提交时,需要自定义 KafkaManualCommit 实例来插件自定义 KafkaManualCommit 实例。

 

KafkaManualCommit Factory

resolveProperty Placeholders (advanced)

组件是否应在启动时解析属性占位符。只有 String 类型的属性可以使用属性占位符。

true

布尔值

shutdownTimeout (common)

超时时间(以毫秒为单位)会安全等待消费者或制作者关闭并终止其 worker 线程。

30000

int

Kafka 端点使用 URI 语法进行配置:

kafka:topic

使用以下路径和查询参数:

191.2.1. 路径参数(1 参数):

Name描述默认值类型

topic

要使用的主题 必需 名称。在消费者中,您可以使用逗号分隔多个主题。生产者只能发送消息到单个主题。

 

字符串

191.2.2. 查询参数(94 参数):

Name描述默认值类型

代理 (common)

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理的子集,也可以是指向代理子集的 VIP。这个选项在 Kafka 文档中被称为 bootstrap.servers。

 

字符串

clientId (common)

客户端 ID 是在每个请求中发送的用户指定的字符串,以帮助跟踪调用。它应在逻辑上标识发出请求的应用程序。

 

字符串

headerFilterStrategy (common)

使用自定义 HeaderFilterStrategy 过滤到 Camel 消息的标头。

 

HeaderFilterStrategy

reconnectBackoffMaxMs (common)

重新连接到重复连接失败的代理时等待的最大时间(以毫秒为单位)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,直到最高值。在计算 backoff 增长后,添加了 20% 的随机 jitter,以避免连接状况。

1000

整数

allowManualCommit (consumer)

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange message 标头中,它允许最终用户访问此 API 并通过 Kafka 使用者执行手动偏移提交。

false

布尔值

autoCommitEnable (consumer)

如果为 true,请定期提交到 ZooKeeper,以偏移已由消费者获取的信息。当进程作为新消费者开始的位置失败时,将使用此提交偏移。

true

布尔值

autoCommitIntervalMs (consumer)

ms 中消费者偏移用于 zookeeper 的频率。

5000

整数

autoCommitOnStop (consumer)

当消费者停止以确保代理具有最近使用的消息提交时,是否执行显式自动提交。这要求打开 autoCommitEnable 选项。可能的值有: sync、sync 或 none。sync 是默认值。

同步

字符串

autoOffsetReset (consumer)

当 ZooKeeper 中没有初始偏移时,或者偏移没有范围: earliest : 自动将偏移重置为最早的偏移量: 自动将偏移重置为最新的偏移失败: 向消费者抛出异常

latest

字符串

breakOnFirstError (consumer)

这个选项控制消费者处理交换时会发生什么,它会失败。如果选项为 false,则使用者将继续进入下一个消息并处理它。如果选项为 true,则消费者将中断,并且将看到导致失败的消息偏移,然后重新尝试处理此消息。但是,如果绑定到每次都失败,则可能会导致意外处理同一消息,例如 poison 信息。因此,建议您处理这个情况,例如使用 Camel 的错误处理程序。

false

布尔值

bridgeErrorHandler (consumer)

允许将消费者桥接到 Camel 路由错误处理程序,这意味着当消费者试图选择传入消息或类似信息时发生异常,现在将作为消息处理并由路由 Error Handler 处理。默认情况下,使用者将使用 org.apache.camel.spi.ExceptionHandler 来处理例外情况,该处理程序将被记录在 WARN 或 ERROR 级别,并忽略。

false

布尔值

checkCrcs (consumer)

自动检查消耗的记录的 CRC32。这样可确保不会发生对消息进行 on-wire 或 on-disk 崩溃。此检查增加了一些开销,因此在寻求极佳性能的情况下可能会禁用它。

true

布尔值

consumerRequestTimeoutMs (consumer)

配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在需要时重新发送请求(如果重试用时失败)。

40000

整数

consumersCount (consumer)

连接到 kafka 服务器的消费者数量

1

int

consumerStreams (consumer)

消费者上的并发用户数

10

int

fetchMaxBytes (consumer)

如果 fetch 请求的第一个非空分区中的第一个非空分区大于这个值,则服务器应返回的最大数据量。代理接受的最大消息大小通过 message.max.bytes (broker config)或 max.message.bytes (topic config)定义。请注意,消费者并行执行多个获取。

52428800

整数

fetchMinBytes (consumer)

服务器应返回的最小数据量,用于获取请求。如果数据不足,请求将等待这么多的数据在回答请求前累积。

1

整数

fetchWaitMaxMs (consumer)

如果没有足够的数据来立即满足 fetch.min.bytes,则在回答获取请求前,服务器将阻止的最大时间量。

500

整数

GroupId (consumer)

唯一标识此消费者所属的消费者进程组的字符串。通过设置同一组 id 多个进程,表示它们都是同一消费者组的一部分。消费者需要这个选项。

 

字符串

heartbeatIntervalMs (consumer)

使用 Kafka 的组管理功能时,与消费者协调器之间预期的时间。心跳用于确保消费者保持活跃状态,并在新用户加入或离开该组时促进重新平衡。该值必须小于 session.timeout.ms,但设置的值通常不应超过该值的 1/3。它可以调整甚至较低,以控制正常重新平衡的预期时间。

3000

整数

kafkaHeaderDeserializer (consumer)

设置自定义 KafkaHeaderDeserializer,将 deserialization kafka 标头值设置为 camel 标头值。

 

KafkaHeaderDeserializer

keyDeserializer (consumer)

实现 Deserializer 接口的密钥的反序列化器类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

maxPartitionFetchBytes (consumer)

服务器将返回的每个分区的最大数据量。用于请求的最大内存总量为 #partitions max.partition.fetch.bytes。此大小必须至少与服务器允许的最大消息大小相同,或者生产者可以发送消息大于消费者是否可以获取。如果发生这种情况,消费者会卡住在某个分区上获取大型消息。

1048576

整数

maxPollIntervalMs (consumer)

使用消费者组管理时,poll ()调用之间的最大延迟。这会在获取更多记录前将上限放在消费者可以闲置的时间长度。如果在这个超时时间前没有调用 poll (),则消费者被视为失败,并且组将重新平衡,以便将分区重新分配给另一个成员。

 

Long

maxPollRecords (consumer)

在单个调用中返回的最大记录数()

500

整数

offsetRepository (consumer)

使用偏移存储库来本地存储主题的每个分区的偏移量。定义一个将禁用自动提交。

 

StateRepository

partitionAssignor (consumer)

使用组管理时,客户端将在消费者实例之间分发分区所有权的策略的类名称。

org.apache.kafka.clients.consumer.RangeAssignor

字符串

pollTimeoutMs (consumer)

轮询 KafkaConsumer 时使用的超时。

5000

Long

seekTo (consumer)

set if KafkaConsumer will read from start or end on start: beginning : read from beginning end : read from end this is replace the earlier property seekToBeginning

 

字符串

sessionTimeoutMs (consumer)

使用 Kafka 组管理工具时用于检测故障的超时。

10000

整数

shutdownTimeout (common)

超时时间(以毫秒为单位)会安全等待消费者或制作者关闭并终止其 worker 线程。

30000

int

topicIsPattern (consumer)

主题是否为模式(正则表达式)。这可用于订阅与模式匹配的主题数量。

false

布尔值

valueDeserializer (consumer)

deserializer 类,用于实现 Deserializer 接口的值。

org.apache.kafka.common.serialization.StringDeserializer

字符串

ExceptionHandler ( consumer)

要让使用者使用自定义例外处理程序:请注意,如果启用了 bridgeErrorHandler 选项,则此选项不使用。默认情况下,消费者将处理异常,其记录在 WARN 或 ERROR 级别中,并忽略。

 

ExceptionHandler

exchangePattern (consumer)

在消费者创建交换时设置交换模式。

 

ExchangePattern

bridgeEndpoint (producer)

如果选项为 true,则 KafkaProducer 将忽略入站消息的 KafkaConstants.TOPIC 标头设置。

false

布尔值

bufferMemorySize (producer)

制作者可用于缓冲记录等待发送到服务器的内存总量。如果记录比将记录发送到服务器的速度快,生产者将根据 block.on.buffer.full.full 指定的首选项阻止或抛出异常。此设置应大致对应于制作者将使用的总内存,但不是硬绑定,因为生产者使用的所有内存都不用于缓冲。一些额外的内存将用于压缩(如果启用了压缩),以及维护动态请求。

33554432

整数

circularTopicDetection (producer)

如果选项为 true,则 KafkaProducer 将检测信息是否尝试发回到它可能来自的同一主题,如果消息是从 kafka 使用者原始的。如果 KafkaConstants.TOPIC 标头与原始 kafka 消费者主题相同,则忽略标头设置,并使用制作者端点的主题。换句话说,这样可避免将同一消息发送回它的位置。如果选项 bridgeEndpoint 设为 true,则不使用这个选项。

true

布尔值

compressionCodec (producer)

这个参数允许您为此制作者生成的所有数据指定压缩 codec。有效值为 none、gzip 和 snappy。

none

字符串

connectionMaxIdleMs (producer)

在这个配置指定的毫秒数后关闭闲置连接。

540000

整数

enableIdempotence (producer)

如果设置为 'true',则生成者将确保每个消息的一个副本用流写入。如果 'false',则生成者可能会重试流中重试消息的重复。如果设置为 true,这个选项需要 max.in.flight.requests.per.connection 设置为 1,并且重试不能为零,另外将 acks 设置为 'all'。

false

布尔值

kafkaHeaderSerializer (producer)

设置自定义 KafkaHeaderDeserializer,将序列化 camel 标头值设置为 kafka 标头值。

 

KafkaHeaderSerializer

key (producer)

记录键(如果未指定密钥,则为 null)。如果配置了这个选项,则它优先于标头 KafkaConstants#KEY

 

字符串

keySerializerClass (producer)

键的 serializer 类(如果没有提供,则默认为与消息相同)。

org.apache.kafka.common.serialization.StringSerializer

字符串

lingerMs (producer)

生产者将请求传输之间到达单个批处理请求的任何记录组合在一起。通常,这只在记录到达速度快于发送时发生。然而,在某些情况下,客户端可能希望减少请求的数量,即使负载低下也是如此。此设置通过添加少量的 artificial 延迟来实现这一目的,而是不会立即发送记录,而是会等待到给定延迟,以允许发送其他记录一起批处理。这可能被视为与 TCP 中的 Nagle 算法类似。此设置会使批处理延迟有上限:一旦我们获得 batch.size,对于一个分区而言,无论此设置如何立即发送,则无论此设置如何,我们会立即发送的记录数超过这个分区所累计的字节数,我们将"闲置",等待更多记录显示。此设置默认为 0 (例如,无延迟)。例如,设置 linger.ms=5 会影响减少发送的请求数,但会对负载发送的记录最多增加 5ms 的延迟。

0

整数

maxBlockMs (producer)

配置控制发送到 kafka 的时长。这些方法可能会因为多种原因阻止。例如:缓冲区已满,元数据不可用。此配置对获取元数据、键和值序列化和值、分区和分配缓冲区内存的总时间施加最大限制,在执行 send ()时对缓冲区内存进行分区和分配。如果是 partitionsFor (),此配置会在等待元数据时产生最大时间阈值

60000

整数

maxInFlightRequest (producer)

客户端在阻止前在单个连接上发送的最大未确认请求数。请注意,如果此设置大于 1,且发送失败,则因为重试而出现消息重新排序的风险(例如,如果启用了重试)。

5

整数

maxRequestSize (producer)

请求的最大大小。这实际上也是最大记录大小的上限。请注意,服务器对记录大小有自己的上限,可能与此不同。此设置将限制制作者将在单个请求中发送的记录批处理数量,以避免发送大量请求。

1048576

整数

metadataMaxAgeMs (producer)

即使我们未看到任何分区领导力更改来主动发现任何新的代理或分区,我们才会强制刷新元数据的时间(以毫秒为单位)。

300000

整数

metricReporters (producer)

用作指标报告器的类列表。实施 MetricReporter 接口允许插入新指标创建通知的类。总是包括 JmxReporter 来注册 JMX 统计信息。

 

字符串

metricsSampleWindowMs (producer)

为计算指标维护的示例数量。

30000

整数

noOfMetricsSample (producer)

为计算指标维护的示例数量。

2

整数

partitioner (producer)

在子主题间分区消息的分区类。默认分区器基于密钥的哈希。

org.apache.kafka.clients.producer.internals.DefaultPartitioner

字符串

PartitionKey ( producer)

将记录发送到的分区(如果没有指定分区,则为 null)。如果配置了这个选项,则它优先于标头 KafkaConstants#PARTITION_KEY

 

整数

producerBatchSize (producer)

每当将多个记录发送到同一分区时,生成者将尝试将记录一起批处理到较少的请求。这有助于客户端和服务器的性能。此配置控制默认批处理大小(以字节为单位)。对于大于此 size.Requests 的批处理记录不会进行任何尝试。发送到代理的请求将包含多个批处理,每个分区一个可用数据。小批处理大小将较少的批处理,并且可能会降低吞吐量(批量大小为零将完全禁用批处理)。非常大的批处理大小可能会更严重地使用内存,因为我们将始终在附加记录下分配指定批处理大小的缓冲。

16384

整数

queueBufferingMaxMessages (producer)

在使用 async 模式时,可以在生成者必须被阻止或数据丢弃前,可以排队生成者的最大未发送消息数。

10000

整数

receiveBufferBytes (producer)

读取数据时要使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。

65536

整数

reconnectBackoffMs (producer)

尝试重新连接给定主机前需要等待的时间。这可避免在紧密循环中重复连接到主机。此 backoff 适用于消费者向代理发送的所有请求。

50

整数

RecordMetadata ( producer)

制作者是否应该存储发送到 Kafka 的 RecordMetadata 结果。结果存储在包含 RecordMetadata 元数据的列表中。该列表存储在带有键 KafkaConstantsKAFKA_RECORDMETA 的标头中

true

布尔值

requestRequiredAcks (producer)

在考虑请求完成前,生成者需要收到的确认数量。这控制发送的记录的持久性。以下设置是常见的:acks=0 如果设为零,则生成者不会等待服务器中的任何确认。记录将立即添加到套接字缓冲区中并被视为发送。在这种情况下,不能保证服务器收到记录,重试配置不会生效(因为客户端通常不知道任何故障)。每个记录返回的偏移量始终设置为 -1 acks=1,这意味着领导机会将记录写入到其本地日志,但不会等待所有后续者的完全确认。在这种情况下,领导机在确认记录后马上失败,但在后续者复制记录之前,该记录将会丢失。acks=all 意味着领导机将等待全部同步副本确认记录。这样可保证记录在至少一个同步副本仍然处于活动状态时不会丢失。这是最强的可用保证。

1

字符串

requestTimeoutMs (producer)

代理在向客户端发送错误前尝试满足 request.required.acks 要求的时间长度。

305000

整数

retries (producer)

设置大于零的值将导致客户端重新发送发送失败的记录,并显示潜在的临时错误。请注意,这个重试与在收到错误时重新记录不同的是不同的。允许重试可能会更改记录顺序,因为如果将两个记录发送到单个分区,第一次失败并重试,但会成功,但第二个记录可能会首先出现。

0

整数

retryBackoffMs (producer)

在每个重试前,生成者会刷新相关主题的元数据,以查看是否选择了新的领导。由于领导选举需要一些时间,此属性指定制作者在刷新元数据前等待的时间。

100

整数

sendBufferBytes (producer)

套接字写入缓冲区大小

131072

整数

serializerClass (producer)

用于消息的 serializer 类。

org.apache.kafka.common.serialization.StringSerializer

字符串

workerpool (producer)

要在 kafka 服务器确认使用异步非阻塞处理从 KafkaProducer 发送到它的消息后,使用自定义 worker 池来继续路由 Exchange。

 

ExecutorService

workerPoolCoreSize (producer)

kafka 服务器确认从 KafkaProducer 发送到 KafkaProducer 的消息后,用于继续路由 Exchange 的 worker 池的核心线程数量,使用异步非阻塞处理。

10

整数

workerPoolMaxSize (producer)

kafka 服务器确认从 KafkaProducer 发送到 KafkaProducer 的消息后,用于继续路由 Exchange 的 worker 池的最大线程数量。

20

整数

同步 (高级)

设置是否应严格使用同步处理,还是允许 Camel 使用异步处理(如果支持)。

false

布尔值

interceptorClasses (monitoring)

为生成者或消费者设置拦截器。生产者拦截器必须是实施 org.apache.kafka.clients.producer.ProducerInterceptor Consumer 拦截器的类,必须是实现 org.apache.kafka.clients.consumer.ConsumerInterceptor.consumer.ConsumerInterceptor 注意,它会在运行时抛出类广播异常

 

字符串

kerberosBeforeReloginMin Time (security)

刷新尝试之间的登录线程睡眠时间。

60000

整数

kerberosInitCmd (security)

Kerberos kinit 命令路径。默认为 /usr/bin/kinit

/usr/bin/kinit

字符串

kerberosPrincipalToLocal Rules (security)

从主体名称映射到短名称(通常是操作系统用户名)的规则列表。规则按顺序评估,第一个与主体名称匹配的规则用于将其映射到短名称。稍后列表中的任何规则都会被忽略。默认情况下,用户名/主机名REALM的主体名称映射到 username。有关格式的详情,请参阅安全授权和 acls。可以使用逗号分隔多个值

DEFAULT

字符串

kerberosRenewJitter (security)

添加到续订时间的随机 jitter 的百分比。

0.05

�

kerberosRenewWindowFactor (security)

登录线程将处于睡眠状态,直到达到最后一次刷新到票据到期的时间因素前处于睡眠状态,此时将尝试续订票据。

0.8

�

saslJaasConfig (security)

公开 kafka sasl.jaas.config 参数示例: org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;

 

字符串

saslKerberosServiceName (security)

Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 配置中定义。

 

字符串

saslMechanism (security)

使用简单身份验证和安全层(SASL)机制。如需有效值,请参阅 http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml

GSSAPI

字符串

securityProtocol (security)

用于与代理通信的协议。目前只支持 PLAINTEXT 和 SSL。

PLAINTEXT

字符串

sslCipherSuites (security)

密码套件列表。这是用来使用 TLS 或 SSL 网络协议协商网络连接的安全设置的身份验证、加密、MAC 和密钥交换算法的命名组合。只要默认支持所有可用的密码套件。

 

字符串

sslContextParameters (security)

使用 Camel SSLContextParameters 对象的 SSL 配置。如果配置在其他 SSL 端点参数之前应用。

 

SSLContextParameters

sslEnabledProtocols (security)

为 SSL 连接启用的协议列表。TLSv1.2、TLSv1.1 和 TLSv1 会被默认启用。

TLSv1.2,TLSv1.1,TLSv1

字符串

sslEndpointAlgorithm (security)

使用服务器证书验证服务器主机名的端点标识算法。

 

字符串

sslKeymanagerAlgorithm (security)

密钥管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的密钥管理器工厂算法。

SunX509

字符串

sslKeyPassword (security)

密钥存储文件中私钥的密码。这对客户端是可选的。

 

字符串

sslKeystoreLocation (security)

密钥存储文件的位置。这对客户端是可选的,可用于客户端进行双向身份验证。

 

字符串

sslKeystorePassword (security)

密钥存储文件的存储密码。这对客户端是可选的,只有在配置了 ssl.keystore.location 时才需要。

 

字符串

sslKeystoreType (security)

密钥存储文件的文件格式。这对客户端是可选的。默认值为 JKS

JKS

字符串

SSLProtocol ( security)

用于生成 SSLContext 的 SSL 协议。默认设置为 TLS,对于大多数情况来说是正常的。最近 JVM 中允许的值是 TLS、TLSv1.1 和 TLSv1.2。较旧的 JVM 中可能会支持 SSL、SSLv2 和 SSLv3,但由于已知的安全漏洞,不建议使用它们。

TLS

字符串

sslProvider (security)

用于 SSL 连接的安全供应商的名称。默认值是 JVM 的默认安全提供程序。

 

字符串

sslTrustmanagerAlgorithm (security)

信任管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的信任管理器工厂算法。

PKIX

字符串

sslTruststoreLocation (security)

信任存储文件的位置。

 

字符串

sslTruststorePassword (security)

信任存储文件的密码。

 

字符串

sslTruststoreType (security)

信任存储文件的文件格式。默认值为 JKS。

JKS

字符串

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.