5장. 주제에서 메시지 전송 및 수신
OpenShift에 설치된 Kafka 클러스터에서 메시지를 전송하고 수신합니다.
다음 절차에서는 Kafka 클라이언트를 사용하여 메시지를 생성하고 사용하는 방법을 설명합니다. OpenShift에 클라이언트를 배포하거나 로컬 Kafka 클라이언트를 OpenShift 클러스터에 연결할 수 있습니다. 둘 중 하나 또는 두 옵션을 사용하여 Kafka 클러스터 설치를 테스트할 수 있습니다. 로컬 클라이언트의 경우 OpenShift 경로 연결을 사용하여 Kafka 클러스터에 액세스합니다.
oc 명령줄 툴을 사용하여 Kafka 클라이언트를 배포 및 실행합니다.
사전 요구 사항
로컬 프로듀서 및 소비자의 경우:
OpenShift 클러스터에 배포된 Kafka 클라이언트에서 메시지 전송 및 수신
생산자 및 소비자 클라이언트를 OpenShift 클러스터에 배포합니다. 그런 다음 클라이언트를 사용하여 동일한 네임스페이스의 Kafka 클러스터에서 메시지를 보내고 받을 수 있습니다. 배포에서는 Kafka를 실행하기 위해 AMQ Streams 컨테이너 이미지를 사용합니다.
oc명령줄 인터페이스를 사용하여 Kafka 생산자를 배포합니다.이 예제에서는 Kafka 클러스터
my-cluster에 연결하는 Kafka 생산자를 배포합니다.my-topic이라는 항목이 생성됩니다.OpenShift에 Kafka 생산자 배포
oc run kafka-producer -ti \ --image=registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic참고연결에 실패하면 Kafka 클러스터가 실행 중이며 올바른 클러스터 이름이
bootstrap-server로 지정되었는지 확인합니다.- 명령 프롬프트에서 여러 메시지를 입력합니다.
-
OpenShift 웹 콘솔에서 홈 > 프로젝트 페이지로 이동하여 생성한
amq-streams-kafka프로젝트를 선택합니다. -
Pod 목록에서
kafka-producer를 클릭하여 생산자 Pod 세부 정보를 확인합니다. - 로그 페이지를 선택하여 입력한 메시지가 있는지 확인합니다.
oc명령줄 인터페이스를 사용하여 Kafka 소비자를 배포합니다.OpenShift에 Kafka 소비자 배포
oc run kafka-consumer -ti \ --image=registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic \ --from-beginning소비자는
my-topic에 생성된 메시지를 소비했습니다.- 명령 프롬프트에서 소비자 콘솔에 들어오는 메시지가 표시되는지 확인합니다.
-
OpenShift 웹 콘솔에서 홈 > 프로젝트 페이지로 이동하여 생성한
amq-streams-kafka프로젝트를 선택합니다. -
Pod 목록에서
kafka-consumer를 클릭하여 소비자 Pod 세부 정보를 확인합니다. - 사용한 메시지가 있는지 확인하려면 로그 페이지를 선택합니다.
로컬에서 실행되는 Kafka 클라이언트에서 메시지 전송 및 수신
명령줄 인터페이스를 사용하여 로컬 시스템에서 Kafka 생산자 및 소비자를 실행합니다.
AMQ Streams 소프트웨어 다운로드 페이지에서 AMQ Streams <version > 바이너리를 다운로드하여 추출합니다.
amq-streams- <version> -bin.zip파일의 압축을 풉니다.명령줄 인터페이스를 열고
my-topic주제와 TLS의 인증 속성을 사용하여 Kafka 콘솔 프로듀서를 시작합니다.OpenShift 경로를 사용하여 Kafka 브로커에 액세스 하는 데 필요한 속성을 추가합니다.
- 사용 중인 OpenShift 경로에 대해 호스트 이름과 포트 443을 사용합니다.
브로커 인증서에 대해 생성한 신뢰 저장소에 대한 암호 및 참조를 사용합니다.
로컬 Kafka 생산자 시작
kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --producer-property security.protocol=SSL \ --producer-property ssl.truststore.password=password \ --producer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic
- 생산자가 실행 중인 명령줄 인터페이스에 메시지를 입력합니다.
- Enter를 눌러 메시지를 보냅니다.
새 명령줄 인터페이스 탭 또는 창을 열고 Kafka 콘솔 소비자를 시작하여 메시지를 수신합니다.
생산자와 동일한 연결 세부 정보를 사용합니다.
로컬 Kafka 소비자 시작
kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.password=password \ --consumer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic --from-beginning- 소비자 콘솔에 들어오는 메시지가 표시되는지 확인합니다.
- Crtl+C를 눌러 Kafka 콘솔 생산자 및 소비자를 종료합니다.