- A Runnable describes a task that can be executed asynchronously but does
not return a result.
public interface Runnable {
void run();
}
run method is executed in a thread. If you want to execute a Runnable in a separate thread, you could spawn a
thread. in practice, it doesn't usually make sense. In the Java concurrency library, an executor service schedules and executes
tasks, choosing the threads on which to run them.
- An ExecutorService schedules tasks instances for execution.
Runnable task = () -> { ... };
ExecutorService executor = ...;
executor.execute(task);
has factory methods for executor services with different
scheduling policies.
exec = Executors.newCachedThreadPool();
optimized for programs with many tasks that are short
lived or spend most of their time waiting. Each task is executed on an idle thread
if possible, but a new thread is allocated if all threads are busy.
exec = Executors.newFixedThreadPool(nthreads);
yield a pool with a fixed number of threads. When you submit a task, it is
queued up until a thread becomes available.
Hello Example
- A Callable describes a task that can be executed asynchronously and
yields a result.
public interface Callable<V> {
V call() throws Exception;
}
ExecutorService executor = Executors.newFixedThreadPool();
Callable<V> task = ...;
Future<V> result = executor.submit(task);
a future - an object that represents a
computation whose result will be available at some future time. The Future
interface has the following methods:
V get() throws InterruptedException, ExecutionException
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
boolean cancel(boolean mayInterruptIfRunning) // attempt to cancel. Cancel if not started or Exception
boolean isCancelled()
boolean isDone()
- You can submit one or more Callable instances to an
ExecutorService and combine the results when they are available.
Instead of submitting
each subtask separately, you can use the invokeAll method, passing a
Collection of Callable instances.
String word = ...;
Set<Path> paths = ...;
List<Callable<Long>> tasks = new ArrayList<>();
for (Path p : paths) tasks.add(
() -> { return number of occurrences of word in p });
List<Future<Long>> results = executor.invokeAll(tasks);
// This call blocks until all tasks have completed
long total = 0;
for (Future<Long> result : results) total += result.get();
There is also a variant of invokeAll with a timeout, which cancels all tasks
that have not completed when the timeout is reached.
If it bothers you that the calling task blocks until all subtasks are done,
you can use an ExecutorCompletionService. It returns the
futures in the order of completion.
ExecutorCompletionService service
= new ExecutorCompletionService(executor);
for (Callable<T> task : tasks) service.submit(task);
for (int i = 0; i < tasks.size(); i++) {
Process service.take().get()
Do something else
}
The invokeAny method is like invokeAll, but it returns as soon as any one
of the submitted tasks has completed normally, without throwing an exception. It
then returns the value of its Future. The other tasks are cancelled.
String word = ...;
Set<Path> files = ...;
List<Callable<Path>> tasks = new ArrayList<>();
for (Path p : files) tasks.add(
() -> { if (word occurs in p) return p; else throw ... });
Path found = executor.invokeAny(tasks);
- Asynchronous Computations - when you do not want to wait for result
The CompletableFuture class
implements the Future interface, You register a callback that will be invoked (in some thread)
with the result once it is available.
To run a task asynchronously and obtain a CompletableFuture, you call the static method
CompletableFuture.supplyAsync:
CompletableFuture<String> f = CompletableFuture.supplyAsync(
() -> { String result; Compute the result; return result; },
executor);
If you omit the executor, the task is run on a default executor (namely the
executor returned by ForkJoinPool.commonPool()).
A CompletableFuture can complete in two ways: either with a result, or
with an uncaught exception. In order to handle both cases, use the
whenComplete method.
f.whenComplete((s, t) -> {
if (t == null) { Process the result s; }
else { Process the Throwable t; }
});
two tasks can work simultaneously on computing an answer:
CompletableFuture<Integer> f = new CompletableFuture<>();
executor.execute(() -> {
int n = workHard(arg);
f.complete(n);
});
executor.execute(() -> {
int n = workSmart(arg);
f.complete(n);
});
// To instead complete a future with an exception, call
Throwable t = ...;
f.completeExceptionally(t);
The isDone method tells you whether a Future object has been completed
(normally or with an exception). In the preceding example, the workHard and
workSmart methods can use that information to stop working when the result
has been determined by the other method.
- The CompletableFuture providing a
mechanism for composing asynchronous tasks into a processing pipeline.
public void CompletableFuture<String> readPage(URI url);
public static List<URI> getLinks(String page);
CompletableFuture<String> contents = readPage(url);
CompletableFuture<List<URI>> links =
contents.thenApply(Parser::getLinks);
// also possible
CompletableFuture<U> future.thenApplyAsync(f);
The
CompletionStage interface describes how to compose
asynchronous computations, whereas the Future interface focuses on
the result of a computation. A CompletableFuture is both a
CompletionStage and a Future.
- Thread Safety When multiple threads operate on shared data without synchronization, the
result is unpredictable.
By Default variables could be not visible to other tasks.
- The value of a final variable is visible after initialization.
- The initial value of a static variable is visible after static initialization.
- Changes to a volatile variable are visible.
- Changes that happen before releasing a lock are visible to anyone acquiring
the same lock
confinement: Just say no when it comes to sharing
data among tasks. For example, when your tasks need to count something, give
each of them a private counter instead of updating a shared counter. When the
tasks are done, they can hand off their results to another task that combines
them.
immutability: It is safe to share immutable objects. For
example, a task can generate an
immutable collection of results. Another task combines the results into another
immutable data structure.
To create immutable objects: declare variables, methods or even classes final, do not reference mutable objects by this, don't store references - make copies.
locking: By granting only one task at a time access to a data
structure, one can keep it from being damaged.
Locking is error-prone (deadlock), and it can be expensive since it reduces opportunities for
concurrent execution.
- Prefer using ready to use parallel algorithms and threadsafe data structures over
programming with locks.
Parallel Streams - For example, if coll is a large collection of strings, and you want to find how
many of them start with the letter A, call
long result = coll.parallelStream().filter(s ->
s.startsWith("A")).count();
Parallel Array
// Fills values with 0 1 2 3 4 5 6 7 8 9 0 1 2 ...
Arrays.parallelSetAll(values, i -> i % 10);
// Comparing
Arrays.parallelSort(words, Comparator.comparing(String::length));
// Comparing with range - Sort the upper half
Arrays.parallelSort(values, values.length / 2, values.length);
parallelPrefix - also
For other parallel operations on arrays, turn the arrays into parallel streams.
// compute the sum of a long array of integers
long sum = IntStream.of(values).parallel().sum();
8. A ConcurrentHashMap is a threadsafe hash table that allows atomic update of entries.
The collections in the
java.util.concurrent package have been cleverly implemented so that
multiple threads can access them without blocking each other, provided they
access different parts. This package implement special iterator.
9. You can use AtomicLong for a lock-free shared counter, or use
LongAdder if contention is high.
10. A lock ensures that only one thread at a time executes a critical section.
11. An interruptible task should terminate when the interrupted flag is set or an
InterruptedException occurs.
12. A long-running task should not block the user-interface thread of a program,
but progress and final updates need to occur in the user-interface thread.
13. The Process class lets you execute a command in a separate process and
interact with the input, output, and error streams.