Creating a Distributed Map-Reduce Job with AWS Step Functions

Recently, I've been interested in generative art. One of my projects is a piece of poésie concrète that automatically regenerates itself each Sunday, highlighting recent Russian war crimes. To achieve this, it performs the following steps:

  1. Crawl a search engine for recent results
  2. Obtain the content of each article
  3. Analyze each article with an ML model to extract the most relevant parts
  4. Reduce these parts into a single document that serves as a web page

The high-level diagram looks as follows:

[Figure: high-level diagram]

The entire process is launched by EventBridge Scheduler, which starts a chain of Lambdas, each with its own responsibility:

  • Crawl the search engine
  • Extract the article content from a web page
  • Analyze the sentiment of each sentence inside the article
  • Assemble the poem from the sentences with the strongest sentiment and put it in an S3 bucket that is served to the client via CloudFront

There is plenty of advice on the internet on how to synchronously call one Lambda from another. The AWS documentation, however, discourages this, and for good reason. To quote:

While this synchronous flow may work within a single application on a server, it introduces several avoidable problems in a distributed serverless architecture:

  • Cost: with Lambda, you pay for the duration of an invocation. In this example, while the Create invoice functions runs, two other functions are also running in a wait state, shown in red on the diagram.
  • Error handling: in nested invocations, error handling can become much more complex. Either errors are thrown to parent functions to handle at the top-level function, or functions require custom handling. For example, an error in the Create invoice might require the Process payment function to reverse the charge, or it may instead retry the Create invoice process.
  • Tight coupling: processing a payment typically takes longer than creating an invoice. In this model, the availability of the entire workflow is limited by the slowest function.
  • Scaling: the concurrency of all three functions must be equal. In a busy system, this uses more concurrency than would otherwise be needed.

One of the alternatives is to use AWS Step Functions to orchestrate the execution of the Lambdas. It turns out that my problem is a nice example of a distributed map. Consider:

  • We extract all necessary links.
  • We map each link in parallel by extracting its content and analyzing its sentiment.
  • We reduce the results into a single document stored in an S3 bucket.
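Before moving to Step Functions, it may help to see the same map-reduce shape in plain JavaScript. This is only a sketch: `crawl`, `extractContent`, and `analyzeSentiment` are hypothetical stand-ins for the Lambdas, and the example links are made up.

```javascript
// Hypothetical stand-ins for the Lambdas; the real ones call external services.
const crawl = async () => ["https://example.com/a", "https://example.com/b"];
const extractContent = async (link) => `content of ${link}`;
const analyzeSentiment = async (text) => ({ Text: text });

// Map phase: every link is handled independently, so the work can run in parallel.
const mapPhase = (links) =>
  Promise.all(links.map(async (link) => analyzeSentiment(await extractContent(link))));

// Reduce phase: combine the per-link results into a single document.
const reducePhase = (results) => results.map((result) => result.Text).join("\n");

// The whole pipeline: extract links, map, then reduce.
const runPipeline = async () => reducePhase(await mapPhase(await crawl()));
```

Step Functions plays the role of `runPipeline` here: it passes the output of each stage to the next one, so no Lambda has to call another.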

Let's start with the mapping part: crawl the search engine and then extract the content of each article. The AWS Step Functions definition will be as follows.

{
  "Comment": "A Step Functions workflow that processes an array of strings concurrently",
  "StartAt": "Extract links from google",
  "States": {
    "Extract links from google": {
      "Type": "Task",
      "Resource": "<google crawler arn>",
      "ResultPath": "$",
      "Next": "ProcessArray"
    },
    "ProcessArray": {
      "Type": "Map",
      "ItemsPath": "$",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "Extract article content",
        "States": {
          "Extract article content": {
            "Type": "Task",
            "Resource": "<article extractor arn>",
            "InputPath": "$",
            "End": true
          }
        }
      }
    }
  }
}

Here, we enter the map phase by executing a state with "Type": "Map". Inside the iterator, each item of the array is processed by the Article content extractor Lambda. The iterations run concurrently, up to the limit set by the MaxConcurrency option.
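To get a feel for what a concurrency limit means, here is a rough JavaScript emulation of a bounded parallel map. It is a sketch of the idea, not of how Step Functions implements it; `mapWithConcurrency` and `worker` are names I made up for illustration.

```javascript
// Process items with at most maxConcurrency workers running at the same time.
// As with MaxConcurrency in a Map state, 0 is treated as "no limit".
const mapWithConcurrency = async (items, maxConcurrency, worker) => {
  const results = new Array(items.length);
  let next = 0; // index of the next item that has not been picked up yet

  // Each pool member keeps taking the next unprocessed item until none are left.
  const runWorker = async () => {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  };

  const limit = maxConcurrency > 0 ? maxConcurrency : items.length;
  const poolSize = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: poolSize }, () => runWorker()));
  return results; // results keep the order of the input array
};
```

Calling `mapWithConcurrency(links, 10, extractArticle)` with a hypothetical `extractArticle` function would then roughly mirror the ProcessArray state above.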

Now, let's expand our iterator to analyze the article sentiment by calling the Sentiment analyzer Lambda.

{
  "Comment": "A Step Functions workflow that processes an array of strings concurrently",
  "StartAt": "Extract links from google",
  "States": {
    "Extract links from google": {
      "Type": "Task",
      "Resource": "<google crawler arn>",
      "ResultPath": "$",
      "Next": "ProcessArray"
    },
    "ProcessArray": {
      "Type": "Map",
      "ItemsPath": "$",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "Extract article content",
        "States": {
          "Extract article content": {
            "Type": "Task",
            "Resource": "<article extractor arn>",
            "InputPath": "$",
            "Next": "Analyze sentiment"
          },
          "Analyze sentiment": {
            "Type": "Task",
            "Resource": "<sentiment analyzer arn>",
            "InputPath": "$",
            "End": true
          }
        }
      }
    }
  }
}

As you can see, the Sentiment analyzer is now a part of the Iterator, and the Article extractor refers to it in its "Next" property.
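Within one iteration, the output of the first state becomes the input of the next. The sketch below imitates that hand-off in plain JavaScript. Both handlers are hypothetical placeholders: the real ones fetch a page and call an ML model, and the naive sentence splitting is only for illustration.

```javascript
// Hypothetical handler for "Extract article content": receives one link from the array.
const extractArticle = async (link) => ({
  link,
  text: "First sentence. Second sentence.",
});

// Hypothetical handler for "Analyze sentiment": receives whatever the previous state returned.
// The field names follow the ones the Reducer reads; the scores are dummy values.
const analyzeSentiment = async (article) =>
  article.text
    .split(". ")
    .map((sentence) => ({ Text: sentence, SentimentValue: 0, Magnitude: 0 }));

// One Map iteration: the two states run in sequence for a single link.
const runIteration = async (link) => analyzeSentiment(await extractArticle(link));
```

Each iteration therefore produces one array of scored sentences per link.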

Now, we need to enter the reducer phase.

{
  "Comment": "A Step Functions workflow that processes an array of strings concurrently",
  "StartAt": "Extract links from google",
  "States": {
    "Extract links from google": {
      "Type": "Task",
      "Resource": "<google crawler arn>",
      "ResultPath": "$",
      "Next": "ProcessArray"
    },
    "ProcessArray": {
      "Type": "Map",
      "ItemsPath": "$",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "Extract article content",
        "States": {
          "Extract article content": {
            "Type": "Task",
            "Resource": "<article extractor arn>",
            "InputPath": "$",
            "Next": "Analyze sentiment",
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "Next": "Analyze sentiment"
              }
            ]
          },
          "Analyze sentiment": {
            "Type": "Task",
            "Resource": "<sentiment analyzer arn>",
            "InputPath": "$",
            "End": true
          }
        }
      },
      "Next": "Reducer"
    },
    "Reducer": {
      "Type": "Task",
      "Resource": "<reducer arn>",
      "InputPath": "$",
      "ResultPath": "$",
      "End": true
    }
  }
}

If you look at the Analyze sentiment definition, you will notice that the property "End" is set to true, which means that this is the last state of the iterator. The Map state itself (ProcessArray), however, points to the Reducer with "Next": "Reducer", so the Reducer runs once all iterations have finished.

Since the Reducer accepts the combined output of all iterations, its code looks as follows.

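// calculateSentiment and sentenceCount are defined elsewhere in the module.
export const handler = async (data) => {
  // data is an array of arrays: one array of scored sentences per article
  const sentences = data.flat()
      .filter(item => item != null) // drop null entries
      // strongest sentiment first
      .sort((a, b) =>
          calculateSentiment(b.SentimentValue, b.Magnitude) - calculateSentiment(a.SentimentValue, a.Magnitude))
      .slice(0, sentenceCount)
      .map(item => item.Text)
    //more code
}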

Here, we accept an array of arrays and flatten it. Then, we sort the items of the flattened array by the scores provided by the previous Lambda and keep the text of the top ones.
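To make the ranking concrete, here is a self-contained sketch with sample data. Note that `calculateSentiment` and `sentenceCount` below are my assumptions for the sake of the example (the score is taken as absolute sentiment times magnitude); the real definitions may differ.

```javascript
// Assumption: rank by the strength of the sentiment, ignoring its sign.
const calculateSentiment = (value, magnitude) => Math.abs(value) * magnitude;
const sentenceCount = 2; // hypothetical limit on the number of sentences kept

const pickStrongest = (data) =>
  data.flat()
    .filter((item) => item != null)
    .sort((a, b) =>
      calculateSentiment(b.SentimentValue, b.Magnitude) - calculateSentiment(a.SentimentValue, a.Magnitude))
    .slice(0, sentenceCount)
    .map((item) => item.Text);

// Made-up Map output: one array per article, with null standing for a skipped article.
const sample = [
  [
    { Text: "mild", SentimentValue: 0.1, Magnitude: 0.2 },
    { Text: "strong", SentimentValue: -0.9, Magnitude: 2.0 },
  ],
  null,
  [{ Text: "medium", SentimentValue: 0.5, Magnitude: 1.0 }],
];

console.log(pickStrongest(sample)); // [ 'strong', 'medium' ]
```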

Are we done yet? Not quite, since things in the real world are prone to failure, and our definition has to account for that. The most obvious way is to add retries via the "Retry" field.

"Retry": [
  {
    "ErrorEquals": [
      "States.ALL"
    ],
    "IntervalSeconds": 1,
    "MaxAttempts": 2,
    "BackoffRate": 2
  }
]
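To see what these numbers mean in practice, the sketch below computes the waiting time before each retry. It assumes the basic rule that the first retry waits IntervalSeconds and every following wait is multiplied by BackoffRate, and it ignores any optional cap or jitter settings. `retryDelays` is a helper name I made up.

```javascript
// Waiting time in seconds before each retry, for one Retry entry.
// The defaults mirror the ones Step Functions applies when a field is omitted.
const retryDelays = ({ IntervalSeconds = 1, MaxAttempts = 3, BackoffRate = 2.0 }) =>
  Array.from({ length: MaxAttempts }, (_, attempt) => IntervalSeconds * BackoffRate ** attempt);

console.log(retryDelays({ IntervalSeconds: 1, MaxAttempts: 2, BackoffRate: 2 })); // [ 1, 2 ]
```

MaxAttempts counts retries rather than total runs, so with this configuration a failing task runs up to three times: the original attempt, a retry after 1 second, and another after 2 seconds.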

We also use another tactic: catching errors, so that a single map iteration can fail without failing the entire map phase.

"Catch": [
  {
    "ErrorEquals": [
      "States.ALL"
    ],
    "Next": "Analyze sentiment"
  }
]
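One thing to keep in mind: as far as I understand the Amazon States Language, when a catcher without a "ResultPath" fires, the next state receives the error output (an object with "Error" and "Cause" fields) in place of the original input. The state that "Next" points to therefore needs to cope with that. Below is a minimal sketch of such a guard; `isErrorOutput` and `analyzeOrSkip` are hypothetical helpers.

```javascript
// True when the input looks like Step Functions error output rather than an article.
const isErrorOutput = (input) =>
  input != null &&
  typeof input === "object" &&
  typeof input.Error === "string" &&
  "Cause" in input;

// Hypothetical wrapper for the analyzer: skip failed articles by returning null,
// which the Reducer's null filter then drops.
const analyzeOrSkip = async (input, analyze) =>
  isErrorOutput(input) ? null : analyze(input);
```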

With this, our complete Step Functions definition looks as follows.

{
  "Comment": "A Step Functions workflow that processes an array of strings concurrently",
  "StartAt": "Extract links from google",
  "States": {
    "Extract links from google": {
      "Type": "Task",
      "Resource": "<google crawler arn>",
      "ResultPath": "$",
      "Next": "ProcessArray",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ]
    },
    "ProcessArray": {
      "Type": "Map",
      "ItemsPath": "$",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "Extract article content",
        "States": {
          "Extract article content": {
            "Type": "Task",
            "Resource": "<article extractor arn>",
            "InputPath": "$",
            "Next": "Analyze sentiment",
            "Retry": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "IntervalSeconds": 1,
                "MaxAttempts": 2,
                "BackoffRate": 2
              }
            ],
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "Next": "Analyze sentiment"
              }
            ]
          },
          "Analyze sentiment": {
            "Type": "Task",
            "Resource": "<sentiment analyzer arn>",
            "InputPath": "$",
            "End": true,
            "Retry": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "IntervalSeconds": 1,
                "MaxAttempts": 2,
                "BackoffRate": 2
              }
            ]
          }
        }
      },
      "Next": "Reducer",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Reducer"
        }
      ]
    },
    "Reducer": {
      "Type": "Task",
      "Resource": "<reducer arn>",
      "InputPath": "$",
      "ResultPath": "$",
      "End": true,
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ]
    }
  }
}