Mensajeria

El sistema de mensajeria trabaja con bandejas de entrada y salida — outbox e inbox, una pareja para eventos de dominio y otra para comandos — para resolver dos problemas complementarios:

  • El outbox (lado emisor) evita el dual-write problem: cambiar el estado en la base de datos y publicar un mensaje son dos escrituras que no comparten transaccion. Publicar antes del commit puede dejar mensajes fantasma (si la TX revierte); publicar despues puede perderlos (si el proceso muere). El outbox guarda el mensaje en la misma transaccion que el cambio, y un worker asincrono lo entrega.

  • El inbox (lado consumidor) garantiza procesamiento exactly-once aunque el message broker entregue at-least-once: el mensaje se persiste al recibirlo con dedup por id (un id ya conocido se descarta), y un worker separado lo entrega al handler. Sin el inbox, un mensaje reentregado por el broker o por reintento del emisor se procesaria dos veces.

En ambos lados la regla es la misma: si una transaccion falla el mensaje no avanza, y si un worker se interrumpe el siguiente ciclo retoma desde donde quedo.

Flujo end-to-end

El mensaje atraviesa dos macro-fases asincronas separadas por el message broker: en la emision, el caso de uso lo persiste en el outbox dentro de su propia transaccion y un worker en background lo publica al broker; en el procesamiento, un listener lo recibe del broker y lo guarda en el inbox, y otro worker lo entrega al handler. Eventos y comandos siguen la misma forma con diferencias menores en la persistencia — eventos pasan por el agregado y el PersistenceAdapter; comandos van directos al CommandRegistry.

Emision

Eventos

Diagram

El caso de uso muta el agregado — que acumula los DomainEvent en una coleccion transient — y llama persistencePort.save(agregado). El PersistenceAdapter esta anotado @Transactional: dentro de esa transaccion hace el INSERT del agregado, hidrata los eventos inceptive con el id generado, y los drena al EventRegistry, que los inserta en outbox_events. Si la transaccion revierte, ningun evento sale.

El diagrama muestra el camino de creacion. Para un update el orden se invierte — el drenado al EventRegistry corre antes del save, sin paso de hidratacion porque el agregado ya tiene id — pero el comportamiento transaccional es identico.

Una vez persistidos, los eventos del outbox son tomados asincronamente por el OutboxEventsDispatcher: en cada ciclo de su polling-interval saca un lote de mensajes PENDING, los entrega al EventsPublisher que los envia al message broker, y marca el resultado en la bandeja (COMPLETED si fue exitoso; reintento hasta max-retries, despues FAILED).

Comandos

Diagram

No hay agregado donde acumular, asi que el caso de uso llama CommandRegistry.register(cmd) desde un metodo @Transactional propio. El INSERT en outbox_commands se commitea junto al resto del trabajo del caso de uso.

Una vez persistidos, los comandos del outbox son tomados asincronamente por el OutboxCommandDispatcher, que en cada ciclo de su polling-interval saca un lote PENDING, los entrega al CommandPublisher que los envia al message broker, y marca el resultado (COMPLETED o reintento hasta FAILED).

Procesamiento

Eventos

Diagram

El EventsListener recibe el mensaje del message broker y lo guarda en inbox_events via EventRegistry. La dedup por id absorbe redeliveries: un mensaje cuyo id ya existe en la bandeja se descarta sin llegar al handler.

En un ciclo posterior, el InboxEventsProcessor saca lotes de inbox_events con status=PENDING y los entrega al @EventHandler correspondiente, marcando el resultado — COMPLETED si fue exitoso, reintento hasta max-retries, despues FAILED.

Comandos

Diagram

El CommandListener recibe el mensaje del message broker y lo guarda en inbox_commands via CommandRegistry (con la misma dedup por id que eventos).

En un ciclo posterior, el InboxCommandProcessor saca lotes de inbox_commands con status=PENDING y los entrega al @CommandMapping correspondiente, marcando el resultado — COMPLETED o reintento hasta FAILED.

La implementacion concreta del transporte — bus in-proceso (default) o broker externo — esta en las secciones Default in-proceso y Reemplazar por un broker externo mas abajo.

Componentes

El mensaje atraviesa cinco etapas a lo largo de la cadena, de arriba hacia abajo:

  1. Emision — el emisor (agregado para events; caso de uso para commands) produce el mensaje.

  2. Despacho — un worker lee del outbox y empuja al broker.

  3. Transporte — el message broker entrega al consumidor.

  4. Recepcion — un listener guarda el mensaje en el inbox.

  5. Procesamiento — otro worker invoca al handler.

