from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import skewness, kurtosis, stddev
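# Window and the skewness/kurtosis/stddev aggregates are for the
# feature-engineering step; they aren't used yet in the snippet below.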

from airflow.configuration import conf

import sys

def transform_forex_data(file_path, access_key, secret_key):
    try:
        print(f"CSV FILE PATH: {file_path}")

        # this works by specifying spark.jars.packages = org.apache.hadoop:hadoop-aws:3.2.0
        # so the S3A connector is pulled in at startup
        spark = SparkSession.builder.appName('feature-engineering') \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
            .config("spark.hadoop.fs.s3a.access.key", access_key) \
            .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
            .getOrCreate()

        # spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
        # spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)

        usd_php_forex_4h_spark_df = spark.read.csv(file_path, header=True, inferSchema=True)
        usd_php_forex_4h_spark_df.createOrReplaceTempView("usd_php_forex")

    except Exception as e:
        print(f"Error {e} has occured.")

if __name__ == "__main__":
    # read the argument vector passed in by the spark submit job operator;
    # argv[1] is the path to the newly saved .csv file
    file_path = sys.argv[1]
    print(file_path)

    # get secrets
    AWS_ACCESS_KEY_ID = conf.get("secrets", "aws_access_key_id")
    AWS_SECRET_ACCESS_KEY = conf.get("secrets", "aws_secret_access_key")

    # pass file path to task
    transform_forex_data(file_path=file_path,
        access_key=AWS_ACCESS_KEY_ID,
        secret_key=AWS_SECRET_ACCESS_KEY)
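
For reference, the script is launched from the DAG with something like the sketch below (assuming the Apache Spark provider is installed; the task_id, application path, and bucket name here are placeholders, not my real values). Whatever goes in application_args ends up in sys.argv:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

bucket_name = "my-bucket"  # placeholder

transform_forex = SparkSubmitOperator(
    task_id="transform_forex_data",
    conn_id="spark_default",
    # placeholder path to the script above
    application="/opt/airflow/dags/scripts/transform_forex_data.py",
    # the first (and only) element becomes sys.argv[1] in the script
    application_args=[f"s3a://{bucket_name}/raw/usd_php_forex_4hour.csv"],
)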

I've tried providing the spark.hadoop.fs.s3a.impl configuration with the value org.apache.hadoop.fs.s3a.S3AFileSystem, and I've supplied my AWS access key ID and secret access key in order to read the .csv file from the bucket. I've also built the URI that Spark reads with "s3a" instead of "s3", e.g. "s3a://{bucket_name}/raw/usd_php_forex_4hour.csv", which is the file_path variable.
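
For what it's worth, the credentials themselves can be sanity-checked outside Spark with something like this boto3 snippet (a minimal sketch; the bucket name is a placeholder):

import boto3

# same keys fetched from the Airflow config above
s3 = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
# raises botocore.exceptions.ClientError if the credentials or key are wrong
s3.head_object(Bucket="my-bucket", Key="raw/usd_php_forex_4hour.csv")

Am I missing something here?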
