Spring parallel processing. Exécution parallèle avec Spring

Ce billet tente de démystifier l'utilisation en java5+ des exécutions en parallèle des tâches (parallel processing) en s'appuyant sur les classes de Spring qui masquent les difficultés du parallélisme en java.

Les machines de nos jours viennent avec plusieurs processeurs: des multi-core (quatre, huit ou plusieurs centaines cœurs).

Or la puissance de ces ressources n'est pas constamment utilisée.

L'exécution parallèle autorise des calculs parallèles qui consomment ces ressources pour améliorer les performances.

Mais c’est comme tout, le parallélisme ça se mérite. Même si c'est un petit chouia difficile!

En réalité, la difficulté n'est pas dans la mise en place technique mais réside dans la gestion de la cohésion des données.

Techniquement, c'est le développeur qui doit réaliser les calculs parallèles.

C'est sûr, demain apparaîtront côté JVM les méthodes de traitement du parallélisme comme c'était le cas de la gestion mémoire avec le garbage collector ou récemment la gestion des descripteurs de fichier avec try/catch en java7.

Java 8+ nous proposera (peut-être) de faire le calcul paralléle à notre place (nous développeurs).

Ce qui reste au développeur c'est de décrire les traitements atomiques (comme l'atomicité dans les transactions), et à partir de là c'est la JVM qui mène le traitement global parallélisé efficacement en fonction des ressources disponibles.

Bel enjeu de demain.

Signalons que nous n'utilisons pas directement la classe Executors de java qui offre des méthodes statiques pour instancier un Executor ou un ExecutorService.

A la place nous utiliserons les classes de Spring.

La démo ci-après est faite en java7 et spring 3.

Passons à la mise en pratique.

Voici les étapes pour concevoir notre démonstration simple.

ETAPE 1. ECRIRE UN BEAN MODEL

La classe Tache décrit le model de notre simple démo:

package fr.netapsys.executorservice.beans;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Tache implements Serializable {

	private final static transient SimpleDateFormat sdf=
                 new SimpleDateFormat("dd/MM/yy HH:mm:ss");
	private static final long serialVersionUID = 1L;

	private String libelle, strDate;
	
	public Tache(){
		super();
	}
	
	public Tache(final String libelle,final String strDate){
		setLibelle(libelle);
		setStrDate(strDate);
	}
	

	public Tache(final String libelle,final Date date){
		setLibelle(libelle);
		setStrDate(sdf.format(date));
	}
	//.... omis les getters/setters et toString
}

ETAPE 2. AJOUTER LES DEPENDANCES

Seules les dépendances de Spring sont ajoutés, le pom.xml est donné en annexe.

ETAPE 3. CONFIGURER SPRING

Nous retenons la configuration java-centric dans cette démo.

La classe chargée de configurer Spring est JavaConfigurator:

package fr.netapsys.executorservice.javacfg;
import java.util.concurrent.ExecutorService;
import org.springframework.context.annotation.*;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class JavaConfigurator {
    protected static int nbThreadPool=5;    
    /***
     * Use l'implementation spring 
       ThreadPoolTaskScheduler de l'interface TaskExecutor
    */
    @Bean
    public TaskExecutor taskExecutor() {
       ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor(); 
       tpte.setCorePoolSize(nbThreadPool);
       tpte.setMaxPoolSize(nbThreadPool*2);
       tpte.setQueueCapacity(nbThreadPool*10);
       return tpte;
    }
    /* Use l'adapter de spring ExecutorServiceAdapter pour 
         retourner une instance ExecutorService
         au lieu de Executors.newFixedThreadPool de java
    */
    @Bean
    public ExecutorService executorService() {
      return new ExecutorServiceAdapter(taskExecutor()); 
    }
}

Explications:

C'est ici que la configuration du contexte de spring est regroupée.

Une instance de TaskExecutor est définie: précisément c'est l'implémentation ThreadPoolTaskExecutor de spring retenue.

L'instance ThreadPoolTaskExecutor est configurée ainsi:

- corePoolSize initialisé à 5. Le nombre de thread lancés simultanément,

- maxPoolSize le nombre maximal de thread lancés en parallèle. Au dela il sera mis en pool,

- queueCapacity la taille max du pool. Au délà une exception de rejet est levée (à moins de configuration particulière).

Nous avons aussi défini une instance de ExecutorService qui autorise de lancer un objet Runnable ou Callable.

Nous y reviendrons une autre fois sur les Callable.

NOTES.

- Les commentaires dans le code complètent ces explications,

- C'est spring qui se charge d'appeler la méthode shutdown sur ExecutorService donc il n'est pas autorisé de l'appeler manuellement.

ETAPE 4. ECRIRE UNE CLASSE RUNNABLE (THREAD)

La classe MyThread implémente l'interface Runnable:

package fr.netapsys.executorservice;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.*;
import fr.netapsys.executorservice.beans.Tache;

public class MyThread implements Runnable {
    private AtomicInteger num;
    private Logger logger=LoggerFactory.getLogger("MyThread");
    public MyThread(){super();    }
    public MyThread(AtomicInteger ai){
       this(); 
       this.num=ai;
    }
    @Override
    public void run() {
       logger.info(Thread.currentThread().getName()+" demarrage:"+num);
       process(); /*simulate long process*/
       logger.info(Thread.currentThread().getName()+" termine.");
    }
    private void process() {
        Tache tache=new Tache("libel"+num, new Date());	
    	logger.info("processing tache:"+tache);
    }
}

Dans la classe MyThread qui redéfinit la méthode run qui simule un long processus.

