L’une des fonctionnalités les plus utiles du traitement de flux en temps réel consiste à combiner les forces et les avantages de diverses technologies pour offrir une expérience de développeur unique et un moyen efficace de traiter les données en temps réel à grande échelle. Hazelcast est une plate-forme de calcul et de stockage distribuée en temps réel pour les requêtes à faible latence constante, l’agrégation et le calcul avec état par rapport aux flux d’événements en temps réel et aux sources de données traditionnelles. Apache Pulsar est une plate-forme de messagerie et de diffusion en continu géorépliquée et mutualisée en temps réel pour les charges de travail en temps réel, gérant des millions d’événements par heure.
Cependant, le traitement de flux en temps réel n’est pas une tâche facile, en particulier lors de la combinaison de plusieurs flux en direct avec de grands volumes de données stockées dans des stockages de données externes pour fournir un contexte et des résultats instantanés. En ce qui concerne l’utilisation, Hazelcast peut être utilisé pour les choses suivantes :
- Traitement de données avec état sur des données de streaming en temps réel.
- Données au repos.
- Une combinaison de données au repos et de traitement de données avec état sur des données en continu en temps réel.
- Interroger le flux.
- Sources de données par lots directement à l’aide de SQL.
- Coordination distribuée pour les microservices.
- Réplication des données d’une région à l’autre.
- Réplication des données entre les centres de données dans la même région.
Bien qu’Apache Pulsar puisse être utilisé pour la messagerie et le streaming, les cas d’utilisation remplacent plusieurs produits et fournissent un sur-ensemble de leurs fonctionnalités. Apache Pulsar est une plate-forme de messagerie unifiée cloud-native mutualisée destinée à remplacer Apache Kafka, RabbitMQ, MQTT et les anciennes plates-formes de messagerie. Apache Pulsar fournit un bus de messages infini pour Hazelcast pour agir comme une source et un puits instantanés pour toutes les sources de données.
Conditions préalables
Nous construisons une application où nous ingérons des données d’Apache Pulsar dans Hazelcast, puis les traitons en temps réel. Pour exécuter cette application, assurez-vous que votre système dispose des composants suivants :
- Hazelcast installé sur votre système : nous utilisons CLI.
- Pulsar installé sur votre système : nous utilisons Docker.
Si vous avez macOS et Homebrew, vous pouvez installer Hazelcast à l’aide de la commande suivante :
brew tap hazelcast/hz
brew install hazelcast@5.2.1
Vérifiez si Hazelcast est installé :
hz -V
Démarrez ensuite un cluster local :
hz start
Vous devriez voir ce qui suit dans la console :
INFO: [192.168.1.164]:5701 [dev] [5.2.1]
Members {size:1, ver:1} [
Member [192.168.1.164]:5701 - 4221d540-e34e-4ff2-8ad3-41e060b895ce this
]
Vous pouvez démarrer Pulsar dans Docker à l’aide de la commande suivante :
docker run -it -p 6650:6650 -p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.11.0 bin/pulsar standalone
Pour installer Management Center, utilisez l’une des méthodes suivantes, selon votre système d’exploitation :
brew tap hazelcast/hz
brew install hazelcast-management-center@5.2.1
Vérifiez que Management Center est installé :
hz-mc -V
Collecte de données
Pour notre application, nous souhaitons ingérer des lectures de qualité de l’air de partout aux États-Unis via le fournisseur de données AirNow.
Source: API AirNow
Avec une simple application Java, nous effectuons des appels REST à l’API AirNow qui fournit une lecture de la qualité de l’air pour les principaux codes postaux aux États-Unis. L’application Java envoie les données AirNow encodées en JSON au sujet Pulsar « qualité de l’air ». À partir de ce point, une application Hazelcast peut le lire.
Source: GitHub
Nous avons également une fonction Java Pulsar qui reçoit chaque événement du sujet « qualité de l’air » et l’analyse en différents sujets en fonction du type de lecture de la qualité de l’air. Cela inclut les PM2,5, les PM10 et l’ozone.
Source: GitHub
Exemple de données sur la qualité de l’air
{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","latitude":39.95,"longitude":-75.151,"parameterName":"PM10","aqi":19,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}
Exemple de données sur l’ozone
{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"O3","latitude":39.95,"longitude":-75.151,"aqi":8}
Exemple de données PM10
{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"PM10","latitude":39.95,"longitude":-75.151,"aqi":19}
Exemple de données PM2,5
{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"PM2.5","latitude":39.95,"longitude":-75.151,"aqi":54}
Traitement de l’information
Afin de traiter les données collectées, nous avons utilisé le module connecteur Hazelcast Pulsar pour ingérer les données des sujets Pulsar.
Note: vous pouvez utiliser le même connecteur pour écrire dans les sujets Pulsar.
L’utilisation de Hazelcast nous permet de calculer diverses fonctions d’agrégation (somme, moyenne, etc.) en temps réel sur une fenêtre spécifiée d’éléments de flux. Le connecteur Pulsar utilise la bibliothèque client Pulsar, qui a deux manières différentes de lire les messages d’un sujet Pulsar. Il s’agit de l’API consommateur et de l’API lecteur ; les deux utilisent le modèle de générateur (pour plus d’informations, Cliquez ici).
Dans votre fichier pom, importez les dépendances suivantes :
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>5.1.4</version>
</dependency>
<dependency>
<groupId>com.hazelcast.jet.contrib</groupId>
<artifactId>pulsar</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.1</version>
</dependency>
Nous créons un PulsarSources.pulsarReaderBuilder
instance pour se connecter au cluster Pulsar précédemment démarré situé à pulsar://localhost:6650
.
StreamSource<Event>source = PulsarSources.pulsarReaderBuilder(
topicName,
() -> PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(),
() -> Schema.JSON(Event.class),
Message::getValue).build();
Nous créons ensuite un pipeline pour lire à partir de la source avec une fenêtre glissante et un décompte agrégé avant d’écrire dans l’enregistreur :
Pipeline p = Pipeline.create();
p.readFrom(source)
.withNativeTimestamps(0)
.groupingKey(Event::getUser)
.window(sliding(SECONDS.toMillis(60), SECONDS.toMillis(30)))
.aggregate(counting())
.writeTo(Sinks.logger(wr -> String.format(
"At %s Pulsar got %,d messages in the previous minute from %s.",
TIME_FORMATTER.format(LocalDateTime.ofInstant(
Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
wr.result(), wr.key())));
JobConfig cfg = new JobConfig()
.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
.setSnapshotIntervalMillis(SECONDS.toMillis(1))
.setName("pulsar-airquality-counter");
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
hz.getJet().newJob(p, cfg);
Vous pouvez exécuter le code précédent à partir de votre IDE (dans ce cas, il créera son propre membre Hazelcast et exécutera le travail dessus), ou vous pouvez l’exécuter sur le membre Hazelcast précédemment démarré (dans ce cas, vous devez créer un JAR exécutable, y compris toutes les dépendances requises pour l’exécuter) :
mvn package
bin/hz-cli submit target/pulsar-example-1.0-SNAPSHOT.jar
Pour annuler la tâche et arrêter le cluster Hazelcast :
bin/hz-cli cancel pulsar-message-counter
hz-stop
Conclusion
Dans cet article, nous avons démontré comment vous pouvez combiner les forces et les avantages de diverses technologies pour offrir une expérience de développeur unique et un moyen efficace de traiter les données en temps réel à grande échelle. Nous avons diffusé des données sur la qualité de l’air d’Apache Pulsar vers Hazelcast, où nous avons traité les données en temps réel. La tendance à la hausse des technologies cloud, le besoin d’applications intelligentes en temps réel et l’urgence de traiter les données à grande échelle nous ont amenés à un nouveau chapitre du traitement de flux en temps réel, où les latences sont mesurées, non pas en minutes mais en millisecondes et submillisecondes.
Hazelcast vous permet de créer rapidement des applications en temps réel économes en ressources. Vous pouvez le déployer à n’importe quelle échelle, des appareils de petite taille à un grand cluster d’instances cloud. Un cluster de nœuds Hazelcast partage le stockage des données et la charge de calcul, qui peuvent évoluer dynamiquement vers le haut et vers le bas. Lorsque vous ajoutez de nouveaux nœuds au cluster, les données sont automatiquement rééquilibrées dans le cluster et les tâches de calcul en cours d’exécution (appelées tâches) capturent leur état et leur échelle avec des garanties de traitement. Pulsar vous permet d’utiliser votre choix de protocoles de messagerie pour distribuer rapidement des événements entre plusieurs types de consommateurs et de producteurs et agir comme un hub de messagerie universel. Pulsar sépare le calcul du stockage, permettant une mise à l’échelle dynamique et une gestion efficace des données rapides. StreamNative est la société composée des créateurs originaux d’Apache Pulsar et d’Apache BookKeeper. StreamNative fournit une expérience d’entreprise complète pour Apache Pulsar dans le cloud et sur site.