Preloader image

In this example will be used Concurrency Utilities for Java EE, or JSR 236.

This standard allows application developers to use concurrency utilities managed by the application server. In this way, the developer no longer has the responsibility to manually manage thread polls or threads. Also, in a non-managed Thread object, the container cannot guarantee that other Java EE platform services work correctly. For these reasons, it is recommended the usage of managed threads whenever the need arise. More information can be found here.

Main Components of the Concurrency Utilities

The standard specifies main components of the concurrency utilities. In short, these components are managed objects that offer concurrency facilities. These objects, since are managed by the application, can be injected either using CDI, either JNDI. More information can be found here.

ManagedExecutorService

A ManagedExecutorService is an object that allows application developers to submit tasks asynchronously. Tasks are executed on threads that are managed by the container.

Example

Here is a class that uses a ManagedExecutorService (full code can be found here):

@RequestScoped
public class ManagedService {

    @Resource
    private ManagedExecutorService executor;

    public CompletableFuture<Integer> asyncTask(final int value) {
        return CompletableFuture
                .supplyAsync(longTask(value, 100, null), executor)
                .thenApply(i -> i + 1);
    }

    public CompletableFuture<Integer> asyncTaskWithException(final int value) {
        return CompletableFuture
                .supplyAsync(longTask(value, 100, "Planned exception"), executor)
                .thenApply(i -> i + 1);
    }

    private Supplier<Integer> longTask(final int value,
                                       final int taskDurationMs,
                                       final String errorMessage) {
        return () -> {
            if (nonNull(errorMessage)) {
                throw new RuntimeException(errorMessage);
            }

            try {
                TimeUnit.MILLISECONDS.sleep(taskDurationMs);
            } catch (InterruptedException e) {
                throw new RuntimeException("Problem while waiting");
            }
            return value + 1;
        };
    }

}

The ManagedExecutorService object, being an managed object, is injected using the @Resource annotation.

This example simulates a long running computation, defined in the longTask method.

The capabilities of ManagedExecutorService are exemplified in the asyncTask and asyncTaskWithException methods. Both methods invoke the longTask method defined above; each execution of longTask is performed in a thread managed by the application. The method asyncTask simulates a successful execution, while the asyncTaskWithException simulates a execution that will throw an exception.

The methods are used in the following test class (full example can be found here):

@RunWith(Arquillian.class)
public class ManagedServiceTest {

    @Inject
    private ManagedService managedService;

    @Deployment()
    public static final WebArchive app() {
        return ShrinkWrap.create(WebArchive.class, "example.war")
                .addClasses(ManagedService.class)
                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
    }

    @Test
    public void managedInvocationTest() {
        final CompletableFuture<Integer> future = managedService.asyncTask(1);
        try {
            assertEquals(3, future.get(200, TimeUnit.MILLISECONDS).intValue());
        } catch (Exception e) {
            fail("Unexpected exception" + e);
        }
    }

    @Test(expected = TimeoutException.class)
    public void managedInvocationTestWithTimeout() throws InterruptedException, ExecutionException, TimeoutException {
        final CompletableFuture<Integer> future = managedService.asyncTask(1);
        future.get(10, TimeUnit.MILLISECONDS);
    }

    @Test
    public void managedInvocationTestWithException() {
        final CompletableFuture<Integer> future = managedService.asyncTaskWithException(1);

        try {
            future.get(200, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            assertEquals("Planned exception", e.getCause().getMessage());
        } catch (Exception e) {
            fail("Unexpected exception" + e);
        }
    }
}

ManagedScheduledExecutorService

A ManagedScheduledExecutorService is an object that allows developers to execute tasks asynchronously at specific times. The tasks are executed on threads started by the container.

Example

Full example can be found here:

@RequestScoped
public class ManagedScheduledService {

    @Resource
    private ManagedScheduledExecutorService executor;

    public Future<Integer> singleFixedDelayTask(final int value,
                                                final String errorMessage) {
        return executor.schedule(
                longCallableTask(value, 10, errorMessage), 100, TimeUnit.MILLISECONDS);
    }

    public ScheduledFuture<?> periodicFixedDelayTask(final int value,
                                                     final String errorMessage,
                                                     final CountDownLatch countDownLatch) {
        return executor.scheduleAtFixedRate(
                longRunnableTask(value, 10, errorMessage, countDownLatch), 0, 100, TimeUnit.MILLISECONDS);
    }

    private Runnable longRunnableTask(final int value,
                                      final int taskDurationMs,
                                      final String errorMessage,
                                      final CountDownLatch countDownLatch) {
        return () -> {
            failOrWait(taskDurationMs, errorMessage);
            Integer result = value + 1;
            countDownLatch.countDown();
        };
    }

    private Callable<Integer> longCallableTask(final int value,
                                               final int taskDurationMs,
                                               final String errorMessage) {
        return () -> {
            failOrWait(taskDurationMs, errorMessage);
            return value + 1;
        };
    }

