
I am working with a Parquet file that contains a complicated nested column structure. Specifically, I have a column where the structure is as follows:

 {
   "dynamodb": {
      "NewImage": {
         "DiscountData": {
            "M": {
               "TEST1": {"N": "0"},
               "TEST2": {"N": "0"},
               "TEST3": {"N": "0"},
               "TEST4": {"N": "0"},
               "TEST5": {"N": "1"}
            }
         }
      }
   }
}

The key challenge is the M field. It is dynamic: it can contain varying keys (TEST1, TEST2, etc.) depending on the dataset. I am processing multiple Parquet files, and the structure of M may change across files.

I attempted to let Spark infer the schema, but I am having this issue:

AnalysisException: Found duplicate column(s) in the data schema 

To make it consistent, I want to either:

- Treat the entire content of M as a string (e.g., a JSON string), or
- Convert M into a MapType where both keys and values are strings.

Here’s the schema I tried to define:

from pyspark.sql.types import StructType, StructField, StringType, MapType

schema = StructType([
    StructField("dynamodb", StructType([
        StructField("NewImage", StructType([
            StructField("DiscountData", StructType([
                StructField("M", MapType(StringType(), StringType()), True)
            ]), True)
        ]), True)
    ]), True)
])

This is what I am using to create the DataFrame:

from pyspark.sql.functions import input_file_name

df_disscountcoderule = spark.read \
    .schema(schema) \
    .option("mergeSchema", "true") \
    .option("recursiveFileLookup", "true") \
    .parquet(f"s3://{source_bucket}/{source_discountcodesrules_prefix}") \
    .filter(input_file_name().endswith(".parquet")) \
    .filter(~input_file_name().contains("processing-failed"))

However, I get the following error when applying this schema:

java.lang.ClassCastException: class org.apache.spark.sql.types.MapType cannot be cast to class org.apache.spark.sql.types.StructType (org.apache.spark.sql.types.MapType and org.apache.spark.sql.types.StructType are in unnamed module of loader 'app')

My questions:

1. Is there a way to create a static schema that can handle this dynamic structure?
2. How can I convert the M field into a JSON string or a MapType for consistent processing?

I appreciate any guidance or alternative approaches!
