From e0acf87757816051d37f3c3c39e43568735acef4 Mon Sep 17 00:00:00 2001 From: Elad Ben-Israel Date: Sat, 18 May 2024 14:12:34 -0400 Subject: [PATCH] feat(containers): forwarding (#239) Introduce an API that allows forwarding various events to containers. Currently only implemented for `sim`. The `workload.forward()` method returns an `IForward` object with a `fromXxx()` method for each supported handler type. For example, this is how you can forward `cloud.Api` requests: ```js let work = new containers.Workload(...); let api = new cloud.Api(); api.get("/my_request", work.forward().fromApi()); ``` You can pass an optional `route` and `method` to `forward()` in order to customize the behavior: ```js work.forward(route: "/your_request", method: cloud.HttpMethod.PUT); ``` --- containers/README.md | 19 +++++ containers/api.w | 18 +++++ containers/forwarders.test.w | 99 +++++++++++++++++++++++++ containers/helm.extern.d.ts | 10 +-- containers/helm.w | 6 +- containers/package.json | 2 +- containers/test/forwarders/Dockerfile | 4 + containers/test/forwarders/index.js | 42 +++++++++++ containers/workload.sim.w | 102 +++++++++++++++++++++++++- containers/workload.tfaws.w | 7 +- containers/workload.w | 11 ++- 11 files changed, 310 insertions(+), 10 deletions(-) create mode 100644 containers/forwarders.test.w create mode 100644 containers/test/forwarders/Dockerfile create mode 100755 containers/test/forwarders/index.js diff --git a/containers/README.md b/containers/README.md index 7ca65435..55e61d25 100644 --- a/containers/README.md +++ b/containers/README.md @@ -29,6 +29,25 @@ new containers.Workload( ); ``` +## Forwarding + +The `workload.forward()` method returns an `IForward` object with a `fromXxx()` method for each +supported handler type. + +For example, this is how you can forward `cloud.Api` requests: + +```js +let work = new containers.Workload(...); +let api = new cloud.Api(); +api.get("/my_request", work.forward().fromApi()); +``` + +You can pass an optional `route` and `method` to `forward()` in order to customize the behavior: + +```js +work.forward(route: "/your_request", method: cloud.HttpMethod.PUT); +``` + ## `sim` When executed in the Wing Simulator, the workload is started within a local Docker container. diff --git a/containers/api.w b/containers/api.w index ebefee7a..cd02ded2 100644 --- a/containers/api.w +++ b/containers/api.w @@ -1,3 +1,4 @@ +bring cloud; pub struct ContainerOpts { name: str; @@ -27,3 +28,20 @@ pub struct ContainerOpts { pub struct WorkloadProps extends ContainerOpts { } + +pub struct ForwardOptions { + route: str?; + method: cloud.HttpMethod?; +} + +pub interface IWorkload { + forward(opts: ForwardOptions?): IForward; +} + +pub interface IForward { + fromApi(): cloud.IApiEndpointHandler; + fromQueue(): cloud.IQueueSetConsumerHandler; + fromTopic(): cloud.ITopicOnMessageHandler; + fromSchedule(): cloud.IScheduleOnTickHandler; + fromBucketEvent(): cloud.IBucketEventHandler; +} diff --git a/containers/forwarders.test.w b/containers/forwarders.test.w new file mode 100644 index 00000000..0081fba0 --- /dev/null +++ b/containers/forwarders.test.w @@ -0,0 +1,99 @@ +bring cloud; +bring util; +bring http; +bring expect; +bring "./workload.w" as w; + +let workload = new w.Workload( + image: "./test/forwarders", + name: "forwarders", + port: 3000, + public: true, +); + +let requests = inflight (): Array => { + let response = http.get("{workload.publicUrl!}/requests"); + assert(response.ok); + return Json.values(Json.parse(response.body)); +}; + +let api = new cloud.Api(); +api.get("/get-api", workload.forward().fromApi()); +api.post("/post-api", workload.forward().fromApi()); + +api.get("/foof", workload.forward(route: "/foo").fromApi()); + +test "api forwarding" { + http.get("{api.url}/get-api?hello=world"); + + expect.equal(requests(), [ + { method: "GET", url: "/get-api" } + ]); + + http.post("{api.url}/post-api", body: "hello, body!"); + + expect.equal(requests(), [ + { method: "GET", url: "/get-api" }, + { method: "POST", url: "/post-api", body: "hello, body!" } + ]); +} + +let queue1 = new cloud.Queue() as "queue1"; +let queue2 = new cloud.Queue() as "queue2"; +queue1.setConsumer(workload.forward(route: "/queue_message", method: cloud.HttpMethod.PUT).fromQueue()); +queue2.setConsumer(workload.forward().fromQueue()); + +test "queue forwarding" { + queue1.push("message1"); + util.waitUntil(() => { return requests().length == 1; }); + + queue1.push("message2"); + util.waitUntil(() => { return requests().length == 2; }); + + expect.equal(requests(), [ + { method: "PUT", url: "/queue_message", body: "message1" }, + { method: "PUT", url: "/queue_message", body: "message2" }, + ]); + + queue2.push("message3"); + util.waitUntil(() => { return requests().length == 3; }); + + expect.equal(requests(), [ + { method: "PUT", url: "/queue_message", body: "message1" }, + { method: "PUT", url: "/queue_message", body: "message2" }, + { method: "POST", url: "/", body: "message3" }, + ]); +} + +let topic = new cloud.Topic(); +topic.onMessage(workload.forward(route: "/my_topic").fromTopic()); + +test "subscribe to topic" { + topic.publish("message from topic!"); + util.waitUntil(() => { return requests().length == 1; }); + + expect.equal(requests(), [ + { method: "POST", url: "/my_topic", body: "message from topic!" }, + ]); +} + +let bucket = new cloud.Bucket(); +bucket.onCreate(workload.forward(route: "/object-created").fromBucketEvent()); + +test "forward bucket events" { + bucket.put("object1", "content1"); + util.waitUntil(() => { return requests().length == 1; }); + + expect.equal(requests(), [ + { method: "POST", url: "/object-created", body: Json.stringify({"key":"object1","type":"create"}), }, + ]); +} + +let schedule = new cloud.Schedule(rate: 1m); +schedule.onTick(workload.forward(route: "/tick", method: cloud.HttpMethod.GET).fromSchedule()); + +test "forward schedule events" { + util.waitUntil(() => { return requests().length >= 1; }); + + expect.equal(requests()[0], { method: "GET", url: "/tick" }); +} diff --git a/containers/helm.extern.d.ts b/containers/helm.extern.d.ts index 4b12c1b3..4eb3253c 100644 --- a/containers/helm.extern.d.ts +++ b/containers/helm.extern.d.ts @@ -46,7 +46,7 @@ export interface MetadataEntry { export class Node { /** Add an ordering dependency on another construct. An `IDependable` */ - readonly addDependency: (deps?: ((readonly (IDependable)[])) | undefined) => void; + readonly addDependency: (deps: (readonly (IDependable)[])) => void; /** Adds a metadata entry to this construct. Entries are arbitrary values and will also include a stack trace to allow tracing back to the code location for when the entry was added. It can be used, for example, to include source @@ -202,7 +202,7 @@ export class ApiObjectMetadataDefinition { /** Add an annotation. */ readonly addAnnotation: (key: string, value: string) => void; /** Add one or more finalizers. */ - readonly addFinalizers: (finalizers?: ((readonly (string)[])) | undefined) => void; + readonly addFinalizers: (finalizers: (readonly (string)[])) => void; /** Add a label. */ readonly addLabel: (key: string, value: string) => void; /** Add an owner. */ @@ -224,10 +224,10 @@ export class ApiObjectMetadataDefinition { export class ApiObject extends Construct { /** Create a dependency between this ApiObject and other constructs. These can be other ApiObjects, Charts, or custom. */ - readonly addDependency: (dependencies?: ((readonly (IConstruct)[])) | undefined) => void; + readonly addDependency: (dependencies: (readonly (IConstruct)[])) => void; /** Applies a set of RFC-6902 JSON-Patch operations to the manifest synthesized for this API object. kubePod.addJsonPatch(JsonPatch.replace('/spec/enableServiceLinks', true)); */ - readonly addJsonPatch: (ops?: ((readonly (JsonPatch)[])) | undefined) => void; + readonly addJsonPatch: (ops: (readonly (JsonPatch)[])) => void; /** The group portion of the API version (e.g. `authorization.k8s.io`). */ readonly apiGroup: string; /** The object's API version (e.g. `authorization.k8s.io/v1`). */ @@ -252,7 +252,7 @@ export class ApiObject extends Construct { export class Chart extends Construct { /** Create a dependency between this Chart and other constructs. These can be other ApiObjects, Charts, or custom. */ - readonly addDependency: (dependencies?: ((readonly (IConstruct)[])) | undefined) => void; + readonly addDependency: (dependencies: (readonly (IConstruct)[])) => void; /** Returns all the included API objects. */ readonly apiObjects: (readonly (ApiObject)[]); /** Generates a app-unique name for an object given it's construct node path. diff --git a/containers/helm.w b/containers/helm.w index 540ce26d..5c05400e 100644 --- a/containers/helm.w +++ b/containers/helm.w @@ -2,7 +2,7 @@ bring "./api.w" as api; bring "cdk8s-plus-27" as plus; bring "cdk8s" as cdk8s; -pub class Chart extends cdk8s.Chart { +pub class Chart extends cdk8s.Chart impl api.IWorkload { name: str; new(props: api.WorkloadProps) { @@ -83,5 +83,9 @@ pub class Chart extends cdk8s.Chart { return Chart.toHelmChart(workdir, this); } + pub forward(opts: api.ForwardOptions?): api.IForward { + throw "Not implemented"; + } + extern "./helm.js" pub static toHelmChart(wingdir: str, chart: cdk8s.Chart): str; } diff --git a/containers/package.json b/containers/package.json index c0dd70e1..5d9a60d4 100644 --- a/containers/package.json +++ b/containers/package.json @@ -1,6 +1,6 @@ { "name": "@winglibs/containers", - "version": "0.1.0", + "version": "0.1.1", "description": "Container support for Wing", "repository": { "type": "git", diff --git a/containers/test/forwarders/Dockerfile b/containers/test/forwarders/Dockerfile new file mode 100644 index 00000000..686c1283 --- /dev/null +++ b/containers/test/forwarders/Dockerfile @@ -0,0 +1,4 @@ +FROM node:20.8.0-alpine +EXPOSE 3000 +ADD index.js /app/index.js +ENTRYPOINT [ "/app/index.js" ] \ No newline at end of file diff --git a/containers/test/forwarders/index.js b/containers/test/forwarders/index.js new file mode 100755 index 00000000..46bcaeb3 --- /dev/null +++ b/containers/test/forwarders/index.js @@ -0,0 +1,42 @@ +#!/usr/bin/env node +const http = require('http'); + +const requests = []; + +process.on('SIGTERM', () => { + console.info("Interrupted") + process.exit(0) +}); + +const server = http.createServer((req, res) => { + + if (req.url === '/requests') { + res.writeHead(200, { 'Content-Type': 'application/json' }); + return res.end(JSON.stringify(requests)); + } + + console.log(`request received: ${req.method} ${req.url}`); + + const body = []; + req.on("data", (data) => { + body.push(data); + }); + + req.on("end", () => { + let s = Buffer.concat(body).toString(); + if (s.length === 0) { + s = undefined; + } + + requests.push({ + method: req.method, + url: req.url, + body: s, + }); + + res.end('OK'); + }); +}); + +console.log('listening on port 3000'); +server.listen(3000); \ No newline at end of file diff --git a/containers/workload.sim.w b/containers/workload.sim.w index 2c564e84..59343640 100644 --- a/containers/workload.sim.w +++ b/containers/workload.sim.w @@ -5,7 +5,7 @@ bring sim; bring ui; bring "./api.w" as api; -pub class Workload_sim { +pub class Workload_sim impl api.IWorkload { pub publicUrl: str?; pub internalUrl: str?; @@ -64,6 +64,14 @@ pub class Workload_sim { } } + pub forward(opts: api.ForwardOptions?): api.IForward { + if this.publicUrl == nil { + throw "Cannot forward requests to a non-public container"; + } + + return new Forward(this.publicUrl!, opts) as "forward_{util.nanoid()}"; + } + toEnv(input: Map?): Map { let env = MutMap{}; let i = input ?? {}; @@ -76,3 +84,95 @@ pub class Workload_sim { return env.copy(); } } + +class Forward impl api.IForward { + containerUrl: str; + route: str?; + method: cloud.HttpMethod?; + + new(containerUrl: str, opts: api.ForwardOptions?) { + this.containerUrl = containerUrl; + this.route = opts?.route; + this.method = opts?.method; + + if let r = this.route { + if !r.startsWith("/") { + throw "Route must start with '/'"; + } + } + } + + pub fromApi(): cloud.IApiEndpointHandler { + return inflight (request) => { + let var body = request.body; + if request.method == cloud.HttpMethod.GET || request.method == cloud.HttpMethod.HEAD { + body = nil; + } + + let response = http.fetch("{this.containerUrl}{request.path}", { + body: body, + headers: request.headers, + method: request.method, + }); + + return { + body: response.body, + status: response.status, + headers: response.headers + }; + }; + } + + pub fromQueue(): cloud.IQueueSetConsumerHandler { + return inflight (message) => { + let route = this.route ?? "/"; + let method = this.method ?? cloud.HttpMethod.POST; + http.fetch("{this.containerUrl}{route}", { + body: message, + method: method, + }); + }; + } + + pub fromTopic(): cloud.ITopicOnMessageHandler { + return inflight (message) => { + let route = this.route ?? "/"; + let method = this.method ?? cloud.HttpMethod.POST; + http.fetch("{this.containerUrl}{route}", { + body: message, + method: method, + }); + }; + } + + pub fromSchedule(): cloud.IScheduleOnTickHandler { + return inflight () => { + let route = this.route ?? "/"; + let method = this.method ?? cloud.HttpMethod.GET; + + http.fetch("{this.containerUrl}{route}", { + method: method, + }); + }; + } + + pub fromBucketEvent(): cloud.IBucketEventHandler { + return inflight (key, type) => { + let route = this.route ?? "/"; + let method = this.method ?? cloud.HttpMethod.POST; + let stype = () => { + if type == cloud.BucketEventType.CREATE { return "create"; } + if type == cloud.BucketEventType.UPDATE { return "update"; } + if type == cloud.BucketEventType.DELETE { return "delete"; } + }(); + + http.fetch("{this.containerUrl}{route}", { + method: method, + body: Json.stringify({ + key: key, + type: stype + }) + }); + }; + } +} \ No newline at end of file diff --git a/containers/workload.tfaws.w b/containers/workload.tfaws.w index 33a0326c..07f3c6e5 100644 --- a/containers/workload.tfaws.w +++ b/containers/workload.tfaws.w @@ -8,8 +8,9 @@ bring "@cdktf/provider-kubernetes" as k8s; bring "@cdktf/provider-helm" as helm_provider; bring "./helm.w" as helm; bring fs; +bring cloud; -pub class Workload_tfaws { +pub class Workload_tfaws impl api.IWorkload { pub internalUrl: str?; pub publicUrl: str?; @@ -74,5 +75,9 @@ pub class Workload_tfaws { return props.sourceHash ?? fs.md5(props.image, props.sources); } + + pub forward(opts: api.ForwardOptions?): api.IForward { + throw "Not implemented"; + } } diff --git a/containers/workload.w b/containers/workload.w index 809a6b8c..123107a8 100644 --- a/containers/workload.w +++ b/containers/workload.w @@ -7,13 +7,15 @@ bring http; bring fs; bring ui; -pub class Workload { +pub class Workload impl api.IWorkload { /** internal url, `nil` if there is no exposed port */ pub internalUrl: str?; /** extern url, `nil` if there is no exposed port or if `public` is `false` */ pub publicUrl: str?; + inner: api.IWorkload; + new(props: api.WorkloadProps) { let target = util.env("WING_TARGET"); @@ -21,6 +23,7 @@ pub class Workload { let w = new sim.Workload_sim(props) as props.name; this.internalUrl = w.internalUrl; this.publicUrl = w.publicUrl; + this.inner = w; nodeof(w).hidden = true; if let url = w.internalUrl { @@ -39,11 +42,13 @@ pub class Workload { let w = new tfaws.Workload_tfaws(props) as props.name; this.internalUrl = w.internalUrl; this.publicUrl = w.publicUrl; + this.inner = w; return this; } if provider == "helm" { let w = new helm.Chart(props); + this.inner = w; this.internalUrl = "http://dummy"; this.publicUrl = "http://dummy"; w.toHelm(fs.join(nodeof(this).app.workdir, "..")); @@ -53,6 +58,10 @@ pub class Workload { throw "unsupported provider {provider}"; } + pub forward(opts: api.ForwardOptions?): api.IForward { + return this.inner.forward(opts); + } + resolveProvider(target: str): str { let allowed = ["eks", "helm"]; let params: Json = nodeof(this).app.parameters.value("containers");