Processamento assíncrono com ExecutorServices – Parte 2

Seguindo o que foi apresentado na primeira parte do artigo, vamos falar mais um pouco sobre processamento assíncrono com ExecutorServices. Nessa segunda parte vamos abordar o seguinte caso de uso.
Nosso software receberá e processará um lote de transações. Ao final do processamento desse lote, o sistema deve fazer um “resumo” do processamento, indicando quantas transações foram processadas com sucesso e quantas foram processadas com erro. Esse resumo poderia ser enviado à um outro sistema, por exemplo, que poderia analisar a taxa de sucesso desse lote de transações que foi processado ou algo similar.

Vamos começar com a definição da classe que representa a transação. É uma classe bastante simples, que possui um id e um método que faz o processamento e retorna um booleano indicando se a transação foi ou não executada com sucesso.

Transaction.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import org.apache.log4j.Logger;

import com.wordpress.lucianomolinari.taskexecutor.unrelatedtask.SimpleTask;

public final class Transaction {
	private final Long id;
	private static final Logger logger = Logger.getLogger(SimpleTask.class);

	public Transaction(Long id) {
		this.id = id;
	}

	/**
	 * Returns a boolean indicating if the transaction was processed
	 * successfully
	 * 
	 * @return
	 */
	public boolean process() {
		// waits 5 milliseconds(simulates a slow task, such as accessing a remote system)
		try {
			Thread.sleep(5);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		// error every 10 transactions
		boolean success = id % 10 != 0;
		logger.debug("Transaction processed: " + id + "; Result: " + success);
		return success;
	}

	public Long getId() {
		return id;
	}

	@Override
	public String toString() {
		return "Transaction [id=" + id + "]";
	}

}

Como o output final do sistema deve ser um resumo de execução do lote, vamos criar uma classe para armazenar esses dados.

ExecutionSummary.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

public class ExecutionSummary {
	private int numberOfOk;
	private int numberOfError;

	public void addOk() {
		numberOfOk++;
	}

	public void addError() {
		numberOfError++;
	}

	public void addExecutionSummary(ExecutionSummary executionSummary) {
		numberOfOk += executionSummary.getNumberOfOk();
		numberOfError += executionSummary.getNumberOfError();
	}

	public int getNumberOfOk() {
		return numberOfOk;
	}

	public int getNumberOfError() {
		return numberOfError;
	}

	@Override
	public String toString() {
		return "ExecutionSummary [numberOfOk=" + numberOfOk + ", numberOfError=" + numberOfError + "]";
	}

}

Agora que já temos as classes usadas no input e output do sistema, podemos começar a atacar a parte do processamento. Como a idéia é mostrar mais uma vez a diferença entre o processamento sequencial e paralelo, vamos definir uma interface comum de execução e uma implementação para cada um dos tipos de processamento. A interface é bastante simples, recebendo uma lista de transações a serem processadas e retornando o resumo da execução.

TransactionExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import java.util.List;

public interface TransactionExecutor {

	/**
	 * Executes a {@link List} of {@link Transaction} and returns its execution
	 * summary, containing the number of transactions which were executed with
	 * success and with error
	 * 
	 * @param transactions
	 * @return
	 */
	ExecutionSummary execute(List<Transaction> transactions);

}

A implementação sequencial simplesmente itera sobre a lista e, conforme vai processando as transações, vai contabilizando os resultados.

TransactionExecutorSequential.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import java.util.List;

public class TransactionExecutorSequential implements TransactionExecutor {

