Error handling with Apache Beam
and the Asgarde library
By Mazlum TOSUN
About me
Mazlum TOSUN
Head of data and co-founder at
Tech lead GCP and data
Passionate about Google Cloud, data, craft and functional programming
Fan
https://github.com/tosun-si
https://twitter.com/MazlumTosun3 @MazlumTosun3
https://www.linkedin.com/in/mazlum-tosun-900b1812/
Asgarde
A library that simplifies error handling with Apache Beam
Available for Apache Beam Java and Python
A Kotlin wrapper is also available
Beam Java : native error handling ParDo and DoFn
Beam recommends handling errors with a dead letter queue.
This means catching errors in the flow and, using side outputs, sinking them to
a file, a database or any other output.
Beam suggests handling side outputs with TupleTags in a DoFn class.
With this approach, every step yields both an output PCollection and a failures PCollection.
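The dead letter idea itself does not depend on Beam, so it can be illustrated with a minimal, framework-free sketch in plain Python. This is not Beam code: `Failure` and `run_step` are hypothetical names, and the two returned lists only stand in for the output and failures PCollections that Beam would expose through TupleTags.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class Failure:
    """Hypothetical record kept for each element that could not be processed."""
    step: str
    element: Any
    error: str


def run_step(name: str, fn: Callable[[Any], Any],
             elements: Iterable[Any]) -> Tuple[List[Any], List[Failure]]:
    """Apply fn to each element and route exceptions to a separate failures list."""
    outputs: List[Any] = []
    failures: List[Failure] = []
    for element in elements:
        try:
            outputs.append(fn(element))
        except Exception as err:
            # Dead letter: keep the bad element and the error, do not stop the flow.
            failures.append(Failure(name, element, repr(err)))
    return outputs, failures


# Each step returns two results: the good outputs and the failures.
parsed, parse_failures = run_step("parse", int, ["1", "2", "oops", "4"])
inverted, invert_failures = run_step("invert", lambda n: 1 / (n - 2), parsed)

# The failures of every step have to be concatenated by hand before being sunk.
all_failures = parse_failures + invert_failures
```

Here `"oops"` fails in the parse step and `2` fails in the invert step (division by zero); both end up in `all_failures` while the valid elements keep flowing.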
Beam Java : native error handling MapElements and FlatMapElements
Beam also allows handling errors with built-in transforms such as MapElements
and FlatMapElements.
Comparison between approaches : usual Beam pipeline
In a usual Beam pipeline flow, steps are chained fluently:
Comparison between approaches : usual Beam pipeline with error handling
Here's the same flow with error handling in each step:
Problems with this approach:
We lose the native fluent style of chained apply calls, because we have to handle the output and the failures of each step.
For MapElements and FlatMapElements, we always have to add exceptionsInto and exceptionsVia (can be centralized in a wrapper class).
For each custom DoFn, we have to duplicate the TupleTag logic and the try/catch block (can be centralized in a wrapper class).
The code is verbose.
There is no centralized code to concatenate the errors: we have to concatenate the failures of every step ourselves (can be centralized in a wrapper class).
Comparison between approaches : usual Beam pipeline with error handling using Asgarde Java
Here's the same flow with error handling, but using Asgarde library instead:
Comparison between approaches : usual Beam pipeline with error handling using Asgarde
Purpose of the library:
Wrap all error handling logic in a composer class.
Wrap the exceptionsInto and exceptionsVia usage of the native Beam classes MapElements and FlatMapElements.
Keep the fluent style natively offered by Beam's apply methods while checking for failures, and offer a less verbose way of handling errors.
Expose custom DoFn classes with centralized try/catch blocks and TupleTags.
Expose easier access to the @Setup, @StartBundle, @FinishBundle and @Teardown steps of DoFn classes, with error handling.
Allow concatenating all the failures that occurred in the flow.
Expose a way to handle errors in filtering logic (currently not available with Beam's Filter.by).
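The composer idea behind these goals can be illustrated with a small, framework-free sketch in plain Python. It is not the Asgarde API and uses no Beam classes: `Composer`, `Failure`, `map` and `filter` are hypothetical names chosen only to show how one wrapper can centralize the try/catch logic, keep the chain fluent, accumulate the failures of every step, and cover filtering as well.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class Failure:
    """Hypothetical record describing one failed element."""
    step: str
    element: Any
    error: str


@dataclass
class Composer:
    """Hypothetical composer: carries the good elements and all failures so far."""
    outputs: List[Any]
    failures: List[Failure] = field(default_factory=list)

    def _apply(self, name: str, fn: Callable[[Any], List[Any]]) -> "Composer":
        # Single place for the try/except logic shared by every kind of step.
        outputs: List[Any] = []
        failures = list(self.failures)
        for element in self.outputs:
            try:
                outputs.extend(fn(element))
            except Exception as err:
                failures.append(Failure(name, element, repr(err)))
        return Composer(outputs, failures)

    def map(self, name: str, fn: Callable[[Any], Any]) -> "Composer":
        return self._apply(name, lambda e: [fn(e)])

    def filter(self, name: str, predicate: Callable[[Any], bool]) -> "Composer":
        # An exception raised by the predicate becomes a failure, not a crash.
        return self._apply(name, lambda e: [e] if predicate(e) else [])


# The chain stays fluent; failures are collected behind the scenes.
result = (
    Composer(["1", "2", "oops", "4", "5"])
    .map("parse", int)
    .filter("filter_positive_inverse", lambda n: 1 / (n - 2) > 0)
    .map("double", lambda n: n * 2)
)
```

At the end of the chain, `result.outputs` holds the elements that passed every step and `result.failures` holds the concatenated failures of all steps, so the caller handles errors once instead of after each step.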
Comparison between approaches : usual Beam pipeline with error handling using Asgarde Kotlin
Extensions are proposed to use Asgarde with Kotlin:
Pipeline example native error handling Python
Pipeline example Asgarde Python
Error handling code demo : real application
Links to example projects
https://github.com/tosun-si/teams-league-java-dlq-native-beam-summit
https://github.com/tosun-si/teams-league-java-dlq-asgarde-beam-summit
https://github.com/tosun-si/teams-league-kotlin-dlq-asgarde-beam-summit
https://github.com/tosun-si/teams-league-python-dlq-native-beam-summit
https://github.com/tosun-si/teams-league-python-dlq-asgarde-beam-summit
Links to Asgarde projects
https://github.com/tosun-si/asgarde
https://github.com/tosun-si/pasgarde
https://twitter.com/AsgardeBeam
https://www.linkedin.com/company/asgardebeam
Help us by contributing to the projects and supporting us with GitHub
stars
Thank you :)