Contenidos
Eventos asíncronos, Rabbitmq y protocolo AMQP
   Jan 10, 2021     6 min lectura

La verdad es que nunca había trabajado con eventos asíncronos antes. Siempre me había limitado a mandar peticiones síncronas y recibir una respuesta “inmediata”.

Me uní a un proyecto en el que se usaba este tipo de comunicación. Cuando llegué, mis compañeros hablaban de eventos, exchanges, colas, publishers, consumers…y obviamente, la cabeza me explotó 🤯. Por si fuera poco, poco tiempo después descubrí que dicho proyecto, no sólo mandaba eventos al exterior para que otros los consumieran, sino que también mandaba eventos internos que nosotros mismos consumiamos en el propio proyecto… WHAT?

Pues bien, nuestro proyecto es un monolito con renderización en backend que se comunica con un segundo backend a través de eventos. Esta gestión de eventos se realiza con Rabbitmq, el cual los recibe y enruta.

Rabbitmq

Rabbitmq es un gestor de mensajes (message broker). Recibe mensajes de un publisher y los enruta al/a los consumer/s. Es open source y compatible con muchos OS y entornos. Admite múltiples protocolos de mensajería, aunque nosotros en este proyecto en concreto utilizamos el protocolo AMQP , la versión 0.9.1.

Protocolo AMQP (Advanced Message Queueing Protocol)

Las características principales de este protocolo son los exchanges, las colas y los enrutamientos.

Este protocolo se basa en que un exchange decide* a qué cola debe enrutar el mensaje que ha recibido. Luego esa cola se enlazará con un channel que consumirá el consumer.

*Según las condiciones que nosotros elijamos, por ejemplo: el tipo de clave que contiene el mensaje.

Proceso (muy simplificado):

  • El publisher envía un mensaje a un exchange.
  • El exchange selecciona la cola a la que lo debe enviar.
  • Lo envía por la cola que luego “bindea” con un channel.
  • El consumer “recoge” el mensaje de ese channel.
