第4章 Kafka クラスターに接続するためのクライアントアプリケーションの設定
Kafka クラスターに接続するには、ブローカーを識別して接続を有効にする最小限のプロパティーのセットを使用してクライアントアプリケーションを設定する必要があります。さらに、メッセージを Kafka で使用されるバイト配列形式に変換したり、その形式からメッセージを変換したりするためのシリアライザー/デシリアライザーメカニズムを追加する必要があります。コンシューマークライアントを開発する場合は、最初の接続を Kafka クラスターに追加することから始めます。これは、使用可能なすべてのブローカーを検出するために使用されます。接続を確立したら、Kafka トピックからのメッセージの消費、および Kafka トピックへのメッセージの生成を開始できます。
必須ではありませんが、ログとメトリックの収集でクライアントを識別できるように、一意のクライアント ID を使用することを推奨します。
プロパティーファイルでプロパティーを設定できます。プロパティーファイルを使用すると、コードを再コンパイルせずに設定を変更できます。
たとえば、次のコードを使用して Java クライアントにプロパティーをロードできます。
設定プロパティーをクライアントにロードする
Properties properties = new Properties(); InsetPropertyStream insetPropertyStream = new FileInsetPropertyStream("config.properties"); properties.load(insetPropertyStream); KafkaProducer<String, String> consumer = new KafkaProducer<>(properties);
設定オブジェクトのコードにプロパティーを直接追加することもできます。たとえば、Java クライアントアプリケーションに setProperty()
メソッドを使用できます。プロパティーを直接追加することは、設定するプロパティーの数が少ない場合に便利なオプションです。
4.1. 基本的なプロデューサークライアントの設定
プロデューサークライアントを開発するときは、次のように設定します。
- Kafka クラスターへの接続
- メッセージキーを Kafka ブローカーのバイトに変換するシリアライザー
- メッセージ値を Kafka ブローカーのバイトに変換するシリアライザー
圧縮されたメッセージを送信および保存する場合は、圧縮タイプを追加することもできます。
基本的なプロデューサークライアント設定プロパティー
client.id = my-producer-id 1 bootstrap.servers = my-cluster-kafka-bootstrap:9092 2 key.serializer = org.apache.kafka.common.serialization.StringSerializer 3 value.serializer = org.apache.kafka.common.serialization.StringSerializer 4
プロデューサークライアント設定をコードに直接追加する
Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-id"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
KafkaProducer
は、送信するメッセージの文字列キーと値のタイプを指定します。使用されるシリアライザーは、Kafka に送信する前に、指定された型のキーと値をバイトに変換できる必要があります。