Create 10k+ agents in clojure

697 views Asked by At

As I tested, a separate thread is used for each new agent, when I create them. Could several agents be run in one thread?

My idea is to create 10K+ light-weight agents (like actors in erlang), so is it a challenge for Clojure?

Thanks

3

There are 3 answers

0
Arthur Ulfeldt On

the at function of the at-at library that was developed to support the (in my opinion fantastic) Overtone music synthesizer provides a nice clean interfase for running functions at a specific point in time.

(use 'overtone.at-at)
(def my-pool (mk-pool))
(after 1000 #(println "hello from the past!") my-pool)
1
kotarak On

This is incorrect. Agents use a thread pool which is the number of core + 2 in size. So on a quad core machine even 10k+ agents will only use 6 worker threads.

With send, that is. With send-off new threads will be started.

1
sw1nn On

Consider using a j.u.c.DelayQueue

Here's a sketch of how it would work,

the (delayed-function is a bit cumbersome here, but it basically constructs an instance of j.u.c.Delayed for submission to the queue.)

(import [java.util.concurrent Delayed DelayQueue TimeUnit])

(defn delayed-function [f]
  (let [execute-time    (+ 5000 (System/currentTimeMillis))
        remaining-delay (fn [t] (.convert t 
                                          (- execute-time 
                                              (System/currentTimeMillis))
                                          TimeUnit/MILLISECONDS))]
  (reify
      Delayed    (getDelay [_ t] (remaining-delay t))
      Runnable   (run [_] (f))
      Comparable (compareTo [_ _] 0))))

;;; Use java's DelayQueue from clojure.
;;; See http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/DelayQueue.html

(def q (DelayQueue.))

(defn delayed 
  "put the function f on the queue, it will only execute after the delay 
   expires"
  [f]
  (.offer q (delayed-function f)))

(defn start-processing 
  "starts a thread that endlessly reads from the delay queue and 
   executes the function found in the queue"
  []
  (.start
   (Thread.
    #(while true
        (.run (.take q))))))

user> (start-processing)
user> (delayed #(println "Hello"))

   ; 5 seconds passes

Hello