Nous passerons en revue les bases, l’installation et le fonctionnement d’Apache Kafka, ainsi qu’une implémentation étape par étape à l’aide d’une application Web .NET Core 6.
Conditions préalables
- Visual Studio 2022
- SDK .NET Core 6
- serveur SQL
- JDKJava 11
- Apache Kafka
Ordre du jour
- Présentation de la diffusion d’événements
- Présentation d’Apache Kafka.
- Principaux concepts et fondement de Kafka.
- Différentes API Kafka.
- Cas d’utilisation d’Apache Kafka.
- Installation de Kafka sur Windows 10.
- Mise en œuvre étape par étape
Présentation de la diffusion d’événements
Les événements sont les choses qui se produisent dans notre application lorsque nous naviguons quelque chose. Par exemple, nous nous inscrivons sur n’importe quel site Web et commandons quelque chose, donc ce sont les événements.
La plate-forme de streaming d’événements enregistre différents types de données telles que les données de transaction, historiques et en temps réel. Cette plate-forme est également utilisée pour traiter les événements et permettre aux différents consommateurs de traiter les résultats immédiatement et en temps opportun.
Une plate-forme événementielle nous permet de surveiller nos activités et les données en temps réel de différents types d’appareils comme l’IoT et bien d’autres. Après analyse, il fournit une bonne expérience client en fonction de différents types d’événements et de besoins.
Présentation d’Apache Kafka
Ci-dessous, quelques puces décrivant Apache Kafka :
- Kafka est un magasin d’événements distribué et une plate-forme de traitement de flux.
- Kafka est open source et est écrit en Java et Scala.
- L’objectif principal de la fondation Kafka by Apache est de gérer les flux de données en temps réel et de fournir des plates-formes à haut débit et à faible latence.
- Kafka est une plate-forme de diffusion d’événements dotée de nombreuses fonctionnalités pour publier (écrire) et s’abonner (lire) à des flux d’événements provenant d’un système différent.
- De plus, pour stocker et traiter les événements durablement aussi longtemps que nous le souhaitons, par défaut, Kafka stocke les événements à partir de sept jours de la période, mais nous pouvons augmenter cela selon les besoins et les exigences.

- Kafka a un système distribué, qui a des serveurs et des clients qui peuvent communiquer via le protocole TCP.
- Il peut être déployé sur différentes machines virtuelles et conteneurs dans des environnements sur site et cloud selon les besoins.
- Dans le monde Kafka, un producteur envoie des messages au courtier Kafka. Les messages seront stockés dans les sujets et le consommateur s’abonne à ce sujet pour consommer les messages envoyés par le producteur.
- ZooKeeper est utilisé pour gérer les métadonnées des éléments liés à Kafka, il suit les courtiers qui font partie du cluster Kafka et les partitions de différents sujets. Enfin, il gère le statut des nœuds Kafka et maintient une liste de sujets et de messages Kafka.
Principaux concepts et fondement de Kafka
1. Événement
Un événement ou un enregistrement est le message que nous lisons et écrivons sur le serveur Kafka ; nous le faisons sous la forme d’événements dans notre monde des affaires, et il contient une clé, une valeur, un horodatage et d’autres en-têtes de métadonnées. La clé, la valeur et l’horodatage, dans ce cas, sont les suivants :
- Clé: « Jaydeep »
- Valeur: « BMW réservé »
- Horodatage de l’événement: « Déc. 11, 2022, à 12:00”
2. Producteur
Le producteur est une application cliente qui envoie des messages au nœud ou au courtier Kafka.
3. Consommateur
Le consommateur est une application qui reçoit des données de Kafka.
4. Pôle Kafka
Le cluster Kafka est l’ensemble d’ordinateurs qui partagent la charge de travail entre eux à des fins diverses.
5. Courtier
Le courtier est un serveur Kafka qui agit comme un agent entre le producteur et le consommateur, qui communiquent via le courtier.
6. Sujet
Les événements sont stockés dans le « sujet », c’est similaire à notre dossier dans lequel nous stockons plusieurs fichiers.

