From 97ed91d5f220a3d1b7dcd66f2aaee4ede17d7a56 Mon Sep 17 00:00:00 2001
From: Sourav Banerjee
Date: Tue, 24 Sep 2024 01:30:53 +0530
Subject: [PATCH 1/4] Added Cluster table for Platinum Layer

---
 .../overwatch/MultiWorkspaceDeployment.scala  |  32 +++-
 .../labs/overwatch/MultiWorkspaceRunner.scala |  12 +-
 .../overwatch/pipeline/PipelineTargets.scala  |  11 ++
 .../labs/overwatch/pipeline/Platinum.scala    |  98 +++++++++++
 .../pipeline/PlatinumTransforms.scala         | 164 ++++++++++++++++++
 5 files changed, 308 insertions(+), 9 deletions(-)
 create mode 100644 src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala
 create mode 100644 src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala

diff --git a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
index 9913d1b22..a522f2600 100644
--- a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
@@ -308,6 +308,31 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
     }
   }
 
+  private def startPlatinumDeployment(workspace: Workspace, deploymentId: String): MultiWSDeploymentReport = {
+    val workspaceId = workspace.getConfig.organizationId
+    val args = JsonUtils.objToJson(workspace.getConfig.inputConfig)
+    try {
+      println(s"""************Platinum Deployment Started workspaceID:$workspaceId args:${args.prettyString} ************"""")
+
+      Platinum(workspace).run()
+      println(s"""************Platinum Deployment Completed workspaceID:$workspaceId************""")
+      MultiWSDeploymentReport(workspaceId, "Platinum", Some(args.compactString),
+        "SUCCESS",
+        Some(deploymentId)
+      )
+    } catch {
+      case exception: Exception =>
+        val fullMsg = PipelineFunctions.appendStackStrace(exception, "Got Exception while Deploying,")
+        logger.log(Level.ERROR, fullMsg)
+        MultiWSDeploymentReport(workspaceId, "Platinum", Some(args.compactString),
+          fullMsg,
+          Some(deploymentId)
+        )
+    } finally {
+      clearThreadFromSessionsMap()
+    }
+  }
+
   /**
    * Takes a snapshot of the config and saves it to /report/configTable
    *
@@ -468,7 +493,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
   /**
    * crate pipeline executions as futures and return the deployment reports
    * @param deploymentParams deployment params for a specific workspace
-   * @param medallions medallions to execute (bronze, silver, gold)
+   * @param medallions medallions to execute (bronze, silver, gold, platinum)
    * @param ec futures executionContext
    * @return future deployment report
    */
@@ -489,6 +514,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
         if (zonesLower.contains("bronze")) threadDeploymentReport.append(startBronzeDeployment(workspace, deploymentId))
         if (zonesLower.contains("silver")) threadDeploymentReport.append(startSilverDeployment(workspace, deploymentId))
         if (zonesLower.contains("gold")) threadDeploymentReport.append(startGoldDeployment(workspace, deploymentId))
+        if (zonesLower.contains("platinum")) threadDeploymentReport.append(startPlatinumDeployment(workspace, deploymentId))
         threadDeploymentReport.toArray
       }(ec)
 
@@ -499,9 +525,9 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
   /**
    * Performs the multi workspace deployment
    *
    * @param parallelism the number of threads which will run in parallel to perform the deployment
-   * @param zones the zone can be Bronze,Silver or Gold by default it will be Bronze
+   * @param zones the zone can be Bronze,Silver,Gold or Platinum by default it will be Bronze
    */
-  def deploy(parallelism: Int = 4, zones: String = "Bronze,Silver,Gold"): Unit = {
+  def deploy(parallelism: Int = 4, zones: String = "Bronze,Silver,Gold,Platinum"): Unit = {
     val processingStartTime = System.currentTimeMillis();
     try {
       // initialize spark overrides for global spark conf
diff --git a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceRunner.scala b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceRunner.scala
index b228b62f9..49a144362 100644
--- a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceRunner.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceRunner.scala
@@ -14,10 +14,10 @@ object MultiWorkspaceRunner extends SparkSessionWrapper{
     val zoneArray = zones.split(",").distinct
     zoneArray.foreach(zone => {
       val layer = zone.toLowerCase()
-      if (layer == "bronze" || layer == "silver" || layer == "gold") {
+      if (layer == "bronze" || layer == "silver" || layer == "gold" || layer == "platinum") {
        //validated
      }else{
-        val errMsg = s"Unknown Zone found ${zones}, Zone should be either Bronze,Silver or Gold"
+        val errMsg = s"Unknown Zone found ${zones}, Zone should be either Bronze,Silver,Gold or Platinum"
        throw new BadConfigException(errMsg)
      }
    })
@@ -32,13 +32,13 @@ object MultiWorkspaceRunner extends SparkSessionWrapper{
   def main(args: Array[String]): Unit = {
     envInit()
     if (args.length == 1) { //Deploy Bronze,Silver and Gold with default parallelism.
-      logger.log(Level.INFO, "Deploying Bronze,Silver and Gold")
-      MultiWorkspaceDeployment(args(0)).deploy(4,"Bronze,Silver,Gold")
+      logger.log(Level.INFO, "Deploying Bronze,Silver,Gold and Platinum")
+      MultiWorkspaceDeployment(args(0)).deploy(4,"Bronze,Silver,Gold,Platinum")
     } else if (args.length == 2) {//Deploy Bronze,Silver and Gold with provided parallelism.
      val parallelism = args(1).toInt
-      logger.log(Level.INFO, s"Deploying Bronze,Silver and Gold with parallelism: ${parallelism}")
-      MultiWorkspaceDeployment(args(0)).deploy(parallelism,"Bronze,Silver,Gold")
+      logger.log(Level.INFO, s"Deploying Bronze,Silver,Gold and Platinum with parallelism: ${parallelism}")
+      MultiWorkspaceDeployment(args(0)).deploy(parallelism,"Bronze,Silver,Gold,Platinum")
     } else if(args.length == 3) {
       val parallelism = args(1).toInt
       validateInputZone(args(2))
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala
index 39f3adb16..d465cf6ef 100644
--- a/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala
@@ -740,6 +740,17 @@ abstract class PipelineTargets(config: Config) {
     )
   }
 
+  object PlatinumTargets {
+    lazy private[overwatch] val clusterPlatinumTarget: PipelineTable = PipelineTable(
+      name = "cluster_platinum",
+      _keys = Array("cluster_id", "cluster_category", "date"),
+      config,
+      _mode = WriteMode.merge,
+      partitionBy = Seq("organization_id", "date"),
+      incrementalColumns = Array("date"),
+      zOrderBy = Array("cluster_id", "date")
+    )
+  }
 }
 
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala
new file mode 100644
index 000000000..f44a8f475
--- /dev/null
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala
@@ -0,0 +1,98 @@
+package com.databricks.labs.overwatch.pipeline
+
+import com.databricks.labs.overwatch.env.{Database, Workspace}
+import com.databricks.labs.overwatch.utils.{Config, OverwatchScope}
+import org.apache.log4j.Logger
+
+
+class Platinum(_workspace: Workspace, _database: Database, _config: Config)
+  extends Pipeline(_workspace, _database, _config)
+    with PlatinumTransforms {
+
+  /**
+   * Enable access to Platinum pipeline tables externally.
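+   * The Platinum layer currently exposes a single target, cluster_platinum: a daily cluster cost rollup derived from the Gold clusterStateFact and cluster tables.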
+ * + * @return + */ + def getAllTargets: Array[PipelineTable] = { + Array( + PlatinumTargets.clusterPlatinumTarget + ) + } + + def getAllModules: Seq[Module] = { + config.overwatchScope.flatMap { + case OverwatchScope.clusters => { + Array(clusterModule) + } + case _ => Array[Module]() + } + } + envInit() + + private val logger: Logger = Logger.getLogger(this.getClass) + + private val clsfSparkOverrides = Map( + "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "67108864" // lower to 64MB due to high skew potential + ) + + lazy private[overwatch] val clusterModule = Module(4001, "Gold_Cluster", this, Array(3005)) + lazy private val appendClusterProccess: () => ETLDefinition = { + () => + ETLDefinition( + GoldTargets.clusterStateFactTarget.asDF, + Seq(buildClusterPlatinum(config,GoldTargets.clusterTarget.asDF)), + append(PlatinumTargets.clusterPlatinumTarget) + ) + } + + + + private def executeModules(): Unit = { + config.overwatchScope.foreach { + case OverwatchScope.clusters => clusterModule.execute(appendClusterProccess) + case _ => + } + } + + def run(): Pipeline = { + + restoreSparkConf() + executeModules() + + initiatePostProcessing() + this // to be used as fail switch later if necessary + } + +} + +object Platinum { + def apply(workspace: Workspace): Platinum = { + apply( + workspace, + readOnly = false, + suppressReport = false, + suppressStaticDatasets = false + ) + } + + private[overwatch] def apply( + workspace: Workspace, + readOnly: Boolean = false, + suppressReport: Boolean = false, + suppressStaticDatasets: Boolean = false + + ): Platinum = { + val platinumPipeline = new Platinum(workspace, workspace.database, workspace.getConfig) + .setReadOnly(if (workspace.isValidated) readOnly else true) // if workspace is not validated set it read only + .suppressRangeReport(suppressReport) + .initPipelineRun() + + if (suppressStaticDatasets) { + platinumPipeline + } else { + platinumPipeline.loadStaticDatasets() + } + } + +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala new file mode 100644 index 000000000..064f303e6 --- /dev/null +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala @@ -0,0 +1,164 @@ +package com.databricks.labs.overwatch.pipeline + +import com.databricks.labs.overwatch.pipeline.TransformFunctions._ +import com.databricks.labs.overwatch.pipeline.WorkflowsTransforms._ +import com.databricks.labs.overwatch.utils._ +import io.delta.tables.DeltaTable +import org.apache.log4j.{Level, Logger} +import org.apache.spark.internal.config +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.{Column, DataFrame} + +trait PlatinumTransforms extends SparkSessionWrapper { + + import spark.implicits._ + + private val logger: Logger = Logger.getLogger(this.getClass) + + protected def buildClusterPlatinum( + config: Config, + clusterDF: DataFrame, + )(clsfDF: DataFrame): DataFrame = { + + val clsf_raw = clsfDF + + val processed_runID_path = s"${config.etlDataPathPrefix}/platinum/cluster/runID" + + val clsf = if (Helpers.pathExists(processed_runID_path)) { + println("Resume Run") + val processed_ID = spark.read.format("delta").load(processed_runID_path).select("overwatch_runID").distinct().collect().map(x => x(0).toString).toList + + val clsf_raw_filtered = clsf_raw.filter(!'overwatch_runID.isin(processed_ID: _*)) + + val 
current_dates = clsf_raw_filtered + .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") + .withColumn("date", explode(col("state_dates"))) + .select("date").distinct().collect().map(x => x(0).toString).toList + + val clsf_latest = clsf_raw_filtered + .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") + .withColumn("date", explode(col("state_dates"))) + + // Rollback to Previous Entry of CLSF + val clsf_old = clsf_raw + .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") + .filter('overwatch_runID.isin(processed_ID: _*)) + .withColumn("date", explode(col("state_dates"))) + .filter('date.isin(current_dates: _*)) + + clsf_old.union(clsf_latest) + .select( + "cluster_id", + "organization_id", + "workspace_name", + "cluster_name", + "state_dates", + "total_DBU_cost", + "total_dbus", + "total_compute_cost", + "days_in_state", + "custom_tags", + "isAutomated", + "date", + "overwatch_runID" + ) + .withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state")) + .withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state")) + .withColumn("compute_cost", col("total_compute_cost") / col("days_in_state")) + + } else { + println("First Time Run") + clsf_raw + .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") + .select( + "cluster_id", + "organization_id", + "workspace_name", + "cluster_name", + "state_dates", + "total_DBU_cost", + "total_dbus", + "total_compute_cost", + "days_in_state", + "custom_tags", + "isAutomated", + "overwatch_runID" + ) + .withColumn("date", explode(col("state_dates"))) + .withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state")) + .withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state")) + .withColumn("compute_cost", col("total_compute_cost") / col("days_in_state")) + } + + val cluster = clusterDF.select( + "organization_id", + "cluster_id", + "date", + "cluster_type", + "is_automated", + "auto_termination_minutes" + ) + + val clsf_master = clsf + .join(cluster, Seq("cluster_id", "organization_id"), "inner") + .withColumn("cluster_category", expr( + """case + | when isAutomated = 'true' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Automated' + | when cluster_name like 'dlt%' and cluster_type = 'Standard' then 'Standard' + | when isAutomated = 'false' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Interactive' + | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'SQL Analytics' then 'Warehouse' + | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Serverless' then 'High-Concurrency' + | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Single Node' then 'Single Node' + | else 'Unidentified' + | end""".stripMargin + )) + .select( + "cluster_id", + "cluster_category" + ).distinct() + + val cluster_agg = clsf + .groupBy("date", "organization_id", "workspace_name", "cluster_id", "cluster_name") + .agg( + sum("total_dbu_spend").alias("total_dbus"), + sum("dbu_cost").alias("total_dbu_cost_USD"), // Fixed column name + sum("compute_cost").alias("total_compute_cost_USD") // Fixed column name + ) + .orderBy(col("organization_id"), col("date").desc) + .cache() + + val cluster_cost_details = clsf_master + .join(cluster_agg, Seq("cluster_id"), "inner") + +// if 
(spark.catalog.tableExists(s"${platinum_table}")){ +// print("Plantinum Table Exists. Merge Operation in Progress!!") +// val deltaTableName = s"${platinum_table}" +// val deltaTable = DeltaTable.forName(deltaTableName) +// val sourceDF = cluster_cost_details +// +// deltaTable.as("target") +// .merge( +// sourceDF.as("source"), +// """ +// target.cluster_id = source.cluster_id AND +// target.cluster_category = source.cluster_category AND +// target.date = source.date +// """ +// ) +// .whenMatched() +// .updateExpr(Map( +// "total_dbus" -> "source.total_dbus", +// "`total_dbu_cost_USD`" -> "source.`total_dbu_cost_USD`", +// "`total_compute_cost_USD`" -> "source.`total_compute_cost_USD`" +// )) +// .whenNotMatched() +// .insertAll() +// .execute() +// }else{ +// cluster_cost_details.write.saveAsTable(s"${platinum_table}") +// } + clsf.select("overwatch_runID").distinct().write.format("delta").mode("append").save(processed_runID_path) + cluster_cost_details + } +} From 036841db0c98cd168456a0c92a29e8636be089fa Mon Sep 17 00:00:00 2001 From: Sourav Banerjee Date: Tue, 24 Sep 2024 11:09:03 +0530 Subject: [PATCH 2/4] Added Module Name for Cluster Platinum --- .../labs/overwatch/pipeline/Platinum.scala | 2 +- .../pipeline/PlatinumTransforms.scala | 245 +++++++++--------- 2 files changed, 117 insertions(+), 130 deletions(-) diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala index f44a8f475..5f0e738f3 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala @@ -36,7 +36,7 @@ class Platinum(_workspace: Workspace, _database: Database, _config: Config) "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "67108864" // lower to 64MB due to high skew potential ) - lazy private[overwatch] val clusterModule = Module(4001, "Gold_Cluster", this, Array(3005)) + lazy private[overwatch] val clusterModule = Module(4001, "Platinum_Cluster", this, Array(3005)) lazy private val appendClusterProccess: () => ETLDefinition = { () => ETLDefinition( diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala index 064f303e6..f033464dc 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala @@ -25,140 +25,127 @@ trait PlatinumTransforms extends SparkSessionWrapper { val processed_runID_path = s"${config.etlDataPathPrefix}/platinum/cluster/runID" - val clsf = if (Helpers.pathExists(processed_runID_path)) { - println("Resume Run") - val processed_ID = spark.read.format("delta").load(processed_runID_path).select("overwatch_runID").distinct().collect().map(x => x(0).toString).toList - - val clsf_raw_filtered = clsf_raw.filter(!'overwatch_runID.isin(processed_ID: _*)) - - val current_dates = clsf_raw_filtered - .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") - .withColumn("date", explode(col("state_dates"))) - .select("date").distinct().collect().map(x => x(0).toString).toList - - val clsf_latest = clsf_raw_filtered - .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") - .withColumn("date", explode(col("state_dates"))) - - // Rollback to Previous Entry of CLSF - val clsf_old = clsf_raw 
- .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") - .filter('overwatch_runID.isin(processed_ID: _*)) - .withColumn("date", explode(col("state_dates"))) - .filter('date.isin(current_dates: _*)) + try { + val clsf = if (Helpers.pathExists(processed_runID_path)) { + println("Resume Run") + val processed_ID = spark.read.format("delta").load(processed_runID_path).select("overwatch_runID").distinct().collect().map(x => x(0).toString).toList + + val clsf_raw_filtered = clsf_raw.filter(!'overwatch_runID.isin(processed_ID: _*)) + + val current_dates = clsf_raw_filtered + .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") + .withColumn("date", explode(col("state_dates"))) + .select("date").distinct().collect().map(x => x(0).toString).toList + + val clsf_latest = clsf_raw_filtered + .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") + .withColumn("date", explode(col("state_dates"))) + + // Rollback to Previous Entry of CLSF + val clsf_old = clsf_raw + .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") + .filter('overwatch_runID.isin(processed_ID: _*)) + .withColumn("date", explode(col("state_dates"))) + .filter('date.isin(current_dates: _*)) + + clsf_old.union(clsf_latest) + .select( + "cluster_id", + "organization_id", + "workspace_name", + "cluster_name", + "state_dates", + "total_DBU_cost", + "total_dbus", + "total_compute_cost", + "days_in_state", + "custom_tags", + "isAutomated", + "date", + "overwatch_runID" + ) + .withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state")) + .withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state")) + .withColumn("compute_cost", col("total_compute_cost") / col("days_in_state")) + + } else { + println("First Time Run") + clsf_raw + .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") + .select( + "cluster_id", + "organization_id", + "workspace_name", + "cluster_name", + "state_dates", + "total_DBU_cost", + "total_dbus", + "total_compute_cost", + "days_in_state", + "custom_tags", + "isAutomated", + "overwatch_runID" + ) + .withColumn("date", explode(col("state_dates"))) + .withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state")) + .withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state")) + .withColumn("compute_cost", col("total_compute_cost") / col("days_in_state")) + } + + if (clsf.isEmpty) { + throw new NoNewDataException("No New Data", Level.WARN, true) + } + + val cluster = clusterDF.select( + "organization_id", + "cluster_id", + "date", + "cluster_type", + "is_automated", + "auto_termination_minutes" + ) - clsf_old.union(clsf_latest) + val clsf_master = clsf + .join(cluster, Seq("cluster_id", "organization_id"), "inner") + .withColumn("cluster_category", expr( + """case + | when isAutomated = 'true' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Automated' + | when cluster_name like 'dlt%' and cluster_type = 'Standard' then 'Standard' + | when isAutomated = 'false' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Interactive' + | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'SQL Analytics' then 'Warehouse' + | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Serverless' then 'High-Concurrency' + | when (isAutomated = 
'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Single Node' then 'Single Node' + | else 'Unidentified' + | end""".stripMargin + )) .select( "cluster_id", - "organization_id", - "workspace_name", - "cluster_name", - "state_dates", - "total_DBU_cost", - "total_dbus", - "total_compute_cost", - "days_in_state", - "custom_tags", - "isAutomated", - "date", - "overwatch_runID" + "cluster_category" + ).distinct() + + val cluster_agg = clsf + .groupBy("date", "organization_id", "workspace_name", "cluster_id", "cluster_name") + .agg( + sum("total_dbu_spend").alias("total_dbus"), + sum("dbu_cost").alias("total_dbu_cost_USD"), // Fixed column name + sum("compute_cost").alias("total_compute_cost_USD") // Fixed column name ) - .withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state")) - .withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state")) - .withColumn("compute_cost", col("total_compute_cost") / col("days_in_state")) - - } else { - println("First Time Run") - clsf_raw - .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") - .select( - "cluster_id", - "organization_id", - "workspace_name", - "cluster_name", - "state_dates", - "total_DBU_cost", - "total_dbus", - "total_compute_cost", - "days_in_state", - "custom_tags", - "isAutomated", - "overwatch_runID" - ) - .withColumn("date", explode(col("state_dates"))) - .withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state")) - .withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state")) - .withColumn("compute_cost", col("total_compute_cost") / col("days_in_state")) + .orderBy(col("organization_id"), col("date").desc) + .cache() + + val cluster_cost_details = clsf_master + .join(cluster_agg, Seq("cluster_id"), "inner") + clsf.select("overwatch_runID").distinct().write.format("delta").mode("append").save(processed_runID_path) + cluster_cost_details } + catch + { + case e: Throwable => + val appendTrackerErrorMsg = s"cluster_platinum table operation is failed" + e.getMessage + logger.log(Level.ERROR, appendTrackerErrorMsg, e) + println(appendTrackerErrorMsg, e) + throw e + } - val cluster = clusterDF.select( - "organization_id", - "cluster_id", - "date", - "cluster_type", - "is_automated", - "auto_termination_minutes" - ) - - val clsf_master = clsf - .join(cluster, Seq("cluster_id", "organization_id"), "inner") - .withColumn("cluster_category", expr( - """case - | when isAutomated = 'true' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Automated' - | when cluster_name like 'dlt%' and cluster_type = 'Standard' then 'Standard' - | when isAutomated = 'false' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Interactive' - | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'SQL Analytics' then 'Warehouse' - | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Serverless' then 'High-Concurrency' - | when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Single Node' then 'Single Node' - | else 'Unidentified' - | end""".stripMargin - )) - .select( - "cluster_id", - "cluster_category" - ).distinct() - - val cluster_agg = clsf - .groupBy("date", "organization_id", "workspace_name", "cluster_id", "cluster_name") - .agg( - sum("total_dbu_spend").alias("total_dbus"), - sum("dbu_cost").alias("total_dbu_cost_USD"), // Fixed column name - 
sum("compute_cost").alias("total_compute_cost_USD") // Fixed column name - ) - .orderBy(col("organization_id"), col("date").desc) - .cache() - - val cluster_cost_details = clsf_master - .join(cluster_agg, Seq("cluster_id"), "inner") - -// if (spark.catalog.tableExists(s"${platinum_table}")){ -// print("Plantinum Table Exists. Merge Operation in Progress!!") -// val deltaTableName = s"${platinum_table}" -// val deltaTable = DeltaTable.forName(deltaTableName) -// val sourceDF = cluster_cost_details -// -// deltaTable.as("target") -// .merge( -// sourceDF.as("source"), -// """ -// target.cluster_id = source.cluster_id AND -// target.cluster_category = source.cluster_category AND -// target.date = source.date -// """ -// ) -// .whenMatched() -// .updateExpr(Map( -// "total_dbus" -> "source.total_dbus", -// "`total_dbu_cost_USD`" -> "source.`total_dbu_cost_USD`", -// "`total_compute_cost_USD`" -> "source.`total_compute_cost_USD`" -// )) -// .whenNotMatched() -// .insertAll() -// .execute() -// }else{ -// cluster_cost_details.write.saveAsTable(s"${platinum_table}") -// } - clsf.select("overwatch_runID").distinct().write.format("delta").mode("append").save(processed_runID_path) - cluster_cost_details } } From f9522fb6f1f9611cdbc36a826af7c344ed9ffb85 Mon Sep 17 00:00:00 2001 From: Sourav Banerjee Date: Tue, 24 Sep 2024 18:13:45 +0530 Subject: [PATCH 3/4] Added Module Name for Cluster Platinum --- .../com/databricks/labs/overwatch/pipeline/Platinum.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala index 5f0e738f3..ffee2ed39 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/Platinum.scala @@ -23,7 +23,7 @@ class Platinum(_workspace: Workspace, _database: Database, _config: Config) def getAllModules: Seq[Module] = { config.overwatchScope.flatMap { case OverwatchScope.clusters => { - Array(clusterModule) + Array(clusterPlatinumModule) } case _ => Array[Module]() } @@ -36,8 +36,8 @@ class Platinum(_workspace: Workspace, _database: Database, _config: Config) "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "67108864" // lower to 64MB due to high skew potential ) - lazy private[overwatch] val clusterModule = Module(4001, "Platinum_Cluster", this, Array(3005)) - lazy private val appendClusterProccess: () => ETLDefinition = { + lazy private[overwatch] val clusterPlatinumModule = Module(4001, "Platinum_Cluster", this, Array(3005)) + lazy private val appendClusterPlatinumProccess: () => ETLDefinition = { () => ETLDefinition( GoldTargets.clusterStateFactTarget.asDF, @@ -50,7 +50,7 @@ class Platinum(_workspace: Workspace, _database: Database, _config: Config) private def executeModules(): Unit = { config.overwatchScope.foreach { - case OverwatchScope.clusters => clusterModule.execute(appendClusterProccess) + case OverwatchScope.clusters => clusterPlatinumModule.execute(appendClusterPlatinumProccess) case _ => } } From 00428a65c50216de9a06d9b1421b8d39d3a2ca97 Mon Sep 17 00:00:00 2001 From: Sourav Banerjee Date: Tue, 24 Sep 2024 18:30:24 +0530 Subject: [PATCH 4/4] Removed reductant print statements --- .../databricks/labs/overwatch/pipeline/PlatinumTransforms.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala 
b/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala index f033464dc..16b6d9192 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/PlatinumTransforms.scala @@ -27,7 +27,6 @@ trait PlatinumTransforms extends SparkSessionWrapper { try { val clsf = if (Helpers.pathExists(processed_runID_path)) { - println("Resume Run") val processed_ID = spark.read.format("delta").load(processed_runID_path).select("overwatch_runID").distinct().collect().map(x => x(0).toString).toList val clsf_raw_filtered = clsf_raw.filter(!'overwatch_runID.isin(processed_ID: _*)) @@ -69,7 +68,6 @@ trait PlatinumTransforms extends SparkSessionWrapper { .withColumn("compute_cost", col("total_compute_cost") / col("days_in_state")) } else { - println("First Time Run") clsf_raw .filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start") .select(