ParDo is a general-purpose transform for parallel processing, flexible enough to cover most common data processing tasks. Unlike MapElements, which produces exactly one output for each input element of a collection, ParDo lets us emit zero, one, or many outputs for each input element.
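To make the "exactly one" versus "zero or more" contrast concrete, here is a plain-Java analogy that needs no Beam dependency. It is only a sketch of the two contracts: `mapEach`, `processEach`, and `ElementFn` are hypothetical names invented for this illustration, not Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class OneVersusManySketch {

    // Map-style contract: the function returns exactly one value per input.
    static <A, B> List<B> mapEach(List<A> inputs, Function<A, B> fn) {
        List<B> out = new ArrayList<>();
        for (A input : inputs) {
            out.add(fn.apply(input));
        }
        return out;
    }

    // ParDo-style contract (hypothetical interface): the function is handed
    // a receiver and may call it zero, one, or many times per input.
    interface ElementFn<A, B> {
        void process(A input, Consumer<B> receiver);
    }

    static <A, B> List<B> processEach(List<A> inputs, ElementFn<A, B> fn) {
        List<B> out = new ArrayList<>();
        for (A input : inputs) {
            fn.process(input, out::add);
        }
        return out;
    }

    // One output per line: its length.
    static List<Integer> lengths(List<String> lines) {
        return mapEach(lines, String::length);
    }

    // Zero or more outputs per line: one per word, none for a blank line.
    static List<String> words(List<String> lines) {
        return processEach(lines, (String line, Consumer<String> receiver) -> {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    receiver.accept(word);
                }
            }
        });
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be", "", "or not to be");
        System.out.println(lengths(lines)); // [5, 0, 12]
        System.out.println(words(lines));   // [to, be, or, not, to, be]
    }
}
```

The three input lines always yield three lengths, but the word list has six entries because the empty line contributes nothing and the other two contribute several each.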

ParDo

Let’s modify our previous word count example to use a ParDo transform so that the resulting collection contains only the words with count > 5. Words with a lower frequency are discarded. We can’t do this with MapElements, because it has to produce an output for every input element.

public static void main(String... args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(TextIO.read().from("./file1.txt"))
        .apply(new CountWords())
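        // keep only the words that occur more than 5 times
        .apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
            @ProcessElement
            public void processElement(@Element KV<String, Long> input, OutputReceiver<KV<String, Long>> outputReceiver) {
                // emit the element unchanged if it passes the check; otherwise emit nothing
                if (input.getValue() > 5) {
                    outputReceiver.output(input);
                }
            }
        }))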
        .apply(MapElements.into(TypeDescriptor.of(String.class)).via(kv -> String.format("%s: %d", kv.getKey(), kv.getValue())))
        .apply(TextIO.write().to("output"))
    ;
    // run the pipeline and wait until the whole pipeline is finished
    pipeline.run().waitUntilFinish();
}

We added a ParDo transform to discard words with counts <= 5. To apply a ParDo, we need to provide the user code in the form of a DoFn. A DoFn specifies the type of its input elements and the type of its output elements. In this case, both are the same type, KV<String, Long>.

Our user code goes inside a method annotated with @ProcessElement, which is executed for each element in the input collection. To mark which parameter receives the input element, we annotate it with @Element. To emit output, we also declare an OutputReceiver parameter. We can omit it if we don’t plan to emit any values, e.g. when we just write the elements to a database. This is not the only signature for a method annotated with @ProcessElement. Later on we’ll see how to produce multiple PCollections as output and how to accept side inputs (additional inputs other than the element currently being processed). In our user code, we check whether the value of the input KV, i.e. the word count, is greater than 5 and, if so, emit the element using outputReceiver.output(...).
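As a rough sketch of the "no OutputReceiver" variant mentioned above, the DoFn below only consumes elements and emits nothing, so its output type is Void. It assumes the Beam Java SDK and the direct runner are on the classpath. `FAKE_DB` and `WriteToFakeDb` are hypothetical names for this illustration: the static queue stands in for a database and only works because the direct runner executes in the same JVM.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

public class SinkOnlyDoFnSketch {

    // Hypothetical stand-in for a database. A static in-memory queue is only
    // visible to the DoFn when the pipeline runs in this JVM (direct runner).
    static final Queue<String> FAKE_DB = new ConcurrentLinkedQueue<>();

    // The method declares only the @Element parameter. With no OutputReceiver
    // there is nothing to emit, so the output type is Void.
    static class WriteToFakeDb extends DoFn<KV<String, Long>, Void> {
        @ProcessElement
        public void processElement(@Element KV<String, Long> input) {
            FAKE_DB.add(input.getKey() + ": " + input.getValue());
        }
    }

    static void run() {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
        pipeline
            // small in-memory input instead of the CountWords output
            .apply(Create.of(KV.of("the", 10L), KV.of("of", 6L)))
            .apply(ParDo.of(new WriteToFakeDb()));
        pipeline.run().waitUntilFinish();
    }

    public static void main(String[] args) {
        run();
        System.out.println(FAKE_DB); // element order is not guaranteed
    }
}
```

In a real pipeline the body of processElement would call an actual client, and on a distributed runner a static field like this would not be shared between workers.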

If you run the code, only the words with count > 5 will be written. I got the following output:

-: 10
to: 8
is: 9
`PCollection`: 7
of: 6
we: 6
the: 10
a: 7
data: 6

This is just an example of using ParDo and DoFn to filter elements. Beam already provides a Filter transform for exactly this purpose, and you should prefer it. For completeness, here is how you would do the same thing using Filter.

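pipeline
    .apply(TextIO.read().from("./file1.txt"))
    .apply(new CountWords())
    // same check as the DoFn above: keep words that occur more than 5 times
    .apply(Filter.by(kv -> kv.getValue() > 5))
    // ...followed by the same MapElements and TextIO.write steps as before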

Producing Multiple Outputs

Side Inputs

Conclusion
