diff --git a/message-fanout/commons/api.w b/message-fanout/commons/api.w index 1d3e6b09..e68a9ff3 100644 --- a/message-fanout/commons/api.w +++ b/message-fanout/commons/api.w @@ -1,3 +1,9 @@ +bring cloud; + +pub struct MessageFanoutProps extends cloud.QueueProps { + name: str; +} + /** * A cloud message fanout interface */ @@ -10,5 +16,5 @@ pub interface IMessageFanout extends std.IResource { /** * Create a new consumer for the fanout */ - addConsumer(name: str, handler: inflight(str): void): void; + addConsumer(handler: inflight(str): void, props: MessageFanoutProps): void; } diff --git a/message-fanout/inflight/publish.aws.js b/message-fanout/inflight/publish.aws.js index f75f07b1..3c015f8c 100644 --- a/message-fanout/inflight/publish.aws.js +++ b/message-fanout/inflight/publish.aws.js @@ -12,4 +12,4 @@ export const _publish = async function (topicArn, message) { }); await client.send(command); -} \ No newline at end of file +} diff --git a/message-fanout/lib.test.w b/message-fanout/lib.test.w index efed3724..73ede286 100644 --- a/message-fanout/lib.test.w +++ b/message-fanout/lib.test.w @@ -13,17 +13,17 @@ let table = new ex.Table( } ); -fanout.addConsumer("first", inflight (event: str) => { +fanout.addConsumer(inflight (event: str) => { let obj = Json.parse(event); let msg = "first_publisher_" + obj.get("Message").asStr(); table.insert("first", { message: msg }); -}); +}, name: "first"); -fanout.addConsumer("second", inflight (event: str) => { +fanout.addConsumer(inflight (event: str) => { let obj = Json.parse(event); let msg = "second_publisher_" + obj.get("Message").asStr(); table.insert("second", { message: msg }); -}); +}, name: "second"); let target = util.env("WING_TARGET"); if target == "tf-aws" { diff --git a/message-fanout/lib.w b/message-fanout/lib.w index 2d7939e1..c7c06b01 100644 --- a/message-fanout/lib.w +++ b/message-fanout/lib.w @@ -13,8 +13,8 @@ pub class MessageFanout impl api.IMessageFanout { } } - pub addConsumer(name: str, handler: inflight(str): void): void { - this.inner.addConsumer(name, handler); + pub addConsumer(handler: inflight(str): void, props: api.MessageFanoutProps): void { + this.inner.addConsumer(handler, props); } pub inflight publish(message: str): void { diff --git a/message-fanout/platform/sim.w b/message-fanout/platform/sim.w new file mode 100644 index 00000000..f3d3752d --- /dev/null +++ b/message-fanout/platform/sim.w @@ -0,0 +1,13 @@ +bring cloud; +bring "../commons/api.w" as api; + +pub class MessageFanout_sim impl api.IMessageFanout { + topic: cloud.Topic; + + new() { + this.topic = new cloud.Topic(); + } + + pub addConsumer(handler: inflight(str): void, props: api.MessageFanoutProps): void {} + pub inflight publish(message: str) {} +} \ No newline at end of file diff --git a/message-fanout/platform/tf-aws.w b/message-fanout/platform/tf-aws.w index 55006ab5..10b0e3f7 100644 --- a/message-fanout/platform/tf-aws.w +++ b/message-fanout/platform/tf-aws.w @@ -26,25 +26,27 @@ pub class MessageFanout_tfaws impl api.IMessageFanout { this.queueList = MutArray[]; } - pub addConsumer(name: str, handler: inflight(str): void): void { + pub addConsumer(handler: inflight(str): void, props: api.MessageFanoutProps): void { let my_function = new cloud.Function(inflight(event: str): str? => { let json: Json = unsafeCast(event); let sqsEvent = SqsEvent.fromJson(event); for message in sqsEvent.Records { handler(message.body); } - }) as name; + }) as props.name; let queue = new tfaws.sqsQueue.SqsQueue( - visibilityTimeoutSeconds: duration.fromSeconds(120).seconds, - messageRetentionSeconds: duration.fromHours(1).seconds, - ) as "queue_" + name; + visibilityTimeoutSeconds: props?.timeout?.seconds + ?? duration.fromSeconds(120).seconds, + messageRetentionSeconds: props?.retentionPeriod?.seconds + ?? duration.fromHours(1).seconds, + ) as "queue_" + props.name; let subscription = new tfaws.snsTopicSubscription.SnsTopicSubscription( topicArn: this.topicArn, endpoint: queue.arn, protocol: "sqs", - ) as "subscription_" + name; + ) as "subscription_" + props.name; let queue_policy = new tfaws.dataAwsIamPolicyDocument.DataAwsIamPolicyDocument( statement: [ @@ -65,12 +67,12 @@ pub class MessageFanout_tfaws impl api.IMessageFanout { ] } ] - ) as "policy_" + name; + ) as "policy_" + props.name; new tfaws.sqsQueuePolicy.SqsQueuePolicy( queueUrl: queue.id, policy: cdktf.Token.asString(queue_policy.json), - ) as "queue_policy_" + name; + ) as "queue_policy_" + props.name; let lambda = aws.Function.from(my_function); lambda?.addPolicyStatements({ @@ -88,9 +90,8 @@ pub class MessageFanout_tfaws impl api.IMessageFanout { eventSourceArn: queue.arn, functionName: "{lambda?.functionName}", batchSize: 1, - ) as "event_source_mapping_" + name; + ) as "event_source_mapping_" + props.name; - this.queueList.push(queue); } @@ -114,4 +115,4 @@ pub class MessageFanout_tfaws impl api.IMessageFanout { pub inflight publish(message: str) { MessageFanout_tfaws._publish(this.topicArn, message); } -} \ No newline at end of file +}