From 7af46f3657e2c0fd52d3c0cd08417a5d896ffb56 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Thu, 25 Apr 2024 23:27:38 -0700 Subject: [PATCH] feat: Add streams to pipeline and unhack revert temp_pipe_out --- .../2024-04-26_0614_add_pipeline_streams.sql | 1 + connectors/connector-revert/def.ts | 13 ------------- connectors/connector-revert/server.ts | 17 ++++++----------- kits/cdk/connector.types.ts | 7 ++++++- kits/cdk/models.ts | 12 ++++++++++++ .../engine-backend/router/pipelineRouter.ts | 2 ++ .../engine-backend/services/sync-service.ts | 12 ++++++++++-- 7 files changed, 37 insertions(+), 27 deletions(-) create mode 100644 apps/web/migrations/2024-04-26_0614_add_pipeline_streams.sql diff --git a/apps/web/migrations/2024-04-26_0614_add_pipeline_streams.sql b/apps/web/migrations/2024-04-26_0614_add_pipeline_streams.sql new file mode 100644 index 000000000..e278d9857 --- /dev/null +++ b/apps/web/migrations/2024-04-26_0614_add_pipeline_streams.sql @@ -0,0 +1 @@ +ALTER TABLE pipeline ADD COLUMN streams jsonb; diff --git a/connectors/connector-revert/def.ts b/connectors/connector-revert/def.ts index 6185eb2df..2e276b4e8 100644 --- a/connectors/connector-revert/def.ts +++ b/connectors/connector-revert/def.ts @@ -18,19 +18,6 @@ export const revertSchemas = { .describe( "x-revert-t-id header. This is the end user, aka Revert's customer's customer", ), - temp_pipe_out_streams: z - .record( - z.enum(['company', 'contact', 'deal']), - // z.union([ - z.object({ - fields: z.array(z.string()).describe('List of fields to retrieve'), - }), - // // FIXME(@tony): Caused the app to crash with: "Error: Unknown schema ZodUndefined. Please assign it a manual 'type'." - // z.undefined().describe('Disabled'), - // ]), - ) - .optional() - .describe('This will be moved to the pipeline object'), }), sourceOutputEntities: { company: zCast(), diff --git a/connectors/connector-revert/server.ts b/connectors/connector-revert/server.ts index 54836741c..bdd55ed08 100644 --- a/connectors/connector-revert/server.ts +++ b/connectors/connector-revert/server.ts @@ -14,21 +14,16 @@ export const revertServer = { 'x-revert-t-id': settings.tenant_id, }, }), - sourceSync: ({instance, state}) => { - // TODO(@tony): Please help me fix this so the stream names from the UI & fields as well. - // const streamNames = Object.keys(streams ?? {}).filter( - // (s) => !!streams[s as keyof typeof streams], - // ) - + sourceSync: ({instance, state, streams}) => { async function* iterateRecords() { - for (const [name, stream] of Object.entries( - ['company', 'contact', 'deal'] ?? {}, - )) { - if (!stream) { + for (const [name, _stream] of Object.entries(streams ?? {})) { + const stream = + typeof _stream === 'boolean' ? {disabled: _stream} : _stream + if (!stream || stream.disabled) { continue } const sState = ((state ?? {}) as Record)[name] ?? {} - yield* iterateRecordsInStream(stream, [], sState) // TODO(@jatin): update this. + yield* iterateRecordsInStream(name, stream.fields ?? [], sState) // TODO(@jatin): update this. } } diff --git a/kits/cdk/connector.types.ts b/kits/cdk/connector.types.ts index f03d025d6..fe7a6d252 100644 --- a/kits/cdk/connector.types.ts +++ b/kits/cdk/connector.types.ts @@ -182,7 +182,12 @@ export interface ConnectorServer< instance: TInstance endUser: {id: EndUserId} | null | undefined /* Enabled streams */ - streams: {[k in T['_streamName']]?: boolean | null} + streams: { + [k in T['_streamName']]?: + | boolean + | null + | {disabled?: boolean; fields?: string[]} + } state: T['_types']['sourceState'] /** @deprecated, use `instance` instead */ config: T['_types']['connectorConfig'] diff --git a/kits/cdk/models.ts b/kits/cdk/models.ts index 6d24c00f2..965ff9ec2 100644 --- a/kits/cdk/models.ts +++ b/kits/cdk/models.ts @@ -101,6 +101,17 @@ const zBase = z.object({ updatedAt: z.date(), // should be string but slonik returns date }) +export const zStreamsV2 = z.record( + z.object({ + disabled: z.boolean().optional(), + fields: z.array(z.string()).optional(), + }), +) +export type StreamsV2 = z.infer + +export const zStreamsV1 = z.record(z.boolean()) +export type StreamsV1 = z.infer + /** TODO: Add other links / gather the schema from various links here */ export const zLink = z.enum(['banking']).openapi({ref: 'Link'}) @@ -169,6 +180,7 @@ export const zRaw = { // TODO: Remove nullish now that pipelines are more fixed sourceId: zId('reso').optional(), sourceState: z.record(z.unknown()).optional(), + streams: zStreamsV2.nullish(), destinationId: zId('reso').optional(), destinationState: z.record(z.unknown()).optional(), linkOptions: z diff --git a/packages/engine-backend/router/pipelineRouter.ts b/packages/engine-backend/router/pipelineRouter.ts index d590f1215..2e14cfe81 100644 --- a/packages/engine-backend/router/pipelineRouter.ts +++ b/packages/engine-backend/router/pipelineRouter.ts @@ -30,6 +30,7 @@ export const pipelineRouter = trpc.router({ id: true, metadata: true, disabled: true, + streams: true, }), ) .output(zRaw.pipeline) @@ -44,6 +45,7 @@ export const pipelineRouter = trpc.router({ metadata: true, disabled: true, sourceId: true, + streams: true, destinationId: true, }), ) diff --git a/packages/engine-backend/services/sync-service.ts b/packages/engine-backend/services/sync-service.ts index e77eed8eb..a46500665 100644 --- a/packages/engine-backend/services/sync-service.ts +++ b/packages/engine-backend/services/sync-service.ts @@ -7,6 +7,8 @@ import type { Link, ResourceUpdate, Source, + StreamsV1, + StreamsV2, } from '@openint/cdk' import {bankingLink, logLink, makeId, sync} from '@openint/cdk' import type {z} from '@openint/util' @@ -172,7 +174,7 @@ export function makeSyncService({ src: _ResourceExpanded state: unknown endUser?: {id: EndUserId} | null | undefined - streams?: Record + streams?: StreamsV1 | StreamsV2 opts: {fullResync?: boolean | null} }) => { const defaultSource$ = () => @@ -279,7 +281,13 @@ export function makeSyncService({ const endUserId = src.endUserId ?? dest.endUserId const endUser = endUserId ? {id: endUserId} : null - const _source$ = sourceSync({opts, src, state: pipe.sourceState, endUser}) + const _source$ = sourceSync({ + opts, + src, + state: pipe.sourceState, + endUser, + streams: pipeline.streams ?? undefined, + }) const source$ = opts.source$ ? opts.source$ConcatDefault