admin管理员组文章数量:1122846
I've got a project on Databricks to load XML files from one folder. There are different logic for each type of file, so i've made several notebooks for each type of file that starts with a prefix. Example : Account_20240101.xml
My question :
- I want to load all the files that start with a prefix from a blob storage with only using PySpark libaries. I've managed to do this with OS library but my teammate says it's not efficient and asks it all to be in PySpark.
Here is the how i did it with OS :
%pip install azure-storage-blob
from azure.storage.blob import BlobServiceClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import schema_of_xml, expr, col, current_timestamp,lit,explode_outer, input_file_name, regexp_extract,concat_ws
from pyspark.sql.types import NullType
# accounts-snapshots
import os
# Initialize Spark session
spark = SparkSession.builder \
.appName("Load Multiple XML Files to DataFrame") \
.getOrCreate()
# Azure Blob Storage credentials
storage_account_name = "********"
storage_account_key = "*****"
container_name = "incoming"
prefix = "accounts"
excluded_substring = "cards-snapshots"
# Path to store files locally
local_tmp_dir = "/dbfs/tmp/cards_files/"
os.makedirs(local_tmp_dir, exist_ok=True) # Ensure the directory exists
# Initialize Blob Service Client
blob_service_client = BlobServiceClient(
f"https://{storage_account_name}.blob.core.windows",
credential=storage_account_key
)
# Connect to the container
container_client = blob_service_client.get_container_client(container_name)
# Download files with the specified prefix, excluding "cards-snapshots"
xml_files = []
for blob in container_client.list_blobs(name_starts_with=f"XML_Test/{prefix}"):
# Filter out blobs containing the excluded substring
if excluded_substring not in blob.name:
local_file_path = os.path.join(local_tmp_dir, os.path.basename(blob.name))
blob_client = container_client.get_blob_client(blob.name)
try:
with open(local_file_path, "wb") as file:
file.write(blob_client.download_blob().readall())
print(f"File downloaded successfully: {local_file_path}")
xml_files.append(f"file://{local_file_path}") # Add the local path with file:// prefix
except Exception as e:
print(f"Failed to download {blob.name}")
print(e)
# Combine file paths as a comma-separated string
file_paths = ",".join(xml_files)
# Load all matching files into a single DataFrame
# Add filename column to track the source file
raw_df = spark.read.format("com.databricks.spark.xml") \
.option("rowTag", "AccountSnapshot") \
.load(file_paths)\
.withColumn(
"@FileName",
regexp_extract(input_file_name(), r"([^/]+)$", 1) # Extract only the file name
)
# Display the schema and a sample of the DataFrame
display(raw_df.limit(10))
i didn't manage to load it with Pyspark only directly from the blob with a prefix
here is the code for Pyspark only :
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, input_file_name
# Initialize Spark session
spark = SparkSession.builder \
.appName("Load Multiple XML Files to DataFrame") \
.config("fs.azure.account.key.<your-storage-account-name>.blob.core.windows", "<your-storage-account-key>") \
.getOrCreate()
# Azure Blob Storage credentials
storage_account_name = "******"
storage_account_key = "*******"
container_name = "incoming"
prefix = "accounts"
excluded_substring = "cards-snapshots"
# Define the path to Azure Blob Storage
blob_storage_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows/{prefix}*"
# Load files directly from Azure Blob Storage into a DataFrame
raw_df = spark.read.format("com.databricks.spark.xml") \
.option("rowTag", "AccountSnapshot") \
.load(blob_storage_path) \
.filter(~input_file_name().contains(excluded_substring)) \
.withColumn(
"@FileName",
regexp_extract(input_file_name(), r"([^/]+)$", 1) # Extract only the file name
)
# Display the schema and a sample of the DataFrame
raw_df.show(truncate=False)
i get an error on line : .load(blob_storage_path)
details of error:
Py4JJavaError: An error occurred while calling o412.load.
: shaded.databricks.apache.hadoop.fs.azure.AzureException: shaded.databricks.apache.hadoop.fs.azure.AzureException: Unable to access container incoming in account *******.blob.core.windows using anonymous credentials, and no credentials found for them in the configuration
How do i load directly from Blob using PySpark?
本文标签: xmlDatabricksloading files with a prefix and by timestamp (metadata) from folderStack Overflow
版权声明:本文标题:xml - Databricks - loading files with a prefix and by timestamp (metadata) from folder - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1736282956a1926810.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论