
I have 2 environments, Staging & Production. Both use exactly the same code, but one environment can read the contents of the file while the other cannot.

I can see the FileInfo in both environments using two snippets:

Python:
mssparkutils.fs.ls(path)

Output:
[FileInfo(path=abfss://container_name@storage_account.dfs.core.windows.net/Staging_path/test.csv, name=test.csv, size=1000)]

mssparkutils.fs.ls(f'file:{mssparkutils.fs.getMountPath("/mount1")}{staging_path}')

Output:
[FileInfo(path=file:/synfs/notebook/22/mount1/Staging_path/test.csv, name=test.csv, size=1000)]

Staging works, but when I try this in Production:

df = pd.read_csv(f'file:{mssparkutils.fs.getMountPath("/mount1")}{staging_path}test.csv')
display(df)

<urlopen error [Errno 5] Input/output error: '/synfs/notebook/22/mount1/Staging_path/test.csv'>


  • It could be a permission issue. Try reading the file with Spark once: spark.read.csv("path_without_file_prefix") – JayashankarGS

1 Answer


Make sure the managed identity assigned in the Production environment has the necessary permissions to access both the storage account and the specific file (for example, the Storage Blob Data Reader role on the storage account). Without the right permissions, the runtime will not be able to read the file.
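As a quick probe, you can try reading the first bytes of the file directly through the abfss path with mssparkutils.fs.head. This is a sketch using the asker's example path; a Forbidden/403-style failure here points to missing permissions rather than a mount problem:

# Probe: read the first bytes via the abfss path (path is the asker's example)
abfss_path = "abfss://container_name@storage_account.dfs.core.windows.net/Staging_path/test.csv"
try:
    print(mssparkutils.fs.head(abfss_path, 256))  # first 256 bytes
except Exception as e:
    print(f"Direct abfss read failed: {e}")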

Then, confirm that the mount point (/mount1) is correctly set up in Production. You can check the list of mounts with:

mssparkutils.fs.mounts()
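For example, a quick check that /mount1 exists in Production (a sketch; the exact shape of the returned entries can vary by runtime, so this reads the mountPoint field defensively):

mounts = mssparkutils.fs.mounts()
print(mounts)
# Check for the expected mount point
if not any(getattr(m, "mountPoint", None) == "/mount1" for m in mounts):
    print("/mount1 is not mounted in this environment")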

If /mount1 is missing or incorrectly mounted, you can remount it with:

mssparkutils.fs.unmount("/mount1")
mssparkutils.fs.mount(
    "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net",
    "/mount1",
    {"linkedService": "workspacestoragetest"}
)

After remounting, check that the file path exists and is accessible by listing the directory contents:

mssparkutils.fs.ls(f'file:{mssparkutils.fs.getMountPath("/mount1")}{staging_path}')
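As the comment above suggests, you can also cross-check with Spark, which reads through the abfss path and bypasses the local file: mount entirely (a sketch using the asker's example path):

df_spark = spark.read.csv(
    "abfss://container_name@storage_account.dfs.core.windows.net/Staging_path/test.csv",
    header=True
)
df_spark.show(5)

If Spark can read the file but pandas cannot, the problem is with the mount or the local file: interface rather than with storage permissions.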

If you are still seeing the Input/Output error (<urlopen error [Errno 5] Input/output error: '/synfs/notebook/22/mount1/Staging_path/test.csv'>), it could be due to network issues. Check for any firewall rules or network restrictions that might be blocking access to the storage account from the Production environment.
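A minimal connectivity probe from the Production notebook can help rule this out (a sketch; substitute your storage account name):

import socket

# Can we reach the dfs endpoint on port 443 from this environment?
host = "storage_account.dfs.core.windows.net"  # substitute your account name
try:
    with socket.create_connection((host, 443), timeout=5):
        print(f"TCP connection to {host}:443 succeeded")
except OSError as e:
    print(f"Cannot reach {host}:443 - {e}")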

If the Linked Service to Azure Data Lake Storage Gen2 is using a managed private endpoint with a dfs URI, you'll also need to set up a secondary managed private endpoint using the Azure Blob Storage option with a blob URI. This ensures that the internal fsspec/adlfs library can properly connect via the BlobServiceClient interface.
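For reference, this is roughly the path pandas takes for an abfs:// URL: fsspec resolves it to adlfs, which connects through the blob endpoint. A sketch, assuming the adlfs and azure-identity packages are available; the account and container names below are placeholders:

import pandas as pd
from azure.identity import DefaultAzureCredential

# Read via the blob endpoint through fsspec/adlfs (names are placeholders)
df = pd.read_csv(
    "abfs://container_name/Staging_path/test.csv",
    storage_options={
        "account_name": "storage_account",       # placeholder
        "credential": DefaultAzureCredential(),  # picks up the managed identity
    },
)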


It is also a good idea to implement retry logic to avoid failures due to transient issues. Here's how you can do it:

import time
import pandas as pd
from urllib.error import URLError

retries = 3
for attempt in range(retries):
    try:
        # 'abc' (column names) and 'xyz' (dtypes) are defined elsewhere in the notebook
        df0 = pd.read_csv(
            f'file:{mssparkutils.fs.getMountPath("/mount1")}{staging_path}ABC.zip',
            compression='zip', sep='|', names=abc, dtype=xyz
        )
        break  # Exit the loop if successful
    except URLError:
        if attempt < retries - 1:
            time.sleep(5)  # Wait 5 seconds before retrying
            continue
        raise  # Re-raise the error if all retries fail

To make debugging easier, add logging so you can capture details about any errors:

import logging
from urllib.error import URLError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    df0 = pd.read_csv(
        f'file:{mssparkutils.fs.getMountPath("/mount1")}{staging_path}ABC.zip',
        compression='zip', sep='|', names=abc, dtype=xyz
    )
except URLError as e:
    logger.error(f"Error reading file: {e}")
    raise

By following these steps, you can identify whether the root cause is permissions, mount points, network restrictions, or transient errors, and apply the necessary fix.
