DéveloppeurWeb.Com
    DéveloppeurWeb.Com
    • Agile Zone
    • AI Zone
    • Cloud Zone
    • Database Zone
    • DevOps Zone
    • Integration Zone
    • Web Dev Zone
    DéveloppeurWeb.Com
    Home»Database Zone»Connecteur source Kafka JDBC pour les données volumineuses
    Database Zone

    Connecteur source Kafka JDBC pour les données volumineuses

    novembre 29, 2022
    Connecteur source Kafka JDBC pour les données volumineuses
    Share
    Facebook Twitter Pinterest Reddit WhatsApp Email

    Récemment, j’ai eu pour tâche de migrer certaines données d’une ancienne base de données Oracle monolithique vers un microservice avec une base de données PostgreSQL. Le problème était que les données nécessaires à la migration avaient une table parente avec environ 2 millions d’enregistrements avec 150 colonnes et en plus de cela, tout était mis en évidence avec une colonne de charge utile agrégeant les données de diverses tables en XML. Comme vous pouvez l’imaginer, le SELECT de cette vue était assez lent, et par joli, je veux dire incroyablement lent, ce qui n’allait pas très bien fonctionner pour le connecteur. Ainsi, dans cet article, nous examinerons un cas d’utilisation simplifié similaire et comment pouvons-nous le gérer.

    Cas d’utilisation

    Nous avons une application de catalogue de cours avec une base de données PostgreSQL qui traite des instructeurs et de leurs cours. Nous devons maintenant migrer certains instructeurs hérités d’une autre base de données PostgreSQL qui va bientôt être mise hors service. Donc nous avons instructors-legacy-db et le course-catalog-db. Dans notre cas, les deux bases de données ne seront pas submergées d’enregistrements, avec à peu près 200 enregistrements pour le instructors-legacy-dbmais pour les besoins de l’exemple, imaginez simplement que instructors-legacy-db est cette table avec 2 millions d’enregistrements encombrants.

    Exact, voici le docker-compose.yml

    version: '3'
    services:
      course-catalog-operational-db:
        image: postgres:13.3
        container_name: course-catalog-operational-db
        command:
          - "postgres"
          - "-c"
          - "wal_level=logical"
        environment:
          POSTGRES_PASSWORD: 123456
          POSTGRES_DB: course-catalog-db
        ports:
          - "5433:5432"
      instructors-legacy-db:
        image: postgres:13.3
        container_name: instructors-legacy-db
        command:
          - "postgres"
          - "-c"
          - "wal_level=logical"
        environment:
          POSTGRES_PASSWORD: 123456
          POSTGRES_DB: instructors-db
        ports:
          - "5434:5432"
        volumes:
          - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    

    Voici la course-catalog-db/instructors table:

    create table instructors
    (
        id          integer   not null
            primary key,
        created_at  timestamp not null,
        updated_at  timestamp not null,
        description varchar(3000),
        name        varchar(255),
        summary     varchar(3000)
    );
    

    Et voici le instructors-legacy-db/instructors table:

    create table instructors
    (
        id         integer   not null
            primary key,
        created_at timestamp not null,
        updated_at timestamp not null,
        first_name       varchar(255),
        last_name       varchar(255),
        title varchar(255)
    );
    

    Aussi, si vous avez remarqué pour le instructors-legacy-db conteneur j’utilise un init.sql script pour créer la table et faire quelques insertions au démarrage, nous aurons donc des données avec lesquelles jouer. Il n’y a rien de spécial dans ce script, juste 200 inserts générés aléatoirement, en voici quelques-uns (le reste peut être consulté sur le repo) :

    INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (0, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Liam', 'Martinez', 'Mr.');
    INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (1, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Thomas', 'Williams', 'Mr.');
    INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (2, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Mateo', 'Martinez', 'Mr.');
    INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (3, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Ciro', 'Smith', 'Mr.');
    

    Et voici la vue que nous allons aborder :

    create view instructor_aggregate_vw(id, created_at, updated_at, name) as
    SELECT instructors.id,
           instructors.created_at,
           instructors.updated_at,
           (((instructors.title::text || ' '::text) || instructors.first_name::text) || ' '::text) ||
           instructors.last_name::text AS name
    FROM instructors;
    

    D’accord, non pas que nous ayons tout en place, comment interrogeons-nous et publions-nous nos données ?

    Kafka Connect en action

    C’est vrai, nous allons utiliser io.confluent.connect.jdbc.JdbcSourceConnector pour ça.

    Le connecteur Kafka Connect JDBC Source vous permet d’importer des données de n’importe quelle base de données relationnelle avec un pilote JDBC dans une rubrique Apache Kafka®. Ce connecteur peut prendre en charge une grande variété de bases de données.

    Mais d’abord, nous devons configurer notre environnement Kafka. Pour cela, je vais utiliser le landoop/fast-data-dev‘s docker image, car il est livré avec presque tout correctement configuré à partir du zookeeper, du registre de schéma, kafka-connectet le courtier et se terminant par une belle interface utilisateur fournie par Landoop pour gérer tout ce qui concerne Kafka. Voici ce que nous allons ajouter à notre docker-compose.yml

      course-catalog-kafka-cluster:
        container_name: course-catalog-kafka-cluster
        image: landoop/fast-data-dev
        environment:
          ADV_HOST: 127.0.0.1         
          RUNTESTS: 0                 
        ports:
          - 2181:2181                 
          - 3030:3030                 
          - 8081-8083:8081-8083       
          - 9581-9585:9581-9585       
          - 9092:9092                 
        volumes:
          # Specify an absolute path mapping
          - C:\Users\ionpa\Projects\course-catalog\infra:/my-data
    

    D’accord, docker-compose up, et tout notre environnement est en place. Vous pouvez aller de l’avant et vérifier l’interface utilisateur sur http://localhost:3030/ et l’explorer un peu.

    Parlons maintenant du connecteur. Les connecteurs peuvent être ajoutés à la fois via l’interface utilisateur sur localhost ici en suivant une configuration basée sur les propriétés ou basée sur JSON ou en exécutant une requête HTTP http://localhost:8083/connectors/ avec la méthode PUT et un corps JSON contenant la configuration de votre connecteur. Examinons la configuration que nous allons utiliser pour notre connecteur :

    name=legacy.instructors.connector
    topic.prefix=legacy-instructors
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    table.types=VIEW
    connection.url=jdbc:postgresql://instructors-legacy-db:5432/instructors-db
    connection.user=postgres
    connection.password=123456
    connection.attempts=100
    connection.backoff.ms=300000
    poll.interval.ms=100
    transforms=AddNamespace,createKey,AddKeyNamespace
    transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
    transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
    transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
    transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key
    transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
    transforms.createKey.fields=id
    mode=timestamp+incrementing
    timestamp.column.name=updated_at
    timestamp.delay.interval.ms=3000
    incrementing.column.name=id
    numeric.mapping=best_fit
    query=select * from(select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) limit 50 --
    

    Tout d’abord, chapeau à Confluent, pour avoir fait un si bon travail de documentation. Vous pouvez vérifier la signification de chaque propriété ici. Mais, de toute façon, je vais donner une brève description de ce que nous avons ici :

    • name – évidemment, est le nom du connecteur.
    • topic.prefix – parce que nous utilisons une requête personnalisée, c’est le nom du sujet que nous allons publier nos enregistrements.
    • connector.class – est l’implémentation du connecteur que nous allons utiliser, dans notre cas c’est le io.confluent.connect.jdbc.JdbcSourceConnector
    • table.types – puisque nous allons interroger une vue personnalisée, le type va être VOIR.
    • connection.* – sont des propriétés liées à la connexion, évidemment la connexion URL, utilisateur, le mot de passe à notre base de données et également une configuration liée au nombre de tentatives et de recul.
    • poll.interval.ms – il s’agit essentiellement de la fréquence à laquelle le connecteur doit interroger la table pour de nouveaux enregistrements.
    • transforms.* – dans mon cas, les propriétés de configuration liées à la conversion/sérialisation à la fois de la charge utile et de la clé vers AVRO.
    • mode– c’est fondamentalement l’une des propriétés les plus importantes, et elle peut avoir les valeurs suivantes :
      • en gros – interroge la table entière à chaque fois
      • incrémentation – détectera les nouvelles lignes basées sur une colonne id
      • horodatage – détectera les lignes nouvelles/mises à jour en fonction d’une colonne d’horodatage mise à jour automatiquement
      • horodatage + incrémentation – considéré comme le mode le plus robuste et le plus précis, car il combine les 2 modes mentionnés ci-dessus, et le fait d’avoir à la fois la colonne timestamp et la colonne id en place nous permet d’identifier de manière unique les lignes nouvelles et mises à jour
    • timestamp.* – définit la colonne requise et le délai en millisecondes pour le mode horodatage, dans notre cas update_at
    • incrementing.column.name – définit la colonne requise pour le mode d’incrémentation, dans notre cas identifiant
    • numeric.mapping – décide comment allons-nous traiter, les valeurs NUMERIC, et dans notre cas, c’est meilleur ajustement qui indique que ces colonnes numériques doivent être converties en INT ou FLOAT en fonction de la précision et de l’échelle de la colonne
    • query– c’est la propriété la plus essentielle concernant cet article, alors approfondissons un peu ici.
      • En gros, cette propriété définit la requête qui va être utilisée pour adresser une table/vue, et pour notre cas, cela devrait suffire, non ?
      • select * from instructor_aggregate_vw

        Eh bien pas vraiment, puisque nous voulons que cette requête soit interrogée par lots, notre requête doit être modifiée. Disons que nous voulons interroger par lots de 50 enregistrements, cela devrait être facilement implémenté avec un simple LIMIT comme ça

      • select * from instructor_aggregate_vw limit 50

        Oui et non, oui c’est la bonne implémentation, et cela ne fonctionnera pas pour le connecteur Kafka. Et ça ne marchera pas parce que le…

    Share. Facebook Twitter Pinterest LinkedIn WhatsApp Reddit Email
    Add A Comment

    Leave A Reply Cancel Reply

    Catégories

    • Politique de cookies
    • Politique de confidentialité
    • CONTACT
    • Politique du DMCA
    • CONDITIONS D’UTILISATION
    • Avertissement
    © 2023 DéveloppeurWeb.Com.

    Type above and press Enter to search. Press Esc to cancel.