So far we’ve written a basic word count pipeline and ran it using DirectRunner. We used some built-in transforms to process the data. In this post we’ll explore more about PTransforms .

Our previous pipeline looked like this:

graph LR Input>Input] --> A[PCollection] A --> B(PTransform) B --> C[PCollection] C --> D(..) D --> E[PCollection] E --> F[Output]

For each PCollection we applied a transform and got another collection. A PCollection cannot be mutated. Each transform must return a new collection. Let’s dive a bit more into PTransforms.

Transforms operate on the data and return something. The logic needs to be supplied by the developers. For each element of PCollection, the transform logic is applied. There are built-in transforms in Beam SDK. For example ParDo,GroupByKey, CoGroupByKey, Combine, Flatten, and Partition. ParDo and Combine are called general purpose transforms where as transforms that perform execute one or more composite transforms are called composite transforms.

In our previous example, we created an instance of SimpleFunction and passed it to the MapElements. There wasn’t a lot of code in there but it will get ugly pretty fast and will be a lot difficult to test individual transforms.

First Composite Transform

Let’s extract our the logic to count the words. Our new composite transform shall take a PCollection of string (i.e. a collection of lines) and produce a PCollection of key value pair (word:word_count).

package org.example;

import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import java.util.Arrays;
import java.util.List;

public class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    public static class ExtractWords extends SimpleFunction<String, List<String>> {
        @Override
        public List<String> apply(String input) {
            return Arrays.asList(input.split(" "));
        }
    }

    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> input) {
        return input
                .apply(MapElements.via(new ExtractWords()))
                .apply(Flatten.iterables())
                .apply(Count.perElement());
    }
}

We’ve created our own transform called CountWords. This is a composite transform that applies several other core transforms. To define our own transforms, we need to inherit from PTransform class specifying the types of input collection and output collection. In this case we want to take a collection of strings and produce a collection of key-value pairs where key is a string and value is a long. Hence we extend PTransform as PTransform<PCollection<String>, PCollection<KV<String, Long>>>.

Remember a transform will take in one or more collections as input. And we need to apply our transformation logic to this input. This happens in expand function which we need to override. We can access the input collection in this function. Like before, we use the apply function of PCollection object to apply the transformation. In this case, we first apply MapElements transform. This transform expects an instance of SimpleFunction. To separate our logic, we have defined a separate class called ExtractWords that extends SimpleFunction. ExtractWords needs to override apply function where the input parameter contains an element of the PCollection. MapElements transform is quite convenient where we want to produce exactly one output for an input. But it cannot be used where we might want to produce arbitrary number of outputs from a single input. For e.g. there might be cases where we need to discard an element based on some criteria so that it won’t be in the output collection. We’ll look at how to do this later in this series. For now, change the code in App.java as shown below and run it.

public static void main(String... args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
            .apply(TextIO.read().from("./file1.txt"))
            .apply(new CountWords()) // we replaced old transforms with this one
            .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                @Override
                public String apply(KV<String, Long> input) {
                    return String.format("%s: %d", input.getKey(), input.getValue());
                }
            }))
            .apply(TextIO.write().to("output"))
    ;

    // run the pipeline and wait until the whole pipeline is finished
    pipeline.run().waitUntilFinish();
}

Looks nice already. But we can simplify this even further. If we’re using Java >= 1.8, then we can use lambda functions to further reduce the boilerplate.

In the main function, we are formatting the KV pair as a string and is just a line a code but the boilerplate is too much. You can completely replace that with

.apply(MapElements.into(TypeDescriptors.strings()).via(kv -> String.format("%s: %d", kv.getKey(), kv.getValue())))

To use lambda we need to first tell MapElements what will our mapper return using into function. It accepts a TypeDescriptor object and Beam provides descriptors for standard data types like string, list, map etc. in TypeDescriptors. To create a descriptor for user defined classes use TypeDescriptor.of function as follows

.apply(MapElements.into(TypeDescriptor.of(String.class)).via(kv -> String.format("%s: %d", kv.getKey(), kv.getValue())))

Similarly in CountWords class, remove the ExtractWords class and use the following

public class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> input) {
        return input
                .apply(MapElements.into(TypeDescriptors.lists(TypeDescriptors.strings())).via(s -> Arrays.asList(s.split(" "))))
                .apply(Flatten.iterables())
                .apply(Count.perElement())
                ;
    }
}

Looks a lot better and shorter than previous version. Of course, you shouldn’t misuse lambdas to write overly complicated logic. In that case using a dedicated class would be much better and maintainable. Note that since Java does not allow us to pass List<String>.class as an argument, we need to use TypeDescriptors.list and pass TypeDescriptors.strings() to it to tell MapElements that our mapper will return a List<String> for each input string.

Conclusion

We briefly went through the basics of transforms and how to write our own transforms. So far we’ve created our own composite transform, simple functions to map each element in a collection to a different value, use TypeDescriptors to tell MapElements about the return values of our mapper when using lambda function.

In the next post, we’ll go through ParDo and DoFn which, unlike MapElements, allow us to produce zero or more outputs from a single input.

Categories:

Updated:

Leave a comment