This notebook demonstrates how to:

1. Upload two Python functions
2. Create a pipeline that connects them
Step 1: Upload Two Functions¶
We'll upload the csv2parquet and parquet2json functions.
# Function 1: CSV to Parquet
def csv2parquet(input_file: Path, /) -> dict[str, Path]:
    """Convert a CSV file to Parquet format.

    :param input_file: Path to the input CSV file.
    :return: Dict with 'parquet' key containing the output Parquet file path.
    """
    # Read the CSV file
    df = pd.read_csv(input_file)
    # Define the output path with .parquet extension
    parquet_path = input_file.with_suffix(".parquet")
    # Write the DataFrame to a Parquet file
    df.to_parquet(parquet_path, index=False)
    return {"parquet": parquet_path}
func1 = client.add_function(
    gfhub.Function(
        csv2parquet,
        dependencies={"pandas[pyarrow]": "import pandas as pd"},
    )
)
print("Function 1 uploaded:")
print(func1)
Function 1 uploaded:
{'id': '019bb972-1978-7b11-a786-1a35c71ebc93', 'name': 'csv2parquet', 'parameters': {}, 'inputs': {'input_file': {'type': 'Path'}}, 'outputs': {'parquet': {'type': 'Path'}}, 'created_at': '2026-01-13T22:20:21.240862Z', 'updated_at': '2026-01-14T10:24:07.209418Z'}
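Note how the upload response above reflects the function's signature: the positional-only `input_file` becomes a named input, and the returned dict key `parquet` becomes a named output. The sketch below illustrates that same Path-in, dict-of-named-Paths-out contract using only the standard library (the `csv2jsonl` function and JSON Lines format are illustrative stand-ins, not part of this pipeline), so you can try the pattern without pandas or pyarrow installed:

```python
import csv
import json
import tempfile
from pathlib import Path

# Illustrative stand-in: same contract as csv2parquet (one positional-only
# Path in, a dict of named output Paths out), but writing JSON Lines so it
# runs with the stdlib alone.
def csv2jsonl(input_file: Path, /) -> dict[str, Path]:
    """Convert a CSV file to JSON Lines, returning named output paths."""
    out = input_file.with_suffix(".jsonl")
    with input_file.open(newline="") as src, out.open("w") as dst:
        for row in csv.DictReader(src):
            dst.write(json.dumps(row) + "\n")
    return {"jsonl": out}

# Quick smoke test in a temporary directory
with tempfile.TemporaryDirectory() as d:
    src = Path(d) / "data.csv"
    src.write_text("a,b\n1,2\n3,4\n")
    result = csv2jsonl(src)
    print(result["jsonl"].read_text())
    # → {"a": "1", "b": "2"}
    #   {"a": "3", "b": "4"}
```

Returning a dict keyed by output name (rather than a bare path) is what lets pipeline edges later connect a specific output of one function to the input of another.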
# Function 2: Parquet to JSON
def parquet2json(path: Path, /, *, orient: str = "records", indent: int = 2) -> Path:
    """Convert Parquet to JSON."""
    df = pd.read_parquet(path)
    output = path.with_suffix(".json")
    df.to_json(output, orient=orient, indent=indent)
    return output
func2 = client.add_function(
    gfhub.Function(
        parquet2json,
        dependencies={"pandas[pyarrow]": "import pandas as pd"},
    )
)
print("Function 2 uploaded:")
print(func2)
Function 2 uploaded:
{'id': '019bb975-a0ba-79a0-adca-aad7fda84eca', 'name': 'parquet2json', 'parameters': {'indent': {'default': '2', 'type': 'int'}, 'orient': {'default': '"records"', 'type': 'str'}}, 'inputs': {'path': {'type': 'Path'}}, 'outputs': {'0': {'type': 'Path'}}, 'created_at': '2026-01-13T22:24:12.474722Z', 'updated_at': '2026-01-14T10:24:07.659175Z'}
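This time the response also has a `parameters` section: the keyword-only arguments `orient` and `indent` (which have defaults) are exposed as tunable pipeline parameters, while the positional-only `path` remains an input. The sketch below is a guess at how such a schema could be derived from a signature with `inspect` (the `signature_schema` helper and `example` function are illustrative, not part of gfhub; note the server also stringifies defaults slightly differently):

```python
import inspect
from pathlib import Path

def signature_schema(func):
    """Split a signature into inputs (positional-only args) and parameters
    (keyword-only args with defaults) -- an illustrative approximation of
    the schema shown in the upload response above."""
    inputs, params = {}, {}
    for name, p in inspect.signature(func).parameters.items():
        if p.kind is inspect.Parameter.POSITIONAL_ONLY:
            inputs[name] = {"type": p.annotation.__name__}
        elif p.kind is inspect.Parameter.KEYWORD_ONLY:
            params[name] = {"default": repr(p.default), "type": p.annotation.__name__}
    return {"inputs": inputs, "parameters": params}

# Illustrative function with the same signature shape as parquet2json
def example(path: Path, /, *, orient: str = "records", indent: int = 2) -> Path:
    ...

print(signature_schema(example))
# → {'inputs': {'path': {'type': 'Path'}},
#    'parameters': {'orient': {'default': "'records'", 'type': 'str'},
#                   'indent': {'default': '2', 'type': 'int'}}}
```

Also note `outputs` in the response: because `parquet2json` returns a bare `Path` instead of a dict, its single output gets the positional name `'0'` rather than a descriptive key.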
Step 2: Create Pipeline¶
Now create a pipeline that connects the two functions; the builder below serializes to the JsonNode/JsonEdge format:
p = gfhub.Pipeline()
p.trigger = nodes.on_file_upload(tags=[".csv"])
p.on_manual_trigger = nodes.on_manual_trigger()
p.load = nodes.load()
p.to_parquet = nodes.function(function="csv2parquet")
p.to_json = nodes.function(function="parquet2json")
p.save = nodes.save()
p += p.trigger >> p.load
p += p.on_manual_trigger >> p.load
p += p.load >> p.to_parquet
p += p.to_parquet >> p.to_json
p += p.to_json >> p.save
# add_pipeline converts the pipeline to a JSON string before uploading;
# its update parameter defaults to True
pipeline = client.add_pipeline("csv2json", p)
pipeline["updated_at"]
'2026-01-14T10:24:08.482560Z'
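End to end, the pipeline turns a CSV file into a JSON array of row objects, since `orient="records"` serializes each row as one object. The stdlib sketch below approximates that final shape (pandas would additionally infer numeric types, so `95` would appear unquoted in the real output):

```python
import csv
import io
import json

# Approximate the csv2json pipeline's end result: to_json(orient="records")
# writes the rows of the table as a JSON array of objects.
csv_text = "name,score\nada,95\ngrace,90\n"
rows = list(csv.DictReader(io.StringIO(csv_text)))
print(json.dumps(rows, indent=2))
```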
Done!¶
The pipeline is now created. You can manually trigger this pipeline with specific files.
Note: The new pipeline system uses explicit trigger mechanisms rather than automatic tag-based triggering. To run the pipeline:

1. Upload your CSV file
2. Use the trigger API to run the pipeline with the uploaded file ID
For automatic triggering based on tags, you would need to add appropriate trigger nodes to the pipeline definition.