Skip to content

Commit

Permalink
starting the sim target
Browse files Browse the repository at this point in the history
  • Loading branch information
marciocadev committed Feb 13, 2024
1 parent 0c477df commit d9744f2
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 19 deletions.
8 changes: 7 additions & 1 deletion message-fanout/commons/api.w
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
bring cloud;

pub struct MessageFanoutProps extends cloud.QueueProps {
name: str;
}

/**
* A cloud message fanout interface
*/
Expand All @@ -10,5 +16,5 @@ pub interface IMessageFanout extends std.IResource {
/**
* Create a new consumer for the fanout
*/
addConsumer(name: str, handler: inflight(str): void): void;
addConsumer(handler: inflight(str): void, props: MessageFanoutProps): void;
}
2 changes: 1 addition & 1 deletion message-fanout/inflight/publish.aws.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ export const _publish = async function (topicArn, message) {
});

await client.send(command);
}
}
8 changes: 4 additions & 4 deletions message-fanout/lib.test.w
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ let table = new ex.Table(
}
);

fanout.addConsumer("first", inflight (event: str) => {
fanout.addConsumer(inflight (event: str) => {
let obj = Json.parse(event);
let msg = "first_publisher_" + obj.get("Message").asStr();
table.insert("first", { message: msg });
});
}, name: "first");

fanout.addConsumer("second", inflight (event: str) => {
fanout.addConsumer(inflight (event: str) => {
let obj = Json.parse(event);
let msg = "second_publisher_" + obj.get("Message").asStr();
table.insert("second", { message: msg });
});
}, name: "second");

let target = util.env("WING_TARGET");
if target == "tf-aws" {
Expand Down
4 changes: 2 additions & 2 deletions message-fanout/lib.w
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ pub class MessageFanout impl api.IMessageFanout {
}
}

pub addConsumer(name: str, handler: inflight(str): void): void {
this.inner.addConsumer(name, handler);
pub addConsumer(handler: inflight(str): void, props: api.MessageFanoutProps): void {
this.inner.addConsumer(handler, props);
}

pub inflight publish(message: str): void {
Expand Down
13 changes: 13 additions & 0 deletions message-fanout/platform/sim.w
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
bring cloud;
bring "../commons/api.w" as api;

pub class MessageFanout_sim impl api.IMessageFanout {
topic: cloud.Topic;

new() {
this.topic = new cloud.Topic();
}

pub addConsumer(handler: inflight(str): void, props: api.MessageFanoutProps): void {}
pub inflight publish(message: str) {}
}
23 changes: 12 additions & 11 deletions message-fanout/platform/tf-aws.w
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,27 @@ pub class MessageFanout_tfaws impl api.IMessageFanout {
this.queueList = MutArray<tfaws.sqsQueue.SqsQueue>[];
}

pub addConsumer(name: str, handler: inflight(str): void): void {
pub addConsumer(handler: inflight(str): void, props: api.MessageFanoutProps): void {
let my_function = new cloud.Function(inflight(event: str): str? => {
let json: Json = unsafeCast(event);
let sqsEvent = SqsEvent.fromJson(event);
for message in sqsEvent.Records {
handler(message.body);
}
}) as name;
}) as props.name;

let queue = new tfaws.sqsQueue.SqsQueue(
visibilityTimeoutSeconds: duration.fromSeconds(120).seconds,
messageRetentionSeconds: duration.fromHours(1).seconds,
) as "queue_" + name;
visibilityTimeoutSeconds: props?.timeout?.seconds
?? duration.fromSeconds(120).seconds,
messageRetentionSeconds: props?.retentionPeriod?.seconds
?? duration.fromHours(1).seconds,
) as "queue_" + props.name;

let subscription = new tfaws.snsTopicSubscription.SnsTopicSubscription(
topicArn: this.topicArn,
endpoint: queue.arn,
protocol: "sqs",
) as "subscription_" + name;
) as "subscription_" + props.name;

let queue_policy = new tfaws.dataAwsIamPolicyDocument.DataAwsIamPolicyDocument(
statement: [
Expand All @@ -65,12 +67,12 @@ pub class MessageFanout_tfaws impl api.IMessageFanout {
]
}
]
) as "policy_" + name;
) as "policy_" + props.name;

new tfaws.sqsQueuePolicy.SqsQueuePolicy(
queueUrl: queue.id,
policy: cdktf.Token.asString(queue_policy.json),
) as "queue_policy_" + name;
) as "queue_policy_" + props.name;

let lambda = aws.Function.from(my_function);
lambda?.addPolicyStatements({
Expand All @@ -88,9 +90,8 @@ pub class MessageFanout_tfaws impl api.IMessageFanout {
eventSourceArn: queue.arn,
functionName: "{lambda?.functionName}",
batchSize: 1,
) as "event_source_mapping_" + name;
) as "event_source_mapping_" + props.name;


this.queueList.push(queue);
}

Expand All @@ -114,4 +115,4 @@ pub class MessageFanout_tfaws impl api.IMessageFanout {
pub inflight publish(message: str) {
MessageFanout_tfaws._publish(this.topicArn, message);
}
}
}

0 comments on commit d9744f2

Please sign in to comment.