This notebook demonstrates how to:

1. Upload two Python functions
2. Create a pipeline that connects them

from pathlib import Path

import gfhub
import pandas as pd
from gfhub import nodes
client = gfhub.Client()

Step 1: Upload Two Functions

We'll upload two functions: csv2parquet, which converts a CSV file to Parquet, and parquet2json, which converts a Parquet file to JSON.

# Function 1: CSV to Parquet
def csv2parquet(input_file: Path, /) -> dict[str, Path]:
    """Convert a CSV file to Parquet format.

    :param input_file: Path to the input CSV file.
    :return: Dict with 'parquet' key containing the output Parquet file path.
    """
    # Read the CSV file
    df = pd.read_csv(input_file)

    # Define the output path with .parquet extension
    parquet_path = input_file.with_suffix(".parquet")

    # Write the DataFrame to a Parquet file
    df.to_parquet(parquet_path, index=False)

    return {"parquet": parquet_path}


func1 = client.add_function(
    gfhub.Function(
        csv2parquet,
        dependencies={"pandas[pyarrow]": "import pandas as pd"},
    )
)
print("Function 1 uploaded:")
print(func1)
Function 1 uploaded:
{'id': '019bb972-1978-7b11-a786-1a35c71ebc93', 'name': 'csv2parquet', 'parameters': {}, 'inputs': {'input_file': {'type': 'Path'}}, 'outputs': {'parquet': {'type': 'Path'}}, 'created_at': '2026-01-13T22:20:21.240862Z', 'updated_at': '2026-01-14T10:24:07.209418Z'}
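
csv2parquet derives its output path with Path.with_suffix, so the Parquet file is written next to the input file under the same stem. The short sketch below shows how with_suffix behaves for a few input names. The paths are hypothetical and only illustrate the naming rule; no files are read or written.

```python
from pathlib import Path

# Hypothetical input paths, used only to illustrate the naming rule.
examples = [
    Path("data/measurements.csv"),
    Path("data/archive.2024.csv"),
    Path("data/no_extension"),
]

# with_suffix replaces only the final suffix, or appends one if there is none.
derived = [src.with_suffix(".parquet") for src in examples]

for src, dst in zip(examples, derived):
    print(src, "->", dst)
```

Only the last suffix is replaced, so a name such as archive.2024.csv keeps its ".2024" part.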
# Function 2: Parquet to JSON

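def parquet2json(path: Path, /, *, orient: str = "records", indent: int = 2) -> Path:
    """Convert a Parquet file to JSON format.

    :param path: Path to the input Parquet file.
    :param orient: JSON layout passed to DataFrame.to_json.
    :param indent: Number of spaces used to indent the JSON output.
    :return: Path to the output JSON file.
    """
    df = pd.read_parquet(path)
    output = path.with_suffix(".json")
    df.to_json(output, orient=orient, indent=indent)
    return output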

func2 = client.add_function(
    gfhub.Function(
        parquet2json,
        dependencies={"pandas[pyarrow]": "import pandas as pd"},
    )
)
print("Function 2 uploaded:")
print(func2)
Function 2 uploaded:
{'id': '019bb975-a0ba-79a0-adca-aad7fda84eca', 'name': 'parquet2json', 'parameters': {'indent': {'default': '2', 'type': 'int'}, 'orient': {'default': '"records"', 'type': 'str'}}, 'inputs': {'path': {'type': 'Path'}}, 'outputs': {'0': {'type': 'Path'}}, 'created_at': '2026-01-13T22:24:12.474722Z', 'updated_at': '2026-01-14T10:24:07.659175Z'}
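
In the metadata printed above, the positional-only argument (path) appears under 'inputs' and the keyword-only arguments (orient, indent) appear under 'parameters'. The outputs also differ: csv2parquet returns a dict and its output is named 'parquet', while parquet2json returns a bare Path and its output is named '0'. The sketch below uses the standard inspect module to show how a signature can be split that way. It is an illustration of the observed pattern, not gfhub's actual implementation, and split_signature is a hypothetical helper.

```python
import inspect
from pathlib import Path


def parquet2json(path: Path, /, *, orient: str = "records", indent: int = 2) -> Path:
    """Signature-only stand-in for the function defined above."""
    raise NotImplementedError


def split_signature(func):
    """Hypothetical helper: group arguments by how they must be passed."""
    inputs, parameters = {}, {}
    for name, param in inspect.signature(func).parameters.items():
        type_name = getattr(param.annotation, "__name__", str(param.annotation))
        if param.kind is inspect.Parameter.POSITIONAL_ONLY:
            # Arguments before "/" can only be passed by position.
            inputs[name] = {"type": type_name}
        elif param.kind is inspect.Parameter.KEYWORD_ONLY:
            # Arguments after "*" can only be passed by keyword.
            parameters[name] = {"type": type_name, "default": param.default}
    return inputs, parameters


inputs, parameters = split_signature(parquet2json)
print("inputs:", inputs)
print("parameters:", parameters)
```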

Step 2: Create Pipeline

Now create a pipeline that connects the two functions. Each node is assigned as an attribute on the Pipeline object, and each edge is added with += and the >> operator:

p = gfhub.Pipeline()
p.trigger = nodes.on_file_upload(tags=[".csv"])
p.on_manual_trigger = nodes.on_manual_trigger()
p.load = nodes.load()
p.to_parquet = nodes.function(function="csv2parquet")
p.to_json = nodes.function(function="parquet2json")
p.save = nodes.save()
p += p.trigger >> p.load
p += p.on_manual_trigger >> p.load
p += p.load >> p.to_parquet
p += p.to_parquet >> p.to_json
p += p.to_json >> p.save

# Pass the Pipeline object to add_pipeline under the name "csv2json".
# The update option of add_pipeline defaults to True.
pipeline = client.add_pipeline("csv2json", p)
pipeline["updated_at"]
'2026-01-14T10:24:08.482560Z'
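
To make the shape of this pipeline easier to see, the sketch below models the same edges as a plain Python graph and computes a valid execution order with the standard graphlib module. It is only a mental model of the data flow: the node names are copied from the attribute names above, and nothing here calls gfhub or reflects how it schedules nodes internally.

```python
from graphlib import TopologicalSorter

# Edges copied from the pipeline definition above, as (source, target) pairs.
edges = [
    ("trigger", "load"),
    ("on_manual_trigger", "load"),
    ("load", "to_parquet"),
    ("to_parquet", "to_json"),
    ("to_json", "save"),
]

# TopologicalSorter expects a mapping of node -> set of predecessor nodes.
predecessors = {}
for source, target in edges:
    predecessors.setdefault(source, set())
    predecessors.setdefault(target, set()).add(source)

# static_order yields each node only after all of its predecessors.
order = list(TopologicalSorter(predecessors).static_order())
print(order)
```

Both trigger nodes come first (in either order) because they have no predecessors; load, to_parquet, to_json, and save then follow as a single chain.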

Done!

The pipeline is now created. You can manually trigger this pipeline with specific files.

Note: The new pipeline system uses explicit trigger nodes rather than implicit tag-based triggering. The pipeline above defines two of them, both feeding the load node: an on_file_upload trigger for uploads tagged ".csv" and a manual trigger.

To run the pipeline manually:

1. Upload your CSV file
2. Use the trigger API to run the pipeline with the uploaded file ID

For automatic triggering based on other tags, add the appropriate trigger nodes to the pipeline definition.