Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Lever entities sync #53

Merged
merged 9 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions connectors/connector-common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@

// MARK: - New way of doing things

import {z, rxjs, Rx} from "@openint/util"

export interface EtlSource<
TEntityMap extends Record<string, unknown> = Record<string, unknown>,
> {
listEntities<TK extends keyof TEntityMap>(
type: TK,
options: {
cursor?: string | null
page_size?: number
},
): Promise<{
entities: Array<{
id: string
/** `null` means deleted */
data: TEntityMap[TK] | null
}>
next_cursor: string | null
has_next_page: boolean
}>
}

interface CursorParser<T> {
fromString: (cursor: string | undefined | null) => T
toString: (value: T) => string | null
}

export const NextPageCursor: CursorParser<{next_page: number}> = {
fromString(cursor) {
const num = z.coerce.number().positive().safeParse(cursor)
return {next_page: num.success ? num.data : 1}
},
toString(value) {
return JSON.stringify(value)
},
}
export function observableFromEtlSource(
source: EtlSource,
streams: Record<string, boolean | {disabled?: boolean | undefined} | null>,
state: Record<string, {cursor?: string | null}> = {},
) {
async function* iterateEntities() {
for (const streamName of Object.keys(streams)) {
const streamValue = streams[streamName]
if (
!streamValue ||
(streamValue as {disabled: boolean}).disabled === true
// Should further check weather streamName is valid for a given connector
) {
continue
}

const {cursor} = state[streamName] ?? {}
const {entities, next_cursor, has_next_page} = await source.listEntities(
streamName,
{cursor},
)

yield entities.map((j) => ({
type: 'data' as const,
// We should make the messages easier to construct
data: {entityName: streamName, id: j.id, entity: j.data},
}))

state[streamName] = {cursor: next_cursor}
if (!has_next_page) {
continue
}
}
}
// DO somethign with the new state...

return rxjs
.from(iterateEntities())
.pipe(Rx.mergeMap((ops) => rxjs.from([...ops, {type: 'commit' as const}])))
}
78 changes: 1 addition & 77 deletions connectors/connector-greenhouse/server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {initGreenhouseSDK, type greenhouseTypes} from '@opensdks/sdk-greenhouse'
import type {ConnectorServer} from '@openint/cdk'
import {Rx, rxjs, z} from '@openint/util'
import {type greenhouseSchema} from './def'
import {EtlSource, NextPageCursor, observableFromEtlSource} from '../connector-common'

export type GreenhouseSDK = ReturnType<typeof initGreenhouseSDK>

Expand Down Expand Up @@ -29,42 +29,6 @@ export const greenhouseServer = {

export default greenhouseServer

// MARK: - New way of doing things

interface EtlSource<
TEntityMap extends Record<string, unknown> = Record<string, unknown>,
> {
listEntities<TK extends keyof TEntityMap>(
type: TK,
options: {
cursor?: string | null
page_size?: number
},
): Promise<{
entities: Array<{
id: string
/** `null` means deleted */
data: TEntityMap[TK] | null
}>
next_cursor: string | null
has_next_page: boolean
}>
}

interface CursorParser<T> {
fromString: (cursor: string | undefined | null) => T
toString: (value: T) => string | null
}

const NextPageCursor: CursorParser<{next_page: number}> = {
fromString(cursor) {
const num = z.coerce.number().positive().safeParse(cursor)
return {next_page: num.success ? num.data : 1}
},
toString(value) {
return JSON.stringify(value)
},
}

// TODO: Implement incremental sync
// https://developers.greenhouse.io/harvest.html#get-list-jobs
Expand Down Expand Up @@ -103,43 +67,3 @@ function greenhouseSource({sdk}: {sdk: GreenhouseSDK}): EtlSource<{
}
}

