Cosmos DB - Bulk Insert / Update en .Net

Pour ce premier billet, nous allons nous intéresser aux bases Cosmos DB.

Je ne vous ferai pas ici l'histoire du produit, vous en apprendrez bien plus en visitant le lien précédent. Par ailleurs, nous allons nous concentrer ici sur l'API "SQL", anciennement appelée "documentdb".

J'ai implémenté récemment ce type de base NoSQL afin de stocker un volume relativement important de données - environ 30 000 documents - par jour. Ayant fait un petit ETL maison, que je migrerai fort probablement vers ADF à terme, j'ai rapidement été confronté à la nécessité d'envoyer des ensembles de données par paquets afin d'optimiser les temps de traitement.

Il était jusqu'alors un peu contraignant d'implémenter des insertions et modifications en "bulk", tout du moins pour un développeur .Net comme moi, peu enclin à écrire du javascript...

Du Javascript ?!

Oui, vous avez bien lu ! Le moyen recommandé par l'équipe produit pour faire des opérations bulk consiste à écrire des procédures stockées en javascript qui seront executées directement côté serveur. La bonne nouvelle c'est qu'ici nous n'allons pas nous en soucier. Par contre, si vous souhaitez en savoir plus sur les procédures stockées sur CosmosDB, je vous recommande de suivre cette vidéo.

Bref, trève de bavardage, place au code !

Contexte

Nous allons construire ici une application console, très simple, permettant de générer un ensemble de données et de les envoyer dans une base CosmosDB en bulk.

Nos documents auront cette structure:

    public class FakeOrder
    {
        public string AccountNumber { get; set; }
        public string id { get; set; }
        public DateTime OrderDate { get; set; }
        public string Product { get; set; }
        public int DocumentIndex { get; set; }
       
    }

Nous utiliserons la propriété "AccountNumber" comme clé de partitionnement et la propriété id sera la clé unique de notre document. L'idée étant d'avoir un nombre identique de documents créés par partition.

Voici ensuite une méthode simple de génération de documents:

public static IEnumerable<FakeOrder> GenerateFakeOrders(
    int numberToGeneratePerPartition = 1000,
    int numberOfPartitions = 50,
    int initialAccountNumber = 1)
{
    List<FakeOrder> result = new List<FakeOrder>();

    for (int x = 0; x < numberOfPartitions; x++)
    {
        for (int i = 0; i < numberToGeneratePerPartition; i++)
        {
            result.Add(new FakeOrder
            {
                AccountNumber = (initialAccountNumber + x).ToString(),
                OrderDate = DateTime.UtcNow.AddDays(-i),
                DocumentIndex = i,
                Product = __aSyNcId_<_FhfGfkrg__quot;Produit XXX-{i}",
                id = __aSyNcId_<_FhfGfkrg__quot;{(initialAccountNumber + x).ToString()}-{x}-{i}"
            });
        }
    }

    return result;
}

Dépendances

Ajoutons ensuite une référence au package nuget Microsoft.Azure.CosmosDB.BulkExecutor.

Install-Package Microsoft.Azure.CosmosDB.BulkExecutor

Ce package permet d'effectuer imports et mises à jour des données en bulk. L'ajout dans votre projet de ce nuget ajoutera automatiquement une dépendance au package Microsoft.Azure.DocumentDB utilisé pour toutes les interactions avec les bases CosmosDB avec l'API DocumentDB / SQL.

Ce dernier package permet d'effectuer des opérations de gestion (création de bases, création de collections, etc.) ainsi que le requêtage. Concrètement, il apporte une couche de simplification au dessus de l'API REST de CosmosDB.

Initialisation

Nous allons utiliser le client "DocumentClient" afin d'initialiser notre base de données de tests ainsi que la collection qui va porter nos documents. Je passe volontairement l'étape de création de la ressource Cosmos DB, celle-ci est suffisament illustrée dans la documentation. Pensez bien, lors de cette étape, à récupérer l'URL de votre compte Cosmos DB (Url du type https://<account_name>.documents.azure.com:443) ainsi que la clé.

Commençons par créer notre base de données et notre collection de documents. Pour cela, nous initialisons notre client et précisons les informations de base de notre collection.

public static async Task InitializeDatabaseAndCollection(
    string databaseName,
    string collectionName,
    int throughput = 1000)
{
    Uri serviceEndpoint = new Uri(_documentdbUrl);

    _documentClient = new DocumentClient(serviceEndpoint, _documentdbKey);

    _documentCollection = new DocumentCollection();
    _documentCollection.Id = _collection;
    _documentCollection.PartitionKey.Paths.Add("/AccountNumber");
    _documentCollection.IndexingPolicy.Automatic = true;
    _documentCollection.IndexingPolicy.IndexingMode = IndexingMode.Consistent;

    await DropCollectionIfExistAsync(databaseName, collectionName);

    // Create database
    Database database = new Database() { Id = _database };

    await _documentClient.CreateDatabaseIfNotExistsAsync(database);

    // Create collection
    await _documentClient.CreateDocumentCollectionIfNotExistsAsync(
                UriFactory.CreateDatabaseUri(_database),
                _documentCollection,
                new RequestOptions { OfferThroughput = throughput });
}

Afin de repartir d'une base vide à chaque lancement de l'application de test, j'ai ajouté la méthode "DropCollectionIfExistsAsync" permettant de faire du nettoyage si besoin:

