(require '[clojure.core.async :as async])

(defn async-reduce [input-to-accumulator
                    accum-to-accumulator
                    from-accumulator
                    init
                    input
                    output]
  (async/go-loop []
    (if-some [in (async/<! input)]
      (do
        (async/>! input-to-accumulator in)
        (recur))
      (async/close! input-to-accumulator)))
  (async/go-loop [prev init]
    (if (async/>! accum-to-accumulator prev)
      (if-some [in (async/<! from-accumulator)]
        (recur in)
        ;; weird
        (recur prev))
      (async/>! output prev))))

(let [a (async/chan)
      b (async/chan)
      c (async/chan)
      x (async/chan)
      y (async/chan)]
  (async/go-loop []
    (when-some [accum (async/<! b)]
      (if-some [in (async/<! a)]
        (do
          (async/>! c (+ accum in))
          (recur))
        (do
          (async/>! c accum)
          (async/close! b)))))
  (async-reduce a b c 0 x y)
  (async/onto-chan!! x (range 10))
  (async/<!! y))

Generated At 2023-09-12T14:10:35-0700 original