Mensajeria
El sistema de mensajeria trabaja con bandejas de entrada y salida — outbox e inbox, una pareja para eventos de dominio y otra para comandos — para resolver dos problemas complementarios:
-
El outbox (lado emisor) evita el dual-write problem: cambiar el estado en la base de datos y publicar un mensaje son dos escrituras que no comparten transaccion. Publicar antes del commit puede dejar mensajes fantasma (si la TX revierte); publicar despues puede perderlos (si el proceso muere). El outbox guarda el mensaje en la misma transaccion que el cambio, y un worker asincrono lo entrega.
-
El inbox (lado consumidor) garantiza procesamiento exactly-once aunque el message broker entregue at-least-once: el mensaje se persiste al recibirlo con dedup por id (un id ya conocido se descarta), y un worker separado lo entrega al handler. Sin el inbox, un mensaje reentregado por el broker o por reintento del emisor se procesaria dos veces.
En ambos lados la regla es la misma: si una transaccion falla el mensaje no avanza, y si un worker se interrumpe el siguiente ciclo retoma desde donde quedo.
Flujo end-to-end
El mensaje atraviesa dos macro-fases asincronas separadas por el message broker: en la emision, el caso de uso lo persiste en el outbox dentro de su propia transaccion y un worker en background lo publica al broker; en el procesamiento, un listener lo recibe del broker y lo guarda en el inbox, y otro worker lo entrega al handler. Eventos y comandos siguen la misma forma con diferencias menores en la persistencia — eventos pasan por el agregado y el PersistenceAdapter; comandos van directos al CommandRegistry.
Emision
Eventos
El caso de uso muta el agregado — que acumula los DomainEvent en una coleccion transient — y llama persistencePort.save(agregado). El PersistenceAdapter esta anotado @Transactional: dentro de esa transaccion hace el INSERT del agregado, hidrata los eventos inceptive con el id generado, y los drena al EventRegistry, que los inserta en outbox_events. Si la transaccion revierte, ningun evento sale.
El diagrama muestra el camino de creacion. Para un update el orden se invierte — el drenado al EventRegistry corre antes del save, sin paso de hidratacion porque el agregado ya tiene id — pero el comportamiento transaccional es identico.
Una vez persistidos, los eventos del outbox son tomados asincronamente por el OutboxEventsDispatcher: en cada ciclo de su polling-interval saca un lote de mensajes PENDING, los entrega al EventsPublisher que los envia al message broker, y marca el resultado en la bandeja (COMPLETED si fue exitoso; reintento hasta max-retries, despues FAILED).
Comandos
No hay agregado donde acumular, asi que el caso de uso llama CommandRegistry.register(cmd) desde un metodo @Transactional propio. El INSERT en outbox_commands se commitea junto al resto del trabajo del caso de uso.
Una vez persistidos, los comandos del outbox son tomados asincronamente por el OutboxCommandDispatcher, que en cada ciclo de su polling-interval saca un lote PENDING, los entrega al CommandPublisher que los envia al message broker, y marca el resultado (COMPLETED o reintento hasta FAILED).
Procesamiento
Eventos
El EventsListener recibe el mensaje del message broker y lo guarda en inbox_events via EventRegistry. La dedup por id absorbe redeliveries: un mensaje cuyo id ya existe en la bandeja se descarta sin llegar al handler.
En un ciclo posterior, el InboxEventsProcessor saca lotes de inbox_events con status=PENDING y los entrega al @EventHandler correspondiente, marcando el resultado — COMPLETED si fue exitoso, reintento hasta max-retries, despues FAILED.
Comandos
El CommandListener recibe el mensaje del message broker y lo guarda en inbox_commands via CommandRegistry (con la misma dedup por id que eventos).
En un ciclo posterior, el InboxCommandProcessor saca lotes de inbox_commands con status=PENDING y los entrega al @CommandMapping correspondiente, marcando el resultado — COMPLETED o reintento hasta FAILED.
La implementacion concreta del transporte — bus in-proceso (default) o broker externo — esta en las secciones Default in-proceso y Reemplazar por un broker externo mas abajo.
Componentes
El mensaje atraviesa cinco etapas a lo largo de la cadena, de arriba hacia abajo:
-
Emision — el emisor (agregado para events; caso de uso para commands) produce el mensaje.
-
Despacho — un worker lee del outbox y empuja al broker.
-
Transporte — el message broker entrega al consumidor.
-
Recepcion — un listener guarda el mensaje en el inbox.
-
Procesamiento — otro worker invoca al handler.
Eventos y comandos siguen cadenas paralelas que convergen en el Message broker. La unica diferencia practica esta en el emisor — los eventos se drenan del agregado dentro de la TX del save; los comandos los registra el caso de uso directamente. Cada componente tiene su seccion mas abajo con su contrato.
DomainEvent<T>
Un domain event es un hecho del dominio ya ocurrido — un cambio de estado significativo de un agregado que otros componentes pueden necesitar observar. Su nombre se forma en pasado (TaskCreatedEvent, OrderShippedEvent) porque describe algo que ya paso, y por eso es inmutable: un hecho del historial no se reescribe.
Es la forma idiomatica en DDD para que un agregado comunique sus cambios sin acoplarse a sus consumidores: el agregado emisor no sabe — ni necesita saber — quien reacciona; cualquier handler interesado se suscribe al tipo concreto del evento.
En el arquetipo, el evento que tu defines es el payload T — un record propio del dominio bajo events/{dominio}/ que captura los datos del hecho. DomainEvent<T> envuelve ese payload con la metadata operacional para rutearlo, rastrearlo y administrar su ciclo de vida.
public record DomainEvent<T>(
String id,
String aggregateId,
String aggregateType,
String eventType,
T payload,
MessageDirection direction,
MessageStatus status,
Instant occurredAt,
String correlationId,
int retryCount) { ... }
La factory DomainEvent.create(…) recibe un Supplier<String> para el aggregateId. Esto soporta el caso inceptive: cuando un evento se construye antes de que el agregado se persista (creacion), el id aun no existe; el PersistenceAdapter lo hidrata despues del INSERT y antes de drenarlo al outbox. La fila persistida siempre tiene aggregateId poblado.
Los metodos de transicion del record (process, complete, fail, asInbound, fromMessage) los llama la infraestructura, no el codigo de aplicacion.
Command<T>
Un command es una intencion de accion dirigida a un servicio (y un tenant) concretos: una solicitud para ejecutar algo, no un hecho ya ocurrido. Su nombre se forma en imperativo (ArchiveTaskCommand, SubmitOrderCommand) porque describe lo que se pide hacer.
A diferencia de un domain event — que comunica un hecho pasado sin destinatario — un command lleva en sus datos a quien va dirigido: targetTenantId, targetService y el sourceTenantId del emisor (este ultimo para tracing).
En el arquetipo, los commands no nacen en un agregado — el caso de uso los emite directamente como pieza de coordinacion cross-servicio o cross-tenant. El payload T es el record que tu defines con los datos de la accion; Command<T> lo envuelve con la metadata de ruteo y ciclo de vida.
public record Command<T>(
String id,
String commandType,
T payload,
int version,
MessageDirection direction,
MessageStatus status,
Instant sentAt,
UUID sourceTenantId,
UUID targetTenantId,
String targetService,
int retryCount) { ... }
Dos factories: Command.to(targetTenantId, targetService, payload) para inter-tenant y Command.toSelf(targetService, payload) para intra-tenant. Ambas resuelven sourceTenantId del TenantContextHolder actual (lanzan IllegalStateException si no hay tenant en contexto).
Como en DomainEvent, los metodos de transicion del record los llama la infraestructura, no el codigo de aplicacion.
MessageStatus
MessageStatus modela el ciclo de vida de cada mensaje que pasa por el sistema de mensajeria: desde que se registra en la bandeja hasta que se completa, falla, o queda pendiente para reintento.
Persistir el estado junto con el mensaje es lo que hace al sistema resiliente. Si un worker se interrumpe, el siguiente ciclo lee el estado en disco y sabe exactamente cuales mensajes quedan por procesar. Sin esto, una caida obligaria a re-procesar todo (con duplicados) o asumir que todo fallo (perdiendo trabajo).
El ciclo es identico para DomainEvent y Command. Cuatro estados, dos terminales:
PENDING es el estado inicial al persistir un mensaje en una bandeja. El dispatcher (outbox) o el processor (inbox) toman lotes de mensajes PENDING, invocan el publisher o el handler correspondiente, y aplican complete(success, maxRetries) para decidir el estado siguiente:
-
Si
successestrue, el mensaje queda enCOMPLETED(terminal). -
Si
successesfalseyretryCount < maxRetries, el mensaje vuelve aPENDINGconretryCount + 1. Sera reintentado en el proximo ciclo del worker. -
Si
successesfalsey se agotaron los reintentos, queda enFAILED(terminal). Ningun worker lo vuelve a tocar — requiere inspeccion manual.
fail() es el atajo que un processor puede usar para saltar directo a FAILED cuando la excepcion no es retriable (sin gastar reintentos). Toda transicion ilegal — por ejemplo, llamar process() sobre un mensaje ya COMPLETED — lanza IllegalStateException.
|
Mensajes "stuck". |
Bandejas
Una bandeja es un buffer persistente entre el emisor de un mensaje y su consumidor. No es solo almacenamiento: es el punto de control que desacopla el momento en que se decide enviar un mensaje del momento en que la entrega se confirma, y permite resumir el trabajo despues de una caida sin perder ni duplicar mensajes.
Hay cuatro bandejas — una por cada combinacion <events|commands> x <outbox|inbox> — porque cada par tiene su propio ciclo operativo independiente: intervalos de polling, contadores de reintento, metricas, y on/off no compartidos entre tipos ni direcciones.
Todas comparten estructura: id (PK), status, direction, payload serializado a JSON, retry_count, timestamps de creacion y ultima modificacion.
-
outbox_events— eventosOUTBOUNDque este servicio emitio. ElPersistenceAdapterde cada agregado los drena dentro de la TX delsave;OutboxEventsDispatcherlos publica al message broker. -
inbox_events— eventosINBOUNDque este servicio recibio. ElEventsListenerlos deposita aqui;InboxEventsProcessorlos rutea alDomainEventHandler<T>correspondiente. -
outbox_commands— comandosOUTBOUNDemitidos. El caso de uso los inserta viaCommandRegistry.register;OutboxCommandDispatcherlos publica. -
inbox_commands— comandosINBOUNDrecibidos. ElCommandListenerlos deposita;InboxCommandProcessorlos rutea alCommandHandler<T>correspondiente.
La PK por id del mensaje da dedup natural: cuando EventRegistry.register o CommandRegistry.register recibe un mensaje cuyo id ya existe en la bandeja destino, hace skip explicito (no lanza error, no actualiza). Esto importa especialmente en el inbox — si un mismo mensaje llega dos veces (broker, replay, reintento del lado emisor), el segundo register no inserta una segunda fila ni el handler se invoca de nuevo. El handler no necesita ser idempotente por su cuenta.
Las bandejas son la fuente local de lo que el servicio emitio y recibio — la base de datos, no el broker. Si el broker se cae o se vuelve lento, los mensajes ya registrados en una bandeja no se pierden: el outbox sigue acumulando hasta que el broker vuelva; los mensajes ya en el inbox los sigue procesando el Inbox*Processor. Esto tambien hace al outbox replayable: a diferencia del broker (que tipicamente tiene ventana de retencion limitada), la fila de outbox_events/outbox_commands queda indefinidamente como evidencia de lo que se emitio.
El endpoint /actuator/message-boxes expone contadores agregados por bandeja (pending, failed, stuck, oldestPendingAt, lastProcessedAt); ver Monitoreo > endpoint message-boxes.
Workers de bandeja
Los workers son el motor asincrono de las bandejas: cada uno corre en su propio ciclo, lee los mensajes pendientes de la bandeja que le corresponde, los entrega al publisher o handler apropiado, y persiste el resultado. Las bandejas son estado; los workers son el movimiento.
Hay cuatro — uno por bandeja — cada uno con configuracion independiente (encendido/apagado, intervalo de polling, tamano de lote, reintentos). Esto permite operar los flujos por separado: pausar el procesamiento de inbox sin pausar la emision de outbox, por ejemplo.
Todos siguen el mismo patron:
-
Cada
polling-interval, el scheduler dispara una tarea por tenant activo. -
La tarea intenta adquirir un lock ShedLock especifico de la bandeja y del tenant. Si ya hay otro nodo procesando esa bandeja para ese tenant, vuelve sin hacer nada.
-
Con el lock tomado, llama al servicio dispatcher/processor.
-
El servicio pagina sobre los mensajes
PENDING(la consulta filtra porstatus = PENDINGy por ladirectioncorrespondiente), procesa cada uno, y persiste el resultado del lote viaupdateAll. -
Libera el lock.
ShedLock coordina multiples instancias: dos pods no pueden drenar la misma bandeja del mismo tenant a la vez. Si un worker muere a mitad del ciclo, el lock expira despues de lock-at-most-for y otra instancia lo retoma. La calibracion contra el graceful shutdown vive en Decisiones de ciclo de vida > timeout vs ShedLock.
Aislamiento por tenant: el scheduler dispara una tarea por tenant activo. Un tenant con backlog grande no bloquea el procesamiento de los demas.
Reintentos: la logica vive en el worker, no en el handler. Si el handler (o el publisher) lanza una excepcion, el worker marca el mensaje como PENDING con retryCount + 1 hasta max-retries, despues FAILED. El handler no necesita capturar excepciones ni reintentar por su cuenta.
Los servicios detras de cada scheduler:
-
OutboxEventsDispatcher— saca lotes deoutbox_events, invocaEventsPublisher.publish(event), y marca cada evento segun resultado. -
InboxEventsProcessor— saca lotes deinbox_events, invocaDomainEventHandlerDispatcher.dispatch(event)(rutea al@EventHandlercorrespondiente), marca el resultado. -
OutboxCommandDispatchereInboxCommandProcessor— los equivalentes para comandos.
Los cuatro estan anotados @ConditionalOnProperty(value = "<bandeja>.enabled", havingValue = "true") — si enabled=false el bean no se registra y el scheduler simplemente no tiene a quien llamar. Apagar un procesador es una decision de configuracion sin costo en runtime.
Los parametros operativos (intervalo, batch, retries) viven en Propiedades de configuracion.
Default in-proceso
El arquetipo arranca listo para emitir y consumir mensajes sin broker externo. Por default, el rol del message broker lo cumple ApplicationEventPublisher de Spring — el mismo bus que usan los @EventListener del framework — y la mensajeria entera corre intra-JVM, sin red ni serializacion. Util para desarrollo, pruebas, y deployments donde un broker dedicado seria sobreingenieria.
Cuatro beans Spring se registran en SpringMessagingAutoConfiguration para conectar los outbox con los inbox:
-
SpringApplicationEventPublisherySpringCommandPublisherimplementan los portsEventsPublisheryCommandPublisher, delegando al bus de Spring. -
SpringEventsListenerySpringCommandListenerconsumen del bus y depositan los mensajes en sus respectivos inbox via los registries.
Los cuatro estan bajo @ConditionalOnMissingBean(EventsPublisher.class) y @ConditionalOnMissingBean(CommandPublisher.class). Esa auto-configuracion es lo que permite reemplazar el default por un broker real sin tocar ningun otro componente del sistema — ver seccion siguiente.
Reemplazar por un broker externo
Para llevar el servicio a un entorno con multiples servicios y broker real (Kafka, Pub/Sub, RabbitMQ), se reemplaza el default in-proceso registrando implementaciones alternativas de los ports EventsPublisher y/o CommandPublisher. El resto del sistema — bandejas, workers, registries, handlers — sigue funcionando igual; solo cambia la pieza de transporte.
El contrato tiene dos lados:
Lado emisor: un bean que implemente EventsPublisher (y CommandPublisher si emitis comandos). Recibe el mensaje y lo envia al broker (serializar, llamar al SDK del broker, manejar acks/errores).
Lado consumidor: un componente que reciba mensajes del broker, los reconstruya como INBOUND (tipicamente con DomainEvent.fromMessage(…) o Command.fromMessage(…), que ya devuelven el mensaje con direction=INBOUND y status=PENDING), y los registre en el inbox via EventRegistry.register(…) / CommandRegistry.register(…). La dedup por id del inbox absorbe redeliveries.
Desactivacion en cascada: al registrar tu EventsPublisher, SpringMessagingAutoConfiguration apaga tanto SpringApplicationEventPublisher como SpringEventsListener via @ConditionalOnMissingBean(EventsPublisher.class). Lo mismo para CommandPublisher con su listener. Reemplazas el publisher → asumes el rol completo (emitir y recibir); el listener default no se entromete.
Para ruteo cross-servicio de comandos, app.commands.topics (ver Propiedades de configuracion) mapea targetService → topico del broker; tu publisher externo lee ese mapa para resolver a donde publicar cada comando.
Handlers
Los handlers son el lado reactivo del sistema: el codigo que el dev escribe para que algo concreto ocurra cuando un mensaje sale del inbox. Mientras los emisores producen eventos o comandos sin saber quien escucha, los handlers son los que efectivamente reaccionan a un tipo de mensaje.
Para declarar un handler, implementas la interfaz correspondiente y la anotas con el FQN del payload como value:
@EventHandler("com.example.events.task.TaskCreatedEvent")
class TaskEventHandler implements DomainEventHandler<TaskCreatedEvent> {
@Override
public void handle(DomainEvent<TaskCreatedEvent> event) {
// ...
}
}
Las anotaciones @EventHandler y @CommandMapping son meta-anotaciones de @Component, asi que el descubrimiento es por component scan — sin configuracion adicional. CommandHandler<T> y @CommandMapping son el par equivalente para comandos.
El value de la anotacion debe coincidir literalmente con eventType / commandType del mensaje — que a su vez es el FQN de la clase del payload (ver DomainEvent y Command arriba). No hay matching por tipo Java: si renombras o mueves el record del payload, actualiza tambien el value.
|
Un solo handler por tipo. Si dos clases declaran el mismo |
Entre el inbox processor y el handler viven dos piezas de infraestructura:
-
DomainEventHandlerRegistry/CommandHandlerRegistry— mantienen unMap<String, Handler>indexado poreventType/commandType, poblado en startup escaneando los beans anotados. -
DomainEventHandlerDispatcher/CommandHandlerDispatcher— los llama elInbox*Processorcon cada mensaje del lote; consultan el registry y delegan al handler correspondiente.
El dev no toca ni el registry ni el dispatcher — son la maquinaria que conecta el FQN del mensaje con la clase anotada.
Idempotencia: la dedup por id del inbox (ver Bandejas) garantiza que el handler se invoca a lo mas una vez por mensaje, incluso si el broker reentrega. El handler no necesita ser idempotente por su cuenta — solo dejar que las excepciones retriables propaguen para que el worker decida el reintento.
Propiedades de configuracion
Estas propiedades tunean los workers descritos en Workers de bandeja. Cuatro grupos paralelos — uno por scheduler — mas un mapeo opcional de topicos para broker externo (app.commands.topics).
Forma comun de cada grupo:
-
enabled(boolean) — habilita el bean del dispatcher/processor. -
polling-interval(Duration) — periodo entre ciclos del scheduler. -
polling-initial-delay(Duration) — espera antes del primer ciclo post-startup. -
lock-at-least-forylock-at-most-for(Duration) — ventana del lock ShedLock. -
batch-size(int) — mensajes por pagina. -
max-retries(int) — reintentos antes de pasar aFAILED.
app.events.outbox-processor.*
| Propiedad | Default | Notas |
|---|---|---|
|
|
Apagar solo para acumular eventos sin publicarlos (debugging). |
|
|
|
|
|
|
|
|
|
|
|
Calibrado contra el timeout de graceful shutdown — ver runtime-lifecycle. |
|
|
|
|
|
Tras estos reintentos por fallo del publisher, el evento va a |
app.events.inbox-processor.*
| Propiedad | Default | Notas |
|---|---|---|
|
|
Activar cuando haya al menos un |
|
|
|
|
|
Mayor que el del outbox — deja que el outbox empiece a drenar antes. |
|
|
|
|
|
|
|
|
|
|
|
Tras estos reintentos por excepcion del handler, el evento va a |
app.commands.outbox-dispatcher.*
| Propiedad | Default | Notas |
|---|---|---|
|
|
Activar cuando el modelo emita comandos. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|