14.5. MicroProfile Reactive Messaging 참조


다음은 MicroProfile Config 사양에 필요한 reactive messaging 속성 키 접두사 목록입니다.

  • mp.messaging.incoming.[channel-name].[attribute]=[value]
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]
  • mp.messaging.connector.[connector-name].[attribute]=[value]

channel-name@Incoming.value() 또는 @Outgoing.value() 입니다. 자세한 내용은 한 쌍의 커넥터 방법 예제를 참조하십시오.

@Outgoing("to")
public int send() {
   int i = // Randomly generated...
   return i;
}

@Incoming("from")
public void receive(int i) {
   // Process payload
}

이 예에서 필수 속성 접두사는 다음과 같습니다.

  • mp.messaging.incoming.from. 이는 receive() 메서드를 정의합니다.
  • mp.messaging.outgoing.to. 이는 send() 메서드를 정의합니다.

이는 하나의 예입니다. 서로 다른 커넥터가 다른 속성을 인식하므로 구성하려는 커넥터에 따라 표시되는 접두사가 지정됩니다.

다음은 사용자가 @Channel 과 Emitter 구문을 통해 트리거한 reactive 메시징 스트림과 코드 간의 데이터 교환 예입니다.

@Path("/")
@ApplicationScoped
class MyBean {
    @Inject @Channel("my-stream")
    Emitter<String> emitter; 
1


    Publisher<String> dest;

    public MyBean() { 
2

    }

    @Inject
    public MyBean(@Channel("my-stream") Publisher<String> dest) {
        this.dest = subscribeAndAllowMultipleSubscriptions(dest);
    }

    private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
    } 
3
 
4
 
5


    @POST
    public PublisherBuilder<String> publish(@FormParam("value") String value) {
        return emitter.send(value);
    }

    @GET
    public Publisher poll() {
        return dest;
    }

    @PreDestroy
    public void close() { 
6


    }
}

인라인 세부 정보:

1
생성자가 삽입된 게시자를 래핑합니다.
2
Java 사양에 대한 CDI(Contexts and dependency Cryostat)를 충족하기 위해 이 빈 생성자가 필요합니다.
3
위임을 구독합니다.
4
여러 서브스크립션을 처리할 수 있는 게시자로 위임을 래핑합니다.
5
래핑 게시자는 위임에서 데이터를 전달합니다.
6
reactive 메시징 제공 게시자의 서브스크립션 취소.

이 예에서 MicroProfile Reactive Messaging은 my-stream 메모리 스트림을 수신하므로 Emitter 를 통해 전송된 메시지는 이 삽입된 게시자에 수신됩니다. 그러나 이 데이터 교환에 성공하려면 다음 조건이 true여야 합니다.

  1. Emitter.send() 를 호출하기 전에 채널에 활성 서브스크립션이 있어야 합니다. 이 예제에서 생성자가 호출하는 subscribeAndAllowMultipleSubscriptions() 메서드는 사용자 코드 호출에 사용할 수 있는 시점까지 활성 서브스크립션이 있는지 확인합니다.
  2. 삽입된 게시자 에 대해 하나의 서브스크립션 만 가질 수 있습니다. REST 호출을 사용하여 수신 게시자를 노출하려면 poll() 메서드를 호출하면 dest 게시자에 새 서브스크립션이 생성되고 각 클라이언트에 삽입된 데이터를 브로드캐스트하기 위해 자체 게시자를 구현해야 합니다.

14.5.3. Apache Kafka 사용자 API

