第4章 Kafka クラスターに接続するためのクライアントアプリケーションの設定
Kafka クラスターに接続するには、ブローカーを識別して接続を有効にする最小限のプロパティーのセットを使用してクライアントアプリケーションを設定する必要があります。さらに、メッセージを Kafka で使用されるバイト配列形式に変換したり、その形式からメッセージを変換したりするためのシリアライザー/デシリアライザーメカニズムを追加する必要があります。コンシューマークライアントを開発する場合は、最初の接続を Kafka クラスターに追加することから始めます。これは、使用可能なすべてのブローカーを検出するために使用されます。接続を確立したら、Kafka トピックからのメッセージの消費、および Kafka トピックへのメッセージの生成を開始できます。
必須ではありませんが、ログとメトリックの収集でクライアントを識別できるように、一意のクライアント ID を使用することを推奨します。
プロパティーファイルでプロパティーを設定できます。プロパティーファイルを使用すると、コードを再コンパイルせずに設定を変更できます。
たとえば、次のコードを使用して Java クライアントにプロパティーをロードできます。
設定プロパティーをクライアントにロードする
Properties props = new Properties(); try (InputStream propStream = Files.newInputStream(Paths.get(filename))) { props.load(propStream); }
Properties props = new Properties();
try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
props.load(propStream);
}
設定オブジェクトのコードにプロパティーを直接追加することもできます。たとえば、Java クライアントアプリケーションに setProperty()
メソッドを使用できます。プロパティーを直接追加することは、設定するプロパティーの数が少ない場合に便利なオプションです。
4.1. 基本的なプロデューサークライアントの設定
プロデューサークライアントを開発するときは、次のように設定します。
- Kafka クラスターへの接続
- メッセージキーを Kafka ブローカーのバイトに変換するシリアライザー
- メッセージ値を Kafka ブローカーのバイトに変換するシリアライザー
圧縮されたメッセージを送信および保存する場合は、圧縮タイプを追加することもできます。
基本的なプロデューサークライアント設定プロパティー
client.id = my-producer-id bootstrap.servers = my-cluster-kafka-bootstrap:9092 key.serializer = org.apache.kafka.common.serialization.StringSerializer value.serializer = org.apache.kafka.common.serialization.StringSerializer
client.id = my-producer-id
bootstrap.servers = my-cluster-kafka-bootstrap:9092
key.serializer = org.apache.kafka.common.serialization.StringSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer
プロデューサークライアント設定をコードに直接追加する
Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-id"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Properties props = new Properties();
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-id");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
KafkaProducer
は、送信するメッセージの文字列キーと値のタイプを指定します。使用されるシリアライザーは、Kafka に送信する前に、指定された型のキーと値をバイトに変換できる必要があります。