[1z0-829] 13-Threads
These are highlights from a Java 8 developer with years of experience, focusing only on the important details that are easy to forget over time. Some of the points may seem trivial, but even those can help you remember the broader context.
- The order of thread execution is often not guaranteed
- Remember that calling run() instead of start() does not start a new thread.
- There are different types of threads:
- A system thread is created by the JVM and runs in the background of the application, e.g. garbage collection
- A user-defined thread is one created by the application developer to accomplish a specific task
- A daemon thread can be either a system or a user-defined thread, and it will not prevent the JVM from exiting when the program finishes
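- A minimal sketch of the two points above, assuming illustrative names (`StartVersusRun`, `worker-1`) that are not part of the original notes: run() executes the task on the calling thread, start() executes it on a new thread, and a thread marked with setDaemon(true) does not keep the JVM alive.

```java
class StartVersusRun {
    // Returns the name of the thread that actually executed the task.
    static String executedBy(boolean useStart) {
        String[] name = new String[1];
        Thread worker = new Thread(() -> name[0] = Thread.currentThread().getName(), "worker-1");
        if (useStart) {
            worker.start(); // the task runs on the new thread "worker-1"
            try {
                worker.join(); // wait for it so the result is visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            worker.run(); // plain method call: the task runs on the calling thread
        }
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println("run()   executed by: " + executedBy(false));
        System.out.println("start() executed by: " + executedBy(true));

        Thread daemon = new Thread(() -> {
            try {
                Thread.sleep(60_000); // would block JVM exit if this were a user thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        daemon.setDaemon(true); // must be set before start()
        daemon.start();
        System.out.println("main ends; the JVM exits without waiting for the daemon");
    }
}
```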
- Thread states are
- NEW (when you create a new thread)
- RUNNABLE (as soon as you call start() but it does not mean that it is already being executed)
- BLOCKED (Waiting to enter synchronized block)
- WAITING (Waiting indefinitely to be notified)
- TIMED_WAITING (Waiting a specified time)
- TERMINATED (The thread has finished its work)
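- The states can be observed with Thread.getState(). The sketch below is only an illustration (the class name `ThreadStatesDemo` and the sleep durations are assumptions): it records the state before start(), while the worker sleeps, and after the worker has finished.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadStatesDemo {
    static List<Thread.State> observeStates() {
        List<Thread.State> seen = new ArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // long sleep; main interrupts it below
            } catch (InterruptedException e) {
                // woken up early, nothing else to do
            }
        });
        seen.add(worker.getState()); // NEW: created but not started
        worker.start();
        try {
            // Poll with a short delay until the worker is inside sleep()
            while (worker.getState() != Thread.State.TIMED_WAITING) {
                Thread.sleep(10);
            }
            seen.add(worker.getState()); // TIMED_WAITING: sleeping with a timeout
            worker.interrupt();          // end the sleep early
            worker.join();               // wait for the worker to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen.add(worker.getState()); // the state once the worker has finished
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observeStates());
    }
}
```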
- Using a while() loop to check for data without some kind of delay is considered bad coding practice, as it ties up CPU resources for no reason.
- Calling interrupt() on a thread in the TIMED_WAITING or WAITING state makes that thread RUNNABLE again and triggers an InterruptedException in it. (In the example below, the worker interrupts the main() thread so that it does not sleep longer than needed before reading the result.)
class InterruptExample {
    // Written only by the worker thread and read by main; volatile makes the writes visible
    private static volatile int counter = 0;
    public static void main(String[] a) {
        final var mainThread = Thread.currentThread();
        new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) counter++;
            mainThread.interrupt(); // wake main up instead of letting it finish its sleep
        }).start();
        while (counter < 1_000_000) {
            System.out.println("Not reached yet");
            try {
                Thread.sleep(1_000); // 1 SECOND
            } catch (InterruptedException e) {
                System.out.println("Interrupted!");
            }
        }
        System.out.println("Reached: " + counter);
    }
}
- The Concurrency API includes the ExecutorService interface, which defines services that create and manage threads. The Concurrency API was created to make complicated work easier
- Executors.newSingleThreadExecutor() creates an executor with a single worker thread. The tasks run asynchronously with respect to main(), so the example below has 2 threads (main and the thread of the ExecutorService), but the tasks submitted to the ExecutorService are guaranteed to run one at a time, in the order they were submitted
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrencyTest {
public static void main(String[] args) {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try{
System.out.println("start");
executorService.execute(() -> System.out.println("First print"));
executorService.execute(() -> System.out.println("Second print"));
executorService.execute(() -> System.out.println("Third print"));
System.out.println("end");
}finally {
executorService.shutdown();
}
}
}
- Once you have finished using a thread executor, it is important to call the shutdown() method. Remember, though, that shutdown() does not stop any tasks that have already been submitted to the thread executor; it only rejects new ones.
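- A minimal sketch of this behaviour, assuming an illustrative class name (`ShutdownDemo`) and a simple counter as the task: three tasks are submitted, shutdown() is called, and a task submitted afterwards is rejected with RejectedExecutionException while the earlier ones still complete.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    static String run() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            service.execute(completed::incrementAndGet); // submitted before shutdown()
        }
        service.shutdown(); // no new tasks accepted; queued tasks still run

        boolean rejected = false;
        try {
            service.execute(completed::incrementAndGet); // too late
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        try {
            service.awaitTermination(10, TimeUnit.SECONDS); // wait for the queued tasks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "completed=" + completed.get()
                + ", rejected=" + rejected
                + ", terminated=" + service.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```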
- execute() vs submit()
- execute() returns void
- submit() returns a Future
- submit() can receive a Runnable or a Callable
- The biggest difference is that with a Runnable the Future's get() always returns null, while with a Callable it returns a value
- Example with Runnable
public static void main(String[] args) throws ExecutionException, InterruptedException {
final ExecutorService service = Executors.newSingleThreadExecutor();
final Future<?> futureWithoutResult = service.submit(() -> {
System.out.println("starting thread");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("ending thread;");
});
System.out.println("Waiting result");
final Object futureResult = futureWithoutResult.get();
System.out.println("Result->" + futureResult);
System.out.println("Result done");
service.shutdown(); // release the worker thread so the JVM can exit
}
/*
The result will be
Waiting result
starting thread
ending thread;
Result->null
Result done
*/
- Now, if the lambda passed to submit() returns something, as below, it is treated as a Callable and the Future returns that value
public static void main(String[] args) throws ExecutionException, InterruptedException {
final ExecutorService service = Executors.newSingleThreadExecutor();
final Future<Integer> futureWithResult = service.submit(() -> {
System.out.println("starting thread");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("ending thread;");
return 10;
});
System.out.println("Waiting result");
final Integer futureResult = futureWithResult.get();
System.out.println("Result->" + futureResult);
System.out.println("Result done");
service.shutdown(); // release the worker thread so the JVM can exit
}
/*
The result will be
Waiting result
starting thread
ending thread;
Result->10
Result done
*/
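- If you don't need the results but want to wait for the submitted tasks to finish, call shutdown() first and then awaitTermination(), which blocks until all tasks complete or the timeout expires:
service.shutdown();
service.awaitTermination(1, TimeUnit.MINUTES);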
- The Concurrency API also lets you schedule tasks with ScheduledExecutorService, which offers the following methods:
- schedule(Callable<V> callable, long delay, TimeUnit unit)
- schedule(Runnable command, long delay, TimeUnit unit)
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
- The scheduleAtFixedRate task is submitted to the executor every period, regardless of whether the previous run has finished
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- The example below schedules a Runnable and a Callable
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
Runnable task1 = () -> System.out.println("Hello ");
Callable<String> task2 = () -> "Task2";
ScheduledFuture<?> r1 = service.schedule(task1, 10, TimeUnit.SECONDS);
ScheduledFuture<?> r2 = service.schedule(task2, 3, TimeUnit.MINUTES);
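- To see the scheduling methods return something, here is a small sketch with short delays (the class name `ScheduleDemo`, the 100 ms delay and the 50 ms period are assumptions chosen to keep the run short). It reads the value of a scheduled Callable and lets a fixed-rate task run three times before cancelling it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleDemo {
    // Schedules a Callable and blocks until its value is available.
    static String delayedResult() {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<String> future =
                    service.schedule(() -> "Task2", 100, TimeUnit.MILLISECONDS);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    // Runs a periodic task until it has executed three times, then cancels it.
    static boolean ranThreeTimes() {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(3);
        try {
            ScheduledFuture<?> handle =
                    service.scheduleAtFixedRate(latch::countDown, 0, 50, TimeUnit.MILLISECONDS);
            boolean done = latch.await(5, TimeUnit.SECONDS);
            handle.cancel(false); // periodic tasks repeat until cancelled
            return done;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(delayedResult());
        System.out.println(ranThreeTimes());
    }
}
```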
- Instead of a single thread, you can also use a thread pool with ExecutorService. In the example below we create a pool of 20 threads but submit 30 tasks; the extra 10 tasks are queued and processed as threads become free.
public static void main(String[] args) throws ExecutionException, InterruptedException {
final ExecutorService service = Executors.newFixedThreadPool(20);
for(int i = 0; i< 30; i++){
final int index = i;
service.submit(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Printing " + index);
});
}
service.shutdown();
service.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("end");
}
- Remember that volatile guarantees that a write to the variable by one thread is visible to reads from other threads, BUT it does not make compound operations atomic. The counter below is already a problem, since counter++ is two distinct operations (a read and a write) and another thread can interleave between them
private volatile int counter = 0;
private void incrementAndReport() {
System.out.println((counter++) + ""); // This is not thread safe
}
- To solve the issue above you have the atomic classes, such as
- AtomicBoolean
- AtomicInteger
- AtomicLong
- With AtomicInteger or AtomicLong you could use the incrementAndGet() method to solve the issue above
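- As a sketch of that fix (the class name `AtomicCounterDemo` and the thread counts are illustrative assumptions), several threads increment one AtomicInteger and no update is lost, because incrementAndGet() performs the read and the write as a single atomic step.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {
    static int countWithThreads(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService service = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            service.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
        }
        service.shutdown();
        try {
            service.awaitTermination(1, TimeUnit.MINUTES); // wait for all increments
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000)); // 4 * 10_000 increments
    }
}
```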
- A synchronized block needs a lock, and any object can be a lock.
var mylock = new Object();
synchronized (mylock) {
// synccode
}
- In the example below we use the object itself as a lock…
public class SyncTest {
public static void main(String[] args) {
final Counter counter = new Counter();
final ExecutorService executorService = Executors.newFixedThreadPool(10);
for(int i =0; i< 10 ;i++){
executorService.submit(() ->{
for(int x =0; x< 10 ;x++) {
counter.increment();
}
});
}
executorService.shutdown(); // stop accepting tasks; the submitted ones still run to completion
}
}
class Counter {
private int counter = 0;
public int increment(){
synchronized (this){
counter++;
System.out.println(counter);
return counter;
}
}
}
- We could synchronize the entire method as well; the two methods below are equivalent
public int increment(){
synchronized (this){
counter++;
System.out.println(counter);
return counter;
}
}
// equivalent to
public synchronized int increment(){
counter++;
System.out.println(counter);
return counter;
}
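- The Lock interface is easy to use: you call lock() before the part you want to protect and unlock() in a finally block to release the lock. Below is an example converting the code to use the Lock interface
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Counter {
    private final Lock lock = new ReentrantLock();
    private int counter = 0;
    public int increment() {
        lock.lock(); // acquire before try, so finally never unlocks a lock we do not hold
        try {
            counter++;
            System.out.println(counter);
            return counter;
        } finally {
            lock.unlock(); // always release, even if an exception is thrown
        }
    }
}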
- In the Lock interface we have the following methods
- void lock(): Requests lock and blocks until lock is acquired
- void unlock(): Releases lock.
- boolean tryLock(): Requests lock and returns immediately. Returns boolean indicating whether lock was successfully acquired.
- boolean tryLock(long timeout, TimeUnit unit): Requests lock and blocks for specified time or until lock is acquired. Returns boolean indicating whether lock was successfully acquired.
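- A small sketch of tryLock(), under the assumption of an illustrative helper (`otherThreadCanLock`) that asks a second thread to try the lock. A second thread is needed because ReentrantLock is reentrant, so the thread that already owns the lock would always succeed.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    // Tries the lock from another thread and releases it again on success.
    static boolean otherThreadCanLock(Lock lock) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            return service.submit(() -> {
                if (lock.tryLock()) { // returns immediately, never blocks
                    lock.unlock();
                    return true;
                }
                return false;
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    static String demo() {
        Lock lock = new ReentrantLock();
        boolean whileHeld;
        lock.lock();
        try {
            whileHeld = otherThreadCanLock(lock); // main still holds the lock
        } finally {
            lock.unlock();
        }
        boolean afterRelease = otherThreadCanLock(lock); // lock is free now
        return whileHeld + "," + afterRelease;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```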
- The CyclicBarrier lets a group of threads wait for each other at a common point before any of them continues. Below is an example of a school bus that needs to wait for all the students before leaving
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierExample {
    public static void main(String[] args) throws InterruptedException {
        // The pool needs at least as many threads as the barrier has parties (40),
        // otherwise the barrier can never be released
        final ExecutorService executor = Executors.newFixedThreadPool(40);
        final SchoolBus schoolBus = new SchoolBus();
        for (int i = 0; i < 40; i++) {
            final var index = i;
            executor.submit(() -> {
                try {
                    schoolBus.letsGo("Student " + index);
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        executor.shutdown(); // no more students to submit
        executor.awaitTermination(30, TimeUnit.SECONDS); // wait for the tasks instead of sleeping
    }
}
class SchoolBus {
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(40);
    public void letsGo(String student) throws InterruptedException, BrokenBarrierException {
        System.out.println(student + " joined the bus, waiting until everybody is on board");
        cyclicBarrier.await(); // blocks until all 40 students have called await()
        System.out.println("Everybody is here, let's go");
    }
}
- To work with collections concurrently you need to use special implementations. The example below is single-threaded, but it illustrates the ConcurrentModificationException you get when you modify a HashMap while iterating over its keySet()
var animals = new HashMap<String, Integer>();
animals.put("dog", 1);
animals.put("cat", 2);
for(String key: animals.keySet())
animals.remove(key);
//This will throw a ConcurrentModificationException
- To solve the issue above you can use a ConcurrentHashMap
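- A minimal sketch of the same loop on a ConcurrentHashMap (the class and method names are illustrative assumptions). Its iterators tolerate modification during iteration, so the removals succeed and no ConcurrentModificationException is thrown.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentRemoveDemo {
    static int sizeAfterRemovingWhileIterating() {
        Map<String, Integer> animals = new ConcurrentHashMap<>();
        animals.put("dog", 1);
        animals.put("cat", 2);
        for (String key : animals.keySet()) {
            animals.remove(key); // allowed: no ConcurrentModificationException
        }
        return animals.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterRemovingWhileIterating()); // both entries were removed
    }
}
```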
- Below are the concurrent collection classes and the interfaces they implement.
- ConcurrentHashMap: Map, ConcurrentMap
- ConcurrentLinkedQueue: Queue
- ConcurrentSkipListMap: Map, SortedMap, NavigableMap, ConcurrentMap, ConcurrentNavigableMap
- ConcurrentSkipListSet: Set, SortedSet, NavigableSet
- CopyOnWriteArrayList: List
- CopyOnWriteArraySet: Set
- LinkedBlockingQueue: Queue, BlockingQueue
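- The CopyOnWrite classes behave a little differently from the others in the list: every modification copies the underlying array, so an iterator keeps working on the snapshot it started with. A small sketch, with illustrative names and values:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteDemo {
    static String addWhileIterating() {
        List<Integer> numbers = new CopyOnWriteArrayList<>(List.of(4, 3, 42));
        int iterations = 0;
        for (Integer n : numbers) { // iterates over the original 3-element snapshot
            numbers.add(n + 1);     // each add creates a new copy of the array
            iterations++;
        }
        return "iterations=" + iterations + ", size=" + numbers.size();
    }

    public static void main(String[] args) {
        System.out.println(addWhileIterating());
    }
}
```

The loop does not see the elements added during iteration, which is why it terminates instead of running forever.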
- Thread problems that might occur
- Starvation: When a single thread is perpetually denied access to a shared resource or lock.
- Livelock: When two or more threads are conceptually blocked forever, although they are each still active and trying to complete their task.
- Deadlock: When two or more threads are blocked forever, each waiting on the other.
- forEach() on a parallel stream does not guarantee the order of the results
final List<Integer> integersList = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
integersList.parallelStream().forEach(System.out::println);
System.out.println("-----");
integersList.stream().forEach(System.out::println);
// The first output will be unordered while the second output will be ordered.
- You could solve this issue using forEachOrdered(), but it gives up part of the performance benefit of the parallel stream, so use it with caution to avoid slow results
final List<Integer> integersList = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
integersList.parallelStream().forEachOrdered(System.out::println);
// This prints the elements in their original order
- Since order is not guaranteed with parallel streams, methods like findAny() may return a different element on each run.
- Order-based methods like findFirst(), limit() and skip() still respect the order of the source on a parallel stream, but performance may suffer
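- A short sketch of the difference (the class name `ParallelOrderDemo` and the sample list are assumptions): findFirst() and skip()/limit() give the same answer on every run of an ordered parallel stream, while the value printed for findAny() is free to change between runs.

```java
import java.util.List;

class ParallelOrderDemo {
    static final List<Integer> NUMBERS = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    // Order-based: always the first element of the source list.
    static int findFirstInParallel() {
        return NUMBERS.parallelStream().findFirst().orElseThrow();
    }

    // Order-based: always the third to fifth elements of the source list.
    static List<Integer> skipThenLimitInParallel() {
        return NUMBERS.parallelStream().skip(2).limit(3).toList();
    }

    public static void main(String[] args) {
        System.out.println("findFirst: " + findFirstInParallel());
        System.out.println("skip/limit: " + skipThenLimitInParallel());
        // No fixed answer: any element of the list is a valid result here.
        System.out.println("findAny: " + NUMBERS.parallelStream().findAny().orElseThrow());
    }
}
```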
- Unordered streams can give a significant performance improvement when used with parallel streams
```java
List.of(1,2,3,4,5,6).stream().unordered(); // No performance improvement…
List.of(1,2,3,4,5,6).stream().unordered().parallel(); // Possibly great performance improvement
```
- In reduce(), the second parameter is the accumulator and the third parameter is the combiner, which merges the partial results when the stream is executed in parallel.
```java
System.out.println(List.of('w', 'o', 'r', 'k')
.parallelStream()
.reduce("",
(s1,c) -> s1 + c,
(s2,s3) -> s2 + s3));