14.5. MicroProfile Reactive Messaging 참조
14.5.1. 외부 메시징 시스템과 통합을 위한 MicroProfile reactive 메시징 커넥터 링크 복사링크가 클립보드에 복사되었습니다!
다음은 MicroProfile Config 사양에 필요한 reactive messaging 속성 키 접두사 목록입니다.
-
mp.messaging.incoming.[channel-name].[attribute]=[value] -
mp.messaging.outgoing.[channel-name].[attribute]=[value] -
mp.messaging.connector.[connector-name].[attribute]=[value]
channel-name 은 @Incoming.value() 또는 @Outgoing.value() 입니다. 자세한 내용은 한 쌍의 커넥터 방법 예제를 참조하십시오.
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
이 예에서 필수 속성 접두사는 다음과 같습니다.
-
mp.messaging.incoming.from. 이는receive()메서드를 정의합니다. -
mp.messaging.outgoing.to. 이는send()메서드를 정의합니다.
이는 하나의 예입니다. 서로 다른 커넥터가 다른 속성을 인식하므로 구성하려는 커넥터에 따라 표시되는 접두사가 지정됩니다.
14.5.2. reactive 메시징 스트림과 사용자 초기화 코드 간의 데이터 교환 예 링크 복사링크가 클립보드에 복사되었습니다!
다음은 사용자가 @Channel 과 Emitter 구문을 통해 트리거한 reactive 메시징 스트림과 코드 간의 데이터 교환 예입니다.
@Path("/")
@ApplicationScoped
class MyBean {
@Inject @Channel("my-stream")
Emitter<String> emitter;
Publisher<String> dest;
public MyBean() {
}
@Inject
public MyBean(@Channel("my-stream") Publisher<String> dest) {
this.dest = subscribeAndAllowMultipleSubscriptions(dest);
}
private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
}
@POST
public PublisherBuilder<String> publish(@FormParam("value") String value) {
return emitter.send(value);
}
@GET
public Publisher poll() {
return dest;
}
@PreDestroy
public void close() {
}
}
인라인 세부 정보:
이 예에서 MicroProfile Reactive Messaging은 my-stream 메모리 스트림을 수신하므로 Emitter 를 통해 전송된 메시지는 이 삽입된 게시자에 수신됩니다. 그러나 이 데이터 교환에 성공하려면 다음 조건이 true여야 합니다.
-
Emitter.send()를 호출하기 전에 채널에 활성 서브스크립션이 있어야 합니다. 이 예제에서 생성자가 호출하는subscribeAndAllowMultipleSubscriptions()메서드는 사용자 코드 호출에 사용할 수 있는 시점까지 활성 서브스크립션이 있는지 확인합니다. -
삽입된
게시자에 대해 하나의서브스크립션만 가질 수 있습니다. REST 호출을 사용하여 수신 게시자를 노출하려면poll()메서드를 호출하면dest게시자에 새 서브스크립션이 생성되고 각 클라이언트에 삽입된 데이터를 브로드캐스트하기 위해 자체 게시자를 구현해야 합니다.
14.5.3. Apache Kafka 사용자 API 링크 복사링크가 클립보드에 복사되었습니다!
Apache Kafka 사용자 API를 사용하여 Kafka가 수신된 메시지에 대한 자세한 정보를 가져오고 Kafka가 메시지를 처리하는 방법에 영향을 미칠 수 있습니다. 이 API는 io/#159rye/reactive/messaging/kafka/api 패키지에 저장되며 다음 클래스로 구성됩니다.
IncomingKafkaRecordMetadata. 이 메타데이터에는 다음 정보가 포함됩니다.-
메시지로 표시되는 Kafka 레코드
키입니다. -
메시지에사용되는 Kafka주제및파티션과 그 내의오프셋입니다. -
Messagetimestamp및timestampType. -
메시지헤더입니다. 이러한 정보는 애플리케이션이 생성 측에 첨부할 수 있고 소비되는 측면에서 수신할 수 있는 정보입니다.
-
메시지로 표시되는 Kafka 레코드
OutgoingKafkaRecordMetadata. 이 메타데이터를 사용하면 Kafka에서 메시지를 처리하는 방법을 지정하거나 덮어쓸 수 있습니다. 여기에는 다음 정보가 포함됩니다.-
Kafka에서 메시지
키로처리하는 키 . -
Kafka에서 사용할
주제입니다. -
파티션. -
Kafka가 생성하는
타임스탬프를 원하지 않는 경우입니다. -
헤더.
-
Kafka에서 메시지
-
KafkaMetadataUtil에는OutgoingKafkaRecordMetadata를메시지에작성하고 메시지에서IncomingKafkaRecordMetadata를 읽는 유틸리티 방법이 포함되어 있습니다.
Kafka에 매핑되지 않은 채널로 전송된 메시지에 OutgoingKafkaRecordMetadata 를 작성하는 경우 reactive 메시징 프레임워크가 이를 무시합니다. 반대로 Kafka에 매핑되지 않은 채널에서 Message 에서 IncomingKafkaRecordMetadata 를 읽는 경우 해당 메시지는 null 로 반환됩니다.
메시지 키를작성하고 읽는 방법의 예
@Inject
@Channel("from-user")
Emitter<Integer> emitter;
@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
// Set the key in the metadata
OutgoingKafkaRecordMetadata<String> md =
OutgoingKafkaRecordMetadata.<String>builder()
.withKey("KEY-" + i)
.build();
// Note that Message is immutable so the copy returned by this method
// call is not the same as the parameter to the method
return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}
@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
IncomingKafkaRecordMetadata<String, Integer> metadata =
KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();
// We can now read the Kafka record key
String key = metadata.getKey();
// When using the Message wrapper around the payload we need to explicitly ack
// them
return msg.ack();
}
microprofile-config.properties 파일의 Kafka 매핑 예
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
발신 채널의 key.serializer 와 들어오는 채널의 key.deserializer 를 지정해야 합니다.
14.5.4. Kafka 커넥터의 MicroProfile Config 속성 파일의 예 링크 복사링크가 클립보드에 복사되었습니다!
이는 Kafka 커넥터의 간단한 microprofile-config.properties 파일의 예입니다. 해당 속성은 "MicroProfile reactive messaging connectors for integrating with external messaging systems"의 속성에 해당합니다.
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
| 항목 | 설명 |
|---|---|
|
| 이는 "채널"입니다. |
|
| 이는 "methods"입니다.
|
|
|
이는 애플리케이션이 연결해야 하는 Kafka 브로커의 URL을 지정합니다. 채널 수준에서 URL을 지정할 수도 있습니다(예: |
|
|
이는
smallrye reactive messaging는 애플리케이션을 빌드하기 위한 프레임워크입니다. |
|
|
이는 Kafka "topic"은 메시지가 저장되고 게시되는 카테고리 또는 피드 이름입니다. 모든 Kafka 메시지는 주제로 구성됩니다. 생산자 애플리케이션은 주제 및 소비자 애플리케이션에 데이터를 작성하여 주제 에서 데이터를 읽습니다. |
|
|
이는 커넥터가 |
|
|
이는 channel을 사용하여 Kafka |
|
|
이는 커넥터가 |
|
|
이는 커넥터가 |
이 속성 목록은 포괄적이지 않습니다. 자세한 내용은 SmallRye Reactive Messaging Apache Kafka 설명서를 참조하십시오.
필수 MicroProfile Reactive Messaging 접두사
MicroProfile Reactive Messaging 사양에는 Kafka에 대해 다음과 같은 메서드 속성 키 접두사가 필요합니다.
-
mp.messaging.incoming.[channel-name].[attribute]=[value]` -
mp.messaging.outgoing.[channel-name].[attribute]=[value]` -
mp.messaging.connector.[connector-name].[attribute]=[value]`
channel-name 은 @Incoming.value() 또는 @Outgoing.value() 입니다.
이제 다음 메서드 쌍 예제를 고려하십시오.
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
이 메서드 쌍 예제에서는 다음과 같은 필수 속성 접두사를 기록해 둡니다.
-
mp.messaging.incoming.from. 이 접두사는receive()메서드의 구성으로 속성을 선택합니다. -
mp.messaging.outgoing.to. 이 접두사는send()메서드의 구성으로 속성을 선택합니다.
14.5.5. AMQP 커넥터의 MicroProfile Config 속성 파일의 예 링크 복사링크가 클립보드에 복사되었습니다!
이는 AMQP(Advanced Message Queuing Protocol) 커넥터의 간단한 microprofile-config.properties 파일의 예입니다. 해당 속성은 외부 메시징 시스템과 통합을 위한 MicroProfile reactive 메시징 커넥터 의 속성에 해당합니다.
amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis
mp.messaging.outgoing.to.connector=smallrye-amqp
mp.messaging.outgoing.to.address=my-topic
mp.messaging.incoming.from.connector=smallrye-amqp
mp.messaging.incoming.from.address=my-topic
| 항목 | 설명 |
|---|---|
|
| 이는 "채널"입니다. |
|
| 이는 "methods"입니다.
|
|
|
이는 애플리케이션이 연결해야 하는 AMQP 브로커의 URL을 지정합니다. 다음과 같이 채널 수준에서 URL을 지정할 수도 |
|
| 이는 AMQP 브로커의 포트를 지정합니다. |
|
| 이는 채널이 AMQP에 메시지를 전송하도록 한다는 것을 나타냅니다.
smallrye reactive messaging는 애플리케이션을 빌드하기 위한 프레임워크입니다. |
|
|
이는 |
|
|
이는 |
|
|
이는 |
SmallRye Reactive Messaging의 AMQP 커넥터에서 지원하는 전체 속성 목록은 SmallRye Reactive Messaging Connector Configuration Reference 를 참조하십시오.
보안 AMQP 브로커에 연결
SSL/TLS 및 SASL(Simple Authentication and Security Layer)으로 보안된 AMQ 브로커와 연결하려면 microprofile-config.properties 파일에서 연결에 사용할 client-ssl-context 를 정의합니다. 커넥터 수준 및 채널 수준에서도 이 작업을 수행할 수 있습니다.
커넥터 수준 client-ssl-context 정의의 예
amqp-use-ssl=true
mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=exampleSSLContext
mp.messaging.connector.undercloudrye-amqp.wildfly.elytron.ssl.context 속성은 자체 서명된 인증서를 사용하는 경우에만 필요합니다.
프로덕션 환경에서는 자체 서명된 인증서를 사용하지 마십시오. CA(인증 기관)에서 서명한 인증서만 사용합니다.
다음과 같이 채널에 client-ssl-context 를 지정할 수도 있습니다.
채널 수준 client-ssl-context 정의의 예
mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext
예제에서 exampleSSLContext 는 에서 들어오는 채널과만 연결됩니다.
| 항목 | 설명 |
|---|---|
|
| 이는 브로커에 연결할 때 보안 연결을 사용하도록 지정합니다. |
|
| AMQ 브로커가 CA(인증 기관) 서명 인증서로 보안된 경우 이 속성을 지정할 필요가 없습니다.
자체 서명된 인증서를 사용하는 경우 관리 모델에서 중요 프로덕션 환경에서는 자체 서명된 인증서를 사용하지 마십시오. CA(인증 기관)에서 서명한 인증서만 사용합니다.
다음 관리 CLI 명령을 사용하여
자세한 내용은 JBoss EAP 가이드의 SSL/TLS 구성에서양방향 SSL/TLS 구성에 대한 서버 인증서 구성, 클라이언트 인증서에 대한 신뢰 저장소 및 신뢰 관리자 구성 을 참조하십시오. |