Now we will use pipelines to run an IV analysis on the device data we uploaded in the previous notebook.

import getpass
import json
from pathlib import Path

import gfhub
import matplotlib.pyplot as plt
import pandas as pd
from gfhub import nodes
from PIL import Image
from scipy import stats
from tqdm.auto import tqdm

user = getpass.getuser()

Client

client = gfhub.Client()

Analysis function

We can define an analysis function that runs on the data we just uploaded. This function performs a linear fit between two columns of a dataframe and produces a plot and a JSON file with the fit parameters:

def linear_fit(
    path: Path,
    /,
    *,
    xname: str,
    yname: str,
    slopename: str = "resistance",
    xlabel: str = "",
    ylabel: str = "",
) -> tuple[Path, Path]:
    """Perform linear fit on IV data to extract resistance.

    Args:
        path: Path to parquet file with IV data
        xname: Column name for x-axis (independent variable)
        yname: Column name for y-axis (dependent variable)
        slopename: Name for the extracted slope parameter
        xlabel: Label for x-axis in plot
        ylabel: Label for y-axis in plot

    Returns:
        Tuple of (plot_path, results_path)
    """
    # Load data
    df = pd.read_parquet(path)
    x = df[xname].values
    y = df[yname].values

    # Perform linear fit
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)

    # Create plot
    _fig, ax = plt.subplots(figsize=(8, 6))
    ax.scatter(x, y, alpha=0.6, label="Data")
    ax.plot(
        x, slope * x + intercept, "r-", label=f"Fit: y = {slope:.3e}x + {intercept:.3e}"
    )
    xlabel = xlabel or xname
    ylabel = ylabel or yname
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_title(f"{slopename} = {slope:.3e} (R² = {r_value**2:.4f})")
    ax.legend()
    ax.grid(True)

    # Save plot
    plot_path = path.with_name(path.stem + "_linear_fit.png")
    plt.savefig(plot_path, bbox_inches="tight", dpi=100)
    plt.close()

    # Save results as JSON
    results = {
        slopename: float(slope),
        "intercept": float(intercept),
        "r_squared": float(r_value**2),
        "p_value": float(p_value),
        "std_err": float(std_err),
    }

    results_path = path.with_name(path.stem + "_linear_fit.json")
    results_path.write_text(json.dumps(results, indent=2))

    return plot_path, results_path
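
As a quick sanity check of the fitting step in isolation, `scipy.stats.linregress` should recover a known slope from synthetic IV data (the values below are illustrative, not from the real measurements):

```python
import numpy as np
from scipy import stats

# Synthetic IV sweep: V = R * I + offset, with R = 500 and a little noise
rng = np.random.default_rng(0)
current_mA = np.linspace(0, 10, 50)
voltage_mV = 500.0 * current_mA + 9.0 + rng.normal(0, 0.5, size=current_mA.size)

slope, intercept, r_value, p_value, std_err = stats.linregress(current_mA, voltage_mV)
print(f"resistance ≈ {slope:.1f}, intercept ≈ {intercept:.1f}, R² = {r_value**2:.4f}")
```

The fitted slope should land very close to 500 with an R² near 1, which is the same shape of result the function above writes to its JSON file.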

To make this function runnable on the server, we need to supply it with dependencies:

func_def = gfhub.Function(
    linear_fit,
    dependencies={
        "pathlib": "from pathlib import Path",
        "json": "import json",
        "matplotlib": "import matplotlib.pyplot as plt",
        "numpy": "import numpy as np",
        "pandas[pyarrow]": "import pandas as pd",
        "scipy": "from scipy import stats",
    },
)

Let's test this function definition locally by running it in much the same way as the server will run it. We recommend doing this to prevent uploading broken functions.

# this should have been created in the previous notebook:
path = Path("last_measurement.parquet").resolve()

result = func_def.eval(path, xname="current_mA", yname="voltage_mV")
print(result)
Image.open(result["output"][0])
{'success': True, 'output': (PosixPath('/home/runner/work/DataLab/DataLab/crates/sdk/examples/resistance/last_measurement_linear_fit.png'), PosixPath('/home/runner/work/DataLab/DataLab/crates/sdk/examples/resistance/last_measurement_linear_fit.json'))}


Once we've confirmed it works as desired, we upload the function definition to the server:

client.add_function(func_def)
{'id': '019bb97a-53e9-7862-8d30-0da453b3f167',
 'name': 'linear_fit',
 'parameters': {'slopename': {'default': '"resistance"', 'type': 'str'},
  'xlabel': {'default': '""', 'type': 'str'},
  'xname': {'type': 'str'},
  'ylabel': {'default': '""', 'type': 'str'},
  'yname': {'type': 'str'}},
 'inputs': {'path': {'type': 'Path'}},
 'outputs': {'0': {'type': 'Path'}, '1': {'type': 'Path'}},
 'created_at': '2026-01-13T22:29:20.489553Z',
 'updated_at': '2026-01-14T10:26:58.555096Z'}

Create pipeline

Let's create a pipeline that invokes this function. The pipeline will generate a .png (the plot) and a .json (the fit result) and link them to the source .parquet file. Once the pipeline is enabled, it will be triggered any time we upload additional parquet files with the right set of tags.

p = gfhub.Pipeline()

# we can manually trigger the pipeline
p.trigger = nodes.on_manual_trigger()

# or it will auto-trigger when a file with these tags gets uploaded.
# note that some of these tags don't have specified parameter values,
# which means the trigger will activate for any value of those tags.
p.auto_trigger = nodes.on_file_upload(
    tags=[
        ".parquet",
        user,
        "project:resistance",
        "wafer",
        "die",
        "cell",
        "device",
        "length",
        "width",
    ]
)

# the triggers should trigger a file load and a tags load:
p.load_file = nodes.load()
p.load_tags = nodes.load_tags()

# We connect nodes together with the `>>` operator:
p += p.trigger >> p.load_file
p += p.trigger >> p.load_tags
p += p.auto_trigger >> p.load_file
p += p.auto_trigger >> p.load_tags

# after the file is loaded on disk, we'd like to run the analysis function:
p.fit = nodes.function(
    function="linear_fit",
    kwargs={
        "xname": "current_mA",
        "yname": "voltage_mV",
        "slopename": "resistance",
    },
)
p += p.load_file >> p.fit

# the fit function has two outputs: the plot and the json:
p.save_plot = nodes.save()
p.save_json = nodes.save()
# when a node has multiple ports, they are addressed by index.
p += p.fit[0] >> p.save_plot[0]
p += p.fit[1] >> p.save_json[0]

# the save nodes also have an input for tags:
p += p.load_tags >> p.save_plot[1]
p += p.load_tags >> p.save_json[1]

pipeline_id = client.add_pipeline(name="device_linear_fit", schema=p)["id"]

In plain language, this pipeline will auto-trigger when a .parquet file with the right set of tags gets uploaded. We can also trigger it manually on any file.

These triggers activate two load operations: load and load_tags. The first saves the matching file to disk and the second loads its associated tags (which we can reuse when saving the function's results).

The path to the file is then given to the function, which we wrote above. This function returns the path to the plot (index 0) and the path to a json containing the fit parameters (index 1).

Both of these artifacts are then saved with separate save nodes. A save node has two input ports: one for the file to save (index 0) and an optional one for tags to attach to the file (index 1).

The pipeline can be viewed in the web UI. This visual representation should help validate that everything is wired up correctly.

client.pipeline_url(pipeline_id)
'https://dpd.hub.gdsfactory.com/pipelines/019bbc0b-5b8d-70b2-91b1-692b4947a99d'

Trigger analysis for all devices

Even though we configured this pipeline to run automatically on new files, we haven't run it yet for the files that we already uploaded in the previous notebook. Let's quickly trigger it for all previously uploaded files.

device_files = client.query_files(
    tags=[
        ".parquet",
        user,
        "project:resistance",
        "wafer",
        "die",
        "cell",
        "device",
        "length",
        "width",
    ]
)
print(f"Found {len(device_files)} device files")
Found 180 device files
job_ids = []
for device_file in tqdm(device_files):
    triggered = client.trigger_pipeline("device_linear_fit", device_file["id"])
    job_ids.extend(triggered["job_ids"])

print(f"Triggered {len(job_ids)} analysis jobs")
  0%|          | 0/180 [00:00<?, ?it/s]


Triggered 180 analysis jobs

Wait for completion

jobs = client.wait_for_jobs(job_ids)
  0%|          | 0/180 [00:00<?, ?it/s]

View results

# Query analysis plots
analysis_plots = client.query_files(
    name="*_linear_fit.png", tags=["project:resistance", user]
)

print(f"Found {len(analysis_plots)} analysis plots")

# Display the first plot
if analysis_plots:
    img = Image.open(client.download_file(analysis_plots[0]["id"]))
    display(img.resize((530, 400)))
Found 355 analysis plots


Query analysis results (JSON files)

# Query JSON results
analysis_results = client.query_files(
    name="*_linear_fit.json", tags=["project:resistance", user]
)

print(f"Found {len(analysis_results)} analysis result files")
Found 348 analysis result files

First result:

if analysis_results:
    result_buf = client.download_file(analysis_results[0]["id"])
    result_data = json.load(result_buf)
    print(json.dumps(result_data, indent=2))
{
  "resistance": 504.136556655104,
  "intercept": 8.924100570512564,
  "r_squared": 0.9996543601090734,
  "p_value": 2.365081284019544e-34,
  "std_err": 2.150593791437091
}
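
With many devices analyzed, the per-device JSON results can be collected into a single table for comparison. Here is a minimal sketch: `results_to_dataframe` is a hypothetical helper (not part of gfhub), and in practice each input dict would come from `json.load(client.download_file(...))` as shown above:

```python
import pandas as pd

def results_to_dataframe(results: list[dict]) -> pd.DataFrame:
    """Hypothetical helper: collect fit results into one table, lowest resistance first."""
    return pd.DataFrame(results).sort_values("resistance").reset_index(drop=True)

# Illustrative records shaped like the JSON result above:
sample = [
    {"resistance": 504.14, "intercept": 8.92, "r_squared": 0.9997},
    {"resistance": 212.30, "intercept": 4.10, "r_squared": 0.9991},
]
df = results_to_dataframe(sample)
print(df)
```

Sorting by the extracted resistance makes it easy to spot outlier devices across the wafer before doing any further per-geometry analysis.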