(require '[clojure.core.async :as async]) (defn async-reduce [input-to-accumulator accum-to-accumulator from-accumulator init input output] (async/go-loop [] (if-some [in (async/! input-to-accumulator in) (recur)) (async/close! input-to-accumulator))) (async/go-loop [prev init] (if (async/>! accum-to-accumulator prev) (if-some [in (async/! output prev)))) (let [a (async/chan) b (async/chan) c (async/chan) x (async/chan) y (async/chan)] (async/go-loop [] (when-some [accum (async/! c (+ accum in)) (recur)) (do (async/>! c accum) (async/close! b))))) (async-reduce a b c 0 x y) (async/onto-chan!! x (range 10)) (async/