Reporting cluster 1276 #1289

Open · wants to merge 4 commits into base: 0820_release
@@ -308,6 +308,31 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
}
}

private def startPlatinumDeployment(workspace: Workspace, deploymentId: String): MultiWSDeploymentReport = {
val workspaceId = workspace.getConfig.organizationId
val args = JsonUtils.objToJson(workspace.getConfig.inputConfig)
try {
println(s"""************Platinum Deployment Started workspaceID:$workspaceId args:${args.prettyString} ************"""")

Platinum(workspace).run()
println(s"""************Platinum Deployment Completed workspaceID:$workspaceId************""")
MultiWSDeploymentReport(workspaceId, "Platinum", Some(args.compactString),
"SUCCESS",
Some(deploymentId)
)
} catch {
case exception: Exception =>
val fullMsg = PipelineFunctions.appendStackStrace(exception, "Got Exception while Deploying,")
logger.log(Level.ERROR, fullMsg)
MultiWSDeploymentReport(workspaceId, "Platinum", Some(args.compactString),
fullMsg,
Some(deploymentId)
)
} finally {
clearThreadFromSessionsMap()
}
}

/**
* Takes a snapshot of the config and saves it to /report/configTable
*
@@ -468,7 +493,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
/**
* create pipeline executions as futures and return the deployment reports
* @param deploymentParams deployment params for a specific workspace
* @param medallions medallions to execute (bronze, silver, gold)
* @param medallions medallions to execute (bronze, silver, gold, platinum)
* @param ec futures executionContext
* @return future deployment report
*/
@@ -489,6 +514,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
if (zonesLower.contains("bronze")) threadDeploymentReport.append(startBronzeDeployment(workspace, deploymentId))
if (zonesLower.contains("silver")) threadDeploymentReport.append(startSilverDeployment(workspace, deploymentId))
if (zonesLower.contains("gold")) threadDeploymentReport.append(startGoldDeployment(workspace, deploymentId))
if (zonesLower.contains("platinum")) threadDeploymentReport.append(startPlatinumDeployment(workspace, deploymentId))
threadDeploymentReport.toArray
}(ec)

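Reviewer aid: the block above fans out one future per workspace and appends a per-zone deployment report inside it. Below is a minimal, self-contained sketch of that general pattern; the pool sizing, workspace ids and report strings are illustrative assumptions, not Overwatch's actual wiring.

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}

object DeploymentFanOutSketch {
  def main(args: Array[String]): Unit = {
    val parallelism = 4
    // Bound concurrency with a fixed thread pool; each workspace deployment runs as one Future.
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(parallelism))

    val workspaceIds = Seq("ws-001", "ws-002", "ws-003") // hypothetical workspace ids
    val reports: Seq[Future[String]] = workspaceIds.map { id =>
      Future {
        // the per-zone start*Deployment calls would run here and append their reports
        s"SUCCESS: $id"
      }
    }

    // Block until every per-workspace future completes, then collect the reports.
    val allReports = Await.result(Future.sequence(reports), Duration.Inf)
    allReports.foreach(println)
    ec.shutdown()
  }
}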
@@ -499,9 +525,9 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
* Performs the multi workspace deployment
*
* @param parallelism the number of threads which will run in parallel to perform the deployment
* @param zones the zone can be Bronze,Silver or Gold by default it will be Bronze
* @param zones the zones can be Bronze, Silver, Gold or Platinum; by default all four zones are deployed
*/
def deploy(parallelism: Int = 4, zones: String = "Bronze,Silver,Gold"): Unit = {
def deploy(parallelism: Int = 4, zones: String = "Bronze,Silver,Gold,Platinum"): Unit = {
val processingStartTime = System.currentTimeMillis();
try {
// initialize spark overrides for global spark conf
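Usage note for the widened default: a hedged sketch of calling deploy directly with the new zone. The config location below is a placeholder and the import path is assumed, not taken from this diff.

// import com.databricks.labs.overwatch.MultiWorkspaceDeployment  // package path assumed
val configLocation = "dbfs:/path/to/overwatch/multiworkspace_config.csv" // hypothetical

// Explicit zone list including the new layer (same as the new default):
MultiWorkspaceDeployment(configLocation).deploy(parallelism = 4, zones = "Bronze,Silver,Gold,Platinum")

// Or run only Platinum once Bronze, Silver and Gold are already current:
MultiWorkspaceDeployment(configLocation).deploy(parallelism = 2, zones = "Platinum")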
@@ -14,10 +14,10 @@ object MultiWorkspaceRunner extends SparkSessionWrapper{
val zoneArray = zones.split(",").distinct
zoneArray.foreach(zone => {
val layer = zone.toLowerCase()
if (layer == "bronze" || layer == "silver" || layer == "gold") {
if (layer == "bronze" || layer == "silver" || layer == "gold" || layer == "platinum") {
//validated
}else{
val errMsg = s"Unknown Zone found ${zones}, Zone should be either Bronze,Silver or Gold"
val errMsg = s"Unknown Zone found ${zones}, Zone should be either Bronze,Silver ,Gold or Platinum"
throw new BadConfigException(errMsg)
}
})
@@ -32,13 +32,13 @@ object MultiWorkspaceRunner extends SparkSessionWrapper{
def main(args: Array[String]): Unit = {
envInit()
if (args.length == 1) { //Deploy Bronze,Silver and Gold with default parallelism.
logger.log(Level.INFO, "Deploying Bronze,Silver and Gold")
MultiWorkspaceDeployment(args(0)).deploy(4,"Bronze,Silver,Gold")
logger.log(Level.INFO, "Deploying Bronze,Silver and Gold,Platinum")
MultiWorkspaceDeployment(args(0)).deploy(4,"Bronze,Silver,Gold,Platinum")

} else if (args.length == 2) {//Deploy Bronze,Silver and Gold with provided parallelism.
val parallelism = args(1).toInt
logger.log(Level.INFO, s"Deploying Bronze,Silver and Gold with parallelism: ${parallelism}")
MultiWorkspaceDeployment(args(0)).deploy(parallelism,"Bronze,Silver,Gold")
logger.log(Level.INFO, s"Deploying Bronze,Silver,Gold and Platinum with parallelism: ${parallelism}")
MultiWorkspaceDeployment(args(0)).deploy(parallelism,"Bronze,Silver,Gold,Platinum")
} else if(args.length == 3) {
val parallelism = args(1).toInt
validateInputZone(args(2))
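For completeness, the three argument shapes main accepts after this change, as a hedged notebook-style sketch; the config path is a placeholder.

val cfg = "dbfs:/path/to/overwatch/multiworkspace_config.csv" // hypothetical config location

MultiWorkspaceRunner.main(Array(cfg))                         // default parallelism (4), all four zones
MultiWorkspaceRunner.main(Array(cfg, "8"))                    // custom parallelism, all four zones
MultiWorkspaceRunner.main(Array(cfg, "8", "Bronze,Platinum")) // custom parallelism, explicit zone list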
@@ -740,6 +740,17 @@ abstract class PipelineTargets(config: Config) {
)

}
object PlatinumTargets {
lazy private[overwatch] val clusterPlatinumTarget: PipelineTable = PipelineTable(
name = "cluster_platinum",
_keys = Array("cluster_id", "cluster_category", "date"),
config,
_mode = WriteMode.merge,
partitionBy = Seq("organization_id", "date"),
incrementalColumns = Array("date"),
zOrderBy = Array("cluster_id", "date")
)
}


}
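Reviewer aid for the new target: with _mode = WriteMode.merge and _keys = (cluster_id, cluster_category, date), the write should behave roughly like the hand-written Delta merge sketched below. This is an approximation for review purposes, not the framework's actual writer; the table name is illustrative.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

// Upsert new cluster_platinum rows on the configured key columns.
def upsertClusterPlatinum(updates: DataFrame): Unit = {
  DeltaTable.forName("cluster_platinum").as("t")
    .merge(
      updates.as("s"),
      "t.cluster_id = s.cluster_id AND t.cluster_category = s.cluster_category AND t.date = s.date"
    )
    .whenMatched.updateAll()
    .whenNotMatched.insertAll()
    .execute()
}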
@@ -0,0 +1,98 @@
package com.databricks.labs.overwatch.pipeline

import com.databricks.labs.overwatch.env.{Database, Workspace}
import com.databricks.labs.overwatch.utils.{Config, OverwatchScope}
import org.apache.log4j.Logger


class Platinum(_workspace: Workspace, _database: Database, _config: Config)
extends Pipeline(_workspace, _database, _config)
with PlatinumTransforms {

/**
* Enable access to Platinum pipeline tables externally.
*
* @return
*/
def getAllTargets: Array[PipelineTable] = {
Array(
PlatinumTargets.clusterPlatinumTarget
)
}

def getAllModules: Seq[Module] = {
config.overwatchScope.flatMap {
case OverwatchScope.clusters => {
Array(clusterPlatinumModule)
}
case _ => Array[Module]()
}
}
envInit()

private val logger: Logger = Logger.getLogger(this.getClass)

private val clsfSparkOverrides = Map(
"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "67108864" // lower to 64MB due to high skew potential
)

lazy private[overwatch] val clusterPlatinumModule = Module(4001, "Platinum_Cluster", this, Array(3005))
lazy private val appendClusterPlatinumProcess: () => ETLDefinition = {
() =>
ETLDefinition(
GoldTargets.clusterStateFactTarget.asDF,
Seq(buildClusterPlatinum(config, GoldTargets.clusterTarget.asDF)),
append(PlatinumTargets.clusterPlatinumTarget)
)
}



private def executeModules(): Unit = {
config.overwatchScope.foreach {
case OverwatchScope.clusters => clusterPlatinumModule.execute(appendClusterPlatinumProcess)
case _ =>
}
}

def run(): Pipeline = {

restoreSparkConf()
executeModules()

initiatePostProcessing()
this // to be used as fail switch later if necessary
}

}

object Platinum {
def apply(workspace: Workspace): Platinum = {
apply(
workspace,
readOnly = false,
suppressReport = false,
suppressStaticDatasets = false
)
}

private[overwatch] def apply(
workspace: Workspace,
readOnly: Boolean = false,
suppressReport: Boolean = false,
suppressStaticDatasets: Boolean = false

): Platinum = {
val platinumPipeline = new Platinum(workspace, workspace.database, workspace.getConfig)
.setReadOnly(if (workspace.isValidated) readOnly else true) // if workspace is not validated set it read only
.suppressRangeReport(suppressReport)
.initPipelineRun()

if (suppressStaticDatasets) {
platinumPipeline
} else {
platinumPipeline.loadStaticDatasets()
}
}

}
@@ -0,0 +1,149 @@
package com.databricks.labs.overwatch.pipeline

import com.databricks.labs.overwatch.pipeline.TransformFunctions._
import com.databricks.labs.overwatch.pipeline.WorkflowsTransforms._
import com.databricks.labs.overwatch.utils._
import io.delta.tables.DeltaTable
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame}

trait PlatinumTransforms extends SparkSessionWrapper {

import spark.implicits._

private val logger: Logger = Logger.getLogger(this.getClass)

protected def buildClusterPlatinum(
config: Config,
clusterDF: DataFrame
)(clsfDF: DataFrame): DataFrame = {

val clsf_raw = clsfDF

val processed_runID_path = s"${config.etlDataPathPrefix}/platinum/cluster/runID"

try {
val clsf = if (Helpers.pathExists(processed_runID_path)) {
val processed_ID = spark.read.format("delta").load(processed_runID_path).select("overwatch_runID").distinct().collect().map(x => x(0).toString).toList

val clsf_raw_filtered = clsf_raw.filter(!'overwatch_runID.isin(processed_ID: _*))

val current_dates = clsf_raw_filtered
.filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start")
.withColumn("date", explode(col("state_dates")))
.select("date").distinct().collect().map(x => x(0).toString).toList

val clsf_latest = clsf_raw_filtered
.filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start")
.withColumn("date", explode(col("state_dates")))

// Rollback to Previous Entry of CLSF
val clsf_old = clsf_raw
.filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start")
.filter('overwatch_runID.isin(processed_ID: _*))
.withColumn("date", explode(col("state_dates")))
.filter('date.isin(current_dates: _*))

clsf_old.union(clsf_latest)
.select(
"cluster_id",
"organization_id",
"workspace_name",
"cluster_name",
"state_dates",
"total_DBU_cost",
"total_dbus",
"total_compute_cost",
"days_in_state",
"custom_tags",
"isAutomated",
"date",
"overwatch_runID"
)
.withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state"))
.withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state"))
.withColumn("compute_cost", col("total_compute_cost") / col("days_in_state"))

} else {
clsf_raw
.filter("custom_tags not like '%SqlEndpointId%' AND unixTimeMS_state_end > unixTimeMS_state_start")
.select(
"cluster_id",
"organization_id",
"workspace_name",
"cluster_name",
"state_dates",
"total_DBU_cost",
"total_dbus",
"total_compute_cost",
"days_in_state",
"custom_tags",
"isAutomated",
"overwatch_runID"
)
.withColumn("date", explode(col("state_dates")))
.withColumn("dbu_cost", col("total_DBU_cost") / col("days_in_state"))
.withColumn("total_dbu_spend", col("total_dbus") / col("days_in_state"))
.withColumn("compute_cost", col("total_compute_cost") / col("days_in_state"))
}

if (clsf.isEmpty) {
throw new NoNewDataException("No New Data", Level.WARN, true)
}

val cluster = clusterDF.select(
"organization_id",
"cluster_id",
"date",
"cluster_type",
"is_automated",
"auto_termination_minutes"
)

val clsf_master = clsf
.join(cluster, Seq("cluster_id", "organization_id"), "inner")
.withColumn("cluster_category", expr(
"""case
| when isAutomated = 'true' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Automated'
| when cluster_name like 'dlt%' and cluster_type = 'Standard' then 'Standard'
| when isAutomated = 'false' and cluster_type not in ('Serverless','SQL Analytics','Single Node') then 'Interactive'
| when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'SQL Analytics' then 'Warehouse'
| when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Serverless' then 'High-Concurrency'
| when (isAutomated = 'false' or isAutomated = 'true' or isAutomated is null) and cluster_type = 'Single Node' then 'Single Node'
| else 'Unidentified'
| end""".stripMargin
))
.select(
"cluster_id",
"cluster_category"
).distinct()

val cluster_agg = clsf
.groupBy("date", "organization_id", "workspace_name", "cluster_id", "cluster_name")
.agg(
sum("total_dbu_spend").alias("total_dbus"),
sum("dbu_cost").alias("total_dbu_cost_USD"), // Fixed column name
sum("compute_cost").alias("total_compute_cost_USD") // Fixed column name
)
.orderBy(col("organization_id"), col("date").desc)
.cache()

val cluster_cost_details = clsf_master
.join(cluster_agg, Seq("cluster_id"), "inner")
clsf.select("overwatch_runID").distinct().write.format("delta").mode("append").save(processed_runID_path)
cluster_cost_details
} catch {
case e: Throwable =>
val appendTrackerErrorMsg = s"cluster_platinum table operation is failed" + e.getMessage
logger.log(Level.ERROR, appendTrackerErrorMsg, e)
println(appendTrackerErrorMsg)
throw e
}

}
}
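To make the allocation step easier to review: each cluster state row is exploded into one row per date in state_dates, and its totals are split evenly by days_in_state. A minimal toy sketch of that step (local data, not Overwatch tables):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object PerDayCostSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("per-day-cost").getOrCreate()
    import spark.implicits._

    // One cluster state spanning three days with 30.0 of DBU cost in total.
    val clsf = Seq(
      ("c-1", Seq("2024-01-01", "2024-01-02", "2024-01-03"), 30.0, 3L)
    ).toDF("cluster_id", "state_dates", "total_DBU_cost", "days_in_state")

    val perDay = clsf
      .withColumn("date", explode($"state_dates"))                   // one row per day in the state
      .withColumn("dbu_cost", $"total_DBU_cost" / $"days_in_state")  // 10.0 attributed to each day

    perDay.select("cluster_id", "date", "dbu_cost").show(false)
    spark.stop()
  }
}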