Mule ESB & Elasticsearch, main dans la main

Poursuivons notre apprentissage de Mule ESB à travers un autre exemple simple faisant également intervenir le moteur de recherche Elasticsearch

Cas d'utilisation

Soit le SI d'une entreprise possédant X applications communiquant entre elles via Mule ESB. A un instant T, le génial DSI décide qu'un certain nombre de données générées par les différents composants du SI doivent désormais être indexées dans un cluster Elasticsearch, données qui seront ensuite utilisées par le nouvel intranet.

Voilà ce que souhaite obtenir notre directeur du SI à terme :

Chaque application doit donc publier sur le bus, des données qui doivent être indexées par le cluster Elasticsearch.

Choix de l'équipe technique :

Les responsabilités que nous pouvons confier à notre ESB sont multiples. Dans notre cas, il a basiquement la responsabilité de collecter les données publiées par les différentes applications et aussi de les envoyer vers notre cluster qui n'a qu'une seule chose à faire : les indexer.

Pour ce faire, notre équipe a choisi d'enrichir notre ESB et de lui accorder la capacité de contacter directement Elasticsearch et de lui demander d'indexer du contenu. Elle va donc créer un composant Java qui va faire office de client vers Elasticsearch tout en lui demandant d'indexer les données obtenues.

Heureusement , tout ce que souhaite faire notre équipe est déjà disponible sous la forme du module elastic-mule disponible sur Github.

Présentation d'Elastic-Mule

Comme vu sur Github, pour utiliser elastic-mule, il suffit d'ajouter le jar associé dans les librairies du projet Mule que nous sommes en train de construire.

Pour indexer un document avec elastic-mule, il faut déclarer la classe ElasticSearchConnector dans son flow. Il s'agit d'un composant Java implémentant l'interface Callable de l'api Mule qui va permettre à notre ESB de savoir qu'il doit l'appeler via sa méthode onCall pour effectuer un traitement.

Cette méthode pour le moment ne supporte que les String et des objets de type JsonData, fourni nativement dans l'api mule (Représentation Java d'un document JSon).

Si on souhaite indexer un document dans notre flow, on peut adopter la syntaxe suivante :

<component doc:name="Elastic Search Connector">
<singleton-object >
<property key="clusterPort" value="${mule.cluster.port}"/>
<property key="clusterHost" value="${mule.cluster.host}"/>
<property key="indexName" value="${mule.indexName.value}"/>
<property key="indexType" value="${mule.indexType.value}"/>
</singleton-object>
</component>

Les différentes propriétés mule.cluster.port, mule.cluster.host, mule.indexName.value, mule.indexType.value sont disponibles dans le fichier de propriétés de l'application. Etant donné qu'on utilise un TransportClient pour contacter le cluster Elasticsearch, il faut préciser le port TCP utilisé (9300-9399 par défaut). A ce stade, pour les opérations d'indexation, elastic-mule ne permet pas encore de créer à la volée un nouvel index; cela viendra plus tard (les pull requests sont toujours les bienvenus 😉 ).
Il faut donc en attendant, au démarrage de l'application, préciser où seront stockés les nouveaux documents.

Cas 1 : Application publiant des fichiers Json

Soit l'application SAP du SI qui publie un fichier txt contenant du json. La modélisation du processus sera la suivante :

Flow avec un fichier contenant du JSon

En xml, cette modélisation correspond au bloc suivant :

<flow name="ElasticSearchCloudConnectorFlow3" doc:name="ElasticSearchCloudConnectorFlow3">
<file:inbound-endpoint path="D:folders_reposelasticSearchConnectorInData_JSON_String" responseTimeout="10000" doc:name="File"/>
<file:file-to-string-transformer doc:name="File to String"/>
<component doc:name="Elastic Search Connector">
<singleton-object  >
<property key="clusterPort" value="${mule.cluster.port}"/>
<property key="clusterHost" value="${mule.cluster.host}"/>
<property key="indexName" value="${mule.indexName.value}"/>
<property key="indexType" value="${mule.indexType.value}"/>
</singleton-object>
</component>
</flow>

Ce flow traduit que le point d'entrée est un le répertoire spécifié.

  • Chaque fois qu'un fichier arrive dans ce dossier, à intervalle régulier , Mule le lit.
  • On applique un transformer "file-to-string" sur le fichier : ce transformer manipule le contenu du fichier et en fait un string.
  • Le String obtenu est envoyé vers le composant Elastic Search Connector qui va l'envoyer ensuite vers le cluster elastic search.

Cas 2 : Application publiant du JSon via JMS

Dans ce cas, le flow peut être modélisé de la façon suivante :

Flow par une JMS

L'ajout de cette liaison se traduit en xml par

<jms:activemq-connector name="Active_MQ" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ"/>

