Sender

Sender is a basic abstraction over the ability to send a single message. Its simplified definition looks like this:

trait Sender[F[_], -A] extends (A => F[Unit]) with Serializable {

  /** Sends a single message.
    */
  def sendOne(msg: A): F[Unit]

  /** Alias for [[sendOne]]. Thanks to this, you can pass a Sender where a function type is expected.
    */
  def apply(msg: A): F[Unit] = sendOne(msg)

}

As you can see, a Sender is essentially a function of the shape A => F[Unit]. It comes with many combinators for mapping, filtering and combining Senders, as well as Functor and Monoid instances.

For details, please refer to the Sender.scala sources and the scaladocs.
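To make the function shape concrete, here is a minimal sketch that runs without any library dependency. It redeclares the simplified trait shown earlier as a stand-in (it is not the library's own class), and the names Id, RecordingSender and SenderAsFunction are hypothetical, introduced only for illustration. Assuming an identity effect in place of a real F such as IO, a Sender[Id, String] is a String => Unit and can be handed directly to foreach:

```scala
// Simplified stand-in mirroring the trait shown above; not the library's own class.
trait Sender[F[_], -A] extends (A => F[Unit]) with Serializable {
  def sendOne(msg: A): F[Unit]
  def apply(msg: A): F[Unit] = sendOne(msg)
}

object SenderAsFunction {

  // Hypothetical identity effect, used so the sketch runs without cats-effect.
  type Id[A] = A

  // Hypothetical in-memory sender that records messages instead of sending them.
  final class RecordingSender extends Sender[Id, String] {
    private val buffer = scala.collection.mutable.ListBuffer.empty[String]

    def sendOne(msg: String): Id[Unit] = {
      buffer += msg
      ()
    }

    // Messages recorded so far, in the order they were sent.
    def sent: List[String] = buffer.toList
  }

  def main(args: Array[String]): Unit = {
    val sender = new RecordingSender

    // foreach expects a String => U; the sender fits because it extends the function type.
    List("first", "second").foreach(sender)

    println(sender.sent)
  }
}
```

In real code F would typically be an effect type such as IO, and the instance would come from a Broker rather than a hand-written class.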

The typical way of obtaining a Sender instance is by instantiating the Broker first.

Basic example

Here's a simple sender implementation configured for use with our localstack setup. To run it locally, save it to a file and run it with scala-cli: scala-cli run filename.scala

//> using scala "2.13"
//> using lib "com.ocadotechnology::pass4s-kernel:0.3.1"
//> using lib "com.ocadotechnology::pass4s-core:0.3.1"
//> using lib "com.ocadotechnology::pass4s-high:0.3.1"
//> using lib "com.ocadotechnology::pass4s-connector-sns:0.3.1"
//> using lib "org.typelevel::log4cats-noop:2.5.0"

import cats.effect.ExitCode
import cats.effect.IO
import cats.effect.IOApp
import cats.implicits._
import com.ocadotechnology.pass4s.connectors.sns.SnsArn
import com.ocadotechnology.pass4s.connectors.sns.SnsConnector
import com.ocadotechnology.pass4s.connectors.sns.SnsDestination
import com.ocadotechnology.pass4s.core.Message
import com.ocadotechnology.pass4s.core.Source
import com.ocadotechnology.pass4s.high.Broker
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.noop.NoOpLogger
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region

import java.net.URI

object Producer extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    implicit val ioLogger: Logger[IO] = NoOpLogger[IO]

    // Initialize credentials
    val awsCredentials = AwsBasicCredentials.create("test", "AWSSECRET")
    val snsDestination = SnsDestination(SnsArn("arn:aws:sns:eu-west-2:000000000000:local_sns"))
    val localstackURI = new URI("http://localhost:4566")

    val credentialsProvider = StaticCredentialsProvider.create(awsCredentials)

    // Create connector resource using provided credentials
    val snsConnector =
      SnsConnector.usingLocalAwsWithDefaultAttributesProvider[IO](localstackURI, Region.EU_WEST_2, credentialsProvider)

    snsConnector.use { connector => // obtain the connector resource
      val broker = Broker.fromConnector(connector)

      val message = Message(Message.Payload("hello world!", Map()), snsDestination)

      IO.println(s"Sending message $message to $snsDestination") *>
        broker.sender.sendOne(message) *> // use the sender to send one message
        IO.println("Sent, exiting!").as(ExitCode.Success)
    }
  }
}