I am trying to upload a JSON file to Azure Cosmos DB - Gremlin API. I created the partition key (PK) as /LOCATIONSTATE in Cosmos DB, which is one of the tags in the JSON. Below is the Python script I am using to load the data. The upload fails with: Gremlin Query Execution Error: Cannot add a vertex where the partition key property has value 'null'. I have rechecked twice and there are no null values in the JSON data for the PK. How can I fix this error?
I have tried changing the PK to another JSON tag such as /LOCATIONZIP or /LOCATIONLOOKUPID, but the error remains the same. Is Cosmos unable to read the data because of the empty string ("") values? Please help!
import json
import requests
from gremlin_python.driver import client, serializer
import uuid # To generate unique IDs if 'id' is missing
from azure.storage.filedatalake import DataLakeServiceClient
# Replace with your own details
endpoint = ""
database = ""
graph = ""
primary_key = ""
# ADLS connection details
adls_account_url = ""
adls_file_system = "json"
adls_file_path = ""
# Azure Data Lake Storage Client
def get_adls_client():
    service_client = DataLakeServiceClient(account_url=adls_account_url, credential="")
    return service_client

def read_json_file_from_adls(file_name):
    service_client = get_adls_client()
    file_system_client = service_client.get_file_system_client(file_system=adls_file_system)
    file_client = file_system_client.get_file_client(file_name)
    file_content = file_client.download_file()
    return json.loads(file_content.readall().decode('utf-8'))
# Initialize the Gremlin client
gremlin_client = client.Client(
    endpoint,
    'g',
    username=f"/dbs/{database}/colls/{graph}",
    password=primary_key,
    message_serializer=serializer.GraphSONSerializersV2d0()
)
# Function to process a single JSON file and create the graph
def process_json_file(file_name):
    # Read the JSON file from ADLS
    json_data = read_json_file_from_adls(file_name)
    print(f"Processing file: {file_name}")
    # Iterate over each record in the JSON array
    for record in json_data["XYZ"]:
        # Use LOCATIONSTATE as the partition key
        location_state = record.get("LOCATIONSTATE", "").strip()
        if not location_state:
            location_state = str(uuid.uuid4())  # Fallback to a generated unique ID
            record["LOCATIONSTATE"] = location_state  # Optionally update the record with the new LOCATIONSTATE
        location_name = record.get("LOCATIONNAME", "").replace("'", "\\'")
        location_city = record.get("LOCATIONCITY", "").replace("'", "\\'")
        location_state = record.get("LOCATIONSTATE", "").replace("'", "\\'")
        # Create LOCATIONSTATE vertex
        try:
            create_location_state_query = (
                f"g.addV('location_state').property('id', '{location_state}')"
                f".property('zip', '{location_state}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_state_query).all().result()
            print(f"Created location_state vertex: {location_state}")
        except Exception as e:
            print(f"Error creating location_state vertex {location_state}: {e}")
            continue
        # Create LOCATIONNAME vertex and add edge
        location_name_id = str(uuid.uuid4())
        try:
            create_location_name_query = (
                f"g.addV('location_name').property('id', '{location_name_id}')"
                f".property('name', '{location_name}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_name_query).all().result()
            edge_location_name_query = f"g.V('{location_state}').addE('name').to(g.V('{location_name_id}'))"
            gremlin_client.submit(edge_location_name_query).all().result()
            print(f"Created edge: {location_state} -> {location_name}")
        except Exception as e:
            print(f"Error creating location_name or edge for {location_state}: {e}")
        # Create LOCATIONCITY vertex and add edge
        location_city_id = str(uuid.uuid4())
        try:
            create_location_city_query = (
                f"g.addV('location_city').property('id', '{location_city_id}')"
                f".property('city', '{location_city}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_city_query).all().result()
            edge_location_city_query = f"g.V('{location_state}').addE('city').to(g.V('{location_city_id}'))"
            gremlin_client.submit(edge_location_city_query).all().result()
            print(f"Created edge: {location_state} -> {location_city}")
        except Exception as e:
            print(f"Error creating location_city or edge for {location_state}: {e}")
        # Create LOCATIONSTATE vertex and add edge
        location_state_id = str(uuid.uuid4())
        try:
            create_location_state_query = (
                f"g.addV('location_state').property('id', '{location_state_id}')"
                f".property('state', '{location_state}')"
                f".property('partitionKey', '{location_state}')"
            )
            gremlin_client.submit(create_location_state_query).all().result()
            edge_location_state_query = f"g.V('{location_state}').addE('state').to(g.V('{location_state_id}'))"
            gremlin_client.submit(edge_location_state_query).all().result()
            print(f"Created edge: {location_state} -> {location_state}")
        except Exception as e:
            print(f"Error creating location_state or edge for {location_state}: {e}")
# Process the JSON file
try:
    process_json_file(adls_file_path)
except Exception as e:
    print(f"Error processing the file: {e}")
# Close the Gremlin client
gremlin_client.close()
Here is the sample JSON file:
{
    "XYZ Grocery": [
        {
            "Section" : "XYZ",
            "LOCATIONLOOKUPID" : "123",
            "LOCATIONNAME" : "abc",
            "LOCATIONSTREETADDRESS1" : "def",
            "LOCATIONSTREETADDRESS2" : null,
            "LOCATIONSTREETADDRESS3" : null,
            "LOCATIONCITY" : "hunter",
            "LOCATIONSTATE" : "IL",
            "LOCATIONZIP" : "R34",
            "LOCATIONCOUNTRY" : "USA",
            "LOCATIONURL" : "",
            "LOCATIONTYPE" : "",
            "CONTACTTYPE" : "",
            "CONTACTISPRIMARY" : "0",
            "CONTACTJOBTITLE" : "",
            "CONTACTTITLE" : "",
            "CONTACTNICKNAME" : null
        },
        {
            "Section" : "XYZ",
            "LOCATIONLOOKUPID" : "456",
            "LOCATIONNAME" : "mno",
            "LOCATIONSTREETADDRESS1" : "stu",
            "LOCATIONSTREETADDRESS2" : null,
            "LOCATIONSTREETADDRESS3" : null,
            "LOCATIONCITY" : "hunter",
            "LOCATIONSTATE" : "IL",
            "LOCATIONZIP" : "60511",
            "LOCATIONCOUNTRY" : "USA",
            "LOCATIONURL" : "",
            "LOCATIONTYPE" : "",
            "CONTACTTYPE" : "",
            "CONTACTISPRIMARY" : "0",
            "CONTACTJOBTITLE" : "",
            "CONTACTTITLE" : "",
            "CONTACTNICKNAME" : null
        }
    ]
}
1 Answer
Error During JSON Data Upload to Cosmos Gremlin - Gremlin Query Execution Error: Cannot add a vertex where the partition key property has value 'null'
Your code throws this error because each vertex query writes the partition key value to a hardcoded property named 'partitionKey', which does not match the partition key path configured on the container, so Cosmos DB treats the partition key property as null. The code below instead builds each query dynamically from the fields of the JSON record, so the partition key property is always populated, and it authenticates to ADLS with a SAS token that has read and write access. The data is uploaded successfully into the Azure Cosmos DB Gremlin API container, as shown in the output below.
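The essential difference is which property name carries the partition key value. Here is a minimal sketch of the two query shapes (not the full script), assuming the container's partition key path is /LOCATIONSTATE as described in the question:

state = "IL"

# Original approach: the partition key value is written to a property named
# 'partitionKey'. That name does not match the container's /LOCATIONSTATE path,
# so Cosmos DB sees the partition key property as null and rejects the vertex.
failing_query = (
    f"g.addV('location_state').property('id', '{state}')"
    f".property('partitionKey', '{state}')"
)

# Working approach: the property name matches the partition key path exactly,
# so every vertex carries a non-null partition key value.
working_query = (
    f"g.addV('Location').property('id', '{state}')"
    f".property('LOCATIONSTATE', '{state}')"
)

The full working script follows.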
import json
import uuid
from gremlin_python.driver import client, serializer
from azure.storage.filedatalake import DataLakeServiceClient

endpoint = "wss://<accName>.gremlin.cosmos.azure.com:443/"
database = "locations"
graph = "locationGraph"
primary_key = "<primaryKey>"

adls_account_url = "https://<storageAccName>.dfs.core.windows.net"
adls_file_system = "cont1"
adls_file_path = "sample1.json"

def get_adls_client():
    service_client = DataLakeServiceClient(account_url=adls_account_url, credential="<SAS token>")
    return service_client

def read_json_file_from_adls(file_name):
    service_client = get_adls_client()
    file_system_client = service_client.get_file_system_client(file_system=adls_file_system)
    file_client = file_system_client.get_file_client(file_name)
    file_content = file_client.download_file()
    return json.loads(file_content.readall().decode('utf-8'))

def process_json_file(file_name):
    json_data = read_json_file_from_adls(file_name)
    for location_data in json_data.get('XYZ Grocery', []):
        try:
            # Build the addV query dynamically from every non-null field, so the
            # container's partition key property is always populated.
            gremlin_query = "g.addV('Location')"
            for key, value in location_data.items():
                if value is not None:
                    gremlin_query += f".property('{key}', '{value}')"
            # Wait for the server result so failures surface in the except block.
            gremlin_client.submit(gremlin_query).all().result()
            print(f"Data for LOCATIONLOOKUPID {location_data['LOCATIONLOOKUPID']} successfully inserted into Cosmos DB Gremlin.")
        except Exception as e:
            print(f"Error inserting data for LOCATIONLOOKUPID {location_data['LOCATIONLOOKUPID']}: {e}")

gremlin_client = client.Client(
    endpoint,
    'g',
    username=f"/dbs/{database}/colls/{graph}",
    password=primary_key,
    message_serializer=serializer.GraphSONSerializersV2d0()
)

try:
    process_json_file(adls_file_path)
except Exception as e:
    print(f"Error processing the file: {e}")

gremlin_client.close()
Output
Data for LOCATIONLOOKUPID 123 successfully inserted into Cosmos DB Gremlin.
Data for LOCATIONLOOKUPID 456 successfully inserted into Cosmos DB Gremlin.
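As a quick check after the load, you can query the graph from the same client (a hedged example using the same gremlin_client and the 'Location' label from the script above, run before closing the client):

# Count the vertices written by the load.
count = gremlin_client.submit("g.V().hasLabel('Location').count()").all().result()
print(f"Location vertices in the graph: {count[0]}")

# Inspect one vertex to confirm the partition key property was stored.
sample = gremlin_client.submit(
    "g.V().hasLabel('Location').limit(1).valueMap('LOCATIONLOOKUPID', 'LOCATIONSTATE')"
).all().result()
print(sample)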
Comment: Your JSON has "XYZ Grocery" as the top-level key, but your script refers to "ABC". – Balaji
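If the top-level key in the file may differ from what the script expects (the mismatch the comment above points out), a small guard before iterating fails loudly instead of silently processing nothing. This is an illustrative sketch; the key names come from the sample JSON and the original script:

json_data = read_json_file_from_adls(adls_file_path)

# Accept either of the two key spellings seen in this question; otherwise report
# what the file actually contains instead of iterating over an empty default.
records = json_data.get("XYZ Grocery") or json_data.get("XYZ")
if records is None:
    raise KeyError(f"Unexpected top-level keys in file: {list(json_data.keys())}")
for record in records:
    ...  # build and submit the Gremlin addV query for each record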