Cet article a pour objectif d’introduire Spring Integration. Dans un premier temps, j’y décrirai les différents concepts inhérents à Spring Integration, puis nous verrons un exemple basique d’intégration dans une application.
Spring integration est, comme son nom l’indique, un projet du framework Spring. Il respecte donc tous les principes de Spring dont voici les plus importants : la séparation des préoccupations (separation of concerns), l’injection de dépendances (dependency injection) ou le couplage lâche (loose coupling). En revanche, Spring Integration permet de pousser ces principes encore plus loin en permettant de faire communiquer facilement des beans Spring de manière asynchrone et indépendante via un système de messaging, sans qu’ils aient besoin de se connaître mutuellement.
De plus, Spring Integration fournit également des outils pour communiquer avec des systèmes externes (JMS, RabbitMQ, etc).
Spring Integration est une implémentation des Enterprise Integrations Patterns. Il est construit autour du modèle pipes-and-filters. Les pipes sont n’importe quel composant capable de transporter les messages alors que les filters sont ceux capables de produire ou consommer des messages.
Les composants principaux
Message
Un Message n’est autre qu’un wrapper pour n’importe quel POJO (le payload) associé à des metadata (les headers). Les headers sont par exemple l’id, le timestamp, l’id de corrélation, et une adresse de retour. Mais ils peuvent également être utilisés pour passer des informations entre les différents composants.
Message Channel
Un Message Channel permet de transporter les messages, il représente donc le « pipe » du modèle « pipes-and-filters ». Les producteurs de messages envoient leurs messages dans le channel alors que les consommateurs vont les lire. Un channel peut être « point à point » ou de type « publish / subscribe ». Avec un Channel « point à point », un seul consommateur recevra un message envoyé sur le Channel alors que tous les consommateurs recevront le même message si le Channel est de type « publish / subscribe ».
Message Endpoint
Les Messages Endpoint permettent de connecter notre application au système de messaging. Ils représentent les « filters » du modèle « pipes-and-filters ». Ils permettent à notre application d’envoyer des messages sans qu’elle ait connaissance du système de messaging sur lequel elle s’appuie. Voici les principaux types de Message Endpoint supportés nativement :
- Transformer : permet de transformer un message
- Filter : permet de filtrer des messages entrants et renvoyer sur un channel de sortie uniquement certains messages
- Router : permet de rediriger des messages vers un channel ou un autre suivant certains critères
- Splitter : permet de découper un message en plusieurs autres
- Aggregator : permet de regrouper plusieurs message en un seul
- Service Activator : permet d'exécuter une méthode d’un service à la réception d’un message, et éventuellement d’envoyer une réponse sur un channel de réponse
- Channel Adapter : permet de connecter un channel à un autre système. Il peut s’agir d’une connexion entrante (Inbound Channel Adapter) ou sortante (Outbound Channel Adapter)
Exemple par le code
Passons maintenant à un exemple concret pour mieux comprendre son utilisation. Imaginons donc une application Spring Boot qui recevrait des métriques de la part d’objets connectés. Je ne détaillerai pas ce qui touche à Spring Boot dans les exemples suivants.
L’application de test est basique, elle expose un endpoint POST sur /metric pour recevoir des métriques, qui se composent pour commencer simplement d’une date et d’une valeur. Les métriques ne sont pas persistées, elles sont juste envoyées sur un Channel Spring Integration afin de pouvoir leur appliquer quelques traitements.
On remarque que la classe Application est annotée @ImportResource afin de charger le fichier de configuration Spring qui définira l’ensemble de la configuration Spring Integration. J’ai préféré le choix de configuration XML pour cet exemple afin d’avoir un endroit centralisé pour montrer les possibilités offertes par Spring Integration. Il aurait été également possible de concevoir la même application de test via les annotations Spring Integration.
Messaging Gateway et Service Activator
Lors de la réception d’une requête POST avec une représentation JSON d’un objet Metric, un message est envoyé, l’objet Metric étant son payload. Pour cela, l’interface MetricSenderService est déclarée comme une Messaging Gateway.
Ainsi, tous les appels à la méthode send permettent de transformer l’objet Metric en un message envoyé sur le Channel metrics.in.
Maintenant que nous avons de quoi créer des messages sur un Channel, il nous reste à créer un consommateur. Ce consommateur n’aura pour seul but que de logguer la réception d’un message. Pour cela, il suffit de déclarer un bean ServiceActivator :
A chaque message reçu sur le Channel metrics.in, la méthode receive du bean SimpleMetricReceiverService sera exécutée, avec le message en paramètre.
Nous avons maintenant un système simple de production et de consommation de messages. Nous pouvons le tester:
Le code de cet exemple est disponible ici.
Logging Handler
Pour le moment, nous avons vu comment lire un message que nous logguons depuis le Service Activator.
Nous pouvons utiliser directement un Logging Handler afin de logguer l’intégralité dans messages produits sur un Channel. Pour cela nous allons enregistrer un Interceptor de type Wire-Tap sur notre Channel qui enverra à son tour les messages reçu au composant Channel Logging Adapter qui effectuera l’écriture dans le log.
Pour tester, le code se trouve ici.
Message Filter
Admettons maintenant que nous voulions filtrer certains messages pour éviter de les traiter. Par exemple, dans notre cas, les messages dans le futur qui correspondent forcément à des erreurs. Pour cela, nous allons déclarer un Filter. Le Filter permet de supprimer à la volée les messages qui ne respectent pas un critère et éventuellement de le déposer dans un autre Channel. Par défaut, les messages sont supprimés.
Un Filter étant une implémentation de MessageSelector, nous pouvons déclarer nos beans comme suit:
Les messages arrivant sur le Channel metrics.in seront testés via la méthode accept(Message<?> message) du bean MessageFilterImpl. Si la méthode renvoit false, le message est supprimé. Dans notre exemple, les messages non filtrés sont déposés dans le Channel metrics.filtered. Il aurait été également possible de déposer les messages invalides dans un autre Channel en positionnant l’attribut discard-channel. Il est également possible de lever une exception lorsque un message est invalide, avec l’attribut throw-exception-on-rejection.
Afin que le ServiceActivator ne consomme que les messages filtrés, il est nécessaire de changer son Channel d’écoute :
Pour ce filtre, l’implémentation de l’interface MessageSelector est très simple :
Attention, ne pas confondre ici Filter qui permet, littéralement, de filtrer des messages avec le terme Filter dans Pipes-and-Filters qui désignent dans ce cas n’importe quel composant capable de produire ou consommer des messages.
Pour tester, le code se trouve ici.
Commençons avec une date passée :
Notre message est effectivement remonté jusqu’au ServiceActivator.
Alors qu’avec une date dans le futur, notre ServiceActivator n’a pas été exécuté, rien n'apparaît dans le log.
Pour des cas simples comme celui-ci, il est également possible de se passer de l’implémentation de MessageSelector et d’utiliser directement une expression SPeL pour effectuer le filtrage :
A tester avec le code ici.
Router
Pour les cas un peu plus complexes où les messages ne doivent pas être tous traités de la même manière, nous avons à disposition le Router dont le but est de dispatcher des messages sur différents Channels en fonction de critères avant d’être consommés.
Pour notre cas, imaginons que nous recevons désormais deux types de métriques, temperature et humidity, que nous voulons traiter différemment. Voici comment l’on pourrait procéder :
Nous avons désormais deux ServiceActivator (un pour les températures et un pour l’humidité) qui consomment les messages depuis deux Channels distincts (metrics.humidity et metrics.temperature). C’est le Router qui s’occupe de dispatcher les messages arrivant sur le channel metrics.filtered vers les Channels temperature et humidity en fonction du nouvel attribut type de l’objet Metric.
Voici par exemple à quoi ressemble le Service Activator correspondant au type humidity.
Comme d’habitude, voici le code.
Et il suffit d’exécuter le test comme ceci :
Puis essayons avec un autre type :
AMQP Gateway
Pour ce dernier exemple, je vous propose de réaliser une passerelle vers une file de message RabbitMQ. Spring Integration offre le support natif d'AMQP, il suffit d'ajouter la dépendance spring-integration-amqp à notre projet pour en profiter :
Il suffit ensuite d'adapter un peu notre fichier de configuration de Spring Integration afin de définir un Outbound Channel Adapter pour écrire les messages dans une file et un Inbound Channel Adapter pour lire depuis une file.
Pour cet exemple, il y a également quelques adaptations :
- définition d'un nouveau Channel metrics.from-rabbit qui recueille les messages en provenance de RabbitMQ
- changement de l'input-channel du Router pour utiliser le nouveau Channel metrics.from-rabbit
- définition d'une RabbitConnectionFactory pour configurer les paramètres de connexion à RabbitMQ
- définition d'une Queue RabbitMQ nommée metrics.queue. Celle-ci sera créée automatiquement.
- définition d'un Exchange RabbitMQ pour effectuer le routage de nos messages vers la file correspondante au routing-key définie par l'Outbound Channel Adapter
- définition d'un template AMQP
- Et enfin, la classe Metric (la classe de nos messages) doit être modifiée pour implémenter Serializable
Lorsque notre application est démarrée, on peut remarquer que la file RabbitMQ a été créée avec la commande suivante :
J'ai positionné un Thread.sleep() dans la classe HumidityMetricReceiverService qui traite les messages de type humidity afin d'avoir le temps de visualiser les messages rentrer et sortir de la file RabbitMQ. Il suffit donc de faire des appels en boucle et de ré-exécuter la commande précédente pour voir que les messages arrivent bien dans la file.
Vous aurez peut-être remarqué que j'ai placé l'option concurrent-consumers="10" sur l'Inbound Channel Adapter. Elle permet tout simplement de lire 10 messages à la fois.
Conclusion
Comme nous avons pu le voir, il est très simple d'intégrer un système de messaging à une application Spring, et ceci avec presque uniquement que de la configuration ! Il est très pratique d'utiliser ce genre de mécanisme pour créer des traitements asynchrones, d'autant que l'intégration de systèmes externes comme RabbitMQ permettent de se passer de gérer la persistence des messages.
Chaque exemple de cet article montre les traitements de base de Spring Integration, la référence permet de découvrir toutes les possibilités de chaque composant, ainsi que tous les composants qui n'ont pas été (encore) couvert !