function observableFromEtlSource(
source: EtlSource,
streams: Record<string, boolean | {disabled?: boolean | undefined} | null>,
state: Record<string, {cursor?: string | null}> = {},
) {
async function* iterateEntities() {
for (const streamName of Object.keys(streams)) {
const streamValue = streams[streamName]
if (
!streamValue ||
(streamValue as {disabled: boolean}).disabled === true
// Should further check weather streamName is valid for a given connector
) {
continue
}

const {cursor} = state[streamName] ?? {}
const {entities, next_cursor, has_next_page} = await source.listEntities(
streamName,
{cursor},
)

yield entities.map((j) => ({
type: 'data' as const,
// We should make the messages easier to construct
data: {entityName: streamName, id: j.id, entity: j.data},
}))

state[streamName] = {cursor: next_cursor}
if (!has_next_page) {
continue
}
}
}
// DO somethign with the new state...

return rxjs
.from(iterateEntities())
.pipe(Rx.mergeMap((ops) => rxjs.from([...ops, {type: 'commit' as const}])))
}
8 changes: 7 additions & 1 deletion connectors/connector-lever/def.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import leverOas from '@opensdks/sdk-lever/lever.oas.json'
import type {ConnectorDef, ConnectorSchemas, OpenApiSpec} from '@openint/cdk'
import {connHelpers, oauthBaseSchema} from '@openint/cdk'
import {z} from '@openint/util'
import {R, z} from '@openint/util'

export const zConfig = oauthBaseSchema.connectorConfig.extend({
envName: z.enum(['sandbox', 'production']),
})

const LEVER_ENTITY_NAMES = ['posting', 'opportunity', 'offer'] as const

/**
* Full list of OAuth scopes: https://hire.lever.co/developer/documentation#scopes
*/
Expand All @@ -20,6 +22,10 @@ export const leverSchemas = {
connectorConfig: zConfig,
resourceSettings: zSettings,
connectOutput: oauthBaseSchema.connectOutput,
sourceOutputEntities: R.mapToObj(LEVER_ENTITY_NAMES, (e) => [
e,
z.unknown(),
]),
} satisfies ConnectorSchemas

export const leverHelpers = connHelpers(leverSchemas)
Expand Down
2 changes: 1 addition & 1 deletion connectors/connector-lever/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"@openint/cdk": "workspace:*",
"@openint/util": "workspace:*",
"@opensdks/runtime": "^0.0.19",
"@opensdks/sdk-lever": "0.0.9"
"@opensdks/sdk-lever": "0.0.10"
},
"devDependencies": {}
}
104 changes: 80 additions & 24 deletions connectors/connector-lever/server.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,96 @@
import {initSDK} from '@opensdks/runtime'
import {
leverSdkDef,
type initLeverSDK,
type LeverSDK,
type leverTypes,
} from '@opensdks/sdk-lever'
import type {ConnectorServer} from '@openint/cdk'
import type {leverSchemas} from './def'

export type LeverSDKType = ReturnType<typeof initLeverSDK>
import {initLeverSDK, type leverTypes} from '@opensdks/sdk-lever'
import {nangoProxyLink, type ConnectorServer} from '@openint/cdk'
import {type leverSchemas} from './def'
import {EtlSource, NextPageCursor, observableFromEtlSource} from '../connector-common'

export type LeverSDK = ReturnType<typeof initLeverSDK>

export type LeverTypes = leverTypes

export type LeverObjectType = LeverTypes['components']['schemas']

