191.2. 选项


Kafka 组件支持 9 个选项,如下所列。

名称描述默认值类型

configuration (common)

允许使用端点将重复使用的通用选项预配置 Kafka 组件。

 

KafkaConfiguration

brokers (common)

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理的子集或指向代理子集的 VIP。这个选项在 Kafka 文档中称为 bootstrap.servers。

 

字符串

workerPool (advanced)

要在 kafka 服务器确认从 KafkaProducer 发送的消息使用异步非阻塞处理,使用共享自定义 worker 池继续路由交换。如果使用这个选项,您必须处理线程池的生命周期,以便在不再需要时关闭池。

 

ExecutorService

useGlobalSslContext Parameters (security)

启用使用全局 SSL 上下文参数。

false

布尔值

breakOnFirstError (consumer)

此选项控制消费者处理交换时会发生什么,并且失败。如果选项为 false,则消费者将继续进入下一个消息并进行处理。如果选项为 true,则消费者将显示为导致失败的消息偏移,然后重新尝试处理此消息。但是,如果绑定到每次失败,这可能会导致正常处理同一消息的处理,例如 poison 消息。因此,建议您使用 Camel 的错误处理程序来应对这种情况。

false

布尔值

allowManualCommit (consumer)

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange 消息标头中,它允许最终用户访问此 API 并通过 Kafka 消费者执行手动偏移提交。

false

布尔值

kafkaManualCommit Factory (consumer)

用于创建 KafkaManualCommit 实例的工厂。这允许在手动提交来自开箱即用的默认实现时,自定义工厂创建自定义 KafkaManualCommit 实例。

 

KafkaManualCommit Factory

resolveProperty Placeholders (advanced)

启动时组件是否应解析自身上的属性占位符。只有属于 String 类型的属性才能使用属性占位符。

true

布尔值

shutdownTimeout (common)

以毫秒为单位,等待消费者或制作者正常关闭并终止其 worker 线程。

30000

int

Kafka 端点使用 URI 语法进行配置:

kafka:topic

使用以下路径和查询参数:

191.2.1. 路径参数(1 参数):

名称描述默认值类型

topic

要使用的主题 必需 名称。在消费者中,您可以使用逗号分隔多个主题。制作者只能发送消息到单个主题。

 

字符串

191.2.2. 查询参数(94 参数):

名称描述默认值类型

brokers (common)

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理的子集或指向代理子集的 VIP。这个选项在 Kafka 文档中称为 bootstrap.servers。

 

字符串

clientId (common)

客户端 ID 是每个请求中发送的用户指定字符串,以帮助追踪调用。它应该逻辑地标识发出请求的应用程序。

 

字符串

headerFilterStrategy (common)

使用自定义 HeaderFilterStrategy 过滤标头到 Camel 消息。

 

HeaderFilterStrategy

reconnectBackoffMaxMs (common)

重新连接到重复连接失败的代理时等待的最大时间(以毫秒为单位)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,直到最高值。在计算 backoff 增长后,添加了 20% 的随机 jitter,以避免连接状况。

1000

整数

allowManualCommit (consumer)

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange 消息标头中,它允许最终用户访问此 API 并通过 Kafka 消费者执行手动偏移提交。

false

布尔值

autoCommitEnable (consumer)

如果为 true,请定期提交到 ZooKeeper,以偏移已由消费者获取的信息。当进程失败作为新消费者开始的位置时,将使用此提交的偏移。

true

布尔值

autoCommitIntervalMs (consumer)

消费者偏移提交至 zookeeper 的频率(ms)。

5000

整数

autoCommitOnStop (consumer)

是否在消费者停止时执行显式自动提交,以确保代理从最后使用的消息中有提交。这要求打开了选项 autoCommitEnable。可能的值有: sync、sync 或 none。sync 是默认值。

同步

字符串

autoOffsetReset (consumer)

当 ZooKeeper 中没有初始偏移时,或偏移没有范围:earliest : automatically the offset to the earliest offset : automatically the offset to the latest offset failed: throw exception to the consumer

latest

字符串

breakOnFirstError (consumer)

此选项控制消费者处理交换时会发生什么,并且失败。如果选项为 false,则消费者将继续进入下一个消息并进行处理。如果选项为 true,则消费者将显示为导致失败的消息偏移,然后重新尝试处理此消息。但是,如果绑定到每次失败,这可能会导致正常处理同一消息的处理,例如 poison 消息。因此,建议您使用 Camel 的错误处理程序来应对这种情况。

false

布尔值

