Error in Converting a NetCDF File to Pandas Parquet (CSV)

I am trying to load a NetCDF file and split it into several Parquet files (one chunk per file) for further processing. Each output file contains the data for one unique (lat, lon) pair over the full time period.

So I start by reading the data. The unique (lat, lon) pairs are already precomputed:

import os, gc, itertools, psutil
import numpy as np
import pandas as pd
import xarray as xr
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor

# Load unique lat/lon pairs
unique_data = np.load("/data/swadhin/processed_imager/filtered/meteosat/interp/unique_lat_lon.npz")
unique_lat = unique_data["latitude"]
unique_lon = unique_data["longitude"]

# Round lat/lon for consistency with file names
lat_lon_pairs = [(round(lat, 6), round(lon, 6)) for lat, lon in zip(unique_lat, unique_lon)]
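The rounding is only there so the pairs line up with the {lat:.6f}_{lon:.6f}.parquet naming used further down; a quick sanity check, assuming the arrays are non-empty:

# Check that a rounded pair maps to the expected Parquet file name
lat0, lon0 = lat_lon_pairs[0]
print(f"{lat0:.6f}_{lon0:.6f}.parquet")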

The NetCDF and Parquet paths:

# Paths
nc_dir = "/data/swadhin/processed_imager/filtered/meteosat/interp/atm_variables/temporal/t_test"
parquet_dir = "/data/swadhin/processed_imager/filtered/meteosat/interp/atm_variables/temporal/parquet"

# Load all existing Parquet filenames into a set **once**
existing_files = set(os.listdir(parquet_dir))
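If parquet_dir may not exist yet on a fresh run, os.listdir will fail; a minimal guard, assuming that can be the case here:

os.makedirs(parquet_dir, exist_ok=True)  # create the output directory if it is missing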

Then I read the .nc file and process it in parallel, with each worker process handling one (lat, lon) pair:

def get_optimal_workers():
    """Determine an optimal number of workers based on CPU and memory availability."""
    cpu_count = psutil.cpu_count(logical=False)  # Physical cores
    available_mem = psutil.virtual_memory().available / (1024 ** 3)  # GB
    max_workers = min(cpu_count, int(available_mem / 10))  # Budget roughly 10 GB per worker
    return max(30, max_workers)  # Ensure at least 30 workers

def save_to_parquet(df, file_path):
    """Handles appending to a Parquet file correctly."""
    table = pa.Table.from_pandas(df)
    
    if os.path.exists(file_path):
        # Read existing data and append
        existing_table = pq.read_table(file_path)
        combined_table = pa.concat_tables([existing_table, table])
        pq.write_table(combined_table, file_path, compression="snappy")
    else:
        pq.write_table(table, file_path, compression="snappy")

def process_lat_lon(lat, lon, ds):
    """Extracts data for a single (lat, lon) pair and appends to Parquet."""
    # Find the exact index in the 2D latitude/longitude grid
    indices = np.where((ds.latitude == lat) & (ds.longitude == lon))
    if len(indices[0]) == 0:
        return  # Skip if not found (shouldn't happen)
    
    lat_idx, lon_idx = indices[0][0], indices[1][0]

    # Extract time, levels, and temperature values
    time_vals = ds.time.values
    temp_vals = ds.t[:, :, lat_idx, lon_idx].values  # Shape: (time, levels)

    # Skip if all values are NaN
    if np.isnan(temp_vals).all():
        return
    
    # Prepare DataFrame
    df = pd.DataFrame({
        "latitude": lat,
        "longitude": lon,
        "time": time_vals,
        "variable": temp_vals.tolist()  # Store level values as a list
    })
    
    parquet_filename = f"{lat:.6f}_{lon:.6f}.parquet"
    if parquet_filename in existing_files:
        print(f"File {parquet_filename} exists. Skipping...")
    else:
        print(f"File {parquet_filename} does not exist. Creating new file...")
        save_to_parquet(df, os.path.join(parquet_dir, parquet_filename))


def process_lat_lon_wrapper(lat_lon, nc_file):
    """Each worker process opens the dataset separately."""
    file_path = os.path.join(nc_dir, nc_file)
    lat, lon = lat_lon
    # Open (and close) the dataset inside the worker, then rechunk along time
    with xr.open_dataset(file_path).chunk({"time": -1}) as ds:
        return process_lat_lon(lat, lon, ds)  # Pass dataset explicitly

def process_nc_file(nc_file):
    file_path = os.path.join(nc_dir, nc_file)
    print(f"Processing {nc_file}...")

    # Rebuild the unrounded pairs for the exact-value match in np.where()
    lat_lon_pairs = list(zip(unique_lat, unique_lon))

    # Parallel processing using ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=max(40, get_optimal_workers())) as executor:
        list(tqdm(executor.map(process_lat_lon_wrapper, lat_lon_pairs, itertools.repeat(nc_file)), 
                  total=len(lat_lon_pairs), 
                  desc=f"Processing {nc_file}"))
        
    print(f"✅ {nc_file} processed.")

    gc.collect()  # each worker opened and closed its own dataset, so nothing to clean up here

# Process each NetCDF file
for nc_file in os.listdir(nc_dir):
    if nc_file.endswith(".nc"):
        process_nc_file(nc_file)

But this code runs very slowly, taking almost 7-8 hours for a single NetCDF file. Is there any way to make it faster, or am I doing something inefficient?
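For reference, below is a rough, untested sketch of the kind of single-read, vectorized extraction that would avoid the per-pair np.where() scan and per-pair file opens. It reuses lat_lon_pairs and nc_dir from above; "example.nc" is just a placeholder file name, and the horizontal dimension names "y" and "x" are assumptions that would have to be checked against ds.t.dims:

import os
import numpy as np
import xarray as xr

# Untested sketch: locate every (lat, lon) pair once, then read all point series
# in a single vectorized indexing call.
nc_path = os.path.join(nc_dir, "example.nc")  # placeholder for one input file
ds = xr.open_dataset(nc_path)

# Build a (rounded lat, rounded lon) -> (i, j) lookup over the 2D grid, once
grid_lat = np.round(ds.latitude.values, 6)
grid_lon = np.round(ds.longitude.values, 6)
index_of = {(grid_lat[i, j], grid_lon[i, j]): (i, j)
            for i in range(grid_lat.shape[0])
            for j in range(grid_lat.shape[1])}

# Turn the requested pairs into two index arrays sharing a "points" dimension
# (assumes every rounded pair exists in the grid)
ij = [index_of[pair] for pair in lat_lon_pairs]
ii = xr.DataArray([i for i, _ in ij], dims="points")
jj = xr.DataArray([j for _, j in ij], dims="points")

# One vectorized (pointwise) read; result has shape (time, level, points)
temp = ds.t.isel(y=ii, x=jj).values

Each slice temp[:, :, k] would then correspond to lat_lon_pairs[k] and could be written to its own Parquet file in one pass, without re-scanning or re-opening the NetCDF file per pair.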
