I need to write output to 4 different files in my Apache Beam pipeline. I'll use a very simplified case to protect the underlying work, but the overall structure here needs to be preserved. I would like to take an unbounded collection from a Kafka topic and publish it to 4 different files. The unbounded collection will be windowed into sessions, with a minimum gap of 30 minutes between bursts of data. For each windowed set of data, I'd like one file, greater.dat, to contain all entries greater than 100; one file, less.dat, to contain all entries less than or equal to 100; and two summary files, greater.sum and less.sum, each containing the number of entries in the corresponding data file. For the following stream of data, this is what I'd expect:
... greater than 30 minutes since last data
10
20
120
110
30
40
... 30 minutes elapse
greater.dat
120
110
less.dat
10
20
30
40
greater.sum
2
less.sum
4
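To pin down the expected behaviour independently of Beam, here is a minimal plain-Java sketch of the semantics described above. It is only a model of the desired result, not pipeline code: the class `ExpectedOutputSketch`, the `Event` type, and the `sessions` and `files` helpers are hypothetical names introduced for illustration, and event times are assumed to be given in minutes and already sorted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the expected result; this is not Beam code.
final class ExpectedOutputSketch {

    // Hypothetical value type: an entry plus its event time in minutes.
    static final class Event {
        final long minute;
        final int value;

        Event(long minute, int value) {
            this.minute = minute;
            this.value = value;
        }
    }

    // Splits time-ordered events into sessions: a new session starts
    // whenever the gap since the previous event is at least gapMinutes.
    static List<List<Integer>> sessions(List<Event> events, long gapMinutes) {
        List<List<Integer>> result = new ArrayList<>();
        List<Integer> current = null;
        long lastMinute = 0;
        for (Event e : events) {
            if (current == null || e.minute - lastMinute >= gapMinutes) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(e.value);
            lastMinute = e.minute;
        }
        return result;
    }

    // Builds the four expected files for one session as file name -> lines.
    static Map<String, List<String>> files(List<Integer> session) {
        List<String> greater = new ArrayList<>();
        List<String> less = new ArrayList<>();
        for (int v : session) {
            // Entries above 100 go to greater.dat, everything else to less.dat.
            (v > 100 ? greater : less).add(Integer.toString(v));
        }
        Map<String, List<String>> out = new LinkedHashMap<>();
        out.put("greater.dat", greater);
        out.put("less.dat", less);
        // Each summary file holds a single line: the entry count.
        out.put("greater.sum", Collections.singletonList(Integer.toString(greater.size())));
        out.put("less.sum", Collections.singletonList(Integer.toString(less.size())));
        return out;
    }
}
```

Calling `sessions` on the stream above with a 30-minute gap and then `files` on the resulting session should give exactly the four files listed, which is the per-window behaviour I want from the pipeline.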
This is what I have so far:
public class TupleTagContainer {
    // Anonymous subclasses ({}) keep the element type available for coder inference.
    public static final TupleTag<Integer> greaterThan = new TupleTag<Integer>() {};
    public static final TupleTag<Integer> lessThan = new TupleTag<Integer>() {};
}
public class LTGTSplitter extends DoFn<Integer, Integer> {
    @ProcessElement
    public void processElement(
            @Element Integer elem,
            MultiOutputReceiver out) {
        // Integer is immutable, so the element can be emitted without copying.
        if (elem <= 100) {
            out.get(TupleTagContainer.lessThan).output(elem);
        } else {
            out.get(TupleTagContainer.greaterThan).output(elem);
        }
    }
}
var splitRes = pipeline
    .apply("Read From Kafka", ...)
    .apply("SplitLTGTTransform", ParDo.of(new LTGTSplitter())
        .withOutputTags(TupleTagContainer.greaterThan, TupleTagList.of(TupleTagContainer.lessThan)));
var lessThanCollection = splitRes.get(TupleTagContainer.lessThan);
var greaterThanCollection = splitRes.get(TupleTagContainer.greaterThan);
// Output path (.to(...)) and Integer-to-String conversion omitted for brevity.
lessThanCollection
    .apply("ApplyWindowing LT", Window.<Integer>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply("Write To LT File", TextIO.write()
        .withWindowedWrites()
        .withNumShards(1));
greaterThanCollection
    .apply("ApplyWindowing GT", Window.<Integer>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply("Write To GT File", TextIO.write()
        .withWindowedWrites()
        .withNumShards(1));
Glossing over the details of the file name itself, and the fact that each subsequent firing will overwrite the file, the code above leaves me with a PDone at the end of each branch, and I can't apply a subsequent TextIO write to a PDone to write the less.dat and greater.dat files. How do I do this?