Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
Lever entities sync (#53)
Browse files Browse the repository at this point in the history
* moving etl sync iterator to connector-common

* adding posting entity to sync for lever

* fixing logging

* fixing lever sdk

* adding token refresh to lever

* updating lever-sdk

* adding lever entities to sync

* fixing lints
  • Loading branch information
pellicceama authored Oct 23, 2024
1 parent fb980f6 commit 24ed4b5
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 113 deletions.
79 changes: 79 additions & 0 deletions connectors/connector-common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@

// MARK: - New way of doing things

import {z, rxjs, Rx} from "@openint/util"

export interface EtlSource<
TEntityMap extends Record<string, unknown> = Record<string, unknown>,
> {
listEntities<TK extends keyof TEntityMap>(
type: TK,
options: {
cursor?: string | null
page_size?: number
},
): Promise<{
entities: Array<{
id: string
/** `null` means deleted */
data: TEntityMap[TK] | null
}>
next_cursor: string | null
has_next_page: boolean
}>
}

interface CursorParser<T> {
fromString: (cursor: string | undefined | null) => T
toString: (value: T) => string | null
}

export const NextPageCursor: CursorParser<{next_page: number}> = {
fromString(cursor) {
const num = z.coerce.number().positive().safeParse(cursor)
return {next_page: num.success ? num.data : 1}
},
toString(value) {
return JSON.stringify(value)
},
}
export function observableFromEtlSource(
source: EtlSource,
streams: Record<string, boolean | {disabled?: boolean | undefined} | null>,
state: Record<string, {cursor?: string | null}> = {},
) {
async function* iterateEntities() {
for (const streamName of Object.keys(streams)) {
const streamValue = streams[streamName]
if (
!streamValue ||
(streamValue as {disabled: boolean}).disabled === true
// Should further check weather streamName is valid for a given connector
) {
continue
}

const {cursor} = state[streamName] ?? {}
const {entities, next_cursor, has_next_page} = await source.listEntities(
streamName,
{cursor},
)

yield entities.map((j) => ({
type: 'data' as const,
// We should make the messages easier to construct
data: {entityName: streamName, id: j.id, entity: j.data},
}))

state[streamName] = {cursor: next_cursor}
if (!has_next_page) {
continue
}
}
}
// DO somethign with the new state...

return rxjs
.from(iterateEntities())
.pipe(Rx.mergeMap((ops) => rxjs.from([...ops, {type: 'commit' as const}])))
}
78 changes: 1 addition & 77 deletions connectors/connector-greenhouse/server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {initGreenhouseSDK, type greenhouseTypes} from '@opensdks/sdk-greenhouse'
import type {ConnectorServer} from '@openint/cdk'
import {Rx, rxjs, z} from '@openint/util'
import {type greenhouseSchema} from './def'
import {EtlSource, NextPageCursor, observableFromEtlSource} from '../connector-common'

export type GreenhouseSDK = ReturnType<typeof initGreenhouseSDK>

Expand Down Expand Up @@ -29,42 +29,6 @@ export const greenhouseServer = {

export default greenhouseServer

// MARK: - New way of doing things

interface EtlSource<
TEntityMap extends Record<string, unknown> = Record<string, unknown>,
> {
listEntities<TK extends keyof TEntityMap>(
type: TK,
options: {
cursor?: string | null
page_size?: number
},
): Promise<{
entities: Array<{
id: string
/** `null` means deleted */
data: TEntityMap[TK] | null
}>
next_cursor: string | null
has_next_page: boolean
}>
}

interface CursorParser<T> {
fromString: (cursor: string | undefined | null) => T
toString: (value: T) => string | null
}

const NextPageCursor: CursorParser<{next_page: number}> = {
fromString(cursor) {
const num = z.coerce.number().positive().safeParse(cursor)
return {next_page: num.success ? num.data : 1}
},
toString(value) {
return JSON.stringify(value)
},
}

// TODO: Implement incremental sync
// https://developers.greenhouse.io/harvest.html#get-list-jobs
Expand Down Expand Up @@ -103,43 +67,3 @@ function greenhouseSource({sdk}: {sdk: GreenhouseSDK}): EtlSource<{
}
}

function observableFromEtlSource(
source: EtlSource,
streams: Record<string, boolean | {disabled?: boolean | undefined} | null>,
state: Record<string, {cursor?: string | null}> = {},
) {
async function* iterateEntities() {
for (const streamName of Object.keys(streams)) {
const streamValue = streams[streamName]
if (
!streamValue ||
(streamValue as {disabled: boolean}).disabled === true
// Should further check weather streamName is valid for a given connector
) {
continue
}

const {cursor} = state[streamName] ?? {}
const {entities, next_cursor, has_next_page} = await source.listEntities(
streamName,
{cursor},
)

yield entities.map((j) => ({
type: 'data' as const,
// We should make the messages easier to construct
data: {entityName: streamName, id: j.id, entity: j.data},
}))

state[streamName] = {cursor: next_cursor}
if (!has_next_page) {
continue
}
}
}
// DO somethign with the new state...

return rxjs
.from(iterateEntities())
.pipe(Rx.mergeMap((ops) => rxjs.from([...ops, {type: 'commit' as const}])))
}
8 changes: 7 additions & 1 deletion connectors/connector-lever/def.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import leverOas from '@opensdks/sdk-lever/lever.oas.json'
import type {ConnectorDef, ConnectorSchemas, OpenApiSpec} from '@openint/cdk'
import {connHelpers, oauthBaseSchema} from '@openint/cdk'
import {z} from '@openint/util'
import {R, z} from '@openint/util'

export const zConfig = oauthBaseSchema.connectorConfig.extend({
envName: z.enum(['sandbox', 'production']),
})

const LEVER_ENTITY_NAMES = ['posting', 'opportunity', 'offer'] as const

/**
* Full list of OAuth scopes: https://hire.lever.co/developer/documentation#scopes
*/
Expand All @@ -20,6 +22,10 @@ export const leverSchemas = {
connectorConfig: zConfig,
resourceSettings: zSettings,
connectOutput: oauthBaseSchema.connectOutput,
sourceOutputEntities: R.mapToObj(LEVER_ENTITY_NAMES, (e) => [
e,
z.unknown(),
]),
} satisfies ConnectorSchemas

export const leverHelpers = connHelpers(leverSchemas)
Expand Down
2 changes: 1 addition & 1 deletion connectors/connector-lever/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"@openint/cdk": "workspace:*",
"@openint/util": "workspace:*",
"@opensdks/runtime": "^0.0.19",
"@opensdks/sdk-lever": "0.0.9"
"@opensdks/sdk-lever": "0.0.10"
},
"devDependencies": {}
}
104 changes: 80 additions & 24 deletions connectors/connector-lever/server.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,96 @@
import {initSDK} from '@opensdks/runtime'
import {
leverSdkDef,
type initLeverSDK,
type LeverSDK,
type leverTypes,
} from '@opensdks/sdk-lever'
import type {ConnectorServer} from '@openint/cdk'
import type {leverSchemas} from './def'

