I have a table of 30 columns, in which the first 5 columns make up the primary key and the rest are information columns I want to group on together with the primary key.
Primary key = year, month, day, hr, ID
Rest of the columns = a, b, c, d, e, f, ..., y
Now I want to calculate counts on the primary key plus one extra column at a time, like:
group 1 = df.groupby(year,month,day,hr,ID,a).agg(count("a").alias("a_count"))
group 2 = df.groupby(year,month,day,hr,ID,b).agg(count("b").alias("b_count"))
...
...
To do this we are using a for loop, and inside each iteration we join the new aggregation with the previous one:
for colname in distribution_cols:
    col_df = select_df.groupBy("year", "month", "day", "hr", "ID", colname).agg(F.count('*').alias(colname + 'total_count'), F.countDistinct("username").alias(colname + 'unique_users'))
This process is lengthy and takes too much time and too many resources. Is there a better, more Spark-idiomatic solution for this?
Edit: For example, let's consider that we have the following input table:
hr | user | mobile | mobile_type | sim_type |
---|---|---|---|---|
1 | 1 | iphone4 | ios | celluer1 |
1 | 2 | oppo | android | celluer2 |
1 | 3 | vivo | android | celluer2 |
1 | 4 | pixel | android | celluer3 |
1 | 5 | iphone6 | ios | celluer3 |
2 | 3 | iphone4 | ios | celluer1 |
2 | 3 | oppo | ios | celluer1 |
2 | 3 | vivo | ios | celluer4 |
2 | 1 | pixel | android | celluer3 |
2 | 1 | pixel | android | celluer3 |
2 | 2 | iphone6 | ios | celluer4 |
1 | 1 | iphone4 | ios | celluer1 |
We are currently aggregating like below:
// df is some already created aggregation

// Group by mobile_type and calculate count and unique user count
val mobileTypeGroup = df.groupBy("hr", "user", "mobile_type")
  .agg(
    count("mobile").alias("mobile_count"),
    countDistinct("user").alias("unique_mobile_user_count")
  )

// Group by sim_type and calculate count and unique user count
val simTypeGroup = df.groupBy("hr", "user", "sim_type")
  .agg(
    count("mobile").alias("sim_count"),
    countDistinct("user").alias("unique_sim_user_count")
  )

// Join the results back to the original DataFrame
val result = df
  .join(mobileTypeGroup, Seq("hr", "user", "mobile_type"), "left")
  .join(simTypeGroup, Seq("hr", "user", "sim_type"), "left")
Here, hr and user together form the primary key. In this example we aggregate df over the existing data, each time grouping by the common (primary key) columns plus one unique column. I have shown only 2 iterations in the example, but we do this more than 30 times.
Below is the resultant table.
What's the best way to do this operation?
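For reference, a minimal sketch of how the loop above generalizes (the column list here is illustrative; the real job covers 30+ columns):

import org.apache.spark.sql.functions.{count, countDistinct}

// One groupBy + join per extra column; this is the pattern that becomes
// expensive when repeated for 30+ columns.
val extraCols = Seq("mobile", "mobile_type", "sim_type")
val joined = extraCols.foldLeft(df) { (acc, c) =>
  val grouped = df.groupBy("hr", "user", c)
    .agg(count("*").alias(s"${c}_count"),
         countDistinct("user").alias(s"${c}_unique_users"))
  acc.join(grouped, Seq("hr", "user", c), "left")
}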
Comments:
- Can you add a sample input and the related expected output? We need at least 1 PK column, 2 or 3 of the a, b, c columns, and, let's say, maybe 10 lines. Present that as input and manually create the output and show us, because I think there is something wrong in the logic you want to achieve and I need to be sure I understand. – Steven, Feb 12 at 14:10
- I agree with Steven, but I suspect what you are looking for is aggregating in a single pass of the data. If so, I don't have a PySpark-based solution, but for Scala, Quality's aggregation functions cover that use case. – Chris, Feb 12 at 14:25
- @Steven, I have added the input and output data. – Ayush Goyal, Feb 13 at 11:15
- Are you using PySpark or Spark in Scala? – Steven, Feb 13 at 15:20
- @Steven I am using Spark with Scala. – Ayush Goyal, Feb 14 at 10:38
2 Answers
Perhaps not really an answer, but this much text does not work well in a comment.
The original question and the current example are a little at odds; similarly, the count distinct on users (also in the example) would always seem to be one, since user is itself included in the group by.
If you just want counts on the non-primary-key columns (or other combinations), then:
group 1 = df.groupby(year,month,day,hr,ID).agg(count_distinct("a").alias("a_count"), count_distinct("b").alias("b_count"),...)
would seem to work. If you really want something more like the second example, where the grouping itself is always different, then the join also doesn't make sense: although the groupings could be done in a single pass, there just doesn't seem to be a sensible (at least not an obvious to me) way to join them into a single dataset (only the primary keys are related).
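A minimal Scala sketch of that single-pass idea, building the agg expressions programmatically from a list of column names (the names here are illustrative; df is the questioner's DataFrame):

import org.apache.spark.sql.functions.{col, count, countDistinct}

// One count and one distinct-count expression per non-key column,
// all computed in a single groupBy over the primary key.
val keyCols = Seq("year", "month", "day", "hr", "ID")
val valueCols = Seq("a", "b", "c") // ... up to y in the real table
val aggExprs = valueCols.flatMap { c =>
  Seq(count(col(c)).alias(s"${c}_count"),
      countDistinct(col(c)).alias(s"${c}_distinct_count"))
}
val singlePass = df.groupBy(keyCols.map(col): _*).agg(aggExprs.head, aggExprs.tail: _*)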
Use map() to convert rows to map[String,String] with
map('a',a,'b',b,'c',c,...)
Use explode() to convert the map into key, value pairs
explode(map('a',a,'b',b,'c',c,...))
Finish with a groupBy on your key fields plus key and value.
Using your made-up example, something like this could be what you are looking for. It generates a tall table with counts by column and value, which you could pivot into columns if needed (a sketch of that pivot follows the code below).
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{count, sum}
import spark.implicits._ // assumes a SparkSession named `spark` is in scope

val input: DataFrame = Seq(
  (1, 1, "iphone4", "ios", "celluer1"),
  (1, 2, "oppo", "android", "celluer2"),
  (1, 3, "vivo", "android", "celluer2"),
  (1, 4, "pixel", "android", "celluer3"),
  (1, 5, "iphone6", "ios", "celluer3"),
  (2, 3, "iphone4", "ios", "celluer1"),
  (2, 3, "oppo", "ios", "celluer1"),
  (2, 3, "vivo", "ios", "celluer4"),
  (2, 1, "pixel", "android", "celluer3"),
  (2, 1, "pixel", "android", "celluer3"),
  (2, 2, "iphone6", "ios", "celluer4"),
  (1, 1, "iphone4", "ios", "celluer1")
).toDF("hr", "user", "mobile", "mobile_type", "sim_type")

// Explode each non-key column into (key, value) rows, count per (hr, user, key, value),
// then roll up to (hr, key, value) with the total row count and the number of distinct users.
input.selectExpr("hr", "user",
    "explode(map(" +
      "'mobile',mobile," +
      "'mobile_type',mobile_type," +
      "'sim_type',sim_type" +
    "))")
  .groupBy("hr", "user", "key", "value").count()
  .groupBy("hr", "key", "value")
  .agg(
    sum("count").alias("total_count"),
    count("*").alias("unique_count")
  )
  .show(false)
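As a possible follow-up (a sketch only, assuming the grouped DataFrame above is assigned to a val, say tall, instead of being shown directly), the tall result could be pivoted into one column per key/value pair:

import org.apache.spark.sql.functions.{col, concat_ws, first}

// Sketch: pivot the tall (key, value) result into one column per key=value,
// carrying total_count for each combination.
tall
  .withColumn("key_value", concat_ws("=", col("key"), col("value")))
  .groupBy("hr")
  .pivot("key_value")
  .agg(first("total_count"))
  .show(false)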