
Message processor

The message processor provides a convenient way to build a rich consumer, bind it to a broker, and attach message handling logic.

Building a processor usually starts with initialization:

val processor = MessageProcessor.init[IO]

Here IO can be replaced with the effect type of your choice. Once the processor is initialized, you can enrich the underlying consumer with successive enrich calls:

val richProcessor =
  processor
    .enrich(_.logged)
    .enrich(_.usingS3Proxy(consumerConfig))
    .enrich(_.asJsonConsumer[String])
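
The chaining above can be pictured as a series of decorators applied to the underlying consumer. The following is a minimal sketch of that idea only: ToyConsumer, ToyProcessor, and the mapped and logged decorators defined here are hypothetical stand-ins written with the standard library, not the library's actual types or implementation.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a consumer: it pushes each message to a handler.
final case class ToyConsumer[A](consume: (A => Unit) => Unit) {
  // Decorator that records every message before passing it on.
  def logged(log: ListBuffer[String]): ToyConsumer[A] =
    ToyConsumer[A](handler => consume { a => log += s"received: $a"; handler(a) })

  // Decorator that changes the message type, e.g. by decoding a payload.
  def mapped[B](f: A => B): ToyConsumer[B] =
    ToyConsumer[B](handler => consume(a => handler(f(a))))
}

// Hypothetical stand-in for the processor builder: enrich swaps the wrapped
// consumer for a decorated one and returns a new builder.
final case class ToyProcessor[A](consumer: ToyConsumer[A]) {
  def enrich[B](f: ToyConsumer[A] => ToyConsumer[B]): ToyProcessor[B] =
    ToyProcessor(f(consumer))
}

object EnrichSketch {
  val log = ListBuffer.empty[String]
  val handled = ListBuffer.empty[Int]

  // A source that emits two raw string messages.
  val raw: ToyConsumer[String] =
    ToyConsumer[String](handler => List("1", "2").foreach(handler))

  // Each enrich call wraps the consumer built so far.
  val processor: ToyProcessor[Int] =
    ToyProcessor(raw)
      .enrich(_.logged(log))
      .enrich(_.mapped(_.toInt))

  // Feed every decoded message to a handler that collects it.
  def run(): Unit = processor.consumer.consume { a => handled += a; () }
}
```

In this toy model the order of the enrich calls matters: logging is applied before decoding, so the log records the raw strings while the handler receives integers.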

Next, depending on your logic, call either transacted, if your message handlers run in a different effect, or effectful, if you want to stay in the processor's effect (IO in this example). After that, bind the broker and provide the message handling logic. Keep in mind that a prepared processor can be reused for several destinations, as in the example below.

val processor =
  MessageProcessor
    .init[IO]
    .enrich(_.logged)
    .enrich(_.asJsonConsumer[String])
    .transacted(runEffect)
    .bindBroker(broker)

processor.handle(Destinations.destinationA)(MyProcessor.instanceA[AppEffect])
processor.handle(Destinations.destinationB)(MyProcessor.instanceB[AppEffect])
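
One way to think about the transacted step and the reuse shown above is sketched below. It assumes that runEffect acts as a translation from the handlers' effect into the processor's effect, which the text above does not state explicitly. ToyIO, Tx, Translate, and ToyBound are hypothetical names invented for this illustration and are not part of the library or of cats-effect.

```scala
import scala.collection.mutable.ListBuffer

object TransactedSketch {
  // Toy processor effect: a suspended computation (NOT cats-effect IO).
  final case class ToyIO[A](unsafeRun: () => A)

  // Toy handler effect: a computation that needs a transaction id to run.
  final case class Tx[A](run: Int => A)

  // Hypothetical translation from the handler effect G into the processor effect F.
  trait Translate[G[_], F[_]] { def apply[A](ga: G[A]): F[A] }

  val events = ListBuffer.empty[String]

  // Assumed role of runEffect: wrap every handler invocation in begin/commit.
  val runEffect: Translate[Tx, ToyIO] = new Translate[Tx, ToyIO] {
    def apply[A](ga: Tx[A]): ToyIO[A] = ToyIO { () =>
      events += "begin"
      val result = ga.run(1)
      events += "commit"
      result
    }
  }

  // Toy bound processor: prepared once, then wired to several handlers.
  // There is no broker in this model, so the destination is only a label.
  final class ToyBound[G[_]](translate: Translate[G, ToyIO]) {
    def handle(destination: String)(handler: String => G[Unit]): String => ToyIO[Unit] =
      message => translate(handler(message))
  }

  val bound = new ToyBound[Tx](runEffect)

  // The same prepared processor serves two destinations.
  val onA = bound.handle("destinationA")(msg => Tx { _ => events += s"A handled $msg"; () })
  val onB = bound.handle("destinationB")(msg => Tx { _ => events += s"B handled $msg"; () })
}
```

Running TransactedSketch.onA("hello").unsafeRun() in this model records the begin, handler, and commit events in that order, which shows how a single translation can be shared by every handler attached to the same prepared processor.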