	@Override
	public ExecutionSummary execute(List<Transaction> transactions) {
		ExecutionSummary summary = new ExecutionSummary();
		for (Transaction transaction : transactions) {
			if (transaction.process()) {
				summary.addOk();
			} else {
				summary.addError();
			}
		}
		return summary;
	}

}

A classe de processamento paralelo é um pouco mais complexa. A idéia é dividir o lote de transações em sub-lotes, passar o processamento de cada um desses sub-lotes para uma Thread separada, obter o resultado parcial do processamento de cada sub-lote e, ao final, criar um resultado consolidado em cima desses dados. A figura abaixo ilustra essa idéia.

O interessante é que, como os processos estão sendo executados em paralelo, precisamos usar algum artifício para saber que as threads terminaram suas execuções e que já podemos obter os resultados parciais para fazer a consolidação.
O framework ExecutorServices oferece todos os recursos que necessitamos, conforme apresentado na classe abaixo. Assim como na primeira parte do artigo, estamos criando um pool de 10 Threads para executar as tarefas. Sendo assim, no início do método execute() calculamos a quantidade de transações que cada sub-lote deverá conter, considerando 10 sub-lotes (1 para cada Thread do pool). Feito isso, passamos cada sub-lote desses para ser executado. Diferentemente da primeira parte do artigo, onde usamos a interface Runnable para execução, nesse caso estamos utilizando a Callable. A grande diferença é que essa última permite que retornemos um resultado ao final da execução, como um método qualquer (lembre-se de que precisamos obter o resultado parcial da execução de cada processo).
Mas então surge outra pergunta: Se os processos estão sendo executados em outra Thread, como podemos obter esses resultados. A “mágica” está no objeto Future retornado pela Callable. Como o próprio nome já diz, esse objeto permite que possamos obter a resposta no “futuro”. Dessa forma, o código armazena cada um dos 10 objetos Future em uma lista para obtenção da resposta posteriormente.
O último loop do código faz justamente isso. O método get() do objeto Future fica “travado” até que a Thread a qual ele está associada termine sua execução. Assim, esse simples loop que realizamos garante que ele só será finalizado quando todas as Threads já estiverem terminado suas execuções. Por final, o programa consolida todos os resultados em um único resultado final.

TransactionExecutorParallel.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TransactionExecutorParallel implements TransactionExecutor {

	private final ExecutorService executor = Executors.newFixedThreadPool(10);

	@Override
	public ExecutionSummary execute(List<Transaction> transactions) {
		int numberOfElementsPerList = transactions.size() / 10;
		List<Future<ExecutionSummary>> summariesToBeGet = new ArrayList<Future<ExecutionSummary>>();
		int iniPos = 0;
		for (int i = 0; i < 10; i++) {
			final List<Transaction> subList = transactions.subList(iniPos, iniPos + numberOfElementsPerList);
			Future<ExecutionSummary> future = executor.submit(new Callable<ExecutionSummary>() {
				@Override
				public ExecutionSummary call() throws Exception {
					ExecutionSummary summary = new ExecutionSummary();
					for (Transaction transaction : subList) {
						if (transaction.process()) {
							summary.addOk();
						} else {
							summary.addError();
						}
					}
					return summary;
				}
			});
			summariesToBeGet.add(future);
			iniPos += numberOfElementsPerList;
		}
		ExecutionSummary summaryFinal = new ExecutionSummary();
		for (Future<ExecutionSummary> futureSummary : summariesToBeGet) {
			try {
				summaryFinal.addExecutionSummary(futureSummary.get());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}

		return summaryFinal;
	}

}

Para verificar a diferença de processamento entre as implementações paralela e sequencial, criamos uma classe de testes.

TestTransactionExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.junit.Test;

public class TestTransactionExecutor {
	private final Logger logger = Logger.getLogger(TestTransactionExecutor.class);

	@Test
	public void testSequential() {
		doTest(new TransactionExecutorSequential());
	}

	@Test
	public void testParallel() {
		doTest(new TransactionExecutorParallel());
	}

	private void doTest(TransactionExecutor transactionExecutor) {
		logger.debug("--------------");
		List<Transaction> transactions = getListOfTransactions();
		logger.debug("Executing transactions using " + transactionExecutor.getClass().getSimpleName());
		ExecutionSummary summary = transactionExecutor.execute(transactions);
		logger.debug("Transactions executed successfully");
		assertEquals(90, summary.getNumberOfOk());
		assertEquals(10, summary.getNumberOfError());
	}

	private List<Transaction> getListOfTransactions() {
		List<Transaction> transactions = new ArrayList<Transaction>();
		for (int i = 1; i <= 100; i++) {
			transactions.add(new Transaction(Long.valueOf(i)));
		}
		return transactions;
	}

}

Ao executar essa classe, obtemos o seguinte output.

23:59:05,668 - --------------
23:59:05,670 - Executing transactions using TransactionExecutorSequential
23:59:05,675 - Transaction processed: 1; Result: true
23:59:05,680 - Transaction processed: 2; Result: true
23:59:05,686 - Transaction processed: 3; Result: true
23:59:05,691 - Transaction processed: 4; Result: true
....
23:59:06,188 - Transaction processed: 97; Result: true
23:59:06,194 - Transaction processed: 98; Result: true
23:59:06,199 - Transaction processed: 99; Result: true
23:59:06,204 - Transaction processed: 100; Result: false
23:59:06,204 - Transactions executed successfully
23:59:06,217 - --------------
23:59:06,217 - Executing transactions using TransactionExecutorParallel
23:59:06,224 - Transaction processed: 1; Result: true
23:59:06,224 - Transaction processed: 11; Result: true
23:59:06,224 - Transaction processed: 21; Result: true
23:59:06,224 - Transaction processed: 31; Result: true
...
23:59:06,272 - Transaction processed: 80; Result: false
23:59:06,272 - Transaction processed: 20; Result: false
23:59:06,272 - Transaction processed: 90; Result: false
23:59:06,273 - Transaction processed: 100; Result: false
23:59:06,273 - Transactions executed successfully

Assim como na primeira parte do artigo, repare que o processamento paralelo (56ms) foi muito mais rápido que o processamento sequencial(534ms).

Conclusão

Essa técnica utilizada nessa parte do artigo lembra o conceito do Map Reduce, onde o processamento de uma tarefa é dividido entre várias máquinas e, ao final, é feita a consolidação dos resultados. Porém, o Map Reduce é muito mais poderoso para processamento de grandes volumes de dados, pois permite que esse seja feito entre várias máquinas, enquanto que o exemplo desse artigo permite a divisão entre processos da mesma máquina. Entretanto, como pudemos ver, a implementação apresentada é bastante simples e pode ser usada com bastante eficiência em cenários onde o volume de dados não seja tão big assim!
Outro detalhe é que essa técnica é conhecida como fork-join e foi adicionada como uma feature nativa do Java 7. No artigo preferi usar o ExecutorServices, mas o conceito é exatamente o mesmo.
Espero que tenha ficado claro como o ExecutorServices pode ajudar no melhor uso dos recursos disponíveis das máquinas existentes nos dias atuais, com vários núcleos de processadores. Como pudemos ver, os ganhos são bastante expressivos.

O código completo pode ser obtido aqui.


Quer aprender muito mais? Não deixe de ver meu curso Construa uma aplicação do zero com JEE 7, Java 8 e Wildfly.

Anúncios
Esta entrada foi publicada em Parallelism com as etiquetas , , , . ligação permanente.

Deixe uma Resposta

Preencha os seus detalhes abaixo ou clique num ícone para iniciar sessão:

Logótipo da WordPress.com

Está a comentar usando a sua conta WordPress.com Terminar Sessão / Alterar )

Imagem do Twitter

Está a comentar usando a sua conta Twitter Terminar Sessão / Alterar )

Facebook photo

Está a comentar usando a sua conta Facebook Terminar Sessão / Alterar )

Google+ photo

Está a comentar usando a sua conta Google+ Terminar Sessão / Alterar )

Connecting to %s