Batch operations on cosmosdb using python SDK

229 views Asked by At

While performing the batch operations on Azure cosmosdb using python SDK i am getting the below error. can someone please help to guide/advise me where i am doing wrong and what is required to fix the issue..

Note :- I am getting this error when i perform the replace operation rest everything i.e. create/delete/read/upsert operations are able to execute..

Code:-

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos.http_constants import StatusCodes
from azure.cosmos.partition_key import PartitionKey
import datetime

import config

HOST = config.settings['host']
MASTER_KEY = config.settings['master_key']
DATABASE_ID = config.settings['database_id']
CONTAINER_ID = config.settings['container_id']
CONTAINER_MH_ID = config.settings['container_mh_id']

def execute_item_batch(database):
    print('\n1.10 Executing Batch Item operations\n')
    container = database.create_container_if_not_exists(id="batch_container",
                                                        partition_key=PartitionKey(path='/account_number'))
    # We create three items to use for the sample.
    container.create_item(get_sales_order("read_item"))
    container.create_item(get_sales_order("delete_item"))
    container.create_item(get_sales_order("replace_item"))


    #We create our batch operations
    create_item_operation = ("create", (get_sales_order("create_item"),))
    upsert_item_operation = ("upsert", (get_sales_order("upsert_item"),))
    read_item_operation = ("read", ("read_item",))
    delete_item_operation = ("delete", ("delete_item",))
    replace_item_operation = ("replace", ("replace_item", {"id": "replace_item", "message": "item was replaced"}))
    # replace_item_if_match_operation = ("replace",
    #                                    ("replace_item", {"id": "replace_item", "message": "item was replaced"}),
    #                                    {"if_match_etag": container.client_connection.last_response_headers.get("etag")})
    # replace_item_if_none_match_operation = ("replace",
    #                                        ("replace_item", {"id": "replace_item", "message": "item was replaced"}),
    #                                        {"if_none_match_etag":
    #                                             container.client_connection.last_response_headers.get("etag")})

    # Put our operations into a list
    batch_operations = [
        create_item_operation,
        upsert_item_operation,
        read_item_operation,
        delete_item_operation,
        replace_item_operation,
        # replace_item_if_match_operation,
        # replace_item_if_none_match_operation
        ]

    # # Run that list of operations
    batch_results = container.execute_item_batch(batch_operations=batch_operations, partition_key='Account1')
    # # Batch results are returned as a list of item operation results - or raise a CosmosBatchOperationError if
    # # one of the operations failed within your batch request.
    print("\nResults for the batch operations: {}\n".format(batch_results))

def run_sample():
    client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY})
    try:
        # setup database for this sample
        db = client.create_database_if_not_exists(id=DATABASE_ID)
        # setup container for this sample
        execute_item_batch(db)

    finally:
        print("\nrun_sample done")

if __name__ == '__main__':
    run_sample()

Please find the below image for the items in the database..

enter image description here

Error details:-

Traceback (most recent call last):
  File "C:xxxxxxxxxxxxxxxxxxxxcosmosdb\document_management.py", line 552, in <module>
    run_sample()
  File "C:xxxxxxxxxxxxxxxxxxxxcosmosdb\document_management.py", line 518, in run_sample
    execute_item_batch(db)
  File "C:xxxxxxxxxxxxxxxxxxxxcosmosdb\document_management.py", line 225, in execute_item_batch
    batch_results = container.execute_item_batch(batch_operations=batch_operations, partition_key='Account1')
  File "C:xxxxxxxxxxxxxxxxxxxxcosmosdb\lib\site-packages\azure\core\tracing\decorator.py", line 78, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "C:xxxxxxxxxxxxxxxxxxxxcosmosdb\lib\site-packages\azure\cosmos\container.py", line 709, in execute_item_batch
    result = self.client_connection.Batch(
  File "C:xxxxxxxxxxxxxxxxxxxxcosmosdb\lib\site-packages\azure\cosmos\_cosmos_client_connection.py", line 1784, in Batch
    raise exceptions.CosmosBatchOperationError(error_index=error_index,
azure.cosmos.exceptions.CosmosBatchOperationError: Status code: 400 Sub-status: 1001
There was an error in the transactional batch on index 0. Error message: BAD_REQUEST - Request being sent is invalid.
2

There are 2 answers

4
Balaji On

Sample data stored in my Azure Cosmos DB Container:

{
    "id": "order1",
    "product": "Laptop",
    "quantity": 2,
    "status": "Processing",
    "account_number": "Account1"
}

In the above data the status value is changed from Processing to completed by using replace_item method.

The item which is to be replaced in the container is stored in a items_to_replace list. It contains dictionaries where each dictionary represents an item to be replaced in the Cosmos DB collection.

partition key is specified in the read_item operation which is used while performing replace_item method. In your code there is no partition key specified in replace_item_operation .

Below is the code I tried by using replace batch operation:

import azure.cosmos.cosmos_client as cosmos_client
from azure.cosmos import exceptions

HOST = '*****'
MASTER_KEY = '*****'
DATABASE_ID = 'newDb'
CONTAINER_ID = 'newCont'  

def execute_replace_batch(container):
    print('\n1.10 Executing Batch Replace operation\n')

    items_to_replace = [
        {
            "id": "order1",
            "product": "Laptop",
            "quantity": 2,
            "status": "Completed",
            "account_number": "Account1"
        }
    ]

    for item in items_to_replace:
        try:
            existing_item = container.read_item(item['id'], partition_key=item['id'])
            replaced_item = container.replace_item(item=existing_item, body=item)
            print(f"Replaced item with id: {replaced_item['id']}")
        except exceptions.CosmosResourceNotFoundError:
            print(f"Item with id '{item['id']}' not found in the container.")

def run_sample():
    client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY})
    try:
        db = client.create_database_if_not_exists(id=DATABASE_ID)
        container = db.get_container_client(CONTAINER_ID)
        execute_replace_batch(container)

    finally:
        print("\nrun_sample done")

if __name__ == '__main__':
    run_sample()

Output:

1.10 Executing Batch Replace operation

Replaced item with id: order1

run_sample done

Item in the Azure Cosmos DB container after execution of the code:

{
    "id": "order1",
    "product": "Laptop",
    "quantity": 2,
    "status": "Completed",
    "account_number": "Account1"
}
2
Matias Quaranta On

HTTP 400 (BadRequest) with StatusCode 1001 means "Partition Key Mismatch".

See: BadRequest (400) with Substatus 1001 when adding new item in Azure Cosmos DB using Microsoft.Azure.CosmosRepository library

The container you are showing has /account_number as Partition Key Definition and the Partition Key Value on the screenshot is "Account1".

The items in your batch have no account_number property, they are missing the Partition Key Value in their bodies (you are specifying id and message only).