191.3. Spring Boot Auto-Configuration


组件支持 99 个选项,如下所列。

Expand
Name描述默认值类型

camel.component.kafka.allow-manual-commit

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange 消息标头中,它允许最终用户访问此 API 并通过 Kafka 消费者执行手动偏移提交。

false

布尔值

camel.component.kafka.break-on-first-error

此选项控制消费者处理交换时会发生什么,并且失败。如果选项为 false,则消费者将继续进入下一个消息并进行处理。如果选项为 true,则消费者将显示为导致失败的消息偏移,然后重新尝试处理此消息。但是,如果绑定到每次失败,这可能会导致正常处理同一消息的处理,例如 poison 消息。因此,建议您使用 Camel 的错误处理程序来应对这种情况。

false

布尔值

camel.component.kafka.brokers

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理的子集或指向代理子集的 VIP。这个选项在 Kafka 文档中称为 bootstrap.servers。

 

字符串

camel.component.kafka.configuration.allow-manual-commit

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange 消息标头中,它允许最终用户访问此 API 并通过 Kafka 消费者执行手动偏移提交。

false

布尔值

camel.component.kafka.configuration.auto-commit-enable

如果为 true,请定期提交到 ZooKeeper,以偏移已由消费者获取的信息。当进程失败作为新消费者开始的位置时,将使用此提交的偏移。

true

布尔值

camel.component.kafka.configuration.auto-commit-interval-ms

消费者偏移提交至 zookeeper 的频率(ms)。

5000

整数

camel.component.kafka.configuration.auto-commit-on-stop

是否在消费者停止时执行显式自动提交,以确保代理从最后使用的消息中有提交。这要求打开了选项 autoCommitEnable。可能的值有: sync、sync 或 none。sync 是默认值。

同步

字符串

camel.component.kafka.configuration.auto-offset-reset

当 ZooKeeper 中没有初始偏移时,或偏移没有范围:earliest : automatically the offset to the earliest offset : automatically the offset to the latest offset failed: throw exception to the consumer

latest

字符串

camel.component.kafka.configuration.break-on-first-error

此选项控制消费者处理交换时会发生什么,并且失败。如果选项为 false,则消费者将继续进入下一个消息并进行处理。如果选项为 true,则消费者将显示为导致失败的消息偏移,然后重新尝试处理此消息。但是,如果绑定到每次失败,这可能会导致正常处理同一消息的处理,例如 poison 消息。因此,建议您使用 Camel 的错误处理程序来应对这种情况。

false

布尔值

camel.component.kafka.configuration.bridge-endpoint

如果选项为 true,则 KafkaProducer 将忽略入站消息的 KafkaConstants.TOPIC 标头设置。

false

布尔值

camel.component.kafka.configuration.brokers

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理的子集或指向代理子集的 VIP。这个选项在 Kafka 文档中称为 bootstrap.servers。

 

字符串

camel.component.kafka.configuration.buffer-memory-size

制作者可用于缓冲记录等待发送到服务器的内存总量。如果发送记录的速度比生成者可以更快地发送到服务器,则根据 block.on.buffer.full 指定的首选项阻止或抛出异常。此设置应该与生成者使用的总内存对应,但不是硬绑定,因为生成者都用于缓冲区。一些额外的内存将用于压缩(如果启用了压缩),以及维护动态请求。

33554432

整数

camel.component.kafka.configuration.check-crcs

自动检查消耗的记录的 CRC32。这样可确保不会发生对消息进行 on-wire 或 on-disk 崩溃。此检查增加了一些开销,因此在寻求极佳性能的情况下可能会禁用它。

true

布尔值

camel.component.kafka.configuration.circular-topic-detection

如果选项为 true,则 KafkaProducer 会检测消息是否试图发送回同一主题,如果消息是从 kafka consumer 原始的。如果 KafkaConstants.TOPIC 标头与原始 kafka 消费者主题相同,则忽略标头设置,并使用 producer 端点的主题。换句话说,这避免将相同的消息发送回来自哪里。如果选项 bridgeEndpoint 设为 true,则此选项不会被使用。

