2.2.6. Kafka Connect API の公開
KafkaConnector リソースを使用してコネクターを管理する代わりに、Kafka Connect REST API を使用します。Kafka Connect REST API は、<connect_cluster_name>-connect-api:8083 で実行しているサービスとして利用できます。<connect_cluster_name> は Kafka Connect クラスターの名前です。サービスは、Kafka Connect インスタンスの作成時に作成されます。
strimzi.io/use-connector-resources アノテーションは KafkaConnectors を有効にします。アノテーションを KafkaConnect リソース設定に適用した場合、そのアノテーションを削除して Kafka Connect API を使用する必要があります。それ以外の場合は、Kafka Connect REST API を使用して直接手動で変更した変更は Cluster Operator によって元に戻されます。
コネクター設定を JSON オブジェクトとして追加できます。
コネクター設定を追加するための curl 要求の例
curl -X POST \
http://my-connect-cluster-connect-api:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "my-source-connector",
"config":
{
"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/opt/kafka/LICENSE",
"topic":"my-topic",
"tasksMax": "4",
"type": "source"}
}'
API には OpenShift クラスター内でのみアクセスできます。OpenShift クラスター外で実行しているアプリケーションに Kafka Connect API にアクセスできるようにするには、以下の機能のいずれかを作成して手動で公開できます。
-
LoadBalancerまたはNodePortタイプのサービス -
Ingressリソース - OpenShift ルート
接続は安全ではないため、外部アクセスが推奨されています。
サービスを作成する場合には、<connect_cluster_name>-connect-api サービスの セレクター からラベルを使用して、サービスがトラフィックをルーティングする Pod を設定します。
サービスのセレクター設定
# ...
selector:
strimzi.io/cluster: my-connect-cluster
strimzi.io/kind: KafkaConnect
strimzi.io/name: my-connect-cluster-connect
#...
また、外部クライアントからの HTTP 要求を許可する NetworkPolicy を作成する必要があります。
Kafka Connect API への要求を許可する NetworkPolicy の例
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: my-custom-connect-network-policy
spec:
ingress:
- from:
- podSelector:
matchLabels:
app: my-connector-manager
ports:
- port: 8083
protocol: TCP
podSelector:
matchLabels:
strimzi.io/cluster: my-connect-cluster
strimzi.io/kind: KafkaConnect
strimzi.io/name: my-connect-cluster-connect
policyTypes:
- Ingress
- 1
- API への接続が許可される Pod のラベル。
クラスター外でコネクター設定を追加するには、curl コマンドで API を公開するリソースの URL を使用します。
関連情報
- REST API でサポートされる操作は、Apache Kafka のドキュメント を参照してください。