From 079dc4d1a7e88c17bc4a5c303d5af8d799b03de7 Mon Sep 17 00:00:00 2001 From: Vasileios Lampridis Date: Tue, 23 May 2023 22:16:28 +0100 Subject: [PATCH 1/4] #259: Added function to allow processing messages at least once --- .../src/main/scala/zio/sqs/SqsStream.scala | 55 +++++++++++++++++-- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala b/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala index 314fe90..69ab35d 100644 --- a/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala +++ b/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala @@ -1,17 +1,20 @@ package zio.sqs +import zio.Schedule.WithState import zio.aws.sqs._ import zio.aws.sqs.model._ -import zio.{ RIO, ZIO } +import zio._ import zio.stream.ZStream import zio.aws.sqs.model.primitives.MessageAttributeName +import java.time.OffsetDateTime + object SqsStream { def apply( - queueUrl: String, - settings: SqsStreamSettings = SqsStreamSettings() - ): ZStream[Sqs, Throwable, Message.ReadOnly] = { + queueUrl: String, + settings: SqsStreamSettings = SqsStreamSettings() + ): ZStream[Sqs, Throwable, Message.ReadOnly] = { val request = ReceiveMessageRequest( queueUrl = queueUrl, @@ -36,4 +39,48 @@ object SqsStream { def deleteMessage(queueUrl: String, msg: Message.ReadOnly): RIO[Sqs, Unit] = zio.aws.sqs.Sqs.deleteMessage(DeleteMessageRequest(queueUrl, msg.receiptHandle.getOrElse(""))).mapError(_.toThrowable) + + /** + * Processes SQS messages from the queue using a given function and + * deletes the messages from the queue only if they processed successfully. + * + * If the message fails to be processed (=error or defect), + * it will reappear in the queue after the `visibilityTimeout` expires. + * If a DLQ is attached to the queue, the message will move into the DLQ after a set number of retries. + * + * Returns a stream of the original message and the outcome of the processing, to allow attaching logging or metrics.
+ * The returned Stream is also convenient for testing: + * exposing what was processed and allowing to stop the stream on some condition. + * + * @param url the URL of the SQS queue + * @param settings the settings for the stream of messages from the queue + * @param deleteRetrySchedule The schedule by which to retry deleting the message (which should be rare) + * @param process the function that takes an ObjectSummary and returns a ZIO effect + * @tparam E the error type of the ZIO effect + * @tparam A the value type of the ZIO effect + * @return a ZStream that can either fail with a Throwable or emit values of type Either[(Message.ReadOnly, Cause[E]), A] + */ + def processMessages[E, A](url: String, settings: SqsStreamSettings, + parallelism: Int = 1, + deleteRetrySchedule: Schedule[Any, Any, zio.Duration] = defaultDeleteRetryPolicy) + (process: Message.ReadOnly => ZIO[Any, E, A]): ZStream[Sqs, Throwable, Either[(Message.ReadOnly, Cause[E]), A]] = + zio.sqs.SqsStream(url, settings) + // Unordered since SQS (and queues in general) don't have ordering semantics + .mapZIOParUnordered(parallelism) { msg => + process(msg) + // Capture any errors and defects + .sandbox + .zipLeft { + SqsStream.deleteMessage(url, msg) + // Retry in case it fails to delete transiently + .retry(defaultDeleteRetryPolicy) + // Die if it fails and stop the stream, to avoid reprocessing validly processed messages + // This would be very rare and caught early in testing.
+ .orDie + }.mapError(err => (msg, err)) + .either // Move the error to the return value, so that it doesn't stop the stream + } + + private val defaultDeleteRetryPolicy: Schedule[Any, Any, zio.Duration] = + (Schedule.exponential(10.milliseconds) >>> Schedule.elapsed).whileOutput(_ < 30.seconds) } From cca83754101fc23aac62340e4dda7abcfbd48628 Mon Sep 17 00:00:00 2001 From: Vasileios Lampridis Date: Tue, 30 May 2023 00:05:21 +0100 Subject: [PATCH 2/4] Added tests for the method --- .../src/main/scala/zio/sqs/SqsStream.scala | 51 ++++++++++--------- .../test/scala/zio/sqs/SqsConsumerSpec.scala | 40 +++++++++++++++ .../test/scala/zio/sqs/ZioSqsMockServer.scala | 45 +++++++++++++++- 3 files changed, 110 insertions(+), 26 deletions(-) create mode 100644 zio-sqs/src/test/scala/zio/sqs/SqsConsumerSpec.scala diff --git a/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala b/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala index 69ab35d..fd64eae 100644 --- a/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala +++ b/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala @@ -1,20 +1,17 @@ package zio.sqs -import zio.Schedule.WithState import zio.aws.sqs._ import zio.aws.sqs.model._ import zio._ import zio.stream.ZStream import zio.aws.sqs.model.primitives.MessageAttributeName -import java.time.OffsetDateTime - object SqsStream { def apply( - queueUrl: String, - settings: SqsStreamSettings = SqsStreamSettings() - ): ZStream[Sqs, Throwable, Message.ReadOnly] = { + queueUrl: String, + settings: SqsStreamSettings = SqsStreamSettings() + ): ZStream[Sqs, Throwable, Message.ReadOnly] = { val request = ReceiveMessageRequest( queueUrl = queueUrl, @@ -43,6 +40,7 @@ object SqsStream { /** * Processes SQS messages from the queue using a given function and * deletes the messages from the queue only if they processed successfully. + * This ensures that no message is lost in case of an error. 
* * If the message fails to be processed (=error or defect), * it will reappear in the queue after the `visibilityTimeout` expires. @@ -54,33 +52,36 @@ object SqsStream { * * @param url the URL of the SQS queue * @param settings the settings for the stream of messages from the queue - * @param deleteRetrySchedule The schedule by which to retry deleting the message (which should be rare) + * @param parallelism how many messages are processed in parallel + * @param deleteRetrySchedule the schedule by which to retry deleting the message (which should be rare) * @param process the function that takes an ObjectSummary and returns a ZIO effect * @tparam E the error type of the ZIO effect * @tparam A the value type of the ZIO effect * @return a ZStream that can either fail with a Throwable or emit values of type Either[(Message.ReadOnly, Cause[E]), A] */ - def processMessages[E, A](url: String, settings: SqsStreamSettings, - parallelism: Int = 1, - deleteRetrySchedule: Schedule[Any, Any, zio.Duration] = defaultDeleteRetryPolicy) - (process: Message.ReadOnly => ZIO[Any, E, A]): ZStream[Sqs, Throwable, Either[(Message.ReadOnly, Cause[E]), A]] = - zio.sqs.SqsStream(url, settings) + def processMessages[E, A]( + url: String, + settings: SqsStreamSettings, + parallelism: Int = 1, + deleteRetrySchedule: Schedule[Any, Any, Any] = defaultDeleteRetryPolicy + )(process: Message.ReadOnly => ZIO[Any, E, A]): ZStream[Sqs, Throwable, Either[(Message.ReadOnly, Cause[E]), A]] = + zio.sqs + .SqsStream(url, settings) // Unordered since SQS (and queues in general) don't have ordering semantics .mapZIOParUnordered(parallelism) { msg => process(msg) - // Capture any errors and defects - .sandbox - .zipLeft { - SqsStream.deleteMessage(url, msg) - // Retry in case it fails to delete transiently - .retry(defaultDeleteRetryPolicy) - // Die if it fails and stop the stream, to avoid reprocessing validly processed messages - // This would be very rare and caught early in testing. 
- .orDie - }.mapError(err => (msg, err)) - .either // Move the error to the return value, so that it doesn't stop the stream + // Capture any errors and defects + .sandbox.zipLeft { + SqsStream + .deleteMessage(url, msg) + // Retry in case it fails to delete transiently + .retry(deleteRetrySchedule) + // Die if it fails and stop the stream, to avoid reprocessing validly processed messages + // This would be very rare and caught early in testing. + .orDie + }.mapError(err => (msg, err)).either // Move the error to the return value, so that it doesn't stop the stream } - private val defaultDeleteRetryPolicy: Schedule[Any, Any, zio.Duration] = - (Schedule.exponential(10.milliseconds) >>> Schedule.elapsed).whileOutput(_ < 30.seconds) + private val defaultDeleteRetryPolicy: Schedule[Any, Any, Any] = + Schedule.exponential(10.milliseconds) && Schedule.upTo(30.seconds) } diff --git a/zio-sqs/src/test/scala/zio/sqs/SqsConsumerSpec.scala b/zio-sqs/src/test/scala/zio/sqs/SqsConsumerSpec.scala new file mode 100644 index 0000000..dcd7016 --- /dev/null +++ b/zio-sqs/src/test/scala/zio/sqs/SqsConsumerSpec.scala @@ -0,0 +1,40 @@ +package zio.sqs +import zio.test._ +import zio._ +import zio.aws.sqs.Sqs +import zio.aws.sqs.model.SendMessageRequest +object SqsConsumerSpec extends ZIOSpecDefault { + + def produce(url: String, message: String): ZIO[Sqs, Throwable, Unit] = + Sqs.sendMessage(SendMessageRequest(url, message)).unit.mapError(_.toThrowable) + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("SQS Consumer spec")( + test("retry failed messages indefinitely") { + val settings = SqsStreamSettings( + autoDelete = false, + stopWhenQueueEmpty = true, + visibilityTimeout = Some(1), + waitTimeSeconds = Some(2) // more than visibilityTimeout to allow errors to reappear + ) + for { + _ <- Utils.createQueue("test-queue") + url <- Utils.getQueueUrl("test-queue") + _ <- produce(url, "1") + _ <- produce(url, "not number1") + _ <- produce(url, "2") + messages <- SqsStream 
+ .processMessages(url, settings)( + _.getBody.flatMap(body => ZIO.attempt(body.toInt)) + ) + .take(3 + 2) + .runCollect + successes = messages.collect { case Right(v) => v } + failures = messages.collect { case Left(v) => v } + } yield assertTrue( + successes == Chunk(1, 2), + failures.length == 3, // It is retried multiple times + failures.map(_._1.body.toOption.get).distinct.length == 1 + ) + } + ).provideLayer(MockSqsServerAndClient.layer) +} diff --git a/zio-sqs/src/test/scala/zio/sqs/ZioSqsMockServer.scala b/zio-sqs/src/test/scala/zio/sqs/ZioSqsMockServer.scala index 016032b..715d573 100644 --- a/zio-sqs/src/test/scala/zio/sqs/ZioSqsMockServer.scala +++ b/zio-sqs/src/test/scala/zio/sqs/ZioSqsMockServer.scala @@ -1,7 +1,7 @@ package zio.sqs import java.net.URI -import zio.aws.core.config.AwsConfig +import zio.aws.core.config.{ AwsConfig, CommonAwsConfig } import zio.aws.sqs.Sqs import org.elasticmq.rest.sqs.SQSRestServer import org.elasticmq.RelaxedSQSLimits @@ -9,6 +9,7 @@ import org.elasticmq.rest.sqs.TheSQSRestServerBuilder import org.elasticmq.NodeAddress import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider } import software.amazon.awssdk.regions.Region +import zio.aws.netty.NettyHttpClient import zio.{ Scope, ZIO, ZLayer } object ZioSqsMockServer extends TheSQSRestServerBuilder(None, None, "", 9324, NodeAddress(), true, RelaxedSQSLimits, "elasticmq", "000000000000", None) { @@ -31,3 +32,45 @@ object ZioSqsMockServer extends TheSQSRestServerBuilder(None, None, "", 9324, No .endpointOverride(uri) ) } + +object MockSqsServerAndClient { + + lazy val layer: ZLayer[Any, Throwable, Sqs] = + (NettyHttpClient.default ++ mockServer) >>> AwsConfig.configured() >>> zio.aws.sqs.Sqs.live + + private lazy val mockServer: ZLayer[Any, Throwable, CommonAwsConfig] = { + val dummyAwsKeys = + StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "key")) + val region = Region.AP_NORTHEAST_2 + ZLayer.scoped { + for { + // 
Random port to allow parallel tests + port <- ZIO.randomWith(_.nextIntBetween(9000, 50000)) + serverBuilder <- ZIO.attempt( + TheSQSRestServerBuilder( + providedActorSystem = None, + providedQueueManagerActor = None, + interface = "", + port = port, + serverAddress = NodeAddress(), + generateServerAddress = true, + sqsLimits = RelaxedSQSLimits, + _awsRegion = region.toString, + _awsAccountId = "000000000000", + queueEventListener = None + ) + ) + server <- ZIO.acquireRelease( + ZIO.attempt(serverBuilder.start()) + )(server => ZIO.succeed(server.stopAndWait())) + awsConfig = CommonAwsConfig( + region = Some(region), + credentialsProvider = dummyAwsKeys, + endpointOverride = Some(new URI(s"http://localhost:${port}")), + None + ) + } yield awsConfig + } + } + +} From 88f2ba68dce8257922c48c72b355e3bee70b4213 Mon Sep 17 00:00:00 2001 From: Vasileios Lampridis Date: Tue, 30 May 2023 18:44:26 +0100 Subject: [PATCH 3/4] Added lower-level api method --- .../src/main/scala/zio/sqs/SqsStream.scala | 58 +++++++++++++------ .../test/scala/zio/sqs/SqsConsumerSpec.scala | 3 +- .../test/scala/zio/sqs/ZioSqsMockServer.scala | 13 ++++- 3 files changed, 53 insertions(+), 21 deletions(-) diff --git a/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala b/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala index fd64eae..261d245 100644 --- a/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala +++ b/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala @@ -3,7 +3,7 @@ package zio.sqs import zio.aws.sqs._ import zio.aws.sqs.model._ import zio._ -import zio.stream.ZStream +import zio.stream._ import zio.aws.sqs.model.primitives.MessageAttributeName object SqsStream { @@ -37,6 +37,36 @@ object SqsStream { def deleteMessage(queueUrl: String, msg: Message.ReadOnly): RIO[Sqs, Unit] = zio.aws.sqs.Sqs.deleteMessage(DeleteMessageRequest(queueUrl, msg.receiptHandle.getOrElse(""))).mapError(_.toThrowable) + /** + * Wraps the messages in a scoped ZIO that will delete the message from the queue only if the ZIO succeeds. 
+ * Lower level api, e.g. when you want to batch-process messages, or expose parsed messages to your users. + * Use `processMessages` for the common case. + */ + def deletingOnSuccess( + url: String, + settings: SqsStreamSettings = SqsStreamSettings(), + deleteRetrySchedule: Schedule[Any, Any, Any] = defaultDeleteRetryPolicy + ): ZStream[Sqs, Throwable, ZIO[Scope, Nothing, Message.ReadOnly]] = + SqsStream(url, settings).mapZIO { msg: Message.ReadOnly => + ZIO.serviceWith[Sqs] { sqs => + ZIO.succeedNow(msg).withFinalizerExit { (msg, exit) => + exit match { + case Exit.Success(_) => + SqsStream + .deleteMessage(url, msg) + .provideEnvironment(ZEnvironment(sqs)) + // Retry in case it fails to delete transiently + .retry(deleteRetrySchedule) + // Die if it fails and stop the stream, to avoid reprocessing validly processed messages + // This would be very rare and caught early in testing. + .orDie + case Exit.Failure(_) => + ZIO.unit + } + } + } + } + /** * Processes SQS messages from the queue using a given function and * deletes the messages from the queue only if they processed successfully. @@ -64,22 +94,16 @@ object SqsStream { settings: SqsStreamSettings, parallelism: Int = 1, deleteRetrySchedule: Schedule[Any, Any, Any] = defaultDeleteRetryPolicy - )(process: Message.ReadOnly => ZIO[Any, E, A]): ZStream[Sqs, Throwable, Either[(Message.ReadOnly, Cause[E]), A]] = - zio.sqs - .SqsStream(url, settings) - // Unordered since SQS (and queues in general) don't have ordering semantics - .mapZIOParUnordered(parallelism) { msg => - process(msg) - // Capture any errors and defects - .sandbox.zipLeft { - SqsStream - .deleteMessage(url, msg) - // Retry in case it fails to delete transiently - .retry(deleteRetrySchedule) - // Die if it fails and stop the stream, to avoid reprocessing validly processed messages - // This would be very rare and caught early in testing. 
- .orDie - }.mapError(err => (msg, err)).either // Move the error to the return value, so that it doesn't stop the stream + )(process: Message.ReadOnly => ZIO[Any, E, A]): ZStream[Sqs, Throwable, Either[Cause[E], A]] = + zio.sqs.SqsStream + .deletingOnSuccess(url, settings, deleteRetrySchedule) + .mapZIOPar(parallelism) { msg => + ZIO + .scoped[Any] { + msg.flatMap(process) + } + .sandbox // Capture any errors and defects + .either // Move the error to the return value, so that it doesn't stop the stream } private val defaultDeleteRetryPolicy: Schedule[Any, Any, Any] = diff --git a/zio-sqs/src/test/scala/zio/sqs/SqsConsumerSpec.scala b/zio-sqs/src/test/scala/zio/sqs/SqsConsumerSpec.scala index dcd7016..3f86ccf 100644 --- a/zio-sqs/src/test/scala/zio/sqs/SqsConsumerSpec.scala +++ b/zio-sqs/src/test/scala/zio/sqs/SqsConsumerSpec.scala @@ -32,8 +32,7 @@ object SqsConsumerSpec extends ZIOSpecDefault { failures = messages.collect { case Left(v) => v } } yield assertTrue( successes == Chunk(1, 2), - failures.length == 3, // It is retried multiple times - failures.map(_._1.body.toOption.get).distinct.length == 1 + failures.length == 3 // It is retried multiple times ) } ).provideLayer(MockSqsServerAndClient.layer) diff --git a/zio-sqs/src/test/scala/zio/sqs/ZioSqsMockServer.scala b/zio-sqs/src/test/scala/zio/sqs/ZioSqsMockServer.scala index 715d573..626a4c6 100644 --- a/zio-sqs/src/test/scala/zio/sqs/ZioSqsMockServer.scala +++ b/zio-sqs/src/test/scala/zio/sqs/ZioSqsMockServer.scala @@ -10,7 +10,7 @@ import org.elasticmq.NodeAddress import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider } import software.amazon.awssdk.regions.Region import zio.aws.netty.NettyHttpClient -import zio.{ Scope, ZIO, ZLayer } +import zio._ object ZioSqsMockServer extends TheSQSRestServerBuilder(None, None, "", 9324, NodeAddress(), true, RelaxedSQSLimits, "elasticmq", "000000000000", None) { private val staticCredentialsProvider: StaticCredentialsProvider 
= @@ -38,6 +38,15 @@ object MockSqsServerAndClient { lazy val layer: ZLayer[Any, Throwable, Sqs] = (NettyHttpClient.default ++ mockServer) >>> AwsConfig.configured() >>> zio.aws.sqs.Sqs.live + private val usedPorts = Ref.unsafe.make(Set.empty[Int])(Unsafe.unsafe) + + private val getUnusedPort: ZIO[Scope, Throwable, Int] = ZIO + .randomWith(_.nextIntBetween(9000, 50000)) + .repeatUntilZIO(port => usedPorts.get.map(!_.contains(port))) + .tap(port => usedPorts.update(_ + port)) + .withFinalizer(port => usedPorts.update(_ - port)) + .timeoutFail(new Exception("Could not find unused port"))(1.seconds) + private lazy val mockServer: ZLayer[Any, Throwable, CommonAwsConfig] = { val dummyAwsKeys = StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "key")) @@ -45,7 +54,7 @@ object MockSqsServerAndClient { ZLayer.scoped { for { // Random port to allow parallel tests - port <- ZIO.randomWith(_.nextIntBetween(9000, 50000)) + port <- getUnusedPort serverBuilder <- ZIO.attempt( TheSQSRestServerBuilder( providedActorSystem = None, From aa32ccce1f572ac4df1b71a5096f352baff2168f Mon Sep 17 00:00:00 2001 From: Vasileios Lampridis Date: Tue, 30 May 2023 18:51:49 +0100 Subject: [PATCH 4/4] added some example docs --- zio-sqs/src/main/scala/zio/sqs/SqsStream.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala b/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala index 261d245..cccfee8 100644 --- a/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala +++ b/zio-sqs/src/main/scala/zio/sqs/SqsStream.scala @@ -39,7 +39,11 @@ object SqsStream { /** * Wraps the messages in a scoped ZIO that will delete the message from the queue only if the ZIO succeeds. - * Lower level api, e.g. when you want to batch-process messages, or expose parsed messages to your users. + * WARNING: Do NOT do .mapZIO(identity) unless the stream is short-lived, as this will only delete the messages when the stream is closed. 
+ * Lower level api for more advanced use cases, e.g.: + * - batch-process messages, deleting all messages when the batch succeeds: .rechunk(1000).chunks.map(chunk => ZIO.foreach(chunk)(identity).parallelFinalizers) + * - expose parsed messages to your users: .map(_.flatMap(_.getBody).flatMap(parseBody)) + * * Use `processMessages` for the common case. */ def deletingOnSuccess(