DéveloppeurWeb.Com
    DéveloppeurWeb.Com
    • Agile Zone
    • AI Zone
    • Cloud Zone
    • Database Zone
    • DevOps Zone
    • Integration Zone
    • Web Dev Zone
    DéveloppeurWeb.Com
    Home»Uncategorized»Techniques à connaître en tant que développeur Kafka Streams
    Uncategorized

    Techniques à connaître en tant que développeur Kafka Streams

    février 23, 2023
    Techniques à connaître en tant que développeur Kafka Streams
    Share
    Facebook Twitter Pinterest Reddit WhatsApp Email

    Envisagez-vous d’utiliser Kafka Streams pour créer une application distribuée ? Cet article présente quelques astuces et techniques avancées pour les développeurs de Kafka Streams. Au cours des deux dernières années, nous avons découvert ces techniques pour gérer les fonctionnalités avancées de Kafka Streams.

    Nous avons construit Kestra, une plate-forme d’orchestration et de planification de données open source, et nous avons décidé d’utiliser Kafka comme magasin de données central pour construire une architecture évolutive. Nous nous appuyons fortement sur Kafka Streams pour la plupart de nos services (l’exécuteur et le planificateur) et avons fait quelques hypothèses sur la façon dont il gère la charge de travail.

    Cependant, Kafka a certaines restrictions car ce n’est pas une base de données, nous devons donc gérer les contraintes et adapter le code pour le faire fonctionner avec Kafka. Nous couvrirons des sujets, tels que l’utilisation du même sujet Kafka pour la source et la destination, et la création d’un jointeur personnalisé pour Kafka Streams, afin d’assurer un débit élevé et une faible latence tout en s’adaptant aux contraintes de Kafka et en le faisant fonctionner avec Kestra.

    Pourquoi Apache Kafka ?

    Apache Kafka est un magasin d’événements distribué open source et une plate-forme de traitement de flux qui gère de gros volumes de données à grande vitesse. L’écosystème Kafka apporte également un framework de streaming robuste appelé Kafka Streams conçu pour simplifier la création de pipelines de données de streaming et effectuer des opérations de haut niveau comme la jonction et l’agrégation. L’un de ses principaux avantages est la possibilité d’intégrer l’application de streaming directement dans votre application Java, éliminant ainsi le besoin de gérer une plate-forme distincte.

    Lors de la construction de Kestra, nous voulions nous appuyer uniquement sur la file d’attente comme base de données pour notre application (file d’attente persistante) sans dépendances supplémentaires. Nous avons analysé de nombreux candidats (RabbitMQ, Apache Pulsar, Redis, etc.) et avons constaté qu’Apache Kafka était le seul qui couvrait tout pour notre cas d’utilisation.

    Même sujet Kafka pour la source et la destination

    Dans Kestra, nous avons un sujet Kafka pour l’exécution du flux actuel. Ce sujet est à la fois la source et la destination. Nous mettons à jour l’exécution actuelle pour ajouter des informations et la renvoyons à Kafka pour un traitement ultérieur.

    Au départ, nous ne savions pas si cette conception était possible avec Kafka. Nous demandé Matthias J. Sax, l’un des principaux mainteneurs de Kafka Streams, qui a répondu sur Stack Overflow.

    Oui, c’est possible si vous êtes certain que, pour la même clé (l’ID d’exécution, dans ce cas), vous n’avez qu’un seul processus qui peut l’écrire. Si vous voyez cet avertissement dans la console Detected out-of-order KTable update for execution at offset 10, partition 7vous avez probablement plusieurs processus pour la même clé, ce qui peut entraîner un comportement inattendu (comme l’écrasement des valeurs précédentes).

    Vous avez du mal à comprendre ce que cela signifie ? Imaginez une topologie avec le sujet comme source, une logique de branchement et deux processus différents écrivant vers la même destination :

    KStream<String, Execution> executions = builder
        .stream("executions", Consumed.with(Serdes.String(), JsonSerde.of(Execution.class)));
    
    executions
        .mapValues((readOnlyKey, value) -> value)
        .to("executions", Produced.with(Serdes.String(), JsonSerde.of(Execution.class)));
    
    executions
        .leftJoin(
            builder.table("results", Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class))),
            (readOnlyKey, value1, value2) -> value1
        )
        .to("executions", Produced.with(Serdes.String(), JsonSerde.of(Execution.class)));

    Dans ce cas, un processus concurrent peut écrire ce sujet sur la même clé, écrasant la valeur précédente, perdant ainsi ses données. Dans ce contexte, vous devez définir un seul rédacteur pour une clé à un instant donné. Cela nous amène à notre section suivante, un menuisier personnalisé.

    Joiner personnalisé pour les flux Kafka

    Nous avons écrit un microservice pour traiter les exécutions et diviser le microservice en plusieurs sujets :

    • Un sujet avec les exécutions (avec plusieurs tâches).
    • Un sujet avec des résultats de tâches.

    Pour permettre à la tâche suivante d’un flux de démarrer, nous devons créer un état avec tous les résultats des tâches fusionnés dans l’exécution en cours. Notre première idée a été d’utiliser join() de Kafka Streams. Avec le recul, ce n’était pas une décision très intelligente.

    Toutes les jointures fournies par Kafka Streams ont été conçues avec l’agrégation à l’esprit, comme sum, avg, etc. Il traite toutes les données entrantes des deux sujets 1 à 1. Nous verrons tous les changements sur les flux des deux côtés, comme illustré ci-dessous :

    # timeline
    --A-------- > Execution
    -----B--C-- > Task Result
    
    # join result timeline
    - (A,null)
    - (A, B) => emit (A+B)
    - (A, C) => emit (A+C) <=== you have overwritten the result of A+B
    - (A+B, null)
    - (A+C, null) <== we will never have (A+B+C)

    Cependant, nous construisons une machine à états et souhaitons conserver le dernier état d’exécution, ce qui signifie que nous ne voulons pas voir les états intermédiaires. Dans ce cas, nous n’avons pas d’autre choix que de créer un jointeur personnalisé puisque Kafka Streams n’en a pas de intégré.

    Notre menuisier sur mesure doit :

    • Créez manuellement un magasin qui enregistrera le dernier état d’une exécution.
    • Créez une fonction de fusion personnalisée qui fusionnera le flux d’exécution avec le flux de résultats des tâches.
    • Obtenez la dernière valeur de l’état, ajoutez le résultat de la tâche et émettez le nouvel état qui sera finalement enregistré sur le magasin et le sujet final.

    Avec tout cela, nous nous assurons que l’état d’exécution sera toujours la dernière version, quel que soit le nombre de résultats de tâches arrivant en parallèle.

    Charge de travail distribuée entre plusieurs backends

    Dans Kestra, un planificateur recherchera tous les flux soit avec une exécution planifiée, soit avec un mécanisme d’interrogation longue (détection des fichiers sur S3 ou SFTP). Pour éviter un point de défaillance unique sur ce service, nous devions répartir les flux entre toutes les instances de planificateurs.

    Nous nous appuyons sur les groupes de consommateurs de Kafka qui géreront pour nous la complexité d’un système distribué. Voici comment nous procédons :

    • Créez un flux Kafka qui lira dans un KTable et transmettre tous les résultats à un Consumer.
    • Écoutez les changements d’état (surtout REBALANCED flux) et vider tous les flux pour le Consumer.
    • Sur le READY état, lisez tous les KTable encore.

    Avec ceux-ci, tous les flux seront envoyés à tous les auditeurs. Cela signifie que si vous avez mille flux, chaque consommateur aura environ 500 flux (selon la répartition des clés). Kafka gérera toutes les parties lourdes des systèmes distribués, telles que :

    • Heartbeat pour détecter l’échec d’un consommateur.
    • Notifications de rééquilibrage.
    • Assurez une sémantique unique pour un sujet, en vous assurant qu’un seul consommateur traitera les données.

    De cette façon, vous aurez un système entièrement distribué grâce à Kafka sans avoir à passer par une analyse Jespen.

    Partitions pour détecter les consommateurs Kafka morts

    Dans Kestra, les travailleurs sont des consommateurs Kafka qui traitent les tâches qui leur sont soumises et géreront tout le calcul (connecter et interroger une base de données, récupérer des données auprès de services externes, etc.) et sont des processus de longue durée. Nous devons détecter quand un travailleur traitait une tâche et est décédé. Les raisons de la « mort » du processus peuvent aller d’une panne à un simple redémarrage pendant le traitement.

    Grâce au mécanisme du consommateur Kafka, nous pouvons connaître les partitions spécifiques affectées par un consommateur décédé. Nous utilisons ces fonctionnalités pour détecter les travailleurs décédés :

    • Nous créons un UUID au démarrage pour le travailleur.
    • Lorsqu’un consommateur se connecte à Kafka, nous écoutons les partitions affectées à l’aide d’un ConsumerRebalanceListener. Nous publions à Kafka une WorkerInstance avec le UUID et les partitions attribuées.
    • Pour chaque exécution de tâche, nous publions un message TaskRunning avec l’UUID du travailleur.

    Maintenant, gérons les données stockées dans Kafka. La logique principale est un Kafka Stream, qui va :

    • Créer un global KTable avec tout le WorkerInstance.
    • À chaque changement, il écoutera le changement WorkerInstance.
    • S’il y a un nouveau WorkerInstance, on regarde les partitions qui lui sont affectées. S’il y a un chevauchement entre les partitions de cette instance et la précédente, nous savons que la précédente WorkerInstance est mort. Dans Kafka, vous ne pouvez pas avoir deux consommateurs sur la même partition.
    • Nous n’avons qu’à examiner les tâches affectées dans ce WorkerInstance et les renvoyer pour traitement.

    Et voilà! Nous avons la détection des consommateurs morts en utilisant uniquement l’API Kafka.

    Méfiez-vous du magasin d’État all()

    Nous utilisons un GlobalKTable pour détecter les déclencheurs de flux. Pour tous les flux du cluster, nous testons toutes les conditions du flux pour trouver les flux correspondants. Pour cela, nous utilisons une API pour récupérer tous les flux d’un GlobalKTable en utilisant store.all() qui renvoie tous les flux de RocksDB (base de données interne de Kafka Stream).

    Notre première hypothèse était que all() renvoie un objet (Flow dans notre cas), comme l’objet de retour de l’API, mais nous avons découvert que le all() méthode va :

    • Récupérez toutes les données de RocksDB.
    • Désérialisez les données de RocksDB stockées sous forme d’octets et mappez-les sur un POJO Java concret.

    Ainsi, chaque fois que nous appelons le all() , toutes les valeurs sont désérialisées, ce qui peut entraîner une utilisation élevée du processeur et une latence sur votre flux. Nous parlons de toutes les révisions de flux sur notre cluster. La dernière révision avait des flux de 2.5K, mais nous ne voyons pas les gens créer beaucoup de révisions. Imaginez 100K byte[] pour désérialiser en POJO pour chaque appel.

    Comme nous n’avons besoin que de la dernière révision dans notre cas d’utilisation, nous créons une carte en mémoire avec tous les flux en utilisant ce qui suit :

    builder.addGlobalStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(FLOW_STATE_STORE_NAME),
            Serdes.String(),
            JsonSerde.of(Flow.class)
        ),
        kafkaAdminService.getTopicName(Flow.class),
        Consumed.with(Serdes.String(), JsonSerde.of(Flow.class)).withName("GlobalStore.Flow"),
        () -> new GlobalInMemoryStateProcessor<>(
            FLOW_STATE_STORE_NAME,,
            flows -> kafkaFlowExecutor.setFlows(flows),
            store -> kafkaFlowExecutor.setStore(store)
        )
    );

    GlobalInMemoryStateProcessor est un wrapper simple qui enregistre le magasin d’état et envoie une liste complète à chaque changement (pas si fréquent). Grâce à cela, nous avons décidé de rassembler tous les flux en mémoire. Cela fonctionne bien pour nos cas d’utilisation car nous savons qu’une instance de Kestra n’aura pas des millions de flux.

    Rappelez-vous que…

    Share. Facebook Twitter Pinterest LinkedIn WhatsApp Reddit Email
    Add A Comment

    Leave A Reply Cancel Reply

    Catégories

    • Politique de cookies
    • Politique de confidentialité
    • CONTACT
    • Politique du DMCA
    • CONDITIONS D’UTILISATION
    • Avertissement
    © 2023 DéveloppeurWeb.Com.

    Type above and press Enter to search. Press Esc to cancel.