true

布尔值

camel.component.kafka.configuration.client-id

客户端 ID 是每个请求中发送的用户指定字符串,以帮助追踪调用。它应该逻辑地标识发出请求的应用程序。

 

字符串

camel.component.kafka.configuration.compression-codec

此参数允许您为此制作者生成的所有数据指定压缩代码c。有效值为 none、gzip 和 snappy。

none

字符串

camel.component.kafka.configuration.connection-max-idle-ms

在这个配置指定的毫秒数后关闭闲置连接。

540000

整数

camel.component.kafka.configuration.consumer-request-timeout-ms

配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在需要时重新发送请求(如果重试用时失败)。

40000

整数

camel.component.kafka.configuration.consumer-streams

消费者上的并发用户数

10

整数

camel.component.kafka.configuration.consumers-count

连接到 kafka 服务器的使用者数量

1

整数

camel.component.kafka.configuration.enable-idempotence

如果设置为 'true',则制作者将确保每个消息的确切副本写入流。如果 'false',则制作者重试可能会在流中写入重试消息的副本。如果设置为 true,则此选项将需要 max.in.flight.requests.per.connection 设置为 1,重试不能为零,另外 acks 必须设置为 'all'。

false

布尔值

camel.component.kafka.configuration.fetch-max-bytes

如果获取的第一个非空分区大于这个值,则服务器应返回的最大数据量为 fetch 请求。这不是绝对的,如果 fetch 的第一个非空分区中的信息大于这个值,则消息仍会返回,以确保消费者能够进行进度。代理接受的最大消息大小通过 message.max.bytes (broker config)或 max.message.bytes (topic config)定义。请注意,消费者并行执行多个获取。

52428800

整数

camel.component.kafka.configuration.fetch-min-bytes

服务器应返回的最小数据量,用于获取请求。如果数据不足,请求将等待这么多的数据在回答请求前累积。

1

整数

camel.component.kafka.configuration.fetch-wait-max-ms

如果没有足够的数据立即满足 fetch.min.bytes,则服务器在回答获取请求前将阻断的最大时间

500

整数

camel.component.kafka.configuration.group-id

唯一标识此消费者所属的消费者进程组的字符串。通过设置同一组 id 多个进程,表示它们都是同一消费者组的一部分。消费者需要这个选项。

 

字符串

camel.component.kafka.configuration.header-filter-strategy

使用自定义 HeaderFilterStrategy 过滤标头到 Camel 消息。

 

HeaderFilterStrategy

camel.component.kafka.configuration.heartbeat-interval-ms

使用 Kafka 的组管理功能时,与消费者协调器之间预期的时间。心跳用于确保消费者保持活跃状态,并在新用户加入或离开该组时促进重新平衡。该值必须小于 session.timeout.ms,但通常不应设置高于该值的 1/3。它可以调整甚至较低,以控制正常重新平衡的预期时间。

3000

整数

camel.component.kafka.configuration.interceptor-classes

为制作者或消费者设置拦截器。制作者拦截器必须是实施 org.apache.kafka.clients.producer.ProducerInterceptor 拦截器的类,必须是实施 org.apache.kafka.clients.consumer.ConsumerInterceptor 的类。如果您对消费者使用 Producer 拦截器,它将在运行时抛出类 cast 异常

 

字符串

camel.component.kafka.configuration.kafka-header-deserializer

为 deserialization kafka 标头值设置自定义 KafkaHeaderDeserializer 来 camel 标头值。

 

KafkaHeaderDeserializer

camel.component.kafka.configuration.kafka-header-serializer

为 serialization camel 标头值设置为 kafka 标头值设置自定义 KafkaHeaderDeserializer。

 

KafkaHeaderSerializer

camel.component.kafka.configuration.kerberos-before-relogin-min-time