Diagram

Eventos y comandos siguen cadenas paralelas que convergen en el Message broker. La unica diferencia practica esta en el emisor — los eventos se drenan del agregado dentro de la TX del save; los comandos los registra el caso de uso directamente. Cada componente tiene su seccion mas abajo con su contrato.

DomainEvent<T>

Un domain event es un hecho del dominio ya ocurrido — un cambio de estado significativo de un agregado que otros componentes pueden necesitar observar. Su nombre se forma en pasado (TaskCreatedEvent, OrderShippedEvent) porque describe algo que ya paso, y por eso es inmutable: un hecho del historial no se reescribe.

Es la forma idiomatica en DDD para que un agregado comunique sus cambios sin acoplarse a sus consumidores: el agregado emisor no sabe — ni necesita saber — quien reacciona; cualquier handler interesado se suscribe al tipo concreto del evento.

En el arquetipo, el evento que tu defines es el payload T — un record propio del dominio bajo events/{dominio}/ que captura los datos del hecho. DomainEvent<T> envuelve ese payload con la metadata operacional para rutearlo, rastrearlo y administrar su ciclo de vida.

public record DomainEvent<T>(
        String id,
        String aggregateId,
        String aggregateType,
        String eventType,
        T payload,
        MessageDirection direction,
        MessageStatus status,
        Instant occurredAt,
        String correlationId,
        int retryCount) { ... }

La factory DomainEvent.create(…​) recibe un Supplier<String> para el aggregateId. Esto soporta el caso inceptive: cuando un evento se construye antes de que el agregado se persista (creacion), el id aun no existe; el PersistenceAdapter lo hidrata despues del INSERT y antes de drenarlo al outbox. La fila persistida siempre tiene aggregateId poblado.

Los metodos de transicion del record (process, complete, fail, asInbound, fromMessage) los llama la infraestructura, no el codigo de aplicacion.

Command<T>

Un command es una intencion de accion dirigida a un servicio (y un tenant) concretos: una solicitud para ejecutar algo, no un hecho ya ocurrido. Su nombre se forma en imperativo (ArchiveTaskCommand, SubmitOrderCommand) porque describe lo que se pide hacer.

A diferencia de un domain event — que comunica un hecho pasado sin destinatario — un command lleva en sus datos a quien va dirigido: targetTenantId, targetService y el sourceTenantId del emisor (este ultimo para tracing).

En el arquetipo, los commands no nacen en un agregado — el caso de uso los emite directamente como pieza de coordinacion cross-servicio o cross-tenant. El payload T es el record que tu defines con los datos de la accion; Command<T> lo envuelve con la metadata de ruteo y ciclo de vida.

public record Command<T>(
        String id,
        String commandType,
        T payload,
        int version,
        MessageDirection direction,
        MessageStatus status,
        Instant sentAt,
        UUID sourceTenantId,
        UUID targetTenantId,
        String targetService,
        int retryCount) { ... }

Dos factories: Command.to(targetTenantId, targetService, payload) para inter-tenant y Command.toSelf(targetService, payload) para intra-tenant. Ambas resuelven sourceTenantId del TenantContextHolder actual (lanzan IllegalStateException si no hay tenant en contexto).

Como en DomainEvent, los metodos de transicion del record los llama la infraestructura, no el codigo de aplicacion.

MessageStatus

MessageStatus modela el ciclo de vida de cada mensaje que pasa por el sistema de mensajeria: desde que se registra en la bandeja hasta que se completa, falla, o queda pendiente para reintento.

Persistir el estado junto con el mensaje es lo que hace al sistema resiliente. Si un worker se interrumpe, el siguiente ciclo lee el estado en disco y sabe exactamente cuales mensajes quedan por procesar. Sin esto, una caida obligaria a re-procesar todo (con duplicados) o asumir que todo fallo (perdiendo trabajo).

El ciclo es identico para DomainEvent y Command. Cuatro estados, dos terminales:

Diagram

PENDING es el estado inicial al persistir un mensaje en una bandeja. El dispatcher (outbox) o el processor (inbox) toman lotes de mensajes PENDING, invocan el publisher o el handler correspondiente, y aplican complete(success, maxRetries) para decidir el estado siguiente:

  • Si success es true, el mensaje queda en COMPLETED (terminal).

  • Si success es false y retryCount < maxRetries, el mensaje vuelve a PENDING con retryCount + 1. Sera reintentado en el proximo ciclo del worker.

  • Si success es false y se agotaron los reintentos, queda en FAILED (terminal). Ningun worker lo vuelve a tocar — requiere inspeccion manual.

