October 28, 2024

Introducing Batch Processing (v 0.16.0)

The release of Infinitic 0.16.0 introduces Batch Processing, a feature designed to optimize high-volume tasks by processing them in bulk.

Why Batch Processing Matters

Batch processing in Infinitic is ideal for scenarios such as:

  • Bulk Database Operations: Minimize connection overhead with grouped inserts or updates. Instead of executing 1,000 individual inserts, run 10 batched inserts of 100 rows each, cutting the number of database round trips by 99%.
  • API Costs: Efficiently manage external service requests within usage constraints. If an API charges per request, batching lets you send fewer requests while still processing as many tasks as possible.
  • Resource-Intensive Computations: Optimize CPU and memory use by consolidating tasks. Process multiple computations together, reducing system overhead and improving overall throughput.

The Challenge

When we set out to build batch processing, we ran into several engineering challenges. The requirements seemed straightforward: combine messages into batches based on size and time limits.

But the devil was in the details! 🤔 The real challenge emerged when designing a system that could:

  • Process both individual and batched tasks simultaneously
  • Dynamically build unlimited batches while controlling their execution rate to apply back-pressure on message consumption
  • Handle timing constraints without blocking threads
  • Maintain individual task context within batches for reliable acknowledgment
  • Gracefully process all ongoing messages during shutdown

After a deep dive into Kotlin coroutines, we found that channels and structured concurrency let us meet these goals elegantly.

How It Works

The new batch execution model lets developers define batch parameters using annotations like @Batch(maxMessages, maxSeconds), configuring tasks based on size and timing.

  • maxMessages: Defines the maximum number of tasks that can be grouped into a single batch. Once this threshold is reached, the batch is immediately processed. Example: With maxMessages = 50, if your system receives 175 tasks:
    • First 50 tasks → Batch 1 (processed immediately)
    • Next 50 tasks → Batch 2 (processed immediately)
    • Next 50 tasks → Batch 3 (processed immediately)
    • Remaining 25 tasks → Wait for more tasks or for the maxSeconds timeout
  • maxSeconds: Sets the maximum waiting period, counted from the arrival of the first task in a batch. This ensures tasks don't wait indefinitely if the maxMessages threshold isn't met. Example: With maxSeconds = 1.0:
    • First task arrives at T+0
    • Only 24 more tasks arrive within 1 second
    • At T+1 second, all 25 tasks are processed together, even though maxMessages wasn't reached
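
The interplay of the two thresholds can be replayed with a small simulation. This is only an illustrative sketch, not Infinitic's implementation: the class and method names are made up for this post, and it assumes a task arriving exactly maxSeconds after the first one starts a new batch.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: replays arrival times to show how the two thresholds close a batch. */
final class BatchWindowSketch {

    /**
     * Groups task arrival times (in seconds, sorted ascending) into batches.
     * A batch closes when it holds maxMessages tasks, or when maxSeconds have
     * elapsed since the first task of the open batch arrived.
     */
    static List<List<Double>> group(List<Double> arrivals, int maxMessages, double maxSeconds) {
        List<List<Double>> batches = new ArrayList<>();
        List<Double> current = new ArrayList<>();
        for (double t : arrivals) {
            // Timeout: the open batch expired before this task arrived.
            if (!current.isEmpty() && t - current.get(0) >= maxSeconds) {
                batches.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
            // Size threshold reached: flush immediately.
            if (current.size() == maxMessages) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        // Whatever remains is flushed once its maxSeconds timer fires.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Replaying 175 simultaneous arrivals with maxMessages = 50 and maxSeconds = 1.0 yields batches of 50, 50, 50 and 25 tasks, matching the first example above.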

Additionally, an optional batch key can be added to the task metadata. When present, the task is added to a batch with other tasks that have the same batch key.
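
Conceptually, the batch key partitions pending tasks before the size and time limits apply. The sketch below shows only that grouping step. Everything in it is hypothetical (the class name, the {taskId, batchKey} pair representation, the sample keys), and it assumes every task carries a key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: groups task ids by their batch key. */
final class BatchKeySketch {

    /** Each task is a {taskId, batchKey} pair. Returns batchKey -> task ids, in arrival order. */
    static Map<String, List<String>> groupByKey(List<String[]> tasks) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (String[] task : tasks) {
            // Tasks sharing a key end up in the same candidate batch.
            byKey.computeIfAbsent(task[1], k -> new ArrayList<>()).add(task[0]);
        }
        return byKey;
    }
}
```

With tasks t1 and t3 keyed "tenantA" and t2 keyed "tenantB", t1 and t3 are grouped together while t2 is batched separately.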

Note: each batch counts as a single execution against the concurrency parameter, so the number of batches processed in parallel is capped by the concurrency setting.

Use Case Highlight

Consider a high-traffic service needing to process bulk email requests. With batch processing, you can set maxMessages = 50 and maxSeconds = 1.0 to efficiently send emails while adhering to API rate limits. Yet each email’s response remains distinct, allowing fine control over individual results in the workflow.

Let's examine a practical implementation:

@Batch(maxMessages = 50, maxSeconds = 1.0)
Map<String, EmailResult> sendEmail(Map<String, EmailRequest> requests) {
    // Called with up to 50 requests at once, or with fewer once 1 second has elapsed
    return emailService.sendBatch(requests);
}

This simple configuration delivers impressive benefits:

  • Reduces API calls by up to 98% (one call per full batch of 50 emails instead of 50 separate calls)
  • Preserves workflow flexibility
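
To make the "one result per email" idea concrete, here is a self-contained sketch of a batched method with the same shape as sendEmail. It leaves out the @Batch annotation so it runs without Infinitic on the classpath. The EmailRequest and EmailResult classes and the fake delivery rule are assumptions for illustration, as is the reading that each map key identifies one task.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a stand-in for a batched task method. */
final class EmailBatchSketch {

    // Hypothetical request/result types; real ones are application-specific.
    static final class EmailRequest {
        final String to;
        EmailRequest(String to) { this.to = to; }
    }

    static final class EmailResult {
        final boolean delivered;
        EmailResult(boolean delivered) { this.delivered = delivered; }
    }

    /**
     * One call handles the whole map and returns one result per input key,
     * so every task in the batch still gets its own outcome.
     */
    static Map<String, EmailResult> sendEmail(Map<String, EmailRequest> requests) {
        Map<String, EmailResult> results = new LinkedHashMap<>();
        for (Map.Entry<String, EmailRequest> entry : requests.entrySet()) {
            // Fake provider rule for the demo: addresses without '@' are rejected.
            boolean ok = entry.getValue().to.contains("@");
            results.put(entry.getKey(), new EmailResult(ok));
        }
        return results;
    }
}
```

Because the result map has the same keys as the request map, a failure for one email can be reported for that email alone without affecting the rest of the batch.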

Getting Started

To dive deeper into implementing batched tasks, explore our documentation. Try out this feature and share your experience with the community!