Páginas

sábado, 27 de novembro de 2010

Paralelismo em Clojure: Threadpool produtor - consumidor

Ontem fiz uma pequena implementação rascunho de uma fila consumida por threads de processamento em Clojure.
Eu havia feito algo similar em Python para encontrar nomes de usuário disponíveis no Twitter.
Tive dificuldade em encontrar uma forma canônica de fazer isso, sem querer me meter muito no mundo Java.

Por fim deixo o código aqui como lembrança do meu feito.

(def *mul* 1)
    
(defn slow-task [v] (Thread/sleep (* @#'*mul* v)) v)

(def history (ref []))

(defn init-items [stream]
    (let [items-stream (ref stream)]
        (fn get-item []
            (dosync
                (let [f (first @items-stream)]
                        (alter history conj [(count @items-stream) f])
                        (alter items-stream rest)
                        f)))))

(defn worker []
    (doall
        (for [x (repeatedly get-item) :while x]
            (try
                (slow-task x)
                (catch Exception _ :put-item-back-to-queue)))))
    
(defn process-all [n-workers]
    (doall
        (for [x (range n-workers)]
            (future (worker)))))
    
(defn main [n-tasks n-workers time-mult]
    ; set multiplier for sleep
    (def *mul* time-mult)
  
    ; clear history
    (dosync (alter history empty))
  
    ; set number of tasks to process
    (def get-item (init-items (take n-tasks (cycle [1 7 2 3]))))
    (sort-by (fn [[k v]] k) (doall (into [] (map (fn [T] [(reduce + T) T]) (time (doall (map deref (process-all n-workers)))))))))

(main 100 20 100)


;------ alternativa ainda não explorada --------
(def producer (seque 1 (cycle (range 10))))

(doseq [o (take 10 producer)]
  (println "consumer:" o)
  (Thread/sleep 1000))

O código acima foi fruto de uma experimentação no REPL. Acabei usando a ideia depois num de nossos projetos na Intelie.
Agradeço ao pessoal que postou nesta thread do grupo de usuários de Clojure e este post no guj.

Nenhum comentário: