Lorsqu’il s’agit de gérer de grandes quantités de données dans un système distribué, Apache Cassandra et Apache Pulsar sont deux noms qui reviennent souvent.
Apache Cassandra est une base de données NoSQL hautement évolutive qui excelle dans la gestion des écritures et des requêtes à grande vitesse sur plusieurs nœuds. C’est une solution idéale pour les cas d’utilisation tels que la gestion des profils d’utilisateurs, les catalogues de produits et l’analyse en temps réel.
Une plate-forme de messagerie distribuée et de streaming, appelée Apache Pulsar, a été créée pour gérer les données en mouvement. Il peut gérer les charges de travail de messagerie standard et les cas d’utilisation de streaming plus complexes, y compris le traitement des données en temps réel et les architectures pilotées par les événements.
Cet article couvre les principales étapes de la création d’une application Web basée sur Spring Boot et React qui interagit avec Pulsar et Cassandra, affichant les données boursières en direct au fur et à mesure de leur réception. Ce tutoriel n’est pas complet, il ne couvre que les étapes les plus importantes. Vous pouvez trouver le code source complet de l’application sur GitHub.
Vous apprendrez à :
- Configurez des instances Cassandra et Pulsar à l’aide de DataStax Astra DB et Astra Streaming.
- Publiez et consommez des messages Pulsar dans une application Spring Boot.
- Stockez les messages Pulsar dans Cassandra à l’aide d’un récepteur.
- Affichage des données en direct et stockées dans React à l’aide du framework Hilla de Vaadin.
Technologies et bibliothèques utilisées
- Apache Cassandra (avec Astra DB)
- Apache Pulsar (avec Astra Streaming)
- Botte de printemps
- Printemps pour Apache Pulsar
- Données de printemps pour Apache Cassandra
- Réagir
- La chicouté
- API AlphaVantage
Exigences
- Java 17 ou plus récent
- Noeud 18 ou plus récent
- Compétences Java intermédiaires et familiarité avec Spring Boot
Stocker des données sensibles dans Spring Boot
Une grande partie de la configuration de Cassandra et Pulsar est basée sur la configuration. Bien qu’il puisse être tentant de mettre la configuration en application.properties
ce n’est pas une bonne idée car le fichier est sous contrôle de code source et vous pouvez involontairement révéler des secrets.
Au lieu de cela, créez un local config/local/application.properties
fichier de configuration et ajoutez-le à .gitignore
pour s’assurer qu’il ne quitte pas votre ordinateur. Les paramètres du fichier de configuration local seront automatiquement appliqués par Spring Boot :
mkdir -p config/local
touch config/local/application.properties
echo "
# Contains secrets that shouldn't go into the repository
config/local/" >> .gitignore
Vous pouvez fournir à Spring Boot les options en tant que variables d’environnement lors de son utilisation en production.
Configuration de Cassandra et Pulsar à l’aide de DataStax Astra
Les deux technologies Apache utilisées dans cet article sont des projets open source et peuvent être installées localement. Cependant, l’utilisation de services cloud pour configurer les instances est une option plus simple.
Dans cet article, nous avons configuré l’infrastructure de données requise pour notre exemple d’application Web à l’aide des services de niveau gratuit de DataStax.
Commencez par vous connecter à votre compte existant ou en vous inscrivant à un nouveau sur le site officiel d’Astra DataStax, où vous devrez créer la base de données et le service de streaming séparément.
Configuration de Cassandre
Commencez par cliquer sur « Créer une base de données” sur le site officiel d’Astra DataStax. Le transfert de données d’un flux vers Astra DB nécessite que les deux services soient déployés dans une région qui prend en charge à la fois Astra Streaming et Astra DB :
- Entrez le nom de votre nouvelle instance de base de données.
- Sélectionnez le nom de l’espace de clés. (Un espace de clés stocke votre groupe de tables, un peu comme le schéma dans les bases de données relationnelles).
- Sélectionnez un fournisseur de cloud et une région.
Note: Pour que l’application de démonstration fonctionne, vous devez déployer le service de base de données sur une région qui prend également en charge le streaming. -
Sélectionner « Créer une base de données.”
Cassandra : Connexion au service
Une fois l’initialisation du service de base de données créée, vous devez générer un jeton et télécharger le fichier « Ensemble de connexion sécurisée” qui crypte le transfert de données entre l’application et la base de données cloud (mTLS). Accédez au tableau de bord de la base de données « Connecter” onglet où vous trouverez le bouton pour générer un jeton à usage unique (n’oubliez pas de le télécharger) et le bouton de téléchargement du bundle :
spring.cassandra.schema-action=CREATE_IF_NOT_EXISTS
spring.cassandra.keyspace-name=<KEYSPACE_NAME>
spring.cassandra.username=<ASTRADB_TOKEN_CLIENT_ID>
spring.cassandra.password=<ASTRADB_TOKEN_SECRET>
# Increase timeouts when connecting to Astra from a dev workstation
spring.cassandra.contact-points=<ASTRADB_DATACENTER_ID>
spring.cassandra.port=9042
spring.cassandra.local-datacenter=<ASTRADB_REGION>
datastax.astra.secure-connect-bundle=<secure-connect-astra-stock-db.zip>
Paramètres de pulsar pour application.properties
.
Configuration du Pulsar
Commencez par cliquer sur « Créer un flux » sur la page principale d’Astra DataStax :
- Entrez le nom de votre nouvelle instance de streaming.
- Sélectionnez un fournisseur et une région.
Note: N’oubliez pas d’utiliser le même fournisseur et la même région que vous avez utilisés pour créer le service de base de données. - Sélectionner « Créer un flux.”
Pulsar : Activer la création automatique de sujets
Outre la mise en place et le fonctionnement du service de diffusion en continu, vous devrez également définir le sujet utilisé par l’application pour consommer et produire des messages. Vous pouvez créer un sujet explicitement à l’aide de l’interface utilisateur, mais un moyen plus pratique consiste à activer « Autoriser la création automatique de sujet” paramètre pour l’instance créée :
-
Cliquez sur l’instance de flux nouvellement créée et accédez au « Espace de noms et sujets» feuille d’onglet, et cliquez sur «Modifier l’espace de noms.”
- Naviguez jusqu’au « Paramètres » situé sous l’espace de noms par défaut (pas l’onglet » Paramètres » de niveau supérieur) et faites défiler vers le bas.
- Changer la « Autoriser la création de sujets » pour « Autoriser la création automatique de sujet.”
La modification de ce paramètre par défaut permettra à l’application de créer automatiquement de nouveaux sujets sans aucun effort administratif supplémentaire dans Astra. Avec cela, vous avez réussi à mettre en place l’infrastructure d’hébergement de vos données actives et passives.
Pulsar : Se connecter au Service
Une fois l’instance de streaming configurée, vous devez créer un jeton pour accéder au service depuis votre application. La plupart des propriétés nécessaires sont situées sur le « Connecter» onglet « Tableau de bord de streaming ». L’entrée « topic-name » se trouve dans le champ « Espaces de noms et sujets» onglet :
## Client
spring.pulsar.client.service-url=<Broker Service URL>
spring.pulsar.client.auth-plugin-class-name=org.apache.pulsar.client.impl.auth.AuthenticationToken
spring.pulsar.client.authentication.token=<Astra_Streaming_Token>
## Producer
spring.pulsar.producer.topic-name=persistent://<TENANT_NAME>/default/<TOPIC_NAME>
spring.pulsar.producer.producer-name=<name of your choice>
## Consumer
spring.pulsar.consumer.topics=persistent://<TENANT_NAME>/default/<TOPIC_NAME>
spring.pulsar.consumer.subscription-name=<name of your choice>
spring.pulsar.consumer.consumer-name=<name of your choice>
spring.pulsar.consumer.subscription-type=key_shared
Paramètres de pulsar pour application.properties
.
Publication de messages Pulsar à partir de Spring Boot
La bibliothèque Spring for Apache Pulsar s’occupe de configurer les producteurs et les consommateurs Pulsar en fonction de la configuration donnée.
Dans l’application, le composant StockPriceProducer gère la publication des messages. Pour récupérer les données boursières, il utilise un appel d’API externe avant de les publier sur un flux Pulsar à l’aide d’un PulsarTemplate
.
Câblage automatique PulsarTemplate
dans la classe et enregistrez-le dans un champ :
@Component
public class StockPriceProducer {
private final PulsarTemplate<StockPrice> pulsarTemplate;
public StockPriceProducer(PulsarTemplate<StockPrice> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
//...
}
Utilisez-le ensuite pour publier des messages :
private void publishStockPrices(Stream<StockPrice> stockPrices) {
// Publish items to Pulsar with 100ms intervals
Flux.fromStream(stockPrices)
// Delay elements for the demo, don't do this in real life
.delayElements(Duration.ofMillis(100))
.subscribe(stockPrice -> {
try {
pulsarTemplate.sendAsync(stockPrice);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
});
}
Vous devez configurer le schéma pour la coutume StockPrice
taper. Dans Application.java
définissez le bean suivant :
@Bean
public SchemaResolver.SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> schemaResolver.addCustomSchemaMapping(StockPrice.class, Schema.JSON(StockPrice.class));
}
Consommer des messages Pulsar dans Spring Boot
La bibliothèque Spring for Apache Pulsar est livrée avec un @PulsarListener
annotation pour un moyen pratique d’écouter les messages Pulsar. Ici, les messages sont émis vers un puits de réacteur de projet afin que l’interface utilisateur puisse les consommer en tant que Flux
:
@Service
public class StockPriceConsumer {
private final Sinks.Many<StockPrice> stockPriceSink = Sinks.many().multicast().directBestEffort();
private final Flux<StockPrice> stockPrices = stockPriceSink.asFlux();
@PulsarListener
private void stockPriceReceived(StockPrice stockPrice) {
stockPriceSink.tryEmitNext(stockPrice);
}
public Flux<StockPrice> getStockPrices() {
return stockPrices;
}
}
Création d’un point de terminaison de serveur pour accéder aux données à partir de React
Le projet utilise Hilla, un framework Web complet pour Spring Boot. Il gère les connexions websocket pour les types de données réactifs et permet une communication de serveur de type sécurisé.
Le client peut utiliser les méthodes TypeScript correspondantes créées par StockPriceEndpoint pour récupérer les données :
@Endpoint
@AnonymousAllowed
public class StockPriceEndpoint {
private final StockPriceProducer producer;
private final StockPriceConsumer consumer;
private final StockPriceService service;
StockPriceEndpoint(StockPriceProducer producer, StockPriceConsumer consumer, StockPriceService service) {
this.producer = producer;
this.consumer = consumer;
this.service = service;
}
public List<StockSymbol>...