Commit

Permalink
Let Kafka consumer wait until rebalancing is finished
dominikriemer committed Feb 15, 2025
1 parent 2a5f0bd commit 71776d9
Showing 2 changed files with 60 additions and 59 deletions.
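In short: KafkaProtocol no longer starts and interrupts its own polling thread. SpKafkaConsumer.connect() now builds the consumer, registers a ConsumerRebalanceListener, starts the poll thread itself, and blocks on a CountDownLatch until partitions have been assigned (or a 10-second timeout expires). Below is a minimal standalone sketch of that latch-based wait against the plain Kafka client API; the broker address, group id, topic name, and class name are placeholders, not part of this commit.

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class BlockingSubscribeExample {

  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group id
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    CountDownLatch assigned = new CountDownLatch(1);

    // The rebalance listener releases the latch once the group coordinator has assigned partitions.
    consumer.subscribe(Collections.singletonList("events"), new ConsumerRebalanceListener() { // placeholder topic
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to clean up in this sketch
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        assigned.countDown();
      }
    });

    // Rebalance callbacks only fire inside poll(), so polling must already be running
    // while the caller blocks on the latch.
    Thread poller = new Thread(() -> {
      try {
        while (true) {
          consumer.poll(Duration.ofMillis(100)).forEach(record ->
              System.out.println("record from " + record.topic() + "-" + record.partition()));
        }
      } catch (WakeupException e) {
        // expected: wakeup() is used to end the demo
      } finally {
        consumer.close();
      }
    });
    poller.start();

    if (!assigned.await(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Timed out waiting for partition assignment");
    }
    System.out.println("Partitions assigned; a blocking connect() could return here");

    consumer.wakeup(); // the only thread-safe KafkaConsumer method; breaks the poll loop
    poller.join();
  }
}

The constraint the commit works around is that rebalance callbacks only run inside poll(), so the poll loop must already be spinning before the latch can ever count down; that is why connect() starts the thread before calling latch.await().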
KafkaProtocol.java

@@ -78,7 +78,6 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig

public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka";

private Thread thread;
private SpKafkaConsumer kafkaConsumer;

public KafkaProtocol() {
@@ -190,13 +189,9 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic()));

this.kafkaConsumer = new SpKafkaConsumer(protocol,
config.getTopic(),
new BrokerEventProcessor(extractor.selectedParser(), collector),
config.getConfigAppenders()
);

thread = new Thread(this.kafkaConsumer);
thread.start();
this.kafkaConsumer.connect(new BrokerEventProcessor(extractor.selectedParser(), collector));
}

@Override
@@ -215,7 +210,6 @@ public void onAdapterStopped(IAdapterParameterExtractor extractor,
}

LOG.info("Kafka Adapter was sucessfully stopped");
thread.interrupt();
}

@Override
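Note that the thread.interrupt() call is gone as well: stopping the adapter now relies on SpKafkaConsumer's volatile isRunning flag (second file below), so the poll loop finishes its current iteration and closes the consumer on its own thread. A minimal sketch of that cooperative-shutdown pattern, again with placeholder broker, group id, and topic:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CooperativeShutdownExample implements Runnable {

  private final KafkaConsumer<byte[], byte[]> consumer;
  private volatile boolean isRunning = true; // same idea as SpKafkaConsumer's flag

  public CooperativeShutdownExample(Properties props, String topic) {
    this.consumer = new KafkaConsumer<>(props);
    this.consumer.subscribe(Collections.singletonList(topic));
  }

  @Override
  public void run() {
    // Poll until asked to stop, then close the consumer from the thread that used it.
    while (isRunning) {
      consumer.poll(Duration.ofMillis(100)).forEach(record ->
          System.out.println("record from " + record.topic() + "-" + record.partition()));
    }
    consumer.close();
  }

  public void disconnect() {
    // No Thread#interrupt(): the loop simply notices the flag on its next iteration.
    isRunning = false;
  }

  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group id
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    CooperativeShutdownExample example = new CooperativeShutdownExample(props, "events"); // placeholder topic
    Thread poller = new Thread(example);
    poller.start();

    Thread.sleep(5_000);   // pretend the adapter runs for a while
    example.disconnect();
    poller.join();
  }
}

Keeping poll() and close() on the same thread matters because KafkaConsumer is not thread-safe; only wakeup() may be called from other threads.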
SpKafkaConsumer.java

@@ -41,6 +41,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class SpKafkaConsumer implements EventConsumer, Runnable,
@@ -50,56 +52,25 @@ public class SpKafkaConsumer implements EventConsumer, Runnable,
private InternalEventProcessor<byte[]> eventProcessor;
private final KafkaTransportProtocol protocol;
private volatile boolean isRunning;
private Boolean patternTopic = false;

private List<KafkaConfigAppender> appenders = new ArrayList<>();
private KafkaConsumer<byte[], byte[]> consumer;

private static final Logger LOG = LoggerFactory.getLogger(SpKafkaConsumer.class);

public SpKafkaConsumer(KafkaTransportProtocol protocol) {
this.protocol = protocol;
this.topic = protocol.getTopicDefinition().getActualTopicName();
}

public SpKafkaConsumer(KafkaTransportProtocol protocol,
String topic,
InternalEventProcessor<byte[]> eventProcessor) {
this.protocol = protocol;
this.topic = topic;
this.eventProcessor = eventProcessor;
this.isRunning = true;
}

public SpKafkaConsumer(KafkaTransportProtocol protocol,
String topic,
InternalEventProcessor<byte[]> eventProcessor,
List<KafkaConfigAppender> appenders) {
this(protocol, topic, eventProcessor);
this(protocol);
this.appenders = appenders;
}

@Override
public void run() {

Properties props = makeProperties(protocol, appenders);

LOG.info("Using kafka properties: {}", props.toString());
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
if (!patternTopic) {
consumer.subscribe(Collections.singletonList(topic));
} else {
topic = replaceWildcardWithPatternFormat(topic);
consumer.subscribe(Pattern.compile(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// TODO
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// TODO
}
});
}
Duration duration = Duration.of(100, ChronoUnit.MILLIS);
while (isRunning) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
@@ -108,40 +79,76 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.close();
}

private String replaceWildcardWithPatternFormat(String topic) {
topic = topic.replaceAll("\\.", "\\\\.");
return topic.replaceAll("\\*", ".*");
}

private Properties makeProperties(KafkaTransportProtocol protocol,
List<KafkaConfigAppender> appenders) {
return new ConsumerConfigFactory(protocol).buildProperties(appenders);
}

@Override
public void connect(InternalEventProcessor<byte[]> eventProcessor) throws SpRuntimeException {
LOG.info("Kafka consumer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
if (protocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
this.patternTopic = true;
}
LOG.info("Kafka consumer: Connecting to {}", protocol.getTopicDefinition().getActualTopicName());
var patternTopic = isPatternTopic();
this.eventProcessor = eventProcessor;

this.topic = protocol.getTopicDefinition().getActualTopicName();
this.isRunning = true;
Properties props = makeProperties(protocol, appenders);

consumer = new KafkaConsumer<>(props);
var latch = new CountDownLatch(1);
if (!patternTopic) {
consumer.subscribe(Collections.singletonList(topic), new RebalanceListener(latch));
} else {
topic = replaceWildcardWithPatternFormat(topic);
consumer.subscribe(Pattern.compile(topic), new RebalanceListener(latch));
}

Thread thread = new Thread(this);
thread.start();
try {
if (!latch.await(10, TimeUnit.SECONDS)) {
throw new SpRuntimeException("Timeout while waiting for partition assignment");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SpRuntimeException("Interrupted while waiting for partition assignment", e);
}
}

@Override
public void disconnect() throws SpRuntimeException {
LOG.info("Kafka consumer: Disconnecting from " + topic);
LOG.info("Kafka consumer: Disconnecting from {}", topic);
this.isRunning = false;

}

@Override
public boolean isConnected() {
return isRunning;
}

private boolean isPatternTopic() {
return this.protocol.getTopicDefinition() instanceof WildcardTopicDefinition;
}

private String replaceWildcardWithPatternFormat(String topic) {
topic = topic.replaceAll("\\.", "\\\\.");
return topic.replaceAll("\\*", ".*");
}

private Properties makeProperties(KafkaTransportProtocol protocol,
List<KafkaConfigAppender> appenders) {
return new ConsumerConfigFactory(protocol).buildProperties(appenders);
}

private class RebalanceListener implements ConsumerRebalanceListener {

private final CountDownLatch latch;
public RebalanceListener(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
consumer.pause(collection);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.resume(partitions);
latch.countDown();
}
}
}
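For wildcard topic definitions, connect() still rewrites the topic into a regular expression before subscribing (replaceWildcardWithPatternFormat above). A small standalone sketch of that rewrite; the topic name is a made-up example:

import java.util.regex.Pattern;

public class WildcardTopicExample {

  // Same two-step rewrite as replaceWildcardWithPatternFormat:
  // escape literal dots, then turn '*' into the regex wildcard '.*'.
  static String toRegex(String wildcardTopic) {
    return wildcardTopic
        .replaceAll("\\.", "\\\\.")
        .replaceAll("\\*", ".*");
  }

  public static void main(String[] args) {
    String regex = toRegex("org.sensors.*");                               // made-up topic pattern
    System.out.println(regex);                                             // org\.sensors\..*
    System.out.println(Pattern.matches(regex, "org.sensors.temperature")); // true
    System.out.println(Pattern.matches(regex, "orgXsensorsXtemperature")); // false: dots stay literal
  }
}

Escaping the dots first keeps them literal, so only the '*' segments act as wildcards when the consumer subscribes with Pattern.compile.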