Apache Kafka 사용자 API를 사용하여 Kafka가 수신된 메시지에 대한 자세한 정보를 가져오고 Kafka가 메시지를 처리하는 방법에 영향을 미칠 수 있습니다. 이 API는 io/#159rye/reactive/messaging/kafka/api 패키지에 저장되며 다음 클래스로 구성됩니다.

  • IncomingKafkaRecordMetadata. 이 메타데이터에는 다음 정보가 포함됩니다.

    • 메시지로 표시되는 Kafka 레코드 입니다.
    • 메시지에 사용되는 Kafka 주제파티션 과 그 내의 오프셋 입니다.
    • Message timestamptimestampType.
    • 메시지 헤더 입니다. 이러한 정보는 애플리케이션이 생성 측에 첨부할 수 있고 소비되는 측면에서 수신할 수 있는 정보입니다.
  • OutgoingKafkaRecordMetadata. 이 메타데이터를 사용하면 Kafka에서 메시지를 처리하는 방법을 지정하거나 덮어쓸 수 있습니다. 여기에는 다음 정보가 포함됩니다.

    • Kafka에서 메시지 키로 처리하는 키 .
    • Kafka에서 사용할 주제입니다.
    • 파티션.
    • Kafka가 생성하는 타임스탬프 를 원하지 않는 경우입니다.
    • 헤더.
  • KafkaMetadataUtil 에는 OutgoingKafkaRecordMetadata메시지에 작성하고 메시지에서 IncomingKafkaRecordMetadata 를 읽는 유틸리티 방법이 포함되어 있습니다.
중요

Kafka에 매핑되지 않은 채널로 전송된 메시지에 OutgoingKafkaRecordMetadata 를 작성하는 경우 reactive 메시징 프레임워크가 이를 무시합니다. 반대로 Kafka에 매핑되지 않은 채널에서 Message 에서 IncomingKafkaRecordMetadata 를 읽는 경우 해당 메시지는 null 로 반환됩니다.

메시지 키를작성하고 읽는 방법의 예
@Inject
@Channel("from-user")
Emitter<Integer> emitter;

@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
    // Set the key in the metadata
    OutgoingKafkaRecordMetadata<String> md =
            OutgoingKafkaRecordMetadata.<String>builder()
                .withKey("KEY-" + i)
                .build();
    // Note that Message is immutable so the copy returned by this method
    // call is not the same as the parameter to the method
    return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}

@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
    IncomingKafkaRecordMetadata<String, Integer> metadata =
        KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();

    // We can now read the Kafka record key
    String key = metadata.getKey();

    // When using the Message wrapper around the payload we need to explicitly ack
    // them
    return msg.ack();
}
microprofile-config.properties 파일의 Kafka 매핑 예
kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
참고

발신 채널의 key.serializer 와 들어오는 채널의 key.deserializer 를 지정해야 합니다.

14.5.4. Kafka 커넥터의 MicroProfile Config 속성 파일의 예

이는 Kafka 커넥터의 간단한 microprofile-config.properties 파일의 예입니다. 해당 속성은 "MicroProfile reactive messaging connectors for integrating with external messaging systems"의 속성에 해당합니다.

kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
Expand
표 14.6. 항목 토론
항목설명

에서 다음을 수행합니다.

이는 "채널"입니다.

전송,수신

이는 "methods"입니다.

to channel은 send() 메서드에 있고 from 채널은 receive() 메서드에 있습니다.

kafka.bootstrap.servers=kafka:9092

이는 애플리케이션이 연결해야 하는 Kafka 브로커의 URL을 지정합니다. 채널 수준에서 URL을 지정할 수도 있습니다(예: mp.messaging.outgoing.to.bootstrap.servers=kafka:9092).

mp.messaging.outgoing.to.connector=smallrye-kafka

이는 to 채널을 통해 Kafka에서 메시지를 수신할 수 있음을 나타냅니다.

smallrye reactive messaging는 애플리케이션을 빌드하기 위한 프레임워크입니다. smallrye-kafka 값은 SmallRye reactive messaging-specific입니다. Galleon을 사용하여 자체 서버를 프로비저닝하는 경우 microprofile-reactive-messaging-kafka Galleon 계층을 포함하여 Kafka 통합을 활성화할 수 있습니다.

mp.messaging.outgoing.to.topic=my-topic

이는 my-topic 이라는 Kafka 주제로 데이터를 보낼 것임을 나타냅니다.

