admin管理员组

文章数量:1334148

I am using Avro4s to save GroupState in spark code. GroupState needs to be stored as Array[Byte].

To further simplyfy and making issue reproducible I am updating steps to reproduce the issue.

This is my case class.

private[objects] final case class AoState(
    conf: AnomalyObjectConf,
    timedScores: Array[TimedScores] = Array(),
    events: Option[AnomalyObject] = None
  ) extends CommonGroupState[AnomalyObjectConf, AnomalyObject, Option[AnomalyObject]] 

private[spe] final case class AnomalyObjectConf(
    pmuId: Int,
    asThreshold: Double,
    aasThreshold: Double,
    durationThresholdSeconds: Int,
    resourceIds: Seq[Long],
    contextIds: Seq[Long],
    version: Long,
    updatedTimestamp: Instant,
    deleteFlag: Boolean = false
  ) extends CommonConf

private[spe] case class TimedScores(
    creationDate: Instant,
    asScores: MatrixScore,
    aasScores: MatrixScore = emptyMatrix
  )
private[spe] type MatrixScore = util.List[java.lang.Double]

This is my avro schema for AnomalyObject class that is also available as events in case class above.

    {
  "namespace": "com.mycomosi.dto.aiops",
  "protocol": "aiops",
  "doc": "AIOPS specific Data Transfer Objects",
  "types":
  [
    {
      "type": "record",
      "name": "AnomalySignaturesSnap",
      "doc": "Anomaly signatures at specific time",
      "fields": [
        {"name": "timestamp", "type": ["null", { "type": "long", "logicalType": "timestamp-millis"}], "doc": "Measurement date time"},
        {"name": "anomalyScores", "type": ["null", {"type": "array", "items": ["null","double"], "default":  null}], "doc": "Matrix of anomaly score value encoded into an Array. The matrix is array encoded. It requires resourceIds & contextIds arrays for decoding AS.The value AS for the Xth resource and Yth context is available in the array cell Y+X*len(contextIds). The AAS for this PMU is the latest value from the array"}
      ]
    },
    {
      "type": "record",
      "name": "AnomalySignatures",
      "doc": "Anomaly signatures over time with resource and context ids",
      "fields": [
        {"name": "resourceIds", "type": {"type": "array", "items": "long"}, "doc": "Ids of the resources"},
        {"name": "contextIds", "type": {"type": "array", "items": "long"}, "doc": "Ids of the measurement contexts"},
        {"name": "anomalyScores", "type": { "type": "array", "items": ["null","com.mycomosi.dto.aiops.AnomalySignaturesSnap"]}, "doc": "Anomaly signatures over time"}
      ]
    },
    {
      "type": "record",
      "name": "AnomalyObject",
      "doc": "Anomaly object status with signature",
      "fields": [
        {"name": "aoId", "type": { "type": "string", "logicalType": "UUID"}, "doc": "Anomaly object unique Id"},
        {"name": "pmuId", "type": "int", "doc": "PMU unique Id"},
        {"name": "status", "type": {"type": "enum", "name": "AoStatus", "symbols" : ["OPEN", "CLOSED"]}, "doc": "Anomaly object status"},
        {"name": "startedTimestamp", "type": { "type": "long", "logicalType": "timestamp-millis"}, "doc": "Anomaly object opening date"},
        {"name": "closedTimestamp", "type": ["null", { "type": "long", "logicalType": "timestamp-millis"}], "default": null, "doc": "Anomaly object closure date"},
        {"name": "updatedTimestamp", "type": { "type": "long", "logicalType": "timestamp-millis"}, "doc": "Update timestamp"},
        {"name": "pmuAnomalyScore", "type":"double", "doc": "Latest PMU aggregated anomaly score"},
        {"name": "signatureDtls", "type":"com.mycomosi.dto.aiops.AnomalySignatures", "doc": "Anomaly signature"}
      ]
    }
  ]
}

while I create a simple test to generate case class avro schema using avro4s 4.x library

import com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectsDtos.AoState
import com.sksamuel.avro4s.{AvroSchema, SchemaFor}
import .scalatest.funsuite.AnyFunSuite

class AnomalyObjectAvroTest extends AnyFunSuite {

  test("Test AOState Avro Schema") {
    implicit lazy val schemaForAoState: SchemaFor[AoState] = SchemaFor[AoState]
    val schema = AvroSchema[AoState]
    println(schema.toString(true))

  }


}

I keep getting error as follows

An exception or error caused a run to abort.

java.lang.StackOverflowError
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$1(AnomalyObjectAvroTest.scala:10)
    at com.mycomosi.bda.spe.spark.anomaly.objects.AnomalyObjectAvroTest.schemaForAoState$lzycompute$1(AnomalyObjectAvroTest.scala:10)

Hope this would help to reproduce the problem, please suggest me a good possible solution. I don't understand why this is getting into recursion and keep getting me into error.

本文标签: scalaEncoders with Avro4s end up with StackOverflowStack Overflow