刷新尝试之间的登录线程睡眠时间。

60000

整数

camel.component.kafka.configuration.kerberos-init-cmd

Kerberos kinit 命令路径。默认为 /usr/bin/kinit

/usr/bin/kinit

字符串

camel.component.kafka.configuration.kerberos-principal-to-local-rules

从主体名称映射到短名称(通常是操作系统用户名)的规则列表。规则按顺序评估,第一个与主体名称匹配的规则用于将其映射到短名称。稍后列表中的任何规则都会被忽略。默认情况下,用户名/主机名REALM 的主体名称映射到 username。有关格式的详情,请查看安全授权和 acl。可以使用逗号分隔多个值

DEFAULT

字符串

camel.component.kafka.configuration.kerberos-renew-jitter

添加到续订时间的随机 jitter 的百分比。

 

camel.component.kafka.configuration.kerberos-renew-window-factor

登录线程将处于睡眠状态,直到达到最后一次刷新到票据到期的时间因素前处于睡眠状态,此时将尝试续订票据。

 

camel.component.kafka.configuration.key

记录密钥(如果没有指定密钥,则为 null)。如果配置了这个选项,则它优先于标头 KafkaConstants#KEY

 

字符串

camel.component.kafka.configuration.key-deserializer

deserializer 类用于实现 Deserializer 接口的密钥。

org.apache.kafka.common.serialization.StringDeserializer

字符串

camel.component.kafka.configuration.key-serializer-class

密钥的序列化器类(如果没有提供,默认为与消息相同)。

org.apache.kafka.common.serialization.StringSerializer

字符串

camel.component.kafka.configuration.linger-ms

生产者将请求传输之间到达单个批处理请求的任何记录组合在一起。通常,这只在记录到达速度快于发送时发生。然而,在某些情况下,客户端可能希望减少请求的数量,即使负载低下也是如此。此设置通过添加少量人工延迟来实现此目的,即不立即发送记录,而不是立即发送记录,让生产者最多等待给定延迟,以允许发送其他记录,以便可以将发送批处理在一起。这可能被视为与 TCP 中的 Nagle 算法类似。此设置提供批处理延迟的上限:一旦我们获得批处理。无论此设置如何,无论此设置如何,它都会立即发送一次记录,但是如果我们为此分区累积了这个字节,我们将"linger"用于等待更多记录显示。此设置默认为 0 (例如,无延迟)。例如,设置 linger.ms=5 效果会减少发送的请求数量,但会向负载中发送的记录添加最多 5ms 的延迟。

0

整数

camel.component.kafka.configuration.max-block-ms

配置控制发送到 kafka 将阻止的时长。出于多种原因,这些方法可以被阻止。例如:缓冲区满,元数据不可用。此配置对获取元数据、密钥和值、执行 send ()时缓冲内存所花费的总时间实施最大限制。如果是 partitionsFor (),此配置会在等待元数据时实施最长时间阈值

60000

整数

camel.component.kafka.configuration.max-in-flight-request

客户端在阻止前在单个连接上发送的最大未确认请求数。请注意,如果将此设置设置为大于 1,且发送失败,则可能会因为重试而重新排序消息的风险(例如,如果启用了重试)。

5

整数

camel.component.kafka.configuration.max-partition-fetch-bytes

服务器将返回的每个分区的最大数据量。用于请求的最大内存总量为 #partitions max.partition.fetch.bytes。此大小必须至少与服务器允许的最大消息大小一样大,否则制作者可能会发送大于消费者可以获取的消息。如果发生这种情况,消费者可能会卡住在某个分区中获取大量消息。

1048576

整数

camel.component.kafka.configuration.max-poll-interval-ms

使用消费者组管理时,poll ()调用之间的最大延迟。这会在获取更多记录前将上限放在消费者可以闲置的时间长度。如果在这个超时时间前没有调用 poll (),则消费者被视为失败,并且组将重新平衡,以便将分区重新分配给另一个成员。

 

