diff --git a/frontend/public/locales/en-US/pipeline.json b/frontend/public/locales/en-US/pipeline.json index 483e259de6..e8c5676a8c 100644 --- a/frontend/public/locales/en-US/pipeline.json +++ b/frontend/public/locales/en-US/pipeline.json @@ -391,7 +391,8 @@ "alarmDescription": "Description", "tags": "Tags", "bufferSize": "Buffer size", - "bufferInterval": "Buffer interval" + "bufferInterval": "Buffer interval", + "analyticSchemaStatus": "Redshift Schemas" }, "list": { "id": "Pipeline ID", diff --git a/frontend/public/locales/zh-CN/pipeline.json b/frontend/public/locales/zh-CN/pipeline.json index a6c54c628e..652f2a2539 100644 --- a/frontend/public/locales/zh-CN/pipeline.json +++ b/frontend/public/locales/zh-CN/pipeline.json @@ -391,7 +391,8 @@ "alarmDescription": "描述", "tags": "标签", "bufferSize": "缓冲区大小", - "bufferInterval": "缓冲间隔" + "bufferInterval": "缓冲间隔", + "analyticSchemaStatus": "Redshift Schemas" }, "list": { "id": "管道 ID", diff --git a/frontend/src/apis/pipeline.ts b/frontend/src/apis/pipeline.ts index a7fd53a2fe..2044e718f6 100644 --- a/frontend/src/apis/pipeline.ts +++ b/frontend/src/apis/pipeline.ts @@ -82,6 +82,14 @@ const upgradePipeline = async (params: { id: string; pid: string }) => { return result; }; +const getPipelineExtend = async (params: { projectId: string }) => { + const result: any = await apiRequest( + 'get', + `/pipeline/${params.projectId}/extend?pid=${params.projectId}` + ); + return result; +}; + export { createProjectPipeline, updateProjectPipeline, @@ -91,4 +99,5 @@ export { getPipelineList, retryPipeline, upgradePipeline, + getPipelineExtend, }; diff --git a/frontend/src/pages/pipelines/create/steps/ReviewAndLaunch.tsx b/frontend/src/pages/pipelines/create/steps/ReviewAndLaunch.tsx index 1b7bb25979..c241787f8c 100644 --- a/frontend/src/pages/pipelines/create/steps/ReviewAndLaunch.tsx +++ b/frontend/src/pages/pipelines/create/steps/ReviewAndLaunch.tsx @@ -44,7 +44,7 @@ const ReviewAndLaunch: React.FC = (
{t('pipeline:create.dataProcessing')}
} > - + { const [loadingData, setLoadingData] = useState(true); const [projectInfo, setProjectInfo] = useState(); const [projectPipeline, setProjectPipeline] = useState(); + const [projectPipelineExtend, setProjectPipelineExtend] = + useState(); const [loadingPipeline, setLoadingPipeline] = useState(false); const { activeTab } = location.state || {}; @@ -62,12 +64,29 @@ const PipelineDetail: React.FC = () => { setProjectPipeline(data); setLoadingData(false); setLoadingPipeline(false); + if (data.dataModeling.redshift) { + getProjectPipelineExtend(); + } } } catch (error) { setLoadingPipeline(false); } }; + const getProjectPipelineExtend = async () => { + try { + const { success, data }: ApiResponse = + await getPipelineExtend({ + projectId: defaultStr(pid), + }); + if (success) { + setProjectPipelineExtend(data); + } + } catch (error) { + console.error(error); + } + }; + const getProjectDetailById = async () => { setLoadingData(true); try { @@ -146,7 +165,11 @@ const PipelineDetail: React.FC = () => { id: 'processing', content: (
- +
), }, diff --git a/frontend/src/pages/pipelines/detail/comps/Processing.tsx b/frontend/src/pages/pipelines/detail/comps/Processing.tsx index 5ba670a4e3..d1e78b8cf5 100644 --- a/frontend/src/pages/pipelines/detail/comps/Processing.tsx +++ b/frontend/src/pages/pipelines/detail/comps/Processing.tsx @@ -23,6 +23,7 @@ import { useTranslation } from 'react-i18next'; import { ExecutionType } from 'ts/const'; import { buildReshiftLink, + buildSFNExecutionLink, buildSecurityGroupLink, buildSubnetLink, buildVPCLink, @@ -30,10 +31,12 @@ import { import { defaultStr, getLocaleLngDescription, ternary } from 'ts/utils'; interface TabContentProps { + displayPipelineExtend: boolean; pipelineInfo?: IExtPipeline; + pipelineExtend?: IPipelineExtend; } const Processing: React.FC = (props: TabContentProps) => { - const { pipelineInfo } = props; + const { pipelineInfo, pipelineExtend, displayPipelineExtend } = props; const { t } = useTranslation(); const buildRedshiftDisplay = (pipelineInfo?: IExtPipeline) => { @@ -264,6 +267,38 @@ const Processing: React.FC = (props: TabContentProps) => { return false; }; + const appSchemasStatus = (status?: string) => { + switch (status) { + case 'ABORTED': + return {status}; + case 'FAILED': + case 'TIMED_OUT': + return {status}; + case 'PENDING_REDRIVE': + return {status}; + case 'RUNNING': + return {status}; + case 'SUCCEEDED': + return {status}; + default: + return {status}; + } + }; + + const appSchemasExecution = (appId: string, executionArn?: string) => { + return ( + + {appId} + + ); + }; + return ( @@ -455,6 +490,28 @@ const Processing: React.FC = (props: TabContentProps) => {
{getRedshiftDataRangeDisplay()}
+ +
+ + {t('pipeline:detail.analyticSchemaStatus')} + +
+ {displayPipelineExtend && + pipelineExtend?.createApplicationSchemasStatus.map( + (element) => { + return ( +
+ {appSchemasExecution( + element.appId, + element.executionArn + )} + :{appSchemasStatus(element.status)} +
+ ); + } + )} +
+
)}
diff --git a/frontend/src/ts/url.ts b/frontend/src/ts/url.ts index c209693d2c..ccaf2f02a5 100644 --- a/frontend/src/ts/url.ts +++ b/frontend/src/ts/url.ts @@ -147,6 +147,13 @@ export const buildReshiftLink = ( return `https://${region}.${CONSOLE_GLOBAL_DOMAIN}/redshiftv2/home?region=${region}#cluster-details?cluster=${cluster}`; }; +export const buildSFNExecutionLink = (region: string, executionArn: string) => { + if (region.startsWith('cn')) { + return `https://${region}.${CONSOLE_CHINA_DOMAIN}/states/home?region=${region}#/v2/executions/details/${executionArn}`; + } + return `https://${region}.${CONSOLE_GLOBAL_DOMAIN}/states/home?region=${region}#/v2/executions/details/${executionArn}`; +}; + export const buildQuickSightDashboardLink = ( region: string, dashboardId: string diff --git a/frontend/src/types/pipeline.d.ts b/frontend/src/types/pipeline.d.ts index b52744e8f2..6fed8900d8 100644 --- a/frontend/src/types/pipeline.d.ts +++ b/frontend/src/types/pipeline.d.ts @@ -246,6 +246,16 @@ declare global { transformPluginChanged: boolean; } + interface CreateApplicationSchemasStatus { + readonly appId: string; + status?: string; + executionArn?: string; + } + + interface IPipelineExtend { + createApplicationSchemasStatus: CreateApplicationSchemasStatus[]; + } + interface IAlarm { AlarmName: string; AlarmArn: string; diff --git a/src/control-plane/backend/click-stream-api.ts b/src/control-plane/backend/click-stream-api.ts index a3da71f1d0..93dc613d3b 100644 --- a/src/control-plane/backend/click-stream-api.ts +++ b/src/control-plane/backend/click-stream-api.ts @@ -451,6 +451,7 @@ export class ClickStreamApiConstruct extends Construct { 'iam:GetContextKeysForCustomPolicy', 'iam:SimulateCustomPolicy', 'states:DescribeExecution', + 'states:ListExecutions', 'acm:ListCertificates', 'cloudformation:DescribeStacks', 'cloudformation:DescribeType', diff --git a/src/control-plane/backend/lambda/api/common/types.ts b/src/control-plane/backend/lambda/api/common/types.ts index 94e60c3e7a..ed2895264c 100644 --- a/src/control-plane/backend/lambda/api/common/types.ts +++ b/src/control-plane/backend/lambda/api/common/types.ts @@ -406,3 +406,9 @@ export enum IUserRole { ANALYST = 'Analyst', ANALYST_READER = 'AnalystReader', } + +export interface CreateApplicationSchemasStatus { + readonly appId: string; + status?: ExecutionStatus; + executionArn?: string; +} diff --git a/src/control-plane/backend/lambda/api/model/pipeline.ts b/src/control-plane/backend/lambda/api/model/pipeline.ts index da4cd835d7..4bf9b79266 100644 --- a/src/control-plane/backend/lambda/api/model/pipeline.ts +++ b/src/control-plane/backend/lambda/api/model/pipeline.ts @@ -17,6 +17,7 @@ import { OUTPUT_SERVICE_CATALOG_APPREGISTRY_APPLICATION_TAG_VALUE, PROJECT_ID_PATTERN, SECRETS_MANAGER_ARN_PATTERN, + OUTPUT_DATA_MODELING_REDSHIFT_SQL_EXECUTION_STATE_MACHINE_ARN_SUFFIX, } from '@aws/clickstream-base-lib'; import { Tag } from '@aws-sdk/client-cloudformation'; import { ExecutionStatus } from '@aws-sdk/client-sfn'; @@ -63,6 +64,7 @@ import { } from '../common/stack-params-valid'; import { ClickStreamBadRequestError, + CreateApplicationSchemasStatus, ENetworkType, IngestionServerSinkBatchProps, IngestionServerSizeProps, @@ -82,6 +84,7 @@ import { import { getPipelineStatusType, getStackName, + getStackOutputFromPipelineStatus, getStackPrefix, getStackTags, getStateMachineExecutionName, @@ -98,7 +101,7 @@ import { listMSKClusterBrokers } from '../store/aws/kafka'; import { QuickSightUserArns, registerClickstreamUser } from '../store/aws/quicksight'; import { getRedshiftInfo } from '../store/aws/redshift'; import { isBucketExist } from '../store/aws/s3'; -import { getExecutionDetail } from '../store/aws/sfn'; +import { getExecutionDetail, listExecutions } from '../store/aws/sfn'; import { createTopicAndSubscribeSQSQueue } from '../store/aws/sns'; import { ClickStreamStore } from '../store/click-stream-store'; import { DynamoDbStore } from '../store/dynamodb/dynamodb-store'; @@ -417,7 +420,7 @@ export class CPipeline { ...CReportingStack.editAllowedList(), ...CMetricsStack.editAllowedList(), ]; - const editKeys = diffParameters.edited.map(p => p[0]); + const editKeys = diffParameters.edited.map((p: any[]) => p[0]); const notAllowEdit: string[] = []; const editStacks: string[] = []; const editParameters: StackUpdateParameter[] = []; @@ -436,7 +439,7 @@ export class CPipeline { editParameters.push({ stackName: stackName, parameterKey: paramName, - parameterValue: diffParameters.edited.find(p => p[0] === key)?.[1], + parameterValue: diffParameters.edited.find((p: any[]) => p[0] === key)?.[1], }); } } @@ -1259,4 +1262,68 @@ export class CPipeline { solutionVersion: FULL_SOLUTION_VERSION, }; }; + + public async getCreateApplicationSchemasStatus() { + const apps = await store.listApplication(this.pipeline.projectId, 'asc'); + const appIds = apps.map(a => a.appId); + const schemasStatus: CreateApplicationSchemasStatus[] = []; + for (let appId of appIds) { + schemasStatus.push({ + appId: appId, + status: undefined, + }); + } + const createApplicationSchemasStateMachine = getStackOutputFromPipelineStatus( + this.pipeline.stackDetails ?? this.pipeline.status?.stackDetails, + PipelineStackType.DATA_MODELING_REDSHIFT, + OUTPUT_DATA_MODELING_REDSHIFT_SQL_EXECUTION_STATE_MACHINE_ARN_SUFFIX); + if (!createApplicationSchemasStateMachine) { + return schemasStatus; + } + const executions = await listExecutions(this.pipeline.region, createApplicationSchemasStateMachine); + const editedAppIds: string[] = []; + for (let execution of executions) { + const nameStr = execution.name?.split('-'); + let appId = ''; + if (nameStr && nameStr.length === 3 && nameStr[1].length === 19) { + appId = nameStr[0]; + } else if (execution.executionArn) { + const executionDetail = await getExecutionDetail(this.pipeline.region, execution.executionArn); + appId = this._getAppIdFromInputStr(executionDetail?.input); + } + const status = schemasStatus.find(s => s.appId === appId); + if (appId && status && !editedAppIds.includes(appId)) { + status.status = execution.status; + status.executionArn = execution.executionArn; + editedAppIds.push(appId); + } + if (editedAppIds.length === appIds.length) { + break; + } + } + return schemasStatus; + }; + + private _getAppIdFromInputStr(input?: string): string { + try { + if (!input) { + return ''; + } + const inputJson = JSON.parse(input); + const sqls = inputJson.sqls; + if (sqls && sqls.length > 0) { + const sql = sqls[0]; + const paths = sql.split('/sqls/'); + if (paths.length === 2) { + const appStr = paths[1].split('-'); + if (appStr.length === 2) { + return appStr[0]; + } + } + } + return ''; + } catch (e) { + return ''; + } + } } \ No newline at end of file diff --git a/src/control-plane/backend/lambda/api/router/pipeline.ts b/src/control-plane/backend/lambda/api/router/pipeline.ts index d5af6854c4..52439d1c68 100644 --- a/src/control-plane/backend/lambda/api/router/pipeline.ts +++ b/src/control-plane/backend/lambda/api/router/pipeline.ts @@ -78,6 +78,16 @@ router_pipeline.get( return pipelineServ.details(req, res, next); }); +router_pipeline.get( + '/:id/extend', + validate([ + query('pid').custom(isProjectExisted), + ]), + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + return pipelineServ.extend(req, res, next); + }); + + router_pipeline.put( '/:id', validate([ diff --git a/src/control-plane/backend/lambda/api/service/pipeline.ts b/src/control-plane/backend/lambda/api/service/pipeline.ts index 7bce2b490b..3b0de51245 100644 --- a/src/control-plane/backend/lambda/api/service/pipeline.ts +++ b/src/control-plane/backend/lambda/api/service/pipeline.ts @@ -100,6 +100,24 @@ export class PipelineServ { } }; + public async extend(req: any, res: any, next: any) { + try { + const { pid } = req.query; + const latestPipelines = await store.listPipeline(pid, 'latest', 'asc'); + if (latestPipelines.length === 0) { + return res.status(404).send(new ApiFail('Pipeline not found')); + } + const latestPipeline = latestPipelines[0]; + const pipeline = new CPipeline(latestPipeline); + const createApplicationSchemasStatus = await pipeline.getCreateApplicationSchemasStatus(); + return res.json(new ApiSuccess({ + createApplicationSchemasStatus, + })); + } catch (error) { + next(error); + } + }; + public async getPipelineByProjectId(projectId: string) { const latestPipelines = await store.listPipeline(projectId, 'latest', 'asc'); if (latestPipelines.length === 0) { diff --git a/src/control-plane/backend/lambda/api/store/aws/sfn.ts b/src/control-plane/backend/lambda/api/store/aws/sfn.ts index ba9675cad4..12a85ef5b4 100644 --- a/src/control-plane/backend/lambda/api/store/aws/sfn.ts +++ b/src/control-plane/backend/lambda/api/store/aws/sfn.ts @@ -15,7 +15,8 @@ import { DescribeExecutionCommand, DescribeExecutionCommandOutput, ExecutionDoesNotExist, - SFNClient, StartExecutionCommand, StartExecutionCommandOutput, + ExecutionListItem, + SFNClient, StartExecutionCommand, StartExecutionCommandOutput, paginateListExecutions, } from '@aws-sdk/client-sfn'; import { logger } from '../../common/powertools'; import { aws_sdk_client_common_config } from '../../common/sdk-client-config-ln'; @@ -61,3 +62,21 @@ export const getExecutionDetail = async (region: string, executionArn: string) = return undefined; } }; + +export const listExecutions = async (region: string, stateMachineArn: string) => { + try { + const client = new SFNClient({ + ...aws_sdk_client_common_config, + region, + }); + const records: ExecutionListItem[] = []; + for await (const page of paginateListExecutions({ client: client }, { + stateMachineArn: stateMachineArn, + })) { + records.push(...page.executions as ExecutionListItem[]); + } + return records.sort((a: ExecutionListItem, b: ExecutionListItem) => (b.startDate?.getTime() ?? 0) - (a.startDate?.getTime() ?? 0)); + } catch (error) { + return []; + } +}; diff --git a/src/control-plane/backend/lambda/api/test/api/pipeline.test.ts b/src/control-plane/backend/lambda/api/test/api/pipeline.test.ts index 7387066523..a813a64093 100644 --- a/src/control-plane/backend/lambda/api/test/api/pipeline.test.ts +++ b/src/control-plane/backend/lambda/api/test/api/pipeline.test.ts @@ -11,6 +11,7 @@ * and limitations under the License. */ +import { OUTPUT_DATA_MODELING_REDSHIFT_SQL_EXECUTION_STATE_MACHINE_ARN_SUFFIX } from '@aws/clickstream-base-lib'; import { DescribeStacksCommand, CloudFormationClient, StackStatus } from '@aws-sdk/client-cloudformation'; import { CloudWatchEventsClient, PutRuleCommand, TagResourceCommand as EventTagResourceCommand } from '@aws-sdk/client-cloudwatch-events'; import { TransactWriteItemsCommand } from '@aws-sdk/client-dynamodb'; @@ -42,7 +43,7 @@ import { } from '@aws-sdk/client-s3'; import { SecretsManagerClient } from '@aws-sdk/client-secrets-manager'; -import { DescribeExecutionCommand, ExecutionStatus, SFNClient, StartExecutionCommand } from '@aws-sdk/client-sfn'; +import { DescribeExecutionCommand, ExecutionStatus, ListExecutionsCommand, SFNClient, StartExecutionCommand } from '@aws-sdk/client-sfn'; import { CreateTopicCommand, SNSClient, TagResourceCommand as SNSTagResourceCommand } from '@aws-sdk/client-sns'; import { DynamoDBDocumentClient, GetCommand, PutCommand, QueryCommand, ScanCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb'; import { mockClient } from 'aws-sdk-client-mock'; @@ -2023,6 +2024,101 @@ describe('Pipeline test', () => { message: 'Pipeline not found', }); }); + it('Get pipeline extend information', async () => { + projectExistedMock(ddbMock, true); + const stackDetails = [ + stackDetailsWithOutputs[0], + stackDetailsWithOutputs[1], + stackDetailsWithOutputs[2], + { + ...stackDetailsWithOutputs[3], + outputs: [ + { + OutputKey: `xxxxxxxx-xxxx-${OUTPUT_DATA_MODELING_REDSHIFT_SQL_EXECUTION_STATE_MACHINE_ARN_SUFFIX}`, + OutputValue: 'mock-data-modeling-redshift-sql-execution-state-machine-arn', + }, + ], + }, + stackDetailsWithOutputs[4], + stackDetailsWithOutputs[5], + ]; + ddbMock.on(QueryCommand).resolvesOnce({ + Items: [{ + ...KINESIS_DATA_PROCESSING_NEW_REDSHIFT_PIPELINE_WITH_WORKFLOW, + stackDetails: stackDetails, + }], + }).resolves({ + Items: [ + { appId: 'Application01' }, + { appId: 'Application02' }, + { appId: 'Application03' }, + ], + }); + sfnMock.on(ListExecutionsCommand).resolves({ + executions: [ + { + name: 'Application01-20240301T071531482Z-55', + status: ExecutionStatus.SUCCEEDED, + startDate: new Date(), + executionArn: 'mockExecutionArn1', + stateMachineArn: 'mockStateMachineArn', + }, + { + name: 'Application01-20240301T071531482Z-55', + status: ExecutionStatus.FAILED, + startDate: new Date(), + executionArn: 'mockExecutionArn2', + stateMachineArn: 'mockStateMachineArn', + }, + { + name: '12345678-1234-1234-1234-123456789012', + status: ExecutionStatus.ABORTED, + startDate: new Date(), + executionArn: 'mockExecutionArn3', + stateMachineArn: 'mockStateMachineArn', + }, + ], + }); + sfnMock.on(DescribeExecutionCommand).resolves({ + executionArn: 'arn:aws:states:ap-southeast-1:123456789012:execution:ForceExecutionName:12345678-1234-1234-1234-123456789012', + name: '12345678-1234-1234-1234-123456789012', + status: ExecutionStatus.FAILED, + startDate: new Date(), + input: JSON.stringify({ + sqls: [ + 's3://EXAMPLE_BUCKET/clickstream/new1203_mggt/data/load-workflow/tmp/new1203_mggt/sqls/Application03-20240301T071531482Z/0.sql', + ], + }), + }); + const res = await request(app) + .get(`/api/pipeline/${MOCK_PIPELINE_ID}/extend?pid=${MOCK_PROJECT_ID}`); + expect(ddbMock).toHaveReceivedCommandTimes(QueryCommand, 2); + expect(sfnMock).toHaveReceivedCommandTimes(ListExecutionsCommand, 1); + expect(sfnMock).toHaveReceivedCommandTimes(DescribeExecutionCommand, 1); + expect(res.headers['content-type']).toEqual('application/json; charset=utf-8'); + expect(res.statusCode).toBe(200); + expect(res.body).toEqual({ + data: { + createApplicationSchemasStatus: [ + { + appId: 'Application01', + executionArn: 'mockExecutionArn1', + status: 'SUCCEEDED', + }, + { + appId: 'Application02', + }, + { + appId: 'Application03', + executionArn: 'mockExecutionArn3', + status: 'ABORTED', + }, + ], + }, + message: '', + success: true, + }); + }); it('Get pipeline list', async () => { ddbMock.on(QueryCommand).resolves({ Items: [ diff --git a/test/control-plane/click-stream-api.test.ts b/test/control-plane/click-stream-api.test.ts index 9f61d36459..b81b754bd6 100644 --- a/test/control-plane/click-stream-api.test.ts +++ b/test/control-plane/click-stream-api.test.ts @@ -938,6 +938,7 @@ describe('Click Stream Api ALB deploy Construct Test', () => { 'iam:GetContextKeysForCustomPolicy', 'iam:SimulateCustomPolicy', 'states:DescribeExecution', + 'states:ListExecutions', 'acm:ListCertificates', 'cloudformation:DescribeStacks', 'cloudformation:DescribeType', @@ -2560,6 +2561,7 @@ describe('Click Stream Api ALB deploy Construct With IAM Role Prefix', () => { 'iam:GetContextKeysForCustomPolicy', 'iam:SimulateCustomPolicy', 'states:DescribeExecution', + 'states:ListExecutions', 'acm:ListCertificates', 'cloudformation:DescribeStacks', 'cloudformation:DescribeType',