We want to keep a counter in a Cosmos DB NoSQL container, in a single logical partition (single-region write account):

{
    "version": <int>
}

If we use the patch increment operation to increment the version value, will it be ACID-compliant? That is, if two application servers fire the increment operation in parallel, can they get a conflict error, or will they be fully isolated and execute sequentially? (We are not intending to use ETags with the patch operations.)

Suppose the existing value in the container is version = 4, and the Cosmos patch API is fired in parallel:

Server 1: patch(increment by 1)
Server 2: patch(increment by 1)
Server 3: patch(increment by 1)
...
Server n: patch(increment by 1)

Will the value be 4 + n without any errors or conflict exceptions? Or might it be 4 + x with x <= n? And are there any chances of errors such as conflict errors, assuming RU/s are sufficient at the container level?


asked Mar 26 at 10:10 by Satyam Garg; edited Mar 26 at 22:09 by David Makogon

2 Answers


Just like with Replace or Upsert, if you want to handle concurrent operations on the same document, you need to use Optimistic Concurrency Control (OCC).

In that scenario, if 2 operations on the same document (id + logical partition key) are attempted concurrently, only one will succeed and the other will return HTTP 412 (Precondition Failed), which you then handle as needed.

Normally the flow for OCC is:

  1. Read the item and capture its ETag.

  2. Apply the Patch/Replace, passing that ETag.

  3. Handle the 412 if it occurs; if the value still needs to be incremented, repeat steps 1 and 2.
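A minimal sketch of that loop with the .NET SDK might look like the following (the container instance, the "doc1" id, and the /version path are placeholder assumptions; IfMatchEtag is what makes the server enforce the 412 check):

```csharp
// Sketch only: assumes an existing Container and a document with id "doc1",
// partitioned on /id. Loops until the conditional patch wins the race.
while (true)
{
    // 1. Read the item and capture its ETag.
    ItemResponse<dynamic> readResponse = await container.ReadItemAsync<dynamic>(
        "doc1", new PartitionKey("doc1"));

    try
    {
        // 2. Patch, passing the ETag so stale writes are rejected with 412.
        await container.PatchItemAsync<dynamic>(
            "doc1",
            new PartitionKey("doc1"),
            new[] { PatchOperation.Increment("/version", 1) },
            new PatchItemRequestOptions { IfMatchEtag = readResponse.ETag });
        break; // success
    }
    catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.PreconditionFailed)
    {
        // 3. Another writer won the race; loop back to step 1 and retry.
    }
}
```

Note that for a pure increment the OCC round trip is usually unnecessary, since the patch increment itself is applied server-side; OCC matters when your write depends on other fields you read.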

From some parallel testing on a single write region account: if you execute n successful increment patch requests, the final result is that the document is incremented exactly n times.

Large document test

I set up a test against a single-partition collection containing a single very large document (1,000,162 bytes, for which a point read costs 16-17 RU). Wildcard indexing was disabled for the container.

Four tasks ran in parallel, each executing patch operations until it achieved 1,000 successful requests; at the end, pathToIncrement was 4000 as expected, with no lost updates.

[Total requests metric graph during the test omitted]

The aggregated results were

status code            200           429          449
total requests         4000          6224         1443
total request units    5064438.73    2371.05      225813.29
avg request units      1266.1        0.4          156.5

So quite a few requests were rejected for rate limiting (429), but a significant proportion were also rejected with status code 449 (Retry With, indicating the operation should be retried); from the comments here, this is an expected possible result of concurrent updates to the same item.

As for how this is implemented internally, I haven't seen any documentation. The RU costs per operation are significant, so I assume the server reads the document, attempts the write, and then abandons it when it finds a concurrency violation.

The successful writes cost around 8x as much as the 449-status ones, which may correspond to successful writes also having to delete the previous document version and be applied across 4 replicas.

Small document test

Creating the document without the bigString property makes it < 150 bytes, so it is less RU-constrained and can support more patch operations per minute.

After repeating the test with the above configuration but with PerformPatchOperations(10_000), the final value in the document was 40000 as expected.

Annoyingly, the TotalRequests metrics seemed to lose some of these requests, but the graph (omitted here) showed a steady stream of successful and unsuccessful attempts and no throttling on this occasion. The average costs per operation, based on the (incomplete) metrics data for this attempt, were:

status code            200           449
total requests         38925         10528
total request units    405221.02     15541.33
avg request units      10.4          1.5

Quick and dirty code used for this test was

using System.Net;
using Microsoft.Azure.Cosmos;

var connString =
    "...";
var databaseName = "db"; //this database will be dropped and recreated so don't use an existing one!
var collectionName = "test_collection";


await CreateCollectionAndInsertDocument();

var patchers = new[]
{
    new CosmosPatchTester("patchTester1", connString, databaseName, collectionName),
    new CosmosPatchTester("patchTester2", connString, databaseName, collectionName),
    new CosmosPatchTester("patchTester3", connString, databaseName, collectionName),
    new CosmosPatchTester("patchTester4", connString, databaseName, collectionName)
};


var patchTasks = patchers.Select(p => p.PerformPatchOperations(1_000)).ToArray();

Console.WriteLine("patchTasks array populated");


Task.WaitAll(patchTasks);

Console.WriteLine("Finished");

Console.ReadLine();


async Task CreateCollectionAndInsertDocument()
{
    var client = new CosmosClient(connString,
        new CosmosClientOptions {EnableContentResponseOnWrite = false, ConnectionMode = ConnectionMode.Direct});

    var db = client.GetDatabase(databaseName);

    try
    {
        await db.DeleteAsync();
    }
    catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
    {
    }

    db = await client.CreateDatabaseAsync(databaseName);

    var containerProperties = new ContainerProperties(collectionName, "/id");

    containerProperties.IndexingPolicy.ExcludedPaths.Add(new ExcludedPath
    {
        Path = "/*"
    });

    var coll = await db.CreateContainerAsync(containerProperties,
        ThroughputProperties.CreateAutoscaleThroughput(10_000));

    var container = coll.Container;

    await container.CreateItemAsync(new {id = "doc1", pathToIncrement = 0, bigString = new string('*', 1_000_000)});

    Console.WriteLine("Database [re]created and single document inserted");
}

public class CosmosPatchTester
{
    private readonly string _id;
    private readonly Container _container;

    public CosmosPatchTester(string id, string connString, string databaseName, string collectionName)
    {
        _id = id;
        var client = new CosmosClient(connString,
            new CosmosClientOptions {EnableContentResponseOnWrite = false, ConnectionMode = ConnectionMode.Direct});

        var db = client.GetDatabase(databaseName);
        _container = db.GetContainer(collectionName);
    }

    public async Task PerformPatchOperations(int count)
    {
        List<PatchOperation> operations = [PatchOperation.Increment("/pathToIncrement", 1)];

        var successfulCalls = 0;

        while (successfulCalls < count)
            try
            {
                await _container.PatchItemAsync<object>(
                    "doc1",
                    new PartitionKey("doc1"),
                    operations
                );

                successfulCalls++;
            }
            catch (CosmosException ex)
            {
                // Both 429 (rate limited) and 449 (retry with) land here;
                // the while loop simply tries again until count successes.
                Console.WriteLine($"Tester {_id} encountered error status {ex.StatusCode}");
            }
    }
}
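In real code you would back off on 429s rather than retry in a tight loop; the SDK surfaces the server-suggested delay on the exception. A hedged sketch (the container and document id mirror the test above and are assumptions):

```csharp
// Sketch: retry an increment patch, honoring the server-suggested delay on 429.
// 449 (Retry With) is a transient write conflict and is safe to retry immediately.
async Task IncrementWithRetryAsync(Container container)
{
    while (true)
    {
        try
        {
            await container.PatchItemAsync<object>(
                "doc1",
                new PartitionKey("doc1"),
                new[] { PatchOperation.Increment("/pathToIncrement", 1) });
            return;
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
        {
            // RetryAfter carries the server's suggested wait before the next attempt.
            await Task.Delay(ex.RetryAfter ?? TimeSpan.FromMilliseconds(100));
        }
        catch (CosmosException ex) when ((int)ex.StatusCode == 449)
        {
            // Concurrency conflict on the write path; retry immediately.
        }
    }
}
```

Note that the SDK already retries 429s internally by default (CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests), so the 429 branch only fires once those built-in retries are exhausted.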
