3.3. 커넥터 배포 확인


커넥터가 오류 없이 올바르게 시작되면 커넥터가 캡처하도록 구성된 각 테이블에 대한 항목이 생성됩니다. 다운스트림 애플리케이션은 이러한 주제를 구독하여 소스 데이터베이스에서 발생하는 정보 이벤트를 검색할 수 있습니다.

커넥터가 실행 중인지 확인하려면 OpenShift Container Platform 웹 콘솔 또는 OpenShift CLI 툴(oc)에서 다음 작업을 수행합니다.

  • 커넥터 상태를 확인합니다.
  • 커넥터가 주제를 생성하는지 확인합니다.
  • 각 테이블의 초기 스냅샷 중에 커넥터가 생성하는 읽기 작업("op":"r")에 대한 이벤트로 항목이 입력되었는지 확인합니다.

사전 요구 사항

  • Debezium 커넥터는 OpenShift의 AMQ Streams에 배포됩니다.
  • OpenShift oc CLI 클라이언트가 설치되어 있어야 합니다.
  • OpenShift Container Platform 웹 콘솔에 액세스할 수 있습니다.

프로세스

  1. 다음 방법 중 하나를 사용하여 KafkaConnector 리소스의 상태를 확인합니다.

    • OpenShift Container Platform 웹 콘솔에서 다음을 수행합니다.

      1. 검색으로 이동합니다.
      2. 검색 페이지에서 리소스를 클릭하여 리소스 선택 상자를 연 다음 KafkaConnector 를 입력합니다.
      3. KafkaConnectors 목록에서 확인할 커넥터의 이름을 클릭합니다(예: inventory-connector ).
      4. Conditions 섹션의 TypeStatus 열의 값이 ReadyTrue 로 설정되어 있는지 확인합니다.
    • 터미널 창에서 다음을 수행합니다.

      1. 다음 명령을 실행합니다.

        oc describe KafkaConnector <connector-name> -n <project>

        예를 들면 다음과 같습니다.

        oc describe KafkaConnector inventory-connector -n debezium

        이 명령은 다음 출력과 유사한 상태 정보를 반환합니다.

        예 3.3. KafkaConnector 리소스 상태

        Name:         inventory-connector
        Namespace:    debezium
        Labels:       strimzi.io/cluster=my-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            dbserver1
            dbserver1.inventory.addresses
            dbserver1.inventory.customers
            dbserver1.inventory.geom
            dbserver1.inventory.orders
            dbserver1.inventory.products
            dbserver1.inventory.products_on_hand
        Events:  <none>
  2. 커넥터가 Kafka 주제를 생성했는지 확인합니다.

    • OpenShift Container Platform 웹 콘솔에서 다음을 수행합니다.

      1. 검색으로 이동합니다.
      2. 검색 페이지에서 리소스를 클릭하여 리소스 선택 상자를 연 다음 KafkaTopic 을 입력합니다.
      3. KafkaTopics 목록에서 확인할 주제의 이름을 클릭합니다(예: dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d ).
      4. Conditions 섹션의 TypeStatus 열의 값이 ReadyTrue 로 설정되어 있는지 확인합니다.
    • 터미널 창에서 다음을 수행합니다.

      1. 다음 명령을 실행합니다.

        oc get kafkatopics

        이 명령은 다음 출력과 유사한 상태 정보를 반환합니다.

        예 3.4. KafkaTopic 리소스 상태

        NAME                   CLUSTER  PARTITIONS  REPLICATION FACTOR  READY
        connect-cluster-configs  my-cluster   1        1            True
        connect-cluster-offsets  my-cluster   25       1            True
        connect-cluster-status   my-cluster   5        1            True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 1 True
        dbserver1---a96f69b23d6118ff415f772679da623fbbb99421 my-cluster 1 1 True
        dbserver1.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480  my-cluster 1 1 True
        dbserver1.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b  my-cluster 1 1   True
        dbserver1.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5  my-cluster 1 1   True
        dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d my-cluster 1 1   True
        dbserver1.inventory.products---df0746db116844cee2297fab611c21b56f82dcef my-cluster 1 1   True
        dbserver1.inventory.products-on-hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5  my-cluster 1  1 True
        schema-changes.inventory my-cluster    1           1       True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                  my-cluster   1    1  True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1    1  True
  3. 주제 콘텐츠를 확인합니다.

    • 터미널 창에서 다음 명령을 입력합니다.
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>

    예를 들면 다음과 같습니다.

     oc exec -n debezium  -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=dbserver1.inventory.products_on_hand

    주제 이름을 지정하는 형식은 oc describe 명령과 동일합니다(예: dbserver1.inventory.addresses ).

    주제의 각 이벤트에 대해 명령은 다음 출력과 유사한 정보를 반환합니다.

    예 3.5. Debezium 변경 이벤트의 내용

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.3.4.Final-redhat-00001","connector":"mysql","name":"dbserver1","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}

    이전 예에서 페이로드 값은 테이블 dbserver1.products_on_hand 로부터 커넥터 스냅샷에서 읽기( "op" ="r") 이벤트를 생성했음을 보여줍니다. product_id 레코드의 "이전" 상태는 null 이며 레코드에 대해 이전 값이 없음을 나타냅니다. "after" 상태는 product_id 101 가 있는 항목에 대해 3수량을 보여줍니다.

이제 Debezium 커넥터가 인벤토리 데이터베이스에서 캡처하는 변경 이벤트를 볼 수 있습니다.

Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.