Skip to content

Commit

Permalink
Initial commit for adding redis stream suppport to fan out. Convert p…
Browse files Browse the repository at this point in the history
…osition to string due to redis streams not supporting lists.
  • Loading branch information
dspeck1 committed Feb 18, 2025
1 parent 08e13ae commit 0420632
Showing 1 changed file with 69 additions and 3 deletions.
72 changes: 69 additions & 3 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from kafkit.registry.httpx import RegistryApi
from prometheus_client import start_http_server, Summary # type:ignore
from prometheus_client import Gauge
import redis.asyncio as redis
import yaml

REQUEST_TIME = Summary("request_processing_seconds", "Time spent processing request")
Expand All @@ -31,7 +32,7 @@ class NextVisitModel:
instrument: str
groupId: str
coordinateSystem: int
position: list[float]
position: str #TODO check if list allowed after testing.
startTime: float
rotationSystem: int
cameraAngle: float
Expand Down Expand Up @@ -98,6 +99,7 @@ def add_detectors(
for active_detector in active_detectors:
temp_message = message.copy()
temp_message["detector"] = active_detector
temp_message["position"] = str(message["position"]) #TODO cleanup after redis testing
# temporary change to modify short filter names to format expected by butler
if temp_message["filters"] != "" and len(temp_message["filters"]) == 1:
temp_message["filters"] = (
Expand Down Expand Up @@ -385,6 +387,41 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient,
except ValueError:
logging.exception("Error while sending fanned-out messages.")

def dispatch_fanned_out_messages_redis_stream(
redis_client: redis.Redis,
redis_stream: str,
tasks: collections.abc.MutableSet[asyncio.Task],
send_info: Submission,
):
"""Package and send the fanned-out messages to Prompt Processing.
Parameters
----------
redis_client : `redis.Redis`
The redis client used to send the redis stream.
redis_stream : `str`
The redis stream.
tasks : set [`asyncio.Task`]
Collection for holding the requests.
send_info : `Submission`
The data and address to submit.
"""
try:
for fan_out_message in send_info.fan_out_messages:

task = asyncio.create_task(
redis_stream_request(
redis_client,
redis_stream,
fan_out_message,
)
)
tasks.add(task)
task.add_done_callback(tasks.discard)

except ValueError:
logging.exception("Error while sending fanned-out messages.")


@REQUEST_TIME.time()
async def knative_request(
Expand Down Expand Up @@ -449,6 +486,31 @@ async def knative_request(
f"retried request {retry_result.content}"
)

async def redis_stream_request(
redis_client: redis.Redis,
redis_stream: str,
body: dict[str, typing.Any],
) -> None:
"""Makes redis stream request.
Parameters
----------
in_process_requests_gauge : `prometheus_client.Gauge`
A gauge to be updated with the start and end of the request.
client : `redis.Redis`
The redis stream client.
redis_stream : `string`
The name of the redis stream.
body : `dict[str, typing.Any]`
The next visit message body.
"""

logging.info(f"sending msg {body} for redis stream {redis_stream}")
await redis_client.xadd(
redis_stream,
body
)


async def main() -> None:
# Get environment variables
Expand All @@ -462,6 +524,8 @@ async def main() -> None:
kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"]
max_outgoing = int(os.environ["MAX_FAN_OUT_MESSAGES"])
retry_knative = os.environ["RETRY_KNATIVE_REQUESTS"].lower() == "true"
redis_host = os.environ["REDIS_HOST"]
redis_stream = os.environ["REDIS_STREAM"]

# kafka auth
sasl_username = os.environ["SASL_USERNAME"]
Expand Down Expand Up @@ -511,6 +575,9 @@ async def main() -> None:
)
deserializer = Deserializer(registry=registry_api)

redis_client = redis.Redis(host=redis_host)
await redis_client.aclose()

while True: # run continously
async for msg in consumer:
try:
Expand All @@ -529,8 +596,7 @@ async def main() -> None:
gauges,
hsc_upload_detectors,
)
dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges,
retry_knative=retry_knative)
dispatch_fanned_out_messages_redis_stream(redis_client, redis_stream, tasks, send_info)
except UnsupportedMessageError:
logging.exception("Could not process message, continuing.")
finally:
Expand Down

0 comments on commit 0420632

Please sign in to comment.