Après s’être posé la question de comment notifier le front de la fin d’un traitement back, j’ai décidé de réaliser un proof of concept (POC) pour mieux comprendre le fonctionnement et les difficulté à mettre en oeuvre une communication Server-Sent Events (SSE).
Scénario
Pour que le POC soit “crédible”, il nous faut deux prérequis :
- Simuler un traitement asynchrone dans le backend
- Le front doit réagir à la fin du traitement
Pour avoir un scénario réaliste, je pars d’un front basique qui souhaite diffuser un message. Il se trouve que, par hasard, la publication prend du temps, et que pour absorber le flux, Nous avons choisi de mettre en place une file de publication pour réaliser cette tâche en asynchrone. À chaque publication d’un message, nous souhaitons notifier les utilisateurs.
sequenceDiagram participant Frontend participant API participant Service as PublicationService Frontend->>API: S'abonne aux notifications Frontend->>API: Publie un message API-)Service: Lance le traitement API-->>Frontend: Accusé de réception Note over Frontend: Clear du formulaire Note over Service: Traitement long Service-)API: Traitement terminé API-)Frontend: Notification SSE de traitement réussi Note over Frontend: Réception et affichage du message
Étapes de réalisation
Backend
Initialisation
En tant que développeur backend, je suis naturellement parti vers le composant back. L’initialisation à partir de Spring Initalizr, et c’est parti pour la génération, Spring boot 3.4, Java 21 (dernière LTS), Maven (déjà configuré). Nous ajoutons quelques modules pour se simplifier la vie :
- Spring Web, pour exposer les services
- Spring Modulith, pour tester l’architecture Hexagonal et le ApplicationEvent
- Spring Data JDBC, pour gérer les notifications transactionnelles générées par Spring Modulith
- H2 Database, pour avoir une base embarqué rapide à initialiser
- Spring Boot DevTools, permet d’avoir du LiveReload
- Lombok, injection de logger, finalement utilisation des records limite son usage
- Spring Configuration Processor, pour améliorer la compatibilité de la configuration YAML
disclaimer
Je fais l’impasse sur la sécurité pour simplifier la configuration, car l’objectif n’est pas de déployer l’application. Certaines dépendances sont discutables, la gestion “transactionnelle” des notifications interne avec une reprise en cas de redémarre n’a pas vraiment de sens avec une base de données in-memory.
Définition de l’API
J’ai besoin de deux endpoints : un premier pour envoyer un message, rien de compliqué avec Spring MVC, une classe Controller, l’annotation @RestController
et une méthode recevant du POST avec l’annotation @PostMapping
@RestController
public class MessageController {
@PostMapping("/message")
@ResponseStatus(HttpStatus.CREATED)
public CreatedMessageEvent sendMessage(@RequestBody MessageDto message) {
return new CreatedMessageEvent(UUID.randomUUID(), message.message(), message.channel(), LocalDateTime.now());
}
}
Le deuxième endpoint pour la souscription aux événements de message.
SSE se base sur le protocole HTTP, Spring nous fournit une classe SseEmitter
qui spécialise ResponseBodyEmitter pour faire du SSE.
@GetMapping("/message/event")
public SseEmitter subscribeMessageEvent() throws IOException {
SseEmitter sseEmitter = new SseEmitter();
// code pour émettre une notification
return sseEmitter;
}
Lancer le traitement du message
Lors de la réception d’un message, le contrôleur va créer un événement pour lancer le traitement. Puis répond au client que son message qu’il a reçu son message et qu’il va être traité.
Pour émettre un événement dans Spring, il suffit de publier dans ApplicationEventPublisher
:
private final ApplicationEventPublisher events;
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
@Transactional
public CreatedMessageEvent sendMessage(@RequestBody MessageDto message) {
CreatedMessageEvent event = new CreatedMessageEvent(UUID.randomUUID(), message.message(), message.channel(), LocalDateTime.now());
events.publishEvent(event);
return event;
}
Nous remarquerons la présence de @Transctional
pour gérer la publication du message dans le cadre d’une notification “transactional” proposée par Spring Modulith.
Pour la réception de l’événement de “création de message”, j’ai choisi d’utiliser le listener fourni par Spring Modulith @ApplicationModuleListener
.
Cette annotation est un raccourci de plusieurs annotations :
@Async
, qui permet d’ajouter de l’asynchrone dans notre traitement@Transactional(propagation = Propagation.REQUIRES_NEW)
, ouvre une nouvelle transaction pour ne pas perturber la transaction d’origine@TransactionalEventListener
, qui surcharge@EventListener
qui crée le endpoint de l’événement Et voilà notre méthode pour recevoir un événement de typeCreatedMessageEvent
:
@ApplicationModuleListener
void publishMessage(CreatedMessageEvent messageEvent) {
// Traitement de l'event
}
Nous avons besoin d’une base de données pour stocker les événements générés par l’application. J’ai choisi une base H2 pour simplifier la configuration et le lancement de l’application.
Et maintenant, nous pouvons spécifier dans le fichier application.yml
notre configuration :
spring:
datasource:
url: jdbc:h2:mem:messasync
driverClassName: org.h2.Driver
modulith.events.jdbc.schema-initialization.enabled: true
Notifier le front
Le front s’abonne au notification via le endpoint défini, et lorsque nous avons traité notre message, nous publions un événement à tous les fronts qui le souhaite. Nous pouvons voir l’utilisation d’un pattern Observer sera adapté. Pour le créer, nous ajoutons une liste d’observateurs dans notre service de publication. Nous utilisons une liste concurrente pour ne pas avoir de conflit :
private final List<SseEmitter> observers = new CopyOnWriteArrayList<>();
Une méthode pour souscrire au événement, cette méthode sera appelée après la création du SseEmitter
:
public void subscribe(SseEmitter sseEmitter) {
observers.add(sseEmitter);
sseEmitter.onCompletion(() -> {
observers.remove(sseEmitter);
log.debug("Removed completed sseEmitter {}", sseEmitter);
});
sseEmitter.onTimeout(() -> {
observers.remove(sseEmitter);
log.debug("Removed timeout sseEmitter {}", sseEmitter);
});
sseEmitter.onError((e) -> {
observers.remove(sseEmitter);
log.debug("remove error sseEmitter {}", sseEmitter, e);
});
}
Lors de la souscription, nous ajoutons les méthodes de nettoyage lorsque la requête http est fermée.
Et dans la méthode de publication de message, nous allons notifier tous nos observers du nouveau message :
@ApplicationModuleListener
void publishMessage(CreatedMessageEvent messageEvent) {
MessageEvent message = messageProcessingService.processMessage(convertEvent(messageEvent));
observers.forEach(sseEmitter -> {
try {
sseEmitter.send(SseEmitter.event()
.id(String.valueOf(message.id()))
.name("createdMessage")
.data(message, MediaType.APPLICATION_JSON));
} catch (Exception e) {
sseEmitter.completeWithError(e);
}
});
}
Frontend
Côté Frontend, je vais rester succinct : pour lire les événements envoyés par le backend, nous pouvons utiliser l’interface EventSource malgré ses limites sur le nombre de connexions simultanées en HTTP/1.1, cette interface est déjà très puissante.
Pour fonctionner avec une application Angular, nous encapsulons cette EventSource
dans un Observable
.
export class MessageService {
listenMessage(): Observable<MessageResponse> {
return new Observable<MessageResponse>(observer => {
const eventSource = new EventSource('http://localhost:8080/message/event');
eventSource.addEventListener("createdMessage", (event) => {
let createdMessage = JSON.parse(event.data) as MessageResponse;
observer.next(createdMessage);
});
eventSource.onerror = (event) => {
console.log("Received error", event);
}
return () => {
eventSource.close();
}
}
)
}
}
Lorsque le composant doit écouter des événements, il appelle le service listenMessage
, qui retourne un Observable
.
Si nous ne souhaitons plus écouter les événements ou si le composant est détruit, nous marquons l’Observable
comme complété, ce qui déclenche la fermeture de l’EventSource
et clôt ainsi la connexion entre le frontend et le backend.
Dans notre cas, nous ne faisons rien en cas d’erreur pour ne pas arrêter l’Observable
.
Il serait intéressant d’émettre un message d’erreur pour informer l’utilisateur du problème.
En cas d’interruption, comme une coupure réseau, un redémarrage de l’application ou un timeout, EventSource
se charge de rouvrir la connexion.
Problèmes rencontrés
Erreur 503
Pas défaut, SseEmitter
configure son timeout à 30 secondes, et quand je n’envoie pas de message, je reçois aucune données.
Et l’interprétation HTTP quand aucune données n’a été envoyée est une réponse 503:
{
"timestamp": "2025-04-18T13:22:02.325+00:00",
"status": 503,
"error": "Service Unavailable",
"message": "No message available",
"path": "/message/event"
}
Et cette réponse 503 interrompt la re-connexion de l’EventSource
et finalement, quand nous décidons d’envoyer un message, nous ne recevrons pas la réponse.
La première astuce est d’envoyer un premier événement pour signifier que le backend est en vie.
Pour ça, j’emmet un message de type heartbeat
pour mieux identifier les messages de “ping” des messages qui contient de la vraie donnée.
J’ai donc ajouté cette ligne dans la méthode subscribeMessageEvent
de mon controller
sseEmitter.send(SseEmitter.event().name("heartbeat"));
Nous aurions pu imaginer aussi envoyer les précédents messages pour servir de “rattrapage”. Mais l’architecture actuelle mémorise aucun message.
Vérification des observers
Pour éviter de ré-ouvrir la connexion tous les 30 secondes, SseEmitter
accepte de personnaliser le timeout lors de la phase d’initialisation.
Dans la méthode subscribeMessageEvent
de mon controller, j’ai ajouté un timeout de 10 minutes.
Seulement en 10 minutes, nous pouvons cumuler beaucoup de connexion qui ne sont plus en vie.
Et tant que nous n’envions pas de message, il n’y a pas de ménage réalisé.
Donc il faudrait faire du ménage de temps en temps pour enlever les connexions fantômes, et limiter le nombre d’erreur lors de la publication d’un message. Pour faire ça, j’ai ajouté une tâche planifier toutes les 30 secondes qui envoie un “ping” à chaque observer.
@Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public void heartbeat() {
observers.forEach(sseEmitter -> {
try {
sseEmitter.send(SseEmitter.event().name("heartbeat"));
} catch (IOException e) {
sseEmitter.completeWithError(e);
} catch (IllegalStateException e) {
log.debug("emitter already closed");
observers.remove(sseEmitter);
}
});
}
Cette option est discutable, ça ajoute du bruit sur le réseau et du traitement à intervalle régulier qui n’est pas nécessaire. Cette ajout permettrait de supprimer le 1er ping à l’ouverture de la connexion.
Évolution
Ajouter un filtre dans la souscription pour recevoir que les messages que d’un canal spécifique parmi un liste définie.
Sources :