226.10. oplog Tail Tracking
oplog 컬렉션 추적 기능을 사용하면 MongoDB에서 트리거와 유사한 기능을 구현할 수 있습니다. oplog 컬렉션을 활성화하려면 먼저 복제본 세트(replica set)를 활성화해야 합니다. 자세한 내용은 https://docs.mongodb.com/manual/tutorial/deploy-replica-set/ 을 참조하십시오.
아래는 이 구성 요소를 사용하여 oplog 컬렉션을 추적하는 방법을 보여 주는 Java DSL 기반 경로 예제입니다. 이 예제에서는 데이터베이스 optlog_test 의 customers 컬렉션에 영향을 미치는 이벤트만 필터링합니다. tailTrackIncreasingField 는 타임스탬프 필드('ts')이므로, tailTrackingStrategy 매개변수를 TIMESTAMP 값으로 지정해야 합니다.
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mongodb.MongoDBTailTrackingEnum;
import org.apache.camel.main.Main;
import java.io.InputStream;
/**
 * Example that tails a MongoDB collection through the Camel {@code mongodb} consumer endpoint.
 * <p>
 * The route built by {@link #startRouter()} keeps only the entries whose {@code ns} field equals
 * {@code optlog_test.customers}, sends them to the {@code log:logger} endpoint at WARN level and
 * prints each body to standard output.
 * <p>
 * For this to work you need to turn on the replica set.
 * <p>
 * Commands to create a replica set:
 * <pre>
 * rs.initiate( {
 *   _id : "rs0",
 *   members: [ { _id : 0, host : "localhost:27017" } ]
 * })
 * </pre>
 */
public class MongoDbTracker {

    private final String database;
    private final String collection;
    private final String increasingField;
    private MongoDBTailTrackingEnum trackingStrategy;
    private int persistRecords = -1;
    private boolean persistenTailTracking;

    /**
     * Creates a tracker for one collection.
     *
     * @param database        value of the endpoint's {@code database} option
     * @param collection      value of the endpoint's {@code collection} option
     * @param increasingField value of the endpoint's {@code tailTrackIncreasingField} option
     */
    public MongoDbTracker(String database, String collection, String increasingField) {
        this.database = database;
        this.collection = collection;
        this.increasingField = increasingField;
    }

    /**
     * Tails {@code local.oplog.rs} on the {@code ts} field using the TIMESTAMP strategy.
     *
     * @param args ignored
     * @throws Exception if the Camel runtime fails to start or run
     */
    public static void main(String[] args) throws Exception {
        final MongoDbTracker mongoDbTracker = new MongoDbTracker("local", "oplog.rs", "ts");
        mongoDbTracker.setTrackingStrategy(MongoDBTailTrackingEnum.TIMESTAMP);
        mongoDbTracker.setPersistRecords(5);
        mongoDbTracker.setPersistenTailTracking(true);
        // Announce before starting: startRouter() runs until the JVM is terminated, so anything
        // printed after it would only show up once Camel has already stopped.
        System.out.println("Starting Camel. Use ctrl + c to terminate the JVM.\n");
        mongoDbTracker.startRouter();
    }

    /**
     * Sets the strategy passed as the endpoint's {@code tailTrackingStrategy} option.
     * Must be set before {@link #startRouter()} is called.
     *
     * @param trackingStrategy the tail tracking strategy
     */
    public void setTrackingStrategy(MongoDBTailTrackingEnum trackingStrategy) {
        this.trackingStrategy = trackingStrategy;
    }

    /**
     * Sets the value passed as the endpoint's {@code persistRecords} option.
     *
     * @param persistRecords the option value
     */
    public void setPersistRecords(int persistRecords) {
        this.persistRecords = persistRecords;
    }

    /**
     * Sets the value passed as the endpoint's {@code persistentTailTracking} option.
     *
     * @param persistenTailTracking the option value
     */
    public void setPersistenTailTracking(boolean persistenTailTracking) {
        this.persistenTailTracking = persistenTailTracking;
    }

    /**
     * Binds a MongoDB client for {@code localhost:27017}, registers the tailing route and runs
     * Camel until it is shut down.
     *
     * @throws IllegalStateException if no tracking strategy has been set
     * @throws Exception             if Camel fails to start or run
     */
    void startRouter() throws Exception {
        // Fail fast, before any connection is opened, instead of hitting a bare
        // NullPointerException while the route is being configured.
        if (trackingStrategy == null) {
            throw new IllegalStateException(
                    "trackingStrategy must be set before calling startRouter()");
        }

        // create a Main instance
        final Main main = new Main();
        final MongoClient mongoClient = new MongoClient("localhost", 27017);
        try {
            main.bind(MongoConstants.CONN_NAME, mongoClient);
            main.addRouteBuilder(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    getContext().getTypeConverterRegistry().addTypeConverter(
                            InputStream.class, BasicDBObject.class, new MongoToInputStreamConverter());

                    from(buildEndpointUri())
                            .filter().jsonpath("$[?(@.ns=='optlog_test.customers')]")
                            .id("logger")
                            .to("log:logger?level=WARN")
                            .process(new Processor() {
                                @Override
                                public void process(Exchange exchange) throws Exception {
                                    Message message = exchange.getIn();
                                    String body = message.getBody().toString();
                                    System.out.println(body);
                                    exchange.getOut().setBody(body);
                                }
                            });
                }
            });
            main.run();
        } finally {
            // The client was created here, so it is released here once Camel has stopped.
            mongoClient.close();
        }
    }

    /** Assembles the {@code mongodb:} consumer URI from this tracker's settings. */
    private String buildEndpointUri() {
        return "mongodb://" + MongoConstants.CONN_NAME + "?database=" + database
                + "&collection=" + collection
                + "&persistentTailTracking=" + persistenTailTracking
                + "&persistentId=trackerName" + "&tailTrackDb=local"
                + "&tailTrackCollection=talendTailTracking"
                + "&tailTrackField=lastTrackingValue"
                + "&tailTrackIncreasingField=" + increasingField
                + "&tailTrackingStrategy=" + trackingStrategy.toString()
                + "&persistRecords=" + persistRecords
                + "&cursorRegenerationDelay=1000";
    }
}
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mongodb.MongoDBTailTrackingEnum;
import org.apache.camel.main.Main;
import java.io.InputStream;
/**
 * Example that tails a MongoDB collection through the Camel {@code mongodb} consumer endpoint.
 * <p>
 * The route built by {@link #startRouter()} keeps only the entries whose {@code ns} field equals
 * {@code optlog_test.customers}, sends them to the {@code log:logger} endpoint at WARN level and
 * prints each body to standard output.
 * <p>
 * For this to work you need to turn on the replica set.
 * <p>
 * Commands to create a replica set:
 * <pre>
 * rs.initiate( {
 *   _id : "rs0",
 *   members: [ { _id : 0, host : "localhost:27017" } ]
 * })
 * </pre>
 */
