This page documents the concurrent library that ships with the alpha version of OCaml-Java 2.0.

Warning! By default, OCaml-Java favors compatibility with the original OCaml implementation, meaning that it is based on a global runtime lock. In order to leverage the power of the concurrent library, it is necessary to disable the runtime lock by linking the program with the -runtime-parameter runtime-lock=off command-line option.
The concurrent library packs several modules into a single Concurrent module. These modules fall into the following categories of increasing abstraction level:
- threads (Thread, ThreadGroup, and ThreadLocal modules);
- locks (Lock, ReadWriteLock, and Condition modules);
- synchronization primitives (Semaphore, CountDownLatch, CyclicBarrier, Exchanger, and Phaser modules);
- futures and thread pools (Future, ScheduledFuture, ThreadPoolExecutor, ScheduledThreadPoolExecutor, and ExecutorCompletionService modules);
- fork/join computations (ForkJoinPool and ForkJoin modules);
- map/reduce computations (MapReduce module);
- parallel arrays (ParallelArray module);
- software transactional memory (STM module).

Besides these modules, TimeUnit defines the various time units and a conversion function.
The doc directory of the binary distribution contains the ocamldoc-generated documentation for all modules. Most modules from the first five categories above are the counterparts of Java classes with the same names in the java.util.concurrent package (and its sub-packages). It is thus possible to get additional information from the JDK documentation.
Threads created through the Thread module from the concurrent library are akin to those from the Thread module of the original OCaml distribution (in either the systhread or thread library). The three main differences are:
- thread-local storage is available through the ThreadLocal module;
- locks and conditions in the POSIX tradition are available through the Lock, ReadWriteLock, and Condition modules;
- advanced synchronization is provided through the following modules:
  - Semaphore;
  - Exchanger for simple rendez-vous allowing two threads to swap data;
  - CountDownLatch for one-use barriers;
  - CyclicBarrier for reusable barriers;
  - Phaser for customizable barriers.

Here is an example of threads with a locally-stored accumulator variable:
let acc = ThreadLocal.make 0 (* initial value *)

let compute x =
  ...
  let old = ThreadLocal.get acc in (* read access *)
  ...
  ThreadLocal.set acc (f old) (* write access *)
  ...

let () =
  let threads =
    List.map
      (fun s -> Thread.make compute s)
      [ 1; 2; 3; 5; 7; 11; 13 ] in
  List.iter Thread.start threads;
  List.iter Thread.join threads
The atomic containers have module names that differ slightly from the equivalent Java class names; the complete mapping is given in the following table. All atomic containers provide compare-and-set operations, making it possible to write lock-free algorithms.
OCaml module name | Java class name | Note
---|---|---
AtomicBool | AtomicBoolean |
AtomicInt32 | AtomicInteger |
AtomicInt32Array | AtomicIntegerArray |
AtomicInt64 | AtomicLong |
AtomicInt64Array | AtomicLongArray |
AtomicMarkableReference | AtomicMarkableReference | (1)
AtomicReference | AtomicReference | (1)
AtomicReferenceArray | AtomicReferenceArray | (1)
AtomicStampedReference | AtomicStampedReference | (1)
(1): physical comparison is used by the container. As OCaml-Java uses boxed values for OCaml int values, the container should not be used to store int values. Any other type can be safely stored (the caching of int values ensures that sum types are correctly handled).
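To make the compare-and-set semantics concrete, they can be sketched sequentially with a plain ref. The compare_and_set function below is a hypothetical stand-in: the real containers perform the test and the update as a single atomic step, and the (1)-marked containers use physical equality, mirrored here by (==).

```ocaml
(* Sequential sketch of compare-and-set semantics: update the cell only
   if it still (physically) holds the expected value, reporting success. *)
let compare_and_set r expected desired =
  if !r == expected then begin r := desired; true end
  else false

let () =
  let r = ref "a" in
  let v = !r in
  assert (compare_and_set r v "b");      (* succeeds: the cell held [v] *)
  assert (not (compare_and_set r v "c")) (* fails: the cell now holds "b" *)
```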
Here is an example of atomic use, comparing a bare Pervasives.ref value with an atomic one:
let () = Random.self_init ()

let a = AtomicInt64.make 0L (* accesses to the value are atomic *)
let b = ref 0L (* accesses to the value are not atomic *)

let print s n =
  let l = ref 0L in
  for _i = 1 to n do
    let t = Int64.of_int (Random.int 250) in
    Printf.printf "%s (waiting %Ld)\n%!" s t;
    b := Int64.add !b t;
    l := Int64.add !l t;
    ignore (AtomicInt64.add_and_get a t);
    Thread.sleep t
  done;
  Printf.printf "l = %Ld\n" !l

let () =
  let n = 10 in
  let threads =
    List.map
      (fun s -> Thread.make (print s) n)
      [ "hello"; "salut" ] in
  List.iter Thread.start threads;
  List.iter Thread.join threads;
  Printf.printf "a = %Ld, b = %Ld\n" (AtomicInt64.get a) !b
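The reason b may end up different from a is that b := Int64.add !b t is a read-modify-write performed in two separate steps, which two threads can interleave. The self-contained sketch below replays such an interleaving by hand, single-threaded:

```ocaml
(* A lost update, replayed by hand: both "threads" read the same value
   before either writes back, so one increment disappears. *)
let () =
  let b = ref 0 in
  let read1 = !b in  (* thread 1 reads 0 *)
  let read2 = !b in  (* thread 2 also reads 0 *)
  b := read1 + 1;    (* thread 1 writes 1 *)
  b := read2 + 1;    (* thread 2 overwrites with 1: an update is lost *)
  assert (!b = 1)    (* two increments, but the counter shows 1 *)
```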
Futures are similar to lazy values in the sense that, once created, it is possible to wait for their evaluation (through Future.get). However, they differ from lazy values in that their evaluation is done in the background by another thread. Basic future manipulation is done through the Future module, but futures are created by submitting a computation to a thread pool (through submit, or some invoke variant), as provided by the ThreadPoolExecutor module. Such pools need to be shut down at program termination: as they contain threads, they prevent the JVM from ending the program (unless exit is explicitly called). Here is a basic example of future use:
let pool =
  ThreadPoolExecutor.make
    ~core_pool_size:4l
    ~max_pool_size:4l
    1L TimeUnit.Seconds (* keep-alive time for threads outside core *)
    RejectedExecutionHandler.Discard_policy (* how to handle rejected execution *)

let compute x =
  ...

let () =
  let f = ThreadPoolExecutor.submit pool compute 137 in
  ...
  Printf.printf "result = %d\n" (Future.get f);
  ThreadPoolExecutor.shutdown pool
It is also possible to launch several computations in parallel and wait for the first one to return:

let () =
  let l = [ 1; 2; 3; 5; 7; 11; 13 ] in
  let res = ThreadPoolExecutor.invoke_any pool (List.map compute l) in
  let others = ThreadPoolExecutor.shutdown_now pool in (* get all futures still running *)
  List.iter (fun f -> ignore (Future.cancel f true)) others;
  Printf.printf "result = %d\n" res
However, for Future.cancel to succeed, compute needs to cooperate: the compute function should periodically test whether it has been interrupted. A simple way to perform that test is to check the value returned by Thread.interrupted before each computation step:
let compute x =
  ...
  let finished = ref false in
  while (not !finished) && (not (Thread.interrupted ())) do
    ...
    (* perform computation step *)
    ...
  done;
  ...
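The shape of such a cooperative loop can be tested without any thread machinery. In the sketch below, a plain boolean ref passed as an argument stands in for Thread.interrupted (a hypothetical, single-threaded stand-in):

```ocaml
(* Cooperative cancellation: the loop re-checks an interruption flag
   before each step and stops early once the flag is raised. Here the
   flag is raised from inside the loop, after step 3, to simulate an
   interruption arriving mid-computation. *)
let compute interrupted steps =
  let finished = ref false in
  let count = ref 0 in
  while (not !finished) && (not !interrupted) do
    incr count;                            (* one computation step *)
    if !count >= steps then finished := true;
    if !count = 3 then interrupted := true (* simulated interruption *)
  done;
  !count

let () = assert (compute (ref false) 10 = 3) (* stopped after 3 of 10 steps *)
```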
Besides simple futures, it is also possible to use scheduled futures (through the ScheduledFuture and ScheduledThreadPoolExecutor modules). A one-shot future can be scheduled through schedule by just specifying a delay before future evaluation. It is also possible to schedule a future that will be repeatedly evaluated according to a given periodicity:
- schedule_at_fixed_rate specifies the period between two evaluation starts;
- schedule_with_fixed_delay specifies the period between one evaluation end and the next evaluation start.

The following code will print "hello!" once per second:
let pool =
  ScheduledThreadPoolExecutor.make
    ~core_pool_size:4l
    RejectedExecutionHandler.Discard_policy

let () =
  let f =
    ScheduledThreadPoolExecutor.schedule_at_fixed_rate
      pool
      print_endline "hello!"
      1L 1L TimeUnit.Seconds (* one second before first call, one second between two calls *) in
  ScheduledFuture.get f;
  ScheduledThreadPoolExecutor.shutdown pool
Finally, the ExecutorCompletionService module makes it possible to either poll for, or wait until, any of the submitted futures has completed its evaluation.
Fork/join computations as supported by the concurrent library are quite different from their Java counterpart, in order to provide a less general but safer abstraction. Basically, one can turn a sequential function into a parallel one by applying a very simple divide-and-conquer strategy, defined by:
- a fork function that decides whether to split a problem into subproblems, or to solve it directly;
- a join function that combines the results of two subproblems.

As an example, a (very inefficient) way of computing the Fibonacci function is:
let rec fib n =
  if n <= 1 then
    1
  else
    (fib (n - 2)) + (fib (n - 1))

that can be turned into a parallel version through:

let fork n =
  if n < threshold
  then None (* below a given value, we do not fork *)
  else Some (n - 1, n - 2) (* otherwise, we fork and create two subproblems *)

let join x y = x + y

let parallel_fib pool = Concurrent.ForkJoin.split pool fork join fib
where pool is a value of type ForkJoinPool.t.

The behaviour of parallel_fib when passed a value x is the following:
- fork x is evaluated;
- if fork x matches None, then the result of parallel_fib x is fib x;
- if fork x matches Some (x1, x2), then the result of parallel_fib x is join y1 y2, where yi is the result of parallel_fib xi (for i = 1, 2).

Besides the ForkJoin.split function, which is based on an option type, the ForkJoin module provides similar functions based on list and array types. They make it possible to divide a problem into more than two subproblems at once, possibly saving recursion steps.
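Ignoring parallelism, the result computed by such a split can be described by a sequential reference function. The split function below is a hypothetical stand-in (no pool argument) that mirrors the behaviour described above:

```ocaml
(* Sequential reference semantics of a fork/join split: solve directly
   when [fork] returns None, otherwise recurse on both subproblems and
   merge the two subresults with [join]. *)
let rec split fork join f x =
  match fork x with
  | None -> f x
  | Some (x1, x2) -> join (split fork join f x1) (split fork join f x2)

let rec fib n = if n <= 1 then 1 else fib (n - 2) + fib (n - 1)

let threshold = 10
let fork n = if n < threshold then None else Some (n - 1, n - 2)
let join x y = x + y

(* the divide-and-conquer version computes the same value as [fib] *)
let () = assert (split fork join fib 25 = fib 25)
```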
Map/reduce computations are a way to express computations through a set of functions:

map : input -> (key * value) list
combine : key -> value -> value -> value
reduce : key -> value -> output -> output
The computation is started by providing an input Stream.t that is used to launch map computations for the stream values in different threads. Results computed by the threads are then stored into a map from key to value, using combine to merge the values for equivalent keys. Once all map computations have returned, reduce acts as a bare fold over the aforementioned map, computing the final result.
Here is how to define a map/reduce computation:
module C = struct
  type input = int
  type key = int
  type value = int
  type output = int
  let compare_keys = Pervasives.compare
  let map x = [ x, compute x ]
  let combine _ x y = x + y
  let reduce _ v acc = v + acc
end
that is then passed to the MapReduce.Make functor:

let pool =
  ThreadPoolExecutor.make
    ~core_pool_size:4l
    ~max_pool_size:4l
    1L TimeUnit.Days
    RejectedExecutionHandler.Discard_policy

module MR = MapReduce.Make (C)

let () =
  let s = Stream.of_list [ 1; 2; 3; 5; 7; 9; 11; 13; ... ] in
  let res = MR.compute pool s 0 in
  Printf.printf "result = %d\n" res;
  ThreadPoolExecutor.shutdown pool
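Ignoring the thread pool, the value computed can be described sequentially. The map_reduce function below is a hypothetical reference implementation, and the compute function is an assumed stand-in (squaring its input) for the one elided in the document:

```ocaml
(* Sequential reference semantics of map/reduce: map every input to
   (key, value) pairs, merge values sharing a key with [combine], then
   fold the resulting map with [reduce]. *)
module M = Map.Make (struct type t = int let compare = compare end)

let map_reduce map combine reduce init inputs =
  let table =
    List.fold_left
      (fun acc x ->
        List.fold_left
          (fun acc (k, v) ->
            match M.find_opt k acc with
            | None -> M.add k v acc
            | Some v' -> M.add k (combine k v' v) acc)
          acc (map x))
      M.empty inputs
  in
  M.fold reduce table init

let compute x = x * x (* assumed stand-in for the document's [compute] *)

let () =
  let res =
    map_reduce (fun x -> [ x, compute x ]) (fun _ x y -> x + y)
      (fun _ v acc -> v + acc) 0 [ 1; 2; 3 ] in
  assert (res = 14) (* 1 + 4 + 9 *)
```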
The ParallelArray module provides the same functions as the Array module from the standard library, except that the following ones take advantage of multiple cores to execute operations in parallel:
- init;
- iter and iteri;
- fold_left and fold_right;
- sort, stable_sort, and fast_sort.

Of course, when using one of these functions, there is no guarantee on the order in which operations are executed.
The ParallelArray module also provides some functions with no counterpart in the Array module:
- mem and memq;
- exists and for_all;
- find, find_index, and fast_find_all.

All functions accept two optional parameters:
- ?pool:ThreadPoolExecutor.t, which specifies the thread pool to use;
- ?chunk_size:int, which specifies the size of the data chunks passed to the various threads.

In order to get rid of these optional parameters, it is possible to use the ParallelArray.Make functor to set the parameters once for all function calls.
Warning! Most functions have the same signature as in the Array module, but this is not possible for the fold functions. Indeed, as folds are executed on parts of the array, an additional function is needed to combine the results of those partial folds. This leads to the following type for the fold_left function:

?pool:ThreadPoolExecutor.t ->
?chunk_size:int ->
('a -> 'b -> 'a) -> (* original fold function *)
('a -> 'a -> 'a) -> (* function used to combine results from partial folds *)
'a ->
'b array ->
'a

Of course, the parallel folds will yield the same results as their classical counterparts only if the passed functions are associative and commutative.
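The role of the extra combine argument can be made concrete with a sequential sketch that folds the array chunk by chunk. The chunked_fold function below is hypothetical; the real implementation distributes the chunks over pool threads:

```ocaml
(* Fold each chunk independently from the initial value, then merge the
   partial results with [combine]; with an associative and commutative
   operator this agrees with the plain sequential fold. *)
let chunked_fold f combine init chunk_size a =
  let n = Array.length a in
  let rec loop i acc =
    if i >= n then acc
    else
      let len = min chunk_size (n - i) in
      let partial = Array.fold_left f init (Array.sub a i len) in
      loop (i + len) (combine acc partial)
  in
  loop 0 init

let () =
  let a = Array.init 10 (fun i -> i + 1) in
  (* (+) is associative and commutative, so both folds yield 55 *)
  assert (chunked_fold (+) (+) 0 3 a = Array.fold_left (+) 0 a)
```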
Here is an example comparing the use of Array and ParallelArray:

let size = 10000
let a = Array.init size init_func

let sequential () =
  let b = Array.map compute_func a in
  let res = Array.fold_left (fun acc elem -> acc + (aux_func elem)) 0 b in
  Printf.printf "result = %d\n" res

let parallel () =
  let b = ParallelArray.map compute_func a in
  let res = ParallelArray.fold_left (fun acc elem -> acc + (aux_func elem)) (+) 0 b in
  Printf.printf "result = %d\n" res;
  ParallelArray.shutdown_now () (* stop threads from the default ParallelArray pool *)
Warning! the current implementation has only been lightly tested.
The STM module provides support for a partial software transactional memory. This means that the whole memory is not protected by transactions: only values of type STM.ref are protected. Such values are akin to Pervasives.ref values, and are created through the STM.ref function. However, those values can only be accessed from within a transaction.

Two functions process transactions:
- STM.run, which executes any transaction;
- STM.run_read_only, which executes a transaction that cannot modify values.

A transaction function is passed to one of these functions in order to specify the transaction behavior. The transaction function can access STM.ref values through accessor functions. As an example, the canonical bank-account example can be written:
type account = {
  name : string; (* bare value *)
  balance : int STM.ref; (* value protected by transactions *)
}

let make_account n b =
  { name = n; balance = STM.ref b }

let print_account acc =
  STM.run_read_only (fun get ->
    Printf.printf "%s = %d\n" acc.name (get acc.balance))

let transfer x y a =
  STM.run (fun get set ->
    let xv, yv = get x.balance, get y.balance in
    set x.balance (xv - a);
    set y.balance (yv + a))
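The shape of such a transaction function can be exercised with a sequential stand-in: below, a hypothetical run simply hands the transaction function direct get/set accessors over plain refs, whereas a real STM would log the accesses and retry the transaction on conflicts.

```ocaml
(* Sequential sketch of the transaction interface: [run] passes the
   transaction function a reader and a writer over plain refs. *)
let run f = f (fun r -> !r) (fun r v -> r := v)

type account = { name : string; balance : int ref }

let transfer x y a =
  run (fun get set ->
    let xv, yv = get x.balance, get y.balance in
    set x.balance (xv - a);
    set y.balance (yv + a))

let () =
  let alice = { name = "alice"; balance = ref 100 }
  and bob = { name = "bob"; balance = ref 0 } in
  transfer alice bob 30;
  (* the sum of balances is preserved by the transfer *)
  assert (!(alice.balance) = 70 && !(bob.balance) = 30)
```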