A pipeline connects functions together and defines when they run. You can configure a pipeline to trigger automatically when a file with certain tags is uploaded, or to run on demand against any file you choose. This tutorial shows you how to build a pipeline from uploaded functions, upload it to DataLab, and trigger it manually.

Setup

from gfhub import nodes
import gfhub

client = gfhub.Client()

Prerequisites

This tutorial builds directly on tutorial 3. The csv_to_parquet_example function must already be uploaded to your organization. You can confirm it is available by listing your functions.

functions = client.list_functions()
function_names = [f["name"] for f in functions]
print("Available functions:", function_names)

assert "csv_to_parquet_example" in function_names, (
    "csv_to_parquet_example not found. Run tutorial 3 first to upload it."
)
Available functions: ['Test', 'hello_world', 'csv2parquet', 'parquet2json', 'debug_noop_5880f52e', 'plot_parquet', 'csv_to_parquet_example', 'linear_fit', 'propagation_loss_from_cutback_spirals', 'plot_ring_spectrum', 'fsr_wafer_map', 'aggregate_die_analyses', 'spirals_wafer_map', 'plot_spiral_spectrum', 'ring_fsr', 'fsr_for_radius_within_die', 'fsr_wafer_aggregation', 'spiral_power_at_wavelength', 'find_common_tags', 'cutback_die_analysis', 'die_sheet_resistance', 'ring_fsr_batch']

Build the pipeline

A pipeline is a graph of nodes connected by edges. You define it using gfhub.Pipeline and the helper functions in gfhub.nodes.

The available node types are: - nodes.on_file_upload(tags=[...]): triggers the pipeline automatically when a file with matching tags is uploaded - nodes.on_manual_trigger(): allows triggering the pipeline manually against any file - nodes.load(): loads the input file from storage and passes it downstream - nodes.function(function="name"): runs an uploaded function - nodes.save(): saves the output files back to DataLab

You connect nodes using the >> operator and add edges to the pipeline with +=.

p = gfhub.Pipeline()

# Trigger nodes: run automatically on .csv uploads, or manually on demand
p.auto_trigger = nodes.on_file_upload(tags=[".csv"])
p.manual_trigger = nodes.on_manual_trigger()

# Load the input file
p.load = nodes.load()

# Processing step: convert CSV to Parquet using the function from tutorial 3
p.to_parquet = nodes.function(function="csv_to_parquet_example")

# Save the output with a tag so it is easy to find later
p.save = nodes.save(tags=["processed"])

# Connect the graph
p += p.auto_trigger >> p.load
p += p.manual_trigger >> p.load
p += p.load >> p.to_parquet
p += p.to_parquet >> p.save

print(p)
Pipeline(
  nodes={'auto_trigger': Node(name='auto_trigger', type='on_file_upload', config={'tags': ['.csv']}), 'manual_trigger': Node(name='manual_trigger', type='on_manual_trigger', config={}), 'load': Node(name='load', type='load', config={}), 'to_parquet': Node(name='to_parquet', type='function', config={'function': 'csv_to_parquet_example', 'kwargs': {}}), 'save': Node(name='save', type='save', config={'tags': ['processed']})},
  edges=[Edge(
  source={'node': 'auto_trigger', 'output': 0},
  target={'node': 'load', 'input': 0}
), Edge(
  source={'node': 'manual_trigger', 'output': 0},
  target={'node': 'load', 'input': 0}
), Edge(
  source={'node': 'load', 'output': 0},
  target={'node': 'to_parquet', 'input': 0}
), Edge(
  source={'node': 'to_parquet', 'output': 0},
  target={'node': 'save', 'input': 0}
)]
)

Upload the pipeline

Upload the pipeline by giving it a name. If a pipeline with the same name already exists it will be updated. The returned object contains the pipeline id which you can use to enable, disable, or trigger it.

import json

result = client.add_pipeline("csv_to_parquet_pipeline_example", p)
print(json.dumps(result, indent=2, default=str))
{
  "id": "019df3b5-956f-7462-b89c-4c0a1226f093",
  "name": "csv_to_parquet_pipeline_example",
  "nodes": [
    {
      "display_name": "auto_trigger",
      "id": "9e934884-8a1d-4352-8d7d-abe5591ee315",
      "params": {
        "tags": [
          ".csv"
        ]
      },
      "position": [
        0.0,
        50.0
      ],
      "type": "on_file_upload",
      "version": 1
    },
    {
      "display_name": "manual_trigger",
      "id": "67dc2dfb-580a-4951-85c9-ae98c251d584",
      "params": {},
      "position": [
        0.0,
        -50.0
      ],
      "type": "on_manual_trigger",
      "version": 1
    },
    {
      "display_name": "load",
      "id": "65919fb9-39c9-4980-8a15-c6f27451800a",
      "params": {},
      "position": [
        350.0,
        0.0
      ],
      "type": "load",
      "version": 1
    },
    {
      "display_name": "to_parquet",
      "id": "367b1cd9-5ee8-438b-aa56-abc490033640",
      "params": {
        "id": "019d63a9-aa5a-7711-b3fd-22e525fc032f",
        "kwargs": {}
      },
      "position": [
        700.0,
        0.0
      ],
      "type": "function",
      "version": 1
    },
    {
      "display_name": "save",
      "id": "09489c3f-9625-4a2b-9571-96a61477b0b4",
      "params": {
        "tags": [
          "processed"
        ]
      },
      "position": [
        1050.0,
        0.0
      ],
      "type": "save",
      "version": 1
    }
  ],
  "edges": [
    {
      "source": "9e934884-8a1d-4352-8d7d-abe5591ee315",
      "source_channel": 0,
      "target": "65919fb9-39c9-4980-8a15-c6f27451800a",
      "target_channel": 0
    },
    {
      "source": "67dc2dfb-580a-4951-85c9-ae98c251d584",
      "source_channel": 0,
      "target": "65919fb9-39c9-4980-8a15-c6f27451800a",
      "target_channel": 0
    },
    {
      "source": "65919fb9-39c9-4980-8a15-c6f27451800a",
      "source_channel": 0,
      "target": "367b1cd9-5ee8-438b-aa56-abc490033640",
      "target_channel": 0
    },
    {
      "source": "367b1cd9-5ee8-438b-aa56-abc490033640",
      "source_channel": 0,
      "target": "09489c3f-9625-4a2b-9571-96a61477b0b4",
      "target_channel": 0
    }
  ],
  "enabled": true,
  "created_at": "2026-05-04T15:57:37.007195Z",
  "updated_at": "2026-05-04T15:57:37.007195Z"
}

Trigger the pipeline manually

Once the pipeline is uploaded you can run it against any file using trigger_pipeline. You pass the pipeline name and one or more file id values (from query_files or add_file). The call returns a job object that you can poll to track progress.

# Find a CSV file to process
csv_files = client.query_files(tags=[".csv"])

if csv_files:
    file_id = csv_files[0]["id"]
    job = client.trigger_pipeline("csv_to_parquet_pipeline_example", file_id)
    print(f"Job IDs: {job['job_ids']}")

    # trigger_pipeline returns a list of job IDs, one per file
    final = client.wait_for_job(job["job_ids"][0])
    print(f"Status: {final['status']}")
else:
    print("No CSV files found. Upload one first using the upload tutorial.")
Job IDs: ['019df3b5-96c2-7832-b3c3-c3b8c49b9d20']


Status: success