[SPARK-45502][BUILD] Upgrade Kafka to 3.6.0 #1

Closed. Wants to merge 21 commits.

Commits (21)
64ac59d
[SPARK-45497][K8S] Add a symbolic link file `spark-examples.jar` in K…
dongjoon-hyun Oct 11, 2023
305db07
[SPARK-45480][SQL][UI] Selectable Spark Plan Node on UI
yaooqinn Oct 11, 2023
0c1e9a5
[SPARK-45500][CORE][WEBUI] Show the number of abnormally completed dr…
dongjoon-hyun Oct 11, 2023
8394ebb
[SPARK-45469][CORE][SQL][CONNECT][PYTHON] Replace `toIterator` with `…
LuciferYang Oct 11, 2023
e1a7b84
[SPARK-45397][ML][CONNECT] Add array assembler feature transformer
WeichenXu123 Oct 11, 2023
ae112e4
[SPARK-45116][SQL] Add some comment for param of JdbcDialect `createT…
Hisoka-X Oct 11, 2023
11af786
[SPARK-45451][SQL] Make the default storage level of dataset cache co…
ulysses-you Oct 11, 2023
9721805
[SPARK-45496][CORE][DSTREAM] Fix the compilation warning related to `…
LuciferYang Oct 11, 2023
acd5dc4
[SPARK-45467][CORE] Replace `Proxy.getProxyClass()` with `Proxy.newPr…
LuciferYang Oct 11, 2023
c252530
[SPARK-42881][SQL] Codegen Support for get_json_object
panbingkun Oct 11, 2023
d1aff01
[SPARK-45499][CORE][TESTS] Replace `Reference#isEnqueued` with `Refer…
LuciferYang Oct 11, 2023
8e70c39
[SPARK-45483][CONNECT] Correct the function groups in connect.functions
zhengruifeng Oct 11, 2023
eae5c0e
[SPARK-45433][SQL] Fix CSV/JSON schema inference when timestamps do n…
Hisoka-X Oct 11, 2023
5ad57a7
[SPARK-45204][CONNECT] Add optional ExecuteHolder to SparkConnectPlanner
dillitz Oct 11, 2023
292a113
[SPARK-44855][CONNECT] Small tweaks to attaching ExecuteGrpcResponseS…
juliuszsompolski Oct 11, 2023
a5f0195
[SPARK-45415] Allow selective disabling of "fallocate" in RocksDB sta…
Oct 11, 2023
4027474
[SPARK-45221][PYTHON][DOCS] Refine docstring of DataFrameReader.parquet
HyukjinKwon Oct 12, 2023
045eb2d
[SPARK-45113][PYTHON][DOCS][FOLLOWUP] Make doctests deterministic
zhengruifeng Oct 12, 2023
69cf80d
[SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' me…
dtenedor Oct 12, 2023
9565390
[SPARK-42881][SQL][FOLLOWUP] Update the results of JsonBenchmark-jdk2…
panbingkun Oct 12, 2023
5a00631
[SPARK-45442][PYTHON][DOCS] Refine docstring of DataFrame.show
allisonwang-db Oct 12, 2023

Changes from all commits

@@ -647,7 +647,7 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
 
   test("Dataset result collection") {
     def checkResult(rows: IterableOnce[java.lang.Long], expectedValues: Long*): Unit = {
-      rows.toIterator.zipAll(expectedValues.iterator, null, null).foreach {
+      rows.iterator.zipAll(expectedValues.iterator, null, null).foreach {
        case (actual, expected) => assert(actual === expected)
      }
    }
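
The only functional change in this hunk is the Scala 2.13 collection migration: IterableOnce.toIterator is deprecated and iterator is its direct replacement. A minimal standalone sketch of the same pattern (the object name and values are illustrative, not taken from the test suite):

object ToIteratorMigrationSketch {
  def checkResult(rows: IterableOnce[java.lang.Long], expectedValues: Long*): Unit = {
    // `iterator` replaces the deprecated `toIterator`; zipAll pads the shorter side with null
    // so a length mismatch shows up as a failed comparison against the padding value.
    rows.iterator.zipAll(expectedValues.iterator, null, null).foreach { case (actual, expected) =>
      assert(actual == expected, s"expected $expected but got $actual")
    }
  }

  def main(args: Array[String]): Unit = {
    checkResult(Seq[java.lang.Long](1L, 2L, 3L), 1L, 2L, 3L)
  }
}
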
@@ -63,15 +63,15 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
   /**
    * Interrupt this sender and make it exit.
    */
-  def interrupt(): Unit = executionObserver.synchronized {
+  def interrupt(): Unit = {
     interrupted = true
-    executionObserver.notifyAll()
+    wakeUp()
   }
 
   // For testing
-  private[connect] def setDeadline(deadlineMs: Long) = executionObserver.synchronized {
+  private[connect] def setDeadline(deadlineMs: Long) = {
     deadlineTimeMillis = deadlineMs
-    executionObserver.notifyAll()
+    wakeUp()
   }
 
   def run(lastConsumedStreamIndex: Long): Unit = {
@@ -152,9 +152,6 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
         s"lastConsumedStreamIndex=$lastConsumedStreamIndex")
     val startTime = System.nanoTime()
 
-    // register to be notified about available responses.
-    executionObserver.attachConsumer(this)
-
     var nextIndex = lastConsumedStreamIndex + 1
     var finished = false
 
@@ -191,7 +188,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
         sentResponsesSize > maximumResponseSize || deadlineTimeMillis < System.currentTimeMillis()
 
       logTrace(s"Trying to get next response with index=$nextIndex.")
-      executionObserver.synchronized {
+      executionObserver.responseLock.synchronized {
         logTrace(s"Acquired executionObserver lock.")
         val sleepStart = System.nanoTime()
         var sleepEnd = 0L
@@ -208,7 +205,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
         if (response.isEmpty) {
           val timeout = Math.max(1, deadlineTimeMillis - System.currentTimeMillis())
           logTrace(s"Wait for response to become available with timeout=$timeout ms.")
-          executionObserver.wait(timeout)
+          executionObserver.responseLock.wait(timeout)
           logTrace(s"Reacquired executionObserver lock after waiting.")
           sleepEnd = System.nanoTime()
         }
@@ -339,4 +336,15 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       }
     }
   }
+
+  private def wakeUp(): Unit = {
+    // Can be sleeping on either of these two locks, wake them up.
+    // (Neither of these locks is ever taken for extended period of time, so this won't block)
+    executionObserver.responseLock.synchronized {
+      executionObserver.responseLock.notifyAll()
+    }
+    grpcCallObserverReadySignal.synchronized {
+      grpcCallObserverReadySignal.notifyAll()
+    }
+  }
 }
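
The hunks above stop sleeping on the observer's own monitor: interrupt() and setDeadline() now delegate to a wakeUp() helper that notifies the two locks a sender may be parked on (executionObserver.responseLock and grpcCallObserverReadySignal), and the sender itself waits on responseLock with a timeout. A simplified standalone sketch of that wait-with-deadline / wake-up pattern; the object and member names are illustrative stand-ins, not the actual Spark Connect classes:

object SenderWakeUpSketch {
  private val responseLock = new Object()
  @volatile private var interrupted = false
  @volatile private var deadlineTimeMillis = Long.MaxValue
  @volatile private var response: Option[String] = None

  // Producer side: publish a response and notify any waiting consumer.
  def produce(r: String): Unit = responseLock.synchronized {
    response = Some(r)
    responseLock.notifyAll()
  }

  // Mirrors interrupt()/setDeadline() in the diff: mutate state first, then wake the sleeper.
  def interrupt(): Unit = { interrupted = true; wakeUp() }
  def setDeadline(deadlineMs: Long): Unit = { deadlineTimeMillis = deadlineMs; wakeUp() }

  private def wakeUp(): Unit = responseLock.synchronized {
    // The lock is only held briefly, so notifying here never blocks for long.
    responseLock.notifyAll()
  }

  // Consumer side: wait on responseLock until a response arrives, the sender is
  // interrupted, or the deadline passes. The while loop also absorbs spurious wake-ups.
  def await(): Option[String] = responseLock.synchronized {
    while (response.isEmpty && !interrupted && System.currentTimeMillis() < deadlineTimeMillis) {
      val timeout = math.max(1L, deadlineTimeMillis - System.currentTimeMillis())
      responseLock.wait(timeout)
    }
    response
  }
}

Waiting on a dedicated lock object rather than on the observer instance makes it explicit which notifyAll() pairs with which wait(), and leaves the observer's own monitor out of the protocol entirely.
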
@@ -33,8 +33,7 @@ import org.apache.spark.sql.connect.service.ExecuteHolder
 /**
  * This StreamObserver is running on the execution thread. Execution pushes responses to it, it
  * caches them. ExecuteResponseGRPCSender is the consumer of the responses ExecuteResponseObserver
- * "produces". It waits on the monitor of ExecuteResponseObserver. New produced responses notify
- * the monitor.
+ * "produces". It waits on the responseLock. New produced responses notify the responseLock.
  * @see
  *   getResponse.
  *
@@ -85,10 +84,12 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
   private[connect] var highestConsumedIndex: Long = 0
 
   /**
-   * Consumer that waits for available responses. There can be only one at a time, @see
-   * attachConsumer.
+   * Lock used for synchronization between responseObserver and grpcResponseSenders.
+   *  - grpcResponseSenders wait on it for a new response to be available.
+   *  - grpcResponseSenders also notify it to wake up when interrupted.
+   *  - responseObserver notifies it when new responses are available.
    */
-  private var responseSender: Option[ExecuteGrpcResponseSender[T]] = None
+  private[connect] val responseLock = new Object()
 
   // Statistics about cached responses.
   private val cachedSizeUntilHighestConsumed = CachedSize()
@@ -106,7 +107,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
       0
     }
 
-  def onNext(r: T): Unit = synchronized {
+  def onNext(r: T): Unit = responseLock.synchronized {
     if (finalProducedIndex.nonEmpty) {
       throw new IllegalStateException("Stream onNext can't be called after stream completed")
     }
@@ … @@
     logDebug(
       s"Execution opId=${executeHolder.operationId} produced response " +
         s"responseId=${responseId} idx=$lastProducedIndex")
-    notifyAll()
+    responseLock.notifyAll()
   }
 
-  def onError(t: Throwable): Unit = synchronized {
+  def onError(t: Throwable): Unit = responseLock.synchronized {
     if (finalProducedIndex.nonEmpty) {
       throw new IllegalStateException("Stream onError can't be called after stream completed")
     }
@@ … @@
     logDebug(
       s"Execution opId=${executeHolder.operationId} produced error. " +
         s"Last stream index is $lastProducedIndex.")
-    notifyAll()
+    responseLock.notifyAll()
   }
 
-  def onCompleted(): Unit = synchronized {
+  def onCompleted(): Unit = responseLock.synchronized {
     if (finalProducedIndex.nonEmpty) {
       throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
     }
     finalProducedIndex = Some(lastProducedIndex)
     logDebug(
       s"Execution opId=${executeHolder.operationId} completed stream. " +
         s"Last stream index is $lastProducedIndex.")
-    notifyAll()
-  }
-
-  /** Attach a new consumer (ExecuteResponseGRPCSender). */
-  def attachConsumer(newSender: ExecuteGrpcResponseSender[T]): Unit = synchronized {
-    // interrupt the current sender before attaching new one
-    responseSender.foreach(_.interrupt())
-    responseSender = Some(newSender)
+    responseLock.notifyAll()
   }
 
   /**
    * Get response with a given index in the stream, if set. Note: Upon returning the response,
    * this response observer assumes that the response is consumed, and the response and previous
    * response can be uncached, keeping retryBufferSize of responses for the case of retries.
    */
-  def consumeResponse(index: Long): Option[CachedStreamResponse[T]] = synchronized {
+  def consumeResponse(index: Long): Option[CachedStreamResponse[T]] = responseLock.synchronized {
     // we index stream responses from 1, getting a lower index would be invalid.
     assert(index >= 1)
     // it would be invalid if consumer would skip a response
@@ -198,17 +192,17 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
   }
 
   /** Get the stream error if there is one, otherwise None. */
-  def getError(): Option[Throwable] = synchronized {
+  def getError(): Option[Throwable] = responseLock.synchronized {
     error
   }
 
   /** If the stream is finished, the index of the last response, otherwise None. */
-  def getLastResponseIndex(): Option[Long] = synchronized {
+  def getLastResponseIndex(): Option[Long] = responseLock.synchronized {
     finalProducedIndex
   }
 
   /** Get the index in the stream for given response id. */
-  def getResponseIndexById(responseId: String): Long = synchronized {
+  def getResponseIndexById(responseId: String): Long = responseLock.synchronized {
     responseIdToIndex.getOrElse(
       responseId,
       throw new SparkSQLException(
@@ … @@
   }
 
   /** Remove cached responses up to and including response with given id. */
-  def removeResponsesUntilId(responseId: String): Unit = synchronized {
+  def removeResponsesUntilId(responseId: String): Unit = responseLock.synchronized {
     val index = getResponseIndexById(responseId)
     removeResponsesUntilIndex(index)
     logDebug(
@@ … @@
   }
 
   /** Remove all cached responses */
-  def removeAll(): Unit = synchronized {
+  def removeAll(): Unit = responseLock.synchronized {
     removeResponsesUntilIndex(lastProducedIndex)
     logInfo(
s"Release all for opId=${executeHolder.operationId}. Execution stats: " +
Expand All @@ -242,7 +236,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
}

/** Returns if the stream is finished. */
def completed(): Boolean = synchronized {
def completed(): Boolean = responseLock.synchronized {
finalProducedIndex.isDefined
}

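
On the observer side, every producer callback (onNext, onError, onCompleted) and every consumer-facing accessor now synchronizes on the same responseLock, and each state change that a waiting sender cares about ends with responseLock.notifyAll(). A condensed sketch of that shape, using hypothetical simplified types rather than the real CachedStreamResponse bookkeeping:

import scala.collection.mutable

final class ResponseBufferSketch[T] {
  private[this] val responseLock = new Object()
  private[this] val responses = mutable.Map.empty[Long, T]
  private[this] var lastProducedIndex: Long = 0L
  private[this] var finalProducedIndex: Option[Long] = None

  def onNext(r: T): Unit = responseLock.synchronized {
    require(finalProducedIndex.isEmpty, "stream already completed")
    lastProducedIndex += 1
    responses(lastProducedIndex) = r
    responseLock.notifyAll() // wake senders parked in responseLock.wait(...)
  }

  def onCompleted(): Unit = responseLock.synchronized {
    finalProducedIndex = Some(lastProducedIndex)
    responseLock.notifyAll()
  }

  // Non-blocking, like consumeResponse in the diff: the sender does the waiting on
  // responseLock and calls this again after every notify.
  def consumeResponse(index: Long): Option[T] = responseLock.synchronized {
    responses.get(index)
  }

  def completed(): Boolean = responseLock.synchronized { finalProducedIndex.isDefined }
}

Dropping attachConsumer also removes the observer's reference to a single active sender; coordination happens through the shared lock and the cached responses instead.
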
@@ -195,11 +195,8 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
     val responseObserver = executeHolder.responseObserver
 
     val command = request.getPlan.getCommand
-    val planner = new SparkConnectPlanner(executeHolder.sessionHolder)
-    planner.process(
-      command = command,
-      responseObserver = responseObserver,
-      executeHolder = executeHolder)
+    val planner = new SparkConnectPlanner(executeHolder)
+    planner.process(command = command, responseObserver = responseObserver)
   }
 
   private def requestString(request: Message) = {
@@ -56,7 +56,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
       throw new IllegalStateException(
         s"Illegal operation type ${request.getPlan.getOpTypeCase} to be handled here.")
     }
-    val planner = new SparkConnectPlanner(sessionHolder)
+    val planner = new SparkConnectPlanner(executeHolder)
     val tracker = executeHolder.eventsManager.createQueryPlanningTracker
     val dataframe =
       Dataset.ofRows(
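
The last two hunks are the same refactor at two call sites: SparkConnectPlanner is now constructed from the ExecuteHolder instead of from the SessionHolder, so process() no longer needs executeHolder threaded through as an argument. A simplified, hypothetical sketch of the before/after shape (placeholder types, not the real planner API):

case class SessionHolder(sessionId: String)
case class ExecuteHolder(operationId: String, sessionHolder: SessionHolder)

// Before: built from the session; the execution context is passed on every call.
class PlannerBefore(sessionHolder: SessionHolder) {
  def process(command: String, executeHolder: ExecuteHolder): Unit =
    println(s"[$command] session=${sessionHolder.sessionId} op=${executeHolder.operationId}")
}

// After: built from the ExecuteHolder, which already carries its session.
class PlannerAfter(executeHolder: ExecuteHolder) {
  private def sessionHolder: SessionHolder = executeHolder.sessionHolder
  def process(command: String): Unit =
    println(s"[$command] session=${sessionHolder.sessionId} op=${executeHolder.operationId}")
}

object PlannerRefactorSketch {
  def main(args: Array[String]): Unit = {
    val exec = ExecuteHolder("op-1", SessionHolder("session-1"))
    new PlannerBefore(exec.sessionHolder).process("collect", exec)
    new PlannerAfter(exec).process("collect")
  }
}

Capturing the holder at construction keeps the planner's entry points smaller and gives everything inside the planner access to the same execution state.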