
I'm currently trying to implement PyDeequ to identify volume anomalies for specific time periods. The problem is that PyDeequ picks up the latest entry from the metrics repository instead of the value I want it to compare against. My code is scheduled to run at custom frequencies, and it would be great if I could pick the correct baseline volume to compare against based on the day it runs.

For example, if I run my code on a Tuesday, I want PyDeequ to do the volume check against the value from last week's Tuesday, not from the last run (which would be Monday). Ideally I could pass the result_key value into the function so it picks the right baseline, but I'm struggling with this.

My verification suite code:

from pydeequ.verification import VerificationSuite
from pydeequ.anomaly_detection import RelativeRateOfChangeStrategy
from pydeequ.analyzers import Size

VerificationSuite(spark).onData(current_df) \
    .useRepository(metrics_repository) \
    .saveOrAppendResult(result_key) \
    .addAnomalyCheck(
        RelativeRateOfChangeStrategy(maxRateDecrease=max_rate_decrease,
                                     maxRateIncrease=max_rate_increase),
        Size()) \
    .run()

Result key format:

from pydeequ.repository import ResultKey

result_key = ResultKey(spark, ResultKey.current_milli_time(), {
    "table_name": table_name,
    "time_period": time_period,  # was `{time_period}`, a set literal
    "day": datetime.now().strftime("%A"),
    "alliance_name": alliance,
    "Status": status,  # either "Success" or "Warning"
    "Percentage_change": f"{percentage_change:.2f}"
})

Result: it picks the latest historical volume-check value from the repository instead of letting me specify a custom baseline time.
