I've got a project on Databricks to load XML files from one folder. There is different logic for each type of file, so I've made a separate notebook for each file type, identified by a prefix. Example: Account_20240101.xml

My question:

  1. I want to load all the files that start with a prefix from blob storage using only PySpark libraries. I've managed to do this with the OS library, but my teammate says it's not efficient and wants it all to be in PySpark.

Here is how I did it with OS:

%pip install azure-storage-blob

from azure.storage.blob import BlobServiceClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import schema_of_xml, expr, col, current_timestamp, lit, explode_outer, input_file_name, regexp_extract, concat_ws
from pyspark.sql.types import NullType
# accounts-snapshots
import os  

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Load Multiple XML Files to DataFrame") \
    .getOrCreate()

# Azure Blob Storage credentials
storage_account_name = "********"
storage_account_key = "*****"
container_name = "incoming"
prefix = "accounts"
excluded_substring = "cards-snapshots"

# Path to store files locally
local_tmp_dir = "/dbfs/tmp/cards_files/"
os.makedirs(local_tmp_dir, exist_ok=True)  # Ensure the directory exists

# Initialize Blob Service Client
blob_service_client = BlobServiceClient(
    f"https://{storage_account_name}.blob.core.windows",
    credential=storage_account_key
)

# Connect to the container
container_client = blob_service_client.get_container_client(container_name)

# Download files with the specified prefix, excluding "cards-snapshots"
xml_files = []
for blob in container_client.list_blobs(name_starts_with=f"XML_Test/{prefix}"):
    # Filter out blobs containing the excluded substring
    if excluded_substring not in blob.name:
        local_file_path = os.path.join(local_tmp_dir, os.path.basename(blob.name))
        blob_client = container_client.get_blob_client(blob.name)
        try:
            with open(local_file_path, "wb") as file:
                file.write(blob_client.download_blob().readall())
            print(f"File downloaded successfully: {local_file_path}")
            xml_files.append(f"file://{local_file_path}")  # Add the local path with file:// prefix
        except Exception as e:
            print(f"Failed to download {blob.name}")
            print(e)

# Combine file paths as a comma-separated string
file_paths = ",".join(xml_files)

# Load all matching files into a single DataFrame 
# Add filename column to track the source file

raw_df = spark.read.format("com.databricks.spark.xml") \
    .option("rowTag", "AccountSnapshot") \
    .load(file_paths)\
    .withColumn(
        "@FileName",
        regexp_extract(input_file_name(), r"([^/]+)$", 1)  # Extract only the file name
    )

# Display the schema and a sample of the DataFrame
display(raw_df.limit(10))

I didn't manage to load it with PySpark only, directly from the blob with a prefix.

Here is the code for PySpark only:

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, input_file_name

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Load Multiple XML Files to DataFrame") \
    .config("fs.azure.account.key.<your-storage-account-name>.blob.core.windows", "<your-storage-account-key>") \
    .getOrCreate()

# Azure Blob Storage credentials
storage_account_name = "******"
storage_account_key = "*******"
container_name = "incoming"
prefix = "accounts"
excluded_substring = "cards-snapshots"

# Define the path to Azure Blob Storage
blob_storage_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows/{prefix}*"

# Load files directly from Azure Blob Storage into a DataFrame
raw_df = spark.read.format("com.databricks.spark.xml") \
    .option("rowTag", "AccountSnapshot") \
    .load(blob_storage_path) \
    .filter(~input_file_name().contains(excluded_substring)) \
    .withColumn(
        "@FileName",
        regexp_extract(input_file_name(), r"([^/]+)$", 1)  # Extract only the file name
    )

# Display the schema and a sample of the DataFrame
raw_df.show(truncate=False)

I get an error on the line: .load(blob_storage_path)
Details of the error:

Py4JJavaError: An error occurred while calling o412.load.
: shaded.databricks.apache.hadoop.fs.azure.AzureException: shaded.databricks.apache.hadoop.fs.azure.AzureException: Unable to access container incoming in account *******.blob.core.windows.net using anonymous credentials, and no credentials found for them in the configuration
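
From what I've read, one common cause on Databricks is that the .config() call on SparkSession.builder has no effect because a Spark session already exists in the notebook, so the account key never reaches the configuration. A minimal sketch of setting the key on the running session instead (untested, assuming the key itself is valid):

# Set the account key on the session Databricks already created,
# instead of passing it to SparkSession.builder (which may be ignored)
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)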

How do I load directly from Blob Storage using PySpark?
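
To make the goal concrete, this is roughly the end result I'm aiming for: a PySpark-only read straight from the container, with the prefix applied as a glob on the path. This is just a sketch based on my code above, assuming the files sit under the XML_Test/ folder (as in the OS version) and that the account key has been set on the session as shown earlier:

from pyspark.sql.functions import input_file_name, regexp_extract

# Glob so that only files whose names start with the prefix are read
blob_storage_path = (
    f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net"
    f"/XML_Test/{prefix}*"
)

raw_df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "AccountSnapshot")
    .load(blob_storage_path)
    # Drop matching files that contain the excluded substring
    .filter(~input_file_name().contains(excluded_substring))
    # Keep only the file name (last path segment) for traceability
    .withColumn("@FileName", regexp_extract(input_file_name(), r"([^/]+)$", 1))
)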
