228.11. oplog Tail Tracking


oplog collection 跟踪功能允许在 MongoDB 中实施类似于功能的触发器。要激活此集合,需要首先激活副本集。有关此主题的更多信息,请参阅 https://docs.mongodb.com/manual/tutorial/deploy-replica-set/

您可以在下面找到一个基于 Java DSL 的路由的示例,它演示了如何使用组件来跟踪 oplog 集合。在这个特定情况下,我们过滤影响数据库 optlog_test 中的集合 客户 的事件。请注意,tailTrackIncreasingField 是一个时间戳字段('ts'),这意味着您必须使用带有 TIMESTAMP 值的 tailTrackingStrategy 参数。

import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mongodb.MongoDBTailTrackingEnum;
import org.apache.camel.main.Main;

import java.io.InputStream;

/**
 * For this to work you need to turn on the replica set
 * <p>
 * Commands to create a replica set:
 * <p>
 * rs.initiate( {
 * _id : "rs0",
 * members: [ { _id : 0, host : "localhost:27017" } ]
 * })
 */
public class MongoDbTracker {

    private final String database;

    private final String collection;

    private final String increasingField;

    private MongoDBTailTrackingEnum trackingStrategy;

    private int persistRecords = -1;

    private boolean persistenTailTracking;

    public MongoDbTracker(String database, String collection, String increasingField) {
        this.database = database;
        this.collection = collection;
        this.increasingField = increasingField;
    }

    public static void main(String[] args) throws Exception {
        final MongoDbTracker mongoDbTracker = new MongoDbTracker("local", "oplog.rs", "ts");
        mongoDbTracker.setTrackingStrategy(MongoDBTailTrackingEnum.TIMESTAMP);
        mongoDbTracker.setPersistRecords(5);
        mongoDbTracker.setPersistenTailTracking(true);
        mongoDbTracker.startRouter();
        // run until you terminate the JVM
        System.out.println("Starting Camel. Use ctrl + c to terminate the JVM.\n");

    }

    public void setTrackingStrategy(MongoDBTailTrackingEnum trackingStrategy) {
        this.trackingStrategy = trackingStrategy;
    }

    public void setPersistRecords(int persistRecords) {
        this.persistRecords = persistRecords;
    }

    public void setPersistenTailTracking(boolean persistenTailTracking) {
        this.persistenTailTracking = persistenTailTracking;
    }

    void startRouter() throws Exception {
        // create a Main instance
        Main main = new Main();
        main.bind(MongoConstants.CONN_NAME, new MongoClient("localhost", 27017));
        main.addRouteBuilder(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                getContext().getTypeConverterRegistry().addTypeConverter(InputStream.class, BasicDBObject.class,
                        new MongoToInputStreamConverter());
                from("mongodb://" + MongoConstants.CONN_NAME + "?database=" + database
                        + "&collection=" + collection
                        + "&persistentTailTracking=" + persistenTailTracking
                        + "&persistentId=trackerName" + "&tailTrackDb=local"
                        + "&tailTrackCollection=talendTailTracking"
                        + "&tailTrackField=lastTrackingValue"
                        + "&tailTrackIncreasingField=" + increasingField
                        + "&tailTrackingStrategy=" + trackingStrategy.toString()
                        + "&persistRecords=" + persistRecords
                        + "&cursorRegenerationDelay=1000")
                        .filter().jsonpath("$[?(@.ns=='optlog_test.customers')]")
                        .id("logger")
                        .to("log:logger?level=WARN")
                        .process(new Processor() {
                            public void process(Exchange exchange) throws Exception {
                                Message message = exchange.getIn();
                                System.out.println(message.getBody().toString());
                                exchange.getOut().setBody(message.getBody().toString());
                            }
                        });
            }
        });
        main.run();
    }
}
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.