Kafka "topic"은 메시지가 저장되고 게시되는 카테고리 또는 피드 이름입니다. 모든 Kafka 메시지는 주제로 구성됩니다. 생산자 애플리케이션은 주제 및 소비자 애플리케이션에 데이터를 작성하여 주제 에서 데이터를 읽습니다.

MP.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

이는 커넥터가 IntegerSerializer 를 사용하여 topic에 쓸 때 send() 메서드가 출력되는 값을 직렬화하도록 지시합니다. Kafka는 표준 Java 유형에 대한 직렬화를 제공합니다. org.apache.kafka.common.serialization.Serializer 를 구현하는 클래스를 작성한 다음 해당 클래스를 배포에 포함하여 고유한 serializer를 구현할 수 있습니다.

mp.messaging.incoming.from.connector=smallrye-kafka

이는 channel을 사용하여 Kafka 에서 메시지를 수신할 것임을 나타냅니다. 다시 말하지만 smallrye-kafka 값은 SmallRye reactive messaging-specific입니다.

mp.messaging.incoming.from.topic=my-topic

이는 커넥터가 my-topic 이라는 Kafka 주제에서 데이터를 읽어야 함을 나타냅니다.

MP.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

이는 커넥터가 IntegerDeserializer 를 사용하여 receive() 메서드를 호출하기 전에 주제의 값을 역직렬화하도록 지시합니다. org.apache.kafka.common.serialization.Deserialization.Deserializer 를 구현하는 클래스를 작성한 다음 해당 클래스를 배포에 포함하여 고유한 역직렬을 구현할 수 있습니다.

참고

이 속성 목록은 포괄적이지 않습니다. 자세한 내용은 SmallRye Reactive Messaging Apache Kafka 설명서를 참조하십시오.

필수 MicroProfile Reactive Messaging 접두사

