Nous connaissons tous la configuration in-app à la dérive et IaC. Nous commençons avec une configuration spécifique soutenue par des fichiers IaC. Peu de temps après, nous sommes confrontés à une «dérive» ou à un changement entre ce qui est réellement configuré dans notre infrastructure et nos fichiers. Le même comportement se produit dans les données. Le schéma commence par une forme spécifique. Au fur et à mesure que l’ingestion de données se développe et s’adapte à différentes sources, nous obtenons une dérive de schéma, une base de données désordonnée et non structurée et une couche analytique qui échoue en permanence en raison d’un mauvais schéma. Dans cet article, nous allons apprendre à gérer le scénario et à travailler avec des schémas dynamiques.
Les schémas sont une lutte majeure
Un schéma définit la structure du format de données. Keys/Values/Formats/Types
une combinaison de tous, aboutit à une structure définie ou simplement à un schéma.
Développeurs et ingénieurs de données, avez-vous déjà eu besoin de recréer une collection NoSQL ou de recréer une mise en page d’objet sur un bucket à cause de différents documents avec des clés ou des structures différentes ? Vous avez probablement.
Une structure d’enregistrement non alignée dans votre ingestion de données fera planter votre visualisation, vos tâches d’analyse et votre backend, et c’est une poursuite sans fin pour y remédier.
Dérive de schéma
La dérive de schéma est le cas où vos sources changent souvent les métadonnées. Les champs, les clés, les colonnes et les types peuvent être ajoutés, supprimés ou modifiés à la volée.
Votre flux de données devient vulnérable aux modifications de source de données en amont sans gérer la dérive de schéma. Les modèles ETL typiques échouent lorsque les colonnes et les champs entrants changent car ils ont tendance à être liés à ces sources. Stream nécessite un ensemble d’outils différent.
L’article suivant explique les solutions et stratégies existantes pour atténuer le défi et éviter la dérive de schéma, y compris la gestion des versions des données à l’aide de lakeFS.
Comparaison entre Confluent Schema Registry et Memphis Schemaverse
Confluent—Registre de schémas
Confluent Schema Registry fournit une couche de service pour vos métadonnées. Il fournit une interface RESTful pour stocker et récupérer vos schémas Avro, JSON Schema et Protobuf. Il stocke un historique versionné de tous les schémas en fonction d’une stratégie de nom de sujet spécifiée, fournit plusieurs paramètres de compatibilité et permet l’évolution des schémas en fonction des paramètres de compatibilité configurés et de la prise en charge étendue de ces types de schémas. Il fournit des sérialiseurs qui se connectent aux clients Apache Kafka. Ils gèrent le stockage et la récupération du schéma pour les messages Kafka qui sont envoyés dans l’un des formats pris en charge.
Le bon
- Prend en charge Avro, le schéma JSON et Protobuf.
- Sécurité renforcée
- Application du schéma
- Évolution du schéma
Le mauvais
- Difficile à configurer.
- Pas de sauvegarde externe.
- Sérialisation manuelle
- Peut être contourné.
- Nécessite un entretien et une surveillance.
- Prend principalement en charge Java.
- Aucune validation

