Skip to content

Commit

Permalink
add schema registry serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
dspeck1 committed Oct 15, 2024
1 parent 974cbdf commit f073676
Showing 1 changed file with 93 additions and 2 deletions.
95 changes: 93 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from cloudevents.conversion import to_structured
from cloudevents.http import CloudEvent
import httpx
from kafkit.registry import Deserializer
from kafkit.registry import Deserializer, Serializer
from kafkit.registry.httpx import RegistryApi
from prometheus_client import start_http_server, Summary # type:ignore
from prometheus_client import Gauge
Expand Down Expand Up @@ -100,12 +100,13 @@ def serializer(value):

async def fan_out_msg(
producer,
fan_out_serializer,
fan_out_topic,
data
):
await producer.start()
logging.info(f"sending msg {data}")
await producer.send_and_wait(fan_out_topic, data)
await producer.send_and_wait(fan_out_topic, fan_out_serializer(data))
await producer.stop()


Expand Down Expand Up @@ -219,6 +220,95 @@ async def main() -> None:
)
deserializer = Deserializer(registry=registry_api)

# Schema
fan_out_schema = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Fanout",
"description": "Fanout message",
"type": "object",
"properties": {
"salIndex": {
"description": "sal Index",
"type": "string"
},
"scriptSalIndex": {
"description": "script sal index",
"type": "string"
},
"instrument": {
"description": "instrument",
"type": "string"
},
"groupId": {
"description": "group id",
"type": "string"
},
"coordinateSystem": {
"description": "coordinate system",
"type": "string"
},
"position": {
"description": "position",
"type": "string"
},
"startTime": {
"description": "start time",
"type": "string"
},
"rotationSystem" :{
"description": "rotation system",
"type": "string"
},
"cameraAngle": {
"description": "camera angle",
"type": "string"
},
"filters": {
"description": "filters",
"type": "string"
},
"dome": {
"description": "dome",
"type": "string"
},
"duration": {
"description": "duration",
"type": "string"
},
"nimages": {
"description": "number of images",
"type": "string"
},
"survey": {
"description": "survey",
"type": "string"
},
"totalCheckpoints": {
"description": "total checkpoints",
"type": "string"
},
"private_sndStamp": {
"description": "private send stamp",
"type": "string"
},
"detector": {
"description": "detector",
"type": "string"
}
}
}
"""
# Setup registry API
fan_out_registry_api = RegistryApi(
fan_out_http_client=client, url="http://10.104.75.248:8081"
)
fan_out_registry_api.schemas.insert(fan_out_schema, 1)
fan_out_serializer= await Serializer.register(
registry=fan_out_registry_api,
schema=fan_out_schema
)

while True: # run continously
async for msg in consumer:

Expand Down Expand Up @@ -384,6 +474,7 @@ async def main() -> None:

fan_out_msg(
producer,
fan_out_serializer,
fan_out_topic,
fan_out_message
)
Expand Down

0 comments on commit f073676

Please sign in to comment.