第6章 Kafka クライアントの開発
任意のプログラミング言語で Kafka クライアントを作成し、Streams for Apache Kafka に接続します。
Kafka クラスターとやりとりするには、クライアントアプリケーションがメッセージを生成および消費できる必要があります。基本的な Kafka クライアントアプリケーションを開発して設定するには、少なくとも次のことを行う必要があります。
- Kafka クラスターに接続するための設定をセットアップする
- プロデューサーとコンシューマーを使用してメッセージを送受信する
Kafka クラスターに接続し、プロデューサーとコンシューマーを使用するための基本設定をセットアップすることは、Kafka クライアント開発の最初のステップです。その後、入力、セキュリティー、パフォーマンス、エラー処理、クライアントアプリケーションの機能の改善に拡張できます。
前提条件
以下のプロパティー値を含むクライアントプロパティーファイルが作成されました。
手順
- Java、Python、.NET などのプログラミング言語用の Kafka クライアントライブラリーを選択します。Streams for Apache Kafka では、Red Hat によって構築されたクライアントライブラリーのみがサポートされます。現在、Streams for Apache Kafka は Java クライアントライブラリーのみを提供します。
- パッケージマネージャーを使用するか、ソースからライブラリーをダウンロードして手動でライブラリーをインストールします。
- Kafka クライアントに必要なクラスと依存関係をコードにインポートします。
作成するクライアントのタイプに応じて、Kafka コンシューマーオブジェクトまたはプロデューサーオブジェクトを作成します。
クライアントは、Kafka コンシューマー、プロデューサ、Streams プロセッサー、および管理者のいずれかになります。
Kafka クラスターに接続するための設定プロパティー (必要に応じてブローカーアドレス、ポート、認証情報など) を指定します。
ローカルの Kafka デプロイメントの場合は、
localhost:9092
のようなアドレスから始めることができます。ただし、Streams for Apache Kafka によって管理される Kafka クラスターを使用する場合は、oc
コマンドを使用して、Kafka
カスタムリソースのステータスからブートストラップアドレスを取得できます。oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'
oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'
Copy to Clipboard Copied! このコマンドは、Kafka クラスター上のクライアント接続のリスナーによって公開されたブートストラップアドレスを取得します。
- Kafka コンシューマーまたはプロデューサーオブジェクトを使用して、トピックのサブスクライブ、メッセージの生成、または Kafka クラスターからのメッセージの取得を行います。
- エラー処理に注意してください。これは、Kafka に接続して通信する場合、特に高可用性と操作の容易さが重視される運用システムでは非常に重要です。効果的なエラー処理は、プロトタイプと実稼働レベルのアプリケーションを区別する重要な要素であり、Kafka だけでなく、あらゆる堅牢なソフトウェアシステムにも該当します。
6.1. Kafka プロデューサーアプリケーションの例
この Java ベースの Kafka プロデューサーアプリケーションは、Kafka トピックへのメッセージを生成する自己完結型アプリケーションの例です。クライアントは Kafka Producer
API を使用して、いくつかのエラー処理を行いながらメッセージを非同期に送信します。
クライアントは、メッセージ処理用の Callback
インターフェイスを実装します。
Kafka プロデューサーアプリケーションを実行するには、Producer
クラスの main
メソッドを実行します。クライアントは、randomBytes
メソッドを使用して、メッセージペイロードとしてランダムなバイト配列を生成します。クライアントは、NUM_MESSAGES
メッセージ (設定例では 50) が送信されるまで、指定された Kafka トピックにメッセージを生成します。プロデューサはスレッドセーフであるため、複数のスレッドが単一のプロデューサインスタンスを使用できます。
Kafka プロデューサーインスタンスはスレッドセーフになるように設計されており、複数のスレッドが 1 つのプロデューサーインスタンスを共有できます。
このサンプルクライアントは、特定のユースケース向けに、より複雑な Kafka プロデューサを構築するための基本基盤を提供します。セキュアな接続の実装 など、追加の機能を組み込むことができます。
前提条件
-
指定された
BOOTSTRAP_SERVERS
で実行されている Kafka ブローカー -
メッセージが生成される
TOPIC_NAME
という名前の Kafka トピック。 - クライアント依存関係
Kafka プロデューサーアプリケーションを実装する前に、プロジェクトに必要な依存関係を含める必要があります。Java ベースの Kafka クライアントの場合は、Kafka クライアント JAR が含まれます。この JAR ファイルには、クライアントの構築と実行に必要な Kafka ライブラリーが含まれています。
Maven プロジェクトの pom.xml
ファイルに依存関係を追加する方法は、「Kafka クライアントの依存関係を Maven プロジェクトに追加する」 を参照してください。
Configuration
プロデューサー
クラスで指定された次の定数を使用して、プロデューサーアプリケーションを設定できます。
BOOTSTRAP_SERVERS
- Kafka ブローカーに接続するためのアドレスおよびポート。
TOPIC_NAME
- メッセージを生成する Kafka トピックの名前。
NUM_MESSAGES
- 停止する前に生成するメッセージの数。
MESSAGE_SIZE_BYTES
- 各メッセージのバイト単位のサイズ。
PROCESSING_DELAY_MS
- メッセージ送信間の遅延 (ミリ秒単位)。これにより、メッセージの処理時間をシミュレートでき、テストに役立ちます。
プロデューサーアプリケーションの例
import java.util.Properties; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongSerializer; public class Producer implements Callback { private static final Random RND = new Random(0); private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String TOPIC_NAME = "my-topic"; private static final long NUM_MESSAGES = 50; private static final int MESSAGE_SIZE_BYTES = 100; private static final long PROCESSING_DELAY_MS = 1000L; protected AtomicLong messageCount = new AtomicLong(0); public static void main(String[] args) { new Producer().run(); } public void run() { System.out.println("Running producer"); try (var producer = createKafkaProducer()) { byte[] value = randomBytes(MESSAGE_SIZE_BYTES); while (messageCount.get() < NUM_MESSAGES) { sleep(PROCESSING_DELAY_MS); producer.send(new ProducerRecord<>(TOPIC_NAME, messageCount.get(), value), this); messageCount.incrementAndGet(); } } } private KafkaProducer<Long, byte[]> createKafkaProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); return new KafkaProducer<>(props); } private void sleep(long ms) { try { TimeUnit.MILLISECONDS.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); } } private byte[] randomBytes(int size) { if (size <= 0) { throw new IllegalArgumentException("Record size must be greater than zero"); } byte[] payload = new byte[size]; for (int i = 0; i < payload.length; ++i) { payload[i] = (byte) (RND.nextInt(26) + 65); } return payload; } private boolean retriable(Exception e) { if (e instanceof IllegalArgumentException || e instanceof UnsupportedOperationException || !(e instanceof RetriableException)) { return false; } else { return true; } } @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { System.err.println(e.getMessage()); if (!retriable(e)) { e.printStackTrace(); System.exit(1); } } else { System.out.printf("Record sent to %s-%d with offset %d%n", metadata.topic(), metadata.partition(), metadata.offset()); } } }
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;
public class Producer implements Callback {
private static final Random RND = new Random(0);
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC_NAME = "my-topic";
private static final long NUM_MESSAGES = 50;
private static final int MESSAGE_SIZE_BYTES = 100;
private static final long PROCESSING_DELAY_MS = 1000L;
protected AtomicLong messageCount = new AtomicLong(0);
public static void main(String[] args) {
new Producer().run();
}
public void run() {
System.out.println("Running producer");
try (var producer = createKafkaProducer()) {
byte[] value = randomBytes(MESSAGE_SIZE_BYTES);
while (messageCount.get() < NUM_MESSAGES) {
sleep(PROCESSING_DELAY_MS);
producer.send(new ProducerRecord<>(TOPIC_NAME, messageCount.get(), value), this);
messageCount.incrementAndGet();
}
}
}
private KafkaProducer<Long, byte[]> createKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return new KafkaProducer<>(props);
}
private void sleep(long ms) {
try {
TimeUnit.MILLISECONDS.sleep(ms);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private byte[] randomBytes(int size) {
if (size <= 0) {
throw new IllegalArgumentException("Record size must be greater than zero");
}
byte[] payload = new byte[size];
for (int i = 0; i < payload.length; ++i) {
payload[i] = (byte) (RND.nextInt(26) + 65);
}
return payload;
}
private boolean retriable(Exception e) {
if (e instanceof IllegalArgumentException
|| e instanceof UnsupportedOperationException
|| !(e instanceof RetriableException)) {
return false;
} else {
return true;
}
}
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
System.err.println(e.getMessage());
if (!retriable(e)) {
e.printStackTrace();
System.exit(1);
}
} else {
System.out.printf("Record sent to %s-%d with offset %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
}
- 1
- クライアントは、
createKafkaProducer
メソッドを使用して Kafka プロデューサを作成します。プロデューサは、Kafka トピックにメッセージを非同期的に送信します。 - 2
- バイト配列は、Kafka トピックに送信される各メッセージのペイロードとして使用されます。
- 3
- 送信されるメッセージの最大数は、
NUM_MESSAGES
定数値によって決まります。 - 4
- メッセージレートは、送信される各メッセージ間の遅延によって制御されます。
- 5
- プロデューサは、トピック名、メッセージカウント値、およびメッセージ値を渡します。
- 6
- クライアントは、提供された設定を使用して
KafkaProducer
インスタンスを作成します。プロパティーファイルを使用することも、設定を直接追加することもできます。基本設定の詳細については、4章Kafka クラスターに接続するためのクライアントアプリケーションの設定 を参照してください。 - 7
- Kafka ブローカーへの接続。
- 8
- ランダムに生成された UUID を使用するプロデューサーの一意のクライアント ID。クライアント ID は必須ではありませんが、リクエストのソースを追跡するのに役立ちます。
- 9
- キーおよび値をバイト配列として処理するための適切なシリアライザークラス。
- 10
- 指定されたミリ秒数の間、メッセージ送信プロセスに遅延を導入するメソッド。メッセージの送信を担当するスレッドが一時停止中に中断されると、
InterruptedException
エラーがスローされます。 - 11
- Kafka トピックに送信される各メッセージのペイロードとして機能する、特定のサイズのランダムなバイト配列を作成するメソッド。このメソッドはランダムな整数を生成し、ASCII コードの大文字を表す
65
を加算します (65 はA
、66 はB
など)。ASCII コードはペイロード配列に 1 バイトとして保存されます。ペイロードサイズがゼロ以下の場合、IllegalArgumentException
がスローされます。 - 12
- 例外の後にメッセージの送信を再試行するかどうかを確認するメソッド。Kafka プロデューサは、接続エラーなどの特定のエラーに対する再試行を自動的に処理します。このメソッドをカスタマイズして、他のエラーを含めることができます。null および指定された例外、または
RetriableException
インターフェイスを実装していない例外の場合はfalse
を返します。 - 13
- Kafka ブローカーによってメッセージが確認されたときに呼び出されるメソッド。成功すると、トピック、パーティション、メッセージのオフセット位置の詳細を含むメッセージが出力されます。メッセージの送信時にエラーが発生した場合は、エラーメッセージが出力されます。このメソッドは例外をチェックし、それが致命的なエラーであるか致命的ではないエラーであるかに基づいて適切なアクションを実行します。エラーが致命的ではない場合、メッセージ送信プロセスは続行されます。エラーが致命的である場合、スタックトレースが出力され、プロデューサは終了します。
エラー処理
プロデューサーアプリケーションによって捕捉された致命的な例外:
InterruptedException
-
一時停止中に現在のスレッドが中断された場合にスローされるエラー。通常、中断はプロデューサーを停止またはシャットダウンするときに発生します。例外は
RuntimeException
として再スローされ、プロデューサーが終了します。 IllegalArgumentException
- プロデューサが無効または不適切な引数を受け取ったときにスローされるエラー。たとえば、トピックが欠落している場合、例外がスローされます。
UnsupportedOperationException
-
操作がサポートされていない場合、またはメソッドが実装されていない場合にスローされるエラー。たとえば、サポートされていないプロデューサー設定を使用しようとしたり、
KafkaProducer
クラスでサポートされていないメソッドを呼び出そうとした場合、例外が出力されます。
プロデューサーアプリケーションによって捕捉された致命的ではない例外:
RetriableException
-
Kafka クライアントライブラリーによって提供される
RetriableException
インターフェイスを実装する例外に対してスローされるエラー。
致命的ではないエラーの場合、プロデューサーはメッセージの送信を続けます。
デフォルトでは、Kafka は少なくとも 1 回のメッセージ配信セマンティクスで動作するので、特定のシナリオではメッセージが複数回配信され、重複が発生する可能性があります。このリスクを回避するには、Kafka プロデューサーでトランザクションを有効にすること を検討してください。トランザクションは、1 回限りの配信を強化します。さらに、retries
設定プロパティーを使用して、プロデューサーがメッセージの送信を中止するまでに再試行する回数を制御できます。この設定は、メッセージ送信エラー時に retriable
メソッドが true
を返す回数に影響します。