(require '[clojure.core.async :as async]) (defn async-reduce [input-to-accumulator accum-to-accumulator from-accumulator init input output] (async/go-loop [] (if-some [in (async/<! input)] (do (async/>! input-to-accumulator in) (recur)) (async/close! input-to-accumulator))) (async/go-loop [prev init] (if (async/>! accum-to-accumulator prev) (if-some [in (async/<! from-accumulator)] (recur in) ;; weird (recur prev)) (async/>! output prev)))) (let [a (async/chan) b (async/chan) c (async/chan) x (async/chan) y (async/chan)] (async/go-loop [] (when-some [accum (async/<! b)] (if-some [in (async/<! a)] (do (async/>! c (+ accum in)) (recur)) (do (async/>! c accum) (async/close! b))))) (async-reduce a b c 0 x y) (async/onto-chan!! x (range 10)) (async/<!! y))
Generated At 2023-09-12T14:10:35-0700 original