Skip to content

Commit

Permalink
DO NOT MERGE. Removes branches ignore for ticket branch to allow for …
Browse files Browse the repository at this point in the history
…build.

Adds JSON serializer, awaits producer start to fix error, closes
producer. Removes unused code.
  • Loading branch information
dspeck1 committed Oct 1, 2024
1 parent 3ddfbc8 commit 974cbdf
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 100 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ name: CI
# optimization so there's no need to ensure this is comprehensive.
- "dependabot/**"
- "renovate/**"
- "tickets/**"
#- "tickets/**"
- "u/**"
tags:
- "*"
Expand Down
124 changes: 25 additions & 99 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,69 +95,18 @@ def detector_load(conf: dict, instrument: str) -> list[int]:
active_detectors.append(k)
return active_detectors

def serializer(value):
    """Serialize *value* as JSON and return it as UTF-8 bytes.

    Used as the Kafka producer's ``value_serializer`` so arbitrary
    JSON-compatible objects can be sent as message payloads.
    """
    encoded = json.dumps(value)
    return encoded.encode()

async def fan_out_msg(
fan_out_topic: fan_out_topic,
data: data
producer,
fan_out_topic,
data
):
try:
await producer.send_and_wait(fan_out_topic, b(data))
finally:
await producer.stop()

@REQUEST_TIME.time()
async def knative_request(
    in_process_requests_gauge,
    client: httpx.AsyncClient,
    knative_serving_url: str,
    headers: dict[str, str],
    body: bytes,
    info: str,
) -> None:
    """Makes knative http request.

    Posts one next-visit message to a Knative serving endpoint and logs
    the response, tracking the number of in-flight requests via the
    supplied gauge.

    Parameters
    ----------
    in_process_requests_gauge
        Prometheus-style gauge with ``inc()``/``dec()``; incremented on
        entry and decremented after the response is logged, so it counts
        requests currently in flight.
    client: `httpx.AsyncClient`
        The async httpx client.
    knative_serving_url : `string`
        The url for the knative instance.
    headers: `dict` [`str`, `str`]
        The headers to pass to knative.
    body: `bytes`
        The next visit message body.
    info: `str`
        Information such as some fields of the next visit message to identify
        this request and to log with.
    """
    in_process_requests_gauge.inc()

    result = await client.post(
        knative_serving_url,
        headers=headers,
        data=body,  # type:ignore
        # No timeout: Knative processing time is unbounded from the
        # caller's perspective, so the request is allowed to wait.
        timeout=None,
    )

    logging.info(
        f"nextVisit {info} status code {result.status_code} for initial request {result.content}"
    )

    # NOTE(review): the triple-quoted block below is disabled retry logic
    # for 502/503 responses, kept as dead code rather than removed —
    # confirm whether it should be deleted or re-enabled.
    '''
    if result.status_code == 502 or result.status_code == 503:
        logging.info(
            f"retry after status code {result.status_code} for nextVisit {info}"
        )
        retry_result = await client.post(
            knative_serving_url,
            headers=headers,
            data=body,  # type:ignore
            timeout=None,
        )
        logging.info(
            f"nextVisit {info} retried request {retry_result.content}"
        )
    '''
    # Request complete (success or failure of the POST would have raised
    # before this point only on transport errors); drop the in-flight count.
    in_process_requests_gauge.dec()
await producer.start()
logging.info(f"sending msg {data}")
await producer.send_and_wait(fan_out_topic, data)
await producer.stop()


async def main() -> None:
Expand All @@ -171,8 +120,6 @@ async def main() -> None:
expire = float(os.environ["MESSAGE_EXPIRATION"])
kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"]
latiss_knative_serving_url = os.environ["LATISS_KNATIVE_SERVING_URL"]
lsstcomcam_knative_serving_url = os.environ["LSSTCOMCAM_KNATIVE_SERVING_URL"]
lsstcomcamsim_knative_serving_url = os.environ["LSSTCOMCAMSIM_KNATIVE_SERVING_URL"]
lsstcam_knative_serving_url = os.environ["LSSTCAM_KNATIVE_SERVING_URL"]
hsc_knative_serving_url = os.environ["HSC_KNATIVE_SERVING_URL"]

Expand Down Expand Up @@ -215,15 +162,7 @@ async def main() -> None:
sasl_mechanism=sasl_mechanism,
sasl_plain_username=sasl_username,
sasl_plain_password=sasl_password,
)

# https://aiokafka.readthedocs.io/en/stable/producer.html
producer = AIOKafkaProducer(
fan_out_topic,
bootstrap_servers=prompt_processing_kafka_cluster,
enable_idempotence=True
)
await producer.start()
)

latiss_gauge = Gauge(
"latiss_next_visit_messages", "next visit nessages with latiss as instrument"
Expand Down Expand Up @@ -293,6 +232,9 @@ async def main() -> None:
logging.info("Message does not have an instrument. Assuming "
"it's not an observation.")
continue

'''
# Temporary disable so we can see older messages for testing.
# efdStamp is visit publication, in seconds since 1970-01-01 UTC
if next_visit_message_initial["message"]["private_efdStamp"]:
Expand All @@ -306,7 +248,7 @@ async def main() -> None:
continue
else:
logging.warning("Message does not have private_efdStamp, can't determine age.")

'''
next_visit_message_updated = NextVisitModel(
salIndex=next_visit_message_initial["message"]["salIndex"],
scriptSalIndex=next_visit_message_initial["message"][
Expand Down Expand Up @@ -428,41 +370,25 @@ async def main() -> None:
)

try:
attributes = {
"type": "com.example.kafka",
"source": topic,
}
# https://aiokafka.readthedocs.io/en/stable/producer.html
producer = AIOKafkaProducer(
bootstrap_servers=prompt_processing_kafka_cluster,
value_serializer=serializer
)
await producer.start()
logging.info ("started kafka producer")

for fan_out_message in fan_out_message_list:

task = asyncio.create_task(

fan_out_msg(
producer,
fan_out_topic,
fan_out_message
)
)

'''
data = fan_out_message
data_json = json.dumps(data)
logging.info(f"data after json dump {data_json}")
event = CloudEvent(attributes, data_json)
headers, body = to_structured(event)
info = {
key: data[key] for key in ["instrument", "groupId", "detector"]
}
task = asyncio.create_task(
knative_request(
in_process_requests_gauge,
client,
knative_serving_url,
headers,
body,
str(info),
)
)
'''
tasks.add(task)
task.add_done_callback(tasks.discard)

Expand Down

0 comments on commit 974cbdf

Please sign in to comment.