export type LeverSDKType = ReturnType<typeof initLeverSDK>
import {initLeverSDK, type leverTypes} from '@opensdks/sdk-lever'
import {nangoProxyLink, type ConnectorServer} from '@openint/cdk'
import {type leverSchemas} from './def'
import {EtlSource, NextPageCursor, observableFromEtlSource} from '../connector-common'

export type LeverSDK = ReturnType<typeof initLeverSDK>

export type LeverTypes = leverTypes

export type LeverObjectType = LeverTypes['components']['schemas']

export const leverServer = {
newInstance: ({config, settings}) => {
const lever = initSDK(leverSdkDef, {
newInstance: ({config, settings, fetchLinks}) => {
const lever = initLeverSDK({
headers: {
authorization: `Bearer ${settings.oauth.credentials.access_token}`,
},
envName: config.envName,
links: (defaultLinks) => [
(req, next) => {
if (lever.clientOptions.baseUrl) {
req.headers.set(
nangoProxyLink.kBaseUrlOverride,
lever.clientOptions.baseUrl,
)
}
return next(req)
},
...fetchLinks,
...defaultLinks
],
})
return lever
},
sourceSync: ({instance: lever, streams, state}) =>
observableFromEtlSource(
leverSource({sdk: lever}),
streams,
(state ?? {}) as {},
),
} satisfies ConnectorServer<
typeof leverSchemas,
ReturnType<typeof initLeverSDK>
>

export default leverServer

// TODO: Implement incremental sync
// TODO2: Implement low-code connector spec
function leverSource({sdk}: {sdk: LeverSDK}): EtlSource<{
posting: LeverObjectType['posting']
// contact: LeverObjectType['contact']
opportunity: LeverObjectType['opportunity']
offer: LeverObjectType['offer']
// Add other entity types as needed
}> {
return {
// @ts-expect-error discuss with tony
async listEntities(type, {cursor}) {
const {next_page: page} = NextPageCursor.fromString(cursor)

if (type === 'offer') {
const opportunitiesRes = await sdk.GET('/opportunities', {
params: {query: {limit: 50, offset: cursor ?? undefined}}
})

async proxy(instance, req) {
return instance
.request(req.method as 'GET', req.url.replace(/.+\/api\/proxy/, ''), {
headers: req.headers,
...(!['GET', 'OPTIONS', 'HEAD'].includes(req.method) && {
body: await req.blob(), // See if this works... We need to figure out how to do streaming here...
}),
const allOffers = []
for (const opportunity of opportunitiesRes.data.data) {
const offersRes = await sdk.GET(`/opportunities/{id}/offers`, {
params: {path: {id: opportunity.id}}
})

allOffers.push(...offersRes.data.data.map((e) => ({id: `${e.id}`, data: e})))
}

return {
entities: allOffers,
next_cursor: NextPageCursor.toString({next_page: page + 1}),
has_next_page: opportunitiesRes.data.hasNext ?? false,
}
}
// for opportunity or posting
const pluralizeType = (type: string) => type === 'opportunity' ? 'opportunities' : `${type}s`;

const res = await sdk.GET(`/${pluralizeType(type) as 'postings' | 'opportunities'}`, {
params: {query: {limit: 50, offset: cursor ?? undefined}}
})
.then((r) => r.response.clone())
},
} satisfies ConnectorServer<typeof leverSchemas, LeverSDK>

export default leverServer
return {
entities: res.data.data.map((e) => ({id: `${e.id}`, data: e})),
next_cursor: NextPageCursor.toString({next_page: page + 1}),
has_next_page: res.data.hasNext ?? false,
}
},
}
}
13 changes: 13 additions & 0 deletions kits/cdk/base-links.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,23 @@ export function agColumnRenameLink(_ctx: {
}

const entityMappings = {
// Greenhouse
job: 'IntegrationAtsJob',
candidate: 'IntegrationAtsCandidate',
opening: 'IntegrationAtsJobOpening',
offer: 'IntegrationAtsOffer',
// Lever
// already mapped above, same relation name
// offer: 'IntegrationAtsOffer',

// there's no list contacts endpoint in lever
// contact: 'IntegrationAtsCandidate',

// maybe we should not sync opportunities at all as we're mapping it to the same candidate table
opportunity: 'IntegrationAtsCandidate',

posting: 'IntegrationAtsJob',
// TODO, perhaps map requisitions to IntegrationAtsJobOpening?
}

op.data.entityName = entityMappings[snakeCase(op.data.entityName) as keyof typeof entityMappings] ?? op.data.entityName
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-backend/services/sync-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ export function makeSyncService({
// TODO: This should be happening async
if (!resoUpdate.source$ && !resoUpdate.triggerDefaultSync) {
console.log(
'[_syncResourceUpdate] Returning early skip syncing pipelines',
`[_syncResourceUpdate] Returning early skip syncing pipelines for resource id ${id} and source ${resoUpdate.source$} with triggerDefaultSync ${resoUpdate.triggerDefaultSync}`,
)
return id
}
Expand Down
Loading

0 comments on commit 24ed4b5

Please sign in to comment.