À propos de KAFKA et Confluent
Apache Kafka est une plate-forme de diffusion d’événements distribuée par la communauté open source utilisée par des milliers d’entreprises pour la diffusion en continu hautes performances, les pipelines de données et les applications critiques. Kafka a été développé par Apache Software Foundation écrit en Scala et Java.
Confluent Open Source est une distribution optimisée pour les développeurs d’Apache Kafka. Plateforme Confluente est une plate-forme de streaming de données à grande échelle qui vous permet d’accéder, de stocker et de gérer facilement les données sous forme de flux continus et en temps réel. Confluent est une distribution plus complète d’Apache Kafka. Il rationalise les procédures d’opérations d’administration avec beaucoup de facilité.
Installation et démonstration de Confluent Kafka
Le but est d’insérer à SQLite-DB et affichez ces lignes sur un sujet créé automatiquement sur Kafka via le connecteur JDBC-source.
Environnement: RedHat Linux 7.x
- Télécharger Tarball
1
curl -O http://packages.confluent.io/archive/6.0/confluent-6.0.1.tar.gz
- Extraire tar.gz
1
tar -xvf tar/confluent-6.0.1.tar.gz
- Définir des variables confluentes
1
2
export CONFLUENT_HOME=/mydata/myuser/confluent-6.0.1
export PATH=$PATH:$CONFLUENT_HOME/bin
- Installer confluent-hub et kafka connect jdbc
1
2
$CONFLUENT_HOME/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
confluent-hub install confluentinc/kafka-connect-jdbc:10.0.1
Vous pouvez voir le hub confluent sur votre site Web local.
http://localhost:9021/clusters
- Définir source JDBC fichier à /mesdonnées/monutilisateur/confluent-6.0.1/etc/kafka-connect-jdbc/ :
1
2
3
4
5
6
7
8
9
name=test-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
tasks.max=1
connection.url=jdbc:sqlite:test.db
mode=incrementing
incrementing.column.name=id
topic.prefix=turkcell.sqlite-jdbc-
- Exécutez les services confluents :
1
confluent local services connect start
La sortie comme ci-dessous :
- Charge jdbc-source au connecteur :
1
confluent local services connect connector load jdbc-source -c /mydata/myuser/confluent-6.0.1/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
- Vous pouvez créer SQLite-DB ; créer un tableau et insérer des lignes :
1
2
3
4
cd confluent-6.0.1/
sqlite3 test.db
sqlite> CREATE TABLE ttech(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
sqlite> INSERT INTO ttech(name) VALUES('turkcell');
Vous pouvez voir une ligne comme celle-ci :
- Regardez le journal de connexion pour voir si Kafka-source-JDBC échoue ou fonctionne correctement :
1
confluent local services connect log
- Enfin, consultez le sujet Kafka pour voir votre enregistrement nouvellement ajouté. J’utilise Kafka Tool pour vérifier :
Quelques commandes et captures d’écran utiles
- Voir la liste des connecteurs :
confluent local services connect connector --list
- Voir le contenu du connecteur :
confluent local services connect connector config jdbc-source
- Connecteur de déchargement :
confluent local services connect connector unload jdbc-source
- Voir le journal de connexion :
confluent local services connect log
- Remplacez le formateur d’Avro par JSON dans ce fichier :
/mydata/myuser/confluent-6.0.1/etc/kafka/connect-distributed.properties
- Si vous utilisez schema-registry :
- UNEschéma clé-valeur dd :
-
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{"type":"record","name":"your_table","fields":[{"name":"ID","type":"long"}]}"}' http://localhost:8091/subjects/your_table-key/versions
-
- Vérifiez qu’il est installé :
-
curl -X GET http://localhost:8091/subjects
-
- Ajoutez le connecteur à synchroniser à la table Oracle :
-
curl -XPOST --header "Content-Type: application/json" localhost:8083/connectors -d '{ "name": "sink_my_table", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": 3, "connection.url": "jdbc:oracle:thin:@my.db:1961:MYSERVICE", "connection.user": "ora_user", "connection.password": "XXXXXX", "table.name.format": "my_table", "topics": "my_table", "auto.create": "false", "delete.enabled": "true", "pk.mode": "record_key", "pk.fields": "ID", "insert.mode": "upsert", "transforms": "TimestampConverter1,TimestampConverter2,TimestampConverter3", "transforms.TimestampConverter1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.TimestampConverter1.field": "RECORDDATE", "transforms.TimestampConverter1.target.type": "Timestamp", "transforms.TimestampConverter2.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.TimestampConverter2.field": "STARTDATE", "transforms.TimestampConverter2.target.type": "Timestamp", "transforms.TimestampConverter3.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.TimestampConverter3.field": "ENDDATE", "transforms.TimestampConverter3.target.type": "Timestamp" } }'
-
- Regardez les informations de configuration du connecteur :
-
curl -X GET http://localhost:8083/connectors/sink_my_table
-
- Regardez l’état du connecteur :
-
curl -X GET http://localhost:8083/connectors/sink_my_table/status
-
- Supprimer le connecteur :
-
curl -X DELETE http://localhost:8083/connectors/sink_my_table
-
- UNEschéma clé-valeur dd :
J’espère que ça vous aide !