Use KRaft in Cruise Control tests
Co-authored-by: Patrik Marton <[email protected]>
Co-authored-by: Tamas Barnabas Egyed <[email protected]>
egyedt and patrik-marton committed Jan 29, 2025
1 parent 469dc6d commit fe6ccb1
Showing 22 changed files with 596 additions and 235 deletions.
3 changes: 2 additions & 1 deletion build.gradle
@@ -460,9 +460,10 @@ project(':cruise-control-metrics-reporter') {
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion:test"
testImplementation "org.apache.kafka:kafka-server-common:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-group-coordinator:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-metadata:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-raft:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
testOutput sourceSets.test.output
}

7 changes: 0 additions & 7 deletions cruise-control-core/src/test/resources/log4j2.properties
@@ -17,12 +17,5 @@ logger.orgApacheKafka.level=error
logger.kafka.name=kafka
logger.kafka.level=error

# zkclient can be verbose; during debugging it is common to adjust it separately
logger.zkclient.name=kafka.zk.KafkaZkClient
logger.zkclient.level=warn

logger.zookeeper.name=org.apache.zookeeper
logger.zookeeper.level=warn

rootLogger.appenderRefs=console
rootLogger.appenderRef.console.ref=STDOUT
@@ -50,6 +50,7 @@
import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness {
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
@@ -199,14 +200,26 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "2");
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
CCEmbeddedBroker broker = new CCEmbeddedBroker(brokerConfig);
// Restart broker
broker.startup();
// Wait for broker to boot up
Thread.sleep(5000);
// Check whether the topic config is updated
topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();
assertEquals(2, topicDescription.partitions().size());
    try (CCEmbeddedBroker broker = new CCEmbeddedBroker(brokerConfig)) {
      // Restart broker
      broker.startup();
      // Check whether the topic config is updated
      long startTime = System.currentTimeMillis();
      boolean isTopicConfigChanged = false;
      while (!isTopicConfigChanged) {
        if (System.currentTimeMillis() > startTime + 60000) {
          fail("Topic config was not updated");
        }

        TopicDescription description = adminClient.describeTopics(Collections.singleton(TOPIC)).topicNameValues().get(TOPIC).get();
        isTopicConfigChanged = 2 == description.partitions().size();

        try {
          Thread.sleep(5000);
        } catch (InterruptedException ignored) {
        }
      }
    }
  }
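
The fixed five-second sleep followed by a single assertion is replaced above by a bounded polling loop. A minimal sketch of the same wait-until-condition pattern in isolation; the helper name awaitCondition and the use of java.util.function.Supplier are assumptions for illustration, not part of this commit:

  // Hypothetical helper illustrating the bounded-polling pattern used above.
  // Relies only on org.junit.Assert.fail, which this test already imports.
  private static void awaitCondition(java.util.function.Supplier<Boolean> condition, long timeoutMs, String failureMessage) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!condition.get()) {
      if (System.currentTimeMillis() > deadline) {
        fail(failureMessage);
      }
      try {
        Thread.sleep(1000); // back off between checks
      } catch (InterruptedException ignored) {
        // keep polling until the deadline
      }
    }
  }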

@Test
@@ -0,0 +1,32 @@
/*
 * Copyright 2025 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
 */
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;

public class CCAbstractKRaftTestHarness {
  protected CCEmbeddedKRaftController _controller;

  /**
   * Setup the unit test.
   */
  public void setUp() {
    if (_controller == null) {
      _controller = new CCEmbeddedKRaftController();
    }
    _controller.startup();
  }

  /**
   * Teardown the unit test.
   */
  public void tearDown() {
    if (_controller != null) {
      CCKafkaTestUtils.quietly(() -> _controller.close());
      _controller = null;
    }
  }

  protected CCEmbeddedKRaftController kraftController() {
    return _controller;
  }
}
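
A sketch of how a test class might plug into this harness. The JUnit 4 annotations and anything about CCEmbeddedKRaftController beyond what the class above shows are assumptions, not taken from this commit:

// Hypothetical usage of CCAbstractKRaftTestHarness (not part of this commit).
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ExampleKRaftBackedTest extends CCAbstractKRaftTestHarness {
  @Before
  public void before() {
    setUp();    // starts (or reuses) the embedded KRaft controller
  }

  @After
  public void after() {
    tearDown(); // closes the controller and clears the reference
  }

  @Test
  public void controllerIsAvailable() {
    // The harness only guarantees a non-null controller after setUp();
    // anything further depends on the CCEmbeddedKRaftController API.
    Assert.assertNotNull(kraftController());
  }
}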

This file was deleted.

@@ -5,51 +5,49 @@
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;

import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaRaftServer.CLUSTER_ID_CONFIG;

public class CCEmbeddedBroker implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CCEmbeddedBroker.class);
private final Map<SecurityProtocol, Integer> _ports;
private final Map<SecurityProtocol, String> _hosts;
private final KafkaServer _kafkaServer;
private final CCKafkaRaftServer _kafkaServer;
private int _id;
private File _logDir;
private final List<File> _logDirs;
private File _metadataLogDir;

public CCEmbeddedBroker(Map<Object, Object> config) {
_ports = new HashMap<>();
_hosts = new HashMap<>();
_logDirs = new ArrayList<>();

try {
// Also validates the config
KafkaConfig kafkaConfig = new KafkaConfig(config, true);
parseConfigs(config);

_kafkaServer = createKafkaServer(kafkaConfig);
_kafkaServer = new CCKafkaRaftServer(kafkaConfig, config.get(CLUSTER_ID_CONFIG).toString(), Time.SYSTEM);

startup();
_ports.replaceAll((securityProtocol, port) -> {
try {
return _kafkaServer.boundPort(ListenerName.forSecurityProtocol(securityProtocol));
return _kafkaServer.boundBrokerPort(ListenerName.forSecurityProtocol(securityProtocol));
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -59,46 +57,10 @@ public CCEmbeddedBroker(Map<Object, Object> config) {
}
}

/**
* Creates the {@link KafkaServer} instance using the appropriate constructor for the version of Kafka on the classpath.
* It will attempt to use the 2.8+ version first and then fall back to the 2.5+ version. If neither work, a
* {@link NoSuchElementException} will be thrown.
*
* @param kafkaConfig The {@link KafkaConfig} instance to be used to create the returned {@link KafkaServer} instance.
* @return A {@link KafkaServer} instance configured with the supplied {@link KafkaConfig}.
* @throws ClassNotFoundException If a version of {@link KafkaServer} cannot be found on the classpath.
*/
private static KafkaServer createKafkaServer(KafkaConfig kafkaConfig) throws ClassNotFoundException {
// The KafkaServer constructor changed in 2.8, so we need to figure out which one we are using and invoke it with the correct parameters
KafkaServer kafkaServer = null;
Class<?> kafkaServerClass = Class.forName(KafkaServer.class.getName());

try {
Constructor<?> kafka28PlusCon = kafkaServerClass.getConstructor(KafkaConfig.class, Time.class, Option.class, boolean.class);
kafkaServer = (KafkaServer) kafka28PlusCon.newInstance(kafkaConfig, Time.SYSTEM, Option.empty(), false);
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
LOG.debug("Unable to find Kafka 2.8+ constructor for KafkaSever class", e);
}

if (kafkaServer == null) {
try {
Constructor<?> kafka25PlusCon = kafkaServerClass.getConstructor(KafkaConfig.class, Time.class, Option.class, Seq.class);
kafkaServer = (KafkaServer) kafka25PlusCon.newInstance(kafkaConfig, Time.SYSTEM, Option.empty(), new ArrayBuffer<KafkaMetricsReporter>());
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
LOG.debug("Unable to find Kafka 2.5+ constructor for KafkaSever class", e);
}
}

if (kafkaServer != null) {
return kafkaServer;
} else {
throw new NoSuchElementException("Unable to find viable constructor for the KafkaServer class");
}
}

private void parseConfigs(Map<Object, Object> config) {
_id = Integer.parseInt((String) config.get(ServerConfigs.BROKER_ID_CONFIG));
_logDir = new File((String) config.get(ServerLogConfigs.LOG_DIR_CONFIG));
readLogDirs(config);
_id = Integer.parseInt((String) config.get(KRaftConfigs.NODE_ID_CONFIG));
_metadataLogDir = new File((String) config.get(KRaftConfigs.METADATA_LOG_DIR_CONFIG));

// Bind addresses
String listenersString = (String) config.get(SocketServerConfigs.LISTENERS_CONFIG);
Expand All @@ -115,6 +77,14 @@ private void parseConfigs(Map<Object, Object> config) {
}
}

private void readLogDirs(Map<Object, Object> config) {
String logdir = (String) config.get(ServerLogConfigs.LOG_DIR_CONFIG);
String[] paths = logdir.split(",");
for (String path : paths) {
_logDirs.add(new File(path));
}
}

public int id() {
return _id;
}
@@ -154,10 +124,9 @@ public void awaitShutdown() {
public void close() {
CCKafkaTestUtils.quietly(this::shutdown);
CCKafkaTestUtils.quietly(this::awaitShutdown);
CCKafkaTestUtils.quietly(() -> FileUtils.forceDelete(_logDir));
}

public static CCEmbeddedBrokerBuilder newServer() {
return new CCEmbeddedBrokerBuilder();
CCKafkaTestUtils.quietly(() -> FileUtils.forceDelete(_metadataLogDir));
for (File logDir : _logDirs) {
CCKafkaTestUtils.quietly(() -> FileUtils.forceDelete(logDir));
}
}
}
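
With this change the constructor derives the node id, metadata log dir, and data log dirs from the raw config map rather than from a ZooKeeper-based setup. A rough sketch of a KRaft-mode config map using only the keys referenced in this diff; the literal values are illustrative, and a real KafkaConfig would almost certainly need further properties (controller quorum settings, listener security maps, and so on):

// Hypothetical, illustrative-only config for CCEmbeddedBroker in KRaft mode (not part of this commit).
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;

public final class EmbeddedBrokerConfigSketch {
  private EmbeddedBrokerConfigSketch() {
  }

  public static Map<Object, Object> minimalKRaftBrokerConfig() {
    Map<Object, Object> config = new HashMap<>();
    config.put(KRaftConfigs.NODE_ID_CONFIG, "0");                                         // parsed into _id
    config.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, "/tmp/cc-test/metadata");            // parsed into _metadataLogDir
    config.put(ServerLogConfigs.LOG_DIR_CONFIG, "/tmp/cc-test/log-0,/tmp/cc-test/log-1"); // comma-separated, split by readLogDirs
    config.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:0");          // bind addresses and ports
    config.put(CCKafkaRaftServer.CLUSTER_ID_CONFIG, "test-cluster-id");                   // consumed by the CCKafkaRaftServer constructor
    return config;
  }
}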