Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: Add streams to pipeline and unhack revert temp_pipe_out
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyxiao committed Apr 26, 2024
1 parent 21dc25e commit 7af46f3
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE pipeline ADD COLUMN streams jsonb;
13 changes: 0 additions & 13 deletions connectors/connector-revert/def.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,6 @@ export const revertSchemas = {
.describe(
"x-revert-t-id header. This is the end user, aka Revert's customer's customer",
),
temp_pipe_out_streams: z
.record(
z.enum(['company', 'contact', 'deal']),
// z.union([
z.object({
fields: z.array(z.string()).describe('List of fields to retrieve'),
}),
// // FIXME(@tony): Caused the app to crash with: "Error: Unknown schema ZodUndefined. Please assign it a manual 'type'."
// z.undefined().describe('Disabled'),
// ]),
)
.optional()
.describe('This will be moved to the pipeline object'),
}),
sourceOutputEntities: {
company: zCast<components['schemas']['commonCompany']>(),
Expand Down
17 changes: 6 additions & 11 deletions connectors/connector-revert/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,16 @@ export const revertServer = {
'x-revert-t-id': settings.tenant_id,
},
}),
sourceSync: ({instance, state}) => {
// TODO(@tony): Please help me fix this so the stream names from the UI & fields as well.
// const streamNames = Object.keys(streams ?? {}).filter(
// (s) => !!streams[s as keyof typeof streams],
// )

sourceSync: ({instance, state, streams}) => {
async function* iterateRecords() {
for (const [name, stream] of Object.entries(
['company', 'contact', 'deal'] ?? {},
)) {
if (!stream) {
for (const [name, _stream] of Object.entries(streams ?? {})) {
const stream =
typeof _stream === 'boolean' ? {disabled: _stream} : _stream
if (!stream || stream.disabled) {
continue
}
const sState = ((state ?? {}) as Record<string, unknown>)[name] ?? {}
yield* iterateRecordsInStream(stream, [], sState) // TODO(@jatin): update this.
yield* iterateRecordsInStream(name, stream.fields ?? [], sState) // TODO(@jatin): update this.
}
}

Expand Down
7 changes: 6 additions & 1 deletion kits/cdk/connector.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ export interface ConnectorServer<
instance: TInstance
endUser: {id: EndUserId} | null | undefined
/* Enabled streams */
streams: {[k in T['_streamName']]?: boolean | null}
streams: {
[k in T['_streamName']]?:
| boolean
| null
| {disabled?: boolean; fields?: string[]}
}
state: T['_types']['sourceState']
/** @deprecated, use `instance` instead */
config: T['_types']['connectorConfig']
Expand Down
12 changes: 12 additions & 0 deletions kits/cdk/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ const zBase = z.object({
updatedAt: z.date(), // should be string but slonik returns date
})

export const zStreamsV2 = z.record(
z.object({
disabled: z.boolean().optional(),
fields: z.array(z.string()).optional(),
}),
)
export type StreamsV2 = z.infer<typeof zStreamsV2>

export const zStreamsV1 = z.record(z.boolean())
export type StreamsV1 = z.infer<typeof zStreamsV1>

/** TODO: Add other links / gather the schema from various links here */
export const zLink = z.enum(['banking']).openapi({ref: 'Link'})

Expand Down Expand Up @@ -169,6 +180,7 @@ export const zRaw = {
// TODO: Remove nullish now that pipelines are more fixed
sourceId: zId('reso').optional(),
sourceState: z.record(z.unknown()).optional(),
streams: zStreamsV2.nullish(),
destinationId: zId('reso').optional(),
destinationState: z.record(z.unknown()).optional(),
linkOptions: z
Expand Down
2 changes: 2 additions & 0 deletions packages/engine-backend/router/pipelineRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export const pipelineRouter = trpc.router({
id: true,
metadata: true,
disabled: true,
streams: true,
}),
)
.output(zRaw.pipeline)
Expand All @@ -44,6 +45,7 @@ export const pipelineRouter = trpc.router({
metadata: true,
disabled: true,
sourceId: true,
streams: true,
destinationId: true,
}),
)
Expand Down
12 changes: 10 additions & 2 deletions packages/engine-backend/services/sync-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type {
Link,
ResourceUpdate,
Source,
StreamsV1,
StreamsV2,
} from '@openint/cdk'
import {bankingLink, logLink, makeId, sync} from '@openint/cdk'
import type {z} from '@openint/util'
Expand Down Expand Up @@ -172,7 +174,7 @@ export function makeSyncService({
src: _ResourceExpanded
state: unknown
endUser?: {id: EndUserId} | null | undefined
streams?: Record<string, boolean>
streams?: StreamsV1 | StreamsV2
opts: {fullResync?: boolean | null}
}) => {
const defaultSource$ = () =>
Expand Down Expand Up @@ -279,7 +281,13 @@ export function makeSyncService({
const endUserId = src.endUserId ?? dest.endUserId
const endUser = endUserId ? {id: endUserId} : null

const _source$ = sourceSync({opts, src, state: pipe.sourceState, endUser})
const _source$ = sourceSync({
opts,
src,
state: pipe.sourceState,
endUser,
streams: pipeline.streams ?? undefined,
})

const source$ = opts.source$
? opts.source$ConcatDefault
Expand Down

0 comments on commit 7af46f3

Please sign in to comment.