Skip to content

Commit

Permalink
Wr/memory usage (#575)
Browse files Browse the repository at this point in the history
* chore: initial cleanup, average lower, still spikes to 3GB

* chore: staticresource unzip now pipelined asap

* chore: add UTs

* chore: revert some changes from global pipeline effort

* Sm/tweaks fp (#577)

* perf: promise.all the pipelining

* test: equal in any order

* chore: adding types for SDR

Co-authored-by: Shane McLaughlin <[email protected]>
  • Loading branch information
WillieRuemmele and mshanemc authored Feb 15, 2022
1 parent 67c7a1f commit c1648c6
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 98 deletions.
2 changes: 2 additions & 0 deletions METADATA_SUPPORT.md
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,11 @@ v55 introduces the following new types. Here's their current level of support
|ExternalDataSrcDescriptor||Not supported, but support could be added|
|ExternalDataTranField||Not supported, but support could be added|
|ExternalDataTranObject||Not supported, but support could be added|
|FavoriteTransferDestination||Not supported, but support could be added|
|IndustriesAutomotiveSettings|||
|IndustriesMfgServiceSettings|||
|InvLatePymntRiskCalcSettings|||
|LiveChatObjectAccessDefinition||Not supported, but support could be added|
|PaymentsManagementEnabledSettings|||
|RegisteredExternalService||Not supported, but support could be added|
|StreamingAppDataConnector||Not supported, but support could be added|
Expand Down
67 changes: 34 additions & 33 deletions src/convert/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/
import { basename, dirname, isAbsolute, join } from 'path';
import { pipeline as cbPipeline, Readable, Transform, Writable, Stream } from 'stream';
import { pipeline as cbPipeline, Readable, Stream, Transform, Writable } from 'stream';
import { promisify } from 'util';
import { Archiver, create as createArchive } from 'archiver';
import { createWriteStream, existsSync } from 'graceful-fs';
import { JsonMap } from '@salesforce/ts-types';
import { j2xParser } from 'fast-xml-parser';
import { Logger } from '@salesforce/core';
import { MetadataResolver, SourceComponent } from '../resolve';
import { ensureFileExists } from '../utils/fileSystemHandler';
import { SourcePath, XML_DECL } from '../common';
import { ComponentSet } from '../collections';
import { LibraryError } from '../errors';
import { RegistryAccess } from '../registry';
import { ensureFileExists } from '../utils/fileSystemHandler';
import { MetadataTransformerFactory } from './transformers';
import { ConvertContext } from './convertContext';
import { SfdxFileFormat, WriteInfo, WriterFormat } from './types';
Expand Down Expand Up @@ -92,6 +92,7 @@ export class ComponentConverter extends Transform {
try {
const converts: Array<Promise<WriteInfo[]>> = [];
const transformer = this.transformerFactory.getTransformer(chunk);
transformer.defaultDirectory = this.defaultDirectory;
const mergeWith = this.mergeSet?.getSourceComponents(chunk);
switch (this.targetFormat) {
case 'source':
Expand Down Expand Up @@ -163,43 +164,43 @@ export class StandardWriter extends ComponentWriter {
if (chunk.writeInfos.length !== 0) {
try {
const toResolve: string[] = [];
const writeTasks = chunk.writeInfos.map((info: WriteInfo) => {
const fullDest = isAbsolute(info.output) ? info.output : join(this.rootDestination, info.output);
if (!existsSync(fullDest)) {
for (const ignoredPath of this.forceIgnoredPaths) {
if (
dirname(ignoredPath).includes(dirname(fullDest)) &&
basename(ignoredPath).includes(basename(fullDest))
) {
return;
// it is a reasonable expectation that when a conversion call exits, the files of
// every component have been written to the destination. This await ensures the microtask
// queue is empty when that call exits and overall less memory is consumed.
await Promise.all(
chunk.writeInfos.map((info: WriteInfo) => {
const fullDest = isAbsolute(info.output) ? info.output : join(this.rootDestination, info.output);
if (!existsSync(fullDest)) {
for (const ignoredPath of this.forceIgnoredPaths) {
if (
dirname(ignoredPath).includes(dirname(fullDest)) &&
basename(ignoredPath).includes(basename(fullDest))
) {
return;
}
}
}
}
if (this.forceIgnoredPaths.has(fullDest)) {
return;
}
// if there are children, resolve each file. o/w just pick one of the files to resolve
if (toResolve.length === 0 || chunk.component.type.children) {
// This is a workaround for a server side ListViews bug where
// duplicate components are sent. W-9614275
if (toResolve.includes(fullDest)) {
this.logger.debug(`Ignoring duplicate metadata for: ${fullDest}`);
if (this.forceIgnoredPaths.has(fullDest)) {
return;
}
toResolve.push(fullDest);
}
ensureFileExists(fullDest);
return pipeline(info.source, createWriteStream(fullDest));
});

// it is a reasonable expectation that when a conversion call exits, the files of
// every component have been written to the destination. This await ensures the microtask
// queue is empty when that call exits and overall less memory is consumed.
await Promise.all(writeTasks);
// if there are children, resolve each file. o/w just pick one of the files to resolve
if (toResolve.length === 0 || chunk.component.type.children) {
// This is a workaround for a server side ListViews bug where
// duplicate components are sent. W-9614275
if (toResolve.includes(fullDest)) {
this.logger.debug(`Ignoring duplicate metadata for: ${fullDest}`);
return;
}
toResolve.push(fullDest);
}
ensureFileExists(fullDest);
return pipeline(info.source, createWriteStream(fullDest));
})
);

for (const fsPath of toResolve) {
toResolve.map((fsPath) => {
this.converted.push(...this.resolver.getComponentsFromPath(fsPath));
}
});
} catch (e) {
err = e as Error;
}
Expand Down
1 change: 1 addition & 0 deletions src/convert/transformers/baseMetadataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { RegistryAccess } from '../../registry';

export abstract class BaseMetadataTransformer implements MetadataTransformer {
public readonly context: ConvertContext;
public defaultDirectory?: string;
protected registry: RegistryAccess;

public constructor(registry = new RegistryAccess(), context = new ConvertContext()) {
Expand Down
100 changes: 59 additions & 41 deletions src/convert/transformers/staticResourceMetadataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/
import { basename, dirname, join } from 'path';
import { basename, dirname, isAbsolute, join } from 'path';
import { Readable } from 'stream';
import { create as createArchive } from 'archiver';
import { getExtension } from 'mime';
import { Open } from 'unzipper';
import { JsonMap } from '@salesforce/ts-types';
import { createWriteStream } from 'graceful-fs';
import { baseName } from '../../utils';
import { WriteInfo } from '..';
import { LibraryError } from '../../errors';
import { SourceComponent } from '../../resolve';
import { SourcePath } from '../../common';
import { ensureFileExists } from '../../utils/fileSystemHandler';
import { pipeline } from '../streams';
import { BaseMetadataTransformer } from './baseMetadataTransformer';

export class StaticResourceMetadataTransformer extends BaseMetadataTransformer {
Expand Down Expand Up @@ -63,38 +66,65 @@ export class StaticResourceMetadataTransformer extends BaseMetadataTransformer {

public async toSourceFormat(component: SourceComponent, mergeWith?: SourceComponent): Promise<WriteInfo[]> {
const { xml, content } = component;
const writeInfos: WriteInfo[] = [];

if (content) {
const componentContentType = await this.getContentType(component);
const mergeContentPath = mergeWith?.content;
const baseContentPath = this.getBaseContentPath(component, mergeWith);

// only unzip an archive component if there isn't a merge component, or the merge component is itself expanded
const shouldUnzipArchive =
StaticResourceMetadataTransformer.ARCHIVE_MIME_TYPES.has(componentContentType) &&
(!mergeWith || mergeWith.tree.isDirectory(mergeContentPath));

if (shouldUnzipArchive) {
const zipBuffer = await component.tree.readFile(content);
for await (const info of this.createWriteInfosFromArchive(zipBuffer, baseContentPath)) {
writeInfos.push(info);
}
} else {
const extension = this.getExtensionFromType(componentContentType);
writeInfos.push({
source: component.tree.stream(content),
output: `${baseContentPath}.${extension}`,
});
}

writeInfos.push({
if (!content) {
return [];
}
const componentContentType = await this.getContentType(component);
const mergeContentPath = mergeWith?.content;
const baseContentPath = this.getBaseContentPath(component, mergeWith);

// only unzip an archive component if there isn't a merge component, or the merge component is itself expanded
const shouldUnzipArchive =
StaticResourceMetadataTransformer.ARCHIVE_MIME_TYPES.has(componentContentType) &&
(!mergeWith || mergeWith.tree.isDirectory(mergeContentPath));

if (shouldUnzipArchive) {
// for the bulk of static resource writing we'll start writing ASAP
// we'll still defer writing the resource-meta.xml file by pushing it onto the writeInfos
await Promise.all(
(
await Open.buffer(await component.tree.readFile(content))
).files
.filter((f) => f.type === 'File')
.map(async (f) => {
const path = join(baseContentPath, f.path);
const fullDest = isAbsolute(path)
? path
: join(this.defaultDirectory || component.getPackageRelativePath('', 'source'), path);
// push onto the pipeline and start writing now
return this.pipeline(f.stream(), fullDest);
})
);
}
return [
{
source: component.tree.stream(xml),
output: mergeWith?.xml || component.getPackageRelativePath(basename(xml), 'source'),
});
}
},
].concat(
shouldUnzipArchive
? []
: [
{
source: component.tree.stream(content),
output: `${baseContentPath}.${this.getExtensionFromType(componentContentType)}`,
},
]
);
}

return writeInfos;
/**
 * Ensures the destination file exists, then streams the data into it.
 *
 * Only separated into its own method for unit testing purposes;
 * I was unable to find a way to stub/spy a pipeline() call.
 *
 * @param stream the data to be written
 * @param destination the destination path to be written
 * @private
 */
private async pipeline(stream: Readable, destination: string): Promise<void> {
  ensureFileExists(destination);
  await pipeline(stream, createWriteStream(destination));
}

private getBaseContentPath(component: SourceComponent, mergeWith?: SourceComponent): SourcePath {
Expand All @@ -118,18 +148,6 @@ export class StaticResourceMetadataTransformer extends BaseMetadataTransformer {
return false;
}

/**
 * Lazily yields one WriteInfo per file entry contained in the given zip buffer.
 * Directory entries are skipped; each file's output path is the entry path
 * joined onto `baseDir`, and its source is the entry's content stream.
 *
 * @param zipBuffer in-memory zip archive to expand
 * @param baseDir directory that entry paths are resolved against
 * @private
 */
private async *createWriteInfosFromArchive(zipBuffer: Buffer, baseDir: string): AsyncIterable<WriteInfo> {
  const { files } = await Open.buffer(zipBuffer);
  for (const fileEntry of files.filter((entry) => entry.type === 'File')) {
    yield {
      source: fileEntry.stream(),
      output: join(baseDir, fileEntry.path),
    };
  }
}

private async getContentType(component: SourceComponent): Promise<string> {
const resource = (await component.parseXml()).StaticResource as JsonMap;

Expand Down
1 change: 1 addition & 0 deletions src/convert/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export type MergeConfig = {
* Transforms metadata component files into different SFDX file formats
*/
export interface MetadataTransformer {
  /**
   * Destination root the converter is writing into; set by ComponentConverter
   * before transform calls so transformers can resolve absolute output paths.
   * NOTE(review): optional — transformers must tolerate it being unset.
   */
  defaultDirectory?: string;
  /** Converts a source-format component into metadata-format write infos. */
  toMetadataFormat(component: SourceComponent): Promise<WriteInfo[]>;
  /**
   * Converts a metadata-format component into source-format write infos,
   * optionally merging into an existing local component's file layout.
   */
  toSourceFormat(component: SourceComponent, mergeWith?: SourceComponent): Promise<WriteInfo[]>;
}
Expand Down
Loading

0 comments on commit c1648c6

Please sign in to comment.