How To Perform Azure Cross Container Transaction Operation In Cosmos DB

Introduction

This information was gathered while Implementing ingesting data to Cosmos DB.

The requirement was to insert approximately one thousand documents in cosmos DB containers in minimum time with atomicity. We have to ingest data into two collections using Transactions

Limitations

By default cosmos DB operations do not support transactions for this first we used TransactionBatch class.

Currently, there are two known limits:

  • The Azure Cosmos DB request size limit constrains the size of the TransactionalBatch payload to not exceed 2 MB, and the maximum execution time is 5 seconds.
  • There is a current limit of 100 operations per TransactionalBatch to ensure the performance is as expected and within SLAs.

Next was procedures.

As described in MS docs "The JavaScript-based stored procedures, triggers, UDFs, and merge procedures are wrapped within an ambient ACID transaction with snapshot isolation across all items within the logical partition. During its execution, if the JavaScript program throws an exception, the entire transaction is aborted and rolled back. The resulting programming model is simple yet powerful. JavaScript developers get a durable programming model while still using their familiar language constructs and library primitives."

Procedures have a limit of a maximum of 5 seconds run time if you need to run a procedure for more than that then you need to use continuation.

So this seemed a better fit for me as the max row would reach only 1000 and in the context of cosmosDb, 5 seconds for 1000 rows is more than enough.

Example

Write Stored Procedure for upsert; i.e., insert/update

Below is the upsert procedure which first checks the item in the container and if it is present in the container then updates the records otherwise inserts the item in the container.

function createToDoItems(items) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();
    var count = 0;

    if (!items) throw new Error("The array is undefined or null.");

    var numItems = items.length;

    if (numItems == 0) {
        getContext().getResponse().setBody(0);
        return;
    }

    tryCreate(items[count], callback);

    function tryCreate(item, callback, continuation) {
        var options = { disableAutomaticIdGeneration: false };

		var requestOptions = {continuation: continuation};
		var query = "SELECT * FROM c WHERE c.id = '"+item.id+"'";
		 
		var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function (err, documents, responseOptions) {
            if (err) throw err;

            if (documents.length > 0) {
                tryUpdate(documents[0], item);
            } else {
                var isCreated = collection.createDocument(collectionLink, item, options, callback);
				if (!isAccepted) getContext().getResponse().setBody(count);
            }
        });
        if (!isAccepted) getContext().getResponse().setBody(count);
    }

    function callback(err, item, options) {
        if (err) throw err;
        count++;
        if (count >= numItems) {
            getContext().getResponse().setBody(count);
        } else {
            tryCreate(items[count], callback);
        }
    }
	
	function tryUpdate(document, update) {
		var requestOptions = {etag: document._etag};

		var isAccepted = collection.replaceDocument(document._self, update, requestOptions, function (err, updatedDocument, responseOptions) {
			if (err) throw err;
			count++;
			if (count >= numItems) {
				getContext().getResponse().setBody(count);
			}
			else{	
				tryCreate(items[count], callback);
			}
			if (!isAccepted) getContext().getResponse().setBody(count);
		});
	}
}

Write C# code to call the cosmos procedure.

try
{ 
    var result = await this.container.Container.Scripts.ExecuteStoredProcedureStreamAsync("<SProcName>", new PartitionKey("<PartitionKeyValue>"), new[] { Data.ToArray() }); 
    result.EnsureSuccessStatusCode();    
    resultCode = result.StatusCode;
}
catch (CosmosException ex)
{
}

By using this approach we completed the operation in 27 milliseconds average time after 25 requests which is quite impressive.

Now if you are performing operations on 2 containers with different partition keys then sproc will help you a lot. For ACID I came up with few rules of my own,

  1. The approach I went ahead with was to first upsert the child data
  2. Fetch all the existing child container records in a list. {We will use the later in rollback}.
  3. If child upsert faces some issue then the sproc will throw an exception and rollback the transaction and I will not proceed with parent data
  4. If the child is successful then I'll proceed with parent container data
  5. If Parent container upsert in successfully {Yay!}
  6. If parent container upsert faces an issue then sproc will throw an exception and roll back this transaction but the child data is already inserted and we have to revert that manually. {Now we will use the data which we got in step 2.}
  7. Delete the entire child container item and insert the data which we fetched in step 2.
public async Task RollBack(IEnumerable<Data> existingData, IEnumerable<Data> newData)
{
    // Check if exisitingData is present
    // if true  -> Delete new created documents and create old documents
    // if false -> Delete the new created documents from collection
    if (existingData?.Count() > 0)
    {
        await this.DeleteAsync(existingData.ToList());
        await this.UpsertAsync(newData.ToList());
    }
    else
    {
        await this.DeleteAsync(newData.ToList());
    }
}

And that's it using these simple steps I was able to perform my transaction. The worst-case time duration for this logic was 200 milliseconds for 1000 items which is good for me. Well, you can always modify and those rules according to your needs and come with a new approach altogether.

Please reach out to me if you find a better way of doing this.

I hope you find this useful.

Thanks.


Similar Articles