Long

camel.component.kafka.configuration.max-poll-records

单个调用返回到 poll ()中返回的最大记录数。

500

整数

camel.component.kafka.configuration.max-request-size

请求的最大大小。这也实际上是最大记录大小的上限。请注意,服务器在记录大小上具有自己的上限,可能与此不同。此设置将限制制作者将在单个请求中发送的记录批处理数量,以避免发送大量请求。

1048576

整数

camel.component.kafka.configuration.metadata-max-age-ms

即使我们未看到任何分区领导力更改来主动发现任何新的代理或分区,我们才会强制刷新元数据的时间(以毫秒为单位)。

300000

整数

camel.component.kafka.configuration.metric-reporters

用作指标报告器的类列表。通过实施 MetricReporter 接口,可以插入将收到新指标创建通知的类。总是包括 JmxReporter 来注册 JMX 统计信息。

 

字符串

camel.component.kafka.configuration.metrics-sample-window-ms

为计算指标维护的示例数量。

30000

整数

camel.component.kafka.configuration.no-of-metrics-sample

为计算指标维护的示例数量。

2

整数

camel.component.kafka.configuration.offset-repository

用来本地存储主题每个分区的偏移量的偏移存储库。定义将禁用 autocommit。

 

StateRepository

camel.component.kafka.configuration.partition-assignor

使用组管理时,客户端将使用的类名称在消费者实例之间分发分区所有权

org.apache.kafka.clients.consumer.RangeAssignor

字符串

camel.component.kafka.configuration.partition-key

将发送记录的分区(如果未指定分区,则为 null)。如果配置了这个选项,则它优先于标头 KafkaConstants#PARTITION_KEY

 

整数

camel.component.kafka.configuration.partitioner

用于分区子主题中分区消息的分区类。默认分区器基于密钥的哈希。

org.apache.kafka.clients.producer.internals.DefaultPartitioner

字符串

camel.component.kafka.configuration.poll-timeout-ms

轮询 KafkaConsumer 时使用的超时。

5000

Long

camel.component.kafka.configuration.producer-batch-size

每当将多个记录发送到同一分区时,生成者将尝试将记录一起批处理到较少的请求。这有助于客户端和服务器的性能。此配置控制默认批处理大小(以字节为单位)。不会尝试批处理记录大于这个大小。发送到代理的Requests 将包含多个批处理,每个分区都可用于发送数据。小批处理大小会减少批处理,并可能会减少吞吐量(零的批处理大小将完全禁用批处理)。非常大的批处理大小可能会更严重地使用内存,因为我们将始终在附加记录下分配指定批处理大小的缓冲。

16384

整数

camel.component.kafka.configuration.queue-buffering-max-messages

在使用 async 模式时可以放入生产者的最大未发送消息数,然后才能阻止生产者或必须丢弃数据。

10000

整数

camel.component.kafka.configuration.receive-buffer-bytes

读取数据时要使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。

65536

整数

camel.component.kafka.configuration.reconnect-backoff-max-ms

重新连接到重复连接失败的代理时等待的最大时间(以毫秒为单位)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,直到最高值。在计算 backoff 增长后,添加了 20% 的随机 jitter,以避免连接状况。

1000

整数

camel.component.kafka.configuration.reconnect-backoff-ms

尝试重新连接给定主机前等待的时间。这可避免在紧密循环中重复连接到主机。此 backoff 应用到消费者发送到代理的所有请求。

50

整数

camel.component.kafka.configuration.record-metadata

producer 是否应该存储来自发送到 Kafka 的 RecordMetadata 结果。结果存储在包含 RecordMetadata 元数据的列表中。该列表存储在一个带有键 KafkaConstants#KAFKA_RECORDMETA 的标头中

true

布尔值

camel.component.kafka.configuration.request-required-acks

