Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Newly created groupId will delete automatically upon application shutdown. #1129

Open
wants to merge 7 commits into
base: 5.8.x
Choose a base branch
from

Conversation

cmabdullah
Copy link

@cmabdullah cmabdullah commented Jan 13, 2025

A unique consumer group can be created for a Kafka listener by setting uniqueGroupId=true on the @KafkaListener annotation. This automatically generates a unique groupId each time the application starts. However, once the application shuts down, the generated groupId remains in Kafka and is not automatically deleted.

package org.cm.kafka;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaListener(groupId = "message-app", uniqueGroupId = true)
public class KafkaMessageConsumerClient {
	@Topic("user-online")
	void consumeEvent(String message) {
		System.out.println("event from Kafka: " + message);
	}
}

This is the groupId list remains on kafka

'consumerGroupList'
Multiple consumer groups support is added on this.
where a flag is added (uniqueGroupId) in the annotation KafkaListener, if uniqueGroupId is true, Then the app will create a unique groupID.

However, the generated groupId persists in Kafka even after the application shuts down.

We are currently developing a whiteboard application where each service have multiple instances, and any instance can broadcasts events, and all other instances need to receive these event. This follows a pub/sub pattern.
As this feature is critical for our use case, we want to ensure better manageability by avoiding unnecessary groupId entries in Kafka.

To address this, I have implemented a quick fix that still relies on annotations to delete newly created groupId will deleted automatically upon application shutdown.

I successfully extended the @KafkaListener annotation and tested the new feature using plain Kafka.

My initial idea is, I have added uniqueGroupIdDeleteOnShutdown field on @KafkaListener, If uniqueGroupIdDeleteOnShutdown is set to true alongside uniqueGroupId, the unique group ID will be automatically deleted during the application shutdown process, ensuring better manageability of Kafka group IDs.

    @Override
    public void onApplicationEvent(ApplicationShutdownEvent event) {
        LOG.trace("Application shutdown initiated. Preparing to delete registered Kafka unique consumer groups.");
        List<String> uniqueGroupIdsDeleteOnShutdown = registerConsumerForGroupDeletion.values()
            .stream()
            .filter(consumerState -> consumerState.kafkaConsumer.groupMetadata() != null)
            .map(consumerState -> consumerState.kafkaConsumer.groupMetadata().groupId())
            .toList();
        if (!uniqueGroupIdsDeleteOnShutdown.isEmpty()) {
            LOG.trace("Closing {} consumers and attempting to delete the following consumer groups: {}",
                uniqueGroupIdsDeleteOnShutdown.size(), uniqueGroupIdsDeleteOnShutdown);
            closeConsumers();
            adminClient.deleteConsumerGroups(uniqueGroupIdsDeleteOnShutdown)
                .all().whenComplete((voidResult, throwable) -> {
                    if (throwable == null) {
                        LOG.trace("Successfully deleted the following consumer groups: {}", uniqueGroupIdsDeleteOnShutdown);
                    } else {
                        LOG.error("Failed to delete the following consumer groups: {}. Error: {}",
                            uniqueGroupIdsDeleteOnShutdown, throwable.getMessage(), throwable);
                    }
                });
        } else {
            LOG.trace("No unique consumer groups are registered for deletion.");
        }
    }

@CLAassistant
Copy link

CLAassistant commented Jan 13, 2025

CLA assistant check
All committers have signed the CLA.

@cmabdullah cmabdullah changed the title Delete newly created groupId will delete automatically upon application shutdown. Newly created groupId will delete automatically upon application shutdown. Jan 13, 2025
@graemerocher
Copy link
Contributor

can you add a test please?

@iNviNho
Copy link

iNviNho commented Jan 23, 2025

This is a nice addition (I haven't checked the code yet) but please bear in mind that not all applications will be granted access to delete consumer group.

If this is meant to go through, I would either make this deletion by default disabled or make the log level only warning or info as this is not critical functionality that has to work.

Byt the way you can always decrease retention on internal topic that you use to store committed offset and therefore such unused consumer groups will eventually disappear.

@graemerocher
Copy link
Contributor

it should absolutely be opt-in

@graemerocher graemerocher requested a review from sdelamo January 30, 2025 19:50
@cmabdullah cmabdullah force-pushed the uniqueGroupIdDeleteOnShutdown branch from 2ac3b78 to 037c31f Compare January 30, 2025 20:13
@cmabdullah cmabdullah changed the base branch from 5.7.x to 5.8.x January 30, 2025 20:25
renovate bot and others added 6 commits January 31, 2025 02:30
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
…eter-bom to v5.9.3 (micronaut-projects#1109)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
…down.

- Managed List of unique consumer group IDs, as accessing consumerState.kafkaConsumer.groupMetadata() is not the best way; the consumer is running by another thread through the executor service, so it should not be a good way to access consumerState.kafkaConsumer.groupMetadata() from another thread.
…down.

- added test case for the feature groupId will delete automatically upon application shutdown
@cmabdullah cmabdullah force-pushed the uniqueGroupIdDeleteOnShutdown branch from 037c31f to 7efae6b Compare January 30, 2025 20:35
@cmabdullah
Copy link
Author

cmabdullah commented Jan 30, 2025

@graemerocher,
Thank you for the feedback!
I have added an integration test as requested and also provided a video demonstration at the given link.

Additionally, I’ve made the deletion functionality disabled by default, ensuring it aligns with the concerns raised.

I’m also sharing the build log for reference. Please let me know if any further refinements are needed. Looking forward to your thoughts!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Ready for Review
Development

Successfully merging this pull request may close these issues.

4 participants