I'm working with a PostgreSQL database in Python using psycopg2 and pandas. My workflow involves reading data into Python, uploading it to the database, retrieving it back into Python, and then updating the database. To ensure data integrity, I want a comprehensive mapping between pandas data types and PostgreSQL data types. Currently, I'm running into an issue where date-and-time values (day plus hour) are not uploaded to PostgreSQL with the correct data type.

I'm utilizing the following function to map pandas data types to PostgreSQL data types:


import numpy as np

def map_dtype(dtype):
    if np.issubdtype(dtype, np.integer):
        return 'BIGINT'  # Maps to PostgreSQL's BIGINT for large integers
    elif np.issubdtype(dtype, np.floating):
        return 'DOUBLE PRECISION'  # Maps to PostgreSQL's DOUBLE PRECISION for floating-point numbers
    elif np.issubdtype(dtype, np.datetime64):
        return 'TIMESTAMP'  # Maps to PostgreSQL's TIMESTAMP for date and time
    elif np.issubdtype(dtype, np.bool_):
        return 'BOOLEAN'  # Maps to PostgreSQL's BOOLEAN for boolean values
    else:
        return 'TEXT'  # Default mapping to PostgreSQL's TEXT for other types
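
For context, here is how I expect the mapping to be applied to build the col_str that upload_to_db below receives (a minimal sketch: build_col_str is just an illustrative helper, and it assumes the datetime column already has a datetime64 dtype, e.g. after pd.to_datetime):

import pandas as pd

def build_col_str(df):
    """Build the column-definition part of a CREATE TABLE statement from a DataFrame's dtypes."""
    return ", ".join(f'"{col}" {map_dtype(df[col].dtype)}' for col in df.columns)

df = pd.DataFrame({
    "id": [1, 2],
    "price": [9.99, 19.99],
    "created_at": pd.to_datetime(["2024-01-01 10:00", "2024-01-02 11:30"]),
})
print(build_col_str(df))
# -> "id" BIGINT, "price" DOUBLE PRECISION, "created_at" TIMESTAMP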

This is the function I'm using to upload data to the database:

def upload_to_db(path_to_config, tbl_name, col_str, dataframe):
    """Upload a DataFrame to the specified database."""
    conn = connect_to_db(path_to_config)
    if not conn:
        return

    cursor = conn.cursor()
    # Define the temporary CSV path before the try block so the cleanup in `finally`
    # cannot raise a NameError if an earlier statement fails
    csv_file_path = f"{tbl_name}.csv"

    try:
        # Get the current user
        cursor.execute("SELECT current_user;")
        current_user = cursor.fetchone()[0]

        # Define schema based on user
        schema = "developer_schema" if current_user == "dev1" else "public"
        full_table_name = f"{schema}.{tbl_name}"

        cursor.execute(f"DROP TABLE IF EXISTS {full_table_name};")
                # Generate the CREATE TABLE statement with explicit data types

        cursor.execute(f"CREATE TABLE {full_table_name} ({col_str});")
        print(f'Table {full_table_name} was created successfully.')

        csv_file_path = f"{tbl_name}.csv"
        dataframe.to_csv(csv_file_path, header=True, index=False, encoding='utf-8')

        with open(csv_file_path, 'r') as my_file:
            print('File opened in memory.')
            sql_statement = f"""
            COPY {full_table_name} FROM STDIN WITH
                CSV
                HEADER
                DELIMITER ',' 
            """
            cursor.copy_expert(sql=sql_statement, file=my_file)
            print('File copied to the database.')

        cursor.execute(f"GRANT SELECT ON TABLE {full_table_name} TO public;")
        conn.commit()

        print(f'Table {full_table_name} imported to the database successfully.')
    except Exception as e:
        print(f"Error during upload: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

        if os.path.exists(csv_file_path):
            try:
                os.remove(csv_file_path)
                print(f"Temporary file {csv_file_path} has been deleted.")
            except Exception as e:
                print(f"Error deleting file {csv_file_path}: {e}")

Is this the best approach? Do you have any suggestions? P.S. I don't want to use SQLAlchemy because it is too slow for the large datasets I'm working with.