<flow name="ElasticSearchCloudConnectorFlow2" doc:name="ElasticSearchCloudConnectorFlow2">
<jms:inbound-endpoint queue="JsonMessageQueue" connector-ref="Active_MQ" doc:name="JMS">
<jms:transaction action="NONE"/>
</jms:inbound-endpoint>
<json:json-to-object-transformer doc:name="JSON to Object"/>
<component doc:name="Elastic Search Connector">
<singleton-object  >
<property key="clusterPort" value="${mule.cluster.port}"/>
<property key="clusterHost" value="${mule.cluster.host}"/>
<property key="indexName" value="${mule.indexName.value}"/>
<property key="indexType" value="${mule.indexType.value}"/>
</singleton-object>
</component>

Nous avons déclaré un composant pour nous permettre d'écouter une queue sur un Apache Active MQ. Pour ce test, nous avons utilisé la version 5.7.0 d'Apache Active non supportée nativement dans la version community de Mule 3.3.0.

  • Ici, au lancement de l'application, Mule se met en écoute sur la queue JsonMessageQueue
  • A chaque nouveau message publié dessus, Mule le récupère
  • Applique un transformer pour passer de l'objet reçu qui doit être un message JSON à un objet java représentant le json en question
  • Le transmet à Elasticsearch Connector qui va l'envoyer vers le cluster pour indexation.

Cas 3 : Application stockant du JSon dans sa BDD

Cet exemple va nous permettre d'illustrer la capacité de Mule à lire directement une base de données disponible dans le SI (Est-ce une bonne idée de l'autoriser à le faire ? Je pense que non mais ce n'est pas l'objet de l'article).

On va supposer l'existence d'une base de données contenant une table JSONDOCS à 2 entrées : ID et JSONDOC; cette table est alimentée en écriture uniquement par l'application à laquelle elle est liée.

L'opération peut être modélisée par le graphique suivant :

Database Flow

Database Flow

En terme de configuration xml, cela peut se traduire par :

<flow name="ElasticSearchCloudConnectorFlow5" doc:name="ElasticSearchCloudConnectorFlow5" >
<poll frequency="50000">
<jdbc:outbound-endpoint exchange-pattern="request-response" queryTimeout="-1" connector-ref="JDBCConnector" doc:name="Database" queryKey="alldocs">
<jdbc:query key="alldocs" value="select * from jsondocs"/>
</jdbc:outbound-endpoint>
</poll>
<collection-splitter xmlns:jdbc="http://www.mulesoft.org/schema/mule/jdbc" doc:name="For each entry from DB"/>
<expression-transformer expression="#[map-payload:JSONDOC]" doc:name="Expression"/>
<component xmlns:jdbc="http://www.mulesoft.org/schema/mule/jdbc" doc:name="Elastic Search Connector">
<singleton-object class="org.mule.elasticsearch.ElasticSearchConnector" >
<property key="clusterPort" value="${mule.cluster.port}"/>
<property key="clusterHost" value="${mule.cluster.host}"/>
<property key="indexName" value="${mule.indexName.value}"/>
<property key="indexType" value="${mule.indexType.value}"/>
</singleton-object/>
</component/>
</flow>

  • Ici nous avons configuré un polling de Mule sur la table en question. L' ESB en lit le contenu à intervalle régulier et envoie vers le cluster elastic search tout ce qu'il y trouve. Il effectue donc une requête "select * from JSONDOCS".
  • En sortie de cette opération, Mule renvoie une liste correspondant à chaque ligne de la table JSONDOC. Chaque objet contient lui-même une map avec en clé les noms des colonnes et en valeur les valeurs réelles des différents champs. Comme Elasticsearch Connector ne supporte pas les collections dans son traitement, nous allons affiner le traitement mule en rajoutant un composant splitter juste en sortie de la DB pour indiquer qu'il doit ensuite effectuer un traitement pour chacune des lignes de la table, donc envoyer la map représentant une ligne de la table
  • Etant donné que l'Elastic Search ne traite pas non plus les map, nous rajoutons un traitement pour extraire de la map la valeur de la clé "jsondoc". Pour ce faire, nous rajouter un expression evaluator pour trouver dans la payload en cours la valeur cherchée. L'instruction #[map-payload:JSONDOC] va indiquer à Mule qu'en ce moment nous manipulons une map qui contient une clé nommée jsondoc et dont nous voulons la valeur.
  • C'est l'évaluation de cette dernière expression qui est envoyée à Elasticsearch Connector et transmise vers le cluster Elasticsearch.

Conclusions

Au terme de cet article nous aurons vu comment intégrer notre elastic-mule à travers quelques cas d'utilisation assez simples. Des évolutions sont à prévoir pour lui permettre de supporter encore plus de chose, notamment pouvoir effectuer des recherches ou pouvoir effectuer des indexations en masse.

Vous trouverez ci-dessous les éléments vous permettant de reproduire les exemples de cet article :

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Captcha *

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.