bridgeErrorHandler (consumer)

允许将消费者桥接到 Camel 路由错误处理程序,这意味着当消费者试图选择传入消息或类似信息时发生异常,现在将作为消息处理并由路由 Error Handler 处理。默认情况下,使用者将使用 org.apache.camel.spi.ExceptionHandler 来处理例外情况,该处理程序将被记录在 WARN 或 ERROR 级别,并忽略。

false

布尔值

checkCrcs (consumer)

自动检查消耗的记录的 CRC32。这样可确保不会发生对消息进行 on-wire 或 on-disk 崩溃。此检查增加了一些开销,因此在寻求极佳性能的情况下可能会禁用它。

true

布尔值

consumerRequestTimeoutMs (consumer)

配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在需要时重新发送请求(如果重试用时失败)。

40000

整数

consumersCount (consumer)

连接到 kafka 服务器的使用者数量

1

int

consumerStreams (consumer)

消费者上的并发用户数

10

int

fetchMaxBytes (consumer)

如果获取的第一个非空分区大于这个值,则服务器应返回的最大数据量为 fetch 请求。这不是绝对的,如果 fetch 的第一个非空分区中的信息大于这个值,则消息仍会返回,以确保消费者能够进行进度。代理接受的最大消息大小通过 message.max.bytes (broker config)或 max.message.bytes (topic config)定义。请注意,消费者并行执行多个获取。

52428800

整数

fetchMinBytes (consumer)

服务器应返回的最小数据量,用于获取请求。如果数据不足,请求将等待这么多的数据在回答请求前累积。

1

整数

fetchWaitMaxMs (consumer)

如果没有足够的数据立即满足 fetch.min.bytes,则服务器在回答获取请求前将阻断的最大时间

500

整数

groupId (consumer)

唯一标识此消费者所属的消费者进程组的字符串。通过设置同一组 id 多个进程,表示它们都是同一消费者组的一部分。消费者需要这个选项。

 

字符串

heartbeatIntervalMs (consumer)

使用 Kafka 的组管理功能时,与消费者协调器之间预期的时间。心跳用于确保消费者保持活跃状态,并在新用户加入或离开该组时促进重新平衡。该值必须小于 session.timeout.ms,但通常不应设置高于该值的 1/3。它可以调整甚至较低,以控制正常重新平衡的预期时间。

3000

整数

kafkaHeaderDeserializer (consumer)

为 deserialization kafka 标头值设置自定义 KafkaHeaderDeserializer 来 camel 标头值。

 

KafkaHeaderDeserializer

keyDeserializer (consumer)

deserializer 类用于实现 Deserializer 接口的密钥。

org.apache.kafka.common.serialization.StringDeserializer

字符串

maxPartitionFetchBytes (consumer)

服务器将返回的每个分区的最大数据量。用于请求的最大内存总量为 #partitions max.partition.fetch.bytes。此大小必须至少与服务器允许的最大消息大小一样大,否则制作者可能会发送大于消费者可以获取的消息。如果发生这种情况,消费者可能会卡住在某个分区中获取大量消息。

1048576

整数

maxPollIntervalMs (consumer)

使用消费者组管理时,poll ()调用之间的最大延迟。这会在获取更多记录前将上限放在消费者可以闲置的时间长度。如果在这个超时时间前没有调用 poll (),则消费者被视为失败,并且组将重新平衡,以便将分区重新分配给另一个成员。

 

Long

maxPollRecords (consumer)

单个调用返回到 poll ()中返回的最大记录数。

500

整数

offsetRepository (consumer)

用来本地存储主题每个分区的偏移量的偏移存储库。定义将禁用 autocommit。

 

StateRepository

partitionAssignor (consumer)

使用组管理时,客户端将使用的类名称在消费者实例之间分发分区所有权

org.apache.kafka.clients.consumer.RangeAssignor

字符串

pollTimeoutMs (consumer)

轮询 KafkaConsumer 时使用的超时。

5000

Long

seekTo (consumer)

设置 KafkaConsumer 是否在启动时从开始或结束: beginning : read from beginning : read from end This is replace the earlier property seekToBeginning

 

字符串

sessionTimeoutMs (consumer)

使用 Kafka 的组管理功能时检测失败的超时。

10000

整数

shutdownTimeout (common)

以毫秒为单位,等待消费者或制作者正常关闭并终止其 worker 线程。

30000

int

topicIsPattern (consumer)

主题是否为模式(正则表达式)。这可用于订阅与模式匹配的动态主题数量。

false

布尔值

