Skip to content

Commit

Permalink
Topic router stream types
Browse files Browse the repository at this point in the history
  • Loading branch information
Eryk Solecki committed Apr 11, 2023
1 parent 74df4c4 commit 0529c02
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.2",
"@npmcli/run-script": "4.2.1",
"@types/node": "15.12.5",
"@types/node": "^18.15.11",
"@typescript-eslint/eslint-plugin": "^5.41.0",
"@typescript-eslint/parser": "^5.41.0",
"build-if-changed": "^1.5.5",
Expand Down
48 changes: 26 additions & 22 deletions packages/host/src/lib/serviceDiscovery/topicRouter.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { ObjLogger } from "@scramjet/obj-logger";
import { APIExpose, OpResponse, ParsedMessage } from "@scramjet/types";
import { APIExpose, OpResponse } from "@scramjet/types";
import { ReasonPhrases } from "http-status-codes";
import { ServiceDiscovery } from "./sd-adapter";
import { IncomingMessage } from "http";
import { IncomingMessage } from "http";
import { StreamOrigin } from "./streamHandler";
import { TopicState } from "./topicHandler";
import { WorkState } from "./streamHandler";
Expand All @@ -20,12 +20,12 @@ type TopicsPostRes = {
}
type TopicDeleteReq = {}

type TopicDownstreamReq = IncomingMessage & {
headers: {
"content-type": string,
type TopicStreamReq = IncomingMessage & {
headers?: {
"content-type"?: string,
cpm?: string
},
params: { topic: string }
params?: { topic?: string }
}

class TopicRouter {
Expand All @@ -36,7 +36,7 @@ class TopicRouter {
this.serviceDiscovery = serviceDiscovery;
apiServer.get(`${apiBaseUrl}/topics`, () => this.serviceDiscovery.getTopics());
apiServer.op("post", `${apiBaseUrl}/topics`, this.topicsPost)
apiServer.get(`${apiBaseUrl}/topic/:topic`, () => this.deleteTopic);
apiServer.op("delete", `${apiBaseUrl}/topic/:topic`, () => this.deleteTopic);
apiServer.downstream(`${apiBaseUrl}/topic/:topic`, this.topicDownstream, { checkContentType: false });
apiServer.upstream(`${apiBaseUrl}/topic/:topic`, this.topicUpstream);
}
Expand All @@ -63,16 +63,16 @@ class TopicRouter {
}
}

async topicDownstream(req: TopicDownstreamReq) {
async topicDownstream(req: TopicStreamReq) {
const { "content-type": contentType = "", cpm } = req.headers;
const { topic: topicName } = req.params;
const { topic: name = "" } = req.params || {};
if (!isContentType(contentType)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Unsupported content-type" }
if (!TopicName.validate(topicName)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Topic name incorrect format" }
if (!TopicName.validate(name)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Topic name incorrect format" }

const name = new TopicName(topicName);
const topicName = new TopicName(name);
this.logger.debug(`Incoming topic '${name}' request`);

let topic = this.serviceDiscovery.topicsController.get(name);
let topic = this.serviceDiscovery.topicsController.get(topicName);
if (topic) {
const topicContentType = topic.options().contentType;
if (contentType !== topicContentType) {
Expand All @@ -82,35 +82,39 @@ class TopicRouter {
};
}
} else {
topic = new Topic(name, contentType, { id: "TopicDownstream", type: "hub" });
// FIXME: Single responsibility rule validation
topic = new Topic(topicName, contentType, { id: "TopicDownstream", type: "hub" });
}
req.pipe(topic, { end: false });

if (!cpm) {
await this.serviceDiscovery.update({
provides: topic, contentType: contentType, topicName: topic
provides: topic.id(), contentType: contentType, topicName: topic.id()
});
} else {
this.logger.debug(`Incoming Downstream CPM request for topic '${topic}'`);
}
return {};
return { opStatus: ReasonPhrases.OK };
}

async topicUpstream(req: ParsedMessage) {
async topicUpstream(req: TopicStreamReq) {
const { "content-type": contentType = "", cpm } = req.headers;
const { topic: name = "" } = req.params || {};
if (!isContentType(contentType)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Unsupported content-type" }
if (!TopicName.validate(name)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Topic name incorrect format" }

const topicName = new TopicName(name);
//TODO: what should be the default content type and where to store this information?
const contentType = req.headers["content-type"] || "application/x-ndjson";
const { topic } = req.params || {};
const { cpm } = req.headers;

if (!cpm) {
await this.serviceDiscovery.update({
requires: topic, contentType, topicName: topic
requires: name, contentType, topicName: topicName.toString()
});
} else {
this.logger.debug(`Incoming Upstream CPM request for topic '${topic}'`);
this.logger.debug(`Incoming Upstream CPM request for topic '${name}'`);
}

return this.serviceDiscovery.createTopicIfNotExist({ topic, contentType });
return this.serviceDiscovery.createTopicIfNotExist({ topic: topicName, contentType });
}
}

Expand Down

0 comments on commit 0529c02

Please sign in to comment.