Skip to content

Commit

Permalink
improved si-generate-downloads support, Cook status/job responses, an…
Browse files Browse the repository at this point in the history
…d error handling (#567)

DPO3DPKRT-774/streams not reporting generated errors (#563)
(new) streams always log any errors now
(new) failed stream operations now handle errors and/or return the error

DPO3DPKRT-761/improved cook responses for generate downloads (#565)
(new) verify streams are valid before writing files to disk
(fix) more flexibility in handling returned status codes from Cook

DPO3DPKRT-701/generate downloads support for scenes (#566)
(fix) restoring generate-downloads functionality with support for returned svx scenes and draco/usdz models
(fix) modified tags assigned to 'Download:<downloadType>' to allow for capturing a model's use and its type.
  • Loading branch information
EMaslowskiQ authored Jan 23, 2024
1 parent 627d8d2 commit b05e532
Show file tree
Hide file tree
Showing 15 changed files with 978 additions and 212 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,5 @@ server/config/solr/data/packratMeta/data
### Per-developer settings ###
JTConfig

### Init Scripts ###
### Environment Setup Scripts ###
*.bat
19 changes: 14 additions & 5 deletions server/collections/impl/PublishScene.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,13 @@ export class PublishScene {
return null;
}

// LOG.info(`>>> computeDownloadMSXMap (${idScene}): ${H.Helpers.JSONStringify(MSXs)}`,LOG.LS.eDEBUG);
const DownloadMSXMap: Map<number, DBAPI.ModelSceneXref> = new Map<number, DBAPI.ModelSceneXref>();
for (const MSX of MSXs) {
if (MSX.Usage && MSX.Usage.startsWith('Download')) {
// HACK: Packrat is misusing the Usage property returned by Cook for Voyager scene generation. Some
// assets like draco and USDZ downloads are used by the viewer & as a download. temporarily adding
// their Usage types until a file's 'downloadable' property is detached from 'Usage'. (#DPO3DPKRT-777)
if (MSX.Usage && (MSX.Usage.startsWith('Download:') || MSX.Usage.startsWith('App3D') || MSX.Usage.startsWith('iOSApp3D'))) {
const SOI: DBAPI.SystemObjectInfo | undefined = await CACHE.SystemObjectCache.getSystemFromObjectID({ eObjectType: COMMON.eSystemObjectType.eModel, idObject: MSX.idModel });
if (SOI)
DownloadMSXMap.set(SOI.idSystemObject, MSX);
Expand All @@ -303,14 +307,14 @@ export class PublishScene {
return PublishScene.sendResult(true);

if (newDownloadState) {
LOG.info(`PublishScene.handleSceneUpdates generating downloads for scene ${idScene} (skipping)`, LOG.LS.eGQL);
LOG.info(`PublishScene.handleSceneUpdates generating downloads for scene ${idScene}`, LOG.LS.eGQL);
// Generate downloads
const workflowEngine: WF.IWorkflowEngine | null = await WF.WorkflowFactory.getInstance();
if (!workflowEngine)
return PublishScene.sendResult(false, `Unable to fetch workflow engine for download generation for scene ${idScene}`);

// HACK: temporarily skip generate downloads while development on that wraps up
// workflowEngine.generateSceneDownloads(idScene, { idUserInitiator: idUser }); // don't await
// trigger the workflow/recipe
workflowEngine.generateSceneDownloads(idScene, { idUserInitiator: _idUser }); // don't await
return { success: true, downloadsGenerated: true, downloadsRemoved: false };
} else { // Remove downloads
LOG.info(`PublishScene.handleSceneUpdates removing downloads for scene ${idScene}`, LOG.LS.eGQL);
Expand Down Expand Up @@ -343,6 +347,7 @@ export class PublishScene {
}

private async collectAssets(ePublishedStateIntended?: COMMON.ePublishedState): Promise<boolean> {
// LOG.info(`>>> collectAssets.DownloadMSXMap: ${H.Helpers.JSONStringify(this.DownloadMSXMap)}`,LOG.LS.eDEBUG);
if (!this.DownloadMSXMap)
return false;
this.assetVersions = await DBAPI.AssetVersion.fetchLatestFromSystemObject(this.idSystemObject);
Expand Down Expand Up @@ -416,6 +421,8 @@ export class PublishScene {
}
}
}

// LOG.info(`>>> collectAssets.SAC: ${H.Helpers.JSONStringify(this.SacList)}`,LOG.LS.eDEBUG);
return true;
}

Expand Down Expand Up @@ -487,6 +494,7 @@ export class PublishScene {
this.resourcesHotFolder = path.join(Config.collection.edan.resourcesHotFolder, this.scene.EdanUUID!); // eslint-disable-line @typescript-eslint/no-non-null-assertion

for (const SAC of this.SacList.values()) {
// LOG.info(`>>> stageDownloads.SAC: ${H.Helpers.JSONStringify(SAC)}`,LOG.LS.eDEBUG);
if (!SAC.model && !SAC.metadataSet) // SAC is not a attachment, skip it
continue;

Expand Down Expand Up @@ -692,7 +700,8 @@ export class PublishScene {
case '.usdz': FILE_TYPE = 'usdz'; break;
}

switch (SAC.modelSceneXref.Usage?.replace('Download ', '').toLowerCase()) {
// handle download types
switch (SAC.modelSceneXref.Usage?.replace('Download:', '').toLowerCase()) {
case undefined:
case 'webassetglblowuncompressed': category = 'Low resolution'; MODEL_FILE_TYPE = 'glb'; break;
case 'webassetglbarcompressed': category = 'Low resolution'; MODEL_FILE_TYPE = 'glb'; DRACO_COMPRESSED = true; break;
Expand Down
4 changes: 4 additions & 0 deletions server/db/api/AssetVersion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ export class AssetVersion extends DBC.DBObject<AssetVersionBase> implements Asse
}

/** Pass in a value for Retired if you need to seek only asset versions that have or have not been retired */
/* Ingested:
null = uploaded, not processed (after bytes transferred)
false = uploaded, processed (after transferred and inspected)
true = uploaded, processed, ingested (once ingested in the system) */
static async fetchFromUserByIngested(idUserCreator: number, Ingested: boolean | null, Retired: boolean | null = null): Promise<AssetVersion[] | null> {
if (!idUserCreator)
return null;
Expand Down
1 change: 1 addition & 0 deletions server/db/api/ModelSceneXref.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export class ModelSceneXref extends DBC.DBObject<ModelSceneXrefBase> implements
}

public computeModelAutomationTag(): string {
// LOG.info(`>>> computeModelAutomationTag for ${this.Name} (${this.Usage}|${this.Quality}|${this.UVResolution})`,LOG.LS.eDEBUG);
return `scene-${this.Usage}-${this.Quality}-${this.UVResolution}`;
}

Expand Down
1 change: 1 addition & 0 deletions server/db/api/Scene.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ export class Scene extends DBC.DBObject<SceneBase> implements SceneBase, SystemO

/** fetches scenes which are children of the specified idModelParent */
static async fetchChildrenScenes(idModelParent: number): Promise<Scene[] | null> {
// LOG.info(`>>>> idModelParent: ${idModelParent}`,LOG.LS.eDEBUG);
if (!idModelParent)
return null;
try {
Expand Down
107 changes: 82 additions & 25 deletions server/job/impl/Cook/JobCook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ export abstract class JobCook<T> extends JobPackrat {
}

async signalCompletion(): Promise<void> {
// when the Cook job has completed and returned
this._complete = true;
for (const mutex of this._completionMutexes)
mutex.cancel();
Expand Down Expand Up @@ -265,19 +266,28 @@ export abstract class JobCook<T> extends JobPackrat {
let requestUrl: string = this.CookServerURL() + 'job';
const jobCookPostBody: JobCookPostBody<T> = new JobCookPostBody<T>(this._configuration, await this.getParameters(), eJobCookPriority.eNormal);

// submit the Cook job via an axios request, and retry for 'CookRequestRetryCount' times.
// todo: there's a condition here leading to Cook timeout and repeated attempts even on failure
while (true) {
try {
LOG.info(`JobCook [${this.name()}] creating job: ${requestUrl} body ${JSON.stringify(jobCookPostBody, H.Helpers.saferStringify)}`, LOG.LS.eJOB);
const axiosResponse: AxiosResponse<any> | null = await axios.post(encodeURI(requestUrl), jobCookPostBody);

if (axiosResponse?.status === 201)
if (axiosResponse?.status === 201) {
LOG.info(`JobCook [${this.name()}] creating job: ${requestUrl} successful post response (${axiosResponse.status}:${axiosResponse.statusText} - ${axiosResponse.data}`,LOG.LS.eJOB);
break; // success, continue
else {
res.error = `JobCook [${this.name()}] creating job: ${requestUrl} unexpected response ${axiosResponse?.status}`;
LOG.info(res.error, LOG.LS.eJOB);
} else {
res.error = `JobCook [${this.name()}] creating job: ${requestUrl} unexpected response (${axiosResponse?.status}:${axiosResponse?.statusText})`;
LOG.error(res.error, LOG.LS.eJOB);
}
} catch (err) {
res = this.handleRequestException(err, requestUrl, 'post', jobCookPostBody);

// log error at point
res.error = `JobCook [${this.name()}] creating job: ${requestUrl} failed (${err})`;
LOG.error(res.error, LOG.LS.eJOB);

// if we can't retry, return
if (res.allowRetry === false)
return res;
}
Expand All @@ -290,24 +300,32 @@ export abstract class JobCook<T> extends JobPackrat {
await H.Helpers.sleep(CookRetryDelay);
}

// stage files
// wait for all files to be staged
res = await this.stageFiles();
if (!res.success)
if (!res.success) {
LOG.error(`JobCook [${this.name()}] failed to stage files (${res.error})`,LOG.LS.eJOB);
return res;
}

// Initiate job via PATCH to /clients/<CLIENTID>/jobs/<JOBID>/run
requestCount = 0;
res = { success: false, allowRetry: true };
requestUrl = this.CookServerURL() + `clients/${this._configuration.clientId}/jobs/${this._configuration.jobId}/run`;
while (true) {
try {
LOG.info(`JobCook [${this.name()}] running job: ${requestUrl}`, LOG.LS.eJOB);
LOG.info(`JobCook [${this.name()}] starting job: ${requestUrl}`, LOG.LS.eJOB);
const axiosResponse = await axios.patch(encodeURI(requestUrl));
if (axiosResponse.status === 202)
if (axiosResponse.status >= 200 && axiosResponse.status <= 299) {
LOG.info(`JobCook [${this.name()}] starting job: ${requestUrl} successful response (${axiosResponse.status}:${axiosResponse.statusText})`,LOG.LS.eJOB);
break; // success, continue
res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl} failed: ${JSON.stringify(axiosResponse)}` };
}

// if we failed, report out
res = { success: false, error: `JobCook [${this.name()}] starting job: ${requestUrl} failed (${axiosResponse.status}:${axiosResponse.statusText})` };
LOG.error(res.error, LOG.LS.eJOB);
} catch (err) {
res = this.handleRequestException(err, requestUrl, 'patch', jobCookPostBody);
LOG.error(res.error, LOG.LS.eJOB);
if (res.allowRetry === false)
return res;
}
Expand All @@ -317,6 +335,8 @@ export abstract class JobCook<T> extends JobPackrat {
res.allowRetry = true; // allow outer level to retry job initiation
return res;
}

// wait for our next attempt
await H.Helpers.sleep(CookRetryDelay);
}

Expand All @@ -332,17 +352,18 @@ export abstract class JobCook<T> extends JobPackrat {

async cancelJobWorker(): Promise<H.IOResults> {
// Cancel job via PATCH to /clients/<CLIENTID>/jobs/<JOBID>/cancel
// todo: use 'delete' to prevent lingering jobs on Cook server. investigate we have reports copied. (DPO3DPKRT-762)
let requestCount: number = 0;
let res: H.IOResults = { success: false };
const requestUrl: string = this.CookServerURL() + `clients/${this._configuration.clientId}/jobs/${this._configuration.jobId}/cancel`;
LOG.info(`JobCook [${this.name()}] cancelling job: ${requestUrl}`, LOG.LS.eJOB);
while (true) {
try {
const axiosResponse = await axios.patch(encodeURI(requestUrl));
if (axiosResponse.status !== 200)
res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl} failed: ${JSON.stringify(axiosResponse)}` };
if (axiosResponse.status < 200 || axiosResponse.status > 299)
res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl} failed: ${axiosResponse.status}:${axiosResponse.statusText}` };
} catch (error) {
res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl}: ${JSON.stringify(error)}` };
res = { success: false, error: `JobCook [${this.name()}] patch ${requestUrl}: ${H.Helpers.JSONStringify(error)}` };
}
if (res.success)
break;
Expand All @@ -366,7 +387,7 @@ export abstract class JobCook<T> extends JobPackrat {
const requestUrl: string = this.CookServerURL() + `clients/${this._configuration.clientId}/jobs/${this._configuration.jobId}/report`;
try {
const axiosResponse = await axios.get(encodeURI(requestUrl));
if (axiosResponse.status !== 200) {
if (axiosResponse.status < 200 || axiosResponse.status > 299) {
// only log errors after first attempt, as job creation may not be complete on Cook server
const error: string = JSON.stringify(axiosResponse);
if (pollNumber > 1)
Expand Down Expand Up @@ -404,9 +425,18 @@ export abstract class JobCook<T> extends JobPackrat {

const webdavClient: WebDAVClient = createClient(cookEndpoint, {
authType: AuthType.None,
maxBodyLength: 10 * 1024 * 1024 * 1024,
maxBodyLength: 100 * 1024 * 1024 * 1024, // 100Gb
withCredentials: false
});

// DEBUG: make sure we have a file to get and its size
// TODO: more robust support with alt type
// const stat = await webdavClient.stat(destination);
// const fileSize = (stat as FileStat).size;
// LOG.info(`>>>> fetchFile file size: ${fileSize} | ${destination}`,LOG.LS.eDEBUG);
// if(fileSize <= 0)
// throw new Error(`destination file doesn't exist or is empty. (${fileSize} bytes | ${destination})`);

const webdavWSOpts: CreateReadStreamOptions = {
headers: { 'Content-Type': 'application/octet-stream' }
};
Expand All @@ -422,9 +452,14 @@ export abstract class JobCook<T> extends JobPackrat {
}

protected async stageFiles(): Promise<H.IOResults> {
// this runs on job creation when internal work starts
LOG.info(`JobCook.stageFiles is staging ${this._idAssetVersions?.length} asset versions. (${H.Helpers.JSONStringify(this._idAssetVersions)})`,LOG.LS.eJOB);

// early out if we don't have anything staged
if (!this._idAssetVersions)
return { success: true };

// otherwise cycle through each, getting the read stream and executing
let resOuter: H.IOResults = { success: true };
for (const idAssetVersion of this._idAssetVersions) {
const resInner: H.IOResults = await JobCook._stagingSempaphoreWrite.runExclusive(async (value) => {
Expand Down Expand Up @@ -460,6 +495,7 @@ export abstract class JobCook<T> extends JobPackrat {
headers: { 'Content-Type': 'application/octet-stream' }
};

// create a write stream for transmitting our data to staging via WebDAV
let res: H.IOResultsSized = { success: false, error: 'Not Executed', size: -1 };
for (let transmitCount: number = 0; transmitCount < CookWebDAVTransmitRetryCount; transmitCount++) {
let WS: Writable | null = null;
Expand Down Expand Up @@ -538,13 +574,18 @@ export abstract class JobCook<T> extends JobPackrat {
const cookServerURL: string = this._configuration.cookServerURLs[this._configuration.cookServerURLIndex];
if (++this._configuration.cookServerURLIndex >= this._configuration.cookServerURLs.length)
this._configuration.cookServerURLIndex = 0;

// additional logging in case notification isn't sent
const error = `JobCook.handleCookConnectionFailure: Packrat was unable to connect to ${cookServerURL}`;
LOG.error(error,LOG.LS.eJOB);
LOG.info(`JobCook.handleCookConnectionFailure switching from ${cookServerURL} to ${this._configuration.cookServerURLs[this._configuration.cookServerURLIndex]}`, LOG.LS.eJOB);

// only notify once about a specific server
if (JobCook._cookServerFailureNotificationList.has(cookServerURL))
return;
JobCook._cookServerFailureNotificationList.add(cookServerURL);

// see if we should send a notification based on how long since previous notification
let sendNotification: boolean = true;
let timeSinceLastNotification: number = CookFailureNotificationTime + 1;
const now: Date = new Date();
Expand All @@ -557,7 +598,8 @@ export abstract class JobCook<T> extends JobPackrat {
}

if (sendNotification) {
const res: H.IOResults = await Email.Send(undefined, undefined, 'Cook Connection Failure', `Packrat was unable to connect to ${cookServerURL}`, undefined);
// const res: H.IOResults = await Email.Send(undefined, undefined, 'Cook Connection Failure', `Packrat was unable to connect to ${cookServerURL}`, undefined);
const res: H.IOResults = await Email.Send(undefined, undefined, 'Cook Connection Failure', error, undefined);
if (!res.success)
LOG.error(`JobCook.handleCookConnectionFailure unable to send email notification: ${res.error}`, LOG.LS.eJOB);
}
Expand Down Expand Up @@ -587,24 +629,39 @@ export abstract class JobCook<T> extends JobPackrat {
let emitLog: boolean = true;
let error: string;
if (!pollNumber)
error = `JobCook [${this.name()}] ${method} ${requestUrl} body ${JSON.stringify(jobCookPostBody)} failed with error ${message}: ${JSON.stringify(axiosResponse?.data)}`;
error = `JobCook [${this.name()}] ${method} ${requestUrl} body ${H.Helpers.JSONStringify(jobCookPostBody)} failed with error ${message}`;
else {
emitLog = (pollNumber >= 1);
error = `JobCook [${this.name()}] polling [${pollNumber}] ${method} ${requestUrl} failed with error ${message}`;
}

const res: CookIOResults = { success: false, allowRetry: true, connectFailure: false, otherCookError: false, error };

// if we receive a 500 status, log this as an error and avoid retrying
switch (status) {
case 500:
LOG.error(error, LOG.LS.eJOB);
res.allowRetry = false;
return res;
case 400:
res.otherCookError = true;
break;
// if we have a status code with 4xx/5xx
if(status) {
switch (true) {
// catch all 5xx codes and treat as errors
case (status>=500 && status<=599): {
LOG.error(error, LOG.LS.eJOB);
res.allowRetry = false;
return res;
}
// request timed out (408) or too many requests (429)
case (status===408 || status===429): {
res.otherCookError = true;
} break;
// catch remaining 4xx codes which should be failure
case (status>=400 && status<=499): {
LOG.error(error, LOG.LS.eJOB);
res.allowRetry = false;
return res;
}
}
} else {
LOG.error('JobCook.handleRequestException - no status response received.',LOG.LS.eJOB);
return res;
}

if (emitLog)
LOG.info(error, LOG.LS.eJOB);
return res;
Expand Down
Loading

0 comments on commit b05e532

Please sign in to comment.