DéveloppeurWeb.Com
    DéveloppeurWeb.Com
    • Agile Zone
    • AI Zone
    • Cloud Zone
    • Database Zone
    • DevOps Zone
    • Integration Zone
    • Web Dev Zone
    DéveloppeurWeb.Com
    Home»Uncategorized»Diffusion de données vers RDBMS via Kafka JDBC Sink
    Uncategorized

    Diffusion de données vers RDBMS via Kafka JDBC Sink

    février 22, 2023
    Diffusion de données vers RDBMS via Kafka JDBC Sink
    Share
    Facebook Twitter Pinterest Reddit WhatsApp Email

    Dans le paysage actuel des communications M2M (Machine à machine), il existe un énorme besoin de diffuser les données numériques à partir d’appareils IoT hétérogènes vers les différents SGBDR pour une analyse plus approfondie via le tableau de bord, déclenchant différents événements pour effectuer de nombreuses actions. Pour prendre en charge les scénarios ci-dessus, Apache Kafka agit comme un système nerveux central où les données peuvent être ingérées à partir de divers appareils IoT et conservées dans différents types de référentiel, RDBMS, stockage en nuage, etc. En outre, divers types de pipelines de données peuvent être exécutés avant ou après que les données arrivent sur le sujet de Kafka. En utilisant le connecteur de récepteur Kafka JDBC, nous pouvons diffuser des données en continu à partir du sujet de Kafka dans les RDBMS respectifs.

    La plus grande difficulté du connecteur de récepteur JDBC

    La plus grande difficulté avec le connecteur de récepteur JDBC est qu’il nécessite la connaissance du schéma de données qui a déjà atterri sur le sujet Kafka. Schema Registry doit donc être intégré en tant que composant distinct avec le cluster Kafka existant afin de transférer les données dans le RDBMS. Par conséquent, pour couler les données du sujet Kafka vers le SGBDR, les producteurs doivent publier des messages/données contenant le schéma. Le schéma définit la structure du format de données. Si le schéma n’est pas fourni, le connecteur de récepteur JDBC ne pourra pas mapper les messages avec les colonnes de la table de base de données après avoir consommé les messages de la rubrique.

    En tirant parti de Schema Registry, nous pouvons éviter d’envoyer à chaque fois des schémas avec des messages/charges utiles des producteurs, car Schema Registry stocke (ou enregistre) les schémas dans _schemas sujet et liez-le en conséquence avec le nom de sujet configuré/mentionné tel que défini dans le fichier de propriétés du connecteur de récepteur JDBC.

    Le coût de la licence peut constituer un obstacle pour les petites ou moyennes entreprises qui souhaitent utiliser Oracle ou Registre des schémas de Confluent avec Apache Kafka open source pour collecter des données sur les appareils IoT pour leurs perspectives commerciales.

    Dans cet article, nous allons utiliser un extrait de code Java pour voir comment les données peuvent être diffusées en continu dans la base de données MySQL à partir du sujet Apache Kafka en utilisant le connecteur JDBC Sink sans Schema Registry.

    Apache Kafka et connecteurs JDBC

    Apache Kafka n’a pas regroupé les connecteurs JDBC pour les SGBDR spécifiques au fournisseur similaires aux connecteurs de source et de récepteur de fichier. Il est de notre responsabilité d’implémenter ou de développer le code pour un SGBDR spécifique en implémentant l’API Connect d’Apache Kafka. Mais Confluent a développé, testé et pris en charge le connecteur JDBC Sink et finalement mis en open source sous licence communautaire Confluent. Nous avons donc intégré le connecteur JDBC Sink à Apache Kafka.

    Aucune exception ne sera levée à partir du sujet même si nous envoyons le schéma incorrect ou aucun schéma du tout, car le sujet Kafka accepte tous les messages ou enregistrements en tant que tableaux d’octets dans des paires clé-valeur. Avant de transmettre l’intégralité du message au sujet, le producteur doit convertir le message en un tableau d’octets à l’aide de sérialiseurs.

    Vous trouverez ci-dessous l’exemple de schéma lié à la charge utile ou aux données réelles qui doivent être publiées par les producteurs de messages Apache Kafka.

    Exemple de schéma

    Voici également l’extrait de code Java pour le générateur de message :

    public class ProducerWithSchema {
    
    private String status = "Failed";
    private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
    private String key = "first";
    
    public Producer createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
    //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
    System.setProperty("org.apache.logging.log4j.level", "INFO");
    return new KafkaProducer(props);
    }
    
    public String sendMsgToTopic(){
    Producer producer = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
    JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
    ProducerRecord<String, JsonNode> record = new ProducerRecord<String, JsonNode>(IKafkaConstants.TOPIC_NAME,jsonNode);
    
    producer = this.createProducer();
    producer.send(record);
    producer.flush();
    producer.close();
    
    }catch (Exception e) {
    System.out.println("Error in sending record");
    System.out.println(e.getMessage());
    }
    
    return status;
    
    }
    
    public static void main(String[] args) {
    // TODO Auto-generated method stub
    new ProducerWithSchema().sendMsgToTopic();
    }
    
    }

    Bien sûr, avec l’approche ci-dessus, quelques goulots d’étranglement existent, tels que :

    • Étroitement couplé entre les messages et le schéma.
    • Chaque schéma de temps doit être matraqué avec des données réelles.
    • Problèmes avec l’évolution du schéma.
    • Maintenabilité du code, etc.

    Pour atténuer ou résoudre les problèmes ci-dessus, le registre de schémas a été introduit en tant que composant distinct où tous les schémas seraient déployés/maintenus. Des vérifications de compatibilité sont nécessaires pendant l’évolution du schéma pour s’assurer que le contrat producteur-consommateur est respecté et que Schema Registry peut être utilisé pour y parvenir.

    Vous pouvez regarder la vidéo ci-dessous pour voir comment les données sont transmises en continu du sujet à la table spécifique de MySQL à l’aide du connecteur de récepteur JDBC sur le cluster Apache Kafka à nœud unique.

    Conclusion

    À présent, vous devriez avoir une meilleure compréhension de la plus grande difficulté avec le connecteur JDBC et le regroupement d’Apache Kafka avec les connecteurs JDBC. J’espère que vous avez apprécié cette lecture. Veuillez aimer et partager si vous pensez que cette composition est précieuse.

    Share. Facebook Twitter Pinterest LinkedIn WhatsApp Reddit Email
    Add A Comment

    Leave A Reply Cancel Reply

    Catégories

    • Politique de cookies
    • Politique de confidentialité
    • CONTACT
    • Politique du DMCA
    • CONDITIONS D’UTILISATION
    • Avertissement
    © 2023 DéveloppeurWeb.Com.

    Type above and press Enter to search. Press Esc to cancel.