Background
We were building a microservice-based application. I won’t give a detailed description, but it’s partly described in this diagram.
The microservices involved use different technologies. Some are Grails applications, some are implemented in Pharo, and the most recent one is a Spring Boot application written in Groovy that uses Spring-Cloud-Stream.
Beneath the Spring-Cloud-Stream abstraction, the heavy lifting is done by Spring-AMQP.
The good
The code required to start consuming messages from a RabbitMQ queue is really simple:
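import org.springframework.cloud.stream.annotation.StreamListener
import org.springframework.stereotype.Service

@Service
class EventListener {
    /**
     * Receives a new event from the queue.
     * @param event the event received.
     */
    @StreamListener(EventChannels.QUEUE_NAME)
    void eventReceived(ApplicationPassed event) {
        // event handling
    }
}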
Notice the ApplicationPassed class belongs to our domain. Spring-Cloud-Stream has no way to automatically build our own instances from what it reads from the queue. For that, it expects us to provide a custom converter:
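import com.google.gson.Gson
import org.springframework.messaging.Message
import org.springframework.messaging.converter.MessageConverter
import org.springframework.stereotype.Component

@Component
class MyMessageConverter
    implements MessageConverter {
    @Override
    Object fromMessage(Message<?> message, Class<?> targetClass) {
        // The payload arrives as raw bytes; decode it as UTF-8 JSON and let Gson build the event.
        new Gson().fromJson(new String((byte[]) message.payload, 'UTF-8'), ApplicationPassed)
    }
    [..]
}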
We use Gson instead of JsonSlurper because our event class has a nested structure that needs to be parsed recursively into typed objects. It’s not based on maps and lists.
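To make the difference concrete, here is a small sketch. The fields of ApplicationPassed and the Applicant class are hypothetical, since the real event structure is not shown in this post, and the Gson version in the @Grab line is just an assumption. JsonSlurper hands back nested maps, while Gson builds instances of our own classes all the way down.

```groovy
@Grab('com.google.code.gson:gson:2.10.1')
import com.google.gson.Gson
import groovy.json.JsonSlurper

// Hypothetical event structure, for illustration only.
class Applicant {
    String name
}

class ApplicationPassed {
    String applicationId
    Applicant applicant
}

def json = '{"applicationId":"42","applicant":{"name":"Jane"}}'

// JsonSlurper returns generic maps, also for the nested object.
def slurped = new JsonSlurper().parseText(json)
assert slurped instanceof Map
assert slurped.applicant instanceof Map

// Gson populates typed instances recursively.
def event = new Gson().fromJson(json, ApplicationPassed)
assert event.applicant instanceof Applicant
assert event.applicant.name == 'Jane'
```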
Spring-Cloud-Stream needs some help to know how to start consuming messages from RabbitMQ. A simple interface does the trick:
import org.springframework.cloud.stream.annotation.Input
import org.springframework.messaging.SubscribableChannel

interface EventChannels {
    /**
     * The input binding name (look for spring.cloud.stream.bindings.input entries in application.properties):
     * spring.cloud.stream.bindings.input.destination=myqueue
     */
    String QUEUE_NAME = 'input'
    @Input
    SubscribableChannel input()
}
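As a rough illustration of the settings the interface above relies on, a minimal application.properties could look like the following. The queue name myqueue comes from the comment above; the connection values are placeholder assumptions for a local broker, not our actual configuration.

```properties
# Assumed connection settings for a local RabbitMQ broker (placeholders).
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# Destination consumed through the 'input' binding declared in EventChannels.
spring.cloud.stream.bindings.input.destination=myqueue
```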
After configuring the required settings in application.properties, the only missing step is to export the converter as a bean so Spring sees it and injects it into the list of converters used by Spring-Cloud-Stream. Somewhere, in a class declared to provide configuration, we must provide a builder method like:
@Bean
@StreamMessageConverter
MessageConverter buildMessageConverter() {
    new MyMessageConverter()
}
For publishing messages the API is also simple. If you’re familiar with Spring-Cloud-Stream, you might be wondering if we are using a processor. We are not.
We borrow the ports and adapters approach from hexagonal architectures. The consumer adapter cannot know that the publisher adapter is also based on Spring-Cloud-Stream. In fact, the adapters both reading from and writing to RabbitMQ is not essential to the application logic, and could change.
Anyway, to send a message to a RabbitMQ exchange, we just needed to define a class with an output channel:
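import com.google.gson.Gson
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.messaging.Source
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.support.MessageBuilder

@EnableBinding(Source)
@EnableAutoConfiguration
class CommandPublisher {
    // This matches the "output" binding in application.properties.
    @Autowired
    MessageChannel output
    void send(UnpassApplication command) {
        // MessageChannel.send expects a Message, so wrap the JSON payload.
        this.output.send(MessageBuilder.withPayload(new Gson().toJson(command)).build())
    }
}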
Obviously, application.properties needed some configuration as well. At the very least, the name of the exchange.
spring.cloud.stream.bindings.output.destination=myexchange
The bad
Once we had everything set up, we encountered a problem when sending messages, one that couldn’t be solved easily. No converter or configuration setting could help us.
When you publish messages to a RabbitMQ exchange, you can define the message headers. We currently use them to provide some basic metadata to the consumer. While you can customize them and send your own, we only declared “Hey, this message is in JSON format”. That’s the meaning of the content-type header.
So we added the configuration setting:
spring.cloud.stream.bindings.output.content-type=application/json
However, other microservices were unable to consume the message, and sent it to the Dead-Letter queue.
Why? Because they relied on the actual content type of the message rather than on the content-type header.
Spring-AMQP, underneath Spring-Cloud-Stream, was declaring the type of the content as application/octet-stream.
Regardless of your converter or the configuration setting, you can only customize the message headers. The actual content type is determined by the class of the message payload you try to send.
And there are only three options: a byte array, a String, or a serializable instance.
The first one ends up as application/octet-stream, the second as text/plain, and the third as application/x-java-serialized-object.
That logic is defined in org.springframework.amqp.support.converter.SimpleMessageConverter, and there’s nothing you can do to customize it.
Unless you propose a solution, of course. More information in Pull Request #763.
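The mapping described above can be sketched in a few lines. This is only a simplified illustration of the behavior, not the actual Spring-AMQP source: contentTypeFor is a hypothetical helper, and the sample payloads are made up.

```groovy
// Simplified sketch of how the payload class decides the content type.
// contentTypeFor is a hypothetical helper, not part of Spring-AMQP.
String contentTypeFor(Object payload) {
    // byte[] and String are also Serializable, so they must be checked first.
    if (payload instanceof byte[]) {
        return 'application/octet-stream'
    }
    if (payload instanceof String) {
        return 'text/plain'
    }
    if (payload instanceof Serializable) {
        return 'application/x-java-serialized-object'
    }
    throw new IllegalArgumentException("Unsupported payload type: ${payload?.getClass()?.name}")
}

def json = '{"applicationId":"42"}'   // made-up event body

println contentTypeFor(json.getBytes('UTF-8'))
println contentTypeFor(json)
println contentTypeFor([applicationId: '42'])
```

Note that nothing in this decision looks at the message headers, which is why setting content-type in application.properties made no difference to the consumers.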
The ugly
Spring-Cloud-Stream was easy to use. However, I found it very difficult to know which setting in application.properties was really meaningful. I still don’t know how to be sure which component of the Spring ecosystem gets affected by which setting.
It forced us to change our queue naming too. If you don’t define a group for the input binding, an anonymous queue gets created each time. However, the group feature names the queue using a destination.group scheme. That’s fine, but we don’t use that convention anywhere else.
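For illustration, assuming the input binding from earlier and a hypothetical group called mygroup, the settings would look roughly like this:

```properties
spring.cloud.stream.bindings.input.destination=myqueue
# Hypothetical group name; with it, the queue is named myqueue.mygroup
spring.cloud.stream.bindings.input.group=mygroup
```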
While the pull request gets evaluated, our microservice has copied the SimpleMessageConverter class directly into its codebase. That way, the classloader doesn’t load the original when loading Spring-AMQP, since the class is already loaded. This kind of class shadowing is not a desirable approach, but we preferred it to changing the other microservices so that the content-type header takes precedence over the message content type.
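For reference, the alternative we decided against would have meant adding something like the following to every consumer. It is only a sketch: effectiveContentType is a hypothetical helper, and the header name is taken from this post rather than from any of the other services.

```groovy
// Hypothetical consumer-side helper: prefer the content-type header when present,
// otherwise fall back to the content type declared on the message itself.
String effectiveContentType(Map<String, Object> headers, String declaredContentType) {
    def fromHeader = headers?.get('content-type')
    fromHeader ? fromHeader.toString() : declaredContentType
}

println effectiveContentType(['content-type': 'application/json'], 'application/octet-stream')
println effectiveContentType([:], 'text/plain')
```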
In summary, Spring-Cloud-Stream was really easy to deal with in the code. At runtime, we had to make some sacrifices, and in the integration testing phase we ran into the content-type issue, which took us quite some time to solve.
Credits
- Images:
- Rabbit licensed under CC0 Creative Commons.