Skip to content


Adding documentation and examples for s3 connector (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
ASRagab authored Nov 3, 2022
1 parent b0537ec commit 87cdb3d
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 14 deletions.
10 changes: 9 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ lazy val examples = project
publishArtifact := false,
moduleName := "zio-connect-examples"
.aggregate(fileConnectorExamples, s3ConnectorExamples)

lazy val fileConnectorExamples = project
Expand All @@ -120,3 +120,11 @@ lazy val fileConnectorExamples = project
scalacOptions -= "-Xfatal-warnings"

lazy val s3ConnectorExamples = project
publish / skip := true,
scalacOptions -= "-Xfatal-warnings"
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package object s3 {
def createBucket(implicit trace: Trace): ZSink[S3Connector, S3Exception, BucketName, BucketName, Unit] =

def deleteEmptyBuckets(implicit trace: Trace): ZSink[S3Connector, S3Exception, BucketName, BucketName, Unit] =
def deleteEmptyBucket(implicit trace: Trace): ZSink[S3Connector, S3Exception, BucketName, BucketName, Unit] =

def deleteObjects(bucketName: BucketName)(implicit
Expand All @@ -33,6 +33,9 @@ package object s3 {
): ZStream[S3Connector, S3Exception, Byte] =
ZStream.serviceWithStream(_.getObject(bucketName, key))

def listBuckets(implicit trace: Trace): ZStream[S3Connector, S3Exception, BucketName] =

def listObjects(bucketName: => BucketName)(implicit trace: Trace): ZStream[S3Connector, S3Exception, ObjectKey] =

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ trait S3ConnectorSpec extends ZIOSpecDefault {
for {
_ <- ZStream.succeed(bucketName) >>> createBucket
wasCreated <- ZStream(bucketName) >>> existsBucket
_ <- ZStream.succeed(bucketName) >>> deleteEmptyBuckets
_ <- ZStream.succeed(bucketName) >>> deleteEmptyBucket
wasDeleted <- (ZStream(bucketName) >>> existsBucket).map(!_)
} yield assertTrue(wasCreated) && assertTrue(wasDeleted)
Expand All @@ -87,7 +87,7 @@ trait S3ConnectorSpec extends ZIOSpecDefault {
_ <- ZStream.succeed(bucketName) >>> createBucket
wasCreated <- ZStream(bucketName) >>> existsBucket
_ <- ZStream.fromChunk[Byte](Chunk(1, 2, 3)) >>> putObject(bucketName, ObjectKey(UUID.randomUUID().toString))
wasDeleted <- (ZStream.succeed(bucketName) >>> deleteEmptyBuckets).as(true).catchSome { case _: S3Exception =>
wasDeleted <- (ZStream.succeed(bucketName) >>> deleteEmptyBucket).as(true).catchSome { case _: S3Exception =>
} yield assertTrue(wasCreated) && assert(wasDeleted)(equalTo(false))
Expand All @@ -97,7 +97,7 @@ trait S3ConnectorSpec extends ZIOSpecDefault {
for {
wasCreated <- ZStream(bucketName) >>> existsBucket
deleteFails <-
(ZStream.succeed(bucketName) >>> deleteEmptyBuckets).as(false).catchSome { case _: S3Exception =>
(ZStream.succeed(bucketName) >>> deleteEmptyBucket).as(false).catchSome { case _: S3Exception =>
} yield assert(wasCreated)(equalTo(false)) && assertTrue(deleteFails)
Expand All @@ -107,7 +107,7 @@ trait S3ConnectorSpec extends ZIOSpecDefault {
for {
_ <- ZStream.succeed(bucketName) >>> createBucket
wasCreated <- ZStream(bucketName) >>> existsBucket
wasDeleted <- (ZStream.succeed(bucketName) >>> deleteEmptyBuckets).as(true)
wasDeleted <- (ZStream.succeed(bucketName) >>> deleteEmptyBucket).as(true)
} yield assertTrue(wasCreated) && assertTrue(wasDeleted)
Expand Down
19 changes: 12 additions & 7 deletions docs/quickstart/
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ id: quickstart_index
title: "Quick Start"

Connectors are easy to use, and they are designed to be composable. You can use them to build pipelines that can be used to process data.
Each connector is defined as a separate module and can be used independently or in combination with other connectors.


libraryDependencies += "dev.zio" %% "zio-connect-file" % "<version>"
The following connectors are available:

`zio-connect-file` - Filesystem connector. [file-connector-examples][file-connector-examples]

`zio-connect-s3` - Amazon S3 connector uses [zio-aws-s3][zio-aws] under the hood. [s3-connector-examples][s3-connector-examples]


Runnable examples can be found in the [file-connector-examples](../../examples/file-connector-examples) directory.
220 changes: 220 additions & 0 deletions docs/quickstart/
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
id: quickstart_s3_connector
title: "S3Connector"


libraryDependencies += "dev.zio" %% "zio-connect-s3" % "<version>"

How to use it?

All available S3Connector combinators and operations are available in the package object `zio.connect.s3`, you only need to import `zio.connect.s3._`

First, you must configure the underlying S3 connection provided by `zio-aws` you can read more about how to configure it [here][zio-aws]
If you have default credentials in the system environment typically at `~/.aws/credentials` or as env variables
the following configuration will likely work.


import zio._
import zio.connect.s3._

lazy val zioAwsConfig = NettyHttpClient.default >>> AwsConfig.default

Now let's create a bucket:

val bucketName = BucketName("this-very-charming-bucket-name") // BucketName is a zio prelude newtype of String

val program1: ZIO[S3Connector, S3Exception, Unit] =
for {
_ <- ZStream(bucketName) >>> createBucket
} yield ()

The way to understand this is to recognize that `createBucket` is a `ZSink` that expects elements of type `BucketName` as its streamed input.
In this case we have a `ZStream` with a single element of type `BucketName` but we could have an arbitrary number of buckets and the code
would look and work virtually the same.

Okay, let's put some readable bytes into that bucket:

val objectKey = ObjectKey("my-object") // ObjectKey is a zio prelude newtype of String

val program2: ZIO[S3Connector, S3Exception, Unit] =
for {
content <- Random.nextString(100).map(_.getBytes).map(Chunk.fromArray)
_ <- ZStream.fromChunk(content) >>> putObject(bucketName, objectKey)
} yield ()

Here a stream of chunks of bytes are streamed into the `putObject` sink. The sink takes two arguments, the bucket name and the object key to associate with the data
being streamed in.

Let's list objects in the bucket:

val program3: ZIO[S3Connector, S3Exception, Chunk[ObjectKey]] =
for {
keys <- listObjects(bucketName).runCollect
} yield keys

`listObjects` is a `ZStream` that emits elements of type `ObjectKey` and we can use the `runCollect` operator to collect
all the elements into a `Chunk`.

Here's what it looks like to get an object put earlier:

val program5: ZIO[S3Connector, Object, String] =
for {
content <- getObject(bucketName, objectKey) >>> ZPipeline.utf8Decode >>> ZSink.mkString
} yield content

Finally, let's look at how to run one of these program:

def run = program1.provide(zioAwsConfig,, s3ConnectorLiveLayer)

You need to provide the configuration layer for `zio-aws`, the `S3` layer from `zio-aws` and the `s3ConnectorLiveLayer`
which is the live implementation of the `S3Connector` interface.

Test / Stub
A stub implementation of S3Connector is provided for testing purposes via the `TestS3Connector.layer`. It uses
internally an `TRef[Map[BucketName, S3Bucket]]` instead of talking to S3. You can use create the test harness as follows:

import zio.connect.s3._

object MyTestSpec extends ZIOSpecDefault {

override def spec =


Operators & Examples

The following operators are available:

## `copyObject`

Copy an object from one bucket to another

ZStream(CopyObject(bucket1, objectKey, bucket2)) >>> copyObject

## `createBucket`

Creates S3 buckets

ZStream(bucketName1, bucketName2) >>> createBucket

## `deleteEmptyBucket`

Deletes empty S3 buckets

ZStream(bucketName1, bucketName2) >>> deleteEmptyBucket
The buckets must be empty, if they are not you will get an `BucketsNotEmptyException` from S3

## `deleteObjects`

Deletes objects from an S3 bucket

ZStream(objectKey1, objectKey2) >>> deleteObjects(bucketName)
Does not result in an error, if object keys do not exist

## `existsBucket`

Checks if a bucket exists

ZStream(bucketName1, bucketName2) >>> existsBucket

## `existsObject`

Checks if an object exists in an s3 bucket

ZStream(objectKey1, objectKey2) >>> existsObject(bucketName)
It expects the bucket to exist and will return a `NoSuchBucketException` if the _bucket_ does not

## `getObject`

Gets an object from an S3 bucket

getObject(bucket2, objectKey) >>> ZPipeline.utf8Decode >>> ZSink.mkString
You will receive the objects as a stream of bytes, parsing/decoding of course depends on the object contents.
The example here assumes you have a stream of utf-8 encoded bytes and you want to decode them into a string.

## `listBuckets`

Lists all buckets in the account

listBuckets >>> ZSink.collectAll
Currently, gets ALL buckets, there is no pagination support yet. You may want to use some other ZStream combinators
to filter the lists prior to collecting bucket names

## `listObjects`

Lists all objects keys in a bucket takes a `BucketName` as an argument

listObjects(bucketName) >>> ZSink.collectAll
Currently, gets ALL objects in the bucket, there is no pagination support yet. You may want to use some other ZStream combinators
to filter the lists prior to collecting object keys

## `moveObject`

Move an object from one bucket to another

ZStream(MoveObject(sourceBucket, sourceKey, targetBucket, targetKey)) >>> moveObject
The `sourceBucket`, `sourceKey`, and `targetBucket` must exist. If the `targetKey` exists, it will be overwritten.

## `putObject`

Puts an object into an S3 bucket

ZStream.fromChunk(content) >>> putObject(bucketName, objectKey)
Expects as stream of bytes, returns a `Unit` if successful.
61 changes: 61 additions & 0 deletions examples/s3-connector-examples/src/main/scala/Example1.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import zio._
import zio.connect.s3.S3Connector._
import zio.connect.s3._

import java.nio.charset.StandardCharsets

object Example1 extends ZIOAppDefault {

// Please read to learn more about configuring/authenticating zio-aws
// this configuration will work provided you have default aws credentials, i.e. access key and secret key in your `.aws` directory
lazy val zioAwsConfig = NettyHttpClient.default >>> AwsConfig.default

// Program does the following:
// 1. Creates two random buckets
// 2. Puts a quote as a text file into one bucket
// 3. Copies that to another,
// 4. Lists the objects in those buckets
// 5. Gets the quote back from the second bucket
// 6. Deletes the objects in both buckets
// 7. Checks for the existence of the objects in both buckets
// 8. Deletes the buckets provided they are empty
val program: ZIO[S3Connector, Object, String] = {
for {
bucket1 <- => BucketName(s"zio-connect-s3-bucket-$uuid"))
bucket2 <- => BucketName(s"zio-connect-s3-bucket-$uuid"))
_ <- ZStream(bucket1, bucket2).run(createBucket)
buckets <- listBuckets.runCollect
objectKey = ObjectKey("quote.txt")
_ <- ZStream.fromIterable(quote.getBytes(StandardCharsets.UTF_8)).run(putObject(bucket1, objectKey))
_ <- ZStream(CopyObject(bucket1, objectKey, bucket2)).run(copyObject)
objectsPerBucket <- ZIO.foreach(buckets)(bucket => listObjects(bucket), _)))
_ <- ZIO.foreach(objectsPerBucket) { case (bucket, objects) =>
Console.printLine(s"Objects in bucket $bucket: ${objects.mkString}")
text <- getObject(bucket2, objectKey) >>> ZPipeline.utf8Decode >>> ZSink.mkString
_ <- ZIO.foreachPar(buckets)(bucket => ZStream(objectKey).run(deleteObjects(bucket)))
bucketsNonEmpty <- ZIO.foreachPar(buckets)(bucket => ZStream(objectKey).run(existsObject(bucket)))
_ <- ZStream
.when(bucketsNonEmpty.forall(_ == false))
.orElseFail(new RuntimeException("Could not delete non-empty buckets"))
} yield text

override def run: ZIO[Any with ZIOAppArgs with Scope, Object, String] =
.provide(zioAwsConfig,, s3ConnectorLiveLayer)
error => Console.printLine(s"error: ${error}"),
text => Console.printLine(s"${text} ==\n ${quote}\nis ${text == quote}")

private def quote =
"You should give up looking for lost cats and start searching for the other half of your shadow"


0 comments on commit 87cdb3d

Please sign in to comment.