diff --git a/docs/connectors/sources/csv-source.md b/docs/connectors/sources/csv-source.md
index 690ce72e8..1a395fad6 100644
--- a/docs/connectors/sources/csv-source.md
+++ b/docs/connectors/sources/csv-source.md
@@ -1,6 +1,6 @@
 # CSV Source
 
-A basic source that reads data from a single CSV file.
+A base CSV source that reads data from a CSV file and produces rows to the Kafka topic in JSON format.
 
 The CSV source reads the file, produces the data, and exits. It doesn't keep any state.
 On restart, the whole file will be re-consumed.
@@ -14,7 +14,9 @@ from quixstreams.sources.core.csv import CSVSource
 
 def main():
     app = Application()
-    source = CSVSource(path="input.csv")
+    # Create the Source instance with a file path and a name.
+    # The name will be included in the default topic name.
+    source = CSVSource(path="input.csv", name="csv")
 
     sdf = app.dataframe(source=source)
     sdf.print(metadata=True)
@@ -27,25 +29,76 @@ if __name__ == "__main__":
 
 ## File format
 
-The CSV source expect the input file to have headers, a `key` column, a `value` column and optionally a `timestamp` column.
+The CSV source expects the input file to have headers.
+
+Every row will be converted to a JSON dictionary and sent to the topic.
 
 Example file:
 
 ```csv
-key,value
-foo1,bar1
-foo2,bar2
-foo3,bar3
-foo4,bar4
-foo5,bar5
-foo6,bar6
-foo7,bar7
+field1,field2,timestamp
+foo1,bar1,1
+foo2,bar2,2
+foo3,bar3,3
+```
+
+What the source will produce:
+
+```json lines
+{"field1": "foo1", "field2": "bar1", "timestamp": "1"}
+{"field1": "foo2", "field2": "bar2", "timestamp": "2"}
+{"field1": "foo3", "field2": "bar3", "timestamp": "3"}
 ```
 
-## Key and value format
+## Key and timestamp extractors
 
+By default, the produced Kafka messages don't have keys and use the current epoch time as timestamps.
+
+To specify keys and timestamps for the messages, you may pass `key_extractor` and `timestamp_extractor` callables:
+
+```python
+from typing import AnyStr
+
+from quixstreams import Application
+from quixstreams.sources.core.csv import CSVSource
+
+
+def key_extractor(row: dict) -> AnyStr:
+    return row["field1"]
+
+
+def timestamp_extractor(row: dict) -> int:
+    return int(row["timestamp"])
+
+
+def main():
+    app = Application(broker_address="localhost:9092")
+    # input.csv:
+    # field1,field2,timestamp
+    # foo1,bar1,1
+    # foo2,bar2,2
+    # foo3,bar3,3
+
+    source = CSVSource(
+        path="input.csv",
+        name="csv",
+        # Extract field "field1" from each row and use it as a message key.
+        # Keys must be either strings or bytes.
+        key_extractor=key_extractor,
+        # Extract field "timestamp" from each row and use it as a timestamp.
+        # Timestamps must be integers in milliseconds.
+        timestamp_extractor=timestamp_extractor,
+    )
+
+    sdf = app.dataframe(source=source)
+    sdf.print(metadata=True)
+
+    app.run()
+
+
+if __name__ == "__main__":
+    main()
+```
 
-By default the CSV source expect the `key` is a string and the `value` a json object. You can configure the deserializers using the `key_deserializer` and `value_deserializer` paramaters.
 
 ## Topic
 
-The default topic used for the CSV source will use the `path` as a name and expect keys to be strings and values to be JSON objects.
+The default topic used by the CSV source includes the `name` as part of the topic name and expects keys to be strings and values to be JSON objects.
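+
+## Stream simulation
+
+The source also accepts an optional `delay` parameter: the number of seconds to sleep after producing each row.
+It can be used to simulate a live stream of data. Below is a minimal sketch; the file name and broker address
+are placeholders:
+
+```python
+from quixstreams import Application
+from quixstreams.sources.core.csv import CSVSource
+
+
+def main():
+    app = Application(broker_address="localhost:9092")
+    # Sleep for 0.5 seconds after producing each row to simulate a stream
+    source = CSVSource(path="input.csv", name="csv", delay=0.5)
+
+    sdf = app.dataframe(source=source)
+    sdf.print(metadata=True)
+
+    app.run()
+
+
+if __name__ == "__main__":
+    main()
+```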
diff --git a/quixstreams/sources/base/source.py b/quixstreams/sources/base/source.py
index e75512df1..e9c952656 100644
--- a/quixstreams/sources/base/source.py
+++ b/quixstreams/sources/base/source.py
@@ -182,8 +182,8 @@ def main():
 
     def __init__(self, name: str, shutdown_timeout: float = 10) -> None:
         """
-        :param name: The source unique name. Used to generate the topic configurtion
-        :param shutdown_timeout: Time in second the application waits for the source to gracefully shutdown
+        :param name: The source unique name. It is used to generate the topic configuration.
+        :param shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
         """
 
         super().__init__()
diff --git a/quixstreams/sources/core/csv.py b/quixstreams/sources/core/csv.py
index b8cf5bd72..e4fc5bd9d 100644
--- a/quixstreams/sources/core/csv.py
+++ b/quixstreams/sources/core/csv.py
@@ -1,74 +1,84 @@
 import csv
-import json
-from typing import Any, Callable, Optional
+import logging
+import time
+from pathlib import Path
+from typing import AnyStr, Callable, Optional, Union
 
 from quixstreams.models.topics import Topic
 from quixstreams.sources.base import Source
 
+logger = logging.getLogger(__name__)
+
 
 class CSVSource(Source):
     def __init__(
         self,
-        path: str,
-        dialect: str = "excel",
-        name: Optional[str] = None,
+        path: Union[str, Path],
+        name: str,
+        key_extractor: Optional[Callable[[dict], AnyStr]] = None,
+        timestamp_extractor: Optional[Callable[[dict], int]] = None,
+        delay: float = 0,
         shutdown_timeout: float = 10,
-        key_deserializer: Callable[[Any], str] = str,
-        value_deserializer: Callable[[Any], str] = json.loads,
+        dialect: str = "excel",
     ) -> None:
         """
-        A base CSV source that reads data from a single CSV file.
-        Best used with `quixstreams.sinks.csv.CSVSink`.
+        A base CSV source that reads data from a CSV file and produces rows
+        to the Kafka topic in JSON format.
 
-        Required columns: key, value
-        Optional columns: timestamp
-
-        :param path: path to the CSV file
+        :param path: a path to the CSV file.
+        :param name: a unique name for the Source.
+            It is used as a part of the default topic name.
+        :param key_extractor: an optional callable to extract the message key from the row.
+            It must return either `str` or `bytes`.
+            If empty, the Kafka messages will be produced without keys.
+            Default - `None`.
+        :param timestamp_extractor: an optional callable to extract the message timestamp from the row.
+            It must return time in milliseconds as `int`.
+            If empty, the current epoch time will be used.
+            Default - `None`.
+        :param delay: an optional delay (in seconds) after producing each row for stream simulation.
+            Default - `0`.
+        :param shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
         :param dialect: a CSV dialect to use. It affects quoting and delimiters.
            See the ["csv" module docs](https://docs.python.org/3/library/csv.html#csv-fmt-params) for more info.
            Default - `"excel"`.
-        :param key_deseralizer: a callable to convert strings to key.
-           Default - `str`
-        :param value_deserializer: a callable to convert strings to value.
-           Default - `json.loads`
         """
-        super().__init__(name or path, shutdown_timeout)
 
         self.path = path
+        self.delay = delay
         self.dialect = dialect
-        self._key_deserializer = key_deserializer
-        self._value_deserializer = value_deserializer
+        self.key_extractor = key_extractor
+        self.timestamp_extractor = timestamp_extractor
 
-    def run(self):
-        key_deserializer = self._key_deserializer
-        value_deserializer = self._value_deserializer
+        super().__init__(name=name, shutdown_timeout=shutdown_timeout)
 
+    def run(self):
+        # Start reading the file
         with open(self.path, "r") as f:
+            logger.info(f'Producing data from the file "{self.path}"')
             reader = csv.DictReader(f, dialect=self.dialect)
 
             while self.running:
                 try:
-                    item = next(reader)
+                    row = next(reader)
                 except StopIteration:
                     return
 
-                # if a timestamp column exist with no value timestamp is ""
-                timestamp = item.get("timestamp") or None
-                if timestamp is not None:
-                    timestamp = int(timestamp)
-
-                msg = self.serialize(
-                    key=key_deserializer(item["key"]),
-                    value=value_deserializer(item["value"]),
-                    timestamp_ms=timestamp,
+                # Extract message key from the row
+                message_key = self.key_extractor(row) if self.key_extractor else None
+                # Extract timestamp from the row
+                timestamp = (
+                    self.timestamp_extractor(row) if self.timestamp_extractor else None
                 )
+                # Serialize data before sending to Kafka
+                msg = self.serialize(key=message_key, value=row, timestamp_ms=timestamp)
 
-                self.produce(
-                    key=msg.key,
-                    value=msg.value,
-                    timestamp=msg.timestamp,
-                    headers=msg.headers,
-                )
+                # Publish the data to the topic
+                self.produce(timestamp=msg.timestamp, key=msg.key, value=msg.value)
+
+                # If the delay is specified, sleep before producing the next row
+                if self.delay > 0:
+                    time.sleep(self.delay)
 
     def default_topic(self) -> Topic:
         return Topic(
diff --git a/tests/test_quixstreams/test_sources/test_core/test_csv.py b/tests/test_quixstreams/test_sources/test_core/test_csv.py
index 8be2b577d..2fc58d7af 100644
--- a/tests/test_quixstreams/test_sources/test_core/test_csv.py
+++ b/tests/test_quixstreams/test_sources/test_core/test_csv.py
@@ -1,5 +1,4 @@
 import csv
-import json
 from unittest.mock import MagicMock
 
 import pytest
@@ -19,24 +18,26 @@ def test_read(self, tmp_path, producer):
         path = tmp_path / "source.csv"
         with open(path, "w") as f:
             writer = csv.DictWriter(
-                f, dialect="excel", fieldnames=("key", "value", "timestamp")
+                f, dialect="excel", fieldnames=("key", "field", "timestamp")
             )
             writer.writeheader()
             writer.writerows(
                 [
-                    {"key": "key1", "value": json.dumps({"value": "value1"})},
-                    {"key": "key2", "value": json.dumps({"value": "value2"})},
-                    {"key": "key3", "value": json.dumps({"value": "value3"})},
-                    {"key": "key4", "value": json.dumps({"value": "value4"})},
-                    {
-                        "key": "key5",
-                        "value": json.dumps({"value": "value5"}),
-                        "timestamp": 10000,
-                    },
+                    {"key": "key1", "field": "value1", "timestamp": 1},
+                    {"key": "key2", "field": "value2", "timestamp": 2},
+                    {"key": "key3", "field": "value3", "timestamp": 3},
+                    {"key": "key4", "field": "value4", "timestamp": 4},
+                    {"key": "key5", "field": "value5", "timestamp": 5},
                 ]
             )
 
-        source = CSVSource(path)
+        name = "csv"
+        source = CSVSource(
+            name=name,
+            path=path,
+            key_extractor=lambda r: r["key"],
+            timestamp_extractor=lambda r: int(r["timestamp"]),
+        )
         source.configure(source.default_topic(), producer)
         source.start()
@@ -48,27 +49,30 @@ def test_read(self, tmp_path, producer):
         assert producer.produce.call_args.kwargs == {
             "buffer_error_max_tries": 3,
             "headers": None,
             "key": b"key5",
             "partition": None,
             "poll_timeout": 5.0,
-            "timestamp": 10000,
-            "topic": path,
-            "value": b'{"value":"value5"}',
+            "timestamp": 5,
+ "topic": name, + "value": b'{"key":"key5","field":"value5","timestamp":"5"}', } - def test_read_no_timestamp(self, tmp_path, producer): + def test_read_no_extractors(self, tmp_path, producer): path = tmp_path / "source.csv" with open(path, "w") as f: - writer = csv.DictWriter(f, dialect="excel", fieldnames=("key", "value")) + writer = csv.DictWriter( + f, dialect="excel", fieldnames=("key", "field", "timestamp") + ) writer.writeheader() writer.writerows( [ - {"key": "key1", "value": json.dumps({"value": "value1"})}, - {"key": "key2", "value": json.dumps({"value": "value2"})}, - {"key": "key3", "value": json.dumps({"value": "value3"})}, - {"key": "key4", "value": json.dumps({"value": "value4"})}, - {"key": "key5", "value": json.dumps({"value": "value5"})}, + {"key": "key1", "field": "value1", "timestamp": 1}, + {"key": "key2", "field": "value2", "timestamp": 2}, + {"key": "key3", "field": "value3", "timestamp": 3}, + {"key": "key4", "field": "value4", "timestamp": 4}, + {"key": "key5", "field": "value5", "timestamp": 5}, ] ) - source = CSVSource(path) + name = "csv" + source = CSVSource(name="csv", path=path) source.configure(source.default_topic(), producer) source.start() @@ -77,10 +81,10 @@ def test_read_no_timestamp(self, tmp_path, producer): assert producer.produce.call_args.kwargs == { "buffer_error_max_tries": 3, "headers": None, - "key": b"key5", + "key": None, "partition": None, "poll_timeout": 5.0, "timestamp": None, - "topic": path, - "value": b'{"value":"value5"}', + "topic": name, + "value": b'{"key":"key5","field":"value5","timestamp":"5"}', }