fail() es el atajo que un processor puede usar para saltar directo a FAILED cuando la excepcion no es retriable (sin gastar reintentos). Toda transicion ilegal — por ejemplo, llamar process() sobre un mensaje ya COMPLETED — lanza IllegalStateException.

Mensajes "stuck". MessageBoxesHealthIndicator (Monitoreo > MessageBoxes health) clasifica como stuck los mensajes PENDING cuya lastModifiedDate es anterior al umbral stuck-threshold (5 min por default). Indica que el dispatcher esta apagado, esta crasheando, o no logra drenar al ritmo del influjo. Emite DOWN en /actuator/health.

Bandejas

Una bandeja es un buffer persistente entre el emisor de un mensaje y su consumidor. No es solo almacenamiento: es el punto de control que desacopla el momento en que se decide enviar un mensaje del momento en que la entrega se confirma, y permite resumir el trabajo despues de una caida sin perder ni duplicar mensajes.

Hay cuatro bandejas — una por cada combinacion <events|commands> x <outbox|inbox> — porque cada par tiene su propio ciclo operativo independiente: intervalos de polling, contadores de reintento, metricas, y on/off no compartidos entre tipos ni direcciones.

Todas comparten estructura: id (PK), status, direction, payload serializado a JSON, retry_count, timestamps de creacion y ultima modificacion.

  • outbox_events — eventos OUTBOUND que este servicio emitio. El PersistenceAdapter de cada agregado los drena dentro de la TX del save; OutboxEventsDispatcher los publica al message broker.

  • inbox_events — eventos INBOUND que este servicio recibio. El EventsListener los deposita aqui; InboxEventsProcessor los rutea al DomainEventHandler<T> correspondiente.

  • outbox_commands — comandos OUTBOUND emitidos. El caso de uso los inserta via CommandRegistry.register; OutboxCommandDispatcher los publica.

  • inbox_commands — comandos INBOUND recibidos. El CommandListener los deposita; InboxCommandProcessor los rutea al CommandHandler<T> correspondiente.

La PK por id del mensaje da dedup natural: cuando EventRegistry.register o CommandRegistry.register recibe un mensaje cuyo id ya existe en la bandeja destino, hace skip explicito (no lanza error, no actualiza). Esto importa especialmente en el inbox — si un mismo mensaje llega dos veces (broker, replay, reintento del lado emisor), el segundo register no inserta una segunda fila ni el handler se invoca de nuevo. El handler no necesita ser idempotente por su cuenta.

Las bandejas son la fuente local de lo que el servicio emitio y recibio — la base de datos, no el broker. Si el broker se cae o se vuelve lento, los mensajes ya registrados en una bandeja no se pierden: el outbox sigue acumulando hasta que el broker vuelva; los mensajes ya en el inbox los sigue procesando el Inbox*Processor. Esto tambien hace al outbox replayable: a diferencia del broker (que tipicamente tiene ventana de retencion limitada), la fila de outbox_events/outbox_commands queda indefinidamente como evidencia de lo que se emitio.

El endpoint /actuator/message-boxes expone contadores agregados por bandeja (pending, failed, stuck, oldestPendingAt, lastProcessedAt); ver Monitoreo > endpoint message-boxes.

Workers de bandeja

Los workers son el motor asincrono de las bandejas: cada uno corre en su propio ciclo, lee los mensajes pendientes de la bandeja que le corresponde, los entrega al publisher o handler apropiado, y persiste el resultado. Las bandejas son estado; los workers son el movimiento.

Hay cuatro — uno por bandeja — cada uno con configuracion independiente (encendido/apagado, intervalo de polling, tamano de lote, reintentos). Esto permite operar los flujos por separado: pausar el procesamiento de inbox sin pausar la emision de outbox, por ejemplo.

Todos siguen el mismo patron:

  1. Cada polling-interval, el scheduler dispara una tarea por tenant activo.

  2. La tarea intenta adquirir un lock ShedLock especifico de la bandeja y del tenant. Si ya hay otro nodo procesando esa bandeja para ese tenant, vuelve sin hacer nada.

  3. Con el lock tomado, llama al servicio dispatcher/processor.

  4. El servicio pagina sobre los mensajes PENDING (la consulta filtra por status = PENDING y por la direction correspondiente), procesa cada uno, y persiste el resultado del lote via updateAll.

  5. Libera el lock.