Memphis—Schemaverse
Schemaverse fournit un magasin de schémas robuste et une couche de gestion de schémas au-dessus du courtier Memphis sans calcul autonome ni ressources dédiées. Avec une approche unique et programmatique, les utilisateurs techniques et non techniques peuvent créer et définir différents schémas, attacher le schéma à plusieurs stations et choisir si le schéma doit être appliqué ou non.
L’approche low-code de Memphis supprime la partie de sérialisation car elle est intégrée dans la bibliothèque du producteur. Schemaverse prend en charge la gestion des versions, les méthodologies GitOps et l’évolution des schémas.
Le bon
- Excellente approche programmatique.
- Intégrer au courtier
- Application de la confiance zéro
- Gestion des versions
- Surveillance prête à l’emploi
- GitOps—travail avec des fichiers.
- Validation et sérialisation à faible/sans code.
- Aucune configuration.
- Support natif en Python, Go, Node.js.
Le mauvais
- Ne prend pas encore en charge tous les formats. Protobuf et JSON uniquement. GraphQL et Avro sont les prochains.
Éviter la dérive de schéma à l’aide de Confluent Schema Registry
Éviter la dérive de schéma signifie que vous souhaitez appliquer un schéma particulier sur un sujet ou une station et valider chaque donnée produite.
Ce didacticiel suppose que vous utilisez le cloud Confluent avec le registre configuré. Si ce n’est pas le cas, visitez le site Web officiel de Confluent pour savoir comment l’installer.
Pour vous assurer que vous ne dérivez pas et ne maintenez pas une seule norme ou structure de schéma dans notre sujet, vous devrez :
1. Copiez le ccloud-stack
fichier de configuration à $HOME/.confluent/java.config
.
2. Créez un sujet.
3. Définissez un schéma. Par example:
{
"namespace": "io.confluent.examples.clients.basicavro",
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
4. Activez la validation de schéma sur la rubrique nouvellement créée.
5. Configurez Avro/Protobuf à la fois dans l’application et avec le registre.
Exemple de code producteur dans Maven :
...
import io.confluent.kafka.serializers.KafkaAvroSerializer;
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
...
KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props));
final Payment payment = new Payment(orderId, 1000.00d);
final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
producer.send(record);
Parce que le pom.xml
inclut le plug-in avro-maven, la classe de paiement est automatiquement générée lors de la compilation.
Si un producteur essaie de produire un message qui n’est pas structuré selon le schéma défini, le message ne sera pas ingéré.
Éviter la dérive de schéma à l’aide de Memphis Schemaverse
1. Créez un nouveau schéma (actuellement uniquement disponible via l’interface graphique de Memphis).
2. Attacher un schéma : dirigez-vous vers votre station et, dans le coin supérieur gauche, cliquez sur « + Attacher un schéma ».
3. Exemple de code dans node.js.
Memphis fait abstraction du besoin de fonctions de sérialisation externes et les intègre dans le SDK.
Producteur (exemple Protobuf):
const memphis = require("memphis-dev");
var protobuf = require("protobufjs");
(async function () {
try {
await memphis.connect({
host: "localhost",
username: "root",
connectionToken: "*****"
});
const producer = await memphis.producer({
stationName: "marketing-partners.prod",
producerName: "prod.1"
});
var payload = {
fname: "AwesomeString",
lname: "AwesomeString",
id: 54,
};
try {
await producer.produce({
message: payload
});
} catch (ex) {
console.log(ex.message)
}
} catch (ex) {
console.log(ex);
memphis.close();
}
})();
Consommateur (nécessite .proto
fichier pour décoder les messages) :
const memphis = require("memphis-dev"); var protobuf = require("protobufjs"); (async function () { try { await memphis.connect({ host: "localhost", username: "root", connectionToken: "*****" }); const consumer = await memphis.consumer({ stationName: "marketing", consumerName: "cons1", consumerGroup: "cg_cons1", maxMsgDeliveries: 3, maxAckTimeMs: 2000, genUniqueSuffix: true }); const root = await protobuf.load("schema.proto"); var TestMessage = root.lookupType("Test"); consumer.on("message", message => { const x = message.getData() var msg = TestMessage.decode(x); console.log(msg) message.ack(); }); consumer.on("error", error => { console.log(error); }); } catch (ex) { console.log(ex); memphis.close(); } })();
Application du schéma
Avec la gestion des versions des données pour une solution robuste sur les magasins d’objets :
Maintenant que nous connaissons les nombreuses façons d’appliquer le schéma à l’aide de Confluent ou de Memphis, laissez-nous comprendre comment un outil de gestion des versions comme LakeFS peut conclure l’affaire pour vous.
Qu’est-ce que LakeFS ?
lakeFS est un moteur de gestion des versions de données qui vous permet de gérer des données, comme du code. Grâce aux opérations de branchement, de validation, de fusion et d’annulation de type Git, la gestion des données et, par conséquent, du schéma sur l’ensemble du cycle de vie des données est simplifiée.
Comment réaliser l’application des schémas avec lakeFS
En tirant parti de la fonctionnalité de branchement et des webhooks de lakeFS, vous pouvez implémenter un mécanisme d’application de schéma robuste sur votre lac de données.
Les hooks lakeFS permettent d’automatiser et de garantir l’exécution d’un ensemble donné de vérifications et de validations avant les événements importants du cycle de vie. Ils sont similaires, conceptuellement, aux crochets Git, mais contrairement à Git, les crochets lakeFS déclenchent un serveur distant, qui exécute des tests et il est donc garanti que cela se produise.
Vous pouvez configurer les hooks lakeFS pour vérifier le schéma de table spécifique lors de la fusion des données des branches de données de développement ou de test vers la production. Autrement dit, un hook de pré-fusion peut être configuré sur la branche de production pour la validation du schéma. Si l’exécution du crochet échoue, LakeFS bloque l’opération de fusion.
Cette garantie extrêmement puissante peut aider à mettre en œuvre l’application des schémas et à automatiser les règles et les pratiques auxquelles toutes les sources de données et tous les producteurs doivent adhérer.
Implémentation de l’application de schéma à l’aide de lakeFS
- Commencez par créer un référentiel de données lakeFS au-dessus de votre magasin d’objets (par exemple, un compartiment AWS S3).
- Toutes les données de production (source unique de vérité) peuvent résider sur la branche « principale » ou la branche « production » de ce référentiel.
- Vous pouvez ensuite créer une branche « dev » ou « staging » pour conserver les données entrantes des sources de données.
- Configurez un serveur de webhooks lakeFS. Par exemple, une simple application Python Flask pouvant servir des requêtes HTTP peut être un serveur de webhooks. Reportez-vous à l’exemple d’application Flask que l’équipe LakeFS a mis en place pour commencer.
- Une fois que le serveur de webhooks est en cours d’exécution, activez les webhooks sur un…