Dans cet atelier pratique de l’Université ScyllaDB, vous apprendrez à utiliser le connecteur source ScyllaDB CDC pour transférer les événements de modification au niveau des lignes dans les tables d’un cluster ScyllaDB vers un serveur Kafka.
Qu’est-ce que ScyllaDB CDC ?
Pour récapituler, Change Data Capture (CDC) est une fonctionnalité qui vous permet non seulement d’interroger l’état actuel de la table d’une base de données, mais également d’interroger l’historique de toutes les modifications apportées à la table. CDC est prêt pour la production (GA) à partir de ScyllaDB Enterprise 2021.1.1 et ScyllaDB Open Source 4.3.
Dans ScyllaDB, CDC est facultatif et activé par table. L’historique des modifications apportées à une table compatible CDC est stocké dans une table associée distincte.
Vous pouvez activer CDC lors de la création ou de la modification d’une table à l’aide de l’option CDC, par exemple :
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
Connecteur source ScyllaDB CDC
ScyllaDB CDC Source Connector est un connecteur source capturant les modifications au niveau des lignes dans les tables d’un cluster ScyllaDB. C’est un connecteur Debezium, compatible avec Kafka Connect (avec Kafka 2.6.0+). Le connecteur lit le journal CDC pour les tables spécifiées et produit des messages Kafka pour chaque niveau de ligne INSERT
, UPDATE
ou DELETE
opération. Le connecteur est tolérant aux pannes et réessaye de lire les données de Scylla en cas d’échec. Il enregistre périodiquement la position actuelle dans le journal ScyllaDB CDC à l’aide du suivi de décalage Kafka Connect. Chaque message Kafka généré contient des informations sur la source, telles que l’horodatage et le nom de la table.
Note: au moment de la rédaction, il n’y a pas de support pour les types de collection (LIST
, SET
, MAP
) et UDT : les colonnes avec ces types sont omises des messages générés. Tenez-vous au courant de cette demande d’amélioration et des autres développements du projet GitHub.
Confluent et Kafka Connect
Confluent est une plate-forme de diffusion de données à grande échelle qui vous permet d’accéder, de stocker et de gérer facilement des données sous forme de flux continus en temps réel. Il étend les avantages d’Apache Kafka avec des fonctionnalités de niveau entreprise. Confluent facilite la création d’applications modernes pilotées par les événements et bénéficie d’un pipeline de données universel, prenant en charge l’évolutivité, les performances et la fiabilité.
Kafka Connect est un outil permettant de diffuser des données de manière évolutive et fiable entre Apache Kafka et d’autres systèmes de données. Il simplifie la définition de connecteurs qui déplacent de grands ensembles de données vers et depuis Kafka. Il peut ingérer des bases de données entières ou collecter des métriques à partir de serveurs d’applications dans des rubriques Kafka, rendant les données disponibles pour le traitement de flux avec une faible latence.
Kafka Connect comprend deux types de connecteurs :
- Connecteur sources : Les connecteurs source ingèrent des bases de données entières et diffusent les mises à jour de table vers les rubriques Kafka. Les connecteurs source peuvent également collecter des métriques à partir de serveurs d’applications et stocker les données dans des rubriques Kafka, rendant les données disponibles pour le traitement de flux avec une faible latence.
- Connecteur évier : Les connecteurs de récepteur transmettent les données des rubriques Kafka à des index secondaires, tels qu’Elasticsearch, ou à des systèmes de traitement par lots, tels que Hadoop, pour une analyse hors ligne.
Configuration du service avec Docker
Dans cet atelier, vous utiliserez Docker.
Veuillez vous assurer que votre environnement répond aux prérequis suivants :
- Docker pour Linux, Mac ou Windows.
- Note: l’exécution de ScyllaDB dans Docker est uniquement recommandée pour évaluer et essayer ScyllaDB.
- ScyllaDB open source. Pour de meilleures performances, une installation régulière est recommandée.
- 8 Go de RAM ou plus pour les services Kafka et ScyllaDB.
- docker-composer
- Gite
Table d’installation et d’initialisation de ScyllaDB
Tout d’abord, vous allez lancer un cluster ScyllaDB à trois nœuds et créer une table avec CDC activé.
Si vous ne l’avez pas encore fait, téléchargez l’exemple depuis git :
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab
Il s’agit du fichier docker-compose que vous utiliserez. Il démarre un cluster ScyllaDB à trois nœuds :
version: "3"
services:
scylla-node1:
container_name: scylla-node1
image: scylladb/scylla:5.0.0
ports:
- 9042:9042
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node2:
container_name: scylla-node2
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node3:
container_name: scylla-node3
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
Lancez le cluster ScyllaDB :
docker-compose -f docker-compose-scylladb.yml up -d
Attendez environ une minute et vérifiez que le cluster ScyllaDB est opérationnel et en état normal :
docker exec scylla-node1 nodetool status
Ensuite, vous utiliserez cqlsh pour interagir avec ScyllaDB. Créez un espace de clés et une table avec CDC activé, puis insérez une ligne dans la table :
docker exec -ti scylla-node1 cqlsh
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
exit
[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d
Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns Host ID Rack
UN 172.19.0.3 ? 256 ? 4d4eaad4-62a4-485b-9a05-61432516a737 rack1
UN 172.19.0.2 496 KB 256 ? bec834b5-b0de-4d55-b13d-a8aa6800f0b9 rack1
UN 172.19.0.4 ? 256 ? 2788324e-548a-49e2-8337-976897c61238 rack1
Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$
Installation de Confluent et configuration du connecteur
Pour lancer un serveur Kafka, vous utiliserez la plate-forme Confluent, qui fournit une interface graphique Web conviviale pour suivre les sujets et les messages. La plate-forme confluente fournit un docker-compose.yml
fichier pour configurer les services.
Note: ce n’est pas ainsi que vous utiliseriez Apache Kafka en production. L’exemple n’est utile qu’à des fins de formation et de développement. Obtenez le fichier :
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml
Ensuite, téléchargez le connecteur ScyllaDB CDC :
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar
Ajoutez le connecteur ScyllaDB CDC au répertoire du plug-in du service de connexion Confluent à l’aide d’un volume Docker en modifiant docker-compose-confluent.ym
l ajouter les deux lignes comme ci-dessous en remplaçant le répertoire par le répertoire de votre scylla-cdc-plugin.jar
déposer.
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
hostname: connect
container_name: connect
+ volumes:
+ - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
depends_on:
- broker
- schema-registry
Lancez les services Confluent :
docker-compose -f docker-compose-confluent.yml up -d
Attendez environ une minute, puis accédez http://localhost:9021
pour l’interface graphique Web de Confluent.
Ajoutez le ScyllaConnector à l’aide du tableau de bord Confluent :
Ajoutez le connecteur Scylla en cliquant sur le plugin :
Remplissez les « Hôtes » avec l’adresse IP de l’un des nœuds Scylla (vous pouvez le voir dans la sortie de la commande nodetool status) et le port 9042, qui est écouté par le service ScyllaDB.
Le « Namespace » est l’espace de clés que vous avez créé auparavant dans ScyllaDB.
Notez que cela peut prendre environ une minute pour le ks.my_table
apparaître:
Tester les messages Kafka
Tu peux voir ça MyScyllaCluster.ks.my_table
est le sujet créé par le connecteur ScyllaDB CDC.
Maintenant, vérifiez les messages Kafka dans le panneau Sujets :
Sélectionnez le sujet, qui est le même que l’espace de clés et le nom de la table que vous avez créés dans ScyllaDB :
Du « Aperçu« , vous pouvez voir les informations sur le sujet. En bas, il indique que ce sujet se trouve sur la partition 0.
Une partition est la plus petite unité de stockage contenant un sous-ensemble d’enregistrements appartenant à une rubrique. Chaque partition est un fichier journal unique dans lequel les enregistrements y sont écrits en mode ajout uniquement. Les enregistrements dans les partitions se voient chacun attribuer un identifiant séquentiel appelé décalage, qui est unique pour chaque enregistrement dans la partition. Le décalage est un nombre incrémentiel et immuable maintenu par Kafka.
Comme vous le savez déjà, les messages ScyllaDB CDC sont envoyés au ks.my_table
sujet, et l’ID de partition du sujet est 0. Ensuite, allez à la « messages » et entrez l’ID de partition 0 dans l’onglet « compenser » champ:
Vous pouvez voir à partir de la sortie des messages de sujet Kafka que la table ScyllaDB INSERT
événement et les données ont été transférées aux messages Kafka par le connecteur source Scylla CDC. Cliquez sur le message pour afficher les informations complètes du message :
Le message contient le nom de la table ScyllaDB et le nom de l’espace de clés avec l’heure, ainsi que…