valueDeserializer (consumer)

deserializer 类用于实现 Deserializer 接口的值。

org.apache.kafka.common.serialization.StringDeserializer

字符串

exceptionHandler (consumer)

要让使用者使用自定义例外处理程序:请注意,如果启用了 bridgeErrorHandler 选项,则此选项不使用。默认情况下,消费者将处理异常,其记录在 WARN 或 ERROR 级别中,并忽略。

 

ExceptionHandler

exchangePattern (consumer)

在消费者创建交换时设置交换模式。

 

ExchangePattern

bridgeEndpoint (producer)

如果选项为 true,则 KafkaProducer 将忽略入站消息的 KafkaConstants.TOPIC 标头设置。

false

布尔值

bufferMemorySize (producer)

制作者可用于缓冲记录等待发送到服务器的内存总量。如果发送记录的速度比生成者可以更快地发送到服务器,则根据 block.on.buffer.full 指定的首选项阻止或抛出异常。此设置应该与生成者使用的总内存对应,但不是硬绑定,因为生成者都用于缓冲区。一些额外的内存将用于压缩(如果启用了压缩),以及维护动态请求。

33554432

整数

circularTopicDetection (producer)

如果选项为 true,则 KafkaProducer 会检测消息是否试图发送回同一主题,如果消息是从 kafka consumer 原始的。如果 KafkaConstants.TOPIC 标头与原始 kafka 消费者主题相同,则忽略标头设置,并使用 producer 端点的主题。换句话说,这避免将相同的消息发送回来自哪里。如果选项 bridgeEndpoint 设为 true,则此选项不会被使用。

true

布尔值

compressionCodec (producer)

此参数允许您为此制作者生成的所有数据指定压缩代码c。有效值为 none、gzip 和 snappy。

none

字符串

connectionMaxIdleMs (producer)

在这个配置指定的毫秒数后关闭闲置连接。

540000

整数

enableIdempotence (producer)

如果设置为 'true',则制作者将确保每个消息的确切副本写入流。如果 'false',则制作者重试可能会在流中写入重试消息的副本。如果设置为 true,则此选项将需要 max.in.flight.requests.per.connection 设置为 1,重试不能为零,另外 acks 必须设置为 'all'。

false

布尔值

kafkaHeaderSerializer (producer)

为 serialization camel 标头值设置为 kafka 标头值设置自定义 KafkaHeaderDeserializer。

 

KafkaHeaderSerializer

key (producer)

记录密钥(如果没有指定密钥,则为 null)。如果配置了这个选项,则它优先于标头 KafkaConstants#KEY

 

字符串

keySerializerClass (producer)

密钥的序列化器类(如果没有提供,默认为与消息相同)。

org.apache.kafka.common.serialization.StringSerializer

字符串

lingerMs (producer)

生产者将请求传输之间到达单个批处理请求的任何记录组合在一起。通常,这只在记录到达速度快于发送时发生。然而,在某些情况下,客户端可能希望减少请求的数量,即使负载低下也是如此。此设置通过添加少量人工延迟来实现此目的,即不立即发送记录,而不是立即发送记录,让生产者最多等待给定延迟,以允许发送其他记录,以便可以将发送批处理在一起。这可能被视为与 TCP 中的 Nagle 算法类似。此设置提供批处理延迟的上限:一旦我们获得批处理。无论此设置如何,无论此设置如何,它都会立即发送一次记录,但是如果我们为此分区累积了这个字节,我们将"linger"用于等待更多记录显示。此设置默认为 0 (例如,无延迟)。例如,设置 linger.ms=5 效果会减少发送的请求数量,但会向负载中发送的记录添加最多 5ms 的延迟。

0

整数

maxBlockMs (producer)

配置控制发送到 kafka 将阻止的时长。出于多种原因,这些方法可以被阻止。例如:缓冲区满,元数据不可用。此配置对获取元数据、密钥和值、执行 send ()时缓冲内存所花费的总时间实施最大限制。如果是 partitionsFor (),此配置会在等待元数据时实施最长时间阈值

60000

整数

maxInFlightRequest (producer)

客户端在阻止前在单个连接上发送的最大未确认请求数。请注意,如果将此设置设置为大于 1,且发送失败,则可能会因为重试而重新排序消息的风险(例如,如果启用了重试)。

5

整数

maxRequestSize (producer)

请求的最大大小。这也实际上是最大记录大小的上限。请注意,服务器在记录大小上具有自己的上限,可能与此不同。此设置将限制制作者将在单个请求中发送的记录批处理数量,以避免发送大量请求。

