Listen for real-time changes from MongoDB using CDC

Prerequisites

Before you begin:

Overview

Use Change Data Capture (CDC) to stream inserts, updates, and deletes from MongoDB collections into your flows in near real-time. CDC reads from MongoDB change streams, so changes arrive within seconds rather than waiting for a scheduled run.

Creating a MongoDB CDC listener provides the following benefits:

  • Delivers updates within seconds instead of waiting for the next scheduled run.

  • Reduces MongoDB load by reading the oplog instead of running heavy scheduled queries.

  • Streams only changes, cutting data transfer and downstream compute.

  • Uses server-side aggregation pipelines to filter and select fields from specific databases and collections.

  • Works with Slowly Changing Dimension (SCD) Type 2 history tables without comparing entire tables, and can show before-and-after changes if Change Stream capture is turned on.

Create a listener as the source in an integration flow to capture change data from MongoDB in real time.

Create a MongoDB listener

Follow these steps to configure a listener that captures change data from MongoDB and connects it to your flow.

  1. In the left navigation, go to Build > Flows > Create flow.

  2. In Flow builder, select Add source and configure details in the new flow step.

Configure step and connection details

  1. In Application, search for and select the MongoDB database connector.

  2. In Step type, select Listen for real-time data from source application.

  3. In Name your step, enter a unique name. You can reuse this listener across flows, so a specific name makes it easier to identify in a list.

  4. Optional: In Describe your step, enter a clear and concise description of the flow step to help anyone using this listener in their flows understand its purpose. Keep this description updated as you make changes.

After configuring the step, complete the listener details.

Configure listener

Configure the scope and filters that determine which MongoDB changes your listener captures.

Tip

Before configuring the listener, review FAQs – MongoDB listener for change data capture (CDC) for guidance on snapshot modes, cursor behavior, and scope changes.

Tip

You can use regular expressions to specify patterns for databases, collections, or fields.
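
To make this Tip concrete, here is a minimal Python sketch of how a list of regular expressions might decide whether a database.collection name is in scope. It assumes that each pattern must match the entire name, which is how Debezium-style include and exclude lists behave; this article does not confirm that behavior for the Celigo listener. The database and collection names are hypothetical. Note that a literal dot must be escaped as \. inside a pattern.

```python
import re
from typing import List


def matches_any(name: str, patterns: List[str]) -> bool:
    """Return True if `name` fully matches at least one regular expression.

    Assumption: patterns are anchored to the whole name (re.fullmatch),
    as in Debezium-style include/exclude lists.
    """
    return any(re.fullmatch(pattern, name) for pattern in patterns)


# Hypothetical patterns: every collection in "sales", and any
# collection in "ops" whose name starts with "audit_".
patterns = [r"sales\..*", r"ops\.audit_.*"]

print(matches_any("sales.orders", patterns))    # True
print(matches_any("ops.audit_2024", patterns))  # True
print(matches_any("ops.users", patterns))       # False
print(matches_any("xsales.orders", patterns))   # False: substrings do not count
```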

  1. In Connection, select a MongoDB connection from the list. If you don't have a connection, select Create connection at the end of the list and set up a connection to MongoDB.

  2. In Collections, enter a comma-separated list of values to specify which MongoDB collections to listen to.

    • To listen to collections in the connection database, enter the collection names.

    • To listen to collections in other databases, enter the full path in the form database.collection.

    • To listen to collections across several databases, combine both forms: enter collection names alone for collections in the connection database, and full paths for collections in other databases.

    For example, enter users, database2.products, where users is a collection in the connection database and database2.products is the products collection in a different database.

    Tip

    If you leave Collections blank, events from all collections on the connection database are captured. Use collection.exclude.list in Additional properties to ignore specific collections.

  3. Optional: In Aggregation pipeline, enter a sequence of operations (pipeline stages) that filter, transform, or reshape change events from your databases, collections, or fields before they reach your flow. Each stage operates on the result of the previous stage. Always specify the full path for collections and fields here.

  4. In Additional properties, enter key-value pairs that specify which databases, collections, or fields to include or exclude. Always specify the full path for collections and fields here. Depending on your requirements, you can set optional properties such as:

    • database.include.list

    • database.exclude.list

    • collection.exclude.list

    • field.exclude.list

    For example, if you want to listen to databases other than the connection database, you can set database.include.list.

    Default properties: capture.mode defaults to change_streams_update_full and snapshot.mode defaults to no_data. You can change these default values if required.

  5. In Fields to include, select which parts of the change event to include in the listener payload. By default, only the after field is selected. Select only the fields you need to keep payloads small and downstream processing simple; for example, select after and op to capture the updated record and the operation type. You can also include or exclude nested fields for finer control.
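
The Collections rules in step 2 can be sketched in Python as a small normalizer that turns the comma-separated value into full database.collection names. This is an illustration, not the listener's actual parser: it assumes that an entry containing a dot is already a full path, that a bare name belongs to the connection database, and that collection names themselves contain no dots. The connection database name database1 and the helper resolve_collections are hypothetical.

```python
from typing import List


def resolve_collections(value: str, connection_db: str) -> List[str]:
    """Expand a Collections value into full database.collection names.

    Assumptions (illustrative): entries are comma-separated, an entry with a
    dot is already database.collection, and a bare name belongs to the
    connection database. An empty result stands for "no explicit list",
    in which case all collections on the connection database are captured.
    """
    resolved = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # ignore blanks, e.g. from a trailing comma
        if "." in entry:
            resolved.append(entry)  # already a full path
        else:
            resolved.append(f"{connection_db}.{entry}")  # bare name
    return resolved


print(resolve_collections("users, database2.products", "database1"))
# ['database1.users', 'database2.products']
print(resolve_collections("", "database1"))
# []
```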
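
For step 3, the following Python sketch builds a hypothetical single-stage pipeline and prints it as JSON. It assumes that the Aggregation pipeline field accepts a JSON array of stages and that the stages run against MongoDB change event documents, whose fields include operationType, ns.db, and ns.coll. The passes_match function is a toy local evaluator that supports only equality and $in. It exists only to show which sample events such a $match stage would keep; the real pipeline is evaluated by MongoDB on the server.

```python
import json
from typing import Any, Dict, List

# Hypothetical pipeline: keep only inserts and updates on database2.products.
# Field names follow MongoDB change event documents (ns.db, ns.coll,
# operationType); that the listener sees this exact shape is an assumption.
pipeline: List[Dict[str, Any]] = [
    {
        "$match": {
            "ns.db": "database2",
            "ns.coll": "products",
            "operationType": {"$in": ["insert", "update"]},
        }
    }
]

# JSON text of the kind you might enter in the Aggregation pipeline field.
print(json.dumps(pipeline, indent=2))


def get_path(doc: Any, dotted: str) -> Any:
    """Follow a dotted path such as 'ns.coll' through nested dicts."""
    for key in dotted.split("."):
        if not isinstance(doc, dict) or key not in doc:
            return None
        doc = doc[key]
    return doc


def passes_match(event: Dict[str, Any], conditions: Dict[str, Any]) -> bool:
    """Toy $match evaluator: supports only equality and $in (not MongoDB's engine)."""
    for path, condition in conditions.items():
        value = get_path(event, path)
        if isinstance(condition, dict) and "$in" in condition:
            if value not in condition["$in"]:
                return False
        elif value != condition:
            return False
    return True


sample_events = [
    {"operationType": "insert", "ns": {"db": "database2", "coll": "products"}},
    {"operationType": "delete", "ns": {"db": "database2", "coll": "products"}},
    {"operationType": "update", "ns": {"db": "database1", "coll": "users"}},
]
kept = [e for e in sample_events if passes_match(e, pipeline[0]["$match"])]
print([e["operationType"] for e in kept])  # ['insert']
```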
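
The effect of Fields to include in step 5 can be pictured with a small Python function that copies only the selected paths from a change event, using dot notation for nested fields. The event shape here (op, before, after, source) is a hypothetical example rather than the documented payload, and include_fields is an illustrative helper, not part of the Celigo platform.

```python
import copy
from typing import Any, Dict, List


def include_fields(event: Dict[str, Any], paths: List[str]) -> Dict[str, Any]:
    """Return a new payload containing only the selected paths.

    Paths use dot notation for nested fields (for example "after.email").
    Paths that are missing from the event are skipped. Values are deep-copied
    so the original event is never modified.
    """
    payload: Dict[str, Any] = {}
    for path in paths:
        keys = path.split(".")
        value: Any = event
        found = True
        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                found = False
                break
        if not found:
            continue
        # Rebuild the nesting for this path inside the new payload.
        target = payload
        for key in keys[:-1]:
            target = target.setdefault(key, {})
        target[keys[-1]] = copy.deepcopy(value)
    return payload


# Hypothetical update event.
event = {
    "op": "u",
    "before": None,
    "after": {"_id": 1, "email": "ana@example.com", "name": "Ana"},
    "source": {"db": "database1", "collection": "users"},
}

print(include_fields(event, ["after", "op"]))
# {'after': {'_id': 1, 'email': 'ana@example.com', 'name': 'Ana'}, 'op': 'u'}
print(include_fields(event, ["op", "after.email"]))
# {'op': 'u', 'after': {'email': 'ana@example.com'}}
```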

Advanced settings

Configure these settings only if your scenario requires them.

  1. In Max wait time, set the maximum time (in seconds) that the listener waits before sending a batch of events. Events are usually sent as soon as a full page accumulates (see Page size). If fewer events arrive, they are sent when the wait time elapses. The default is 300 seconds, and the minimum is 30 seconds.

  2. In Page size, specify how many records you want in each page of data. The Celigo platform splits exported data into pages of records. The default is 250 records per page; pages are capped automatically at 5 MB. The application you are importing data into will most often be the bottleneck on page size.

  3. In Data URI template, enter a handlebars template that generates a link to the original record in the export application. When your flow runs with data errors, each error in your job dashboard then links back to its source data. The links are generated dynamically from the data being exported.

  4. Turn on Do not store retry data if you don't want the Celigo platform to store retry data for records that fail in your flow. Storing retry data can slow down your flow's overall performance if you are processing very large numbers of failing records.

  5. In Override trace key template, define a trace key that the Celigo platform uses to identify a unique record. Any value you provide overrides the default trace key for your app. You can specify a single field, such as {{record.field1}}, or use a handlebars expression. For example, {{join "_" record.field1 record.field2}} generates a trace key such as 123_456. If you have applied a transformation to exported data, reference its fields in the trace key template without the record. prefix, for example {{field1}}.
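
The interaction between Max wait time and Page size can be modeled as a small Python batcher that sends a page when it is full, when adding an event would push it past the 5 MB cap, or when the oldest buffered event has waited for the maximum time. This is a simplified model, not the Celigo platform's implementation. It assumes that the wait is measured from the first event in the current page, that 5 MB means 5 * 1024 * 1024 bytes, and that the caller supplies each event's size and the current time. The PageBatcher class is hypothetical.

```python
from typing import Any, List, Optional


class PageBatcher:
    """Toy model of page-size, byte-cap, and max-wait batching.

    A single event larger than max_bytes still forms its own page here.
    """

    def __init__(self, page_size: int = 250, max_wait_s: float = 300.0,
                 max_bytes: int = 5 * 1024 * 1024) -> None:
        if max_wait_s < 30:
            raise ValueError("max wait time must be at least 30 seconds")
        self.page_size = page_size
        self.max_wait_s = max_wait_s
        self.max_bytes = max_bytes  # assumed reading of the 5 MB cap
        self._events: List[Any] = []
        self._bytes = 0
        self._first_at: Optional[float] = None

    def add(self, event: Any, size_bytes: int, now: float) -> List[List[Any]]:
        """Buffer one event and return any pages that became ready."""
        ready: List[List[Any]] = []
        # Send the current page first if this event would exceed the byte cap.
        if self._events and self._bytes + size_bytes > self.max_bytes:
            ready.append(self._flush())
        if not self._events:
            self._first_at = now  # wait timer starts with the page's first event
        self._events.append(event)
        self._bytes += size_bytes
        if len(self._events) >= self.page_size:
            ready.append(self._flush())
        return ready

    def tick(self, now: float) -> Optional[List[Any]]:
        """Return a partial page once its oldest event has waited max_wait_s."""
        if (self._events and self._first_at is not None
                and now - self._first_at >= self.max_wait_s):
            return self._flush()
        return None

    def _flush(self) -> List[Any]:
        page, self._events = self._events, []
        self._bytes, self._first_at = 0, None
        return page


batcher = PageBatcher(page_size=3, max_wait_s=30)
print(batcher.add("e1", 100, now=0))  # []
print(batcher.add("e2", 100, now=1))  # []
print(batcher.add("e3", 100, now=2))  # [['e1', 'e2', 'e3']]: page is full
print(batcher.add("e4", 100, now=5))  # []
print(batcher.tick(now=20))           # None: only 15 s have passed
print(batcher.tick(now=35))           # ['e4']: 30 s wait elapsed
```

In this model the page size drives most sends, and the wait time only matters when traffic is light, which matches the behavior described for Max wait time.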
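
Both Data URI template and Override trace key template use handlebars expressions. The Python sketch below is a deliberately tiny stand-in for a handlebars engine that supports only plain paths and the join helper from the trace key example, so you can see how {{join "_" record.field1 record.field2}} turns a record into 123_456. It is not the engine the Celigo platform uses: it performs no HTML escaping and supports no other helpers. The URL in the usage example is hypothetical.

```python
import re
import shlex
from typing import Any, Dict

# Matches one {{ ... }} expression; the inner text is captured without braces.
EXPRESSION = re.compile(r"\{\{\s*(.*?)\s*\}\}")


def lookup(context: Dict[str, Any], path: str) -> str:
    """Resolve a dotted path like 'record.field1'; missing or null values render as ''."""
    value: Any = context
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return ""
        value = value[key]
    return "" if value is None else str(value)


def render(template: str, context: Dict[str, Any]) -> str:
    """Render plain paths and {{join "sep" path ...}}; nothing else is supported."""
    def replace(match: "re.Match[str]") -> str:
        tokens = shlex.split(match.group(1))  # handles the quoted separator
        if len(tokens) >= 2 and tokens[0] == "join":
            separator, paths = tokens[1], tokens[2:]
            return separator.join(lookup(context, p) for p in paths)
        return lookup(context, match.group(1))
    return EXPRESSION.sub(replace, template)


record = {"field1": 123, "field2": 456, "_id": "abc123"}

print(render('{{join "_" record.field1 record.field2}}', {"record": record}))
# 123_456
print(render("https://example.com/records/{{record._id}}", {"record": record}))
# https://example.com/records/abc123
print(render("{{field1}}", record))  # after a transformation: no record. prefix
# 123
```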

After saving your listener, it appears in the flow builder. Connect it to an import step to begin capturing changes from MongoDB.