Dans notre cas, une tache est créée puis logguée. Je n'utilise pas de Thread.sleep afin de ne pas masquer les difficulltés liées à l'exécution parallèle.

Il serait utile de surcharger la méthode toString() du bean Tache pour un affichage clair.

ETAPE 5. Tester avec JUnit

Avec ce test nous mettons en pratique l'exécution parallèle en appelant la méthode execute de
l'instance de TaskExecutor. Ce test hérite de la classe abstrainte TestParent donné ci-après.

package fr.netapsys.executorservice.tests;

import static org.junit.Assert.assertNotNull;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.slf4j.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import fr.netapsys.executorservice.MyThread;

public class TestSpringTaskExecutor extends TestParent {
	private Logger logger=LoggerFactory.getLogger(this.getClass());
	@Autowired TaskExecutor threadPoolTaskExecutor;
	@Test
	public void testSpringTaskExecutor() 
                             throws InterruptedException  {
		assertNotNull(threadPoolTaskExecutor);
		for (int k = 0; k < 5; k++) {
			Runnable myThread = 
                            new MyThread(new AtomicInteger(k));
			threadPoolTaskExecutor.execute(myThread);
		}
		Thread.sleep(500);
		logger.info("Finished all threads");
	}	
}

La seule chose à noter est la boucle for qui crée cinq threads.

Ces threads sont lancés simultanément via la méthode execute de l'instance ThreadPoolTaskExecutor spring.

En effet, à chaque itération, un objet Runnable est passée à la méthode execute.

Ainsi, plusieurs tâches sont lancées simultanément dont chacune réalise le traitement dans la méthode run.

Dans notre cas, chaque thread initialise une instance du bean Tache puis le loggue.

La classe abstraite TestParent permet de charger le contexte spring à partir de JavaConfigurator:

package fr.netapsys.executorservice.tests;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import fr.netapsys.executorservice.javacfg.JavaConfigurator;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={JavaConfigurator.class})
public abstract class TestParent{
	@Autowired ApplicationContext sprinCtx;
}

Il reste à lancer mvn test et observer les traces logs.

En fonction de votre environnement, les traces logs ne sont pas forcément ordonnés.

Dans un proche billet nous ferons tout cela avec le framework Camel ce qui rend la chose plus simple.

ANNEXE 1.

Voici le contenu du pom.xml:

<properties>
	<jdk-version>1.7</jdk-version>
	<!-- Log -->
	<log4j-version>1.2.17</log4j-version>
	<slf4j-version>1.6.6</slf4j-version>
	<!-- Spring -->
	<spring-version>3.1.2.RELEASE</spring-version>
	<!-- versions dependances de test -->
	<junit-version>4.10</junit-version>
	<javassist-version>3.12.1.GA</javassist-version>
	<cglib-version>2.2.2</cglib-version>
</properties>
	
<dependencies>
	<!-- 2 dep transitive de JavConfig see @Configuration -->
	<dependency>
		<groupId>javassist</groupId>
		<artifactId>javassist</artifactId>
		<version>${javassist-version}</version>
		<scope>compile</scope>
	</dependency>
	<dependency>
		<groupId>cglib</groupId>
		<artifactId>cglib</artifactId>
		<version>${cglib-version}</version>
		<scope>compile</scope>
	</dependency>
	<!-- spring-core -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-core</artifactId>
		<version>${spring-version}</version>
		<scope>compile</scope>
	</dependency>

	<!-- spring-expression -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-expression</artifactId>
		<version>${spring-version}</version>
		<scope>compile</scope>
	</dependency>
	<!-- spring-context -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context</artifactId>
		<version>${spring-version}</version>
		<scope>compile</scope>
	</dependency>
<!-- spring-asm -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-asm</artifactId>
		<version>${spring-version}</version>
		<scope>compile</scope>
	</dependency>
<!-- spring-beans -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-beans</artifactId>
		<version>${spring-version}</version>
		<scope>compile</scope>
	</dependency>
	<!-- spring-context-support -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context-support</artifactId>
		<version>${spring-version}</version>
		<scope>compile</scope>
	</dependency>
	<!-- logging -->
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-api</artifactId>
		<version>1.6.6</version>
	</dependency>
	<dependency>
	 <groupId>org.slf4j</groupId>
	 <artifactId>slf4j-log4j12</artifactId>
	 <version>1.6.6</version>
	</dependency>
	<dependency>
	 <groupId>log4j</groupId>
	 <artifactId>log4j</artifactId>
	 <version>1.2.17</version>
	</dependency>
	<!-- logging -->
	<dependency>
	 <groupId>commons-logging</groupId>
	 <artifactId>commons-logging</artifactId>
	 <version>1.1</version>
	 <scope>runtime</scope>
	</dependency>
	<!-- spring-test -->
	<dependency>
	 <groupId>org.springframework</groupId>
	 <artifactId>spring-test</artifactId>
	 <version>${spring-version}</version>
	 <scope>test</scope>
	</dependency>
	<!-- DEPENDANCES DE TESTS junit -->
	<dependency>
	 <groupId>junit</groupId>
	 <artifactId>junit</artifactId>
	 <version>${junit-version}</version>
	 <scope>test</scope>
	</dependency>
</dependencies>
<build>
	<defaultGoal>install</defaultGoal>
	<plugins>
	 <plugin>
	  <groupId>org.apache.maven.plugins</groupId>
		<artifactId>maven-compiler-plugin</artifactId>
		<version>2.5.1</version>
		<configuration>
		 <source>1.7</source>
		 <target>1.7</target>
		</configuration>
		</plugin>
	</plugins>
</build>

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Captcha *

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.