Skip to content

Commit

Permalink
Simplify producer for troubleshooting.
Browse files Browse the repository at this point in the history
  • Loading branch information
dspeck1 committed Sep 27, 2024
1 parent f0f3419 commit 8bf2fb4
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ async def fan_out_msg(
):
try:
await producer.send_and_wait(fan_out_topic, bytes(data))
except KafkaException as e:
logging.info(e)
finally:
await producer.stop()



@REQUEST_TIME.time()
async def knative_request(
in_process_requests_gauge,
Expand Down Expand Up @@ -430,20 +430,24 @@ async def main() -> None:
)

try:
attributes = {
"type": "com.example.kafka",
"source": topic,
}

for fan_out_message in fan_out_message_list:

#for fan_out_message in fan_out_message_list:

async for fan_out_message in fan_out_message_list:
try:
await producer.send_and_wait(fan_out_topic, b(fan_out_message))
finally:
await producer.stop()

'''
task = asyncio.create_task(
fan_out_msg(
producer,
fan_out_topic,
fan_out_message
)
)
'''

'''
data = fan_out_message
Expand All @@ -466,8 +470,8 @@ async def main() -> None:
)
)
'''
tasks.add(task)
task.add_done_callback(tasks.discard)
#tasks.add(task)
#task.add_done_callback(tasks.discard)

except ValueError as e:
logging.info("Error ", e)
Expand Down

0 comments on commit 8bf2fb4

Please sign in to comment.