Chaque sujet a un ou plusieurs producteurs et consommateurs, qui écrivent et lisent les données du sujet. Les événements dans « sujet » peuvent être lus aussi souvent que nécessaire car ils conservent les événements et ce n’est pas comme un autre système de messagerie qui supprime les messages après consommation.
7. Cloisons
Les sujets sont des partitions, ce qui signifie que le sujet est réparti sur plusieurs partitions que nous avons créées à l’intérieur du sujet. Lorsque le producteur envoie un événement au sujet, il le stocke dans les partitions particulières, puis le consommateur peut lire l’événement à partir de la partition de sujet correspondante en séquence.
8. Décalage
Kafka attribue un identifiant unique au message stocké dans la partition de rubrique lorsque le message arrive du producteur.
9. Groupes de consommateurs
Dans le monde de Kafka, le groupe de consommateurs agit comme une unité logique unique.
10. Réplique
Dans Kafka, pour rendre les données tolérantes aux pannes et hautement disponibles, nous pouvons répliquer les sujets dans différentes régions et courtiers. Ainsi, en cas de problème avec les données d’un sujet, nous pouvons facilement l’obtenir d’un autre pour le reproduire.
Différentes API Kafka
Kafka a cinq API principales qui servent à des fins différentes :
- Administrateur API: Cette API gère différents sujets, courtiers et objets Kafka.
- API de producteur: Cette API est utilisée pour écrire/publier des événements sur différents sujets Kafka.
- API consommateur: Cette API permet de recevoir les différents messages correspondant aux sujets auxquels le consommateur s’est abonné.
- API de flux Kafka: Cette API est utilisée pour effectuer différents types d’opérations comme le fenêtrage, les jointures, l’agrégation et bien d’autres. Fondamentalement, son utilisation est de transformer des objets.
- API Kafka Connect: Cette API fonctionne comme un connecteur vers Kafka, ce qui aide différents systèmes à se connecter facilement à Kafka. Il dispose de différents types de connecteurs prêts à l’emploi liés à Kafka.
Cas d’utilisation d’Apache Kafka
- Messagerie
- Suivi de l’activité des utilisateurs
- Agrégation de journaux
- Traitement de flux
- Analyse des données en temps réel
Installation de Kafka sur Windows 10
Étape 1
Téléchargez et installez le SDK Java de la version 8 ou plus.
Note: J’ai Java 11, c’est pourquoi j’ai mis le même chemin dans toutes les commandes que j’ai utilisées ici.
Étape 2
Ouvrez et installez EXE.
Étape 3
Définissez la variable d’environnement pour Java à l’aide de l’invite de commande en tant qu’administrateur.
Commande:
setx -m JAVA_HOME “C:\Program Files\Java\jdk-11.0.16.1” setx -m PATH “%JAVA_HOME%\bin;%PATH%”
Étape 4
Après cela, téléchargez et installez Apache Kafka.
Étape 5
Extrayez le fichier Kafka téléchargé et renommez-le « Kafka ».
Étape 6
Ouvrir D:\Kafka\config\
et créez un dossier « zookeeper-data » et « kafka-logs » à l’intérieur.
Étape 7
Ensuite, ouvrez D:\Kafka\config\zookeeper.properties
fichier et ajoutez le chemin du dossier à l’intérieur de celui-ci :
D:\Kafka\config\zookeeper.properties
dataDir=D:/Kafka/zookeeper-data
Étape 8
Après cela, ouvrez D:\Kafka\config\server.properties
fichier et modifiez le chemin du journal là-bas :
D:\Kafka\config\server.properties
log.dirs=D:/Kafka/kafka-logs
Étape 9
Enregistre et ferme les deux fichiers.
Étape 10
Exécutez ZooKeeper :
D:\Kafka> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
Étape 11
Démarrer Kafka :
D:\Kafka> .\bin\windows\kafka-server-start.bat .\config\server.properties

Étape 12
Créer un sujet Kafka :
D:\Kafka\bin\windows>kafka-topics.bat — create — bootstrap-server localhost:9092 — replication-factor 1 — partitions 1 — topic testdata
Étape 13
Créez un producteur et envoyez des messages après avoir démarré un producteur et un consommateur :
D:\Kafka\bin\windows>kafka-console-producer.bat — broker-list localhost:9092 — topic testdata

Étape 14
Ensuite, créez un consommateur. Après, vous verrez le message envoyé par le producteur :
D:\Kafka\bin\windows>kafka-console-consumer.bat — bootstrap-server localhost:9092 — topic testdata
Mise en œuvre étape par étape
Commençons par la mise en œuvre pratique.
Étape 1
Créez une nouvelle API Web .NET Core Producer :

Étape 2
Configurez votre application :
Étape 3
Fournissez des détails supplémentaires :

Étape 4
Installez les deux packages NuGet suivants :
Étape 5
Ajoutez des détails de configuration dans le appsettings.json
déposer:
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"producerconfiguration": {
"bootstrapservers": "localhost:9092"
},
"TopicName": "testdata"
}
Étape 6
Enregistrez quelques services dans la classe « Programme » :
using Confluent.Kafka;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
var producerConfiguration = new ProducerConfig();
builder.Configuration.Bind("producerconfiguration", producerConfiguration);
builder.Services.AddSingleton<ProducerConfig>(producerConfiguration);
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
Étape 7
Ensuite, créez le CarDetails
classe modèle :
using Microsoft.AspNetCore.Authentication;
namespace ProducerApplication.Models
{
public class CarDetails
{
public int CarId { get; set; }
public string CarName { get; set; }
public string BookingStatus { get; set; }
}
}
Étape 8
Maintenant, créez le CarsController
classe:
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using ProducerApplication.Models;
namespace ProducerApplication.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class CarsController : ControllerBase
{
private ProducerConfig _configuration;
private readonly IConfiguration _config;
public CarsController(ProducerConfig configuration, IConfiguration config)
{
_configuration = configuration;
_config = config;
}
[HttpPost("sendBookingDetails")]
public async Task<ActionResult> Get([FromBody] CarDetails employee)
{
string serializedData = JsonConvert.SerializeObject(employee);
var topic = _config.GetSection("TopicName").Value;
using (var producer = new ProducerBuilder<Null, string>(_configuration).Build())
{
...