admin管理员组

文章数量:1122826

Suppose I want to compute variance and/or standard deviation with non-default ddof in a groupby context, I can do:

df.groupby("a")["b"].var(ddof=2)

If I want that to happen together with other aggregations, I can use:

df.groupby("a").agg(b_var = ("b", "var"), c_sum = ("c", "sum"))

My understanding is that to be able to have non default ddof I should create a custom aggregation.

Here what I got so far:

def var(ddof: int = 1) -> dd.Aggregation:
    import dask.dataframe as dd

    return dd.Aggregation(
        name="var",
        chunk=lambda s: (s.count(), s.sum(), (s.pow(2)).sum()),
        agg=lambda count, sum_, sum_sq: (count.sum(), sum_.sum(), sum_sq.sum()),
        finalize=lambda count, sum_, sum_sq: (sum_sq - (sum_ ** 2 / count)) / (count - ddof),
    )

Yet, I encounter a RuntimeError:

df.groupby("a").agg({"b": var(2)})

RuntimeError('Failed to generate metadata for DecomposableGroupbyAggregation(frame=df, arg={‘b’: <dask.dataframe.groupby.Aggregation object at 0x7fdfb8469910>}

What am I missing? Is there a better way to achieve this?

Replacing s.pow(2) with s**2 also results in an error.

Full script:

import dask.dataframe as dd

data = {
    "a": [1, 1, 1, 1, 2, 2, 2],
    "b": range(7),
    "c": range(10, 3, -1),
}

df = dd.from_dict(data, 2)

def var(ddof: int = 1) -> dd.Aggregation:
    import dask.dataframe as dd

    return dd.Aggregation(
        name="var",
        chunk=lambda s: (s.count(), s.sum(), (s.pow(2)).sum()),
        agg=lambda count, sum_, sum_sq: (count.sum(), sum_.sum(), sum_sq.sum()),
        finalize=lambda count, sum_, sum_sq: (sum_sq - (sum_ ** 2 / count)) / (count - ddof),
    )

df.groupby("a").agg(b_var = ("b", "var"), c_sum = ("c", "sum"))  # <- no issue

df.groupby("a").agg(b_var = ("b", var(2)), c_sum = ("c", "sum"))  # <- RuntimeError

Suppose I want to compute variance and/or standard deviation with non-default ddof in a groupby context, I can do:

df.groupby("a")["b"].var(ddof=2)

If I want that to happen together with other aggregations, I can use:

df.groupby("a").agg(b_var = ("b", "var"), c_sum = ("c", "sum"))

My understanding is that to be able to have non default ddof I should create a custom aggregation.

Here what I got so far:

def var(ddof: int = 1) -> dd.Aggregation:
    import dask.dataframe as dd

    return dd.Aggregation(
        name="var",
        chunk=lambda s: (s.count(), s.sum(), (s.pow(2)).sum()),
        agg=lambda count, sum_, sum_sq: (count.sum(), sum_.sum(), sum_sq.sum()),
        finalize=lambda count, sum_, sum_sq: (sum_sq - (sum_ ** 2 / count)) / (count - ddof),
    )

Yet, I encounter a RuntimeError:

df.groupby("a").agg({"b": var(2)})

RuntimeError('Failed to generate metadata for DecomposableGroupbyAggregation(frame=df, arg={‘b’: <dask.dataframe.groupby.Aggregation object at 0x7fdfb8469910>}

What am I missing? Is there a better way to achieve this?

Replacing s.pow(2) with s**2 also results in an error.

Full script:

import dask.dataframe as dd

data = {
    "a": [1, 1, 1, 1, 2, 2, 2],
    "b": range(7),
    "c": range(10, 3, -1),
}

df = dd.from_dict(data, 2)

def var(ddof: int = 1) -> dd.Aggregation:
    import dask.dataframe as dd

    return dd.Aggregation(
        name="var",
        chunk=lambda s: (s.count(), s.sum(), (s.pow(2)).sum()),
        agg=lambda count, sum_, sum_sq: (count.sum(), sum_.sum(), sum_sq.sum()),
        finalize=lambda count, sum_, sum_sq: (sum_sq - (sum_ ** 2 / count)) / (count - ddof),
    )

df.groupby("a").agg(b_var = ("b", "var"), c_sum = ("c", "sum"))  # <- no issue

df.groupby("a").agg(b_var = ("b", var(2)), c_sum = ("c", "sum"))  # <- RuntimeError
Share Improve this question edited Dec 27, 2024 at 13:42 FBruzzesi asked Dec 27, 2024 at 8:18 FBruzzesiFBruzzesi 6,4753 gold badges18 silver badges40 bronze badges
Add a comment  | 

2 Answers 2

Reset to default 3 +100

As answered in Dask Discourse Forum, I don't think your custom Aggregation implementation is correct.

However, a simpler solution can be applied:

import dask.dataframe as dd
import functools

data = {
    "a": [1, 1, 1, 1, 2, 2, 2],
    "b": range(7),
    "c": range(10, 3, -1),
}

df = dd.from_dict(data, 2)

var_ddof_2 = functools.partial(dd.groupby.DataFrameGroupBy.var, ddof=2)
df.groupby("a").agg(b_var = ("b", var_ddof_2), c_sum = ("c", "sum"))

As brought up in the discourse conversation, for dask[dataframe]>2024.12, dd.groupby.DataFrameGroupBy is removed from dask itself, and dask_expr._groupby.GroupBy is used instead. However this is not a public exposed class (as of now - Jan 2025).

The plan seems to be to eventually merge dask_expr into dask.

本文标签: pythondask var and std with ddof in groupby context and other aggregationsStack Overflow