1048576

整数

metadataMaxAgeMs (producer)

即使我们未看到任何分区领导力更改来主动发现任何新的代理或分区,我们才会强制刷新元数据的时间(以毫秒为单位)。

300000

整数

metricReporters (producer)

用作指标报告器的类列表。通过实施 MetricReporter 接口,可以插入将收到新指标创建通知的类。总是包括 JmxReporter 来注册 JMX 统计信息。

 

字符串

metricsSampleWindowMs (producer)

为计算指标维护的示例数量。

30000

整数

noOfMetricsSample (producer)

为计算指标维护的示例数量。

2

整数

partitioner (producer)

用于分区子主题中分区消息的分区类。默认分区器基于密钥的哈希。

org.apache.kafka.clients.producer.internals.DefaultPartitioner

字符串

partitionKey (producer)

将发送记录的分区(如果未指定分区,则为 null)。如果配置了这个选项,则它优先于标头 KafkaConstants#PARTITION_KEY

 

整数

producerBatchSize (producer)

每当将多个记录发送到同一分区时,生成者将尝试将记录一起批处理到较少的请求。这有助于客户端和服务器的性能。此配置控制默认批处理大小(以字节为单位)。不会尝试批处理记录大于这个大小。发送到代理的Requests 将包含多个批处理,每个分区都可用于发送数据。小批处理大小会减少批处理,并可能会减少吞吐量(零的批处理大小将完全禁用批处理)。非常大的批处理大小可能会更严重地使用内存,因为我们将始终在附加记录下分配指定批处理大小的缓冲。

16384

整数

queueBufferingMaxMessages (producer)

在使用 async 模式时可以放入生产者的最大未发送消息数,然后才能阻止生产者或必须丢弃数据。

10000

整数

receiveBufferBytes (producer)

读取数据时要使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。

65536

整数

reconnectBackoffMs (producer)

尝试重新连接给定主机前等待的时间。这可避免在紧密循环中重复连接到主机。此 backoff 应用到消费者发送到代理的所有请求。

50

整数

recordMetadata (producer)

producer 是否应该存储来自发送到 Kafka 的 RecordMetadata 结果。结果存储在包含 RecordMetadata 元数据的列表中。该列表存储在一个带有键 KafkaConstants#KAFKA_RECORDMETA 的标头中

true

布尔值

requestRequiredAcks (producer)

在考虑请求完成前,生成者需要收到的确认数量。这控制发送的记录的持久性。以下设置很常见:acks=0如果设为零,则生成者不会等待来自服务器的任何确认。记录将立即添加到套接字缓冲区中并被视为发送。无法保证服务器已收到记录,重试配置不会生效(因为客户端通常不知道任何失败)。对每个记录给出的偏移始终设置为 -1 acks=1,这意味着领导机将记录写入其本地日志,但不会等待所有后续者完全确认。在这种情况下,领导机会在确认记录后立即失败,但在后续者复制之前,记录将会丢失。acks=all 意味着领导机将等待整个同步副本集合来确认记录。这样可保证记录在至少一个同步副本仍然处于活动状态时不会丢失。这是最强的可用保证。

1

字符串

requestTimeoutMs (producer)

在将一个错误发送到客户端前,代理会等待尝试满足 request.required.acks 要求的时间。

305000

整数

retries (producer)

设置大于零的值将导致客户端重新发送发送失败的记录,并显示潜在的临时错误。请注意,这个重试与在收到错误时重新记录不同的是不同的。允许重试可能会更改记录顺序,因为如果两个记录发送到单个分区,则第一次失败并且被重试,但第二个记录会先出现,然后是第二个记录。

0

整数

retryBackoffMs (producer)

每次重试前,生成者会刷新相关主题的元数据,以查看是否选择了新的领导。由于领导选举需要一些时间,因此此属性指定生成者在刷新元数据前等待的时间。

100

整数

sendBufferBytes (producer)

套接字写入缓冲区大小

131072

整数

serializerClass (producer)

serializer 类用于消息。

org.apache.kafka.common.serialization.StringSerializer

字符串

workerPool (producer)

要在 kafka 服务器确认从 KafkaProducer 发送的消息使用异步非阻塞处理,使用自定义 worker 池继续路由交换。

 

ExecutorService

workerPoolCoreSize (producer)

kafka 服务器在 kafka 服务器确认从 KafkaProducer 发送的消息(使用异步非阻塞处理)来继续路由交换的 worker 池的核心线程数量。

10

