diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE index 072722cc39a..58bb232ad90 100644 --- a/.github/PULL_REQUEST_TEMPLATE +++ b/.github/PULL_REQUEST_TEMPLATE @@ -1,38 +1,33 @@ -# :mag: Description -## Issue References ๐Ÿ”— - - + + +### Why are the changes needed? + + + +### How was this patch tested? + + + +### Was this patch authored or co-authored using generative AI tooling? + -This pull request fixes # - -## Describe Your Solution ๐Ÿ”ง - -Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. - - -## Types of changes :bookmark: - -- [ ] Bugfix (non-breaking change which fixes an issue) -- [ ] New feature (non-breaking change which adds functionality) -- [ ] Breaking change (fix or feature that would cause existing functionality to change) - -## Test Plan ๐Ÿงช - -#### Behavior Without This Pull Request :coffin: - - -#### Behavior With This Pull Request :tada: - - -#### Related Unit Tests - - ---- - -# Checklist ๐Ÿ“ - - - -- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) - -**Be nice. Be informative.** diff --git a/charts/kyuubi/templates/kyuubi-configmap.yaml b/charts/kyuubi/templates/kyuubi-configmap.yaml index a171d5855f2..6465ec79df7 100644 --- a/charts/kyuubi/templates/kyuubi-configmap.yaml +++ b/charts/kyuubi/templates/kyuubi-configmap.yaml @@ -22,30 +22,6 @@ metadata: labels: {{- include "kyuubi.labels" . | nindent 4 }} data: - {{- with .Values.kyuubiConf.kyuubiEnv }} - kyuubi-env.sh: | - {{- tpl . $ | nindent 4 }} - {{- end }} - kyuubi-defaults.conf: | - ## Helm chart provided Kyuubi configurations - kyuubi.kubernetes.namespace={{ .Release.Namespace }} - kyuubi.frontend.connection.url.use.hostname=false - kyuubi.frontend.thrift.binary.bind.port={{ .Values.server.thriftBinary.port }} - kyuubi.frontend.thrift.http.bind.port={{ .Values.server.thriftHttp.port }} - kyuubi.frontend.rest.bind.port={{ .Values.server.rest.port }} - kyuubi.frontend.mysql.bind.port={{ .Values.server.mysql.port }} - kyuubi.frontend.protocols={{ include "kyuubi.frontend.protocols" . }} - - # Kyuubi Metrics - kyuubi.metrics.enabled={{ .Values.metrics.enabled }} - kyuubi.metrics.reporters={{ .Values.metrics.reporters }} - kyuubi.metrics.prometheus.port={{ .Values.metrics.prometheusPort }} - - ## User provided Kyuubi configurations - {{- with .Values.kyuubiConf.kyuubiDefaults }} - {{- tpl . $ | nindent 4 }} - {{- end }} - {{- with .Values.kyuubiConf.log4j2 }} - log4j2.xml: | - {{- tpl . $ | nindent 4 }} + {{- with .Values.kyuubiConf.files }} + {{- tpl (toYaml .) $ | nindent 2 }} {{- end }} diff --git a/charts/kyuubi/templates/kyuubi-spark-configmap.yaml b/charts/kyuubi/templates/kyuubi-spark-configmap.yaml index 5794c429f55..9b2e9784c6a 100644 --- a/charts/kyuubi/templates/kyuubi-spark-configmap.yaml +++ b/charts/kyuubi/templates/kyuubi-spark-configmap.yaml @@ -22,19 +22,6 @@ metadata: labels: {{- include "kyuubi.labels" . | nindent 4 }} data: - {{- with .Values.sparkConf.sparkEnv }} - spark-env.sh: | - {{- tpl . $ | nindent 4 }} - {{- end }} - {{- with .Values.sparkConf.sparkDefaults }} - spark-defaults.conf: | - {{- tpl . $ | nindent 4 }} - {{- end }} - {{- with .Values.sparkConf.log4j2 }} - log4j2.properties: | - {{- tpl . $ | nindent 4 }} - {{- end }} - {{- with .Values.sparkConf.metrics }} - metrics.properties: | - {{- tpl . $ | nindent 4 }} + {{- with .Values.sparkConf.files }} + {{- tpl (toYaml .) $ | nindent 2 }} {{- end }} diff --git a/charts/kyuubi/templates/kyuubi-statefulset.yaml b/charts/kyuubi/templates/kyuubi-statefulset.yaml index caea7d251b2..57601826f3a 100644 --- a/charts/kyuubi/templates/kyuubi-statefulset.yaml +++ b/charts/kyuubi/templates/kyuubi-statefulset.yaml @@ -59,14 +59,28 @@ spec: {{- with .Values.command }} command: {{- tpl (toYaml .) $ | nindent 12 }} {{- end }} - {{- with .Values.args }} + {{- if .Values.args }} args: {{- tpl (toYaml .) $ | nindent 12 }} + {{- else }} + args: + - ./bin/kyuubi + - run + - --conf kyuubi.kubernetes.namespace={{ .Release.Namespace }} + - --conf kyuubi.frontend.connection.url.use.hostname=false + - --conf kyuubi.frontend.thrift.binary.bind.port={{ .Values.server.thriftBinary.port }} + - --conf kyuubi.frontend.thrift.http.bind.port={{ .Values.server.thriftHttp.port }} + - --conf kyuubi.frontend.rest.bind.port={{ .Values.server.rest.port }} + - --conf kyuubi.frontend.mysql.bind.port={{ .Values.server.mysql.port }} + - --conf kyuubi.frontend.protocols={{ include "kyuubi.frontend.protocols" . }} + - --conf kyuubi.metrics.enabled={{ .Values.metrics.enabled }} + - --conf kyuubi.metrics.reporters={{ .Values.metrics.reporters }} + - --conf kyuubi.metrics.prometheus.port={{ .Values.metrics.prometheusPort }} {{- end }} env: - name: KYUUBI_CONF_DIR - value: {{ .Values.kyuubiConfDir }} + value: {{ .Values.kyuubiConf.dir }} - name: SPARK_CONF_DIR - value: {{ .Values.sparkConfDir }} + value: {{ .Values.sparkConf.dir }} {{- with .Values.env }} {{- tpl (toYaml .) $ | nindent 12 }} {{- end }} @@ -109,9 +123,9 @@ spec: {{- end }} volumeMounts: - name: conf - mountPath: {{ .Values.kyuubiConfDir }} + mountPath: {{ .Values.kyuubiConf.dir }} - name: conf-spark - mountPath: {{ .Values.sparkConfDir }} + mountPath: {{ .Values.sparkConf.dir }} {{- with .Values.volumeMounts }} {{- tpl (toYaml .) $ | nindent 12 }} {{- end }} @@ -120,11 +134,21 @@ spec: {{- end }} volumes: - name: conf - configMap: - name: {{ .Release.Name }} + projected: + sources: + - configMap: + name: {{ .Release.Name }} + {{- with .Values.kyuubiConf.filesFrom }} + {{- tpl (toYaml .) $ | nindent 14 }} + {{- end }} - name: conf-spark - configMap: - name: {{ .Release.Name }}-spark + projected: + sources: + - configMap: + name: {{ .Release.Name }}-spark + {{- with .Values.sparkConf.filesFrom }} + {{- tpl (toYaml .) $ | nindent 14 }} + {{- end }} {{- with .Values.volumes }} {{- tpl (toYaml .) $ | nindent 8 }} {{- end }} diff --git a/charts/kyuubi/values.yaml b/charts/kyuubi/values.yaml index 23e5e7fdc41..1f35c9ba871 100644 --- a/charts/kyuubi/values.yaml +++ b/charts/kyuubi/values.yaml @@ -145,53 +145,40 @@ server: # clientIP: # timeoutSeconds: 10800 -# $KYUUBI_CONF_DIR directory -kyuubiConfDir: /opt/kyuubi/conf # Kyuubi configuration files kyuubiConf: - # The value (templated string) is used for kyuubi-env.sh file - # See example at conf/kyuubi-env.sh.template and https://kyuubi.readthedocs.io/en/master/configuration/settings.html#environments for more details - kyuubiEnv: ~ - # kyuubiEnv: | - # #!/usr/bin/env bash - # export JAVA_HOME=/usr/jdk64/jdk1.8.0_152 - # export SPARK_HOME=/opt/spark - # export FLINK_HOME=/opt/flink - # export HIVE_HOME=/opt/hive - - # The value (templated string) is used for kyuubi-defaults.conf file - # See https://kyuubi.readthedocs.io/en/master/configuration/settings.html#kyuubi-configurations for more details - kyuubiDefaults: ~ - # kyuubiDefaults: | + # $KYUUBI_CONF_DIR directory + dir: /opt/kyuubi/conf + # Configuration files from the specified keys (file name) and values (file content) + files: ~ + #files: + # 'kyuubi-defaults.conf': | # kyuubi.authentication=NONE - # kyuubi.frontend.bind.host=10.0.0.1 - # kyuubi.engine.type=SPARK_SQL # kyuubi.engine.share.level=USER - # kyuubi.session.engine.initialize.timeout=PT3M - # kyuubi.ha.addresses=zk1:2181,zk2:2181,zk3:2181 - # kyuubi.ha.namespace=kyuubi - # The value (templated string) is used for log4j2.xml file - # See example at conf/log4j2.xml.template https://kyuubi.readthedocs.io/en/master/configuration/settings.html#logging for more details - log4j2: ~ - -# $SPARK_CONF_DIR directory -sparkConfDir: /opt/spark/conf -# Spark configuration files + # Configuration files from the list of existing ConfigMaps and Secrets + filesFrom: [] + #filesFrom: + #- configMap: + # name: kyuubi-configs + #- secret: + # name: kyuubi-secrets + #- secret: + # name: ssl-secrets + # items: + # - key: key-store + # path: certs/keystore.jks + # - key: trust-store + # path: certs/truststore.jks + +# Spark configuration, see https://github.com/apache/spark/tree/master/conf and Spark documentation for more details sparkConf: - # The value (templated string) is used for spark-env.sh file - # See example at https://github.com/apache/spark/blob/master/conf/spark-env.sh.template and Spark documentation for more details - sparkEnv: ~ - # sparkEnv: | - # #!/usr/bin/env bash - # export JAVA_HOME=/usr/jdk64/jdk1.8.0_152 - # export SPARK_LOG_DIR=/opt/spark/logs - # export SPARK_LOG_MAX_FILES=5 - - # The value (templated string) is used for spark-defaults.conf file - # See example at https://github.com/apache/spark/blob/master/conf/spark-defaults.conf.template and Spark documentation for more details - sparkDefaults: ~ - # sparkDefaults: | + # $SPARK_CONF_DIR directory + dir: /opt/spark/conf + # Configuration files from the specified keys (file name) and values (file content) + files: ~ + #files: + # 'spark-defaults.conf': | # spark.submit.deployMode=cluster # spark.kubernetes.container.image=apache/spark:3.5.0 # spark.kubernetes.authenticate.driver.serviceAccountName=spark @@ -207,13 +194,20 @@ sparkConf: # spark.hadoop.fs.s3a.path.style.access=true # spark.hadoop.fs.s3a.fast.upload=true - # The value (templated string) is used for log4j2.properties file - # See example at https://github.com/apache/spark/blob/master/conf/log4j2.properties.template and Spark documentation for more details - log4j2: ~ - - # The value (templated string) is used for metrics.properties file - # See example at https://github.com/apache/spark/blob/master/conf/metrics.properties.template and Spark documentation for more details - metrics: ~ + # Configuration files from the list of existing ConfigMaps and Secrets + filesFrom: [] + #filesFrom: + #- configMap: + # name: spark-configs + #- secret: + # name: spark-secrets + #- secret: + # name: ssl-secrets + # items: + # - key: key-store + # path: certs/keystore.jks + # - key: trust-store + # path: certs/truststore.jks # Command to launch Kyuubi server (templated) command: ~ diff --git a/extensions/flink/kyuubi-flink-token-provider/pom.xml b/extensions/flink/kyuubi-flink-token-provider/pom.xml index f7c377ccedc..0314a4333e6 100644 --- a/extensions/flink/kyuubi-flink-token-provider/pom.xml +++ b/extensions/flink/kyuubi-flink-token-provider/pom.xml @@ -69,9 +69,12 @@ net.alchim31.maven scala-maven-plugin - - true - + + + attach-scaladocs + none + + diff --git a/extensions/server/kyuubi-server-plugin/pom.xml b/extensions/server/kyuubi-server-plugin/pom.xml index 22e433d7cd8..4095762bb7a 100644 --- a/extensions/server/kyuubi-server-plugin/pom.xml +++ b/extensions/server/kyuubi-server-plugin/pom.xml @@ -35,9 +35,12 @@ net.alchim31.maven scala-maven-plugin - - true - + + + attach-scaladocs + none + + diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala new file mode 100644 index 00000000000..5328f092ec9 --- /dev/null +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.trino + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.immutable.SortedMap + +import io.trino.client.{StageStats, StatementClient} + +import org.apache.kyuubi.engine.trino.TrinoProgressMonitor.{COLUMN_1_WIDTH, HEADERS} +import org.apache.kyuubi.engine.trino.operation.progress.{TrinoStage, TrinoStageProgress} +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TJobExecutionStatus + +class TrinoProgressMonitor(trino: StatementClient) { + + private lazy val progressMap: Map[TrinoStage, TrinoStageProgress] = { + if (trino != null) { + val trinoStats = trino.getStats + val stageQueue = scala.collection.mutable.Queue[StageStats]() + val stages = scala.collection.mutable.ListBuffer[(TrinoStage, TrinoStageProgress)]() + val rootStage = trinoStats.getRootStage + if (rootStage != null) { + stageQueue.enqueue(rootStage) + } + while (stageQueue.nonEmpty) { + val stage = stageQueue.dequeue() + val stageId = stage.getStageId + val stageProgress = TrinoStageProgress( + stage.getState, + stage.getTotalSplits, + stage.getCompletedSplits, + stage.getRunningSplits, + stage.getFailedTasks) + stages.append((TrinoStage(stageId), stageProgress)) + val subStages = asScalaBuffer(stage.getSubStages) + stageQueue.enqueue(subStages: _*) + } + SortedMap(stages: _*) + } else { + SortedMap() + } + } + + def headers: util.List[String] = HEADERS + + def rows: util.List[util.List[String]] = { + val progressRows = progressMap.map { + case (stage, progress) => + val complete = progress.completedSplits + val total = progress.totalSplits + val running = progress.runningSplits + val failed = progress.failedTasks + val stageName = "Stage-" + stage.stageId + val nameWithProgress = getNameWithProgress(stageName, complete, total) + val pending = total - complete - running + util.Arrays.asList( + nameWithProgress, + progress.state, + String.valueOf(total), + String.valueOf(complete), + String.valueOf(running), + String.valueOf(pending), + String.valueOf(failed), + "") + }.toList.asJavaCollection + new util.ArrayList[util.List[String]](progressRows) + } + + def footerSummary: String = { + "STAGES: %02d/%02d".format(getCompletedStages, progressMap.keySet.size) + } + + def progressedPercentage: Double = { + if (trino != null && trino.getStats != null) { + val progressPercentage = trino.getStats.getProgressPercentage + progressPercentage.orElse(0.0d) + } else { + 0.0d + } + } + + def executionStatus: TJobExecutionStatus = + if (getCompletedStages == progressMap.keySet.size) { + TJobExecutionStatus.COMPLETE + } else { + TJobExecutionStatus.IN_PROGRESS + } + + private def getNameWithProgress(s: String, complete: Int, total: Int): String = { + if (s == null) return "" + val percent = + if (total == 0) 1.0f + else complete.toFloat / total.toFloat + // lets use the remaining space in column 1 as progress bar + val spaceRemaining = COLUMN_1_WIDTH - s.length - 1 + var trimmedVName = s + // if the vertex name is longer than column 1 width, trim it down + if (s.length > COLUMN_1_WIDTH) { + trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2) + trimmedVName += ".." + } else trimmedVName += " " + val toFill = (spaceRemaining * percent).toInt + s"$trimmedVName${"." * toFill}" + } + + private def getCompletedStages: Int = { + var completed = 0 + progressMap.values.foreach { progress => + val complete = progress.completedSplits + val total = progress.totalSplits + if (total > 0 && complete == total) completed += 1 + } + completed + } + +} + +object TrinoProgressMonitor { + + private val HEADERS: util.List[String] = util.Arrays.asList( + "STAGES", + "STATUS", + "TOTAL", + "COMPLETED", + "RUNNING", + "PENDING", + "FAILED", + "") + + private val COLUMN_1_WIDTH = 16 + +} diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala index 250b8d64b1e..4f1b42e1d1b 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala @@ -41,6 +41,8 @@ class ExecuteStatement( private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) + override protected def supportProgress: Boolean = true + override protected def beforeRun(): Unit = { OperationLog.setCurrentOperationLog(operationLog) setState(OperationState.PENDING) diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala index 822f1726a3b..6afd8c09841 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala @@ -24,16 +24,19 @@ import io.trino.client.StatementClient import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.Utils +import org.apache.kyuubi.config.KyuubiConf.SESSION_PROGRESS_ENABLE import org.apache.kyuubi.engine.trino.TrinoContext +import org.apache.kyuubi.engine.trino.TrinoProgressMonitor import org.apache.kyuubi.engine.trino.schema.{SchemaHelper, TrinoTRowSetGenerator} import org.apache.kyuubi.engine.trino.session.TrinoSessionImpl import org.apache.kyuubi.operation.AbstractOperation import org.apache.kyuubi.operation.FetchIterator import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation} import org.apache.kyuubi.operation.OperationState +import org.apache.kyuubi.operation.OperationStatus import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session -import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp} +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp} abstract class TrinoOperation(session: Session) extends AbstractOperation(session) { @@ -45,6 +48,24 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio protected var iter: FetchIterator[List[Any]] = _ + protected def supportProgress: Boolean = false + + private val progressEnable: Boolean = session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE) + + override def getStatus: OperationStatus = { + if (progressEnable && supportProgress) { + val progressMonitor = new TrinoProgressMonitor(trino) + setOperationJobProgress(new TProgressUpdateResp( + progressMonitor.headers, + progressMonitor.rows, + progressMonitor.progressedPercentage, + progressMonitor.executionStatus, + progressMonitor.footerSummary, + startTime)) + } + super.getStatus + } + override def getResultSetMetadata: TGetResultSetMetadataResp = { val tTableSchema = SchemaHelper.toTTableSchema(schema) val resp = new TGetResultSetMetadataResp diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala new file mode 100644 index 00000000000..ce1a89ea611 --- /dev/null +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.trino.operation.progress + +case class TrinoStage(stageId: String) extends Comparable[TrinoStage] { + override def compareTo(o: TrinoStage): Int = { + stageId.compareTo(o.stageId) + } +} + +case class TrinoStageProgress( + state: String, + totalSplits: Int, + completedSplits: Int, + runningSplits: Int, + failedTasks: Int) diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala new file mode 100644 index 00000000000..0132735ff2f --- /dev/null +++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.trino.operation + +import scala.collection.JavaConverters._ + +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, SESSION_PROGRESS_ENABLE} +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TJobExecutionStatus} + +class TrinoOperationProgressSuite extends TrinoOperationSuite { + override def withKyuubiConf: Map[String, String] = Map( + ENGINE_TRINO_CONNECTION_CATALOG.key -> "memory", + SESSION_PROGRESS_ENABLE.key -> "true") + + test("get operation progress") { + val sql = "select * from (select item from (SELECT sequence(0, 100, 1) as t) as a " + + "CROSS JOIN UNNEST(t) AS temTable (item)) WHERE random() < 0.1" + + withSessionHandle { (client, handle) => + val req = new TExecuteStatementReq() + req.setStatement(sql) + req.setRunAsync(true) + req.setSessionHandle(handle) + val resp = client.ExecuteStatement(req) + eventually(Timeout(25.seconds)) { + val statusReq = new TGetOperationStatusReq(resp.getOperationHandle) + val statusResp = client.GetOperationStatus(statusReq) + val headers = statusResp.getProgressUpdateResponse.getHeaderNames + val progress = statusResp.getProgressUpdateResponse.getProgressedPercentage + val rows = statusResp.getProgressUpdateResponse.getRows + val footerSummary = statusResp.getProgressUpdateResponse.getFooterSummary + val status = statusResp.getProgressUpdateResponse.getStatus + assertResult(Seq( + "STAGES", + "STATUS", + "TOTAL", + "COMPLETED", + "RUNNING", + "PENDING", + "FAILED", + ""))(headers.asScala) + assert(rows.size() == 1) + progress match { + case 100.0 => + assertResult(Seq( + s"Stage-0 ........", + "FINISHED", + "3", + "3", + "0", + "0", + "0", + ""))( + rows.get(0).asScala) + assert("STAGES: 01/01" === footerSummary) + assert(TJobExecutionStatus.COMPLETE === status) + } + } + } + } +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 671d8ba6509..129fbc8d9aa 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -154,13 +154,23 @@ class BatchJobSubmission( engineId = appInfo.id, engineName = appInfo.name, engineUrl = appInfo.url.orNull, - engineState = appInfo.state.toString, + engineState = getAppState(state, appInfo.state).toString, engineError = appInfo.error, endTime = endTime) session.sessionManager.updateMetadata(metadataToUpdate) } } + private def getAppState( + opState: OperationState, + appState: ApplicationState.ApplicationState): ApplicationState.ApplicationState = { + if (opState == OperationState.ERROR && !ApplicationState.isTerminated(appState)) { + ApplicationState.UNKNOWN + } else { + appState + } + } + override def getOperationLog: Option[OperationLog] = Option(_operationLog) // we can not set to other state if it is canceled diff --git a/pom.xml b/pom.xml index 66c0f0d096d..c86bd6fe11e 100644 --- a/pom.xml +++ b/pom.xml @@ -178,7 +178,7 @@ kyuubi-relocated-zookeeper-34 6.0.5 2.24.1 - 8.0.32 + 8.4.0 4.11.0 4.1.108.Final 0.12.0