Java Concurrency

1. Describe the newly added features in Java 8?

Here are the newly added features of Java 8:

Feature Name Description
Runnable interface describes a task you want to run, perhaps concurrently with others.
  • To execute a Runnable in a separate thread, you could spawn a thread just for that Runnable.
  • Usually, though, an ExecutorService is used to choose the thread and run the task in it.
		// needs java.util.concurrent.ExecutorService and java.util.concurrent.Executors
		Runnable task = () -> { /* some task to do */ };
		// cached pool: optimized for many short tasks
		ExecutorService executor = Executors.newCachedThreadPool();
		// alternative: a fixed pool sized to the number of available processors
		int nthreads = Runtime.getRuntime().availableProcessors();
		ExecutorService exec = Executors.newFixedThreadPool(nthreads);

		executor.execute(task);
Callable<V> interface (call() method) is used for tasks that return a value.
  • When you submit the task, you get a Future, an object that represents the pending result.
  • The get method blocks (the calling thread does not progress) until the result is available or the timeout has been reached. If the task threw an exception, get throws an ExecutionException that wraps it; if the timeout expires first, get throws a TimeoutException.
  • The Future<V> interface has the following methods:
		V get() throws InterruptedException, ExecutionException
		V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
		boolean cancel(boolean mayInterruptIfRunning)
		boolean isCancelled()
		boolean isDone()
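The submit-then-get cycle described above can be sketched as follows. This is only a minimal illustration: the class name FutureDemo, the method sumOnExecutor, the pool size and the five-second timeout are all assumptions made for the example, not part of the notes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureDemo {
    /** Sums 1..n on a pool thread and waits at most five seconds for the answer. */
    static int sumOnExecutor(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(2); // pool size is arbitrary
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) sum += i;
                return sum;
            };
            Future<Integer> future = executor.submit(task);
            // Blocks until the task has finished or the timeout expires.
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("task did not produce a result", e);
        } finally {
            executor.shutdown(); // let the pool threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOnExecutor(100)); // prints 5050
    }
}
```

Wrapping the three checked exceptions in one unchecked exception keeps the sketch short; real code would usually treat a timeout differently from a failed task.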
		
A task may need to wait for the result of multiple subtasks. Instead of submitting each subtask separately, you can use the invokeAll method, passing a Collection of Callable instances.
- The invokeAny method returns the result of the first task that completes successfully; the remaining tasks are cancelled.
- The cancel method attempts to cancel the task. If the task isn't already running, it won't be scheduled. Otherwise, if mayInterruptIfRunning is true, the thread running the task is interrupted.
- The CompletableFuture class implements the Future interface, and it provides a second mechanism for obtaining the result. You register a callback that will be invoked (in some thread) with the result once it is available.
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
	String result = ""; // compute the result here
	return result;
}, executor);
f.thenAccept((String s) -> { /* process the result s */ });
// or handle success and failure in one callback
f.whenComplete((s, t) -> {
	if (t == null) { /* process the result s */ }
	else { /* process the Throwable t */ }
});
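A runnable variant of the callback idea might look like the sketch below. The class CompletableFutureDemo, the method describeOutcome and the "boom" failure are hypothetical and exist only to make both branches of whenComplete observable; the default common pool is used instead of an explicit executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompletableFutureDemo {
    /** Runs a task asynchronously and records which branch of the callback was taken. */
    static String describeOutcome(boolean fail) {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
            if (fail) throw new IllegalStateException("boom"); // simulated failure
            return "ok";
        });
        StringBuilder log = new StringBuilder();
        // The callback receives either a result (t == null) or a Throwable.
        CompletableFuture<String> done = f.whenComplete((s, t) -> {
            if (t == null) log.append("result: ").append(s);
            else log.append("failed");
        });
        try {
            done.join(); // wait until the callback has run
        } catch (CompletionException e) {
            // the failure was already recorded by the callback
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(describeOutcome(false));
        System.out.println(describeOutcome(true));
    }
}
```

Joining on the stage returned by whenComplete, rather than on f itself, ensures the callback has finished before the log is read.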
		// Example: searching all files of a directory with invokeAll.
		// Here f is the java.io.File of the directory, total is a long initialized to 0,
		// and SearchFiles is a helper class defined elsewhere.
		if (f.exists() && f.isDirectory()) {
			File[] allfiles = f.listFiles();
			List<Callable<Long>> tasks = new ArrayList<>();
			SearchFiles sf = new SearchFiles("Java");
			for (File f1 : allfiles) {
				tasks.add(() -> sf.searchFile(f1.getPath()));
			}
			try {
				// This call blocks until all tasks have completed
				List<Future<Long>> results = executor.invokeAll(tasks);
				for (Future<Long> result : results) total += result.get();
				// total = executor.invokeAny(tasks); // alternative: result of the first task to succeed
			} catch (InterruptedException | ExecutionException e) {
				System.err.println("Error: " + e.getMessage());
			}
			System.out.println("Total found: " + total);
		}
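Because the snippet above depends on a SearchFiles class that is not shown, here is a self-contained sketch of invokeAll and invokeAny. The class InvokeDemo and its methods totalLength and anyOf are assumptions made for illustration; the tasks are deliberately trivial.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeDemo {
    /** invokeAll: one subtask per word, then combine all results. */
    static long totalLength(List<String> words) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (String w : words) tasks.add(() -> (long) w.length());
            long total = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Long> result : executor.invokeAll(tasks)) total += result.get();
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    /** invokeAny: returns the result of whichever task succeeds first. */
    static String anyOf(List<String> values) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String v : values) tasks.add(() -> v);
            return executor.invokeAny(tasks);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("ab", "cde"))); // prints 5
        System.out.println(anyOf(Arrays.asList("x", "y")));          // prints x or y
    }
}
```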

Visibility

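Updates to a variable made by one thread may not be visible to other threads: compilers and processors may reorder instructions, cache values, or omit work that looks unnecessary, because they optimize as if no other thread existed. Visibility is guaranteed in the following cases: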
  • The value of a final variable is visible after initialization.
  • The initial value of a static variable is visible after static initialization.
  • Changes to a volatile variable are visible.
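  • volatile solves visibility, but it does not make concurrent updates safe: count++ on a volatile variable is still not atomic.
Strategies for sharing data safely:
  • Confinement: do not share data. Give each task its own counter and combine the results after the tasks have finished.
  • Immutability: share only immutable objects.
  • Locking: let only one thread at a time access the data by locking objects or methods.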
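The visibility guarantee of volatile can be illustrated with a stop flag. This is a sketch under stated assumptions: the class VolatileFlagDemo, the method runAndStop and the sleep and join durations are invented for the example, and the busy-wait loop is for demonstration only.

```java
import java.util.concurrent.TimeUnit;

public class VolatileFlagDemo {
    // Without volatile, the worker would have no guarantee of ever seeing the update.
    private static volatile boolean done = false;

    /** Starts a spinning worker, sets the flag, and reports whether the worker stopped. */
    static boolean runAndStop() {
        done = false;
        Thread worker = new Thread(() -> {
            long spins = 0;
            while (!done) {
                spins++; // busy-wait, for illustration only
            }
        });
        worker.start();
        try {
            TimeUnit.MILLISECONDS.sleep(50);
            done = true;        // volatile write, visible to the worker
            worker.join(5_000); // wait up to 5 seconds for the worker to notice
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            done = true;        // make sure the worker does not spin forever
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + runAndStop());
    }
}
```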
Immutable classes
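  • Declare the instance variables final.
  • Declare the class final so that subclasses cannot add mutable state.
  • In the constructor, copy any mutable object that is passed in instead of storing the caller's reference.
  • In a getter, return a copy of a mutable field, not the field itself.
  • Provide no setters. If an update method is needed, apply the same copying and return a new object.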
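The rules above could be applied as in the following sketch. The Playlist class, its fields and the withTrack method are hypothetical names chosen for illustration; the outer ImmutableDemo class only hosts the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ImmutableDemo {
    /** A final class with final fields and defensive copies. */
    static final class Playlist {
        private final String name;
        private final List<String> tracks;

        Playlist(String name, List<String> tracks) {
            this.name = name;
            this.tracks = new ArrayList<>(tracks); // copy, do not keep the caller's list
        }

        String getName() { return name; }

        List<String> getTracks() {
            return new ArrayList<>(tracks); // hand out a copy, never the internal list
        }

        /** Instead of a setter: returns a new Playlist and leaves this one unchanged. */
        Playlist withTrack(String track) {
            List<String> extended = new ArrayList<>(tracks);
            extended.add(track);
            return new Playlist(name, extended);
        }
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>(Arrays.asList("a", "b"));
        Playlist p = new Playlist("mix", source);
        source.add("c");               // does not affect p
        p.getTracks().add("d");        // modifies only the returned copy
        System.out.println(p.getTracks().size());                // prints 2
        System.out.println(p.withTrack("e").getTracks().size()); // prints 3
    }
}
```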

Parallel Algorithms

Parallel Streams
The stream is broken up into segments. The filtering and counting is done on each segment, and the results are combined. You don't need to worry about the details.
	long result = coll.parallelStream().filter(s -> s.startsWith("A")).count();
	long sum = IntStream.of(values).parallel().sum();
Parallel streams pay off only when there is enough data, the data is already in memory, and each operation does a substantial amount of work.
Parallel Array
Arrays.parallelSetAll(values, i -> i % 10);
Arrays.parallelSort(words, Comparator.comparing(String::length));
Arrays.parallelPrefix(values, (x, y) -> x + y); // replaces each element with the running total up to that index
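The parallel stream and array operations mentioned in this section can be tried out with a small sketch such as the one below. The class ParallelDemo and its method names are assumptions; the inputs are far too small to benefit from parallelism and serve only to show the results.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ParallelDemo {
    /** Counts the strings that start with "A" using a parallel stream. */
    static long countStartingWithA(List<String> coll) {
        return coll.parallelStream().filter(s -> s.startsWith("A")).count();
    }

    /** Fills an array with i % 10, then turns it into running totals. */
    static int[] runningTotals(int n) {
        int[] values = new int[n];
        Arrays.parallelSetAll(values, i -> i % 10);
        Arrays.parallelPrefix(values, Integer::sum); // values[i] = sum of elements 0..i
        return values;
    }

    /** Returns a copy of the array sorted by string length. */
    static String[] sortByLength(String[] words) {
        String[] copy = words.clone();
        Arrays.parallelSort(copy, Comparator.comparing(String::length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(countStartingWithA(Arrays.asList("Apple", "Avocado", "Banana"))); // prints 2
        System.out.println(Arrays.toString(runningTotals(5)));  // prints [0, 1, 3, 6, 10]
        System.out.println(Arrays.toString(sortByLength(new String[] {"ccc", "a", "bb"}))); // prints [a, bb, ccc]
    }
}
```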

Threadsafe Data Structures

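The collections in the java.util.concurrent package yield weakly consistent (fail-safe) iterators. Unlike the iterators of the java.util collections, they do not throw ConcurrentModificationException when the collection is modified during iteration.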
ConcurrentHashMap
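  • Do not update a value with get followed by put(key, value): each call is thread-safe on its own, but the sequence is not atomic, so updates from other threads can be lost.
  • map.compute(word, (k, v) -> v == null ? 1L : v + 1); updates the value atomically.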
  • computeIfPresent / computeIfAbsent
  • map.putIfAbsent(word, 0L);
  • map.merge(word, 1L, (existingValue, newValue) -> existingValue + newValue) OR map.merge(word, 1L, Long::sum)
  • map.replace(word, oldValue, newValue)
LinkedBlockingQueue / ArrayBlockingQueue
ConcurrentSkipListMap / ConcurrentSkipListSet
CopyOnWriteArrayList / CopyOnWriteArraySet
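The atomic update methods of ConcurrentHashMap listed above can be exercised with a word count. This is a minimal sketch: the class WordCount and its count method are hypothetical, and a parallel stream stands in for "several threads updating the same map".

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class WordCount {
    /** Counts word occurrences from several threads using the atomic merge method. */
    static ConcurrentHashMap<String, Long> count(List<String> words) {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        // merge inserts 1 if the key is absent, otherwise adds 1 to the existing value,
        // and does so atomically for each key.
        words.parallelStream().forEach(word -> map.merge(word, 1L, Long::sum));
        return map;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Long> counts = count(Arrays.asList("a", "b", "a", "c", "a"));
        System.out.println(counts.get("a")); // prints 3
    }
}
```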

Atomic Counters and Accumulators

java.util.concurrent.atomic package
AtomicLong, AtomicInteger, AtomicBoolean, AtomicReference, AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray, AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater
LongAdder and LongAccumulator
public static AtomicLong nextNumber = new AtomicLong();
long id = nextNumber.incrementAndGet();
public static AtomicLong largest = new AtomicLong();
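// In some thread...
largest.set(Math.max(largest.get(), observed)); // Error: race condition, another thread may update largest between get and set
largest.updateAndGet(x -> Math.max(x, observed)); // correct: the update is atomic
Related atomic update methods: accumulateAndGet, getAndUpdate, getAndAccumulate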
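LongAdder and LongAccumulator are only named above, so the following sketch shows how they might be used next to AtomicLong. The class AtomicDemo and the method tally are assumptions for illustration; a parallel stream supplies the concurrent updates.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.LongStream;

public class AtomicDemo {
    /** Returns {count, largest, sum} of the numbers 1..n, updated from several threads. */
    static long[] tally(long n) {
        LongAdder count = new LongAdder();                                         // concurrent counter
        LongAccumulator largest = new LongAccumulator(Math::max, Long.MIN_VALUE);  // concurrent maximum
        AtomicLong sum = new AtomicLong();
        LongStream.rangeClosed(1, n).parallel().forEach(x -> {
            count.increment();
            largest.accumulate(x);
            sum.accumulateAndGet(x, Long::sum); // atomic read-modify-write
        });
        return new long[] { count.sum(), largest.get(), sum.get() };
    }

    public static void main(String[] args) {
        long[] result = tally(1000);
        System.out.println(result[0] + " " + result[1] + " " + result[2]); // prints 1000 1000 500500
    }
}
```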

Locks

synchronized
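  • Locks an object, either for a whole method or for a synchronized block.
  • A synchronized instance method locks the whole object (this). If two methods of a class are synchronized, a thread inside one of them keeps other threads out of both methods on the same object.
  • A synchronized static method locks the Class object instead of an instance.
  • Can use conditions with wait() and notifyAll() (or notify()) to wait for updates from other threads.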
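public synchronized Object take() throws InterruptedException {
	while (head == null) wait(); // releases the lock and waits until another thread adds an element
	...
}
public synchronized void add(Object newValue) {
	...
	notifyAll(); // wakes up the threads waiting in take()
}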
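The elided parts of the take and add methods above could be filled in as in this sketch of a linked queue. The class SimpleQueue, its Node helper and the handOff demonstration method are hypothetical; they are one possible completion, not necessarily the one the notes had in mind.

```java
public class SimpleQueue {
    private static final class Node {
        final Object value;
        Node next;
        Node(Object value) { this.value = value; }
    }

    private Node head;
    private Node tail;

    public synchronized Object take() throws InterruptedException {
        while (head == null) wait(); // loop guards against spurious wakeups
        Node first = head;
        head = first.next;
        if (head == null) tail = null;
        return first.value;
    }

    public synchronized void add(Object newValue) {
        Node node = new Node(newValue);
        if (tail == null) head = node; else tail.next = node;
        tail = node;
        notifyAll(); // wake up any thread blocked in take()
    }

    /** Hands one value to a consumer thread and returns what the consumer took. */
    static Object handOff(Object value) {
        SimpleQueue queue = new SimpleQueue();
        Object[] received = new Object[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until add() has been called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.add(value);
        try {
            consumer.join(); // after join, the consumer's write to received[0] is visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello")); // prints hello
    }
}
```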

Threads

It is better to use executors, but you can also start a Thread manually.
A task can check whether it has been interrupted (if (Thread.currentThread().isInterrupted()) return;).
If the thread is interrupted while it waits or sleeps, the wait or sleep call throws an InterruptedException instead.
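Runnable task = () -> {
	try {
		Thread.sleep(millis);
	} catch (InterruptedException e) {
		Thread.currentThread().interrupt(); // restore the interrupted status and end the task
	}
};
Thread thread = new Thread(task);
thread.start();
thread.join(millis); // waits up to millis ms for the thread to finish; it does not stop the thread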
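Interruption during sleep can be observed with a sketch like the following. The class InterruptDemo, the method interruptSleeper and the one-minute sleep are assumptions for the example; the latch is there only so that interrupt() is certainly called after the thread has started running.

```java
import java.util.concurrent.CountDownLatch;

public class InterruptDemo {
    /** Interrupts a sleeping thread and reports whether it saw the InterruptedException. */
    static boolean interruptSleeper() {
        CountDownLatch started = new CountDownLatch(1);
        boolean[] sawInterrupt = new boolean[1];
        Thread sleeper = new Thread(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // would sleep a full minute if nobody interrupted
            } catch (InterruptedException e) {
                sawInterrupt[0] = true; // sleep ended early because of interrupt()
            }
        });
        sleeper.start();
        try {
            started.await();     // make sure the sleeper is running
            sleeper.interrupt(); // request cancellation
            sleeper.join();      // wait for the sleeper to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupted: " + interruptSleeper());
    }
}
```

Note that interrupt() is only a request: the thread ends because the task reacts to the exception, not because it is forcibly stopped.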

Thread-Local Variables

Avoid sharing by giving each thread its own instance, using the ThreadLocal helper class.
// construct one instance per thread
public static final ThreadLocal<NumberFormat> currencyFormat
= ThreadLocal.withInitial(() -> NumberFormat.getCurrencyInstance());

// access formatter
String amountDue = currencyFormat.get().format(total);
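That each thread really gets its own instance can be checked with a sketch such as this one. The class ThreadLocalDemo and the method otherThreadGetsOwnInstance are hypothetical, and Locale.US is fixed here (an assumption not in the notes) so that the formatted output does not depend on the machine's default locale.

```java
import java.text.NumberFormat;
import java.util.Locale;

public class ThreadLocalDemo {
    // One NumberFormat per thread, created lazily on first access.
    static final ThreadLocal<NumberFormat> currencyFormat =
            ThreadLocal.withInitial(() -> NumberFormat.getCurrencyInstance(Locale.US));

    /** Returns true if another thread receives a different formatter instance. */
    static boolean otherThreadGetsOwnInstance() {
        NumberFormat mine = currencyFormat.get();
        NumberFormat[] theirs = new NumberFormat[1];
        Thread other = new Thread(() -> theirs[0] = currencyFormat.get());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Same thread: same instance on every call. Other thread: a separate instance.
        return theirs[0] != null && theirs[0] != mine && mine == currencyFormat.get();
    }

    public static void main(String[] args) {
        System.out.println(currencyFormat.get().format(12.5)); // prints $12.50
        System.out.println(otherThreadGetsOwnInstance());      // prints true
    }
}
```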