From 24ed4b54a045c99fc0fde2198e9564cf4fd83742 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Wed, 23 Oct 2024 05:29:18 +0200 Subject: [PATCH] Lever entities sync (#53) * moving etl sync iterator to connector-common * adding posting entity to sync for lever * fixing logging * fixing lever sdk * adding token refresh to lever * updating lever-sdk * adding lever entities to sync * fixing lints --- connectors/connector-common.ts | 79 +++++++++++++ connectors/connector-greenhouse/server.ts | 78 +------------ connectors/connector-lever/def.ts | 8 +- connectors/connector-lever/package.json | 2 +- connectors/connector-lever/server.ts | 104 ++++++++++++++---- kits/cdk/base-links.ts | 13 +++ .../engine-backend/services/sync-service.ts | 2 +- pnpm-lock.yaml | 14 +-- .../adapters/lever-adapter/index.ts | 4 +- 9 files changed, 191 insertions(+), 113 deletions(-) create mode 100644 connectors/connector-common.ts diff --git a/connectors/connector-common.ts b/connectors/connector-common.ts new file mode 100644 index 000000000..9d6f84ae6 --- /dev/null +++ b/connectors/connector-common.ts @@ -0,0 +1,79 @@ + +// MARK: - New way of doing things + +import {z, rxjs, Rx} from "@openint/util" + +export interface EtlSource< + TEntityMap extends Record = Record, +> { + listEntities( + type: TK, + options: { + cursor?: string | null + page_size?: number + }, + ): Promise<{ + entities: Array<{ + id: string + /** `null` means deleted */ + data: TEntityMap[TK] | null + }> + next_cursor: string | null + has_next_page: boolean + }> +} + +interface CursorParser { + fromString: (cursor: string | undefined | null) => T + toString: (value: T) => string | null +} + +export const NextPageCursor: CursorParser<{next_page: number}> = { + fromString(cursor) { + const num = z.coerce.number().positive().safeParse(cursor) + return {next_page: num.success ? num.data : 1} + }, + toString(value) { + return JSON.stringify(value) + }, +} +export function observableFromEtlSource( + source: EtlSource, + streams: Record, + state: Record = {}, +) { + async function* iterateEntities() { + for (const streamName of Object.keys(streams)) { + const streamValue = streams[streamName] + if ( + !streamValue || + (streamValue as {disabled: boolean}).disabled === true + // Should further check weather streamName is valid for a given connector + ) { + continue + } + + const {cursor} = state[streamName] ?? {} + const {entities, next_cursor, has_next_page} = await source.listEntities( + streamName, + {cursor}, + ) + + yield entities.map((j) => ({ + type: 'data' as const, + // We should make the messages easier to construct + data: {entityName: streamName, id: j.id, entity: j.data}, + })) + + state[streamName] = {cursor: next_cursor} + if (!has_next_page) { + continue + } + } + } + // DO somethign with the new state... + + return rxjs + .from(iterateEntities()) + .pipe(Rx.mergeMap((ops) => rxjs.from([...ops, {type: 'commit' as const}]))) +} diff --git a/connectors/connector-greenhouse/server.ts b/connectors/connector-greenhouse/server.ts index 0df1c0b2c..57fb89525 100644 --- a/connectors/connector-greenhouse/server.ts +++ b/connectors/connector-greenhouse/server.ts @@ -1,7 +1,7 @@ import {initGreenhouseSDK, type greenhouseTypes} from '@opensdks/sdk-greenhouse' import type {ConnectorServer} from '@openint/cdk' -import {Rx, rxjs, z} from '@openint/util' import {type greenhouseSchema} from './def' +import {EtlSource, NextPageCursor, observableFromEtlSource} from '../connector-common' export type GreenhouseSDK = ReturnType @@ -29,42 +29,6 @@ export const greenhouseServer = { export default greenhouseServer -// MARK: - New way of doing things - -interface EtlSource< - TEntityMap extends Record = Record, -> { - listEntities( - type: TK, - options: { - cursor?: string | null - page_size?: number - }, - ): Promise<{ - entities: Array<{ - id: string - /** `null` means deleted */ - data: TEntityMap[TK] | null - }> - next_cursor: string | null - has_next_page: boolean - }> -} - -interface CursorParser { - fromString: (cursor: string | undefined | null) => T - toString: (value: T) => string | null -} - -const NextPageCursor: CursorParser<{next_page: number}> = { - fromString(cursor) { - const num = z.coerce.number().positive().safeParse(cursor) - return {next_page: num.success ? num.data : 1} - }, - toString(value) { - return JSON.stringify(value) - }, -} // TODO: Implement incremental sync // https://developers.greenhouse.io/harvest.html#get-list-jobs @@ -103,43 +67,3 @@ function greenhouseSource({sdk}: {sdk: GreenhouseSDK}): EtlSource<{ } } -function observableFromEtlSource( - source: EtlSource, - streams: Record, - state: Record = {}, -) { - async function* iterateEntities() { - for (const streamName of Object.keys(streams)) { - const streamValue = streams[streamName] - if ( - !streamValue || - (streamValue as {disabled: boolean}).disabled === true - // Should further check weather streamName is valid for a given connector - ) { - continue - } - - const {cursor} = state[streamName] ?? {} - const {entities, next_cursor, has_next_page} = await source.listEntities( - streamName, - {cursor}, - ) - - yield entities.map((j) => ({ - type: 'data' as const, - // We should make the messages easier to construct - data: {entityName: streamName, id: j.id, entity: j.data}, - })) - - state[streamName] = {cursor: next_cursor} - if (!has_next_page) { - continue - } - } - } - // DO somethign with the new state... - - return rxjs - .from(iterateEntities()) - .pipe(Rx.mergeMap((ops) => rxjs.from([...ops, {type: 'commit' as const}]))) -} diff --git a/connectors/connector-lever/def.ts b/connectors/connector-lever/def.ts index 46dc3d1bb..ce2463674 100644 --- a/connectors/connector-lever/def.ts +++ b/connectors/connector-lever/def.ts @@ -1,12 +1,14 @@ import leverOas from '@opensdks/sdk-lever/lever.oas.json' import type {ConnectorDef, ConnectorSchemas, OpenApiSpec} from '@openint/cdk' import {connHelpers, oauthBaseSchema} from '@openint/cdk' -import {z} from '@openint/util' +import {R, z} from '@openint/util' export const zConfig = oauthBaseSchema.connectorConfig.extend({ envName: z.enum(['sandbox', 'production']), }) +const LEVER_ENTITY_NAMES = ['posting', 'opportunity', 'offer'] as const + /** * Full list of OAuth scopes: https://hire.lever.co/developer/documentation#scopes */ @@ -20,6 +22,10 @@ export const leverSchemas = { connectorConfig: zConfig, resourceSettings: zSettings, connectOutput: oauthBaseSchema.connectOutput, + sourceOutputEntities: R.mapToObj(LEVER_ENTITY_NAMES, (e) => [ + e, + z.unknown(), + ]), } satisfies ConnectorSchemas export const leverHelpers = connHelpers(leverSchemas) diff --git a/connectors/connector-lever/package.json b/connectors/connector-lever/package.json index 12709eda4..a13de0ef9 100644 --- a/connectors/connector-lever/package.json +++ b/connectors/connector-lever/package.json @@ -7,7 +7,7 @@ "@openint/cdk": "workspace:*", "@openint/util": "workspace:*", "@opensdks/runtime": "^0.0.19", - "@opensdks/sdk-lever": "0.0.9" + "@opensdks/sdk-lever": "0.0.10" }, "devDependencies": {} } diff --git a/connectors/connector-lever/server.ts b/connectors/connector-lever/server.ts index 774b24e18..4fa430c24 100644 --- a/connectors/connector-lever/server.ts +++ b/connectors/connector-lever/server.ts @@ -1,40 +1,96 @@ -import {initSDK} from '@opensdks/runtime' -import { - leverSdkDef, - type initLeverSDK, - type LeverSDK, - type leverTypes, -} from '@opensdks/sdk-lever' -import type {ConnectorServer} from '@openint/cdk' -import type {leverSchemas} from './def' - -export type LeverSDKType = ReturnType +import {initLeverSDK, type leverTypes} from '@opensdks/sdk-lever' +import {nangoProxyLink, type ConnectorServer} from '@openint/cdk' +import {type leverSchemas} from './def' +import {EtlSource, NextPageCursor, observableFromEtlSource} from '../connector-common' + +export type LeverSDK = ReturnType export type LeverTypes = leverTypes export type LeverObjectType = LeverTypes['components']['schemas'] export const leverServer = { - newInstance: ({config, settings}) => { - const lever = initSDK(leverSdkDef, { + newInstance: ({config, settings, fetchLinks}) => { + const lever = initLeverSDK({ headers: { authorization: `Bearer ${settings.oauth.credentials.access_token}`, }, envName: config.envName, + links: (defaultLinks) => [ + (req, next) => { + if (lever.clientOptions.baseUrl) { + req.headers.set( + nangoProxyLink.kBaseUrlOverride, + lever.clientOptions.baseUrl, + ) + } + return next(req) + }, + ...fetchLinks, + ...defaultLinks + ], }) return lever }, + sourceSync: ({instance: lever, streams, state}) => + observableFromEtlSource( + leverSource({sdk: lever}), + streams, + (state ?? {}) as {}, + ), +} satisfies ConnectorServer< + typeof leverSchemas, + ReturnType +> + +export default leverServer + +// TODO: Implement incremental sync +// TODO2: Implement low-code connector spec +function leverSource({sdk}: {sdk: LeverSDK}): EtlSource<{ + posting: LeverObjectType['posting'] + // contact: LeverObjectType['contact'] + opportunity: LeverObjectType['opportunity'] + offer: LeverObjectType['offer'] + // Add other entity types as needed +}> { + return { + // @ts-expect-error discuss with tony + async listEntities(type, {cursor}) { + const {next_page: page} = NextPageCursor.fromString(cursor) + + if (type === 'offer') { + const opportunitiesRes = await sdk.GET('/opportunities', { + params: {query: {limit: 50, offset: cursor ?? undefined}} + }) - async proxy(instance, req) { - return instance - .request(req.method as 'GET', req.url.replace(/.+\/api\/proxy/, ''), { - headers: req.headers, - ...(!['GET', 'OPTIONS', 'HEAD'].includes(req.method) && { - body: await req.blob(), // See if this works... We need to figure out how to do streaming here... - }), + const allOffers = [] + for (const opportunity of opportunitiesRes.data.data) { + const offersRes = await sdk.GET(`/opportunities/{id}/offers`, { + params: {path: {id: opportunity.id}} + }) + + allOffers.push(...offersRes.data.data.map((e) => ({id: `${e.id}`, data: e}))) + } + + return { + entities: allOffers, + next_cursor: NextPageCursor.toString({next_page: page + 1}), + has_next_page: opportunitiesRes.data.hasNext ?? false, + } + } + // for opportunity or posting + const pluralizeType = (type: string) => type === 'opportunity' ? 'opportunities' : `${type}s`; + + const res = await sdk.GET(`/${pluralizeType(type) as 'postings' | 'opportunities'}`, { + params: {query: {limit: 50, offset: cursor ?? undefined}} }) - .then((r) => r.response.clone()) - }, -} satisfies ConnectorServer -export default leverServer + return { + entities: res.data.data.map((e) => ({id: `${e.id}`, data: e})), + next_cursor: NextPageCursor.toString({next_page: page + 1}), + has_next_page: res.data.hasNext ?? false, + } + }, + } +} diff --git a/kits/cdk/base-links.ts b/kits/cdk/base-links.ts index d0d1e9c5a..68f004ef0 100644 --- a/kits/cdk/base-links.ts +++ b/kits/cdk/base-links.ts @@ -145,10 +145,23 @@ export function agColumnRenameLink(_ctx: { } const entityMappings = { + // Greenhouse job: 'IntegrationAtsJob', candidate: 'IntegrationAtsCandidate', opening: 'IntegrationAtsJobOpening', offer: 'IntegrationAtsOffer', + // Lever + // already mapped above, same relation name + // offer: 'IntegrationAtsOffer', + + // there's no list contacts endpoint in lever + // contact: 'IntegrationAtsCandidate', + + // maybe we should not sync opportunities at all as we're mapping it to the same candidate table + opportunity: 'IntegrationAtsCandidate', + + posting: 'IntegrationAtsJob', + // TODO, perhaps map requisitions to IntegrationAtsJobOpening? } op.data.entityName = entityMappings[snakeCase(op.data.entityName) as keyof typeof entityMappings] ?? op.data.entityName diff --git a/packages/engine-backend/services/sync-service.ts b/packages/engine-backend/services/sync-service.ts index 2d36bcc21..10c526689 100644 --- a/packages/engine-backend/services/sync-service.ts +++ b/packages/engine-backend/services/sync-service.ts @@ -428,7 +428,7 @@ export function makeSyncService({ // TODO: This should be happening async if (!resoUpdate.source$ && !resoUpdate.triggerDefaultSync) { console.log( - '[_syncResourceUpdate] Returning early skip syncing pipelines', + `[_syncResourceUpdate] Returning early skip syncing pipelines for resource id ${id} and source ${resoUpdate.source$} with triggerDefaultSync ${resoUpdate.triggerDefaultSync}`, ) return id } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5a818178c..fefd430bb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -899,8 +899,8 @@ importers: specifier: ^0.0.19 version: 0.0.19 '@opensdks/sdk-lever': - specifier: 0.0.9 - version: 0.0.9 + specifier: 0.0.10 + version: 0.0.10 connectors/connector-lunchmoney: dependencies: @@ -3663,8 +3663,8 @@ packages: '@opensdks/sdk-hubspot@0.0.6': resolution: {integrity: sha512-UivI44M/g4XwoQvDq57b9PVdlGeiCYS+6zcFJq5HhEJLzYejg21oSJ+R3LC49TINfde88fZvV+wx/ism62C65A==} - '@opensdks/sdk-lever@0.0.9': - resolution: {integrity: sha512-pBWxKrTYTUBlFrBogk9ifnmly8+jgSny6e0gIV56i+FvrX6yZ83/DKCDe9rFlpSFSM6qR1dIOM5A36rem1RnyQ==} + '@opensdks/sdk-lever@0.0.10': + resolution: {integrity: sha512-0TqPeVgoffPaDNLgeovvPH+Rq3W6L9Xw9rNKUJpuM/3HVKnmaX3KVGpepv33lURf29/N9O/P12yXDLB0iazWKA==} '@opensdks/sdk-merge@0.0.6': resolution: {integrity: sha512-OJO+rJFpjh0ctqFH7Fq88sepjoVlGseN82MujNZfb7IPpoUIlAdM9dfj0kORfDgt4yFsqnz7TxhgQtNUvDemeg==} @@ -15647,7 +15647,7 @@ snapshots: '@opensdks/sdk-hubspot@0.0.6': {} - '@opensdks/sdk-lever@0.0.9': + '@opensdks/sdk-lever@0.0.10': dependencies: '@opensdks/runtime': 0.0.20 @@ -20381,7 +20381,7 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-module-utils@2.8.0(@typescript-eslint/parser@6.21.0(eslint@8.23.0)(typescript@5.6.2))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.5.3)(eslint@8.23.0): + eslint-module-utils@2.8.0(@typescript-eslint/parser@6.21.0(eslint@8.23.0)(typescript@5.6.2))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.5.3(eslint-plugin-import@2.29.0)(eslint@8.23.0))(eslint@8.23.0): dependencies: debug: 3.2.7 optionalDependencies: @@ -20426,7 +20426,7 @@ snapshots: doctrine: 2.1.0 eslint: 8.23.0 eslint-import-resolver-node: 0.3.9 - eslint-module-utils: 2.8.0(@typescript-eslint/parser@6.21.0(eslint@8.23.0)(typescript@5.6.2))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.5.3)(eslint@8.23.0) + eslint-module-utils: 2.8.0(@typescript-eslint/parser@6.21.0(eslint@8.23.0)(typescript@5.6.2))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.5.3(eslint-plugin-import@2.29.0)(eslint@8.23.0))(eslint@8.23.0) hasown: 2.0.0 is-core-module: 2.13.1 is-glob: 4.0.3 diff --git a/unified/unified-ats/adapters/lever-adapter/index.ts b/unified/unified-ats/adapters/lever-adapter/index.ts index ae54de38d..d21d7c501 100644 --- a/unified/unified-ats/adapters/lever-adapter/index.ts +++ b/unified/unified-ats/adapters/lever-adapter/index.ts @@ -1,4 +1,4 @@ -import {type LeverSDKType} from '@openint/connector-lever' +import {type LeverSDK} from '@openint/connector-lever' import {applyMapper} from '@openint/vdk' import type {ATSAdapter} from '../../router' import {mappers} from './mappers' @@ -90,4 +90,4 @@ export const leverAdapter = { } return resp }, -} satisfies ATSAdapter +} satisfies ATSAdapter