ShedLock coordina multiples instancias: dos pods no pueden drenar la misma bandeja del mismo tenant a la vez. Si un worker muere a mitad del ciclo, el lock expira despues de lock-at-most-for y otra instancia lo retoma. La calibracion contra el graceful shutdown vive en Decisiones de ciclo de vida > timeout vs ShedLock.

Aislamiento por tenant: el scheduler dispara una tarea por tenant activo. Un tenant con backlog grande no bloquea el procesamiento de los demas.

Reintentos: la logica vive en el worker, no en el handler. Si el handler (o el publisher) lanza una excepcion, el worker marca el mensaje como PENDING con retryCount + 1 hasta max-retries, despues FAILED. El handler no necesita capturar excepciones ni reintentar por su cuenta.

Los servicios detras de cada scheduler:

  • OutboxEventsDispatcher — saca lotes de outbox_events, invoca EventsPublisher.publish(event), y marca cada evento segun resultado.

  • InboxEventsProcessor — saca lotes de inbox_events, invoca DomainEventHandlerDispatcher.dispatch(event) (rutea al @EventHandler correspondiente), marca el resultado.

  • OutboxCommandDispatcher e InboxCommandProcessor — los equivalentes para comandos.

Los cuatro estan anotados @ConditionalOnProperty(value = "<bandeja>.enabled", havingValue = "true") — si enabled=false el bean no se registra y el scheduler simplemente no tiene a quien llamar. Apagar un procesador es una decision de configuracion sin costo en runtime.

Los parametros operativos (intervalo, batch, retries) viven en Propiedades de configuracion.

Default in-proceso

El arquetipo arranca listo para emitir y consumir mensajes sin broker externo. Por default, el rol del message broker lo cumple ApplicationEventPublisher de Spring — el mismo bus que usan los @EventListener del framework — y la mensajeria entera corre intra-JVM, sin red ni serializacion. Util para desarrollo, pruebas, y deployments donde un broker dedicado seria sobreingenieria.

Cuatro beans Spring se registran en SpringMessagingAutoConfiguration para conectar los outbox con los inbox:

  • SpringApplicationEventPublisher y SpringCommandPublisher implementan los ports EventsPublisher y CommandPublisher, delegando al bus de Spring.

  • SpringEventsListener y SpringCommandListener consumen del bus y depositan los mensajes en sus respectivos inbox via los registries.

Los cuatro estan bajo @ConditionalOnMissingBean(EventsPublisher.class) y @ConditionalOnMissingBean(CommandPublisher.class). Esa auto-configuracion es lo que permite reemplazar el default por un broker real sin tocar ningun otro componente del sistema — ver seccion siguiente.

Reemplazar por un broker externo

Para llevar el servicio a un entorno con multiples servicios y broker real (Kafka, Pub/Sub, RabbitMQ), se reemplaza el default in-proceso registrando implementaciones alternativas de los ports EventsPublisher y/o CommandPublisher. El resto del sistema — bandejas, workers, registries, handlers — sigue funcionando igual; solo cambia la pieza de transporte.

El contrato tiene dos lados:

Lado emisor: un bean que implemente EventsPublisher (y CommandPublisher si emitis comandos). Recibe el mensaje y lo envia al broker (serializar, llamar al SDK del broker, manejar acks/errores).

Lado consumidor: un componente que reciba mensajes del broker, los reconstruya como INBOUND (tipicamente con DomainEvent.fromMessage(…​) o Command.fromMessage(…​), que ya devuelven el mensaje con direction=INBOUND y status=PENDING), y los registre en el inbox via EventRegistry.register(…​) / CommandRegistry.register(…​). La dedup por id del inbox absorbe redeliveries.

Desactivacion en cascada: al registrar tu EventsPublisher, SpringMessagingAutoConfiguration apaga tanto SpringApplicationEventPublisher como SpringEventsListener via @ConditionalOnMissingBean(EventsPublisher.class). Lo mismo para CommandPublisher con su listener. Reemplazas el publisher → asumes el rol completo (emitir y recibir); el listener default no se entromete.

Para ruteo cross-servicio de comandos, app.commands.topics (ver Propiedades de configuracion) mapea targetService → topico del broker; tu publisher externo lee ese mapa para resolver a donde publicar cada comando.

Handlers

