I have a table with 30 columns, of which the first 5 make up the primary key and the rest are information columns I want to group on together with the primary key.

Primary key = year,month,day,hr,ID
rest columns = a,b,c,d,e,f........y

Now I want to calculate counts on the primary key plus one column at a time, like:

group 1 = df.groupby(year,month,day,hr,ID,a).agg(count("a").alias("a_count"))
group 2 = df.groupby(year,month,day,hr,ID,b).agg(count("b").alias("b_count"))
...
...

For this we are using a for loop, and inside each iteration we join the new aggregation with the previous one:

for colname in distribution_cols:
    col_df = select_df.groupBy("year", "month", "day", "hr", "ID", colname) \
        .agg(F.count("*").alias(colname + "_total_count"),
             F.countDistinct("username").alias(colname + "_unique_users"))

This process is lengthy and consumes too much time and too many resources. Is there a better, more Spark-idiomatic way to do this?

Edit: For example, let's consider that we have this input table:

hr  user  mobile   mobile_type  sim_type
1   1     iphone4  ios          celluer1
1   2     oppo     android      celluer2
1   3     vivo     android      celluer2
1   4     pixel    android      celluer3
1   5     iphone6  ios          celluer3
2   3     iphone4  ios          celluer1
2   3     oppo     ios          celluer1
2   3     vivo     ios          celluer4
2   1     pixel    android      celluer3
2   1     pixel    android      celluer3
2   2     iphone6  ios          celluer4
1   1     iphone4  ios          celluer1

We are currently aggregating like below:

// df is some already-created aggregation

// Group by hr, user and mobile_type and calculate count and unique user count
val mobileTypeGroup = df.groupBy("hr", "user", "mobile_type")
  .agg(
    count("mobile").alias("mobile_count"),
    countDistinct("user").alias("unique_mobile_user_count")
  )

// Group by hr, user and sim_type and calculate count and unique user count
val simTypeGroup = df.groupBy("hr", "user", "sim_type")
  .agg(
    count("mobile").alias("sim_count"),
    countDistinct("user").alias("unique_sim_user_count")
  )

// Join the results back to the original DataFrame
val result = df
  .join(mobileTypeGroup, Seq("hr", "user", "mobile_type"), "left")
  .join(simTypeGroup, Seq("hr", "user", "sim_type"), "left")

Here, hr and user together form the primary key. In this example we aggregate the existing df, each time grouping by the common (primary key) columns plus one additional column, and join the result back. I have shown only 2 iterations; in reality we do this more than 30 times.
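As a hedged sketch (not the exact production code), the pattern described above generalizes to a fold over the non-key columns, where each per-column aggregation is left-joined back onto the accumulated result; distributionCols, looped and the alias names below are illustrative:

import org.apache.spark.sql.functions.{count, countDistinct}

// Illustrative list of non-key columns; in practice there are 30+ of them.
val distributionCols = Seq("mobile", "mobile_type", "sim_type")

// Fold over the columns: group by the primary key plus one column at a time,
// then left-join each per-column aggregation back onto the accumulated result.
val looped = distributionCols.foldLeft(df) { (acc, c) =>
  val grouped = df.groupBy("hr", "user", c)
    .agg(
      count("*").alias(s"${c}_total_count"),
      countDistinct("user").alias(s"${c}_unique_users")
    )
  acc.join(grouped, Seq("hr", "user", c), "left")
}

Each iteration adds another shuffle and another join, which is why the runtime and resource usage grow with the number of columns.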

Below is the resultant table:

What's the best way to do this operation?

asked Feb 12 at 13:51 by Ayush Goyal; edited Feb 24 at 17:40 by Chris
  • Can you add a sample input and the related expected output? We need at least 1 PK column, 2 or 3 a,b,c columns and, let's say, maybe 10 lines. Present that as input and manually create the output and show us. Because I think there is something wrong in the logic you want to achieve and I need to be sure I understand. – Steven, Feb 12 at 14:10
  • I agree with Steven, but I suspect what you are looking for is aggregating in a single pass of the data. If so, I don't have a PySpark-based solution, but for Scala, Quality's aggregation functions cover that use case. – Chris, Feb 12 at 14:25
  • @Steven, I have added input and output of the data. – Ayush Goyal, Feb 13 at 11:15
  • Are you using PySpark or Spark in Scala? – Steven, Feb 13 at 15:20
  • @Steven I am using Spark with Scala. – Ayush Goyal, Feb 14 at 10:38

2 Answers


Perhaps not really an answer, but this much text doesn't work well in a comment.

The original question and the current example are a little at odds; similarly, the count distinct on users (also in the example) would always seem to be one, since user is itself included in the group by.

If you just want counts on the non-primary-key columns (or other combinations), then:

group 1 = df.groupby(year,month,day,hr,ID).agg(count_distinct("a").alias("a_count"), count_distinct("b").alias("b_count"),...)

would seem to work. If you really want something more like the second example, where the grouping itself is always different, then the join also doesn't make sense: the groupings could be done in a single pass, but there doesn't seem to be a sensible (at least not an obvious) way to join them into a single dataset, since only the primary keys are related.
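As a hedged Scala sketch of that single-pass idea, using the column names from the question's sample table (the result name and aliases are illustrative):

import org.apache.spark.sql.functions.countDistinct

// One pass over the data: group by the primary key once and compute a
// distinct count for every non-key column in the same aggregation.
val singlePass = df.groupBy("hr", "user")
  .agg(
    countDistinct("mobile").alias("mobile_count"),
    countDistinct("mobile_type").alias("mobile_type_count"),
    countDistinct("sim_type").alias("sim_type_count")
  )

This produces one row per primary key with all the per-column distinct counts side by side, avoiding the repeated shuffles and joins of the loop-and-join approach.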

Use map() to convert each row's columns into a map[String,String]:

    map('a',a,'b',b,'c',c,...)

Use explode() to convert the map into key, value pairs:

    explode(map('a',a,'b',b,'c',c,...))

Finish with a groupBy on your key fields plus key and value.

Using your made-up example, something like this could be what you are looking for. It generates a tall table with counts by column and value, which you could pivot into rows.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{count, sum}
import spark.implicits._ // assumes a SparkSession in scope named `spark` (e.g. spark-shell)

val input: DataFrame = Seq(
  (1, 1, "iphone4", "ios", "celluer1"),
  (1, 2, "oppo", "android", "celluer2"),
  (1, 3, "vivo", "android", "celluer2"),
  (1, 4, "pixel", "android", "celluer3"),
  (1, 5, "iphone6", "ios", "celluer3"),
  (2, 3, "iphone4", "ios", "celluer1"),
  (2, 3, "oppo", "ios", "celluer1"),
  (2, 3, "vivo", "ios", "celluer4"),
  (2, 1, "pixel", "android", "celluer3"),
  (2, 1, "pixel", "android", "celluer3"),
  (2, 2, "iphone6", "ios", "celluer4"),
  (1, 1, "iphone4", "ios", "celluer1")
).toDF("hr", "user", "mobile", "mobile_type", "sim_type")

// Turn each non-key column into (key, value) rows, count per (hr, user, key, value),
// then roll up to (hr, key, value) to get the total count and the number of
// distinct users per value.
input.selectExpr("hr", "user",
    "explode(map(" +
      "'mobile',mobile," +
      "'mobile_type',mobile_type," +
      "'sim_type',sim_type" +
      "))")
  .groupBy("hr", "user", "key", "value").count()
  .groupBy("hr", "key", "value")
  .agg(
    sum("count").alias("total_count"),
    count("*").alias("unique_count")
  )
  .show(false)
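If the chained expression above is bound to a val instead of calling .show (say tallCounts, an illustrative name), any one of the original per-column aggregations can be recovered from the tall result by filtering on key, for example:

import org.apache.spark.sql.functions.col

// Hypothetical: assumes the aggregation above (minus .show) was assigned to `tallCounts`.
val simTypeCounts = tallCounts
  .filter(col("key") === "sim_type")
  .withColumnRenamed("value", "sim_type")
  .withColumnRenamed("total_count", "sim_count")
  .withColumnRenamed("unique_count", "unique_sim_user_count")

simTypeCounts.show(false)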
