Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
lmr3796 committed Nov 2, 2023
1 parent d6e1c97 commit 30de7c5
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public enum Errors {
LI_OFFSET_MOVED_TO_TIERED_STORAGE and fully move to using OFFSET_MOVED_TO_TIERED_STORAGE.
*/
LI_OFFSET_MOVED_TO_TIERED_STORAGE(1107, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new),
LI_NOT_ENOUGH_PREFERRED_CONTROLLERS(2000, "Not enough live preferred controllers", NotEnoughPreferredControllersException::new);
NOT_ENOUGH_PREFERRED_CONTROLLERS(2000, "Not enough live preferred controllers", NotEnoughPreferredControllersException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private static Collection<Arguments> parameters() {
new NotEnoughReplicasException(), Errors.NOT_ENOUGH_REPLICAS, null));

arguments.add(Arguments.of(
new NotEnoughPreferredControllersException(), Errors.LI_NOT_ENOUGH_PREFERRED_CONTROLLERS, null));
new NotEnoughPreferredControllersException(), Errors.NOT_ENOUGH_PREFERRED_CONTROLLERS, null));

// avoid populating the error message if it's a generic one
arguments.add(Arguments.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,29 @@
* limitations under the License.
*/

package kafka.server

import java.util.Properties
package integration.kafka.server

import kafka.server.KafkaConfig.fromProps
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.CoreUtils._
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.errors.NotEnoughPreferredControllersException
import org.apache.kafka.common.message.ControlledShutdownRequestData
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions.{assertTrue, fail}
import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, Test}

import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.Map

Expand Down Expand Up @@ -128,6 +136,114 @@ class PreferredControllerTest extends ZooKeeperTestHarness {
ensureControllersInBrokers(Seq(controllerId))
}

private def simulateControlledShutdownRequest(controller: KafkaServer, from: KafkaServer): ControlledShutdownResponse = {
// Mimicking what KafkaServer#controlledShutdown
val config = from.config
val time = new SystemTime()
val logContext = new LogContext()

val metadataUpdater = new ManualMetadataUpdater()
val channelBuilder = ChannelBuilders.clientChannelBuilder(
config.interBrokerSecurityProtocol,
JaasContext.Type.SERVER,
config,
config.interBrokerListenerName,
config.saslMechanismInterBrokerProtocol,
time,
config.saslInterBrokerHandshakeRequestEnable,
logContext)
val selector = new Selector(
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
from.metrics,
time,
"kafka-server-controlled-shutdown",
Map.empty.asJava,
false,
channelBuilder,
logContext
)
val networkClient = new NetworkClient(
selector,
metadataUpdater,
config.brokerId.toString,
1,
0,
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
config.connectionSetupTimeoutMs,
config.connectionSetupTimeoutMaxMs,
time,
false,
new ApiVersions,
logContext)

from.metadataCache
.getAliveBrokerNode(controller.config.brokerId, from.config.interBrokerListenerName)
.foreach(node => {
metadataUpdater.setNodes(Seq(node).asJava)
NetworkClientUtils.awaitReady(networkClient, node, time, 10000L)
})

val controlledShutdownRequest = new ControlledShutdownRequest.Builder(
new ControlledShutdownRequestData()
.setBrokerId(config.brokerId)
.setBrokerEpoch(from.kafkaController.brokerEpoch),
3
)
val request = networkClient.newClientRequest(controller.config.brokerId.toString,
controlledShutdownRequest,
time.milliseconds(),
true)

val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)

networkClient.close()
selector.close()
clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
}

@Test
def testRefuseStandByPreferredControllerShutdownIfBelowMinPreferredControllerCount(): Unit = {
// create 6 brokers, 3 of the them are preferred controllers.
val brokerConfigs = Seq((0, false), (1, false), (2, true), (3, true), (4, true), (5, true) )
createBrokersWithPreferredControllers(brokerConfigs, allowFallback = false, minPreferredControllerCount = 2)


val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val nonPreferredControllerBroker = brokers(0)
val activeController = brokers(controllerId)
val preferredControllersIds = activeController.kafkaController.controllerContext.getLivePreferredControllerIds
val standByControllerIds = (preferredControllersIds - controllerId).toSeq
val firstStandByController = brokers(standByControllerIds.head)


// Asserting the response --- active controller & first standby controller should be accepted
assertEquals(Errors.NONE, simulateControlledShutdownRequest(controller = activeController, from = activeController).error())
assertEquals(Errors.NONE, simulateControlledShutdownRequest(controller = activeController, from = firstStandByController).error())
// The threshold should not affect non-preferred controller
assertEquals(Errors.NONE, simulateControlledShutdownRequest(controller = activeController, from = nonPreferredControllerBroker).error())


// Previously was asserting on the requests; now do actual shutdown to simulate decrease in live preferred controllers
firstStandByController.shutdown()
activeController.shutdown()

// Taking down 2 controller, should still have 2 remain
val newActiveController = brokers(TestUtils.waitUntilControllerElected(zkClient))
val newLivePreferredControllers = newActiveController.kafkaController.controllerContext.getLivePreferredControllerIds
val newStandByControllerIds = newLivePreferredControllers - newActiveController.config.brokerId
assertEquals(2, newLivePreferredControllers.size)

// Now at minPreferredControllerCount. Taking down anymore controller should be rejected, either active or stand by controller
assertEquals(Errors.NOT_ENOUGH_PREFERRED_CONTROLLERS, simulateControlledShutdownRequest(controller = newActiveController, from = newActiveController).error())
assertEquals(Errors.NOT_ENOUGH_PREFERRED_CONTROLLERS, simulateControlledShutdownRequest(controller = newActiveController, from = brokers(newStandByControllerIds.head)).error())
// The threshold should not affect non-preferred controller
assertEquals(Errors.NONE, simulateControlledShutdownRequest(controller = newActiveController,from = nonPreferredControllerBroker).error())
}

@Test
def testAllPreferredControllerDownWithPreferredControllersAndNoFallback(): Unit = {
// create 5 brokers, 3 of the them are preferred controllers.
Expand Down Expand Up @@ -197,12 +313,15 @@ class PreferredControllerTest extends ZooKeeperTestHarness {
* @param brokerConfigs: a list of (brokerid, preferredController) configs
* @param allowFallback: "allow.preferred.controller.fallback" config
*/
private def createBrokersWithPreferredControllers(brokerConfigs: Seq[(Int, Boolean)], allowFallback: Boolean): Unit = {
private def createBrokersWithPreferredControllers(brokerConfigs: Seq[(Int, Boolean)],
allowFallback: Boolean,
minPreferredControllerCount: Int = 0): Unit = {
brokers = brokerConfigs.map {
case (id, preferredController) =>
val props: Properties = createBrokerConfig(id, zkConnect)
props.put(KafkaConfig.PreferredControllerProp, preferredController.toString)
props.put(KafkaConfig.AllowPreferredControllerFallbackProp, allowFallback.toString)
props.put(KafkaConfig.LiMinPreferredControllerCountProp, minPreferredControllerCount.toString)
createServer(fromProps(props))
}
}
Expand Down

0 comments on commit 30de7c5

Please sign in to comment.