MicroProfile Reactive Messaging 사양에는 Kafka에 대해 다음과 같은 메서드 속성 키 접두사가 필요합니다.

  • mp.messaging.incoming.[channel-name].[attribute]=[value]`
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]`
  • mp.messaging.connector.[connector-name].[attribute]=[value]`

channel-name@Incoming.value() 또는 @Outgoing.value() 입니다.

이제 다음 메서드 쌍 예제를 고려하십시오.

@Outgoing("to")
public int send() {
    int i = // Randomly generated...
    return i;
}

@Incoming("from")
public void receive(int i) {
    // Process payload
}

이 메서드 쌍 예제에서는 다음과 같은 필수 속성 접두사를 기록해 둡니다.

  • mp.messaging.incoming.from. 이 접두사는 receive() 메서드의 구성으로 속성을 선택합니다.
  • mp.messaging.outgoing.to. 이 접두사는 send() 메서드의 구성으로 속성을 선택합니다.

14.5.5. AMQP 커넥터의 MicroProfile Config 속성 파일의 예

이는 AMQP(Advanced Message Queuing Protocol) 커넥터의 간단한 microprofile-config.properties 파일의 예입니다. 해당 속성은 외부 메시징 시스템과 통합을 위한 MicroProfile reactive 메시징 커넥터 의 속성에 해당합니다.

amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis

mp.messaging.outgoing.to.connector=smallrye-amqp
mp.messaging.outgoing.to.address=my-topic

mp.messaging.incoming.from.connector=smallrye-amqp
mp.messaging.incoming.from.address=my-topic
Expand
표 14.7. 항목 토론
항목설명

에서 다음을 수행합니다.

이는 "채널"입니다.

전송,수신

이는 "methods"입니다.

to channel은 send() 메서드에 있고 from 채널은 receive() 메서드에 있습니다.

amqp-host=localhost

이는 애플리케이션이 연결해야 하는 AMQP 브로커의 URL을 지정합니다. 다음과 같이 채널 수준에서 URL을 지정할 수도 있습니다. 즉, URL이 지정되지 않은 경우 기본값은 localhost 입니다.

amqp-port=5672

이는 AMQP 브로커의 포트를 지정합니다.

mp.messaging.outgoing.to.connector=smallrye-amqp

이는 채널이 AMQP에 메시지를 전송하도록 한다는 것을 나타냅니다.

smallrye reactive messaging는 애플리케이션을 빌드하기 위한 프레임워크입니다. smallrye-amqp 값은 SmallRye reactive messaging specific입니다. Galleon을 사용하여 서버를 프로비저닝하는 경우 microprofile-reactive-messaging-amqp Galleon 계층을 포함하여 AMQP 통합을 활성화할 수 있습니다.

mp.messaging.outgoing.to.address=my-topic

이는 my-topic 주소의 AMQP 큐에 데이터를 보낼 것임을 나타냅니다. mp.messaging.outgoing.to.address 에 대한 값을 지정하지 않으면 이 예에서 "to"인 채널의 기본값이 설정됩니다.

mp.messaging.incoming.from.connector=smallrye-amqp

이는 from 채널을 사용하여 AMQP 브로커에서 메시지를 수신하려는 것을 나타냅니다. 다시 말하지만 smallrye-amqp 값은 SmallRye reactive messaging-specific입니다.

mp.messaging.incoming.from.address=my-topic

이는 from 채널의 AMQP 대기열 my-topic 에서 데이터를 읽으려는 것을 나타냅니다.

SmallRye Reactive Messaging의 AMQP 커넥터에서 지원하는 전체 속성 목록은 SmallRye Reactive Messaging Connector Configuration Reference 를 참조하십시오.

보안 AMQP 브로커에 연결

SSL/TLS 및 SASL(Simple Authentication and Security Layer)으로 보안된 AMQ 브로커와 연결하려면 microprofile-config.properties 파일에서 연결에 사용할 client-ssl-context 를 정의합니다. 커넥터 수준 및 채널 수준에서도 이 작업을 수행할 수 있습니다.

커넥터 수준 client-ssl-context 정의의 예

amqp-use-ssl=true
mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=exampleSSLContext

mp.messaging.connector.undercloudrye-amqp.wildfly.elytron.ssl.context 속성은 자체 서명된 인증서를 사용하는 경우에만 필요합니다.

중요

프로덕션 환경에서는 자체 서명된 인증서를 사용하지 마십시오. CA(인증 기관)에서 서명한 인증서만 사용합니다.

다음과 같이 채널에 client-ssl-context 를 지정할 수도 있습니다.

채널 수준 client-ssl-context 정의의 예

mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext

예제에서 exampleSSLContext에서 들어오는 채널과만 연결됩니다.

Expand
표 14.8. 항목 토론
항목설명

amqp-use-ssl

이는 브로커에 연결할 때 보안 연결을 사용하도록 지정합니다.

mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context

AMQ 브로커가 CA(인증 기관) 서명 인증서로 보안된 경우 이 속성을 지정할 필요가 없습니다.

자체 서명된 인증서를 사용하는 경우 관리 모델에서 /subsystem=elytron/client-ssl-context=* 아래의 Elytron 하위 시스템에 정의된 SSLContext 를 지정합니다.

중요

프로덕션 환경에서는 자체 서명된 인증서를 사용하지 마십시오. CA(인증 기관)에서 서명한 인증서만 사용합니다.

다음 관리 CLI 명령을 사용하여 client-ssl-context 를 정의할 수 있습니다.

/subsystem=elytron/client-ssl-context=exampleSSLContext:add(key-manager=exampleServerKeyManager,trust-manager=exampleTLSTrustManager)

자세한 내용은 JBoss EAP 가이드의 SSL/TLS 구성에서양방향 SSL/TLS 구성에 대한 서버 인증서 구성, 클라이언트 인증서에 대한 신뢰 저장소 및 신뢰 관리자 구성 을 참조하십시오.

Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat
맨 위로 이동