
I need to write output to 4 different files in my Apache Beam pipeline. I'll use a very simplified case to protect the underlying work, but the overall structure here needs to be preserved. What I would like to do is take an unbounded collection from a Kafka topic and publish it to 4 different files. The unbounded collection will be windowed into sessions, with a minimum gap duration of 30 minutes between bursts of data. For each window, I'd like one file, greater.dat, to contain all entries greater than 100; one file, less.dat, to contain all entries less than or equal to 100; and two summary files, greater.sum and less.sum, that contain the number of entries in each. For the following stream of data, this is what I'd expect:

... more than 30 minutes since the last data
10
20
120
110
30
40
... 30 minutes elapse

greater.dat 
120
110

less.dat
10
20
30
40

greater.sum
2

less.sum
4
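To pin down the per-window semantics independently of Beam, here is a plain-Java sketch that applies the same split (at most 100 versus greater than 100) and counts to the one session shown above. The class name SessionSplitModel and the THRESHOLD constant are illustrative assumptions, not part of the pipeline.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of one session's output; no Beam involved (illustrative only). */
public class SessionSplitModel {

    /** Hypothetical constant for the cut-off the question uses (100). */
    static final int THRESHOLD = 100;

    /** Entries strictly greater than the threshold, i.e. the contents of greater.dat. */
    static List<Integer> greater(List<Integer> session) {
        List<Integer> out = new ArrayList<>();
        for (int v : session) {
            if (v > THRESHOLD) {
                out.add(v);
            }
        }
        return out;
    }

    /** Entries less than or equal to the threshold, i.e. the contents of less.dat. */
    static List<Integer> less(List<Integer> session) {
        List<Integer> out = new ArrayList<>();
        for (int v : session) {
            if (v <= THRESHOLD) {
                out.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The burst of data from the example, all falling into a single session window.
        List<Integer> session = List.of(10, 20, 120, 110, 30, 40);
        List<Integer> gt = greater(session);
        List<Integer> lt = less(session);
        // The .sum files hold only the size of each side.
        System.out.println("greater.dat " + gt + ", greater.sum " + gt.size());
        System.out.println("less.dat " + lt + ", less.sum " + lt.size());
    }
}
```

This model keeps input order, but Beam makes no ordering guarantee for elements within a window, so the real .dat files may list the same entries in a different order.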

This is what I have so far:


public class TupleTagContainer {
    public static final TupleTag<Integer> greaterThan
        = new TupleTag<Integer>() {};

    public static final TupleTag<Integer> lessThan
        = new TupleTag<Integer>() {};
}

public class LTGTSplitter extends DoFn<Integer, Integer> {
    @ProcessElement
    public void processElement (
        @Element Integer elem,
        MultiOutputReceiver out
    ) {
        // Integer is immutable, so the element can be emitted as-is; no copy is needed
        // (and the Integer(int) constructor is deprecated).
        if (elem <= 100) {
            out.get(TupleTagContainer.lessThan).output(elem);
        }
        else {
            out.get(TupleTagContainer.greaterThan).output(elem);
        }
    }
}

var splitRes = pipeline
    .apply("Read From Kafka", ...)
    .apply("SplitLTGTTransform", ParDo.of(new LTGTSplitter())
        .withOutputTags(TupleTagContainer.greaterThan, TupleTagList.of(TupleTagContainer.lessThan))
    );

var lessThanCollection = splitRes.get(TupleTagContainer.lessThan);
var greaterThanCollection = splitRes.get(TupleTagContainer.greaterThan);

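// Window.<Integer>into(...) needs the explicit type witness when the builder calls are chained,
// and each transform at the same level should have a unique name.
lessThanCollection.apply("ApplyWindowing LT", Window.<Integer>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
    .triggering(AfterWatermark.pastEndOfWindow())
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
    // TextIO.write() consumes PCollection<String>, so convert the Integers first
    .apply("LT To String", ToString.elements())
    .apply("Write To LT File", TextIO.write()
        .to(...) // file naming omitted, as in the question
        .withWindowedWrites()
        .withNumShards(1));

greaterThanCollection.apply("ApplyWindowing GT", Window.<Integer>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
    .triggering(AfterWatermark.pastEndOfWindow())
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
    .apply("GT To String", ToString.elements())
    .apply("Write To GT File", TextIO.write()
        .to(...) // file naming omitted, as in the question
        .withWindowedWrites()
        .withNumShards(1));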

Glossing over the details of the file name itself, and the fact that each subsequent firing will overwrite the file, I'm left after the above code with a PDone, to which I can't apply a subsequent TextIO write for the less.sum and greater.sum files. How do I do this?
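One possible approach, sketched under assumptions: TextIO.write() ends its branch with a PDone, but a PCollection can feed more than one transform. Keeping a reference to the windowed collection and hanging both the data write and a Count.globally() branch off it should yield one count per session window. The class SummaryBranches, its method writeWithSummary, the parameters label, dataPrefix and sumPrefix, and the formatCount helper are hypothetical names for illustration; file naming is still left open, as in the question.

```java
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

/** Hypothetical helper: writes one side of the split plus its per-window count. */
public class SummaryBranches {

    /** Renders a count as a bare number, like the .sum files in the question. */
    static String formatCount(Long n) {
        return String.valueOf(n);
    }

    /** Windows `values` once, then fans the windowed PCollection out to two sinks. */
    static void writeWithSummary(PCollection<Integer> values, String label,
                                 String dataPrefix, String sumPrefix) {
        // Keep a handle on the windowed collection instead of chaining straight into TextIO.
        PCollection<Integer> windowed = values.apply("Window " + label,
            Window.<Integer>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

        // Branch 1: the raw entries (the .dat output).
        windowed
            .apply("Format " + label, ToString.elements())
            .apply("Write " + label + " data", TextIO.write()
                .to(dataPrefix)
                .withWindowedWrites()
                .withNumShards(1));

        // Branch 2: one count per session window (the .sum output).
        // withoutDefaults() is required because the input is not in the global window.
        windowed
            .apply("Count " + label, Count.<Integer>globally().withoutDefaults())
            .apply("Format " + label + " count",
                MapElements.into(TypeDescriptors.strings()).via(SummaryBranches::formatCount))
            .apply("Write " + label + " sum", TextIO.write()
                .to(sumPrefix)
                .withWindowedWrites()
                .withNumShards(1));
    }
}
```

Calling writeWithSummary once for lessThanCollection and once for greaterThanCollection should give both branches the same session windows, so each .sum output corresponds to the .dat output of the same window. Without withoutDefaults(), Beam rejects Count.globally() on a non-global windowing strategy.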

Tags: java, Output PCollections to multiple files in Apache Beam, Stack Overflow