A lightweight RabbitMQ client for Java (built on top of rabittmq java client)
Documentation and project available at GitHub repo
https://github.com/gregbugaj/hoplin.io
To make working with RabbitMQ as simple as possible with minimum dependencies.
Minimal dependencies, simple configuration and API.
- Subscriber client
- Publisher client
- Async RPC Client
Creating simple RabbitMQ client can be done in couple different ways.
The simplest way with minimal configuration
ExchangeClient client = ExchangeClient.topic(RabbitMQOptions.from("host=localhost"))
This creates new Exchange client bound to a Topic exchange.
We can also specify which queue and which routing key we want to handle.
final RabbitMQOptions options = RabbitMQOptions.from("host=localhost");
final ExchangeClient client = ExchangeClient.topic(options, "my.exchange", "log.critical", "log.critical.*")
For complete control we can use the Exchange to Queue Binding builder.
ExchangeClient clientFromBinding(final String exchange, final String queue, final String routingKey)
{
final Binding binding = BindingBuilder
.bind(queue)
.to(new TopicExchange(exchange))
.withAutoAck(true)
.withPrefetchCount(1)
.withPublisherConfirms(true)
.with(routingKey)
.build();
return ExchangeClient.topic(options(), binding);
}
This is the most flexible method as it allows us to control all the aspect of how messages are handled.
Publishing and receiving messages is simple as well. Both methods provide number of overloaded methods to provide different levels of flexibility.
// Publish message
client.publish(new LogDetail("Msg : " + System.nanoTime()));
// Consume message
client.subscribe(LogDetail.class, msg-> log.info("Message received [{}]", msg));
Here is example that includes both the Publisher and Subscriber
public class SamePublisherConsumerExample extends BaseExample
{
private static final Logger log = LoggerFactory.getLogger(SamePublisherConsumerExample.class);
private static final String EXCHANGE = "topic_logs";
public static void main(final String... args) throws InterruptedException
{
log.info("Starting producer/consumer for exchange : {}", EXCHANGE);
final ExchangeClient client = ExchangeClient.topic(options(), EXCHANGE);
client.subscribe(LogDetail.class, SamePublisherConsumerExample::handle);
for(int i = 0; i < 5; ++i)
{
client.publish(createMessage("info"), "log.info.info");
client.publish(createMessage("debug"), "log.info.debug");
Thread.sleep(1000L);
}
}
private static void handle(final LogDetail msg)
{
log.info("Incoming msg : {}", msg);
}
private static LogDetail createMessage(final String level)
{
return new LogDetail("Msg : " + System.nanoTime(), level);
}
}