forked from pedrodeoliveira/fastapi-kafka-consumer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
122 lines (92 loc) · 3.31 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from random import randint
from typing import Set, Any
from fastapi import FastAPI
from kafka import TopicPartition
import uvicorn
import aiokafka
import asyncio
import json
import logging
import os
# instantiate the API
app = FastAPI()
# global variables
consumer_task = None
consumer = None
_state = 0
# env variables
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC')
KAFKA_CONSUMER_GROUP_PREFIX = os.getenv('KAFKA_CONSUMER_GROUP_PREFIX', 'group')
KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
# initialize logger
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s',
level=logging.INFO)
log = logging.getLogger(__name__)
@app.on_event("startup")
async def startup_event():
log.info('Initializing API ...')
await initialize()
await consume()
@app.on_event("shutdown")
async def shutdown_event():
log.info('Shutting down API')
consumer_task.cancel()
await consumer.stop()
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/state")
async def state():
return {"state": _state}
async def initialize():
loop = asyncio.get_event_loop()
global consumer
group_id = f'{KAFKA_CONSUMER_GROUP_PREFIX}-{randint(0, 10000)}'
log.debug(f'Initializing KafkaConsumer for topic {KAFKA_TOPIC}, group_id {group_id}'
f' and using bootstrap servers {KAFKA_BOOTSTRAP_SERVERS}')
consumer = aiokafka.AIOKafkaConsumer(KAFKA_TOPIC, loop=loop,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id=group_id)
# get cluster layout and join group
await consumer.start()
partitions: Set[TopicPartition] = consumer.assignment()
nr_partitions = len(partitions)
if nr_partitions != 1:
log.warning(f'Found {nr_partitions} partitions for topic {KAFKA_TOPIC}. Expecting '
f'only one, remaining partitions will be ignored!')
for tp in partitions:
# get the log_end_offset
end_offset_dict = await consumer.end_offsets([tp])
end_offset = end_offset_dict[tp]
if end_offset == 0:
log.warning(f'Topic ({KAFKA_TOPIC}) has no messages (log_end_offset: '
f'{end_offset}), skipping initialization ...')
return
log.debug(f'Found log_end_offset: {end_offset} seeking to {end_offset-1}')
consumer.seek(tp, end_offset-1)
msg = await consumer.getone()
log.info(f'Initializing API with data from msg: {msg}')
# update the API state
_update_state(msg)
return
async def consume():
global consumer_task
consumer_task = asyncio.create_task(send_consumer_message(consumer))
async def send_consumer_message(consumer):
try:
# consume messages
async for msg in consumer:
# x = json.loads(msg.value)
log.info(f"Consumed msg: {msg}")
# update the API state
_update_state(msg)
finally:
# will leave consumer group; perform autocommit if enabled
log.warning('Stopping consumer')
await consumer.stop()
def _update_state(message: Any) -> None:
value = json.loads(message.value)
global _state
_state = value['state']
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)