export const leverServer = {
newInstance: ({config, settings}) => {
const lever = initSDK(leverSdkDef, {
newInstance: ({config, settings, fetchLinks}) => {
const lever = initLeverSDK({
headers: {
authorization: `Bearer ${settings.oauth.credentials.access_token}`,
},
envName: config.envName,
links: (defaultLinks) => [
(req, next) => {
if (lever.clientOptions.baseUrl) {
req.headers.set(
nangoProxyLink.kBaseUrlOverride,
lever.clientOptions.baseUrl,
)
}
return next(req)
},
...fetchLinks,
...defaultLinks
],
})
return lever
},
sourceSync: ({instance: lever, streams, state}) =>
observableFromEtlSource(
leverSource({sdk: lever}),
streams,
(state ?? {}) as {},
),
} satisfies ConnectorServer<
typeof leverSchemas,
ReturnType<typeof initLeverSDK>
>

export default leverServer

// TODO: Implement incremental sync
// TODO2: Implement low-code connector spec
function leverSource({sdk}: {sdk: LeverSDK}): EtlSource<{
posting: LeverObjectType['posting']
// contact: LeverObjectType['contact']
opportunity: LeverObjectType['opportunity']
offer: LeverObjectType['offer']
// Add other entity types as needed
}> {
return {
// @ts-expect-error discuss with tony
async listEntities(type, {cursor}) {
const {next_page: page} = NextPageCursor.fromString(cursor)

if (type === 'offer') {
const opportunitiesRes = await sdk.GET('/opportunities', {
params: {query: {limit: 50, offset: cursor ?? undefined}}
})

async proxy(instance, req) {
return instance
.request(req.method as 'GET', req.url.replace(/.+\/api\/proxy/, ''), {
headers: req.headers,
...(!['GET', 'OPTIONS', 'HEAD'].includes(req.method) && {
body: await req.blob(), // See if this works... We need to figure out how to do streaming here...
}),
const allOffers = []
for (const opportunity of opportunitiesRes.data.data) {
const offersRes = await sdk.GET(`/opportunities/{id}/offers`, {
params: {path: {id: opportunity.id}}
})

allOffers.push(...offersRes.data.data.map((e) => ({id: `${e.id}`, data: e})))
}

return {
entities: allOffers,
next_cursor: NextPageCursor.toString({next_page: page + 1}),
has_next_page: opportunitiesRes.data.hasNext ?? false,
}
}
// for opportunity or posting
const pluralizeType = (type: string) => type === 'opportunity' ? 'opportunities' : `${type}s`;

const res = await sdk.GET(`/${pluralizeType(type) as 'postings' | 'opportunities'}`, {
params: {query: {limit: 50, offset: cursor ?? undefined}}
})
.then((r) => r.response.clone())
},
} satisfies ConnectorServer<typeof leverSchemas, LeverSDK>

export default leverServer
return {
entities: res.data.data.map((e) => ({id: `${e.id}`, data: e})),
next_cursor: NextPageCursor.toString({next_page: page + 1}),
has_next_page: res.data.hasNext ?? false,
}
},
}
}
13 changes: 13 additions & 0 deletions kits/cdk/base-links.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,23 @@ export function agColumnRenameLink(_ctx: {
}

const entityMappings = {
// Greenhouse
job: 'IntegrationAtsJob',
candidate: 'IntegrationAtsCandidate',
opening: 'IntegrationAtsJobOpening',
offer: 'IntegrationAtsOffer',
// Lever
// already mapped above, same relation name
// offer: 'IntegrationAtsOffer',

// there's no list contacts endpoint in lever
// contact: 'IntegrationAtsCandidate',

// maybe we should not sync opportunities at all as we're mapping it to the same candidate table
opportunity: 'IntegrationAtsCandidate',

posting: 'IntegrationAtsJob',
// TODO, perhaps map requisitions to IntegrationAtsJobOpening?
}

op.data.entityName = entityMappings[snakeCase(op.data.entityName) as keyof typeof entityMappings] ?? op.data.entityName
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-backend/services/sync-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ export function makeSyncService({
// TODO: This should be happening async
if (!resoUpdate.source$ && !resoUpdate.triggerDefaultSync) {
console.log(
'[_syncResourceUpdate] Returning early skip syncing pipelines',
`[_syncResourceUpdate] Returning early skip syncing pipelines for resource id ${id} and source ${resoUpdate.source$} with triggerDefaultSync ${resoUpdate.triggerDefaultSync}`,
)
return id
}
Expand Down
Loading
Loading