![Rabbitmq690x184](upload://rs2gcAqFGgOEb4HBobAb8Xm21dU.png)

Toda la configuración de exchanges, colas, etc, está hecha (sorprendentemente, al menos para mí) en ficheros xml.

Pero a ver, ¿cómo se pone en práctica eso?

Digamos que queremos enviar un mensaje desde el publisher en el que queremos indicar al consumer que debe sincronizar (event) datos de un usuario (entity).

Estructura mensaje para este ejemplo:

  • En headers, especificamos el tipo de evento y la entidad a la que queremos aplicarle una acción.
  • Y en payload, indicamos la data que queremos que llegue al consumer.

Lo publicamos a través de un gateway:

def headers = [
                'eventType'  : 'sync',
                'entityType' : 'user',
	            ]
def payload = [ userId: user.id ]

ourGateway.send(headers, payload.toString())

(Y ahora empieza la fiesta de los xml 🥳 )

En este enlace podéis encontrar más información acerca de la sintaxis a seguir en los xml.

Pasos a seguir para establecer la comunicación entre nuestro publisher y nuestro consumer:

  1. Definimos ourGateway y le asignamos un channel que filtrará todos los mensajes que lleguen por ese gateway y los enviará a los channels que le indiquemos. Puesto que nosotros le hemos puesto en header un “eventType” enviará estos mensajes al channel “ourChannelPublisher”. (Es necesario definir cada channel/cola.)

    publisher-common.xml

     <int:gateway id="ourGateway"
                      service-interface="ourproject.Publisher"
                      default-request-channel="gatewayChannel">
     </int:gateway>
    
     <!-- Definición channel -->
     <int:channel id="gatewayChannel"/>
    
     <int:filter input-channel="gatewayChannel"
                     expression="headers.containsKey('eventType')"
                     output-channel="ourChannelPublisher"
                     discard-channel="otherChannel"/>
    
     <!-- Definición channels -->
     <int:channel id="ourChannelPublisher"/>
     <int:channel id="otherChannel"/>
    
  2. Enlazamos ese channel “ourChannelPublisher” con un exchange de salida que “bindearemos” a una cola. (Tendremos que definir tanto el exchange como la cola en un fichero de configuración. Punto 3).

    publisher-amqp.xml

     <!-- Enlazamos el channel con un exchange de salida -->
     <int-amqp:outbound-channel-adapter channel="ourChannelPublisher"
     									   exchange-name="${integration.rabbitmq.internalSyncExchangeName}"
     									   mapped-request-headers="*"
     									   amqp-template="template" />
    
     <!-- Definimos una cola a la que "*bindearemos"* ese *exchange* -->
     <rabbit:queue id="internalSyncQueue" name="${integration.rabbitmq.internalSyncQueueName}" />
    
     <!-- Lo bindeamos con fanout, que significa que los mensajes irán a todas las colas bindeadas-->
     <rabbit:fanout-exchange id="internalSyncExchange" name="${integration.rabbitmq.internalSyncExchangeName}" >
     		<rabbit:bindings>
     			<rabbit:binding queue="internalSyncQueue" />
     		</rabbit:bindings>
     </rabbit:fanout-exchange>
    
  3. Definimos la configuración tanto del exchange de salida como de la cola que hemos usado en el fichero publisher-amqp.xml.

    integrationCoreDefaultConfig.groovy

     integration {
     	activateAMQP = false
     	rabbitmq {
     		// Queues
     		internalSyncQueueName = "myApp.sync.queue"
     		// Exchanges
     		internalSyncExchangeName = "myApp.sync.exchange"
     	}
     }
    
  4. El mensaje está enviado, y se podría ver desde la interfaz gráfica de Rabbitmq en el exchange “myApp.sync.exchange”. Pero queremos también consumirlo, así que, antes de definir el consumer, necesitamos 2 ficheros xml más que configuran la conexión entre los publishers y los consumers.

    publisher-memory.xml

     <int:bridge input-channel="ourChannelPublisher" output-channel="queueOurChannelPublisher"/>
     <!-- Definimos el channel queueOurChannelPublisher  -->
     <int:channel id="queueOurChannelPublisher">
     	<int:queue/>
     </int:channel>
    

    Relacionamos “ourChannelPublisher” con “ourChannelConsumer”:

    (Usamos un service-activator. Activator indica que será un consumer que “consumirá” el mensaje. En este caso, llamaremos a una clase con un método para que reconstruya el mensaje (clase: dummyEchoServiceActivator.groovy, método: activate).

    consumer-memory.xml

     <int:service-activator input-channel="queueOurChannelPublisher" output-channel="ourChannelConsumer"
     						   ref="dummyEchoServiceActivator" method="activate">
     	<int:poller fixed-delay="2000"/>
    </int:service-activator>
    
    <!-- bean de la clase que recontruye el mensaje -->
    <bean id="dummyEchoServiceActivator" class="path.consume.DummyEchoServiceActivator"/>
    
  5. Para poder consumir el mensaje, una vez creada la conexión entre el publisher y el consumer, es necesario conectar la cola por la que se enviaban los mensajes desde el publisher al channel al que estará suscrito nuestro consumer.

    Para el consumer será mediante el “inbound-channel-adapter” (para el publisher antes hemos usado “outbound-channel-adapter”).

    consumer-amqp.xml

     <int-amqp:inbound-channel-adapter id="inputInternalSyncChannelAdapter"
     									  channel="ourChannelConsumer"
     									  queue-names="${integration.rabbitmq.internalSyncQueueName}"
     									  concurrent-consumers="${integration.rabbitmq.bulkInternalConcurrentConsumers}"
     									  mapped-request-headers="*"
     									  connection-factory="connectionFactoryConsume"
     									  auto-startup="false"/>
    
     <!-- queue-names="${integration..." es la cola por la que se enviaban los mensajes desde el publisher   -->
     <!-- ourChannelConsumer: channel al que estará suscrito nuestro consumer. -->
    
  6. Por último, enrutaremos nuestro mensaje por el channel que queremos dependiendo del valor que traiga el entityType.

    consumer-common.xml

     <int:channel id="ourChannelConsumer"/>
     <!-- Enrutamos dependiendo del valor del entityType -->
     <int:header-value-router input-channel="ourChannelConsumer" header-name="entityType"
                                  default-output-channel="rejectionChannel" resolution-required="false">
         <int:mapping value="user" channel="userSyncChannel" />
     </int:header-value-router>
    
     <!-- Definimos el channel que necesitamos para consumir el mensaje -->
     <int:channel id="userSyncChannel"/>
    
     <!-- Definimos nuestro consumer -->
     <int:service-activator input-channel="userSyncChannel" ref="userSyncSubscriber" method="handleMessagePayload">
         <int:request-handler-advice-chain>
             <ref bean="retryAdvice"/>
         </int:request-handler-advice-chain>
     </int:service-activator>
    

    El mensaje que el consumer recibirá será algo como:

     Exchange	      	      	  myApp.sync.exchange
     Routing Key
     Redelivered
     Properties
     priority:	        	  0
     delivery_mode:	  	      2
     headers:
     entityType:	      	      user
     entityType:	      	  user
     eventType:	      	      sync
     content_encoding:	      UTF-8
     content_type:	    	  text/plain
     Payload           { "userId":"50c8d773459c40fda2dd23abbf353ba1" }
    
eventType:	      	      sync
content_encoding:	      UTF-8
content_type:	    	  text/plain
Payload           { "userId":"50c8d773459c40fda2dd23abbf353ba1" }

Ahora ya está todo configurado para que la clase “userSyncSubscriber.groovy” consuma nuestro evento y realice la lógica de negocio en el método “handleMessagePayload”.

Rabbitmq tiene infinidad de propiedades y configuraciones que se pueden usar. Esto sólo ha sido una pequeña introducción…😆

Si has llegado hasta aquí, mis felicitaciones ㊗️ y sobre todo, GRACIAS.

Cualquier pregunta o sugerencia será bien recibida.

María