
In the Scala code below, I am reading a Parquet file, amending the value of one column, and writing the new DataFrame out to a new Parquet file:

import org.apache.spark.sql.functions.lit

val df = spark.read.parquet(sourcePath)
val newDf = df.withColumn("my_num_field", lit(11.10)) // replace the column with a constant value
newDf.write.parquet(targetPath)

Running the above code generates a new Parquet file with the new value 11.10 for my_num_field. However, the schema type is being changed for this field and a few other fields whose original schema type was:

"type" : [ "null", {
      "type" : "fixed",
      "name" : "my_num_field",
      "size" : 16,
      "logicalType" : "decimal",
      "precision" : 14,
      "scale" : 3
    }

And the new data type is now:

"type" : "double"

This produces the following error when I load the Parquet file on HDFS and run a SELECT query:

incompatible Parquet schema for column 'my_db.my_table.my_num_field'. Column type: DECIMAL(14,3), Parquet schema: required double my_num_field

How can I retain the original schema?

I have already tried a few things suggested on Stack Overflow, and they all produce the same outcome:

  • Added a cast, .cast(DecimalType(14, 3)), after lit(11.10) (see the sketch after this list).
  • Set the overwrite-schema flag during both read and write: .option("overwriteSchema", "false").
  • Set the merge-schema flag: .option("mergeSchema", "false"), both with and without the overwriteSchema option above.
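
For reference, the cast attempt looked roughly like this (a sketch; DecimalType comes from org.apache.spark.sql.types, and df and targetPath are the same as in the first snippet):

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.DecimalType

// Cast the literal back to the source column's declared precision and scale
val newDf = df.withColumn("my_num_field", lit(11.10).cast(DecimalType(14, 3)))
newDf.write.parquet(targetPath)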
