Récemment, j’ai eu pour tâche de migrer certaines données d’une ancienne base de données Oracle monolithique vers un microservice avec une base de données PostgreSQL. Le problème était que les données nécessaires à la migration avaient une table parente avec environ 2 millions d’enregistrements avec 150 colonnes et en plus de cela, tout était mis en évidence avec une colonne de charge utile agrégeant les données de diverses tables en XML. Comme vous pouvez l’imaginer, le SELECT de cette vue était assez lent, et par joli, je veux dire incroyablement lent, ce qui n’allait pas très bien fonctionner pour le connecteur. Ainsi, dans cet article, nous examinerons un cas d’utilisation simplifié similaire et comment pouvons-nous le gérer.
Cas d’utilisation
Nous avons une application de catalogue de cours avec une base de données PostgreSQL qui traite des instructeurs et de leurs cours. Nous devons maintenant migrer certains instructeurs hérités d’une autre base de données PostgreSQL qui va bientôt être mise hors service. Donc nous avons instructors-legacy-db
et le course-catalog-db
. Dans notre cas, les deux bases de données ne seront pas submergées d’enregistrements, avec à peu près 200 enregistrements pour le instructors-legacy-db
mais pour les besoins de l’exemple, imaginez simplement que instructors-legacy-db
est cette table avec 2 millions d’enregistrements encombrants.
Exact, voici le docker-compose.yml
version: '3'
services:
course-catalog-operational-db:
image: postgres:13.3
container_name: course-catalog-operational-db
command:
- "postgres"
- "-c"
- "wal_level=logical"
environment:
POSTGRES_PASSWORD: 123456
POSTGRES_DB: course-catalog-db
ports:
- "5433:5432"
instructors-legacy-db:
image: postgres:13.3
container_name: instructors-legacy-db
command:
- "postgres"
- "-c"
- "wal_level=logical"
environment:
POSTGRES_PASSWORD: 123456
POSTGRES_DB: instructors-db
ports:
- "5434:5432"
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
Voici la course-catalog-db/instructors
table:
create table instructors
(
id integer not null
primary key,
created_at timestamp not null,
updated_at timestamp not null,
description varchar(3000),
name varchar(255),
summary varchar(3000)
);
Et voici le instructors-legacy-db/instructors
table:
create table instructors
(
id integer not null
primary key,
created_at timestamp not null,
updated_at timestamp not null,
first_name varchar(255),
last_name varchar(255),
title varchar(255)
);
Aussi, si vous avez remarqué pour le instructors-legacy-db
conteneur j’utilise un init.sql
script pour créer la table et faire quelques insertions au démarrage, nous aurons donc des données avec lesquelles jouer. Il n’y a rien de spécial dans ce script, juste 200 inserts générés aléatoirement, en voici quelques-uns (le reste peut être consulté sur le repo) :
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (0, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Liam', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (1, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Thomas', 'Williams', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (2, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Mateo', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (3, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Ciro', 'Smith', 'Mr.');
Et voici la vue que nous allons aborder :
create view instructor_aggregate_vw(id, created_at, updated_at, name) as
SELECT instructors.id,
instructors.created_at,
instructors.updated_at,
(((instructors.title::text || ' '::text) || instructors.first_name::text) || ' '::text) ||
instructors.last_name::text AS name
FROM instructors;
D’accord, non pas que nous ayons tout en place, comment interrogeons-nous et publions-nous nos données ?
Kafka Connect en action
C’est vrai, nous allons utiliser io.confluent.connect.jdbc.JdbcSourceConnector
pour ça.
Le connecteur Kafka Connect JDBC Source vous permet d’importer des données de n’importe quelle base de données relationnelle avec un pilote JDBC dans une rubrique Apache Kafka®. Ce connecteur peut prendre en charge une grande variété de bases de données.
Mais d’abord, nous devons configurer notre environnement Kafka. Pour cela, je vais utiliser le landoop/fast-data-dev
‘s docker image, car il est livré avec presque tout correctement configuré à partir du zookeeper, du registre de schéma, kafka-connect
et le courtier et se terminant par une belle interface utilisateur fournie par Landoop pour gérer tout ce qui concerne Kafka. Voici ce que nous allons ajouter à notre docker-compose.yml
course-catalog-kafka-cluster:
container_name: course-catalog-kafka-cluster
image: landoop/fast-data-dev
environment:
ADV_HOST: 127.0.0.1
RUNTESTS: 0
ports:
- 2181:2181
- 3030:3030
- 8081-8083:8081-8083
- 9581-9585:9581-9585
- 9092:9092
volumes:
# Specify an absolute path mapping
- C:\Users\ionpa\Projects\course-catalog\infra:/my-data
D’accord, docker-compose up
, et tout notre environnement est en place. Vous pouvez aller de l’avant et vérifier l’interface utilisateur sur http://localhost:3030/ et l’explorer un peu.
Parlons maintenant du connecteur. Les connecteurs peuvent être ajoutés à la fois via l’interface utilisateur sur localhost ici en suivant une configuration basée sur les propriétés ou basée sur JSON ou en exécutant une requête HTTP http://localhost:8083/connectors/ avec la méthode PUT et un corps JSON contenant la configuration de votre connecteur. Examinons la configuration que nous allons utiliser pour notre connecteur :
name=legacy.instructors.connector
topic.prefix=legacy-instructors
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
table.types=VIEW
connection.url=jdbc:postgresql://instructors-legacy-db:5432/instructors-db
connection.user=postgres
connection.password=123456
connection.attempts=100
connection.backoff.ms=300000
poll.interval.ms=100
transforms=AddNamespace,createKey,AddKeyNamespace
transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
mode=timestamp+incrementing
timestamp.column.name=updated_at
timestamp.delay.interval.ms=3000
incrementing.column.name=id
numeric.mapping=best_fit
query=select * from(select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) limit 50 --
Tout d’abord, chapeau à Confluent, pour avoir fait un si bon travail de documentation. Vous pouvez vérifier la signification de chaque propriété ici. Mais, de toute façon, je vais donner une brève description de ce que nous avons ici :
name
– évidemment, est le nom du connecteur.topic.prefix
– parce que nous utilisons une requête personnalisée, c’est le nom du sujet que nous allons publier nos enregistrements.connector.class
– est l’implémentation du connecteur que nous allons utiliser, dans notre cas c’est leio.confluent.connect.jdbc.JdbcSourceConnector
table.types
– puisque nous allons interroger une vue personnalisée, le type va être VOIR.connection.*
– sont des propriétés liées à la connexion, évidemment la connexion URL, utilisateur, le mot de passe à notre base de données et également une configuration liée au nombre de tentatives et de recul.poll.interval.ms
– il s’agit essentiellement de la fréquence à laquelle le connecteur doit interroger la table pour de nouveaux enregistrements.transforms.*
– dans mon cas, les propriétés de configuration liées à la conversion/sérialisation à la fois de la charge utile et de la clé vers AVRO.mode
– c’est fondamentalement l’une des propriétés les plus importantes, et elle peut avoir les valeurs suivantes :- en gros – interroge la table entière à chaque fois
- incrémentation – détectera les nouvelles lignes basées sur une colonne id
- horodatage – détectera les lignes nouvelles/mises à jour en fonction d’une colonne d’horodatage mise à jour automatiquement
- horodatage + incrémentation – considéré comme le mode le plus robuste et le plus précis, car il combine les 2 modes mentionnés ci-dessus, et le fait d’avoir à la fois la colonne timestamp et la colonne id en place nous permet d’identifier de manière unique les lignes nouvelles et mises à jour
timestamp.*
– définit la colonne requise et le délai en millisecondes pour le mode horodatage, dans notre cas update_atincrementing.column.name
– définit la colonne requise pour le mode d’incrémentation, dans notre cas identifiantnumeric.mapping
– décide comment allons-nous traiter, les valeurs NUMERIC, et dans notre cas, c’est meilleur ajustement qui indique que ces colonnes numériques doivent être converties en INT ou FLOAT en fonction de la précision et de l’échelle de la colonnequery
– c’est la propriété la plus essentielle concernant cet article, alors approfondissons un peu ici.- En gros, cette propriété définit la requête qui va être utilisée pour adresser une table/vue, et pour notre cas, cela devrait suffire, non ?
-
select * from instructor_aggregate_vw
Eh bien pas vraiment, puisque nous voulons que cette requête soit interrogée par lots, notre requête doit être modifiée. Disons que nous voulons interroger par lots de 50 enregistrements, cela devrait être facilement implémenté avec un simple
LIMIT
comme ça -
select * from instructor_aggregate_vw limit 50
Oui et non, oui c’est la bonne implémentation, et cela ne fonctionnera pas pour le connecteur Kafka. Et ça ne marchera pas parce que le…