228.11. oplog Tail Tracking
oplog collection 跟踪功能允许在 MongoDB 中实施类似于功能的触发器。要激活此集合,需要首先激活副本集。有关此主题的更多信息,请参阅 https://docs.mongodb.com/manual/tutorial/deploy-replica-set/。
您可以在下面找到一个基于 Java DSL 的路由的示例,它演示了如何使用组件来跟踪 oplog 集合。在这个特定情况下,我们过滤影响数据库 optlog_test 中的集合 客户 的事件。请注意,tailTrackIncreasingField
是一个时间戳字段('ts'),这意味着您必须使用带有 TIMESTAMP 值的 tailTrackingStrategy
参数。
import com.mongodb.BasicDBObject; import com.mongodb.MongoClient; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mongodb.MongoDBTailTrackingEnum; import org.apache.camel.main.Main; import java.io.InputStream; /** * For this to work you need to turn on the replica set * <p> * Commands to create a replica set: * <p> * rs.initiate( { * _id : "rs0", * members: [ { _id : 0, host : "localhost:27017" } ] * }) */ public class MongoDbTracker { private final String database; private final String collection; private final String increasingField; private MongoDBTailTrackingEnum trackingStrategy; private int persistRecords = -1; private boolean persistenTailTracking; public MongoDbTracker(String database, String collection, String increasingField) { this.database = database; this.collection = collection; this.increasingField = increasingField; } public static void main(String[] args) throws Exception { final MongoDbTracker mongoDbTracker = new MongoDbTracker("local", "oplog.rs", "ts"); mongoDbTracker.setTrackingStrategy(MongoDBTailTrackingEnum.TIMESTAMP); mongoDbTracker.setPersistRecords(5); mongoDbTracker.setPersistenTailTracking(true); mongoDbTracker.startRouter(); // run until you terminate the JVM System.out.println("Starting Camel. Use ctrl + c to terminate the JVM.\n"); } public void setTrackingStrategy(MongoDBTailTrackingEnum trackingStrategy) { this.trackingStrategy = trackingStrategy; } public void setPersistRecords(int persistRecords) { this.persistRecords = persistRecords; } public void setPersistenTailTracking(boolean persistenTailTracking) { this.persistenTailTracking = persistenTailTracking; } void startRouter() throws Exception { // create a Main instance Main main = new Main(); main.bind(MongoConstants.CONN_NAME, new MongoClient("localhost", 27017)); main.addRouteBuilder(new RouteBuilder() { @Override public void configure() throws Exception { getContext().getTypeConverterRegistry().addTypeConverter(InputStream.class, BasicDBObject.class, new MongoToInputStreamConverter()); from("mongodb://" + MongoConstants.CONN_NAME + "?database=" + database + "&collection=" + collection + "&persistentTailTracking=" + persistenTailTracking + "&persistentId=trackerName" + "&tailTrackDb=local" + "&tailTrackCollection=talendTailTracking" + "&tailTrackField=lastTrackingValue" + "&tailTrackIncreasingField=" + increasingField + "&tailTrackingStrategy=" + trackingStrategy.toString() + "&persistRecords=" + persistRecords + "&cursorRegenerationDelay=1000") .filter().jsonpath("$[?(@.ns=='optlog_test.customers')]") .id("logger") .to("log:logger?level=WARN") .process(new Processor() { public void process(Exchange exchange) throws Exception { Message message = exchange.getIn(); System.out.println(message.getBody().toString()); exchange.getOut().setBody(message.getBody().toString()); } }); } }); main.run(); } }