diff --git a/CHANGELOG.md b/CHANGELOG.md index 338fdd514..6f42a12e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ librdkafka v2.6.2 is a maintenance release: trusted root certificates (#4900). * Fixes to allow to migrate partitions to leaders with same leader epoch, or NULL leader epoch (#4901). +* Commits during a cooperative incremental rebalance aren't causing + an assignment lost if the generation id was bumped in between (#4908). ## Fixes @@ -40,6 +42,13 @@ librdkafka v2.6.2 is a maintenance release: temporarily migrated to the internal broker (#4804), or if broker implementation never bumps it, as it's not needed to validate the offsets. Happening since v2.4.0 (#4901). +* Issues: #4059 + Commits during a cooperative incremental rebalance could cause an + assignment lost if the generation id was bumped by a second join + group request. + Solved by not rejoining the group in case an illegal generation error happens + during a rebalance. + Happening since v1.6.0 (#4908) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index b7aa3fd63..c34051820 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -3793,7 +3793,12 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk, break; case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: - /* Revoke assignment and rebalance on illegal generation */ + /* Revoke assignment and rebalance on illegal generation, + * only if not rebalancing, because a new generation id + * can be received soon after this error. */ + if (RD_KAFKA_CGRP_REBALANCING(rkcg)) + break; + rk->rk_cgrp->rkcg_generation_id = -1; rd_kafka_cgrp_revoke_all_rejoin_maybe( rkcg, rd_true /*assignment is lost*/, diff --git a/tests/0113-cooperative_rebalance.cpp b/tests/0113-cooperative_rebalance.cpp index 8d0325daf..e18276623 100644 --- a/tests/0113-cooperative_rebalance.cpp +++ b/tests/0113-cooperative_rebalance.cpp @@ -3171,8 +3171,11 @@ static void v_rebalance_cb(rd_kafka_t *rk, if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { test_consumer_incremental_assign("assign", rk, parts); } else { - test_consumer_incremental_unassign("unassign", rk, parts); - + TEST_ASSERT(!rd_kafka_assignment_lost(rk), + "Assignment must not be lost, " + " that is a sign that an ILLEGAL_GENERATION error, " + " during a commit happening during a rebalance is " + "causing the assignment to be lost."); if (!*auto_commitp) { rd_kafka_resp_err_t commit_err; @@ -3181,10 +3184,14 @@ static void v_rebalance_cb(rd_kafka_t *rk, rd_sleep(2); commit_err = rd_kafka_commit(rk, NULL, 0 /*sync*/); TEST_ASSERT(!commit_err || commit_err == RD_KAFKA_RESP_ERR__NO_OFFSET || - commit_err == RD_KAFKA_RESP_ERR__DESTROY, + commit_err == RD_KAFKA_RESP_ERR__DESTROY || + commit_err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, "%s: manual commit failed: %s", rd_kafka_name(rk), rd_kafka_err2str(commit_err)); } + + /* Unassign must be done after manual commit. */ + test_consumer_incremental_unassign("unassign", rk, parts); } } @@ -3198,11 +3205,23 @@ static void v_commit_cb(rd_kafka_t *rk, TEST_SAY("%s offset commit for %d offsets: %s\n", rd_kafka_name(rk), offsets ? offsets->cnt : -1, rd_kafka_err2name(err)); TEST_ASSERT(!err || err == RD_KAFKA_RESP_ERR__NO_OFFSET || + err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION || err == RD_KAFKA_RESP_ERR__DESTROY /* consumer was closed */, "%s offset commit failed: %s", rd_kafka_name(rk), rd_kafka_err2str(err)); } +/** + * @brief Log callback for the v_.. test. + */ +static void v_log_cb(const rd_kafka_t *rk, + int level, + const char *fac, + const char *buf) { + /* Slow down logging to make ILLEGAL_GENERATION errors caused by + * manual commit more likely. */ + rd_usleep(1000, 0); +} static void v_commit_during_rebalance(bool with_rebalance_cb, bool auto_commit) { @@ -3240,8 +3259,13 @@ static void v_commit_during_rebalance(bool with_rebalance_cb, test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "debug", "consumer,cgrp,topic,fetch"); test_conf_set(conf, "enable.auto.commit", auto_commit ? "true" : "false"); test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky"); + if (!auto_commit) + /* Slowing down logging is necessary only to make assignment lost + * errors more evident. */ + rd_kafka_conf_set_log_cb(conf, v_log_cb); rd_kafka_conf_set_offset_commit_cb(conf, v_commit_cb); rd_kafka_conf_set_opaque(conf, (void *)&auto_commit); @@ -3266,8 +3290,20 @@ static void v_commit_during_rebalance(bool with_rebalance_cb, /* Poll both consumers */ for (i = 0; i < 10; i++) { - test_consumer_poll_once(c1, NULL, 1000); - test_consumer_poll_once(c2, NULL, 1000); + int poll_result1, poll_result2; + do { + poll_result1 = test_consumer_poll_once(c1, NULL, 1000); + poll_result2 = test_consumer_poll_once(c2, NULL, 1000); + + if (poll_result1 == 1 && !auto_commit) { + rd_kafka_resp_err_t err; + TEST_SAY("Attempting manual commit after poll\n"); + err = rd_kafka_commit(c1, NULL, 0); + TEST_ASSERT(!err || err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, + "Expected not error or ILLEGAL_GENERATION, got: %s", + rd_kafka_err2str(err)); + } + } while (poll_result1 == 0 || poll_result2 == 0); } TEST_SAY("Closing consumers\n");