-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreset_offsets.py
86 lines (68 loc) · 2.82 KB
/
reset_offsets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from tools.common import setup_args, check_empty_arg
from tools.commands import describe_consumer_group, list_consumer_groups, list_group_topics, reset_topic_offset, save_group_offsets
KAFKA_URL = 'localhost:9092'
args_conf = [
{
"short": "-t",
"full": "--topic",
"dest": "topic",
"help": "topic to reset offset, '_all_' to reset all topics",
"metavar": "TOPIC"
},
{
"short": "-o",
"full": "--offset",
"dest": "offset",
"help": "new offset to reset ('earliest', 'latest', 'to_date')",
"metavar": "OFFSET"
},
{
"short": "-d",
"full": "--date",
"dest": "date",
"help": "date to reset to (Format: 'YYYY-MM-DDTHH:mm:SS.sss')",
"metavar": "DATE"
},
{
"short": "-b",
"full": "--backup-to",
"dest": "offsets_path",
"help": "path to directory where the current offsets will be stored",
"metavar": "OFFSETS_PATH"
},
{
"short": "-s",
"full": "--bootstrap-server",
"dest": "bootstrap_server",
"help": "kafka node server address",
"metavar": "BOOTSTRAP_SERVER"
},
]
# this function finds the given topic in all consumer groups and changes its current offset to the given value or date
def reset_offsets_for_topic_in_all_groups(input_args):
groups = list_consumer_groups(KAFKA_URL)
print('found {0} groups'.format(len(groups)))
try:
# all(x in ['b', 'a', 'foo', 'bar'] for x in ['a', 'b']) <-- check if all elements ('a','b') are in list ('b','a','foo','bar')
if all(x in input_args.keys() for x in ['topic', 'offset']):
reset_topic = input_args['topic']
print('will reset topic: {0}'.format(check_empty_arg(reset_topic, 'topic')))
reset_date = ''
if input_args['offset'] == 'to_date':
if 'date' in input_args.keys():
reset_date = check_empty_arg(input_args['date'], 'DATE')
logs_structure = {}
kafka_node = input_args['bootstrap_server'] if 'bootstrap_server' in input_args.keys() else KAFKA_URL
for group in groups:
about = describe_consumer_group(group, kafka_node)
if 'offsets_path' in input_args.keys():
offsets_path = input_args['offsets_path']
save_group_offsets(group, offsets_path, about)
logs_structure[group] = list_group_topics(about)
if reset_topic == '_all_' or reset_topic in logs_structure[group]:
reset_topic_offset(group, reset_topic, kafka_node, input_args['offset'], reset_date)
except Exception as e:
print(e)
if __name__=='__main__':
args = setup_args(args_conf)
reset_offsets_for_topic_in_all_groups(vars(args))