This article is Part 3 in a 3-Part Apache Beam Tutorial Series.
- Part 1 - Apache Beam Tutorial Series - Introduction
- Part 2 - Apache Beam Tutorial - PTransforms
- Part 3 - > Apache Beam Transforms: ParDo
ParDo is a general-purpose transform for parallel processing. It is flexible enough to cover most common data processing tasks. Unlike the MapElements transform, which produces exactly one output for each input element of a collection, ParDo can emit zero or more outputs for each input element.
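To make the "exactly one" versus "zero or more" distinction concrete, here is a minimal sketch using plain Java streams rather than Beam itself. It is only an analogy: the class and method names (OneVersusMany, lengths, words) are made up for illustration and are not part of the Beam API. Stream.map plays the role of MapElements and Stream.flatMap plays the role of ParDo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy only; none of these names exist in Apache Beam.
class OneVersusMany {
    // One output per input, like MapElements: the result always has the input's size.
    static List<Integer> lengths(List<String> lines) {
        return lines.stream().map(String::length).collect(Collectors.toList());
    }

    // Zero or more outputs per input, like ParDo: a blank line yields nothing,
    // any other line yields one output per word.
    static List<String> words(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.trim().split("\\s+")).filter(w -> !w.isEmpty()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be or", "", "not to be");
        System.out.println(lengths(lines)); // prints [8, 0, 9]
        System.out.println(words(lines));   // prints [to, be, or, not, to, be]
    }
}
```

Three input lines always give three lengths, but the same three lines give six words, and the blank line contributes none of them. That second shape is what MapElements cannot express.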
ParDo
Let’s modify our previous word count example to use a ParDo transform that produces a collection containing only the words with a count greater than 5. Words with a lower frequency are discarded. We can’t do this with MapElements, because it must emit exactly one output for every input.
public static void main(String... args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(TextIO.read().from("./file1.txt"))
        .apply(new CountWords())
        // keep only the words that occur more than 5 times
        .apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
            @ProcessElement
            public void processElement(@Element KV<String, Long> input, OutputReceiver<KV<String, Long>> outputReceiver) {
                if (input.getValue() > 5) {
                    outputReceiver.output(input);
                }
            }
        }))
        .apply(MapElements.into(TypeDescriptor.of(String.class)).via(kv -> String.format("%s: %d", kv.getKey(), kv.getValue())))
        .apply(TextIO.write().to("output"));
    // run the pipeline and wait until the whole pipeline is finished
    pipeline.run().waitUntilFinish();
}
We added a ParDo transform to discard words with counts <= 5. To apply a ParDo, we need to provide the user code in the form of a DoFn. A DoFn declares the type of its input elements and the type of its output elements. In this case, both have the same type, KV<String, Long>.
Our user code goes inside a method annotated with @ProcessElement, which is executed for each element in the input collection. To mark which parameter receives the input element, we annotate it with @Element. To emit output, we also declare an OutputReceiver parameter. We can omit it if we don’t plan to emit any values, e.g. when we only write the elements to a database. This is not the only signature for a method annotated with @ProcessElement. Later on we’ll see how to produce multiple PCollections as output and how to accept side inputs (additional input other than the element currently being processed).
In our user code, we check whether the value of the input KV, i.e. the word count, is greater than 5 and, if so, emit the element using outputReceiver.output(...).
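The per-element contract described above can be sketched without a Beam runner. The following is a plain-Java model, not Beam code: Receiver is a hypothetical stand-in for OutputReceiver, Map.Entry stands in for KV, and the run method simply calls processElement once per element in sequence, which is an assumption made for illustration (a real runner distributes this work). It is only meant to show that the method may or may not call output for a given element, and that a count of exactly 5 is dropped.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of the filter; none of these names are Beam's API.
class ThresholdFilterSketch {
    // Hypothetical stand-in for Beam's OutputReceiver: one method to emit an element.
    interface Receiver<T> {
        void output(T element);
    }

    // Same decision as the DoFn body in the article: emit the pair only when count > 5.
    static void processElement(Map.Entry<String, Long> input,
                               Receiver<Map.Entry<String, Long>> receiver) {
        if (input.getValue() > 5) {
            receiver.output(input);
        }
    }

    // Calls processElement once per element and collects whatever was emitted.
    static List<Map.Entry<String, Long>> run(List<Map.Entry<String, Long>> counts) {
        List<Map.Entry<String, Long>> kept = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts) {
            processElement(entry, kept::add);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> counts = new ArrayList<>();
        counts.add(new SimpleEntry<>("the", 10L));
        counts.add(new SimpleEntry<>("beam", 5L)); // exactly 5, so it is dropped
        counts.add(new SimpleEntry<>("of", 6L));
        System.out.println(run(counts)); // prints [the=10, of=6]
    }
}
```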
If you run the code, only the words with a count > 5 will be written. I got the following output:
-: 10
to: 8
is: 9
`PCollection`: 7
of: 6
we: 6
the: 10
a: 7
data: 6
This is just an example of using ParDo and DoFn to filter elements. Beam already provides a Filter transform for this, which is more convenient and should be preferred. For completeness, here is how you would do the same thing using Filter.
pipeline
    .apply(TextIO.read().from("./file1.txt"))
    .apply(new CountWords())
    .apply(Filter.by(kv -> kv.getValue() > 5))
Producing Multiple Outputs
Side Inputs
Conclusion