Processamento assíncrono com ExecutorServices

Introdução

Com máquinas cada vez mais potentes e com mais núcleos de processadores, é fundamental que seus programas consigam tirar proveito desses recursos. Do que adianta ter uma máquina com 16 processadores, se seu programa executa toda sua lógica de forma sequencial em uma única thread?Sistemas de mensageria, como JMS e AMQP, ajudam bastante no processamento assíncrono, desacoplado e distribuído. Porém, em alguns casos, eles podem ser uma “bala para matar um mosquito”, pois além de trazerem o overhead de um message broker na arquitetura, podemos estar apenas querendo usar os processadores da máquina, quebrando uma única tarefa que teria que ser executada de forma sequencial e em uma única thread em um conjunto de tarefas a serem executadas de forma assíncrona e em várias threads.
Antes do Java 5, para realizar esse tipo de tarefa, era necessário criar e gerenciar as threads manualmente, o que não era uma tarefa muito agradável. O Java 5 trouxe um framework para processamento concorrente que ajuda bastante: Executor Services.

Exemplos

Para ilustrar o uso desse recurso, vamos abordar 2 exemplos.
O primeiro exemplo vai tratar da execução de uma lista de tarefas independentes, enquanto que o segundo irá tratar da execução de uma lista de tarefas dependentes, ou seja, o resultado final depende da execução de todas elas.
Nesse post iremos abordar o primeiro exemplo e, no próximo post, o segundo.

Projeto

Como usual, iremos utilizar o maven para o gerenciamento de dependências e de build. O pom.xml ficou definido como a seguir.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.wordpress.lucianomolinari</groupId>
	<artifactId>taskexecutor</artifactId>
	<version>1.0.0</version>

	<properties>
		<junit.version>4.8.2</junit.version>
		<log4j.version>1.2.17</log4j.version>
	</properties>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
		</plugins>
	</build>

	<dependencies>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>${log4j.version}</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

</project>

Vamos agora criar a classe que representa a tarefa a ser executada. Seu método de processamento é bastante simples. Ele apenas recebe uma String e, para simular uma tarefa demorada, “trava” a execução por alguns milissegundos e então faz o log da mensagem recebida.

SimpleTask.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

import org.apache.log4j.Logger;

/**
 * 
 * @author Luciano Molinari
 */
public final class SimpleTask {
	private static final Logger logger = Logger.getLogger(SimpleTask.class);
	private final String message;

	public SimpleTask(String message) {
		this.message = message;
	}

	public void execute() {
		// waits 5 milliseconds
		try {
			Thread.sleep(5);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.debug("Message processed: " + message);
	}

}

Como teste e para analisar as diferenças para tempo de processamento, o mesmo conjunto de tarefas será executado sequencialmente e em paralelo. Para isso, será definida uma interface para os executors sequencial e paralelo.

SimpleTaskExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

/**
 * Interface for executing {@link SimpleTask}
 * 
 * @author Luciano Molinari
 */
public interface SimpleTaskExecutor {

	void execute(final SimpleTask simpleTask);

}

A implementação sequencial é bastante simples e apenas delega a execução das tarefas para a classe SimpleTask definida anteriormente.

SimpleTaskExecutorSequential.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

/**
 * Execute {@link SimpleTask} in a sequential manner
 * 
 * @author Luciano Molinari
 */
public class SimpleTaskExecutorSequential implements SimpleTaskExecutor {

	@Override
	public void execute(final SimpleTask simpleTask) {
		simpleTask.execute();
	}

}

A implementação que executa as tarefas em paralelo utiliza o Executor do Java, conforme pode ser visto abaixo.

SimpleTaskExecutorParallel.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Execute {@link SimpleTask} in a parallel manner
 * 
 * @author Luciano Molinari
 */
public class SimpleTaskExecutorParallel implements SimpleTaskExecutor {

	/**
	 * Creates a pool with 10 threads to execute jobs
	 */
	private final ExecutorService executor = Executors.newFixedThreadPool(10);

	@Override
	public void execute(final SimpleTask simpleTask) {
		executor.execute(new Runnable() {
			@Override
			public void run() {
				simpleTask.execute();
			}
		});
	}

}

Repare que é bem simples utilizar a classe Executors e a interface ExecutorService. O método newFixedThreadPool() cria um pool de Threads com a quantidade informada por parâmetro. Existem outros tipos de pool, que podem ser consultados na documentação da classe Executors.
Com o executor em mãos, é possível executar tanto objetos Runnable quanto Callable. A principal diferença é que o Runnable executa tarefas void, enquanto que o Callable pode retornar um valor. O uso do Callable será mostrado no próximo post sobre esse assunto.
Para validar a diferença de tempo de execução para as 2 implementações, vamos criar uma classe de testes com JUnit que executa 50 tarefas usando cada uma delas. A verificação do tempo gasto em cada abordagem será feito via mensagens de log mesmo, por simplicidade. Para isso, vamos configurar o log4j para que o mesmo mostre as mensagens no console.

log4j.properties
# Root logger option
log4j.rootLogger=DEBUG, stdout
 
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} - %m%n
TestSimpleTaskExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.junit.Test;