    private void failOrWait(final int taskDurationMs,
                            final String errorMessage) {
        if (nonNull(errorMessage)) {
            throw new RuntimeException(errorMessage);
        }
        try {
            TimeUnit.MILLISECONDS.sleep(taskDurationMs);
        } catch (InterruptedException e) {
            throw new RuntimeException("Problem while waiting");
        }
    }

}

This example also defines a method, longCallableTask, simulating the execution of a long running computation.

The method singleFixedDelayTask schedules a long running task (by calling longCallableTask), but the execution will start after 100 ms. The method periodicFixedDelayTask schedules tasks to be run periodically, after each 100 ms, with an initial delay of 0.

The methods are used in the following test class (full code can be found here):

@RunWith(Arquillian.class)
public class ManagedScheduledServiceTest {

    @Inject
    private ManagedScheduledService scheduledService;

    @Deployment()
    public static final WebArchive app() {
        return ShrinkWrap.create(WebArchive.class, "example.war")
                .addClasses(ManagedScheduledService.class)
                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
    }

    @Test
    public void singleFixedDelayTask() throws InterruptedException, ExecutionException, TimeoutException {
        final Future<Integer> futureA = scheduledService.singleFixedDelayTask(1, null);
        final Future<Integer> futureB = scheduledService.singleFixedDelayTask(50, null);

        assertEquals(2, futureA.get(200, TimeUnit.MILLISECONDS).intValue());
        assertEquals(51, futureB.get(200, TimeUnit.MILLISECONDS).intValue());

    }

    @Test
    public void periodicFixedDelayTask() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(4); // execute 4 times
        final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, null, countDownLatch);
        countDownLatch.await(500, TimeUnit.MILLISECONDS);
        if (!scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(true);
        }
    }

    @Test
    public void singleFixedDelayTaskWithException() {
        final Future<Integer> future = scheduledService.singleFixedDelayTask(1, "Planned exception");
        try {
            future.get(200, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            assertEquals("Planned exception", e.getCause().getMessage());
        } catch (Exception e) {
            fail("Unexpected exception" + e);
        }
    }

    @Test
    public void periodicFixedDelayTaskWithException() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, "Planned exception", countDownLatch);

        try {
            countDownLatch.await(200, TimeUnit.MILLISECONDS);
            scheduledFuture.get(200, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            assertEquals("Planned exception", e.getCause().getMessage());
        } catch (Exception e) {
            fail("Unexpected exception" + e);
        }

        if (!scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(true);
        }
    }

}

ManagedThreadFactory

A ManagedThreadFactory is an object that allows developers to create container managed threads.

Example

Full example can be found here:

@RequestScoped
public class ThreadFactoryService {

    @Resource
    private ManagedThreadFactory factory;

    public void asyncTask(final LongTask longTask) throws InterruptedException {
        final Thread thread = factory.newThread(longTask);
        thread.setName("pretty asyncTask");
        thread.start();
    }

    public void asyncHangingTask(final Runnable longTask) {
        final Thread thread = factory.newThread(longTask);
        thread.setName("pretty asyncHangingTask");
        thread.start();

        if (thread.isAlive()) {
            thread.interrupt();
        }
    }

    public static class LongTask implements Runnable {
        private final int value;
        private final long taskDurationMs;
        private final CountDownLatch countDownLatch;
        private int result;
        private AtomicBoolean isTerminated = new AtomicBoolean(false);

        public LongTask(final int value,
                        final long taskDurationMs,
                        final CountDownLatch countDownLatch) {
            this.value = value;
            this.taskDurationMs = taskDurationMs;
            this.countDownLatch = countDownLatch;
        }

        public int getResult() {
            return result;
        }

        public boolean getIsTerminated() {
            return isTerminated.get();
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(taskDurationMs);
            } catch (InterruptedException e) {
                isTerminated.set(true);
                countDownLatch.countDown();
                throw new RuntimeException("Problem while waiting");
            }

            result = value + 1;
            countDownLatch.countDown();
        }
    }
}

This example defines a class implementing Runnable, executing a long running task in the run method.

The method asyncTask just creates a managed thread (using the injected ManagedThreadFactory) then starts it. The method asyncHangingTask also creates a managed thread, starts it, but then stops it.

The following class tests these methods (full code can be found here):

@RunWith(Arquillian.class)
public class ThreadFactoryServiceTest {

    @Inject
    private ThreadFactoryService factoryService;

    @Deployment()
    public static final WebArchive app() {
        return ShrinkWrap.create(WebArchive.class, "example.war")
                .addClasses(ThreadFactoryService.class)
                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
    }

    @Test
    public void asyncTask() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final LongTask longTask = new LongTask(1, 50, countDownLatch);
        factoryService.asyncTask(longTask);

        countDownLatch.await(200, TimeUnit.MILLISECONDS);

        assertEquals(2, longTask.getResult());
    }

    @Test
    public void asyncHangingTask() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final LongTask longTask = new LongTask(1, 1000000, countDownLatch);

        factoryService.asyncHangingTask(longTask);

        countDownLatch.await(200, TimeUnit.MILLISECONDS);

        assertTrue(longTask.getIsTerminated());
    }
}

Full project example can be found here. It’s a Maven project, and all the tests can be executed by running mvn clean install command.