admin管理员组

文章数量:1126991

I am trying to upload a JSON file to Azure Cosmos DB - Gremlin API. I created a partition key (PK) as /LOCATIONSTATE in Cosmos DB which is one of the tags in JSON. Below is the python script I am using to load the data. I have rechecked twice and there are no null values in the JSON data for PK. How can I fix this error?

I have tried changing the PK value to some other JSON tag like /LOCATIONZIP or /LOCATIONLOOKUPID but the error remains the same. Is Cosmos not able to read the data because of ""? Please help!

import json
import requests
from gremlin_python.driver import client, serializer
import uuid  # To generate unique IDs if 'id' is missing
from azure.storage.filedatalake import DataLakeServiceClient

# Replace with your own details
endpoint = ""  # passed as the first argument to client.Client below
database = ""  # interpolated into the Gremlin username: /dbs/<database>/colls/<graph>
graph = ""  # interpolated into the Gremlin username together with `database`
primary_key = ""  # used as the Gremlin client password

# ADLS connection details
adls_account_url = ""  # account_url given to DataLakeServiceClient
adls_file_system = "json"  # file system name handed to get_file_system_client
adls_file_path = ""  # file name passed to process_json_file at the bottom of the script

# Azure Data Lake Storage Client
def get_adls_client():
    """Return a DataLakeServiceClient for the account at ``adls_account_url``.

    The credential is the string literal passed below.
    """
    return DataLakeServiceClient(account_url=adls_account_url, credential="")

def read_json_file_from_adls(file_name):
    """Download ``file_name`` from the configured ADLS file system and parse it as JSON.

    The downloaded bytes are decoded as UTF-8 and the parsed object is returned.
    """
    fs_client = get_adls_client().get_file_system_client(file_system=adls_file_system)
    downloader = fs_client.get_file_client(file_name).download_file()
    raw_bytes = downloader.readall()
    return json.loads(raw_bytes.decode('utf-8'))

# Initialize the Gremlin client
gremlin_client = client.Client(
    endpoint,
    'g',  # traversal source name; every query below starts from `g`
    # Username is built from the database and graph names configured above.
    username=f"/dbs/{database}/colls/{graph}",
    password=primary_key,
    message_serializer=serializer.GraphSONSerializersV2d0()
)

# Function to process a single JSON file and create the graph
def process_json_file(file_name):
    """Load one JSON file from ADLS and write its records to the graph.

    For every record under the top-level "XYZ" key this submits, in order:
    a ``location_state`` vertex whose id is the record's LOCATIONSTATE
    value, then a name vertex, a city vertex and a second state vertex
    (each with a fresh UUID id), each followed by an edge from the first
    vertex to the new one. Failures are printed, not raised; a failure on
    the first vertex skips the rest of that record.

    NOTE(review): the question states that the container's partition key
    path is /LOCATIONSTATE, yet every vertex here is given a property named
    'partitionKey' and none named 'LOCATIONSTATE'. That is presumably why
    the server reports the partition key value as null -- confirm against
    the container settings.
    """
    # Read the JSON file from ADLS
    json_data = read_json_file_from_adls(file_name)

    print(f"Processing file: {file_name}")

    # Iterate over each record in the JSON array
    # NOTE(review): the sample file posted with this script uses the top-level
    # key "XYZ Grocery"; with that file this lookup raises KeyError.
    for record in json_data["XYZ"]:
        # Use LOCATIONSTATE as the partition key
        location_state = record.get("LOCATIONSTATE", "").strip()
        if not location_state:
            location_state = str(uuid.uuid4())  # Fallback to a generated unique ID
            record["LOCATIONSTATE"] = location_state  # Optionally update the record with the new LOCATIONSTATE

        # Escape single quotes so the values can sit inside single-quoted Gremlin literals.
        location_name = record.get("LOCATIONNAME", "").replace("'", "\\'")
        location_city = record.get("LOCATIONCITY", "").replace("'", "\\'")
        # Re-read from the record: the stripped value computed above is discarded here,
        # only the UUID fallback (written back into the record) carries over.
        location_state = record.get("LOCATIONSTATE", "").replace("'", "\\'")

        # Create LOCATIONSTATE vertex
        # The state value is used as the vertex id, as 'zip' and as 'partitionKey'.
        # NOTE(review): two records with the same state produce the same id --
        # presumably the second addV is rejected and the record skipped; verify.
        try:
            create_location_state_query = (
                f"g.addV('location_state').property('id', '{location_state}')"
                f".property('zip', '{location_state}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_state_query).all().result()
            print(f"Created location_state vertex: {location_state}")
        except Exception as e:
            print(f"Error creating location_state vertex {location_state}: {e}")
            # Without the anchor vertex the edges below have no source, so skip this record.
            continue

        # Create LOCATIONNAME vertex and add edge
        location_name_id = str(uuid.uuid4())
        try:
            create_location_name_query = (
                f"g.addV('location_name').property('id', '{location_name_id}')"
                f".property('name', '{location_name}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_name_query).all().result()

            # Edge runs from the state-id vertex to the vertex just created.
            edge_location_name_query = f"g.V('{location_state}').addE('name').to(g.V('{location_name_id}'))"
            gremlin_client.submit(edge_location_name_query).all().result()
            print(f"Created edge: {location_state} -> {location_name}")
        except Exception as e:
            # Reported only; processing continues with the city vertex.
            print(f"Error creating location_name or edge for {location_state}: {e}")

        # Create LOCATIONCITY vertex and add edge
        location_city_id = str(uuid.uuid4())
        try:
            create_location_city_query = (
                f"g.addV('location_city').property('id', '{location_city_id}')"
                f".property('city', '{location_city}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_city_query).all().result()

            edge_location_city_query = f"g.V('{location_state}').addE('city').to(g.V('{location_city_id}'))"
            gremlin_client.submit(edge_location_city_query).all().result()
            print(f"Created edge: {location_state} -> {location_city}")
        except Exception as e:
            # Reported only; processing continues with the second state vertex.
            print(f"Error creating location_city or edge for {location_state}: {e}")

        # Create LOCATIONSTATE vertex and add edge
        # This is a second 'location_state' vertex, with a UUID id, linked from the first one.
        location_state_id = str(uuid.uuid4())
        try:
            create_location_state_query = (
                f"g.addV('location_state').property('id', '{location_state_id}')"
                f".property('state', '{location_state}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_state_query).all().result()

            edge_location_state_query = f"g.V('{location_state}').addE('state').to(g.V('{location_state_id}'))"
            gremlin_client.submit(edge_location_state_query).all().result()
            # Both placeholders print the state value, not the new vertex id.
            print(f"Created edge: {location_state} -> {location_state}")
        except Exception as e:
            print(f"Error creating location_state or edge for {location_state}: {e}")

# Process the JSON file
try:
    process_json_file(adls_file_path)
except Exception as e:
    # Anything escaping process_json_file is printed rather than re-raised,
    # so the close() below is still reached.
    print(f"Error processing the file: {e}")

# Close the Gremlin client
gremlin_client.close()

Here is the sample JSON file:

{
"XYZ Grocery": [
    {
        "Section" : "XYZ",
        "LOCATIONLOOKUPID" : "123",
        "LOCATIONNAME" : "abc",
        "LOCATIONSTREETADDRESS1" : "def",
        "LOCATIONSTREETADDRESS2" : null,
        "LOCATIONSTREETADDRESS3" : null,
        "LOCATIONCITY" : "hunter",
        "LOCATIONSTATE" : "IL",
        "LOCATIONZIP" : "R34",
        "LOCATIONCOUNTRY" : "USA",
        "LOCATIONURL" : "",
        "LOCATIONTYPE" : "",
        "CONTACTTYPE" : "",
        "CONTACTISPRIMARY" : "0",
        "CONTACTJOBTITLE" : "",
        "CONTACTTITLE" : "",
        "CONTACTNICKNAME" : null
},
    {
            "Section" : "XYZ",
            "LOCATIONLOOKUPID" : "456",
            "LOCATIONNAME" : "mno",
            "LOCATIONSTREETADDRESS1" : "stu",
            "LOCATIONSTREETADDRESS2" : null,
            "LOCATIONSTREETADDRESS3" : null,
            "LOCATIONCITY" : "hunter",
            "LOCATIONSTATE" : "IL",
            "LOCATIONZIP" : "60511",
            "LOCATIONCOUNTRY" : "USA",
            "LOCATIONURL" : "",
            "LOCATIONTYPE" : "",
            "CONTACTTYPE" : "",
            "CONTACTISPRIMARY" : "0",
            "CONTACTJOBTITLE" : "",
            "CONTACTTITLE" : "",
            "CONTACTNICKNAME" : null
}
]
}

I am trying to upload a JSON file to Azure Cosmos DB - Gremlin API. I created a partition key (PK) as /LOCATIONSTATE in Cosmos DB which is one of the tags in JSON. Below is the python script I am using to load the data. I have rechecked twice and there are no null values in the JSON data for PK. How can I fix this error?

I have tried changing the PK value to some other JSON tag like /LOCATIONZIP or /LOCATIONLOOKUPID but the error remains the same. Is Cosmos not able to read the data because of ""? Please help!

import json
import requests
from gremlin_python.driver import client, serializer
import uuid  # To generate unique IDs if 'id' is missing
from azure.storage.filedatalake import DataLakeServiceClient

# Replace with your own details
endpoint = ""  # passed as the first argument to client.Client below
database = ""  # interpolated into the Gremlin username: /dbs/<database>/colls/<graph>
graph = ""  # interpolated into the Gremlin username together with `database`
primary_key = ""  # used as the Gremlin client password

# ADLS connection details
adls_account_url = ""  # account_url given to DataLakeServiceClient
adls_file_system = "json"  # file system name handed to get_file_system_client
adls_file_path = ""  # file name passed to process_json_file at the bottom of the script

# Azure Data Lake Storage Client
def get_adls_client():
    """Return a DataLakeServiceClient for the account at ``adls_account_url``.

    The credential is the string literal passed below.
    """
    return DataLakeServiceClient(account_url=adls_account_url, credential="")

def read_json_file_from_adls(file_name):
    """Download ``file_name`` from the configured ADLS file system and parse it as JSON.

    The downloaded bytes are decoded as UTF-8 and the parsed object is returned.
    """
    fs_client = get_adls_client().get_file_system_client(file_system=adls_file_system)
    downloader = fs_client.get_file_client(file_name).download_file()
    raw_bytes = downloader.readall()
    return json.loads(raw_bytes.decode('utf-8'))

# Initialize the Gremlin client
gremlin_client = client.Client(
    endpoint,
    'g',  # traversal source name; every query below starts from `g`
    # Username is built from the database and graph names configured above.
    username=f"/dbs/{database}/colls/{graph}",
    password=primary_key,
    message_serializer=serializer.GraphSONSerializersV2d0()
)

# Function to process a single JSON file and create the graph
def process_json_file(file_name):
    """Load one JSON file from ADLS and write its records to the graph.

    For every record under the top-level "XYZ" key this submits, in order:
    a ``location_state`` vertex whose id is the record's LOCATIONSTATE
    value, then a name vertex, a city vertex and a second state vertex
    (each with a fresh UUID id), each followed by an edge from the first
    vertex to the new one. Failures are printed, not raised; a failure on
    the first vertex skips the rest of that record.

    NOTE(review): the question states that the container's partition key
    path is /LOCATIONSTATE, yet every vertex here is given a property named
    'partitionKey' and none named 'LOCATIONSTATE'. That is presumably why
    the server reports the partition key value as null -- confirm against
    the container settings.
    """
    # Read the JSON file from ADLS
    json_data = read_json_file_from_adls(file_name)

    print(f"Processing file: {file_name}")

    # Iterate over each record in the JSON array
    # NOTE(review): the sample file posted with this script uses the top-level
    # key "XYZ Grocery"; with that file this lookup raises KeyError.
    for record in json_data["XYZ"]:
        # Use LOCATIONSTATE as the partition key
        location_state = record.get("LOCATIONSTATE", "").strip()
        if not location_state:
            location_state = str(uuid.uuid4())  # Fallback to a generated unique ID
            record["LOCATIONSTATE"] = location_state  # Optionally update the record with the new LOCATIONSTATE

        # Escape single quotes so the values can sit inside single-quoted Gremlin literals.
        location_name = record.get("LOCATIONNAME", "").replace("'", "\\'")
        location_city = record.get("LOCATIONCITY", "").replace("'", "\\'")
        # Re-read from the record: the stripped value computed above is discarded here,
        # only the UUID fallback (written back into the record) carries over.
        location_state = record.get("LOCATIONSTATE", "").replace("'", "\\'")

        # Create LOCATIONSTATE vertex
        # The state value is used as the vertex id, as 'zip' and as 'partitionKey'.
        # NOTE(review): two records with the same state produce the same id --
        # presumably the second addV is rejected and the record skipped; verify.
        try:
            create_location_state_query = (
                f"g.addV('location_state').property('id', '{location_state}')"
                f".property('zip', '{location_state}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_state_query).all().result()
            print(f"Created location_state vertex: {location_state}")
        except Exception as e:
            print(f"Error creating location_state vertex {location_state}: {e}")
            # Without the anchor vertex the edges below have no source, so skip this record.
            continue

        # Create LOCATIONNAME vertex and add edge
        location_name_id = str(uuid.uuid4())
        try:
            create_location_name_query = (
                f"g.addV('location_name').property('id', '{location_name_id}')"
                f".property('name', '{location_name}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_name_query).all().result()

            # Edge runs from the state-id vertex to the vertex just created.
            edge_location_name_query = f"g.V('{location_state}').addE('name').to(g.V('{location_name_id}'))"
            gremlin_client.submit(edge_location_name_query).all().result()
            print(f"Created edge: {location_state} -> {location_name}")
        except Exception as e:
            # Reported only; processing continues with the city vertex.
            print(f"Error creating location_name or edge for {location_state}: {e}")

        # Create LOCATIONCITY vertex and add edge
        location_city_id = str(uuid.uuid4())
        try:
            create_location_city_query = (
                f"g.addV('location_city').property('id', '{location_city_id}')"
                f".property('city', '{location_city}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_city_query).all().result()

            edge_location_city_query = f"g.V('{location_state}').addE('city').to(g.V('{location_city_id}'))"
            gremlin_client.submit(edge_location_city_query).all().result()
            print(f"Created edge: {location_state} -> {location_city}")
        except Exception as e:
            # Reported only; processing continues with the second state vertex.
            print(f"Error creating location_city or edge for {location_state}: {e}")

        # Create LOCATIONSTATE vertex and add edge
        # This is a second 'location_state' vertex, with a UUID id, linked from the first one.
        location_state_id = str(uuid.uuid4())
        try:
            create_location_state_query = (
                f"g.addV('location_state').property('id', '{location_state_id}')"
                f".property('state', '{location_state}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_state_query).all().result()

            edge_location_state_query = f"g.V('{location_state}').addE('state').to(g.V('{location_state_id}'))"
            gremlin_client.submit(edge_location_state_query).all().result()
            # Both placeholders print the state value, not the new vertex id.
            print(f"Created edge: {location_state} -> {location_state}")
        except Exception as e:
            print(f"Error creating location_state or edge for {location_state}: {e}")

# Process the JSON file
try:
    process_json_file(adls_file_path)
except Exception as e:
    # Anything escaping process_json_file is printed rather than re-raised,
    # so the close() below is still reached.
    print(f"Error processing the file: {e}")

# Close the Gremlin client
gremlin_client.close()

Here is the sample JSON file:

{
"XYZ Grocery": [
    {
        "Section" : "XYZ",
        "LOCATIONLOOKUPID" : "123",
        "LOCATIONNAME" : "abc",
        "LOCATIONSTREETADDRESS1" : "def",
        "LOCATIONSTREETADDRESS2" : null,
        "LOCATIONSTREETADDRESS3" : null,
        "LOCATIONCITY" : "hunter",
        "LOCATIONSTATE" : "IL",
        "LOCATIONZIP" : "R34",
        "LOCATIONCOUNTRY" : "USA",
        "LOCATIONURL" : "",
        "LOCATIONTYPE" : "",
        "CONTACTTYPE" : "",
        "CONTACTISPRIMARY" : "0",
        "CONTACTJOBTITLE" : "",
        "CONTACTTITLE" : "",
        "CONTACTNICKNAME" : null
},
    {
            "Section" : "XYZ",
            "LOCATIONLOOKUPID" : "456",
            "LOCATIONNAME" : "mno",
            "LOCATIONSTREETADDRESS1" : "stu",
            "LOCATIONSTREETADDRESS2" : null,
            "LOCATIONSTREETADDRESS3" : null,
            "LOCATIONCITY" : "hunter",
            "LOCATIONSTATE" : "IL",
            "LOCATIONZIP" : "60511",
            "LOCATIONCOUNTRY" : "USA",
            "LOCATIONURL" : "",
            "LOCATIONTYPE" : "",
            "CONTACTTYPE" : "",
            "CONTACTISPRIMARY" : "0",
            "CONTACTJOBTITLE" : "",
            "CONTACTTITLE" : "",
            "CONTACTNICKNAME" : null
}
]
}
Share Improve this question edited Jan 9 at 11:37 dv_confusedcoder asked Jan 8 at 19:58 dv_confusedcoderdv_confusedcoder 11 silver badge3 bronze badges 1
  • Your JSON uses "XYZ Grocery" as the top-level key, but your script looks up json_data["XYZ"]. – Balaji Commented Jan 9 at 1:58
Add a comment  | 

1 Answer 1

Reset to default 0

Error During JSON Data Upload to Cosmos Gremlin - Gremlin Query Execution Error: Cannot add a vertex where the partition key property has value 'null'

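Your code throws this error because each query hard-codes a property named partitionKey on every vertex, while the container's partition key path is /LOCATIONSTATE. Since no vertex carries a LOCATIONSTATE property, Cosmos DB sees the partition key value as null. The code below instead constructs the query dynamically from the JSON data, so every non-null field (including LOCATIONSTATE) becomes a vertex property, and it passes a SAS token with read and write access to ADLS. The data is uploaded successfully into the Azure Cosmos DB Gremlin API container, as shown in the output below.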

import json
import uuid
from gremlin_python.driver import client, serializer
from azure.storage.filedatalake import DataLakeServiceClient

endpoint = "wss://<accName>.gremlin.cosmos.azure.com:443/"  # first argument to client.Client below
database = "locations"  # interpolated into the Gremlin username: /dbs/<database>/colls/<graph>
graph = "locationGraph"  # interpolated into the Gremlin username together with `database`
primary_key = "<primaryKey>"  # used as the Gremlin client password

adls_account_url = "https://<storageAccName>.dfs.core.windows.net"  # account_url for DataLakeServiceClient
adls_file_system = "cont1"  # file system name handed to get_file_system_client
adls_file_path = "sample1.json"  # file name passed to process_json_file at the bottom of the script

def get_adls_client():
    """Return a DataLakeServiceClient for the account at ``adls_account_url``.

    The credential is the SAS-token placeholder string passed below.
    """
    return DataLakeServiceClient(account_url=adls_account_url, credential="<SAS token>")

def read_json_file_from_adls(file_name):
    """Download ``file_name`` from the configured ADLS file system and parse it as JSON.

    The downloaded bytes are decoded as UTF-8 and the parsed object is returned.
    """
    fs_client = get_adls_client().get_file_system_client(file_system=adls_file_system)
    downloader = fs_client.get_file_client(file_name).download_file()
    raw_bytes = downloader.readall()
    return json.loads(raw_bytes.decode('utf-8'))

def _gremlin_quote(text):
    """Return ``text`` as a single-quoted Gremlin string literal with ``\\`` and ``'`` escaped."""
    escaped = str(text).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def process_json_file(file_name, root_key='XYZ Grocery'):
    """Insert every record of a JSON file from ADLS as a 'Location' vertex.

    Each record found under ``root_key`` becomes one ``g.addV('Location')``
    query in which every non-null field is written as a vertex property
    (values are sent as quoted strings). Because the record's own field
    names are used as property names, a field such as LOCATIONSTATE ends up
    as a property of the same name on the vertex.

    Args:
        file_name: Name of the JSON file inside the configured ADLS file system.
        root_key: Top-level key holding the list of records. Defaults to
            'XYZ Grocery', the key used by the sample file.

    Failures for individual records are printed and do not stop the loop.
    """
    json_data = read_json_file_from_adls(file_name)

    for location_data in json_data.get(root_key, []):
        # .get() so a record without this field cannot raise inside the error handler.
        lookup_id = location_data.get('LOCATIONLOOKUPID')

        # Keys and values are escaped so a quote or backslash in the data
        # cannot terminate the literal and corrupt the query.
        steps = ["g.addV('Location')"]
        steps.extend(
            f".property({_gremlin_quote(key)}, {_gremlin_quote(value)})"
            for key, value in location_data.items()
            if value is not None
        )
        gremlin_query = "".join(steps)

        try:
            # submit() only returns a ResultSet; waiting on .all().result() is what
            # surfaces server-side errors, so success is reported only after it returns.
            gremlin_client.submit(gremlin_query).all().result()
        except Exception as e:
            print(f"Error inserting data for LOCATIONLOOKUPID {lookup_id}: {e}")
        else:
            print(f"Data for LOCATIONLOOKUPID {lookup_id} successfully inserted into Cosmos DB Gremlin.")

gremlin_client = client.Client(
    endpoint,
    'g',  # traversal source name; the queries built above start from `g`
    # Username is built from the database and graph names configured above.
    username=f"/dbs/{database}/colls/{graph}",
    password=primary_key,
    message_serializer=serializer.GraphSONSerializersV2d0()
)

try:
    process_json_file(adls_file_path)
except Exception as e:
    # Top-level boundary: report the failure instead of crashing the script.
    print(f"Error processing the file: {e}")
finally:
    # Also runs on KeyboardInterrupt/SystemExit, which `except Exception`
    # does not catch, so the connection is never left open.
    gremlin_client.close()

Output

Data for LOCATIONLOOKUPID 123 successfully inserted into Cosmos DB Gremlin.
Data for LOCATIONLOOKUPID 456 successfully inserted into Cosmos DB Gremlin.

本文标签: