Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: pipeline detail show create schama status #918

Merged
merged 3 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion frontend/public/locales/en-US/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@
"alarmDescription": "Description",
"tags": "Tags",
"bufferSize": "Buffer size",
"bufferInterval": "Buffer interval"
"bufferInterval": "Buffer interval",
"analyticSchemaStatus": "Redshift Schemas"
},
"list": {
"id": "Pipeline ID",
Expand Down
3 changes: 2 additions & 1 deletion frontend/public/locales/zh-CN/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@
"alarmDescription": "描述",
"tags": "标签",
"bufferSize": "缓冲区大小",
"bufferInterval": "缓冲间隔"
"bufferInterval": "缓冲间隔",
"analyticSchemaStatus": "Redshift Schemas"
},
"list": {
"id": "管道 ID",
Expand Down
9 changes: 9 additions & 0 deletions frontend/src/apis/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ const upgradePipeline = async (params: { id: string; pid: string }) => {
return result;
};

const getPipelineExtend = async (params: { projectId: string }) => {
const result: any = await apiRequest(
'get',
`/pipeline/${params.projectId}/extend?pid=${params.projectId}`
);
return result;
};

export {
createProjectPipeline,
updateProjectPipeline,
Expand All @@ -91,4 +99,5 @@ export {
getPipelineList,
retryPipeline,
upgradePipeline,
getPipelineExtend,
};
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const ReviewAndLaunch: React.FC<ReviewAndLaunchProps> = (
<Header variant="h2">{t('pipeline:create.dataProcessing')}</Header>
}
>
<Processing pipelineInfo={pipelineInfo} />
<Processing pipelineInfo={pipelineInfo} displayPipelineExtend={false} />
</Container>

<Container
Expand Down
27 changes: 25 additions & 2 deletions frontend/src/pages/pipelines/detail/PipelineDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
SpaceBetween,
Tabs,
} from '@cloudscape-design/components';
import { getPipelineDetail } from 'apis/pipeline';
import { getPipelineDetail, getPipelineExtend } from 'apis/pipeline';
import { getProjectDetail } from 'apis/project';
import Loading from 'components/common/Loading';
import CustomBreadCrumb from 'components/layouts/CustomBreadCrumb';
Expand All @@ -43,6 +43,8 @@ const PipelineDetail: React.FC = () => {
const [loadingData, setLoadingData] = useState(true);
const [projectInfo, setProjectInfo] = useState<IProject>();
const [projectPipeline, setProjectPipeline] = useState<IExtPipeline>();
const [projectPipelineExtend, setProjectPipelineExtend] =
useState<IPipelineExtend>();
const [loadingPipeline, setLoadingPipeline] = useState(false);

const { activeTab } = location.state || {};
Expand All @@ -62,12 +64,29 @@ const PipelineDetail: React.FC = () => {
setProjectPipeline(data);
setLoadingData(false);
setLoadingPipeline(false);
if (data.dataModeling.redshift) {
getProjectPipelineExtend();
}
}
} catch (error) {
setLoadingPipeline(false);
}
};

const getProjectPipelineExtend = async () => {
try {
const { success, data }: ApiResponse<IPipelineExtend> =
await getPipelineExtend({
tyyzqmf marked this conversation as resolved.
Show resolved Hide resolved
projectId: defaultStr(pid),
});
if (success) {
setProjectPipelineExtend(data);
}
} catch (error) {
console.error(error);
tyyzqmf marked this conversation as resolved.
Show resolved Hide resolved
}
};

