DéveloppeurWeb.Com
    DéveloppeurWeb.Com
    • Agile Zone
    • AI Zone
    • Cloud Zone
    • Database Zone
    • DevOps Zone
    • Integration Zone
    • Web Dev Zone
    DéveloppeurWeb.Com
    Home»Big Data Zone»Installation et démonstration de Confluent Kafka
    Big Data Zone

    Installation et démonstration de Confluent Kafka

    octobre 26, 2021
    Installation et démonstration de Confluent Kafka
    Share
    Facebook Twitter Pinterest Reddit WhatsApp Email

    À propos de KAFKA et Confluent

    Apache Kafka est une plate-forme de diffusion d’événements distribuée par la communauté open source utilisée par des milliers d’entreprises pour la diffusion en continu hautes performances, les pipelines de données et les applications critiques. Kafka a été développé par Apache Software Foundation écrit en Scala et Java.

    Confluent Open Source est une distribution optimisée pour les développeurs d’Apache Kafka. Plateforme Confluente est une plate-forme de streaming de données à grande échelle qui vous permet d’accéder, de stocker et de gérer facilement les données sous forme de flux continus et en temps réel. Confluent est une distribution plus complète d’Apache Kafka. Il rationalise les procédures d’opérations d’administration avec beaucoup de facilité.

    Installation et démonstration de Confluent Kafka

    Le but est d’insérer à SQLite-DB et affichez ces lignes sur un sujet créé automatiquement sur Kafka via le connecteur JDBC-source.

    Environnement: RedHat Linux 7.x

    1. Télécharger Tarball

      1

      curl -O http://packages.confluent.io/archive/6.0/confluent-6.0.1.tar.gz

    2. Extraire tar.gz

      1

      tar -xvf tar/confluent-6.0.1.tar.gz

    3. Définir des variables confluentes

      1

      2

      export CONFLUENT_HOME=/mydata/myuser/confluent-6.0.1

      export PATH=$PATH:$CONFLUENT_HOME/bin

    4. Installer confluent-hub et kafka connect jdbc

      1

      2

      $CONFLUENT_HOME/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest

      confluent-hub install confluentinc/kafka-connect-jdbc:10.0.1

      Vous pouvez voir le hub confluent sur votre site Web local.

      http://localhost:9021/clusters

    5. Définir source JDBC fichier à /mesdonnées/monutilisateur/confluent-6.0.1/etc/kafka-connect-jdbc/ :

      1

      2

      3

      4

      5

      6

      7

      8

      9

      name=test-sqlite-jdbc-autoincrement

      connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

      value.converter=org.apache.kafka.connect.json.JsonConverter

      value.converter.schemas.enable=false 

      tasks.max=1

      connection.url=jdbc:sqlite:test.db

      mode=incrementing

      incrementing.column.name=id

      topic.prefix=turkcell.sqlite-jdbc-

    6. Exécutez les services confluents :

      1

      confluent local services connect start

      La sortie comme ci-dessous :Sortie du gardien du zoo de Kafka

    7. Charge jdbc-source au connecteur :

      1

      confluent local services connect connector load jdbc-source -c /mydata/myuser/confluent-6.0.1/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties

    8. Vous pouvez créer SQLite-DB ; créer un tableau et insérer des lignes :

      1

      2

      3

      4

      cd confluent-6.0.1/

      sqlite3 test.db

      sqlite> CREATE TABLE ttech(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255)); 

      sqlite> INSERT INTO ttech(name) VALUES('turkcell'); 

      Vous pouvez voir une ligne comme celle-ci :

      rangée Kafka

    9. Regardez le journal de connexion pour voir si Kafka-source-JDBC échoue ou fonctionne correctement :

      1

      confluent local services connect log

    10. Enfin, consultez le sujet Kafka pour voir votre enregistrement nouvellement ajouté. J’utilise Kafka Tool pour vérifier :

    record de Kafka

    Quelques commandes et captures d’écran utiles

    1. Voir la liste des connecteurs : confluent local services connect connector --list
    2. Voir le contenu du connecteur : confluent local services connect connector config jdbc-source
    3. Connecteur de déchargement : confluent local services connect connector unload jdbc-source
    4. Voir le journal de connexion : confluent local services connect log
    5. Remplacez le formateur d’Avro par JSON dans ce fichier : /mydata/myuser/confluent-6.0.1/etc/kafka/connect-distributed.properties
    6. Si vous utilisez schema-registry :
      • UNEschéma clé-valeur dd :
        • curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{"type":"record","name":"your_table","fields":[{"name":"ID","type":"long"}]}"}' http://localhost:8091/subjects/your_table-key/versions

      • Vérifiez qu’il est installé :
        • curl -X GET http://localhost:8091/subjects

      • Ajoutez le connecteur à synchroniser à la table Oracle :
        • curl -XPOST --header "Content-Type: application/json"  localhost:8083/connectors  -d 
          '{  
              "name": "sink_my_table",  
              "config": {    
                  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",    
          		"tasks.max": 3,   
          		"connection.url": "jdbc:oracle:thin:@my.db:1961:MYSERVICE",
                  "connection.user": "ora_user",    
                  "connection.password": "XXXXXX",    
          		"table.name.format": "my_table",
                  "topics": "my_table",
                  "auto.create": "false",
          		"delete.enabled": "true",
          		"pk.mode": "record_key",
          		"pk.fields": "ID",
          		"insert.mode": "upsert",
          		"transforms": "TimestampConverter1,TimestampConverter2,TimestampConverter3",
          		"transforms.TimestampConverter1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          		"transforms.TimestampConverter1.field": "RECORDDATE",
          		"transforms.TimestampConverter1.target.type": "Timestamp",
          		"transforms.TimestampConverter2.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          		"transforms.TimestampConverter2.field": "STARTDATE",
          		"transforms.TimestampConverter2.target.type": "Timestamp",
          		"transforms.TimestampConverter3.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          		"transforms.TimestampConverter3.field": "ENDDATE",
          		"transforms.TimestampConverter3.target.type": "Timestamp"
          		
              }
          }' 

      • Regardez les informations de configuration du connecteur :
        • curl -X GET http://localhost:8083/connectors/sink_my_table

      • Regardez l’état du connecteur :
        • curl -X GET http://localhost:8083/connectors/sink_my_table/status

      • Supprimer le connecteur :
        • curl -X DELETE http://localhost:8083/connectors/sink_my_table

    Capture d'écran du centre de contrôle Confluent

    capture d'écran de la page d'aperçu de confluent

    capture d'écran des sujets

    capture d'écran des clusters de connexion

    capture d'écran de la source JDBC

    J’espère que ça vous aide !

    Share. Facebook Twitter Pinterest LinkedIn WhatsApp Reddit Email
    Add A Comment

    Leave A Reply Cancel Reply

    Catégories

    • Politique de cookies
    • Politique de confidentialité
    • CONTACT
    • Politique du DMCA
    • CONDITIONS D’UTILISATION
    • Avertissement
    © 2023 DéveloppeurWeb.Com.

    Type above and press Enter to search. Press Esc to cancel.