2.5. Apache Kafka의 소스
Apache Kafka 클러스터에서 이벤트를 읽고 이러한 이벤트를 싱크에 전달하는 Apache Kafka 소스를 생성할 수 있습니다. OpenShift Container Platform 웹 콘솔, Knative(kn
) CLI를 사용하거나 YAML 파일로 KafkaSource
오브젝트를 직접 생성하고 OpenShift CLI(oc
)를 사용하여 Kafka 소스를 적용하여 Kafka 소스를 생성할 수 있습니다.
2.5.1. 웹 콘솔을 사용하여 Apache Kafka 이벤트 소스 생성
Apache Kafka에 대한 Knative 브로커 구현이 클러스터에 설치되면 웹 콘솔을 사용하여 Apache Kafka 소스를 생성할 수 있습니다. OpenShift Container Platform 웹 콘솔을 사용하면 간소화되고 직관적인 사용자 인터페이스를 통해 Kafka 소스를 생성할 수 있습니다.
사전 요구 사항
-
OpenShift Serverless Operator, Knative Eventing,
KnativeKafka
사용자 정의 리소스가 클러스터에 설치되어 있습니다. - 웹 콘솔에 로그인했습니다.
- 가져오려는 Kafka 메시지를 생성하는 Red Hat AMQ Streams(Kafka) 클러스터에 액세스할 수 있습니다.
- 프로젝트를 생성했거나 OpenShift Container Platform에서 애플리케이션 및 기타 워크로드를 생성하는 데 적절한 역할 및 권한이 있는 프로젝트에 액세스할 수 있습니다.
절차
- 개발자 화면에서 +추가 페이지로 이동하여 이벤트 소스를 선택합니다.
- 이벤트 소스 페이지의 유형 섹션에서 Kafka 소스를 선택합니다.
Kafka 소스 설정을 구성합니다.
- 쉼표로 구분된 부트스트랩 서버 목록을 추가합니다.
- 쉼표로 구분된 주제 목록을 추가합니다.
- 소비자 그룹을 추가합니다.
- 생성한 서비스 계정의 서비스 계정 이름을 선택합니다.
- 이벤트 소스로 싱크를 선택합니다. 싱크는 채널, 브로커 또는 서비스와 같은 리소스이거나 URI일 수 있습니다.
- Kafka 이벤트 소스로 이름을 입력합니다.
- 생성을 클릭합니다.
검증
토폴로지 페이지를 확인하여 Kafka 이벤트 소스가 생성되었고 싱크에 연결되어 있는지 확인할 수 있습니다.
- 개발자 화면에서 토폴로지로 이동합니다.
Kafka 이벤트 소스 및 싱크를 확인합니다.
2.5.2. Knative CLI를 사용하여 Apache Kafka 이벤트 소스 생성
kn source kafka create
명령을 사용하여 Knative(kn
) CLI를 사용하여 Kafka 소스를 생성할 수 있습니다. Knative CLI를 사용하여 이벤트 소스를 생성하면 YAML 파일을 직접 수정하는 것보다 더 간소화되고 직관적인 사용자 인터페이스가 제공됩니다.
사전 요구 사항
-
OpenShift Serverless Operator, Knative Eventing, Knative Serving,
KnativeKafka
사용자 정의 리소스(CR가 클러스터에 설치되어 있습니다. - 프로젝트를 생성했거나 OpenShift Container Platform에서 애플리케이션 및 기타 워크로드를 생성하는 데 적절한 역할 및 권한이 있는 프로젝트에 액세스할 수 있습니다.
- 가져오려는 Kafka 메시지를 생성하는 Red Hat AMQ Streams(Kafka) 클러스터에 액세스할 수 있습니다.
-
Knative(
kn
) CLI가 설치되어 있습니다. -
선택 사항: 이 절차의 확인 단계를 사용하려면 OpenShift CLI(
oc
)를 설치했습니다.
절차
Kafka 이벤트 소스가 작동하는지 확인하려면 수신 이벤트를 서비스 로그에 덤프하는 Knative 서비스를 생성합니다.
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display
KafkaSource
CR을 생성합니다.$ kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
참고이 명령의 자리 표시자 값을 소스 이름, 부트스트랩 서버 및 주제의 값으로 바꿉니다.
--servers
,--topics
,--consumergroup
옵션은 Kafka 클러스터에 대한 연결 매개 변수를 지정합니다.--consumergroup
옵션은 선택 사항입니다.선택 사항: 생성한
KafkaSource
CR에 대한 세부 정보를 확인합니다.$ kn source kafka describe <kafka_source_name>
출력 예
Name: example-kafka-source Namespace: kafka Age: 1h BootstrapServers: example-cluster-kafka-bootstrap.kafka.svc:9092 Topics: example-topic ConsumerGroup: example-consumer-group Sink: Name: event-display Namespace: default Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 1h ++ Deployed 1h ++ SinkProvided 1h
검증 단계
Kafka 인스턴스를 트리거하여 메시지를 항목에 보냅니다.
$ oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
프롬프트에 메시지를 입력합니다. 이 명령은 다음을 가정합니다.
-
Kafka 클러스터는
kafka
네임스페이스에 설치됩니다. -
my-topic
주제를 사용하도록KafkaSource
오브젝트가 구성되어 있습니다.
-
Kafka 클러스터는
로그를 보고 메시지가 도착했는지 확인합니다.
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
출력 예
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic subject: partition:46#0 id: partition:46/offset:0 time: 2021-03-10T11:21:49.4Z Extensions, traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00 Data, Hello!
2.5.2.1. Knative CLI 싱크 플래그
Knative(kn
) CLI를 사용하여 이벤트 소스를 생성할 때 --sink
플래그를 사용하여 이벤트가 해당 리소스에서 로 전송되는 싱크를 지정할 수 있습니다. 싱크는 다른 리소스에서 들어오는 이벤트를 수신할 수 있는 모든 주소 지정 가능 또는 호출 가능한 리소스일 수 있습니다.
다음 예제에서는 싱크로 서비스 http://event-display.svc.cluster.local
를 사용하는 싱크 바인딩을 생성합니다.
싱크 플래그를 사용하는 명령의 예
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
의svc
는 싱크가 Knative 서비스인지 확인합니다. 기타 기본 싱크 접두사에는channel
, 및broker
가 포함됩니다.
2.5.3. YAML을 사용하여 Apache Kafka 이벤트 소스 생성
YAML 파일을 사용하여 Knative 리소스를 생성하려면 선언적 API를 사용하므로 선언적으로 애플리케이션을 재현할 수 있는 방식으로 설명할 수 있습니다. YAML을 사용하여 Kafka 소스를 생성하려면 KafkaSource
오브젝트를 정의하는 YAML 파일을 생성한 다음 oc apply
명령을 사용하여 적용해야 합니다.
사전 요구 사항
-
OpenShift Serverless Operator, Knative Eventing,
KnativeKafka
사용자 정의 리소스가 클러스터에 설치되어 있습니다. - 프로젝트를 생성했거나 OpenShift Container Platform에서 애플리케이션 및 기타 워크로드를 생성하는 데 적절한 역할 및 권한이 있는 프로젝트에 액세스할 수 있습니다.
- 가져오려는 Kafka 메시지를 생성하는 Red Hat AMQ Streams(Kafka) 클러스터에 액세스할 수 있습니다.
-
OpenShift CLI(
oc
)를 설치합니다.
절차
KafkaSource
오브젝트를 YAML 파일로 생성합니다.apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: <source_name> spec: consumerGroup: <group_name> 1 bootstrapServers: - <list_of_bootstrap_servers> topics: - <list_of_topics> 2 sink: - <list_of_sinks> 3
중요OpenShift Serverless의
KafkaSource
개체에 대한 API의v1beta1
버전만 지원됩니다. 이 버전은 더 이상 사용되지 않으므로 이 API의v1alpha1
버전을 사용하지 마십시오.KafkaSource
오브젝트의 예apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
KafkaSource
YAML 파일을 적용합니다.$ oc apply -f <filename>
검증
다음 명령을 입력하여 Kafka 이벤트 소스가 생성되었는지 확인합니다.
$ oc get pods
출력 예
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
2.5.4. Apache Kafka 소스에 대한 SASL 인증 구성
SASL( Simple Authentication and Security Layer )은 Apache Kafka에서 인증을 위해 사용합니다. 클러스터에서 SASL 인증을 사용하는 경우 사용자가 Kafka 클러스터와 통신하기 위해 Knative에 인증 정보를 제공해야 합니다. 그렇지 않으면 이벤트를 생성하거나 사용할 수 없습니다.
사전 요구 사항
- OpenShift Container Platform에 대한 클러스터 또는 전용 관리자 권한이 있어야 합니다.
-
OpenShift Serverless Operator, Knative Eventing,
KnativeKafka
CR이 OpenShift Container Platform 클러스터에 설치되어 있습니다. - 프로젝트를 생성했거나 OpenShift Container Platform에서 애플리케이션 및 기타 워크로드를 생성하는 데 적절한 역할 및 권한이 있는 프로젝트에 액세스할 수 있습니다.
- Kafka 클러스터의 사용자 이름 및 암호가 있습니다.
-
사용할 SASL 메커니즘을 선택했습니다(예:
PLAIN
,SCRAM-SHA-256
또는SCRAM-SHA-512
). -
TLS가 활성화된 경우 Kafka 클러스터의
ca.crt
인증서 파일도 필요합니다. -
OpenShift(
oc
) CLI를 설치했습니다.
절차
선택한 네임스페이스에서 인증서 파일을 시크릿으로 생성합니다.
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ 1 --from-literal=user="my-sasl-user"
- 1
- SASL 유형은
PLAIN
,SCRAM-SHA-256
또는SCRAM-SHA-512
일 수 있습니다.
다음
사양
구성을 포함하도록 Kafka 소스를 생성하거나 수정합니다.apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <kafka_auth_secret> key: user password: secretKeyRef: name: <kafka_auth_secret> key: password type: secretKeyRef: name: <kafka_auth_secret> key: saslType tls: enable: true caCert: 1 secretKeyRef: name: <kafka_auth_secret> key: ca.crt ...
- 1
- 퍼블릭 클라우드 Kafka 서비스를 사용하는 경우
caCert
사양이 필요하지 않습니다.