L’une des distinctions les plus importantes à garder à l’esprit lorsque vous travaillez avec Project Reactor ou toute autre implémentation de flux réactifs est la différence entre temps de montage vs temps d’abonnement dans l’exécution du code.
Quel que soit votre niveau d’expérience avec la programmation réactive, il y a de fortes chances que vous ayez déjà rencontré le célèbre :
Rien ne se passe jusqu’à ce que vous vous abonnez
Autrement dit, les éditeurs réactifs (Flux
et Mono
) sont paresseux, donc aucun élément ne sera publié ou traité jusqu’à ce que quelqu’un s’abonne. Comprendre cette distinction est essentiel car, lors de l’écriture d’applications réelles, nous souhaitons que toute (ou la plupart) de notre logique métier soit exécutée au moment de l’abonnement. Dans cet article, nous montrerons quels types de problèmes peuvent survenir en cas de non-respect de cette règle et comment les atténuer.
Exemple de service de location de voiture
Pour illustrer cela, nous utiliserons une implémentation très simple d’un service de location de voiture fictif. Le service accepte l’entrée consistant en le nom, l’âge et l’adresse e-mail d’un client ainsi qu’un modèle de voiture. Il vérifie d’abord si le client a plus de 18 ans (et donc légalement autorisé à louer une voiture), après quoi il enregistre la demande de location dans une base de données et génère enfin un reçu PDF et l’envoie par e-mail au client.
Ce flux est mis en œuvre par le rentCar
méthode:
private static Mono<UUID> rentCar(CarRentalRequest request) {
if (request.getCustomerAge() > 18) {
UUID rentalId = UUID.randomUUID(); // Generate an ID for the new rental
return saveCarRental(rentalId, request) // Save the rental entity to the database
.then(buildAndSendPdfReceipt(rentalId, request)) // Generate and send PDF report
.then(Mono.just(rentalId)); // Return the ID of the new rental
} else {
return Mono.error(new RuntimeException("Must be 18 to rent a car"));
}
}
private static Mono<Void> buildAndSendPdfReceipt(UUID rentalId, CarRentalRequest carRentalRequest) {
byte[] pdfReceipt = buildPdfReceipt(rentalId, carRentalRequest);
return sendPdfReceipt(pdfReceipt, carRentalRequest.getCustomerEmail());
}
Nous pouvons ensuite appeler cette méthode pour créer l’éditeur. De plus, nous voulons nous assurer que nous déléguons le travail à un planificateur distinct afin que le thread principal puisse procéder au traitement d’autres demandes. Nous pouvons y parvenir en utilisant le subscribeOn
opérateur (qui modifie le contexte d’exécution dans l’ensemble du pipeline, à la fois au-dessus et au-dessous, de sorte que le premier éditeur produira les éléments sur le Scheduler
défini par cet opérateur). Enfin, nous fournissons un abonné qui définit la logique à exécuter pour un succès ainsi qu’une réponse d’erreur (les deux arguments lambda dans le subscribe()
méthode, respectivement).
CarRentalRequest request = new CarRentalRequest("Alice", 30, "Hyundai i30", "alice@mail.com");
rentCar(request)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(s -> log.info("Car rented successfully, rental ID: {}", s),
e -> log.error("Could not rent car: {}", e.getMessage(), e));
Avec cette implémentation à l’esprit, examinons de plus près le premier problème.
Piège 1 : Contexte d’exécution incorrect
En regardant de près le buildAndSendPdfReceipt
méthode, on peut facilement deviner que buildPdfReceipt
est une méthode synchrone et non réactive : elle ne renvoie aucun type réactif, elle renvoie simplement un byte[]
représentant le document PDF. Cette méthode pourrait ressembler à
private static byte[] buildPdfReceipt(UUID rentalId, CarRentalRequest request) {
log.info("Build PDF receipt");
// Create and return the PDF receipt document
...
}
Cependant, si nous exécutons cet exemple, nous obtenons la sortie suivante :
21:25:38.961 [main] INFO com.reactordemo.carrental.CarRentalService - Build PDF receipt
21:25:38.986 [boundedElastic-1] INFO com.reactordemo.carrental.CarRentalService - Car rented successfully, rental ID: d5b689dd-fa91-486c-b835-44bc2583d53a
Si nous prêtons attention à la section des journaux montrant le thread actuel pour chaque instruction (entre crochets), nous remarquons que la logique de l’abonné est correctement exécutée sur un thread dans le planificateur élastique borné – boundedElastic-1
. Cependant, le travail de création du PDF semble être exécuté sur le main
fil! Alors pourquoi est-ce le cas ?
La réponse réside dans la distinction précitée entre montage et abonnement. Jetons un autre regard sur le buildAndSendPdfReceipt
méthode:
private static Mono<Void> buildAndSendPdfReceipt(UUID rentalId, CarRentalRequest carRentalRequest) {
byte[] pdfReceipt = buildPdfReceipt(rentalId, carRentalRequest);
return sendPdfReceipt(pdfReceipt, carRentalRequest.getCustomerEmail());
}
Lorsque cette méthode est exécutée, nous sommes simplement censés assembler le pipeline réactif, c’est-à-dire définir de manière déclarative les étapes à exécuter pour créer le rapport PDF. À ce stade, nous ne sommes pas censés effectuer le travail réel de génération de ce rapport, ce qui n’est censé se produire que lorsque quelqu’un s’abonne à cet éditeur. Ce n’est malheureusement pas le cas ici – l’appel à buildPdfReceipt
est fait dans le corps de la méthode, avec le reste du code assembleur. L’une des conséquences très malheureuses de ceci est le contexte d’exécution incorrect que nous avons vu ci-dessus. L’ensemble du pipeline est assemblé sur le main
thread tandis que les éléments publiés sont traités sur le boundedElastic
planificateur. Mais depuis l’appel à buildPdfReceipt
est faite au moment de l’assemblage, cette opération vraisemblablement longue se déroulera désormais sur le main
fil aussi. Cela peut être dangereux car dans de nombreux scénarios de la vie réelle, nous avons tendance à avoir plus de threads/ressources dédiés au traitement des pipelines réactifs qu’à leur assemblage, et garder le thread d’assemblage occupé peut avoir un impact négatif sur le débit global et les performances de notre application.
Une façon de résoudre ce problème est d’utiliser le fromCallable
méthode comme ci-dessous:
private static Mono<Void> buildAndSendPdfReceipt(UUID rentalId, CarRentalRequest carRentalRequest) {
return Mono.fromCallable(() -> buildPdfReceipt(rentalId, carRentalRequest))
.flatMap(pdfReceipt -> sendPdfReceipt(pdfReceipt, carRentalRequest.getCustomerEmail()));
}
Comme nous le savons, l’éditeur ne commencera à produire les éléments que lorsque quelqu’un s’abonnera (c’est-à-dire au moment de l’abonnement), donc l’appel à buildPdfReceipt
est maintenant fait dans le cadre du pipeline global, sur le planificateur souhaité. En effet, relancer l’application produit le résultat suivant :
21:54:49.955 [boundedElastic-1] INFO com.reactordemo.carrental.CarRentalService - Build PDF receipt
21:54:49.956 [boundedElastic-1] INFO com.reactordemo.carrental.CarRentalService - Car rented successfully, rental ID: a3bb873e-4943-407a-967f-9fa1c1d0d235
Dans de nombreuses applications complexes de la vie réelle, ce type de problème peut être difficile à détecter. Un bon moyen de les éviter est de s’assurer que les méthodes réactives (c’est-à-dire les méthodes assemblant des pipelines, qui ont généralement un type de retour réactif) n’appellent pas directement des méthodes non réactives. Au contraire, ils doivent simplement assembler les pipelines réactifs, de préférence dans une seule instruction fluide, et tous les appels aux méthodes non réactives doivent être effectués à partir des opérateurs réactifs (fromCallable
, fromRunnable
, map
, filter
, etc.).
Piège 2 : Gestion incorrecte des exceptions
Lors de la conception et de la mise en œuvre de tout type d’application, nous voulons toujours nous assurer que nous pouvons gérer les erreurs avec élégance, soit en essayant de récupérer, soit en présentant à l’utilisateur un message d’erreur approprié. Dans notre service de location de voiture simple, nous créons un abonné avec un gestionnaire d’erreurs lambda qui enregistre l’erreur en amont. On s’attend à ce que toute erreur pouvant se produire n’importe où dans le pipeline se traduise par une instruction de journal décrivant le problème.
Pour tester cela, considérons l’entrée suivante :
CarRentalRequest request = new CarRentalRequest("Bob", null, "Hyundai i30", "bob@mail.com")
Notez que dans ce cas, l’âge du client est incorrectement défini sur null
. Même ainsi, nous nous attendrions à ce que toute erreur que cela puisse causer soit correctement interceptée et enregistrée. Malheureusement, l’exécution de ce code produit maintenant la sortie suivante :
Exception in thread "main" java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.reactordemo.carrental.CarRentalService$CarRentalRequest.getCustomerAge()" is null
at com.reactordemo.carrental.CarRentalService.rentCar(CarRentalService.java:27)
at com.reactordemo.carrental.CarRentalService.entryPoint(CarRentalService.java:19)
at com.reactordemo.carrental.ReactorDemoApplication.main(ReactorDemoApplication.java:10)
Cela montre que notre entrée invalide a produit un NPE qui n’a été détecté nulle part. Mais pourquoi? Pourquoi notre gestionnaire d’erreurs n’a-t-il pas été invoqué pour cette exception ? Pour comprendre cela, jetons un autre regard sur notre principal pipeline réactif :
private static Mono<UUID> rentCar(CarRentalRequest request) {
if (request.getCustomerAge() > 18) {
UUID rentalId = UUID.randomUUID();
return saveCarRental(rentalId, request)
.then(buildAndSendPdfReceipt(rentalId, request))
.then(Mono.just(rentalId));
} else {
return Mono.error(new RuntimeException("Must be 18 to rent a car"));
}
}
Il est clair que l’exception se produit dans le if
condition de l’instruction, où nous vérifions si l’âge est supérieur à 18. Mais notez que cette vérification ne se produit pas correctement dans le cadre de l’exécution du pipeline. Au lieu de cela, le contrôle est effectué dans le cadre de l’assemblage du pipeline. Par conséquent, toute erreur qui se produit ici ne sera pas considérée comme un échec de traitement d’un élément dans le pipeline, mais plutôt comme un échec d’assemblage du pipeline, pour commencer. Encore une fois, ce problème aurait pu être évité en définissant simplement toute la logique spécifique au traitement des éléments (y compris la vérification) à l’intérieur du pipeline réactif.
private static Mono<UUID> rentCar(CarRentalRequest request) {
return Mono.just(request)
.<CarRentalRequest>handle((req, sink) -> {
if (req.getCustomerAge() > 18) {
sink.next(req);
} else {
sink.error(new RuntimeException("Must be 18 to rent a car"));
}
})
.flatMap(req -> {
UUID rentalId = UUID.randomUUID();
return saveCarRental(rentalId, req)
.then(buildAndSendPdfReceipt(rentalId, req))
.then(Mono.just(rentalId));
});
}
Dans la mise en œuvre initiale, il y avait deux fonctionnalités liées au traitement de la demande qui était exécutée au moment de l’assemblage : la vérification de l’âge et la génération d’ID. Nous les avons maintenant déplacés tous les deux à l’intérieur du pipeline, dans le handle
et flatMap
opérateurs, respectivement. Après avoir appliqué ce correctif, l’exécution produit la sortie suivante :
12:48:46.627 [boundedElastic-1] ERROR com.reactordemo.carrental.CarRentalService - Could not rent car: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.reactordemo.carrental.CarRentalService$CarRentalRequest.getCustomerAge()" is null
java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.reactordemo.carrental.CarRentalService$CarRentalRequest.getCustomerAge()" is null
at com.reactordemo.carrental.CarRentalService.lambda$rentCarFixed$2(CarRentalService.java:40)
Bien sûr, il n’est pas idéal qu’un NPE soit jeté…