public class MongoDbTracker {

    private final String database;
    private final String collection;
    private final String increasingField;
    private MongoDBTailTrackingEnum trackingStrategy;
    private int persistRecords = -1;
    private boolean persistenTailTracking;

    /**
     * Creates a tracker for one collection.
     *
     * @param database        value of the endpoint's {@code database} option
     * @param collection      value of the endpoint's {@code collection} option
     * @param increasingField value of the endpoint's {@code tailTrackIncreasingField} option
     */
    public MongoDbTracker(String database, String collection, String increasingField) {
        this.database = database;
        this.collection = collection;
        this.increasingField = increasingField;
    }

    /**
     * Tails {@code local.oplog.rs} on the {@code ts} field using the TIMESTAMP strategy.
     *
     * @param args ignored
     * @throws Exception if the Camel runtime fails to start or run
     */
    public static void main(String[] args) throws Exception {
        final MongoDbTracker mongoDbTracker = new MongoDbTracker("local", "oplog.rs", "ts");
        mongoDbTracker.setTrackingStrategy(MongoDBTailTrackingEnum.TIMESTAMP);
        mongoDbTracker.setPersistRecords(5);
        mongoDbTracker.setPersistenTailTracking(true);
        // Announce before starting: startRouter() runs until the JVM is terminated, so anything
        // printed after it would only show up once Camel has already stopped.
        System.out.println("Starting Camel. Use ctrl + c to terminate the JVM.\n");
        mongoDbTracker.startRouter();
    }

    /**
     * Sets the strategy passed as the endpoint's {@code tailTrackingStrategy} option.
     * Must be set before {@link #startRouter()} is called.
     *
     * @param trackingStrategy the tail tracking strategy
     */
    public void setTrackingStrategy(MongoDBTailTrackingEnum trackingStrategy) {
        this.trackingStrategy = trackingStrategy;
    }

    /**
     * Sets the value passed as the endpoint's {@code persistRecords} option.
     *
     * @param persistRecords the option value
     */
    public void setPersistRecords(int persistRecords) {
        this.persistRecords = persistRecords;
    }

    /**
     * Sets the value passed as the endpoint's {@code persistentTailTracking} option.
     *
     * @param persistenTailTracking the option value
     */
    public void setPersistenTailTracking(boolean persistenTailTracking) {
        this.persistenTailTracking = persistenTailTracking;
    }

    /**
     * Binds a MongoDB client for {@code localhost:27017}, registers the tailing route and runs
     * Camel until it is shut down.
     *
     * @throws IllegalStateException if no tracking strategy has been set
     * @throws Exception             if Camel fails to start or run
     */
    void startRouter() throws Exception {
        // Fail fast, before any connection is opened, instead of hitting a bare
        // NullPointerException while the route is being configured.
        if (trackingStrategy == null) {
            throw new IllegalStateException(
                    "trackingStrategy must be set before calling startRouter()");
        }

        // create a Main instance
        final Main main = new Main();
        final MongoClient mongoClient = new MongoClient("localhost", 27017);
        try {
            main.bind(MongoConstants.CONN_NAME, mongoClient);
            main.addRouteBuilder(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    getContext().getTypeConverterRegistry().addTypeConverter(
                            InputStream.class, BasicDBObject.class, new MongoToInputStreamConverter());

                    from(buildEndpointUri())
                            .filter().jsonpath("$[?(@.ns=='optlog_test.customers')]")
                            .id("logger")
                            .to("log:logger?level=WARN")
                            .process(new Processor() {
                                @Override
                                public void process(Exchange exchange) throws Exception {
                                    Message message = exchange.getIn();
                                    String body = message.getBody().toString();
                                    System.out.println(body);
                                    exchange.getOut().setBody(body);
                                }
                            });
                }
            });
            main.run();
        } finally {
            // The client was created here, so it is released here once Camel has stopped.
            mongoClient.close();
        }
    }

    /** Assembles the {@code mongodb:} consumer URI from this tracker's settings. */
    private String buildEndpointUri() {
        return "mongodb://" + MongoConstants.CONN_NAME + "?database=" + database
                + "&collection=" + collection
                + "&persistentTailTracking=" + persistenTailTracking
                + "&persistentId=trackerName" + "&tailTrackDb=local"
                + "&tailTrackCollection=talendTailTracking"
                + "&tailTrackField=lastTrackingValue"
                + "&tailTrackIncreasingField=" + increasingField
                + "&tailTrackingStrategy=" + trackingStrategy.toString()
                + "&persistRecords=" + persistRecords
                + "&cursorRegenerationDelay=1000";
    }
}