Spark Java Structured Streaming filters
I have the following data:
+-------+-----------+-------------+----------+---------------------+-----------+
|id     |resource id|resource name|event-desc|event-date           |ipaddress1 |
+-------+-----------+-------------+----------+---------------------+-----------+
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119        |Netopia      |login     |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119        |Netopia      |login     |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:46|
+-------+-----------+-------------+----------+---------------------+-----------+
I need to apply multiple filters to the same dataset and pass each result to Kafka.
Dataset<Row> mainData = df.select("data.*").filter("data.eventdesc = 'logout'");
I send the filtered data to Kafka, and then there is another filter condition:
mainData = df.select("data.*").filter("data.eventdesc = 'login'");
and that data is sent to Kafka as well.
In this case only the last expression works. Any suggestions on how I can filter the data for both conditions and send each result to Kafka?
Based on this answer, you can combine both conditions into a single filter:
Dataset<Row> mainData = df.select("data.*").filter("data.eventdesc = 'logout' OR data.eventdesc = 'login'");
In your original code the variable (mainData) is reassigned, so it ends up referencing a new object and the first filter result is lost. If you have more complex filter logic, I'd recommend the approach below.
import org.apache.spark.sql.functions;

Dataset<Row> data = df.select("data.*")
        .filter(functions.col("data.eventdesc").equalTo("logout")
                .or(functions.col("data.eventdesc").equalTo("login")));
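If the real goal is to route each event type to Kafka separately, a minimal sketch (my assumption; the variable and topic names are hypothetical) is to keep each filter result in its own variable instead of reassigning mainData, then write each one using the Kafka pattern shown at the end of this answer:

// Each filter returns a new Dataset, so keeping separate references
// means neither result is overwritten by reassignment.
Dataset<Row> logouts = df.select("data.*").filter("data.eventdesc = 'logout'");
Dataset<Row> logins = df.select("data.*").filter("data.eventdesc = 'login'");
// Write logouts to e.g. a "logout-events" topic and logins to "login-events".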
To pass a Dataset to Kafka topics, you first need to import the related jar with the proper version for your Spark build, as below (this covers batch data, not streaming). Hope it is helpful for you.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.5.x</version>
</dependency>
// The Kafka sink requires a string or binary column named "value",
// so serialize each row to JSON first.
data.selectExpr("to_json(struct(*)) AS value").write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "login-logout-events")
        .save();
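Since the question title mentions Structured Streaming: if df is a streaming Dataset, a sketch of the equivalent write uses writeStream instead (the checkpoint path here is a placeholder):

import org.apache.spark.sql.streaming.StreamingQuery;

// Streaming variant: the Kafka sink additionally requires a checkpoint
// location, and start() returns a query that runs until stopped.
StreamingQuery query = data.selectExpr("to_json(struct(*)) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "login-logout-events")
        .option("checkpointLocation", "/tmp/checkpoints/login-logout")
        .start();
query.awaitTermination();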