diff --git a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
index 9913d1b22..a522f2600 100644
--- a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
@@ -308,6 +308,31 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
     }
   }
 
+  private def startPlatinumDeployment(workspace: Workspace, deploymentId: String): MultiWSDeploymentReport = {
+    val workspaceId = workspace.getConfig.organizationId
+    val args = JsonUtils.objToJson(workspace.getConfig.inputConfig)
+    try {
+      println(s"""************Platinum Deployment Started workspaceID:$workspaceId args:${args.prettyString}************""")
+
+      Platinum(workspace).run()
+      println(s"""************Platinum Deployment Completed workspaceID:$workspaceId************""")
+      MultiWSDeploymentReport(workspaceId, "Platinum", Some(args.compactString),
+        "SUCCESS",
+        Some(deploymentId)
+      )
+    } catch {
+      case exception: Exception =>
+        val fullMsg = PipelineFunctions.appendStackStrace(exception, "Got exception while deploying Platinum,")
+        logger.log(Level.ERROR, fullMsg)
+        MultiWSDeploymentReport(workspaceId, "Platinum", Some(args.compactString),
+          fullMsg,
+          Some(deploymentId)
+        )
+    } finally {
+      clearThreadFromSessionsMap()
+    }
+  }
+
   /**
    * Takes a snapshot of the config and saves it to /report/configTable
    *
@@ -468,7 +493,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
   /**
    * crate pipeline executions as futures and return the deployment reports
    * @param deploymentParams deployment params for a specific workspace
-   * @param medallions medallions to execute (bronze, silver, gold)
+   * @param medallions medallions to execute (bronze, silver, gold, platinum)
    * @param ec futures executionContext
    * @return future deployment report
    */
@@ -489,6 +514,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
       if (zonesLower.contains("bronze")) threadDeploymentReport.append(startBronzeDeployment(workspace, deploymentId))
       if (zonesLower.contains("silver")) threadDeploymentReport.append(startSilverDeployment(workspace, deploymentId))
       if (zonesLower.contains("gold")) threadDeploymentReport.append(startGoldDeployment(workspace, deploymentId))
+      if (zonesLower.contains("platinum")) threadDeploymentReport.append(startPlatinumDeployment(workspace, deploymentId))
       threadDeploymentReport.toArray
     }(ec)
 
@@ -499,9 +525,9 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
    * Performs the multi workspace deployment
    *
    * @param parallelism the number of threads which will run in parallel to perform the deployment
-   * @param zones the zone can be Bronze,Silver or Gold by default it will be Bronze
+   * @param zones the zones to deploy: any of Bronze, Silver, Gold and Platinum; by default all four are deployed
    */