在考虑请求完成前,生成者需要收到的确认数量。这控制发送的记录的持久性。以下设置很常见:acks=0如果设为零,则生成者不会等待来自服务器的任何确认。记录将立即添加到套接字缓冲区中并被视为发送。无法保证服务器已收到记录,重试配置不会生效(因为客户端通常不知道任何失败)。对每个记录给出的偏移始终设置为 -1 acks=1,这意味着领导机将记录写入其本地日志,但不会等待所有后续者完全确认。在这种情况下,领导机会在确认记录后立即失败,但在后续者复制之前,记录将会丢失。acks=all 意味着领导机将等待整个同步副本集合来确认记录。这样可保证记录在至少一个同步副本仍然处于活动状态时不会丢失。这是最强的可用保证。

1

字符串

camel.component.kafka.configuration.request-timeout-ms

在将一个错误发送到客户端前,代理会等待尝试满足 request.required.acks 要求的时间。

305000

整数

camel.component.kafka.configuration.retries

设置大于零的值将导致客户端重新发送发送失败的记录,并显示潜在的临时错误。请注意,这个重试与在收到错误时重新记录不同的是不同的。允许重试可能会更改记录顺序,因为如果两个记录发送到单个分区,则第一次失败并且被重试,但第二个记录会先出现,然后是第二个记录。

0

整数

camel.component.kafka.configuration.retry-backoff-ms

每次重试前,生成者会刷新相关主题的元数据,以查看是否选择了新的领导。由于领导选举需要一些时间,因此此属性指定生成者在刷新元数据前等待的时间。

100

整数

camel.component.kafka.configuration.sasl-jaas-config

公开 kafka sasl.jaas.config 参数示例: org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;

 

字符串

camel.component.kafka.configuration.sasl-kerberos-service-name

Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 配置中定义。

 

字符串

camel.component.kafka.configuration.sasl-mechanism

使用简单身份验证和安全层(SASL)机制。有关有效值,请参阅 http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml

GSSAPI

字符串

camel.component.kafka.configuration.security-protocol

用于与代理通信的协议。目前只支持 PLAINTEXT 和 SSL。

PLAINTEXT

字符串

camel.component.kafka.configuration.seek-to

设置 KafkaConsumer 是否在启动时从开始或结束: beginning : read from beginning : read from end This is replace the earlier property seekToBeginning

 

字符串

camel.component.kafka.configuration.send-buffer-bytes

套接字写入缓冲区大小

131072

整数

camel.component.kafka.configuration.serializer-class

serializer 类用于消息。

org.apache.kafka.common.serialization.StringSerializer

字符串

camel.component.kafka.configuration.session-timeout-ms

使用 Kafka 的组管理功能时检测失败的超时。

10000

整数

camel.component.kafka.configuration.ssl-cipher-suites

密码套件列表。这是用来使用 TLS 或 SSL 网络协议协商网络连接的安全设置的身份验证、加密、MAC 和密钥交换算法的命名组合。支持所有可用的密码套件。

 

字符串

camel.component.kafka.configuration.ssl-context-parameters

使用 Camel SSLContextParameters 对象的 SSL 配置。如果配置它,它会在其他 SSL 端点参数之前应用。

 

SSLContextParameters

camel.component.kafka.configuration.ssl-enabled-protocols

为 SSL 连接启用的协议列表。TLSv1.2、TLSv1.1 和 TLSv1 默认启用。

TLSv1.2,TLSv1.1,TLSv1

字符串

camel.component.kafka.configuration.ssl-endpoint-algorithm

使用服务器证书验证服务器主机名的端点标识算法。

 

字符串

camel.component.kafka.configuration.ssl-key-password

密钥存储文件中的私钥密码。这对客户端是可选的。

 

字符串

camel.component.kafka.configuration.ssl-keymanager-algorithm

密钥管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的密钥管理器工厂算法。

SunX509

字符串

camel.component.kafka.configuration.ssl-keystore-location

密钥存储文件的位置。这对客户端是可选的,可用于客户端进行双向身份验证。

 

字符串

camel.component.kafka.configuration.ssl-keystore-password

