
I am running a test job in GitHub Actions CI/CD for functional tests. It reads source files, runs the job, then compares the actual DataFrame with the expected file in the tests/resources folder. The code of the job is:

import shutil
import sys
from pathlib import Path

import pytest
from pyspark.sql import SparkSession

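# Make the repository root (two levels up) importable so the job packages resolve.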
sys.path.append("../..")
from bronze_supplydata_staging.job import Job
from bronze_supplydata_staging.job_parameters import JobParameters
from supplydata_ec_utils.logger import create_logger
from supplydata_ec_utils.paths import get_resource_path
from supplydata_ec_utils.spark import create_spark_session
from supplydata_ec_utils.test_utils import (
    are_dataframe_equal,
    build_actual_output,
    build_expected_output,
    clean,
    create_database,
    get_testing_directory,
    list_expected_tables,
)

logger = create_logger(__name__)

# The following fluxes can all be tested; each entry maps a flux identifier to its partition date:

flux_config = {
    "r10_17x100a100a04388_17x100a100f0054x": "2023-03-15",
    "r10_17x100a100b0591v_17x100a100f0054x": "2023-02-07",
    "r10_17x100a100a03748_17x100a100f0054x": "2024-09-25",
    "r10_17x100a100a05473_17x100a100f0054x": "2024-10-03",
    "r10_17x100b100a2035a_17x100a100f0054x": "2024-10-17",
    "r10_17x100b100a20570_17x100a100f0054x": "2021-12-13",
    "r10_17x100b100a2003n_17x100a100f0054x": "2024-09-05",
    "r10_17x100a100a05546_17x100a100f0054x": "2023-01-06",
    "r10_17x100a100a0416i_17x100a100f0054x": "2024-11-12",
    "r10_17x100b100a2021l_17x100a100f0054x": "2024-11-07",
    "r10_17x100a100a0513k_17x100a100f0054x": "2023-02-07",
    "r10_17x100a100a0503n_17x100a100f0054x": "2023-02-07",
    "r10_17x100a100a03578_17x100a100f0054x": "2023-02-10",
    "r10_17xstavold2096h_17x100a100f0054x": "2024-11-13",
    "r10_17x100a100a0001a_17x100a100f0054x": "2024-11-28",
    "r10_17x100b100a20376_17x100a100f0054x": "2024-11-13",
    "r10_17x100a100a04752_17x100a100f0054x": "2024-11-15",
    "r10_17x100a100a0462b_17x100a100f0054x": "2023-02-07",
    "r10a_17x100a100a0001a_17x100a100f0054x": "2024-11-25",
    "c12_17x100a100a0001a_17x100a100f0054x": "2024-11-20",
    "r4q_17x100a100a0001a_17x100a100f0054x": "2024-11-20",
    "c15_17x100a100a0001a_17x100a100f0054x": "2024-11-20",
    "c15_17x100a100a04752_17x100a100f0054x": "2024-10-29",
    "r4h_17x100a100a0001a_17x100a100f0054x": "2024-11-26",
    "r4m_17x100a100a0001a_17x100a100f0054x": "2024-11-03",
    "r4c_17x100b100a2015g_17x100a100f0054x": "2024-11-15",
    "r4c_17x100a100a0513k_17x100a100f0054x": "2024-11-22",
    "r4c_17x100a100a0462b_17x100a100f0054x": "2024-11-29",
    "r4c_17x100a100a03748_17x100a100f0054x": "2023-03-14",
    "r4c_17x100a100a04388_17x100a100f0054x": "2024-11-15",
    "r4c_17x100a100a04671_17x100a100f0054x": "2024-11-24",
    "r4c_17x100a100b0591v_17x100a100f0054x": "2024-12-05",
    "r4c_17x100a100a0503n_17x100a100f0054x": "2024-12-04",
    "r4c_17x100a100a03772_17x100a100f0054x": "2024-12-04",
    "r4c_17x100a100a05473_17x100a100f0054x": "2024-10-03",
    "r4c_17x100a100a05546_17x100a100f0054x": "2024-12-03",
    "r4c_17x100a100a0396z_17x100a100f0054x": "2024-11-04",
    "r4c_17x100a100a04752_17x100a100f0054x": "2024-11-15",
    "r4c_17x100b100a20376_17x100a100f0054x": "2024-11-13",
    "r4c_17x100b100a2078t_17x100a100f0054x": "2023-07-06",
    "r4c_17xstavold2096h_17x100a100f0054x": "2024-11-13",
    "r4c_17x100a100a03578_17x100a100f0054x": "2024-12-08",
    "bigadjusted_10xfrrteq_11xcnrddsvefoo": "2024-11-19",
    "r18_17x100b100a20376_17x100a100f0054x": "2022-12-08",
    "r18_17x100a100a03683_17x100a100f0054x": "2022-12-08",
    "r18_17x100a100a0001a_11xcnrddsvefoo": "2020-04-10",
    "r18_17x100b100a2157x_17x100a100f0054x": "2022-10-21",
    "r18_17x100b100a2035a_17x100a100f0054x": "2022-10-10",
    "r18_17x100b100a2009b_17x100a100f0054x": "2023-01-06",
    "r18_17x100a100a04752_17x100a100f0054x": "2024-08-06",
    "r18_17x100a100a0416i_17x100a100f0054x": "2023-06-05",
    "r18_17x100a100a03772_17x100a100f0054x": "2022-09-04",
    "r18_17x100a100a05473_17x100a100f0054x": "2022-08-08",
    "r18_17x100b100a21089_17x100a100f0054x": "2024-02-08",
    "bigmetering_10xfrrteq_17x100a100f0054x": "2024-11-19",
    "bigphysical_10xfrrteq_11xcnrddsvefoo": "2024-12-05",
}
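# Each partition date above is passed to the job as both --run-date and --from-date.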


@pytest.fixture(scope="function", autouse=True)
def setup():
    """
    Clean previous artefacts, then copy the resources used for testing.
    """
    logger.info("Cleaning previous artefacts.")
    clean(__file__)
    src: Path = Path(get_resource_path("tests", "resources"))
    dest = get_testing_directory()
    shutil.copytree(
        src,
        dest,
        symlinks=False,
        ignore=None,
        copy_function=shutil.copy2,
        ignore_dangling_symlinks=False,
        dirs_exist_ok=False,
    )


def clean_up(spark_session: SparkSession, run_number: int):
    """
    Cleans up the Spark resources after the tests are completed.
    It unpersists any cached RDDs, clears Spark catalog metadata, etc.
    """
    try:
        # Unpersist RDDs
        logger.info("Cleaning all persisted data from the previous test")
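        # getPersistentRDDs() is only exposed on the JVM SparkContext, hence the private _jsc accessor.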
        for rdd_id, j_rdd in spark_session.sparkContext._jsc.getPersistentRDDs().items():
            # clear all cached RDDs
            j_rdd.unpersist(True)
            logger.info(f"RDD id={rdd_id} un-persisted")

        # Drop tables explicitly
        tables = spark_session.catalog.listTables()
        for table in tables:
            logger.info(f"DROP TABLE bronzesupplydataprod.{table.name}")
            spark_session.sql(f"DROP TABLE IF EXISTS bronzesupplydataprod.{table.name}")

        # Clear cache
        spark_session.catalog.clearCache()
        logger.info("Spark cache cleared successfully.")

    except Exception as e:
        logger.warning(f"Error during cleanup: {str(e)}")
    finally:
        if spark_session:
            try:
                spark_session.stop()
                logger.info("Spark session stopped successfully.")
            except Exception as e:
                logger.warning(f"Error stopping Spark session: {str(e)}")


def assert_results(run_number, spark, single_flux=None):
    """
    Validate the test results by comparing actual and expected outputs for each table.
    Builds both outputs and returns a dict mapping "Run{run_number} - {table_name}"
    to True if the comparison passed, False otherwise.
    """
    flux_tables = [
        "c12_17x100a100a0001a_17x100a100f0054x_staging",
        "c15_17x100a100a0001a_17x100a100f0054x_staging",
        "c15_17x100a100a04752_17x100a100f0054x_staging",
        "r10_17x100a100a04388_17x100a100f0054x_staging",
        "r10_17x100a100a04388_17x100a100f0054x_staging_10_min",
        "r10_17x100a100b0591v_17x100a100f0054x_staging",
        "r10_17x100a100b0591v_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a03748_17x100a100f0054x_staging",
        "r10_17x100a100a03748_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a05473_17x100a100f0054x_staging",
        "r10_17x100a100a05473_17x100a100f0054x_staging_10_min",
        "r10_17x100b100a2035a_17x100a100f0054x_staging",
        "r10_17x100b100a2035a_17x100a100f0054x_staging_10_min",
        "r10_17x100b100a20570_17x100a100f0054x_staging",
        "r10_17x100b100a20570_17x100a100f0054x_staging_10_min",
        "r10_17x100b100a2003n_17x100a100f0054x_staging",
        "r10_17x100b100a2003n_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a05546_17x100a100f0054x_staging",
        "r10_17x100a100a05546_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a0416i_17x100a100f0054x_staging",
        "r10_17x100a100a0416i_17x100a100f0054x_staging_10_min",
        "r10_17x100b100a2021l_17x100a100f0054x_staging",
        "r10_17x100b100a2021l_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a0513k_17x100a100f0054x_staging",
        "r10_17x100a100a0513k_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a0503n_17x100a100f0054x_staging",
        "r10_17x100a100a0503n_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a03578_17x100a100f0054x_staging",
        "r10_17x100a100a03578_17x100a100f0054x_staging_10_min",
        "r10_17xstavold2096h_17x100a100f0054x_staging",
        "r10_17xstavold2096h_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a0001a_17x100a100f0054x_staging",
        "r10_17x100a100a0001a_17x100a100f0054x_staging_10_min",
        "r10_17x100b100a20376_17x100a100f0054x_staging",
        "r10_17x100b100a20376_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a04752_17x100a100f0054x_staging",
        "r10_17x100a100a04752_17x100a100f0054x_staging_10_min",
        "r10_17x100a100a0462b_17x100a100f0054x_staging",
        "r10_17x100a100a0462b_17x100a100f0054x_staging_10_min",
        "r10a_17x100a100a0001a_17x100a100f0054x_staging",
        "r10a_17x100a100a0001a_17x100a100f0054x_staging_10_min",
        "r4q_17x100a100a0001a_17x100a100f0054x_staging",
        "r4q_17x100a100a0001a_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100b0591v_17x100a100f0054x_staging",
        "r4c_17x100a100b0591v_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a0503n_17x100a100f0054x_staging",
        "r4c_17x100a100a0503n_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a03772_17x100a100f0054x_staging",
        "r4c_17x100a100a03772_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a05473_17x100a100f0054x_staging",
        "r4c_17x100a100a05473_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a05546_17x100a100f0054x_staging",
        "r4c_17x100a100a05546_17x100a100f0054x_staging_10_min",
        "r4c_17xstavold2096h_17x100a100f0054x_staging",
        "r4c_17xstavold2096h_17x100a100f0054x_staging_10_min",
        "r4c_17x100b100a2078t_17x100a100f0054x_staging",
        "r4c_17x100b100a2078t_17x100a100f0054x_staging_10_min",
        "r4c_17x100b100a20376_17x100a100f0054x_staging",
        "r4c_17x100b100a20376_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a04752_17x100a100f0054x_staging",
        "r4c_17x100a100a04752_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a0396z_17x100a100f0054x_staging",
        "r4c_17x100a100a0396z_17x100a100f0054x_staging_10_min",
        "r4c_17x100b100a2015g_17x100a100f0054x_staging",
        "r4c_17x100b100a2015g_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a0513k_17x100a100f0054x_staging",
        "r4c_17x100a100a0513k_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a0462b_17x100a100f0054x_staging",
        "r4c_17x100a100a0462b_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a03748_17x100a100f0054x_staging",
        "r4c_17x100a100a03748_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a04388_17x100a100f0054x_staging",
        "r4c_17x100a100a04388_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a04671_17x100a100f0054x_staging",
        "r4c_17x100a100a04671_17x100a100f0054x_staging_10_min",
        "r4c_17x100a100a03578_17x100a100f0054x_staging",
        "r4c_17x100a100a03578_17x100a100f0054x_staging_10_min",
        "r4m_17x100a100a0001a_17x100a100f0054x_staging",
        "r4m_17x100a100a0001a_17x100a100f0054x_staging_10_min",
        "bigadjusted_10xfrrteq_11xcnrddsvefoo_staging",
        "bigadjusted_10xfrrteq_11xcnrddsvefoo_staging_5_min",
        "bigadjusted_10xfrrteq_11xcnrddsvefoo_staging_10_min",
        "bigphysical_10xfrrteq_11xcnrddsvefoo_staging",
        "bigphysical_10xfrrteq_11xcnrddsvefoo_staging_10_min",
        "bigmetering_10xfrrteq_17x100a100f0054x_staging",
        "bigmetering_10xfrrteq_17x100a100f0054x_staging_10_min",
        "r18_17x100a100a0001a_11xcnrddsvefoo_staging",
        "r18_17x100a100a0001a_11xcnrddsvefoo_staging_10_min",
        "r18_17x100b100a2157x_17x100a100f0054x_staging",
        "r18_17x100b100a2157x_17x100a100f0054x_staging_10_min",
        "r18_17x100b100a2035a_17x100a100f0054x_staging",
        "r18_17x100b100a2035a_17x100a100f0054x_staging_10_min",
        "r18_17x100b100a2009b_17x100a100f0054x_staging",
        "r18_17x100b100a2009b_17x100a100f0054x_staging_10_min",
        "r18_17x100a100a03683_17x100a100f0054x_staging",
        "r18_17x100a100a03683_17x100a100f0054x_staging_10_min",
        "r18_17x100b100a20376_17x100a100f0054x_staging",
        "r18_17x100b100a20376_17x100a100f0054x_staging_10_min",
        "r18_17x100a100a04752_17x100a100f0054x_staging",
        "r18_17x100a100a04752_17x100a100f0054x_staging_10_min",
        "r18_17x100a100a0416i_17x100a100f0054x_staging",
        "r18_17x100a100a0416i_17x100a100f0054x_staging_10_min",
        "r18_17x100a100a03772_17x100a100f0054x_staging",
        "r18_17x100a100a03772_17x100a100f0054x_staging_10_min",
        "r18_17x100a100a05473_17x100a100f0054x_staging",
        "r18_17x100a100a05473_17x100a100f0054x_staging_10_min",
        "r18_17x100b100a21089_17x100a100f0054x_staging",
        "r18_17x100b100a21089_17x100a100f0054x_staging_10_min",
    ]

    results = {}

    # Convert flux_tables to a set for fast membership lookups
    flux_tables_set = set(flux_tables)

    # If a single flux is provided, restrict the comparison to that flux's tables.
    if single_flux:
        flux_tables_set = {
            f"{single_flux}_staging",
            f"{single_flux}_staging_10_min",
        }

    tables_to_test = [
        table_name for table_name in list_expected_tables(run_number)
        if table_name in flux_tables_set
    ]
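    # Only tables that have expected resources for this run and belong to the
    # flux set are compared.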

    for table_name in tables_to_test:
        logger.info(
            f"Start functional tests for bronze job, "
            f'args: run = "{run_number}", staging_table_name = "{table_name}"'
        )

        actual = build_actual_output(spark, "bronzesupplydataprod", table_name)
        expected = build_expected_output(spark, table_name, run_number)

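        # Run-dependent metadata columns are excluded from the comparison.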
        untested_columns = [
            "integration_datetime",
            "raw_bucket",
            "run_id",
            "start_interval_run",
        ]
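        # Expected columns carry an "_EXP" suffix; strip it to derive the join keys.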
        merging_columns = [
            column.replace("_EXP", "")
            for column in expected.columns
            if column.replace("_EXP", "") not in untested_columns
        ]
        results[f"Run{run_number} - {table_name}"] = are_dataframe_equal(
            spark,
            actual,
            expected,
            merging_columns=merging_columns,
            untested_columns=untested_columns,
        )
    return results


def run_staging_test(run_number):
    """
    Execute staging tests for a specific run number.
    This function performs the following key operations:
    - Creates a Spark session with required packages
    - Configures Spark session parameters
    - Creates a test database
    - Filters flux configurations based on the run number
    - Runs jobs for selected flux identifiers
    - Validates test results
    - Cleans up Spark resources
    Args:
        run_number (int): The run number (1, 2, 3, ...) determining which flux identifiers to test.
    Raises:
        AssertionError: If any of the test results return False, indicating a test failure for the specified run.
    """
    spark_session: SparkSession = create_spark_session(
        spark_packages=[
            "com.databricks:spark-xml_2.12:0.15.0",
            "io.delta:delta-storage:3.1.0",
        ]
    )
    spark_session.conf.set("spark.sql.session.timeZone", "UTC")

    try:
        database: str = "bronzesupplydataprod"
        create_database(spark_session, database)

        # Filter flux configurations for the specific run
        flux_to_test = {
            k: v for k, v in flux_config.items()
            if is_flux_for_run(k, run_number)
        }

        # Execute jobs for selected flux identifiers
        for flux_identifier, partition_date in flux_to_test.items():
            argv = [
                "--env", "prod",
                "--run-date", partition_date,
                "--flux-identifiers", flux_identifier,
                "--run-id", "007",
                "--is-local", "true",
                "--from-date", partition_date,
            ]
            job_parameters: JobParameters = JobParameters.parse(argv)
            Job(spark_session, job_parameters).run(flux_identifier)

        # Validate results and clean up
        results = assert_results(run_number, spark_session)

        # Ensure all tests passed
        assert False not in results.values(), f"Test run {run_number} failed"
    finally:
        clean_up(spark_session, run_number)


def is_flux_for_run(flux_identifier, run_number):
    """
    Determine if a specific flux should be tested in the current run.
    This function implements a logic to categorize flux identifiers
    into different test runs based on their prefixes.
    Args:
        flux_identifier (str): The unique identifier for a flux.
        run_number (int): The current run number (1, 2, 3, ...).
    Returns:
        bool: True if the flux should be tested in the current run, False otherwise.
    """

    flux_groups = {
        1: [flux for flux in flux_config.keys() if flux.startswith(('r10', 'c12', 'c15', 'r10a'))],
        2: [flux for flux in flux_config.keys() if flux.startswith(('r4c', 'r4m', 'r4h', 'r4q'))],
        3: [flux for flux in flux_config.keys() if flux.startswith(('bigadjusted', 'bigmetering', 'bigphysical', 'r18'))]
    }

    return flux_identifier in flux_groups.get(run_number, [])


@pytest.mark.sequential
def test_run_staging_test1():
    run_staging_test(1)

@pytest.mark.sequential
def test_run_staging_test2():
    run_staging_test(2)

@pytest.mark.sequential
def test_run_staging_test3():
    run_staging_test(3)

@pytest.mark.sequential
def test_single_flux():
    """
    Example test demonstrating how to test a single flux.
    Serves as a template: swap in another flux identifier to test a different flux.
    """
    test_flux = "r10_17x100b100a2021l_17x100a100f0054x"
    run_single_flux_staging_test(test_flux)


def run_single_flux_staging_test(flux_identifier: str):
    """
    Execute staging test for a single specific flux identifier.
    This function performs a comprehensive test for a single flux by:
    - Creating a Spark session with required packages
    - Configuring Spark session parameters
    - Creating a test database
    - Running the job for the specified flux identifier
    - Validating test results
    - Cleaning up Spark resources
    Args:
        flux_identifier (str): The unique identifier of the flux to be tested.
    Raises:
        AssertionError: If the test results do not match the expected output.
        KeyError: If the flux identifier is not found in the flux configuration.
    """
    # Verify flux exists in configuration
    if flux_identifier not in flux_config:
        raise KeyError(f"Flux identifier {flux_identifier} not found in configuration")

    # Create Spark session
    spark_session: SparkSession = create_spark_session(
        spark_packages=[
            "com.databricks:spark-xml_2.12:0.15.0",
            "io.delta:delta-storage:3.1.0",
        ]
    )
    spark_session.conf.set("spark.sql.session.timeZone", "UTC")
    database: str = "bronzesupplydataprod"
    create_database(spark_session, database)

    # Drop any leftover table from a previous run, now that the database exists
    table_name = f"{database}.{flux_identifier}_staging"
    spark_session.sql(f"DROP TABLE IF EXISTS {table_name}")

    # Get partition date for the flux
    partition_date = flux_config[flux_identifier]

    # Prepare job arguments
    argv = [
        "--env", "prod",
        "--run-date", partition_date,
        "--flux-identifiers", flux_identifier,
        "--run-id", "007",
        "--is-local", "true",
        "--from-date", partition_date,
    ]

    # Determine the run number for this flux
    run_number = determine_run_number(flux_identifier)

    try:
        # Run the job
        job_parameters: JobParameters = JobParameters.parse(argv)
        Job(spark_session, job_parameters).run(flux_identifier)

        # Validate results and ensure all tests passed
        results = assert_results(run_number, spark_session, single_flux=flux_identifier)
        assert False not in results.values(), f"Test for flux {flux_identifier} failed"
    finally:
        # Clean up the Spark session even if the job or the assertions fail
        clean_up(spark_session, run_number)


def determine_run_number(flux_identifier: str) -> int:
    """
    Determine the appropriate run number for a given flux identifier.
    Args:
        flux_identifier (str): The unique identifier of the flux.
    Returns:
        int: The run number (1, 2, or 3) based on the flux identifier prefix.
    """
    if flux_identifier.startswith(('r10', 'c12', 'c15', 'r10a')):
        return 1
    elif flux_identifier.startswith(('r4c', 'r4m', 'r4h', 'r4q')):
        return 2
    elif flux_identifier.startswith(('bigadjusted', 'bigmetering', 'bigphysical', 'r18')):
        return 3
    else:
        raise ValueError(f"Unable to determine run number for flux: {flux_identifier}")

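For reference, `are_dataframe_equal` lives in our internal `supplydata_ec_utils.test_utils` package, so its source is not shown here. Here is a minimal, hypothetical sketch of the kind of symmetric-difference comparison such a helper might perform (names and details are illustrative, not the actual implementation):

from pyspark.sql import DataFrame


def dataframes_match(
    actual: DataFrame,
    expected: DataFrame,
    merging_columns: list,
    untested_columns: list,
) -> bool:
    # Hypothetical sketch -- not the real are_dataframe_equal implementation.
    # Strip the "_EXP" suffix carried by the expected table's columns.
    for column in expected.columns:
        expected = expected.withColumnRenamed(column, column.replace("_EXP", ""))
    # Drop run-dependent columns and fix a common column order for exceptAll.
    columns = sorted(merging_columns)
    actual = actual.drop(*untested_columns).select(*columns)
    expected = expected.drop(*untested_columns).select(*columns)
    # Equal iff the symmetric difference is empty on both sides
    # (exceptAll keeps duplicates, so row multiplicity is checked too).
    return (
        actual.exceptAll(expected).count() == 0
        and expected.exceptAll(actual).count() == 0
    )
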
The job works fine locally, but when it runs in the pipeline it fails with these errors:

E   pyspark.errors.exceptions.captured.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException:
    MetaException(message:Insert of object "org.apache.hadoop.hive.metastore.model.MDatabase@28d4f805"
    using statement "INSERT INTO DBS (DB_ID,"DESC",DB_LOCATION_URI,"NAME",OWNER_NAME,OWNER_TYPE) VALUES (?,?,?,?,?,?)"
    failed : Table/View 'DBS' does not exist.)
Caused by: ERROR 42X05: Table/View 'DBS' does not exist.
25/01/10 08:09:50 ERROR ObjectStore: javax.jdo.JDOException: Exception thrown when executing query :
    SELECT 'org.apache.hadoop.hive.metastore.model.MDatabase' AS NUCLEUS_TYPE,A0."DESC",A0.DB_LOCATION_URI,
    A0."NAME",A0.OWNER_NAME,A0.OWNER_TYPE,A0.DB_ID FROM DBS A0 WHERE A0."NAME" = ?
    at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:677)

I don't think the problem comes from the Spark configuration, the metastore configuration, or permissions, because we have other testing jobs that run equivalent functional tests using the same tools and functions. Any help or ideas, please?
