Apache Beam with Spark Runner: ParDo invoked multiple times when using withOutputTags

Apache Beam Java SDK version: 2.63.0; Spark version: 3.5.4

I'm running a ParDo on a CoGrouped PCollection and expect to retrieve multiple outputs using withOutputTags.

When looking at the Spark DAG Visualization, I see that the ParDo step is invoked multiple times (once per output).

I'm wondering if a ParDo on a CoGbkResult with multiple outputs can run in a single pass.

Couldn't find any reference to this online or in Beam's GitHub repo.

Code:

PCollection<KV<Long, CustomObject1>> collection1 = ... 
PCollection<KV<Long, CustomObject2>> collection2 = ... 


PCollection<KV<Long, CoGbkResult>> grouped = KeyedPCollectionTuple
     .of(TAG1, collection1)
     .and(TAG2, collection2)
     .apply("CoGroupById", CoGroupByKey.create());

PCollectionTuple outputDatasets = grouped.apply(ParDo.of(new DoFn<KV<Long, CoGbkResult>, KV<Long, OutputObject>>() {
            @ProcessElement
            public void processElement(@Element KV<Long, CoGbkResult> element, ProcessContext c) {

                  Result result = custom_method();
                  OutputObject obj = ...; // elided: the output object built from the element/result

                  if (result.getScore() < 0) {
                     c.output(KV.of(obj.getId(), obj)); // main output (TAG_A)
                  } else {
                     c.output(TAG_B, KV.of(obj.getId(), obj));
                  }

                  // TAG_ADDITIONAL expects a different object type
                  c.output(TAG_ADDITIONAL, result.getAdditional());
            }
        }).withOutputTags(TAG_A, TupleTagList.of(TAG_B, TAG_ADDITIONAL)));

// Process TAG_A, TAG_B, and TAG_ADDITIONAL and write them to disk.
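
For completeness, a sketch of the tag declarations the snippet assumes (the element type of TAG_ADDITIONAL is a placeholder, since it isn't shown above):

import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

// Input tags for CoGroupByKey; anonymous subclasses preserve the type argument:
final TupleTag<CustomObject1> TAG1 = new TupleTag<CustomObject1>() {};
final TupleTag<CustomObject2> TAG2 = new TupleTag<CustomObject2>() {};

// Output tags for withOutputTags; TAG_A is the main output:
final TupleTag<KV<Long, OutputObject>> TAG_A = new TupleTag<KV<Long, OutputObject>>() {};
final TupleTag<KV<Long, OutputObject>> TAG_B = new TupleTag<KV<Long, OutputObject>>() {};
final TupleTag<AdditionalObject> TAG_ADDITIONAL = new TupleTag<AdditionalObject>() {}; // placeholder element type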


  • Tried changing how the output is written to disk from writeDynamic to a simple TextIO.write(), but it didn't help (a sketch of the simple write follows this list).

  • Under Accumulators, the Beam.Metrics values imply that my custom code ran multiple times (a counter sketch also follows this list).

  • The event timeline implies the same thing (separate jobs that each took roughly the same amount of time).
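
For reference, the simple per-tag write mentioned above looks roughly like this (output paths and the formatting lambda are placeholders):

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// Pull each tagged PCollection out of the tuple and write it separately.
outputDatasets.get(TAG_A)
     .apply("FormatA", MapElements.into(TypeDescriptors.strings())
          .via(kv -> kv.getKey() + "," + kv.getValue()))
     .apply("WriteA", TextIO.write().to("/output/tag_a"));

outputDatasets.get(TAG_B)
     .apply("FormatB", MapElements.into(TypeDescriptors.strings())
          .via(kv -> kv.getKey() + "," + kv.getValue()))
     .apply("WriteB", TextIO.write().to("/output/tag_b"));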
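
A counter of roughly this shape inside the DoFn is what surfaces under Accumulators as Beam.Metrics (namespace and name here are illustrative):

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;

// Declared as a field of the DoFn above; incremented once per processed element.
private final Counter processCalls = Metrics.counter("CoGroupById", "processElementCalls");

@ProcessElement
public void processElement(@Element KV<Long, CoGbkResult> element, ProcessContext c) {
      processCalls.inc(); // if the step ran in a single pass, this would match the element count
      // ... rest of the processing as above
}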

Am I missing something?