const getProjectDetailById = async () => {
setLoadingData(true);
try {
Expand Down Expand Up @@ -146,7 +165,11 @@ const PipelineDetail: React.FC = () => {
id: 'processing',
content: (
<div className="pd-20">
<Processing pipelineInfo={projectPipeline} />
<Processing
pipelineInfo={projectPipeline}
pipelineExtend={projectPipelineExtend}
displayPipelineExtend={true}
/>
</div>
),
},
Expand Down
59 changes: 58 additions & 1 deletion frontend/src/pages/pipelines/detail/comps/Processing.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ import { useTranslation } from 'react-i18next';
import { ExecutionType } from 'ts/const';
import {
buildReshiftLink,
buildSFNExecutionLink,
buildSecurityGroupLink,
buildSubnetLink,
buildVPCLink,
} from 'ts/url';
import { defaultStr, getLocaleLngDescription, ternary } from 'ts/utils';

interface TabContentProps {
displayPipelineExtend: boolean;
pipelineInfo?: IExtPipeline;
pipelineExtend?: IPipelineExtend;
}
const Processing: React.FC<TabContentProps> = (props: TabContentProps) => {
const { pipelineInfo } = props;
const { pipelineInfo, pipelineExtend, displayPipelineExtend } = props;
const { t } = useTranslation();

const buildRedshiftDisplay = (pipelineInfo?: IExtPipeline) => {
Expand Down Expand Up @@ -264,6 +267,38 @@ const Processing: React.FC<TabContentProps> = (props: TabContentProps) => {
return false;
};

const appSchemasStatus = (status?: string) => {
switch (status) {
case 'ABORTED':
return <StatusIndicator type="stopped">{status}</StatusIndicator>;
case 'FAILED':
case 'TIMED_OUT':
return <StatusIndicator type="error">{status}</StatusIndicator>;
case 'PENDING_REDRIVE':
return <StatusIndicator type="pending">{status}</StatusIndicator>;
case 'RUNNING':
return <StatusIndicator type="in-progress">{status}</StatusIndicator>;
case 'SUCCEEDED':
return <StatusIndicator type="success">{status}</StatusIndicator>;
default:
return <StatusIndicator type="pending">{status}</StatusIndicator>;
}
};

const appSchemasExecution = (appId: string, executionArn?: string) => {
return (
<Link
external
href={buildSFNExecutionLink(
defaultStr(pipelineInfo?.region),
defaultStr(executionArn)
)}
>
{appId}
</Link>
);
};

return (
<ColumnLayout columns={3} variant="text-grid">
<SpaceBetween direction="vertical" size="l">
Expand Down Expand Up @@ -455,6 +490,28 @@ const Processing: React.FC<TabContentProps> = (props: TabContentProps) => {
</Box>
<div>{getRedshiftDataRangeDisplay()}</div>
</div>

<div>
<Box variant="awsui-key-label">
{t('pipeline:detail.analyticSchemaStatus')}
</Box>
<div>
{displayPipelineExtend &&
pipelineExtend?.createApplicationSchemasStatus.map(
(element) => {
return (
<div key={element.appId}>
{appSchemasExecution(
element.appId,
element.executionArn
)}
:{appSchemasStatus(element.status)}
</div>
);
}
)}
</div>
</div>
</>
)}
</SpaceBetween>
Expand Down
7 changes: 7 additions & 0 deletions frontend/src/ts/url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ export const buildReshiftLink = (
return `https://${region}.${CONSOLE_GLOBAL_DOMAIN}/redshiftv2/home?region=${region}#cluster-details?cluster=${cluster}`;
};

export const buildSFNExecutionLink = (region: string, executionArn: string) => {
if (region.startsWith('cn')) {
return `https://${region}.${CONSOLE_CHINA_DOMAIN}/states/home?region=${region}#/v2/executions/details/${executionArn}`;
}
return `https://${region}.${CONSOLE_GLOBAL_DOMAIN}/states/home?region=${region}#/v2/executions/details/${executionArn}`;
};

export const buildQuickSightDashboardLink = (
region: string,
dashboardId: string
Expand Down
10 changes: 10 additions & 0 deletions frontend/src/types/pipeline.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ declare global {
transformPluginChanged: boolean;
}

interface CreateApplicationSchemasStatus {
readonly appId: string;
status?: string;
executionArn?: string;
}

interface IPipelineExtend {
createApplicationSchemasStatus: CreateApplicationSchemasStatus[];
}

interface IAlarm {
AlarmName: string;
AlarmArn: string;
Expand Down
1 change: 1 addition & 0 deletions src/control-plane/backend/click-stream-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ export class ClickStreamApiConstruct extends Construct {
'iam:GetContextKeysForCustomPolicy',
'iam:SimulateCustomPolicy',
'states:DescribeExecution',
'states:ListExecutions',
'acm:ListCertificates',
'cloudformation:DescribeStacks',
'cloudformation:DescribeType',
Expand Down
6 changes: 6 additions & 0 deletions src/control-plane/backend/lambda/api/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,9 @@ export enum IUserRole {
ANALYST = 'Analyst',
ANALYST_READER = 'AnalystReader',
}

export interface CreateApplicationSchemasStatus {
readonly appId: string;
status?: ExecutionStatus;
executionArn?: string;
}
73 changes: 70 additions & 3 deletions src/control-plane/backend/lambda/api/model/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
OUTPUT_SERVICE_CATALOG_APPREGISTRY_APPLICATION_TAG_VALUE,
PROJECT_ID_PATTERN,
SECRETS_MANAGER_ARN_PATTERN,
OUTPUT_DATA_MODELING_REDSHIFT_SQL_EXECUTION_STATE_MACHINE_ARN_SUFFIX,
} from '@aws/clickstream-base-lib';
import { Tag } from '@aws-sdk/client-cloudformation';
import { ExecutionStatus } from '@aws-sdk/client-sfn';
Expand Down Expand Up @@ -63,6 +64,7 @@ import {
} from '../common/stack-params-valid';
import {
ClickStreamBadRequestError,
CreateApplicationSchemasStatus,
ENetworkType,
IngestionServerSinkBatchProps,
IngestionServerSizeProps,
Expand All @@ -82,6 +84,7 @@ import {
import {
getPipelineStatusType,
getStackName,
getStackOutputFromPipelineStatus,
getStackPrefix,
getStackTags,
getStateMachineExecutionName,
Expand All @@ -98,7 +101,7 @@ import { listMSKClusterBrokers } from '../store/aws/kafka';
import { QuickSightUserArns, registerClickstreamUser } from '../store/aws/quicksight';
import { getRedshiftInfo } from '../store/aws/redshift';
import { isBucketExist } from '../store/aws/s3';
import { getExecutionDetail } from '../store/aws/sfn';
import { getExecutionDetail, listExecutions } from '../store/aws/sfn';
import { createTopicAndSubscribeSQSQueue } from '../store/aws/sns';
import { ClickStreamStore } from '../store/click-stream-store';
import { DynamoDbStore } from '../store/dynamodb/dynamodb-store';
Expand Down Expand Up @@ -417,7 +420,7 @@ export class CPipeline {
...CReportingStack.editAllowedList(),
...CMetricsStack.editAllowedList(),
];
const editKeys = diffParameters.edited.map(p => p[0]);
const editKeys = diffParameters.edited.map((p: any[]) => p[0]);
const notAllowEdit: string[] = [];
const editStacks: string[] = [];
const editParameters: StackUpdateParameter[] = [];
Expand All @@ -436,7 +439,7 @@ export class CPipeline {
editParameters.push({
stackName: stackName,
parameterKey: paramName,
parameterValue: diffParameters.edited.find(p => p[0] === key)?.[1],
parameterValue: diffParameters.edited.find((p: any[]) => p[0] === key)?.[1],
});
}
}
Expand Down Expand Up @@ -1259,4 +1262,68 @@ export class CPipeline {
solutionVersion: FULL_SOLUTION_VERSION,
};
};

public async getCreateApplicationSchemasStatus() {
const apps = await store.listApplication(this.pipeline.projectId, 'asc');
const appIds = apps.map(a => a.appId);
const schemasStatus: CreateApplicationSchemasStatus[] = [];
for (let appId of appIds) {
schemasStatus.push({
appId: appId,
status: undefined,
});
}
const createApplicationSchemasStateMachine = getStackOutputFromPipelineStatus(
this.pipeline.stackDetails ?? this.pipeline.status?.stackDetails,
PipelineStackType.DATA_MODELING_REDSHIFT,
OUTPUT_DATA_MODELING_REDSHIFT_SQL_EXECUTION_STATE_MACHINE_ARN_SUFFIX);
if (!createApplicationSchemasStateMachine) {
return schemasStatus;
}
const executions = await listExecutions(this.pipeline.region, createApplicationSchemasStateMachine);
const editedAppIds: string[] = [];
for (let execution of executions) {
const nameStr = execution.name?.split('-');
let appId = '';
if (nameStr && nameStr.length === 3 && nameStr[1].length === 19) {
appId = nameStr[0];
} else if (execution.executionArn) {
const executionDetail = await getExecutionDetail(this.pipeline.region, execution.executionArn);
appId = this._getAppIdFromInputStr(executionDetail?.input);
}
const status = schemasStatus.find(s => s.appId === appId);
if (appId && status && !editedAppIds.includes(appId)) {
status.status = execution.status;
status.executionArn = execution.executionArn;
editedAppIds.push(appId);
}
if (editedAppIds.length === appIds.length) {
break;
}
}
return schemasStatus;
};

private _getAppIdFromInputStr(input?: string): string {
try {
if (!input) {
return '';
}
const inputJson = JSON.parse(input);
const sqls = inputJson.sqls;
if (sqls && sqls.length > 0) {
const sql = sqls[0];
const paths = sql.split('/sqls/');
if (paths.length === 2) {
const appStr = paths[1].split('-');
if (appStr.length === 2) {
return appStr[0];
}
}
}
return '';
} catch (e) {
return '';
}
}
}
10 changes: 10 additions & 0 deletions src/control-plane/backend/lambda/api/router/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ router_pipeline.get(
return pipelineServ.details(req, res, next);
});

router_pipeline.get(
'/:id/extend',
validate([
query('pid').custom(isProjectExisted),
]),
async (req: express.Request, res: express.Response, next: express.NextFunction) => {
return pipelineServ.extend(req, res, next);
});


router_pipeline.put(
'/:id',
validate([
Expand Down
Loading
Loading