Incremental Copy of Cosmos DB Data to SQL Server

Are you using Azure Cosmos DB in your cloud solutions, and do you need to copy your Cosmos DB data to SQL Server at a regular interval and keep it in sync? In this article I will explain how to accomplish this with Azure Data Factory. Instead of using a traditional full load that copies the complete dataset from the source on every run, we will use an incremental method. The incremental method is more efficient because it only copies data that has been created or modified since the last run. The image below shows the end result: the pipeline that will be created to run the data transfer process.

This article assumes that SQL Server is running on-premise, but this is not required. You can just as easily connect to a SQL Server instance running in Azure.

Incremental Copy Pipeline

Create and install a self-hosted integration runtime

By default, Azure Data Factory provides the Azure integration runtime named AutoResolveIntegrationRuntime. This runtime runs in Azure and can be used, in conjunction with a linked service, to connect to Azure resources such as your Cosmos DB account. To run the copy activity between your Cosmos DB and your on-premise SQL Server database, you need a self-hosted integration runtime, because the Azure runtime cannot reach a database inside your private network. The self-hosted integration runtime only makes outbound HTTP-based connections to the internet.

Open Azure Data Factory Studio and navigate to the integration runtimes, which you can find on the Manage page. Create a new self-hosted integration runtime with a descriptive name, for example OnPremiseIntegrationRuntime, so it is clearly distinguished from the Azure runtime. After creation you will need to download and install the integration runtime on your on-premise machine. Make sure the machine meets the minimum system requirements Microsoft recommends and can connect to both Azure and your SQL Server database. Click the newly created self-hosted integration runtime in your Azure Data Factory and follow the steps under ‘Option 2: Manual setup’ as shown in the image below.

Self-Hosted Integration Runtime

After you have used one of the keys to register your self-hosted integration runtime, you should see the following result.

Register Self-Hosted Integration Runtime

Create a linked service for Cosmos DB

A linked service contains the connection string that the service will use at runtime to connect to your database. Open Azure Data Factory Studio and navigate to the linked services, which you can find on the Manage page. Create a new linked service for your Cosmos DB and make sure you select the AutoResolveIntegrationRuntime during the creation process. The image below shows an example of the linked service for Cosmos DB (SQL API).

Azure Cosmos DB (SQL API) Linked Service

Create a linked service for SQL Server database

Create a new linked service for your SQL Server database and make sure you select the self-hosted integration runtime you created earlier. Because your SQL Server runs on-premise, you need this runtime to make the connection. The image below shows an example of the linked service for SQL Server database.

SQL Server Linked Service

Create a watermark table and dataset

To make the incremental method work, we need to keep track of what has been copied during the last pipeline run. This can be accomplished using a watermark table. A watermark is a column or attribute in the source data that contains the last updated timestamp or an incrementing key. After completion of the copy activity, the watermark table will be updated with the maximum timestamp of the copied data. The next time the pipeline runs, it will query the watermark table to retrieve the timestamp of the last run (the old watermark) and query Cosmos DB to retrieve the maximum timestamp of the latest data (the new watermark). The delta between the old watermark and the new watermark specifies the data to be copied from Cosmos DB to SQL Server. The examples in this article work with timestamps, but this can also be achieved using an incrementing key.

Create the following watermark table in your on-premise SQL Server database:

CREATE TABLE Watermark
(
    TableName VARCHAR(128) NOT NULL,
    WatermarkValue BIGINT NOT NULL
)
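
The lookup activity you will add later expects the watermark table to already contain a row for the table it tracks, so seed it before the first pipeline run. The statement below is a minimal example that assumes the books data used later in this article; a watermark value of 0 means the first run copies all documents.

-- Seed the watermark for the books data; 0 means the first run copies everything
INSERT INTO Watermark (TableName, WatermarkValue)
VALUES ('books', 0)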

After you have created (and seeded) the watermark table in your SQL Server database, open Azure Data Factory Studio and navigate to the factory resources, which you can find on the Author page. Create a new dataset and link it to your watermark table using the SQL Server linked service and the on-premise integration runtime as shown in the image below.

Watermark Dataset

Create a stored procedure to update the watermark table

The pipeline will use a stored procedure to update the watermark table after completion of the copy activity. Add the following stored procedure to your SQL Server database:

CREATE PROCEDURE sp_update_watermark @LastModified BIGINT, @TableName VARCHAR(128)
AS
BEGIN

    UPDATE Watermark
    SET [WatermarkValue] = @LastModified
    WHERE [TableName] = @TableName

END
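
You can test the stored procedure by hand before wiring it into the pipeline. The example below uses the timestamp from the sample JSON data shown later in this article; if you run it before the first pipeline run, reset the watermark afterwards so the initial copy still picks up everything.

-- Manual test with an example timestamp
EXEC sp_update_watermark @LastModified = 1665853796, @TableName = 'books'
SELECT TableName, WatermarkValue FROM Watermark

-- Reset to 0 if you ran this before the first pipeline run
UPDATE Watermark SET WatermarkValue = 0 WHERE TableName = 'books'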

Create a source and sink dataset for your data

The source dataset will be linked to the container in Cosmos DB that holds your JSON data. The sink dataset will be linked to the table in SQL Server that the JSON data will be copied to. You might already have an idea of what your table in SQL Server will look like, but a few examples can do no harm and might give you some helpful insights. Below you can find example JSON data that needs to be saved in a SQL Server database. Note the _ts attribute: it is a system attribute that Cosmos DB sets to the last updated timestamp of the resource (in epoch seconds), and it will be used to update the watermark table discussed above.

[
    {
        "id": "775e7ee3-c6b7-4a24-ba03-098d87879ad6",
        "Title": "Azure Data Factory Cookbook",
        "Author": "Dmitry Anoshin",
        "Publisher": "Packt Publishing (December 24, 2020)",
        "Description": "Learn how to load and transform data from various sources, both on-premises and on cloud.",
        "_ts": 1665852768
    },
    {
        "id": "2490b538-fdcf-4fdf-b2cc-41e9da8685dd",
        "Title": "Azure Data Factory by Example",
        "Author": "Richard Swinbank",
        "Publisher": "Apress; 1st ed. edition (June 10, 2021)",
        "Description": "Data engineers will use this book to build skills in Azure Data Factory v2 (ADF).",
        "_ts": 1665853796
    }
]

You can use the table definition shown below as the destination table in SQL Server. To keep things simple, the column names match the attribute names of the JSON; the _ts attribute maps to the Timestamp column.

CREATE TABLE Books
(
    Id NVARCHAR(128) NULL,
    Title NVARCHAR(128) NULL,
    Author NVARCHAR(128) NULL,
    Publisher NVARCHAR(128) NULL,
    Description NVARCHAR(1024) NULL,
    Timestamp BIGINT NULL
)
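
To make the mapping concrete, the statement below shows roughly the row the copy activity will produce for the first JSON document above. It is only an illustration of the JSON-to-column mapping; you do not need to run it yourself.

-- Illustration: the first example document mapped to a Books row (_ts maps to Timestamp)
INSERT INTO Books (Id, Title, Author, Publisher, Description, [Timestamp])
VALUES (
    '775e7ee3-c6b7-4a24-ba03-098d87879ad6',
    'Azure Data Factory Cookbook',
    'Dmitry Anoshin',
    'Packt Publishing (December 24, 2020)',
    'Learn how to load and transform data from various sources, both on-premises and on cloud.',
    1665852768
)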

You can use this same approach to define your own destination table, where the column names match the attribute names of the JSON data in your own Cosmos DB. I used the _ts attribute as the watermark value, but you can use an incrementing key instead. Once you have everything in place, create the corresponding source and sink datasets in Azure Data Factory. Using the examples above, you would end up with a BooksSourceDataset (linked to your container in Cosmos DB) and a BooksDestinationDataset (linked to your table in SQL Server).
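
If you want to sanity-check the source data before building the pipeline, you can run a query like the one below in the Cosmos DB Data Explorer (assuming the books container from the example). The highest _ts value it returns is exactly the value the new-watermark lookup will pick up later.

SELECT TOP 5 b.id, b.Title, b._ts
FROM books b
ORDER BY b._ts DESC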

Create the incremental copy pipeline

Now we can finally start creating the pipeline. Open Azure Data Factory Studio and navigate to the factory resources, which you can find on the Author page. Create a new pipeline and name it IncrementalCopyPipeline as shown in the image below. After creation you get an editor window onto which you can drag and drop activities.

Add lookup old watermark activity to the incremental copy pipeline

The first activity the pipeline needs to execute retrieves the last recorded watermark value. This value is updated by the stored procedure you created earlier after completion of the copy activity. Search for the lookup activity in the left menu and drag and drop it into the editor. Name it LookupOldWatermarkActivity and select the SQL Server watermark dataset as the source dataset, as shown in the image below.

Lookup Old Watermark Activity

Add the following query to the query area:

SELECT TableName, WatermarkValue
FROM Watermark
WHERE TableName = 'books'

Add lookup new watermark activity to the incremental copy pipeline

The second activity the pipeline needs to execute retrieves the maximum timestamp of the latest data we want to copy. With the old and new watermark values retrieved, the pipeline can calculate the delta, which will be used later in the query of the copy activity. Drag and drop a new lookup activity into the editor, name it LookupNewWatermarkActivity and select your Cosmos DB source dataset as shown in the image below.

Lookup New Watermark Activity

You can use the query below to select the maximum timestamp of the JSON data in Cosmos DB. The _ts attribute is a system attribute, so it is available in your Cosmos DB as well.

SELECT TOP 1 b._ts AS NewWatermarkvalue 
FROM books b
ORDER BY b._ts DESC

Add copy activity to the incremental copy pipeline

The third activity the pipeline needs to execute is the actual copy activity. Search for the copy activity in the left menu and drag and drop it into the editor. Give it the name IncrementalCopyActivity.

Incremental Copy Activity

On the Source tab, select your Cosmos DB source dataset and specify the query that retrieves the data to be copied. To retrieve only the changes that haven’t been copied yet, the query uses the old and new watermark values in the WHERE clause. Add the following query to the query area.

select * from books b where b._ts > @{activity('LookupOldWatermarkActivity').output.firstRow.WatermarkValue} and b._ts <= @{activity('LookupNewWatermarkActivity').output.firstRow.NewWatermarkvalue}
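
To see what this evaluates to at runtime, assume the old watermark stored in SQL Server is 1665852768 and the new watermark returned from Cosmos DB is 1665853796 (the timestamps from the example JSON data). The query then resolves to the statement below, so only the second book is copied.

select * from books b where b._ts > 1665852768 and b._ts <= 1665853796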

On the Sink tab, select your SQL Server sink dataset and select the Insert write behaviour. Then navigate to the Mapping tab, where you can map the JSON attributes to the columns of your destination table in SQL Server. If you want, you can omit attributes that you don’t want to map.

Add update watermark activity to the incremental copy pipeline

The final piece to make this all work is the stored procedure activity, which will update the watermark table with the maximum timestamp of the copied data. Search for the stored procedure activity in the left menu and drag and drop it into the editor. On the Settings tab, select the linked service that connects to your on-premise SQL Server database. This automatically sets the integration runtime to the self-hosted integration runtime, which runs on your on-premise machine and is responsible for the connectivity between Azure and your SQL Server database. Select the stored procedure sp_update_watermark.

Update Watermark Activity

The sp_update_watermark stored procedure requires the LastModified and TableName parameters. Use the maximum timestamp returned by the LookupNewWatermarkActivity as the value for the LastModified parameter, and use the table name returned by the LookupOldWatermarkActivity as the value for the TableName parameter. The table below shows how to pass these values as parameters to the stored procedure.

Name         | Type   | Value
LastModified | Int64  | @{activity('LookupNewWatermarkActivity').output.firstRow.NewWatermarkvalue}
TableName    | String | @{activity('LookupOldWatermarkActivity').output.firstRow.TableName}

Run the incremental copy pipeline

After you have published your pipeline to Azure Data Factory using the Publish all button, you can hit the Debug button to run your pipeline and check whether everything works as expected. If all activities succeed, you will get the same output as shown in the image below. This means your Cosmos DB data has been successfully copied to SQL Server.

Run Incremental Copy Pipeline
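
If you also want to verify the result on the SQL Server side, a quick spot check against the tables from this article could look like this:

-- Spot check after a pipeline run: copied rows, newest copied timestamp and the
-- watermark value recorded by the stored procedure
SELECT COUNT(*) AS CopiedRows, MAX([Timestamp]) AS MaxCopiedTimestamp FROM Books
SELECT WatermarkValue FROM Watermark WHERE TableName = 'books'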

The next thing you want to do is create a scheduled trigger that runs the pipeline at a regular interval, as shown in the image below. This way you keep your SQL Server database in sync with your Cosmos DB.

Scheduled Trigger