密钥存储文件的存储密码。对于客户端,这是可选的,只有在配置了 ssl.keystore.location 时才需要。

 

字符串

camel.component.kafka.configuration.ssl-keystore-type

密钥存储文件的文件格式。这对客户端是可选的。默认值为 JKS

JKS

字符串

camel.component.kafka.configuration.ssl-protocol

用于生成 SSLContext 的 SSL 协议。默认设置为 TLS,这适用于大多数情况。最近的 JVM 中允许的值为 TLS、TLSv1.1 和 TLSv1.2。较旧的 JVM 中可能支持 SSL、SSLv2 和 SSLv3,但由于已知的安全漏洞,不建议使用它们。

TLS

字符串

camel.component.kafka.configuration.ssl-provider

用于 SSL 连接的安全供应商的名称。默认值是 JVM 的默认安全提供程序。

 

字符串

camel.component.kafka.configuration.ssl-trustmanager-algorithm

信任管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的信任管理器工厂算法。

PKIX

字符串

camel.component.kafka.configuration.ssl-truststore-location

信任存储文件的位置。

 

字符串

camel.component.kafka.configuration.ssl-truststore-password

信任存储文件的密码。

 

字符串

camel.component.kafka.configuration.ssl-truststore-type

信任存储文件的文件格式。默认值为 JKS。

JKS

字符串

camel.component.kafka.configuration.topic

要使用的主题名称。在消费者中,您可以使用逗号分隔多个主题。制作者只能发送消息到单个主题。

 

字符串

camel.component.kafka.configuration.topic-is-pattern

主题是否为模式(正则表达式)。这可用于订阅与模式匹配的动态主题数量。

false

布尔值

camel.component.kafka.configuration.value-deserializer

deserializer 类用于实现 Deserializer 接口的值。

org.apache.kafka.common.serialization.StringDeserializer

字符串

camel.component.kafka.configuration.worker-pool

要在 kafka 服务器确认从 KafkaProducer 发送的消息使用异步非阻塞处理,使用自定义 worker 池继续路由交换。

 

ExecutorService

camel.component.kafka.configuration.worker-pool-core-size

kafka 服务器在 kafka 服务器确认从 KafkaProducer 发送的消息(使用异步非阻塞处理)来继续路由交换的 worker 池的核心线程数量。

10

整数

camel.component.kafka.configuration.worker-pool-max-size

kafka 服务器之后,worker 池的最大线程数量用于继续路由交换,使用异步非块处理确认从 KafkaProducer 发送的消息。

20

整数

camel.component.kafka.enabled

启用 kafka 组件

true

布尔值

camel.component.kafka.kafka-manual-commit-factory

用于创建 KafkaManualCommit 实例的工厂。这允许在手动提交来自开箱即用的默认实现时,自定义工厂创建自定义 KafkaManualCommit 实例。选项是 org.apache.camel.component.kafka.KafkaManualCommitFactory 类型。

 

字符串

camel.component.kafka.resolve-property-placeholders

启动时组件是否应解析自身上的属性占位符。只有属于 String 类型的属性才能使用属性占位符。

true

布尔值

camel.component.kafka.shutdown-timeout

以毫秒为单位,等待消费者或制作者正常关闭并终止其 worker 线程。

30000

整数

camel.component.kafka.use-global-ssl-context-parameters

启用使用全局 SSL 上下文参数。

false

布尔值

camel.component.kafka.worker-pool

要在 kafka 服务器确认从 KafkaProducer 发送的消息使用异步非阻塞处理,使用共享自定义 worker 池继续路由交换。如果使用这个选项,您必须处理线程池的生命周期,以便在不再需要时关闭池。选项是一个 java.util.concurrent.ExecutorService 类型。

 

字符串

有关 Producer/Consumer 配置的更多信息:

http://kafka.apache.org/documentation.html#newconsumerconfigshttp://kafka.apache.org/documentation.html#producerconfigs

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat