
Replacing Queues with Nomad Dispatch

Nomad v0.5.3 introduced parameterized jobs, which act as job scaffolds that can be dispatched on demand with configurable arguments. These jobs behave like functions, encapsulating the logic while allowing the caller to name the job and provide the appropriate arguments. Nomad Dispatch treats each of these "function calls" like a future or promise, making it easy to build scalable processing pipelines for operations like video transcoding, image resizing, sending emails, or doing a billing rollup.

This blog post explores using the new Nomad Dispatch feature to build a video transcoding service with the popular open source tool ffmpeg. Traditional approaches to this problem often involve many different components for work queuing, scheduling, capacity planning, and failure handling. Due to its design, Nomad handles these concerns automatically, allowing us to focus on a minimal job definition and the business logic of the video transcoding service.

»Background

Most applications have a synchronous request/response component, but many also have asynchronous background processing to perform. Common examples include sending emails, billing aggregations, processing user content, and notifying mobile devices. These processes are off-loaded to an asynchronous system because their delay or processing time exceeds that of a normal request. For example, you would not want to wait for an email to send before rendering an HTML page, or burden web servers with a job that could run on dedicated resources.

A very common pattern for handling async or background processing is a worker queue. In this model, producers submit work to a centralized queue, which serves as a buffer until consumers dequeue the work and perform the processing. The diagram below visualizes this common architecture.

Work Queue

This pattern decouples producers from consumers, making it possible to scale consumers as the workload increases or decreases. Producers can be any application, but they are typically latency-sensitive services such as web or API servers, which cannot perform long-running processing during the lifetime of a request. Instead, producers queue workloads to be processed in the background, out of band with the request.

The queue itself is often provided by a tool like RabbitMQ or Apache Kafka. The queue is responsible for buffering work until adequate resources are available for processing. It is important that the queue provide durability to survive restarts, failures, or partial outages, and offer at-least-once delivery semantics. This means if a message is delivered to a consumer that fails during processing, the work will be given to another consumer until it is successfully completed.

The consumer is responsible for dequeuing work from the queue, performing the relevant processing, acknowledging completion, handling cancelations, and doing any cleanup or failure handling. The actual work performed by the consumer depends on the use case, but some common examples include sending email, doing a billing rollup, or encrypting data.

The producer-consumer model has been around for a long time, but there are a few challenges associated with this pattern. The biggest problem is the queue itself: to be useful, the work queue must be fault tolerant and highly available, yet most queues are complex stateful services that are operationally demanding. Second, consumers are often required to include hundreds of lines of boilerplate to integrate with the queue system and handle concurrency, cancelation, cleanup, and failure handling. Lastly, humans have to decide how many consumers to run: too few and the queues grow long, too many and resources sit idle or underutilized in the cluster.

Nomad Dispatch and parameterized jobs greatly simplify the producer-consumer model. Nomad acts as a work queue, dynamically scheduling consumers to avoid wasting resources and abstracting away the boilerplate often required by consumers.

If we step back and examine the problem space, it becomes clear that a scheduler is the right solution. Instead of relying on a centralized queue, the scheduler can serve as the queue; it can also weigh more complex strategies like prioritization and has built-in support for failure handling.

The new parameterized jobs and Nomad Dispatch functionality enables organizations to replace complex queueing systems with a highly-available, fault-tolerant, and battle-tested solution. Having demonstrated the ability to run a million concurrent containers, Nomad is a strong candidate for this transcoding service.

»Parameterized Jobs

A Nomad parameterized job looks very similar to a regular Nomad job, but includes an additional parameterized stanza. The inclusion of this stanza changes the job from an executable job to a job scaffold for dispatching.

job "transcode" {
  type        = "batch"
  datacenters = ["dc1"]

  meta {
    input   = ""
    profile = "small"
  }

  # The new "parameterized" stanza that marks a job as dispatchable. In this
  # example, there are two pieces of metadata (see above). The "input"
  # parameter is required, but the "profile" parameter is optional and defaults
  # to "small" if left unspecified when dispatching the job.
  parameterized {
    meta_required = ["input"]
    meta_optional = ["profile"]
  }

  task "tc" {
    driver = "exec"

    config {
      command = "transcode.sh"
      args    = ["${NOMAD_META_INPUT}", "${NOMAD_META_PROFILE}"]
    }

    env {
      "S3_BUCKET" = "BUCKET_NAME"
    }

    resources {
      cpu    = 1000
      memory = 256
    }

    template {
      destination = "local/s3cfg.ini"

      # This example uses hard-coded credentials, but a real production job
      # file should pull secrets using the Vault integration.
      data = <<EOH
[default]
access_key = "<access_key>"
secret_key = "<secret_key>"
EOH
    }
  }
}


Nomad allows for per-job metadata, which can be interpolated to modify various parts of the job such as the artifacts downloaded, application configuration, or CLI flags. In this case, the metadata values are passed as arguments to a command via NOMAD_META_INPUT and NOMAD_META_PROFILE.
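
The same dispatch metadata is also injected into the task's environment, so a script could read it directly instead of receiving it as command-line arguments. A minimal illustration (this is not part of the actual transcode.sh):

# Dispatch metadata is exposed to the task as NOMAD_META_* environment variables.
echo "input:   ${NOMAD_META_INPUT}"
echo "profile: ${NOMAD_META_PROFILE}"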

The task utilizes the "exec" driver to call a transcode.sh script. For the purposes of this post, this is a small shell script that implements the core logic of our transcoding service, but it could be any executable or application. The script takes two arguments: the URL of the input file to transcode, which is required, and an optional profile, which must be either "small" (480p) or "large" (720p).

The transcode.sh script does the following:

  • Downloads the input file from the provided URL
  • Computes the MD5 of the input file
  • Invokes ffmpeg to convert the input with settings determined by the profile
  • Uploads the output file to an S3 bucket at s3://<BUCKET>/videos/out-<MD5>-<Profile>.mp4

The entire script is about 60 lines of code and implements the complete business logic needed for our video transcoding service. There is no need to integrate with a work queue to dequeue tasks, implement concurrency, handle cleanup, or manage retries and failure recovery, because Nomad abstracts away those concerns, allowing the script to focus on the core logic alone.
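
The full script lives in the repository linked at the end of this post. Purely as an illustration, a trimmed-down sketch of the same flow might look like the following; the profile-to-resolution mapping, the choice of curl and s3cmd, and the specific flags are assumptions here, not a copy of the real script:

#!/usr/bin/env bash
# transcode.sh <input-url> [profile] -- sketch of the core transcoding logic.
set -euo pipefail

INPUT="$1"
PROFILE="${2:-small}"

# Map the profile name to an output resolution (assumed mapping).
case "$PROFILE" in
  small) SCALE="scale=-2:480" ;;
  large) SCALE="scale=-2:720" ;;
  *)     echo "unknown profile: $PROFILE" >&2; exit 1 ;;
esac

# Download the input and compute its MD5 to build a unique output name.
curl -sSL -o input.mp4 "$INPUT"
MD5=$(md5sum input.mp4 | awk '{print $1}')
OUT="out-${MD5}-${PROFILE}.mp4"

# Transcode with ffmpeg using the profile's settings.
ffmpeg -i input.mp4 -vf "$SCALE" -y "$OUT"

# Upload the result to the bucket configured in the job's env stanza.
s3cmd --config local/s3cfg.ini put "$OUT" "s3://${S3_BUCKET}/videos/${OUT}"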

Below is a diagram for a sample infrastructure architecture setup on AWS:

Dispatch Architecture

Using a tool like Terraform, it is possible to configure the following:

  • AWS VPC with a public and private subnet
  • Nomad server running in the public subnet, allowing incoming connections
  • Nomad clients running in the private subnet, connected to the Nomad server
  • Datadog dashboard to monitor the Nomad queue depth
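
With a configuration along those lines in place, standing up the infrastructure is the usual Terraform workflow (the companion repository linked at the end of this post contains a working example; the exact variables and paths depend on that configuration):

$ terraform plan
$ terraform apply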

Jobs are dispatched with their input to the Nomad server. The Nomad server schedules the work on the scalable pool of Nomad clients in the private subnet, and the job uploads the final artifact to an S3 bucket.

»Submitting Work

Before we can dispatch our job, we must submit the job file with the parameterized stanza to the Nomad server. Just like a normal job, a parameterized job is submitted using the nomad run command:

$ nomad run transcode.nomad
Job registration successful

Unlike a normal job, the output of nomad run does not indicate that any allocations were created. That is because this is just the job scaffold and does not represent any actual work (yet). To put it another way, the set of running services in the cluster remains unchanged after this submission.

To check the status of the parameterized job, use the familiar nomad status command:

$ nomad status transcode
ID            = transcode
Name          = transcode
Type          = batch
Priority      = 50
Datacenters   = dc1
Status        = running
Periodic      = false
Parameterized = true

Parameterized Job
Payload           = optional
Required Metadata = input
Optional Metadata = profile

Parameterized Job Summary
Pending  Running  Dead
0        0        0

No dispatched instances of parameterized job found

The output includes information about the parameterization, including the required and optional metadata. It also displays any active instances of the job (currently there are none). Nomad also allows an opaque payload to be provided, such as a JSON file or binary object; you can read more about this in the Nomad dispatch examples.
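
For illustration only, since the transcode job receives its input via metadata rather than a payload, a payload file (or stdin, using "-") can be supplied as the final argument to nomad job dispatch:

$ nomad job dispatch transcode input.json
$ cat input.json | nomad job dispatch transcode -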

To launch an instance of the job, use the new nomad job dispatch command:

$ export FILE="http://s3.amazonaws.com/akamai.netstorage/HD_downloads/Orion_SM.mp4"
$ nomad job dispatch -meta "input=$FILE" transcode
Dispatched Job ID = transcode/dispatch-1486175276-b3be0285
Evaluation ID     = 44011499

==> Monitoring evaluation "44011499"
    Evaluation triggered by job "transcode/dispatch-1486175276-b3be0285"
    Allocation "c42def90" created: node "cedb0204", group "tc"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "44011499" finished with status "complete"

This dispatches the job named "transcode" with a URL to a video of the simulated launch of the Orion rocket provided by NASA. Like any Nomad job, it is possible to monitor this job using nomad status followed by the job's ID, allowing dispatched jobs to be treated like a future or promise:

$ nomad status transcode/dispatch-1486175276-b3be0285
ID            = transcode/dispatch-1486175276-b3be0285
Name          = transcode/dispatch-1486175276-b3be0285
Type          = batch
Priority      = 50
Datacenters   = dc1
Status        = dead
Periodic      = false
Parameterized = false

Summary
Task Group  Queued  Starting  Running  Failed  Complete  Lost
tc          0       0         0        0       1         0

Allocations
ID        Eval ID   Node ID   Task Group  Desired  Status    Created At
c42def90  44011499  cedb0204  tc          run      complete  02/04/17 02:27:56 UTC

»Dispatching Videos

Given a compiled list of links to videos published by NASA, a short script can iterate over the list and dispatch each input twice, once for the "small" profile and once for the "large" profile:

while IFS='' read -r line || [[ -n "$line" ]]; do
  echo "Input file: $line"
  nomad job dispatch -detach -meta "profile=small" -meta "input=$line" transcode
  nomad job dispatch -detach -meta "profile=large" -meta "input=$line" transcode
done < "$1"

This script is invoked with the path to the list of video files:

$ ./bin/dispatch.sh samples/many.txt
Input file: http://s3.amazonaws.com/akamai.netstorage/HD_downloads/Orion_SM.mp4
Dispatched Job ID = transcode/dispatch-1486005726-0d20aa76
Evaluation ID     = ac7a43be
Dispatched Job ID = transcode/dispatch-1486005726-6671c576
Evaluation ID     = ba6cfb02
# ...

Each dispatch generates a unique job ID that can be used to track the status of that instance. Integration with Datadog provides a monitoring dashboard of the work queue. Telemetry about the depth of the queue can inform the decision to scale the number of clients up or down to provide appropriate throughput:

Queue depth

The graph displays two hills. The first was a small test run in which the queue built up and then drained as the cluster finished transcoding the videos. The second run was 25 input files producing 50 output files, one for each profile. The steep decline from 46 to 34 queued files occurred when two additional Nomad clients were added to the cluster; when new clients join, Nomad uses the new capacity by assigning queued jobs to them.

The entire process took about 30 minutes, and it is possible to verify that the jobs completed using the nomad status command. Alternatively, we can inspect S3 to verify the output files are present.
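
For example, using s3cmd with the same credentials configured in the job (the bucket name is a placeholder):

$ s3cmd ls s3://<BUCKET>/videos/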

S3 Files

»Caveats

In line with the Tao of HashiCorp, it is important to point out that Nomad Dispatch is not the perfect solution to every problem; there are caveats to consider when migrating from a traditional producer-consumer-queue model to Nomad Dispatch.

Nomad Dispatch is less suitable for high volumes of requests that take a very short time to process. Nomad can schedule thousands of tasks per second, but the per-task overhead of scheduling, placing, and starting work is high compared with pushing a message onto a queue. For most long-running processes this overhead is amortized over the life of the job, but for processes that complete very quickly, the scheduling overhead may dominate total processing time, especially at high volumes.

»Scalable Processing with Nomad Dispatch

This blog post uses Nomad Dispatch to build a highly scalable video transcoding service in less than 100 total lines of code. The parameterized job encapsulates the requirements and allows callers to provide arguments as metadata when dispatching.

Nomad handles the boilerplate of queueing, scheduling, cancelation, failure handling, and cleanup, allowing us to focus on the essential bits of the service. Datadog provides a convenient dashboard to monitor and visualize the cluster. When resources are exhausted, new Nomad clients can be added to the cluster to increase capacity, and Nomad handles the scaling challenges automatically.

Using parameterized jobs and dispatch simplifies application architectures by moving concerns out of applications and into Nomad.

Internally at HashiCorp, we have already begun migrating to this new pattern for our commercial SaaS product and have reduced complexity while improving our resource utilization. We will share more about that journey in a future post.

All the code for this blog post, including the sample Terraform configurations for setting up a Nomad cluster, is publicly available in the hashicorp/nomad-dispatch-ffmpeg repository on GitHub.