/**
 * 
 * @author Luciano Molinari
 */
public class TestSimpleTaskExecutor {
	private final Logger logger = Logger.getLogger(TestSimpleTaskExecutor.class);

	@Test
	public void testSequential() {
		logger.debug("--------------");
		logger.debug("Executing tasks in a sequential manner...");
		SimpleTaskExecutor executor = new SimpleTaskExecutorSequential();
		for (SimpleTask task : getListToBeExecuted()) {
			executor.execute(task);
		}
	}

	@Test
	public void testParallel() {
		logger.debug("--------------");
		logger.debug("Executing tasks in a parallel manner...");
		SimpleTaskExecutor executor = new SimpleTaskExecutorParallel();
		for (SimpleTask task : getListToBeExecuted()) {
			executor.execute(task);
		}
		// it is necessary to wait for a moment so the threads can finish their
		// jobs
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	private List<SimpleTask> getListToBeExecuted() {
		List<SimpleTask> tasks = new ArrayList<SimpleTask>();
		for (int index = 1; index <= 50; index++) {
			tasks.add(new SimpleTask("Message " + index));
		}
		return tasks;
	}

}

Ao executar essa classe, obtemos o seguinte output.

23:21:55,062 - --------------
23:21:55,063 - Executing tasks in a sequential manner...
23:21:55,069 - Message processed: Message 1
23:21:55,074 - Message processed: Message 2
23:21:55,079 - Message processed: Message 3
23:21:55,085 - Message processed: Message 4
23:21:55,090 - Message processed: Message 5
23:21:55,095 - Message processed: Message 6
....
23:21:55,307 - Message processed: Message 45
23:21:55,312 - Message processed: Message 46
23:21:55,318 - Message processed: Message 47
23:21:55,323 - Message processed: Message 48
23:21:55,328 - Message processed: Message 49
23:21:55,333 - Message processed: Message 50

23:21:55,334 - --------------
23:21:55,334 - Executing tasks in a parallel manner...
23:21:55,344 - Message processed: Message 1
23:21:55,344 - Message processed: Message 2
23:21:55,344 - Message processed: Message 3
23:21:55,344 - Message processed: Message 4
23:21:55,344 - Message processed: Message 5
...
23:21:55,365 - Message processed: Message 42
23:21:55,365 - Message processed: Message 48
23:21:55,365 - Message processed: Message 45
23:21:55,365 - Message processed: Message 49
23:21:55,365 - Message processed: Message 50

Repare que no teste sequencial, todas as mensagens são mostradas na ordem em que foram enviadas, o que não ocorre com o teste parelelo, já que existem 10 threads executando as tarefas.
O que realmente chama a atenção é a questão do tempo de processamento em cada cenário. Veja que, no primeiro teste, entre o print do início do método de teste e o da última mensagem, existe um intervalo de 270ms (23:21:55,063 e 23:21:55,333), enquanto que no segundo esse tempo é de apenas 31ms (23:21:55,334 e 23:21:55,365).
Vale ressaltar que o resultado desse teste depende da quantidade de processadores presentes na máquina, podendo fazer com que esse tempo seja um pouco pior ou até melhor.

O código fonte apresentado pode ser encontrado aqui.

Na próximo posto daremos continuidade sobre esse assunto. Espero que tenham gostado, até a próxima!


Quer aprender muito mais? Não deixe de ver meu curso Construa uma aplicação do zero com JEE 7, Java 8 e Wildfly.

Esta entrada foi publicada em Parallelism com as etiquetas , , , . ligação permanente.

3 respostas a Processamento assíncrono com ExecutorServices

  1. Pingback: Processamento assíncrono com ExecutorServices – Parte 2 | Software, Java e Arquitetura

  2. Lintz diz:

    Olá Luciano, tudo bem?

    Achei muito legal seu artigo sobre processamento assíncrono com esta interface ExecutorServices e até cheguei a implementar um exemplo baseado na sua explicação usando a implementação newSingleThreadExecutor(). Porém, encontrei um probleminha e não consigo entender o que está acontecendo. Poderia me ajudar? O problema é o seguinte: coloquei o processamento “pesado” no submit implementando Callable e outra thread(do swing) chama o método get(). Notei que se a execução chegar primeiro em get() antes de concluir a execução do submit() ocorre o travamento das duas. Caso a execução do submit() ocorra primeiro da chamada do get() não há problema.

    Obrigado pela atenção

    Lintz.

Deixe um comentário