整数

workerPoolMaxSize (producer)

kafka 服务器之后,worker 池的最大线程数量用于继续路由交换,使用异步非块处理确认从 KafkaProducer 发送的消息。

20

整数

同步 (advanced)

设置是否应严格使用同步处理,或者 Camel 允许使用异步处理(如果受支持)。

false

布尔值

interceptorClasses (monitoring)

为制作者或消费者设置拦截器。制作者拦截器必须是实施 org.apache.kafka.clients.producer.ProducerInterceptor 拦截器的类,必须是实施 org.apache.kafka.clients.consumer.ConsumerInterceptor 的类。如果您对消费者使用 Producer 拦截器,它将在运行时抛出类 cast 异常

 

字符串

kerberosBeforeReloginMin Time (security)

刷新尝试之间的登录线程睡眠时间。

60000

整数

kerberosInitCmd (security)

Kerberos kinit 命令路径。默认为 /usr/bin/kinit

/usr/bin/kinit

字符串

kerberosPrincipalToLocal Rules (security)

从主体名称映射到短名称(通常是操作系统用户名)的规则列表。规则按顺序评估,第一个与主体名称匹配的规则用于将其映射到短名称。稍后列表中的任何规则都会被忽略。默认情况下,用户名/主机名REALM 的主体名称映射到 username。有关格式的详情,请查看安全授权和 acl。可以使用逗号分隔多个值

DEFAULT

字符串

kerberosRenewJitter (security)

添加到续订时间的随机 jitter 的百分比。

0.05

�

kerberosRenewWindowFactor (security)

登录线程将处于睡眠状态,直到达到最后一次刷新到票据到期的时间因素前处于睡眠状态,此时将尝试续订票据。

0.8

�

saslJaasConfig (security)

公开 kafka sasl.jaas.config 参数示例: org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;

 

字符串

saslKerberosServiceName (security)

Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 配置中定义。

 

字符串

saslMechanism (security)

使用简单身份验证和安全层(SASL)机制。有关有效值,请参阅 http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml

GSSAPI

字符串

securityProtocol (security)

用于与代理通信的协议。目前只支持 PLAINTEXT 和 SSL。

PLAINTEXT

字符串

sslCipherSuites (security)

密码套件列表。这是用来使用 TLS 或 SSL 网络协议协商网络连接的安全设置的身份验证、加密、MAC 和密钥交换算法的命名组合。支持所有可用的密码套件。

 

字符串

sslContextParameters (security)

使用 Camel SSLContextParameters 对象的 SSL 配置。如果配置它,它会在其他 SSL 端点参数之前应用。

 

SSLContextParameters

sslEnabledProtocols (security)

为 SSL 连接启用的协议列表。TLSv1.2、TLSv1.1 和 TLSv1 默认启用。

TLSv1.2,TLSv1.1,TLSv1

字符串

sslEndpointAlgorithm (security)

使用服务器证书验证服务器主机名的端点标识算法。

 

字符串

sslKeymanagerAlgorithm (security)

密钥管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的密钥管理器工厂算法。

SunX509

字符串

sslKeyPassword (security)

密钥存储文件中的私钥密码。这对客户端是可选的。

 

字符串

sslKeystoreLocation (security)

密钥存储文件的位置。这对客户端是可选的,可用于客户端进行双向身份验证。

 

字符串

sslKeystorePassword (security)

密钥存储文件的存储密码。对于客户端,这是可选的,只有在配置了 ssl.keystore.location 时才需要。

 

字符串

sslKeystoreType (security)

密钥存储文件的文件格式。这对客户端是可选的。默认值为 JKS

JKS

字符串

sslProtocol (security)

用于生成 SSLContext 的 SSL 协议。默认设置为 TLS,这适用于大多数情况。最近的 JVM 中允许的值为 TLS、TLSv1.1 和 TLSv1.2。较旧的 JVM 中可能支持 SSL、SSLv2 和 SSLv3,但由于已知的安全漏洞,不建议使用它们。

TLS

字符串

sslProvider (security)

用于 SSL 连接的安全供应商的名称。默认值是 JVM 的默认安全提供程序。

 

字符串

sslTrustmanagerAlgorithm (security)

信任管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的信任管理器工厂算法。

PKIX

字符串

sslTruststoreLocation (security)

信任存储文件的位置。

 

字符串

sslTruststorePassword (security)

信任存储文件的密码。

 

字符串

sslTruststoreType (security)

信任存储文件的文件格式。默认值为 JKS。

JKS

字符串

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.