public static async Task DropCollectionIfExistAsync(
    string database,
    string collection)
{
    var existingCollection = await _documentClient.ReadDocumentCollectionAsync(
        UriFactory.CreateDocumentCollectionUri(database, collection));
    if(existingCollection != null)
    {
        await _documentClient.DeleteDocumentCollectionAsync(
            UriFactory.CreateDocumentCollectionUri(database, collection));
    }
}

Import

Une fois notre base et notre collection initialisées, nous allons utiliser le "BulkExecutor" pour envoyer nos objets de tests dans CosmosDB.

Voici une méthode dédiée à cette tâche:

public static async Task<BulkImportResponse> BulkInsertDocuments(
    string database,
    string collection,
    IEnumerable<object> documents,
    bool upsert = false)
{
    Uri collectionUri = UriFactory.CreateDocumentCollectionUri(database, collection);
    var collectionResource = await _documentClient.ReadDocumentCollectionAsync(collectionUri);
    var executor = new BulkExecutor(_documentClient, collectionResource);

    await executor.InitializeAsync();

    return await executor.BulkImportAsync(documents, enableUpsert: upsert);
}

Le fonctionnement de cette méthode est assez simple: on récupère tout d'abord notre collection, que nous utilisons ensuite pour initialiser l'objet de type "BulkExecutor", puis on exécute la méthode "BulkImportAsync" de ce dernier pour réaliser l'import.

Remarque: L'utilisation de la méthode "InitializeAsync" du BulkExecutor a pour but d'analyser la structure de la collection dans laquelle l'import sera réalisé. Ainsi, le BulkExecutor aura connaissance de la manière dont sont partitionnées nos données et pourra alors optimiser ses opérations notamment en les exécutant en parallèle sur les différentes partitions.

La méthode "BulkImportAsync" retourne un objet de type "BulkImportResponse" dont voici le détail:

namespace Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport
{
    public sealed class BulkImportResponse
    {
        public BulkImportResponse();

        public long NumberOfDocumentsImported { get; }
        public double TotalRequestUnitsConsumed { get; }
        public TimeSpan TotalTimeTaken { get; }
        public List<object> BadInputDocuments { get; }
    }
}

Un import de 50 000 documents répartis sur 50 partitions a pris 3 minutes et 21 secondes, pour une collection provisionnée avec 1000 RU/s.

Mise à jour

Afin de mettre à jour nos documents précédement importés, nous allons utiliser la méthode "BulkUpdateAsync" du BulkExecutor. Celle-ci prend en paramètre un ensemble d'objets de type "UpdateItem" qui décrivent chacun une ou plusieurs opérations de mise à jour à appliquer à un document.

Voici leur structure:

namespace Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate
{
    public sealed class UpdateItem
    {
        public UpdateItem(string id, string partitionKey, IEnumerable<UpdateOperation> updateOperations);

        [JsonProperty("id")]
        public string Id { get; }
        [JsonProperty("pk")]
        public string PartitionKey { get; }
        [JsonProperty("updates")]
        public IEnumerable<UpdateOperation> UpdateOperations { get; }
    }
}

Les opérations de mises à jour sont décrites par des objets de type "UpdateOperation" pouvant être de plusieurs types:

  • SetUpdateOperation - assigne une valeur à une propriété
  • UnsetOperation - suppression d'une propriété
  • IncUpdateOperation - non experimentée
  • RemoveUpdateOperation - non experimentée
  • PushUpdateOperation - non experimentée

Comme indiqué, je n'ai testé que les deux premiers types d'opérations. Il est fort probable que je mette à jour ce billet lorsque j'aurais trouvé une utilité aux trois autres.

Pour notre exemple, nous allons appliquer trois opérations de modification sur nos documents:

  • un ajout d'une propriété simple (une chaîne de caractères)
  • un ajout d'une propriété complexe (un objet)
  • une suppression de la propriété "DocumentIndex"
List<UpdateItem> updateList = initialDocuments.Select(d =>
    new UpdateItem(
        d.id,
        d.AccountNumber,
        new List<UpdateOperation> {
            new SetUpdateOperation<string>(
                "NewSimpleProperty",
                "New Property Value"),
            new SetUpdateOperation<dynamic>(
                "NewComplexProperty",
                new {
                    prop1 = "Hello",
                    prop2 = "World!"
                }),
            new UnsetUpdateOperation(nameof(FakeOrder.DocumentIndex)),
        })).ToList();

Et enfin, la méthode qui va envoyer à la base les instructions de mise à jour:

public static async Task<BulkUpdateResponse> BulkUpdatetDocuments(
    string database,
    string collection,
    IEnumerable<UpdateItem> updates)
{
    Uri collectionUri = UriFactory.CreateDocumentCollectionUri(database, collection);
    var collectionResource = await _documentClient.ReadDocumentCollectionAsync(collectionUri);
    var executor = new BulkExecutor(_documentClient, collectionResource);

    await executor.InitializeAsync();

    return await executor.BulkUpdateAsync(updates);
}

Une mise à jour des mêmes 50 000 documents sur 50 partitions, a pris 12 minutes et 46 secondes, toujours avec 100 RU/s de provisionnées sur la collection.

Pour tester tout ça chez vous, je vous ai conconcté une application console toute simple reprenant le code expliqué dans ce billet. Vous la trouverez ici.