From febef9923b5cd187c4d5fea144835313a43cc00b Mon Sep 17 00:00:00 2001 From: Wey Gu Date: Wed, 7 Aug 2024 18:55:44 +0800 Subject: [PATCH] feat: support parquet file now. example: ``` %%ngql CREATE TAG entity(name string); CREATE EDGE relationship(relationship string); CREATE TAG INDEX entity_index ON entity(name(256)); %ng_load --header --source https://github.com/microsoft/graphrag/raw/main/examples_notebooks/inputs/operation%20dulce/create_final_entities.parquet --tag entity --vid 1 --props 1:name --space ms_paper ``` --- docs/index.md | 2 +- docs/magic_words/ng_load.md | 6 ++-- ngql/magic.py | 8 +++-- ngql/ng_load.py | 63 ++++++++++++++++++++++++++++--------- setup.py | 3 +- setup_ipython.py | 3 +- 6 files changed, 62 insertions(+), 23 deletions(-) diff --git a/docs/index.md b/docs/index.md index 8e578f3..f9cfb2f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -26,7 +26,7 @@ For a more comprehensive guide on how to get started with `jupyter_nebulagraph`, | Feature | Cheat Sheet | Example | Command Documentation | | ------- | ----------- | --------- | ---------------------- | | Connect | `%ngql --address 127.0.0.1 --port 9669 --user user --password password` | [Connect](https://jupyter-nebulagraph.readthedocs.io/en/stable/get_started_docs/#connect-to-nebulagraph) | [`%ngql`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ngql/#connect-to-nebulagraph) | -| Load Data from CSV | `%ng_load --source actor.csv --tag player --vid 0 --props 1:name,2:age --space basketballplayer` | [Load Data](https://jupyter-nebulagraph.readthedocs.io/en/stable/get_started_docs/#load-data-from-csv) | [`%ng_load`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_load/) | +| Load Data from CSV or Parquet | `%ng_load --source actor.csv --tag player --vid 0 --props 1:name,2:age --space basketballplayer` | [Load Data](https://jupyter-nebulagraph.readthedocs.io/en/stable/get_started_docs/#load-data-from-csv) | [`%ng_load`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_load/) | | Query Execution | `%ngql MATCH p=(v:player{name:"Tim Duncan"})-->(v2:player) RETURN p;`| [Query Execution](https://jupyter-nebulagraph.readthedocs.io/en/stable/get_started_docs/#query) | [`%ngql` or `%%ngql`(multi-line)](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ngql/#make-queries) | | Result Visualization | `%ng_draw` | [Draw Graph](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_draw/) | [`%ng_draw`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_draw/) | | Draw Schema | `%ng_draw_schema` | [Draw Schema](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_draw_schema/) | [`%ng_draw_schema`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_draw_schema/) | diff --git a/docs/magic_words/ng_load.md b/docs/magic_words/ng_load.md index 82390b3..b9be72b 100644 --- a/docs/magic_words/ng_load.md +++ b/docs/magic_words/ng_load.md @@ -1,6 +1,6 @@ -## Load Data from CSV +## Load Data from CSV or Parquet file -It's supported to load data from a CSV file into NebulaGraph with the help of `ng_load_csv` magic. +It's supported to load data from a CSV or Parquet file into NebulaGraph with the help of `ng_load` magic. ### Examples @@ -58,7 +58,7 @@ Some other examples: ### Usage -``` +```python %ng_load --source [--header] --space [--tag ] [--vid ] [--edge ] [--src ] [--dst ] [--rank ] [--props ] [-b ] [--limit ] ``` diff --git a/ngql/magic.py b/ngql/magic.py index a907064..b5552c8 100644 --- a/ngql/magic.py +++ b/ngql/magic.py @@ -268,7 +268,9 @@ def _execute(self, query): if self.space is not None: # Always use space automatically session.execute(f"USE { self.space }") result = session.execute(query) - assert result.is_succeeded(), f"Query Failed:\n { result.error_msg() }" + assert ( + result.is_succeeded() + ), f"Query Failed:\n { result.error_msg() }\n Query:\n { query }" self._remember_space(result) except Exception as e: fancy_print(f"[ERROR]:\n { e }", color="red") @@ -514,7 +516,9 @@ def ng_draw(self, line, cell=None, local_ns={}): # Update node sizes based on PageRank scores for node_id, score in pagerank_scores.items(): - normalized_size = 10 + score * 90 # Reduced multiplier for smaller size normalization + normalized_size = ( + 10 + score * 90 + ) # Reduced multiplier for smaller size normalization g.get_node(node_id)["size"] = min(normalized_size, 80) except Exception as e: fancy_print( diff --git a/ngql/ng_load.py b/ngql/ng_load.py index 7746a56..d14323e 100644 --- a/ngql/ng_load.py +++ b/ngql/ng_load.py @@ -1,6 +1,6 @@ import requests import pandas as pd -from io import StringIO +from io import BytesIO, StringIO from typing import Callable from nebula3.data.ResultSet import ResultSet @@ -69,25 +69,51 @@ def ng_load(execute_fn: Callable[[str], ResultSet], args: LoadDataArgsModel): limit = args.limit - # Function to safely load CSV with limit - def safe_load_csv(source, header_option, limit=None): - temp_df = pd.read_csv(source, header=header_option) + # Function to safely load CSV or Parquet with limit + def safe_load_file(source, file_type, header_option=None, limit=None): + if file_type == "csv": + temp_df = pd.read_csv(source, header=header_option) + elif file_type == "parquet": + temp_df = pd.read_parquet(source) + else: + raise ValueError(f"Unsupported file type: {file_type}") + if isinstance(limit, int) and limit > 0: return temp_df.head(limit) return temp_df - # Load CSV from file or URL + # Determine file type based on source extension + file_type = ( + "csv" + if args.source.lower().endswith(".csv") + else "parquet" + if args.source.lower().endswith(".parquet") + else None + ) + if file_type is None: + raise ValueError( + "Unsupported file type. Please use either CSV or Parquet files." + ) + + # Load file from URL or local path if args.source.startswith("http://") or args.source.startswith("https://"): response = requests.get(args.source) - csv_string = response.content.decode("utf-8") - df = safe_load_csv( - StringIO(csv_string), + if file_type == "csv": + file_content = StringIO(response.content.decode("utf-8")) + else: # parquet + file_content = BytesIO(response.content) + df = safe_load_file( + file_content, + file_type, header_option=0 if with_header else None, limit=limit, ) else: - df = safe_load_csv( - args.source, header_option=0 if with_header else None, limit=limit + df = safe_load_file( + args.source, + file_type, + header_option=0 if with_header else None, + limit=limit, ) # Build schema type map for tag or edge type @@ -217,7 +243,9 @@ def safe_load_csv(source, header_option, limit=None): else: query = f"INSERT VERTEX `{args.tag}` (`{'`, `'.join(prop_columns)}`) VALUES " for index, row in batch.iterrows(): - vid_str = f'{QUOTE_VID}{row["___vid"]}{QUOTE_VID}' + raw_vid_str = row["___vid"].strip('"').replace('"', '\\"') + vid_str = f"{QUOTE_VID}{raw_vid_str}{QUOTE_VID}" + prop_str = "" if with_props: for prop_name in prop_columns: @@ -230,7 +258,8 @@ def safe_load_csv(source, header_option, limit=None): ) prop_str += "NULL, " elif prop_schema_map[prop_name]["type"] == "string": - prop_str += f"{QUOTE}{prop_value}{QUOTE}, " + raw_prop_str = prop_value.strip('"').replace('"', '\\"') + prop_str += f"{QUOTE}{raw_prop_str}{QUOTE}, " elif prop_schema_map[prop_name]["type"] == "date": prop_str += f"date({QUOTE}{prop_value}{QUOTE}), " elif prop_schema_map[prop_name]["type"] == "datetime": @@ -275,8 +304,10 @@ def safe_load_csv(source, header_option, limit=None): f"INSERT EDGE `{args.edge}` (`{'`, `'.join(prop_columns)}`) VALUES " ) for index, row in batch.iterrows(): - src_str = f'{QUOTE_VID}{row["___src"]}{QUOTE_VID}' - dst_str = f'{QUOTE_VID}{row["___dst"]}{QUOTE_VID}' + raw_src_str = row["___src"].strip('"').replace('"', '\\"') + src_str = f"{QUOTE_VID}{raw_src_str}{QUOTE_VID}" + raw_dst_str = row["___dst"].strip('"').replace('"', '\\"') + dst_str = f"{QUOTE_VID}{raw_dst_str}{QUOTE_VID}" prop_str = "" if with_props: for prop_name in prop_columns: @@ -289,7 +320,8 @@ def safe_load_csv(source, header_option, limit=None): ) prop_str += "NULL, " elif prop_schema_map[prop_name]["type"] == "string": - prop_str += f"{QUOTE}{prop_value}{QUOTE}, " + raw_prop_str = prop_value.strip('"').replace('"', '\\"') + prop_str += f"{QUOTE}{raw_prop_str}{QUOTE}, " elif prop_schema_map[prop_name]["type"] == "date": prop_str += f"date({QUOTE}{prop_value}{QUOTE}), " elif prop_schema_map[prop_name]["type"] == "datetime": @@ -344,6 +376,7 @@ def args_load(line: str): %ng_load --source https://github.com/wey-gu/awesome-graph-dataset/raw/main/datasets/shareholding/tiny/corp_share.csv --edge hold_share --src 0 --dst 1 --props 2:share --space shareholding %ng_load --source https://github.com/wey-gu/awesome-graph-dataset/raw/main/datasets/shareholding/tiny/person_corp_share.csv --edge hold_share --src 0 --dst 1 --props 2:share --space shareholding %ng_load --source https://github.com/wey-gu/awesome-graph-dataset/raw/main/datasets/shareholding/tiny/person_rel.csv --edge reletive_with --src 0 --dst 1 --props 2:degree --space shareholding +%ng_load --header --source https://github.com/microsoft/graphrag/raw/main/examples_notebooks/inputs/operation%20dulce/create_final_entities.parquet --tag entity --vid 1 --props 1:name --space ms_paper """ execute_fn = conn_pool.get_session("root", "nebula").execute for line in test.split("\n"): diff --git a/setup.py b/setup.py index 0da9bb1..819c855 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="jupyter_nebulagraph", - version="0.13.4", + version="0.14.0", author="Wey Gu", author_email="weyl.gu@gmail.com", description="Jupyter extension for NebulaGraph", @@ -33,5 +33,6 @@ "pydantic", "scipy", "ipywidgets", + "pyarrow", ], ) diff --git a/setup_ipython.py b/setup_ipython.py index 0c9b9c9..fc202ef 100644 --- a/setup_ipython.py +++ b/setup_ipython.py @@ -5,7 +5,7 @@ setuptools.setup( name="ipython-ngql", - version="0.13.4", + version="0.14.0", author="Wey Gu", author_email="weyl.gu@gmail.com", description="Jupyter extension for NebulaGraph", @@ -33,5 +33,6 @@ "pydantic", "scipy", "ipywidgets", + "pyarrow", ], )