-  def deploy(parallelism: Int = 4, zones: String = "Bronze,Silver,Gold"): Unit = {
+  def deploy(parallelism: Int = 4, zones: String = "Bronze,Silver,Gold,Platinum"): Unit = {
     val processingStartTime = System.currentTimeMillis();
     try {
       // initialize spark overrides for global spark conf
diff --git a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceRunner.scala b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceRunner.scala
index b228b62f9..49a144362 100644
--- a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceRunner.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceRunner.scala
@@ -14,10 +14,10 @@ object MultiWorkspaceRunner extends SparkSessionWrapper{
     val zoneArray = zones.split(",").distinct
     zoneArray.foreach(zone => {
       val layer = zone.toLowerCase()
-      if (layer == "bronze" || layer == "silver" || layer == "gold") {
+      if (layer == "bronze" || layer == "silver" || layer == "gold" || layer == "platinum") {
         //validated
       }else{
-        val errMsg = s"Unknown Zone found ${zones}, Zone should be either Bronze,Silver or Gold"
+        val errMsg = s"Unknown Zone found ${zones}, Zone should be one of Bronze, Silver, Gold or Platinum"
         throw new BadConfigException(errMsg)
       }
     })
@@ -32,13 +32,13 @@ object MultiWorkspaceRunner extends SparkSessionWrapper{
   def main(args: Array[String]): Unit = {
     envInit()
     if (args.length == 1) { //Deploy Bronze,Silver and Gold with default parallelism.
-      logger.log(Level.INFO, "Deploying Bronze,Silver and Gold")
-      MultiWorkspaceDeployment(args(0)).deploy(4,"Bronze,Silver,Gold")
+      logger.log(Level.INFO, "Deploying Bronze, Silver, Gold and Platinum")
+      MultiWorkspaceDeployment(args(0)).deploy(4,"Bronze,Silver,Gold,Platinum")
     } else if (args.length == 2) {//Deploy Bronze,Silver and Gold with provided parallelism.
       val parallelism = args(1).toInt
-      logger.log(Level.INFO, s"Deploying Bronze,Silver and Gold with parallelism: ${parallelism}")
-      MultiWorkspaceDeployment(args(0)).deploy(parallelism,"Bronze,Silver,Gold")
+      logger.log(Level.INFO, s"Deploying Bronze, Silver, Gold and Platinum with parallelism: ${parallelism}")
+      MultiWorkspaceDeployment(args(0)).deploy(parallelism,"Bronze,Silver,Gold,Platinum")
     } else if(args.length == 3) {
       val parallelism = args(1).toInt
       validateInputZone(args(2))
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala
index 39f3adb16..d465cf6ef 100644
--- a/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala
@@ -740,6 +740,17 @@ abstract class PipelineTargets(config: Config) {
     )
   }
 
+  object PlatinumTargets {
+    lazy private[overwatch] val clusterPlatinumTarget: PipelineTable = PipelineTable(
+      name = "cluster_platinum",
+      _keys = Array("cluster_id", "cluster_category", "date"),
+      config,
+      _mode = WriteMode.merge,
+      partitionBy = Seq("organization_id", "date"),
+      incrementalColumns = Array("date"),
+      zOrderBy = Array("cluster_id", "date")
+    )
+  }
 }
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala
new file mode 100644
index 000000000..ffee2ed39
--- /dev/null
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala
@@ -0,0 +1,98 @@
+package com.databricks.labs.overwatch.pipeline
+
+import com.databricks.labs.overwatch.env.{Database, Workspace}
+import com.databricks.labs.overwatch.utils.{Config, OverwatchScope}
+import org.apache.log4j.Logger
+
+
+class Platinum(_workspace: Workspace, _database: Database, _config: Config)
+  extends Pipeline(_workspace, _database, _config)
+    with PlatinumTransforms {
+
+  /**
+   * Enable access to Platinum pipeline tables externally.
+   *
+   * @return
+   */
+  def getAllTargets: Array[PipelineTable] = {
+    Array(
+      PlatinumTargets.clusterPlatinumTarget
+    )
+  }
+
+  def getAllModules: Seq[Module] = {
+    config.overwatchScope.flatMap {
+      case OverwatchScope.clusters => {
+        Array(clusterPlatinumModule)
+      }
+      case _ => Array[Module]()
+    }
+  }
+  envInit()
+
+  private val logger: Logger = Logger.getLogger(this.getClass)
+
+  private val clsfSparkOverrides = Map(
+    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "67108864" // lower to 64MB due to high skew potential
+  )
+
+  lazy private[overwatch] val clusterPlatinumModule = Module(4001, "Platinum_Cluster", this, Array(3005))
+  lazy private val appendClusterPlatinumProcess: () => ETLDefinition = {
+    () =>
+      ETLDefinition(
+        GoldTargets.clusterStateFactTarget.asDF,
+        Seq(buildClusterPlatinum(config, GoldTargets.clusterTarget.asDF)),
+        append(PlatinumTargets.clusterPlatinumTarget)
+      )
+  }
+
+  private def executeModules(): Unit = {
+    config.overwatchScope.foreach {
+      case OverwatchScope.clusters => clusterPlatinumModule.execute(appendClusterPlatinumProcess)
+      case _ =>
+    }
+  }
+
+  def run(): Pipeline = {
+
+    restoreSparkConf()
+    executeModules()
+
+    initiatePostProcessing()
+    this // to be used as fail switch later if necessary
+  }
+
+}
+
+object Platinum {
+  def apply(workspace: Workspace): Platinum = {
+    apply(
+      workspace,
+      readOnly = false,
+      suppressReport = false,
+      suppressStaticDatasets = false
+    )
+  }
+
+  private[overwatch] def apply(
+                                workspace: Workspace,
+                                readOnly: Boolean = false,
+                                suppressReport: Boolean = false,
+                                suppressStaticDatasets: Boolean = false
+                              ): Platinum = {
+    val platinumPipeline = new Platinum(workspace, workspace.database, workspace.getConfig)
+      .setReadOnly(if (workspace.isValidated) readOnly else true) // if the workspace is not validated, force read-only
+      .suppressRangeReport(suppressReport)
+      .initPipelineRun()
+
+    if (suppressStaticDatasets) {
+      platinumPipeline
+    } else {
+      platinumPipeline.loadStaticDatasets()
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala
new file mode 100644
index 000000000..16b6d9192
--- /dev/null
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala
@@ -0,0 +1,149 @@
+package com.databricks.labs.overwatch.pipeline
+
+import com.databricks.labs.overwatch.pipeline.TransformFunctions._
+import com.databricks.labs.overwatch.pipeline.WorkflowsTransforms._
+import com.databricks.labs.overwatch.utils._
+import io.delta.tables.DeltaTable
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Column, DataFrame}
+
+trait PlatinumTransforms extends SparkSessionWrapper {
+
+  import spark.implicits._
+
+  private val logger: Logger = Logger.getLogger(this.getClass)
+
+  protected def buildClusterPlatinum(
+                                      config: Config,
+                                      clusterDF: DataFrame
+                                    )(clsfDF: DataFrame): DataFrame = {
+
+    val clsf_raw = clsfDF
+
+    val processed_runID_path = s"${config.etlDataPathPrefix}/platinum/cluster/runID"
+
+    try {
+      val clsf = if (Helpers.pathExists(processed_runID_path)) {
+        val processed_ID = spark.read.format("delta").load(processed_runID_path).select("overwatch_runID").distinct().collect().map(x => x(0).toString).toList
+
+        val clsf_raw_filtered = clsf_raw.filter(!'overwatch_runID.isin(processed_ID: _*))
+
+        val current_dates = clsf_raw_filtered
+          .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start")
+          .withColumn("date", explode(col("state_dates")))
+          .select("date").distinct().collect().map(x => x(0).toString).toList
+
+        val clsf_latest = clsf_raw_filtered
+          .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start")
+          .withColumn("date", explode(col("state_dates")))
+
+        // Roll back to the previous CLSF entries for the dates touched by the new run
+        val clsf_old = clsf_raw
+          .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start")
+          .filter('overwatch_runID.isin(processed_ID: _*))
+          .withColumn("date", explode(col("state_dates")))
+          .filter('date.isin(current_dates: _*))
+
+        clsf_old.union(clsf_latest)
+          .select(
+            "cluster_id",
+            "organization_id",
+            "workspace_name",
+            "cluster_name",
+            "state_dates",
+            "total_DBU_cost",
+            "total_dbus",
+            "total_compute_cost",
+            "days_in_state",
+            "custom_tags",
+            "isAutomated",
+            "date",
+            "overwatch_runID"
+          )
+          .withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state"))
+          .withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state"))
+          .withColumn("compute_cost", col("total_compute_cost") / col("days_in_state"))
+
+      } else {
+        clsf_raw
+          .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start")
+          .select(
+            "cluster_id",
+            "organization_id",
+            "workspace_name",
+            "cluster_name",
+            "state_dates",
+            "total_DBU_cost",
+            "total_dbus",
+            "total_compute_cost",
+            "days_in_state",
+            "custom_tags",
+            "isAutomated",
+            "overwatch_runID"
+          )
+          .withColumn("date", explode(col("state_dates")))
+          .withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state"))
+          .withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state"))
+          .withColumn("compute_cost", col("total_compute_cost") / col("days_in_state"))
+      }
+
+      if (clsf.isEmpty) {
+        throw new NoNewDataException("No New Data", Level.WARN, true)
+      }
+
+      val cluster = clusterDF.select(
+        "organization_id",
+        "cluster_id",
+        "date",
+        "cluster_type",
+        "is_automated",
+        "auto_termination_minutes"
+      )
+
+      val clsf_master = clsf
+        .join(cluster, Seq("cluster_id", "organization_id"), "inner")
+        .withColumn("cluster_category", expr(
+          """case
+            | when isAutomated = 'true' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Automated'
+            | when cluster_name like 'dlt%' and cluster_type = 'Standard' then 'Standard'
+            | when isAutomated = 'false' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Interactive'
+            | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'SQL Analytics' then 'Warehouse'
+            | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Serverless' then 'High-Concurrency'
+            | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Single Node' then 'Single Node'
+            | else 'Unidentified'
+            | end""".stripMargin
+        ))
+        .select(
+          "cluster_id",
+          "cluster_category"
+        ).distinct()
+
+      val cluster_agg = clsf
+        .groupBy("date", "organization_id", "workspace_name", "cluster_id", "cluster_name")
+        .agg(
+          sum("total_dbu_spend").alias("total_dbus"),
+          sum("dbu_cost").alias("total_dbu_cost_USD"),
+          sum("compute_cost").alias("total_compute_cost_USD")
+        )
col("date").desc) + .cache() + + val cluster_cost_details = clsf_master + .join(cluster_agg, Seq("cluster_id"), "inner") + clsf.select("overwatch_runID").distinct().write.format("delta").mode("append").save(processed_runID_path) + cluster_cost_details + } + catch + { + case e: Throwable => + val appendTrackerErrorMsg = s"cluster_platinum table operation is failed" + e.getMessage + logger.log(Level.ERROR, appendTrackerErrorMsg, e) + println(appendTrackerErrorMsg, e) + throw e + } + + } +}