Poor man's job runner with Clojure Agents
On (mis)using Clojure's concurrency features to make an in-memory job runner, because I needed an excuse to use more than atoms for once. Definitely not Rich Hickey's "Ants" demo.

Author's note: To run real background jobs like a proper Clojure professional, maybe use one of the proper professional libraries out there, like goose.

Backstory

The opportunity to use more than just atoms finally came my way a decade after I first watched Rich Hickey's "Ants" demo [1]. The demo shows off a Java Swing GUI world swarming with ants, driven using all three of Clojure's built-in concurrency features.

Clojure wants us to be explicit about the type of concurrency we need [2], depending on how state is to be shared between threads. We must choose a feature accordingly.

↓ State Sharing / Threads →   Independent   Coordinated
Synchronous                   atom          ref
Asynchronous                  agent         not in our universe
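To make the table concrete, here is a minimal sketch of one update per cell. The names (hits, checking, savings, log-lines) are made up for illustration; only the core functions are real.

```clojure
;; Independent + synchronous: an atom.
(def hits (atom 0))
(swap! hits inc)              ; returns only after the update is applied

;; Coordinated + synchronous: refs inside a transaction.
(def checking (ref 100))
(def savings  (ref 0))
(dosync                       ; both refs change together, or not at all
  (alter checking - 30)
  (alter savings + 30))

;; Independent + asynchronous: an agent.
(def log-lines (agent []))
(send log-lines conj "hello") ; returns immediately; the update happens later
(await log-lines)             ; block until the actions sent so far have run

[@hits @checking @savings @log-lines]
;; => [1 70 30 ["hello"]]
```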

However, like maybe 99% of all Clojure programmers, I've only ever used atoms in practice [3]. Like maybe some of them, I've always wanted to use the others some day. Because that demo is rad!

While working through the (very nice!) hypermedia.systems book, I reached the section where we must code up a Dynamic Archive UI, complete with a live progress indicator.

The scenario requires an Archive job that runs in the background, with basic job control stuff (run, pause, cancel, reset, status, progress, etc.).

Obviously we must kick jobs off of the main execution thread, to avoid blocking it. Further, if we have many jobs and/or many batches per job, we need to queue them all up for asynchronous evaluation. And job state must be observable at all times without blocking job execution.

Of course, we are here because I didn't want to just get on with life by using a Thread/sleep or some trivial mock; definitely not a library. Because Clojure has features that can help us.

Upon squinting a little, the, ah, job description looked an awful lot like the place in the table up above where Asynchronous state update meets Independent threads.

Aha! Send in our agent!

The hack

(agent {:status (atom :waiting) ; the one weird trick
        :total-batches 0
        :progress 0
        :job-file "resources/job-log.json"}
       ;; the one weird trick
       :validator (fn [self] 
                    (not= :paused @(:status self))))

I modeled a single job runner as a Clojure agent:

  • meant to manage one job at a time,
  • where a job has one or more long-running batches,
  • where the runner's current status may be one of :waiting, :running, or :paused,
  • and it is always initialised in the :waiting state.

Additionally, the job state reflects current progress and points to a file that accumulates the work of each batch.

The one weird trick answers a question: how do we implement out-of-band job control if we use an agent this way?

Because…

  • Our job batches must be executed in order.
  • Actions dispatched to an agent occur in the order they were sent.
  • Only one action is executed at a time for an agent.
  • So I must queue all batches up-front (using send-off).
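The ordering guarantee is easy to check at the REPL. A small sketch, assuming a throwaway agent named order-demo (not part of the job runner): actions sent from one thread run one at a time, in the order sent, even when send and send-off are mixed.

```clojure
(def order-demo (agent []))

;; Mix send and send-off, the way the job runner does.
(doseq [n (range 3)]
  (send     order-demo conj [:increment n])
  (send-off order-demo (fn [state]
                         (Thread/sleep 10) ; pretend to be a slow batch
                         (conj state [:batch n]))))

(await order-demo)
@order-demo
;; => [[:increment 0] [:batch 0]
;;     [:increment 1] [:batch 1]
;;     [:increment 2] [:batch 2]]
```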

However

  • The trouble is the agent system is designed to be reactive and has no built-in pause/resume facility.
  • To preserve the agent system's sequential execution guarantee, the queue of actions cannot be modified post-hoc.

So!

The problem in pictures, with reference to code in the next section.

Let's say the following actions are already queued using the agent system's send and send-off functions.

|---|send increment3 -----------\
|--|send-off batch3 ------------\
|----------|send increment1 ----\
|---------|send-off batch1 -----\
                                 #[job-runner agent]
|-send increment4 --------------/
|send-off batch4 ---------------/
|-------|send increment2 -------/
|------|send-off batch2 --------/

Note: I have visually placed actions on separate thread-like tracks to emphasise that they are independent actions that will be performed asynchronously on thread pools managed by the agent system (a fixed-size pool for send, an expandable one for send-off). However, each action is sequentially ordered, and will be executed in the order it was sent to the agent.

Now, if I send a :pause action to update the :status of the agent, the agent system will queue this latest action behind all the earlier actions. This means if batch1 is in progress and I want to pause the job run, I cannot do it with a send.

             |---|send increment3 -----------\
             |--|send-off batch3 ------------\
             |----------|send increment1 ----\
             |---------|send-off batch1 -----\
                                              #[job-runner agent]
             |-send increment4 --------------/
             |send-off batch4 ---------------/
             |-------|send increment2 -------/
             |------|send-off batch2 --------/
|send :pause --------------------------------/
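Here is the same problem as a sketch, assuming a naive runner that keeps :status as a plain value inside the agent (naive-runner and slow-batch are illustrative names only). The pause arrives, but only after every batch has already run.

```clojure
(def naive-runner (agent {:status :running :done []}))

(defn slow-batch
  "Pretend to do slow work, then record the batch as done."
  [state batch]
  (Thread/sleep 50)
  (update state :done conj batch))

(doseq [batch [:batch1 :batch2 :batch3]]
  (send-off naive-runner slow-batch batch))

;; Too late: this lands at the back of the queue.
(send naive-runner assoc :status :paused)

(await naive-runner)
@naive-runner
;; => {:status :paused, :done [:batch1 :batch2 :batch3]}
```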

What if I put job :status in an atom, and observe the atom via a validator?

  • Any validator attached to an agent is called with the proposed new state each time an action returns. If the validator returns false, the state change is rejected and the agent halts. [4]
  • The agent's state is always observable out-of-band.
    • Which means I can see the :status atom from the outside.
    • Which further means I can update it from the outside too.
  • Putting the two together…
    • If a batch is in progress,
    • and I set the :status atom to :paused,
    • then the next batch won't execute,
    • because the validator fails the agent as soon as the in-flight batch returns, before the next action runs,
    • thus effectively pausing the queue. Phew!

So I gave the agent a validator function that returns false if the :status atom is set to :paused, thus halting the agent.

Upon halting, agent-error allows us to see the reason for job interruption (an Exception). And restart-agent lets us resume the job after suitably dealing with the interruption.
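The pause / inspect / resume cycle can be tried in isolation. This is a sketch under assumptions, not the job runner itself: status stands in for the :status atom, completed stands in for the job file, and the sleeps are crude timing that should be comfortable on most machines.

```clojure
;; Hypothetical stand-ins: `status` plays the :status atom and
;; `completed` plays the job file.
(def status (atom :running))
(def completed (atom []))

(def demo-runner
  (agent {}
         :validator (fn [_state] (not= :paused @status))))

(defn batch!
  "Slow side-effecting action; returns the state unchanged."
  [state batch]
  (Thread/sleep 100)
  (swap! completed conj batch)
  state)

(doseq [b [:one :two :three]]
  (send-off demo-runner batch! b))

(reset! status :paused) ; out-of-band pause while :one is still sleeping
(Thread/sleep 500)      ; crude wait; await would throw on a failed agent

(def completed-while-paused @completed)
(def pause-error (agent-error demo-runner))

(reset! status :running)                 ; the validator must pass again
(restart-agent demo-runner @demo-runner) ; queued actions are kept by default
(await demo-runner)

{:while-paused completed-while-paused
 :error-type   (type pause-error)
 :after-resume @completed}
```

If the timing holds, only :one should have completed while paused, the error should be an IllegalStateException raised by the failed validation, and all three batches should be recorded after the resume. Note that the in-flight batch still finishes its side effects; the pause only takes effect between actions.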

The state of atoms and agents is always readable without blocking writers, so one can get away with pretty straightforward lock-free code.

Code sketch / Sketchy code (same thing)

Here's the basic idea. It works on my computer. I am also pleased to report that I finished the book example and have moved on in life.

(ns study-htmx.pmjc
  "Poor Man's Job Control")

(defn make-initial-job-state
  []
  (agent {:status (atom :waiting)
          :total-batches 0
          :progress 0
          :job-file "resources/job-log.json"}
         :validator (fn [job-runner]
                      (not= :paused @(:status job-runner)))))

(defonce job-runner
  (make-initial-job-state))

(defn create-job!
  "Queue all the batches for the given job and
  keep the job progress current."
  [job-runner batches batch-executor]
  ;; Start the job when it is parked in the initial
  ;; :waiting state. Also rotate the job file.
  (when (= @(:status @job-runner) :waiting)
    (swap! (:status @job-runner)
           (constantly :running))
    (send job-runner assoc
          :total-batches (count batches))
    (spit (:job-file @job-runner)
          ""))

  ;; As soon as a job is set to run, queue all batches
  ;; and progress updates
  (when (= @(:status @job-runner) :running)
    (doseq [batch batches]
      (send job-runner update :progress inc)
      (send-off job-runner batch-executor batch)))

  ;; Queue a final action to mark the job as :done
  (send-off job-runner
            (fn [runner]
              (swap! (:status runner) (constantly :done))
              runner)))

(defn pause-job!
  "Out-of-band job control by reaching into the :status atom."
  [job-runner]
  (swap! (:status @job-runner)
         (constantly :paused))
  job-runner)

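(defn resume-job!
  "Out-of-band job control by reaching into the :status atom."
  [job-runner]
  (when (= :paused @(:status @job-runner))
    (swap! (:status @job-runner) (constantly :running))
    ;; restart-agent throws unless the agent has actually failed,
    ;; which only happens once an action returns while :paused.
    (when (agent-error job-runner)
      (restart-agent job-runner @job-runner)))
  job-runner)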

(defn reset-job!
  "Cheaping out by resetting the var because we mean to be
  destructive and consign the agent to the garbage collector.
  Wrapping the agent in an atom would be better."
  [job-runner-var]
  (alter-var-root job-runner-var
                  (constantly (make-initial-job-state))))

(defn cancel-job!
  [job-runner job-runner-var]
  (pause-job! job-runner)
  (reset-job! job-runner-var))

(defn do-batch!
  "Presumably a long-running batch. We must always accept
  and return the job runner as this is an action sent off
  to the job runner agent."
  [job-runner batch]
  (Thread/sleep 5000) ; the batch is running
  (spit (:job-file job-runner)
        (format "Completed batch %s\n" batch)
        :append true)
  job-runner)

(comment
  (create-job! job-runner
               ["ONE" "TWO" "THREE" "FOUR" "FIVE"]
               do-batch!)

  (pause-job! job-runner)
  (resume-job! job-runner)
  (reset-job! (var job-runner))
  (cancel-job! job-runner (var job-runner))

  (agent-error job-runner))

Pros / Cons

This is proooobably gross misuse of the agent system. But if it is not, I would like to know!

Pros:

  • No need for an external library.
  • Straightforward lock-free code.
  • Built-in thread safety of Clojure's concurrency system.
  • Built-in error recovery.
  • Built-in observability of state and errors.
  • Extensible, if you have the iron constitution to live with the consequences.

Cons:

  • Obviously, in-memory job control is bound to a single process. If it dies, we lose our jobs.
    • Mitigation: We can attach a watcher to the agent and write to a log file to track progress and recover from a process restart.
    • Alternative: Use SQLite to manage job state. When in WAL mode, SQLite is a lot like an agent, allowing for mutually non-blocking sequential writes and concurrent reads.
  • Easy to write subtle bugs, especially timing and order problems like incrementing the progress counter in the wrong order, leading to off-by-one errors if we pause / resume the job.
    • Mitigation: Write side-effecting functions with care. Design for idempotence. Test thoroughly. Be well aware of each feature's concurrency model and the intended behaviour of operations supported by the feature.
  • Abusing your programming language's standard library.
    • Mitigation: ¯\(ツ)_/¯
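The watcher mitigation from the first con could look something like this. A minimal sketch, assuming a throwaway agent (watched-runner) and an in-memory vector (progress-log) standing in for the log file; a real version would presumably spit with :append true instead, and would still need its own recovery logic on restart.

```clojure
(def watched-runner (agent {:progress 0 :total-batches 3}))

;; Stand-in for a log file on disk.
(def progress-log (atom []))

;; A watch fn receives the key, the agent, the old state and the new state.
(add-watch watched-runner ::progress-log
           (fn [_key _agent old-state new-state]
             (when (not= (:progress old-state) (:progress new-state))
               (swap! progress-log conj
                      (select-keys new-state [:progress :total-batches])))))

(dotimes [_ 3]
  (send watched-runner update :progress inc))

(await watched-runner)
@progress-log
;; => [{:progress 1, :total-batches 3}
;;     {:progress 2, :total-batches 3}
;;     {:progress 3, :total-batches 3}]
```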

Obligatory HTMX meme

I feel compelled to contribute back to the HTMX community seeing as we began our side quest because of the hypermedia.systems book.

This "HTMX is Boring Technology" fact is brought to you using Imgflip.

Footnotes


  1. Clojure: A Dynamic Programming Language for the JVM. Concurrency Support. Talk video, code archive, transcript and slides.

  2. Cf. Concurrent Programming reference page at Clojure.org

  3. The opportunity tends not to arise in real life because we end up using futures or core.async for our asynchronous and/or independent operational needs, and RDBMSes for our coordinated and/or synchronous transactional needs. Plus, present-day Java has a whole host of options for concurrent programming.

  4. Cf. Agents and Asynchronous Actions