Los handlers son el lado reactivo del sistema: el codigo que el dev escribe para que algo concreto ocurra cuando un mensaje sale del inbox. Mientras los emisores producen eventos o comandos sin saber quien escucha, los handlers son los que efectivamente reaccionan a un tipo de mensaje.

Para declarar un handler, implementas la interfaz correspondiente y la anotas con el FQN del payload como value:

@EventHandler("com.example.events.task.TaskCreatedEvent")
class TaskEventHandler implements DomainEventHandler<TaskCreatedEvent> {

    @Override
    public void handle(DomainEvent<TaskCreatedEvent> event) {
        // ...
    }
}

Las anotaciones @EventHandler y @CommandMapping son meta-anotaciones de @Component, asi que el descubrimiento es por component scan — sin configuracion adicional. CommandHandler<T> y @CommandMapping son el par equivalente para comandos.

El value de la anotacion debe coincidir literalmente con eventType / commandType del mensaje — que a su vez es el FQN de la clase del payload (ver DomainEvent y Command arriba). No hay matching por tipo Java: si renombras o mueves el record del payload, actualiza tambien el value.

Un solo handler por tipo. Si dos clases declaran el mismo value, el ultimo registrado gana y los anteriores quedan inactivos (WARN en el log al startup). Para multiples reacciones al mismo mensaje, ponlas dentro del mismo handler.

Entre el inbox processor y el handler viven dos piezas de infraestructura:

  • DomainEventHandlerRegistry / CommandHandlerRegistry — mantienen un Map<String, Handler> indexado por eventType / commandType, poblado en startup escaneando los beans anotados.

  • DomainEventHandlerDispatcher / CommandHandlerDispatcher — los llama el Inbox*Processor con cada mensaje del lote; consultan el registry y delegan al handler correspondiente.

El dev no toca ni el registry ni el dispatcher — son la maquinaria que conecta el FQN del mensaje con la clase anotada.

Idempotencia: la dedup por id del inbox (ver Bandejas) garantiza que el handler se invoca a lo mas una vez por mensaje, incluso si el broker reentrega. El handler no necesita ser idempotente por su cuenta — solo dejar que las excepciones retriables propaguen para que el worker decida el reintento.

Propiedades de configuracion

Estas propiedades tunean los workers descritos en Workers de bandeja. Cuatro grupos paralelos — uno por scheduler — mas un mapeo opcional de topicos para broker externo (app.commands.topics).

Forma comun de cada grupo:

  • enabled (boolean) — habilita el bean del dispatcher/processor.

  • polling-interval (Duration) — periodo entre ciclos del scheduler.

  • polling-initial-delay (Duration) — espera antes del primer ciclo post-startup.

  • lock-at-least-for y lock-at-most-for (Duration) — ventana del lock ShedLock.

  • batch-size (int) — mensajes por pagina.

  • max-retries (int) — reintentos antes de pasar a FAILED.

app.events.outbox-processor.*

Propiedad Default Notas

enabled

true

Apagar solo para acumular eventos sin publicarlos (debugging).

polling-interval

PT5S

Duration ISO-8601.

polling-initial-delay

PT2S

lock-at-least-for

PT10S

lock-at-most-for

PT30S

Calibrado contra el timeout de graceful shutdown — ver runtime-lifecycle.

batch-size

100

max-retries

3

Tras estos reintentos por fallo del publisher, el evento va a FAILED.

app.events.inbox-processor.*

Propiedad Default Notas

enabled

false

Activar cuando haya al menos un @EventHandler.

polling-interval

PT2S

polling-initial-delay

PT10S

Mayor que el del outbox — deja que el outbox empiece a drenar antes.

lock-at-least-for

PT5S

lock-at-most-for

PT20S

batch-size

50

max-retries

3

Tras estos reintentos por excepcion del handler, el evento va a FAILED.

app.commands.outbox-dispatcher.*

Propiedad Default Notas

enabled

false

Activar cuando el modelo emita comandos.

polling-interval

PT2S

polling-initial-delay

PT10S

lock-at-least-for

PT5S

lock-at-most-for

PT20S

batch-size

50

max-retries

3

app.commands.inbox-processor.*

Mismas propiedades y defaults que outbox-dispatcher. Activar cuando haya al menos un @CommandMapping.

app.commands.topics

Map<String, String> opcional. Mapea targetService → identificador de topico/canal del broker externo. Vacio por default — en el flujo in-proceso el ruteo es por commandType, no por topico. Un starter que reemplace CommandPublisher puede leerlo para resolver el topico de salida.