diff --git a/kafka-test-harness/src/main/java/com/linkedin/kafka/clients/utils/tests/AbstractKafkaIntegrationTestHarness.java b/kafka-test-harness/src/main/java/com/linkedin/kafka/clients/utils/tests/AbstractKafkaIntegrationTestHarness.java
index 739d97c..c489bb3 100644
--- a/kafka-test-harness/src/main/java/com/linkedin/kafka/clients/utils/tests/AbstractKafkaIntegrationTestHarness.java
+++ b/kafka-test-harness/src/main/java/com/linkedin/kafka/clients/utils/tests/AbstractKafkaIntegrationTestHarness.java
@@ -7,17 +7,21 @@
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.StringJoiner;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 
 public abstract class AbstractKafkaIntegrationTestHarness extends AbstractZookeeperTestHarness {
+  private static final Random RANDOM = new Random();
   protected Map<Integer, EmbeddedBroker> _brokers = null;
+  protected Set<Integer> _deadBrokers;
   protected String _bootstrapUrl;
-
   @Override
   public void setUp() {
     super.setUp();
@@ -42,6 +46,7 @@ public void setUp() {
     StringJoiner joiner = new StringJoiner(",");
     _brokers.values().forEach(broker -> joiner.add(broker.getAddr(securityProtocol())));
     _bootstrapUrl = joiner.toString();
+    _deadBrokers = new HashSet<>();
   }
 
   @Override
@@ -118,4 +123,46 @@ protected int clusterSize() {
   protected Map<Object, Object> overridingProps() {
     return Collections.emptyMap();
   }
+
+  /**
+   * Shut down the broker with the given id; no-op when the id is unknown or the broker is already dead.
+   * @param id id of the broker to kill
+   * @throws Exception if shutting the broker down fails
+   */
+  public void killBroker(int id) throws Exception {
+    EmbeddedBroker broker = _brokers.get(id);
+
+    if (broker != null && !_deadBrokers.contains(id)) {
+      broker.shutdown();
+      broker.awaitShutdown();
+      _deadBrokers.add(id);
+    }
+  }
+
+  /**
+   * Kill a randomly chosen broker; no-op when the chosen broker is already dead.
+   *
+   * @return id of the chosen broker
+   */
+  public int killRandomBroker() throws Exception {
+    int index = RANDOM.nextInt(_brokers.size());
+    int id = (Integer) _brokers.keySet().toArray()[index];
+    killBroker(id);
+    return id;
+  }
+
+  /**
+   * Restart all dead brokers.
+   * @return ids of the brokers that were restarted
+   * @throws Exception any exception raised while starting a broker
+   */
+  public List<Integer> restartDeadBrokers() throws Exception {
+    List<Integer> brokersStarted = new ArrayList<>();
+    for (int id : _deadBrokers) {
+      _brokers.get(id).startup();
+      brokersStarted.add(id);
+    }
+    _deadBrokers.clear();
+    return brokersStarted;
+  }
 }
diff --git a/kafka-test-harness/src/test/java/com/linkedin/kafka/clients/utils/tests/KafkaIntegrationTestHarnessTest.java b/kafka-test-harness/src/test/java/com/linkedin/kafka/clients/utils/tests/KafkaIntegrationTestHarnessTest.java
new file mode 100644
index 0000000..2134a3d
--- /dev/null
+++ b/kafka-test-harness/src/test/java/com/linkedin/kafka/clients/utils/tests/KafkaIntegrationTestHarnessTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ */
+
+package com.linkedin.kafka.clients.utils.tests;
+
+import java.util.List;
+import java.util.Set;
+import org.testng.Assert;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+
+public class KafkaIntegrationTestHarnessTest {
+  private static final int CLUSTER_SIZE = 4;
+  private AbstractKafkaIntegrationTestHarness _kafkaIntegrationTestHarness;
+
+  @BeforeTest
+  public void setup() {
+    _kafkaIntegrationTestHarness = new AbstractKafkaIntegrationTestHarness() {
+      @Override
+      protected int clusterSize() {
+        return CLUSTER_SIZE;
+      }
+    };
+    _kafkaIntegrationTestHarness.setUp();
+  }
+
+  @AfterTest
+  public void teardown() {
+    _kafkaIntegrationTestHarness.tearDown();
+  }
+
+  @Test
+  public void testKillBroker() throws Exception {
+    Set<Integer> brokerIds = _kafkaIntegrationTestHarness._brokers.keySet();
+    Assert.assertFalse(brokerIds.isEmpty(), "broker not initialized");
+    Assert.assertEquals(brokerIds.size(), CLUSTER_SIZE, "expected cluster size doesn't match the initialized brokers");
+
+    int killedBrokerId = -1;
+    for (Integer brokerId : brokerIds) {
+      killedBrokerId = brokerId;
+      _kafkaIntegrationTestHarness.killBroker(killedBrokerId);
+      break;
+    }
+
+    List<Integer> restartedBrokers = _kafkaIntegrationTestHarness.restartDeadBrokers();
+    Assert.assertEquals(restartedBrokers.size(), 1, "unexpected brokers restarted");
+    Assert.assertTrue(restartedBrokers.contains(killedBrokerId), "broker restart is not the broker that was killed");
+  }
+
+  @Test
+  public void testKillRandomBroker() throws Exception {
+    Set<Integer> brokerIds = _kafkaIntegrationTestHarness._brokers.keySet();
+    Assert.assertFalse(brokerIds.isEmpty(), "broker not initialized");
+    Assert.assertEquals(brokerIds.size(), CLUSTER_SIZE, "expected cluster size doesn't match the initialized brokers");
+
+    int killedBrokerId = _kafkaIntegrationTestHarness.killRandomBroker();
+
+    List<Integer> restartedBrokers = _kafkaIntegrationTestHarness.restartDeadBrokers();
+    Assert.assertEquals(restartedBrokers.size(), 1, "unexpected brokers restarted");
+    Assert.assertTrue(restartedBrokers.contains(killedBrokerId), "broker restart is not the broker that was killed");
+  }
+}