As a back-end developer, you are often asked to build structured orchestrations of tasks: microservices orchestration, distributed transactions, data pipelines, or business processes. In all those cases, you must ensure that the different types of tasks are processed according to a given scenario. This scenario can be as simple as a sequence of tasks, or a lot more complex: depending on time, following an intricate decision diagram, or waiting for external signals, for example.
As soon as you are facing moderately complex workflows, you may be tempted to use a DSL, coupled with a dedicated engine to operate those workflows, such as Airflow, Luigi or Conductor.
In this article, I will show how a programming language can be used as a DSL by a “workflow as code” engine, and why it’s probably your best long-term option for building reliable automation at any scale.
Even if you can benefit from a “workflow as code” engine in a simple infrastructure, I am considering here the more complex situation where you have distributed stateless workers in different locations. You are able to distribute tasks to those workers through some mechanism (for example a queuing system) — each task usually being described by its name and some data.
The problem we are trying to solve is: how do we orchestrate the processing of those tasks on different servers, in a way that is reliable, scalable, and easy to manage and monitor?
Let’s consider the sequential workflow below as an example:
Using Infinitic, this workflow could be coded like this:
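Based on the line-by-line description that follows, here is a minimal Kotlin sketch of what this workflow might look like (the class name CropImage, the handle method and the exact proxy signature are illustrative assumptions, not necessarily Infinitic’s exact API):

// ImageUtil declares how tasks can be called in workers; implementations live in the workers
interface ImageUtil {
    fun download(url: String): ByteArray
    fun resize(image: ByteArray, size: Int): ByteArray
    fun upload(image: ByteArray): String
}

// Workflow is the Infinitic interface providing the proxy function and the context variable
class CropImage : Workflow {
    // pseudo instance capturing which methods are applied as the workflow is processed
    private val imageUtil = proxy(ImageUtil::class.java) // exact proxy signature may differ

    fun handle(imageUrl: String, size: Int): String {
        val image = imageUtil.download(imageUrl)     // task 1: download the image
        val resized = imageUtil.resize(image, size)  // task 2: resize it
        return imageUtil.upload(resized)             // task 3: upload it, return its URL
    }
}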
Let’s look at it line by line:
Our workflow class extends the Workflow interface, which provides useful functions here (see below).
ImageUtil is an interface; its implementations live in the workers, and we do not need to know one for coding the workflow. This interface actually declares how tasks can be called in workers. Here we declare that workers will receive a task defined by the name ImageUtil. This task can be called with 3 methods: download, which takes a string as input and returns an array of bytes; resize, which takes an array of bytes and an integer as input and returns an array of bytes; and upload, which takes an array of bytes as input and returns a string.
Workflow provides the useful proxy function used below.
The context variable is injected by the workers at processing time. It contains the workflow history. I’ll explain later how it is used.
The proxy function builds a pseudo instance of the provided interface. This pseudo instance is used below to capture which methods are actually applied as the workflow is processed.
The body of the workflow reads: “Process the task ImageUtil with method download and input imageUrl. Then process the task ImageUtil with method resize, using the output of the previous task as first parameter and the provided size as second parameter. Then process the task ImageUtil with method upload, using the output of the last task as input. Return the last output as the output of the workflow.”
As you can see, using code provides a very concise and efficient way to describe this workflow. As additional benefits, the description of your workflow:
To make it work, Infinitic uses the following architecture:
As you can see, around a transport layer, we have:
Serialization: as the data used within a workflow instance flows between workers, it is serialized and deserialized along the way. Infinitic does this automatically, transporting and storing the data in an Avro format that guarantees durability and evolvability. This format also lets us have workers processing tasks in programming languages other than Java/Kotlin. We provide examples in Node.js.
Scalability: all stateless workers can easily be scaled horizontally. For the engines, horizontal scalability can also be ensured as long as all messages related to a specific workflow instance are handled in time by the same engine worker. For example, using Pulsar, this constraint is solved with a Key-Shared subscription (the key being the id of a workflow instance). In case of complete failure of an engine worker, it can be stopped and the corresponding traffic will automatically be redistributed to the remaining workers.
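To give an idea of how this works with Pulsar, a Key-Shared subscription can be declared with the standard Pulsar client, the id of the workflow instance being used as the message key (the topic and subscription names below are illustrative, not Infinitic’s actual ones):

import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.SubscriptionType

val client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build()

// all messages sharing the same key are delivered to the same consumer
val consumer = client.newConsumer()
    .topic("workflow-engine")
    .subscriptionName("workflow-engine")
    .subscriptionType(SubscriptionType.Key_Shared)
    .subscribe()

// on the producing side, the id of the workflow instance is used as the key
val producer = client.newProducer()
    .topic("workflow-engine")
    .create()

producer.newMessage()
    .key(workflowId)            // workflowId: String, assumed in scope
    .value(serializedMessage)   // serializedMessage: ByteArray, assumed in scope
    .send()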
At Infinitic we are working on an off-the-shelf integration with Apache Pulsar.
The exciting benefit of using such a “workflow as code” pattern is that once your infrastructure is set up, you can use it for whatever workflows you need to implement, as shown below.
Workflows can trigger child-workflows as easily as tasks. For example, this (inefficient) workflow calculates n! in a distributed way, using a classical recursion algorithm. At the last level (n = 1), we have a workflow without any task, directly returning 1.
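As a rough Kotlin sketch of such a recursive workflow (the child-workflow dispatch syntax below is an assumption modeled on the task proxy described above, not a confirmed Infinitic API):

interface Factorial {
    fun compute(n: Long): Long
}

class FactorialWorkflow : Workflow, Factorial {
    // pseudo instance used to dispatch the child-workflow, assumed to work like the task proxy
    private val factorial = proxy(Factorial::class.java)

    override fun compute(n: Long): Long =
        if (n <= 1) 1                      // last level: no task, directly returns 1
        else n * factorial.compute(n - 1)  // child-workflow computes (n - 1)!
}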
If you do not care about a task’s output, you may want to process it asynchronously, i.e. without stopping the workflow’s flow. Infinitic provides a syntax for that:
// T being method’s return type
val d: Deferred<T> = async(myTask) { method(args) }
This triggers myTask.method(args) but without interrupting the workflow’s flow. The same syntax can be used to trigger a child-workflow asynchronously. Please have a look at the “Using Deferred” section below to learn what you can do with the provided Deferred instance.
As an example, if you want to send yourself an email when the CropImage workflow above starts and completes, you can do it like so:
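A minimal sketch of that idea, assuming a hypothetical EmailService task interface (not part of the example above):

interface EmailService {
    fun send(to: String, subject: String)
}

class CropImage : Workflow {
    private val imageUtil = proxy(ImageUtil::class.java)
    private val emailService = proxy(EmailService::class.java)

    fun handle(email: String, imageUrl: String, size: Int): String {
        // fire-and-forget notification: the workflow’s flow is not interrupted
        async(emailService) { send(email, "CropImage started") }

        val image = imageUtil.download(imageUrl)
        val resized = imageUtil.resize(image, size)
        val url = imageUtil.upload(resized)

        // second notification once everything above has completed
        async(emailService) { send(email, "CropImage completed") }

        return url
    }
}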
In more complex cases, you may want to run asynchronously not just one task, but a complete series of tasks. The async function can also be used for that:
// T being the return type of … instructions
val d: Deferred<T> = async { … }
For example, the following code will resize an image to all sizes from sizeMin to sizeMax, all branches being processed in parallel:
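Here is a sketch of such a branch fan-out, reusing the ImageUtil interface and the async { … } syntax introduced above (method and variable names are illustrative):

fun handle(imageUrl: String, sizeMin: Int, sizeMax: Int): List<String> {
    val image = imageUtil.download(imageUrl)

    // one asynchronous branch per target size
    val branches = (sizeMin..sizeMax).map { size ->
        async {
            val resized = imageUtil.resize(image, size)
            imageUtil.upload(resized)
        }
    }

    // wait for every branch and collect the uploaded URLs
    return branches.map { it.result() }
}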
When using the async function, you get a Deferred<T> instance, T being the return type you’ll get once the task (or child-workflow) is completed.
This Deferred is a reference that can be used with its 3 methods:
// trigger a task asynchronously, get immediately a Deferred
val d: Deferred<T> = async(myTask) { method(args) }

// returns the deferred status (ongoing, completed, canceled)
val status: DeferredStatus = d.status()

// waits for completion (or cancellation) and returns itself
d.await()

// waits for completion (or cancellation) and returns result
val result: T = d.result()
But more importantly, Deferred can be logically combined:
(d1 or d2 or d3).result() will wait for at least one deferred to complete and return its result.
(d1 and d2 and d3).result() will wait for all deferreds to complete and return a list of results.
(d1 or (d2 and d3)).result() will wait for completion of d1 or of (d2 and d3), and will return the result of d1 or a list of d2 and d3 results, depending on what completes first.
Any logical combination is possible. For example, it can be used to easily code a fan-out of asynchronous branches (line 32 below):
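As a smaller illustration of such a combination (distinct from the fan-out example referenced above; mirrorA and mirrorB are hypothetical URLs, imageUtil the proxy declared earlier):

val d1 = async { imageUtil.download(mirrorA) }
val d2 = async { imageUtil.download(mirrorB) }

// proceed with whichever branch completes first
val image: ByteArray = (d1 or d2).result()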
This article is an introduction to the “workflow as code” pattern built into Infinitic. More features will be added soon, before the first release:
As you may start to realize, this pattern provides a very generic way to code and maintain any workflow within your infrastructure, while having workflows defined in versioned files and automatically monitored.
Actually, this approach is so versatile that even if you need to define and use your own DSL (let’s say in a JSON or BPMN format), you can use this engine to process it. The only thing you have to do is to code a workflow that takes this definition file as input and converts it into actual tasks to run.
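As a small sketch of that idea (the Step data class below stands in for a parsed JSON definition and is purely illustrative):

// each step of the hypothetical JSON definition names one of the ImageUtil methods
data class Step(val method: String, val size: Int? = null)

class JsonDefinedPipeline : Workflow {
    private val imageUtil = proxy(ImageUtil::class.java)

    fun handle(imageUrl: String, steps: List<Step>): String {
        var image = imageUtil.download(imageUrl)
        for (step in steps) {
            image = when (step.method) {
                "resize" -> imageUtil.resize(image, requireNotNull(step.size))
                else -> image  // unknown steps are ignored in this sketch
            }
        }
        return imageUtil.upload(image)
    }
}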
If you are interested in a deeper understanding of how it works, please have a look at Under the Hood of an Event-Driven "Workflow as Code" Engine.