Now we will run an analysis using the device data we uploaded in the previous notebook.

As before, make sure you have the following environment variables set or added to a .env file:

Imports

import getpass
import json
from pathlib import Path

import gfhub
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from gfhub import nodes
from PIL import Image
from tqdm.notebook import tqdm

# Login name of the current OS user; it is passed as one of the tags in the
# file queries further down.
user = getpass.getuser()

Client

# Hub client used for every query, download, upload and pipeline call below.
# NOTE(review): presumably reads its credentials from the environment
# variables / .env file mentioned above -- confirm against the gfhub docs.
client = gfhub.Client()

Die analysis

You can trigger an analysis in one of three ways: automatically, by defining it in the design manifest; through the UI; or with the Python DoData library.

# Query the hub for the parquet files matching the tags below (one die of the
# cutback project, restricted to the current user's tag).
entries = client.query_files(
    tags=[user, "die:-2,-3", "device", "project:cutback", "cell", "wafer", ".parquet"]
)

# Download each hit to a numbered local file, then load them all.
paths = []
for i, entry in enumerate(entries):
    paths.append(client.download_file(entry["id"], f"file_{i}.parquet"))
dfs = list(map(pd.read_parquet, paths))

# Flatten each entry's tag mapping into "name" / "name:value" strings.
tags = []
for entry in entries:
    entry_tags = []
    for name, info in entry["tags"].items():
        value = info.get("parameter_value")
        entry_tags.append(f"{name}:{value}" if value else name)
    tags.append(entry_tags)

# The number of cascaded components is carried by the "components:<n>" tag.
num_comps = []
for tag_list in tags:
    component_tag = next(tag for tag in tag_list if tag.startswith("components:"))
    num_comps.append(int(component_tag.replace("components:", "")))

# Peak power of every measurement.
powers = [frame["power [dB]"].max() for frame in dfs]

# Straight-line fit of peak power against component count; both losses are
# the negated fit coefficients.
slope, intercept = np.polyfit(num_comps, powers, deg=1)
component_loss = -float(slope)
insertion_loss = -float(intercept)

x = np.arange(0, max(num_comps) + 99, 100)
plt.scatter(num_comps, powers, color="C1")
plt.plot(x, -component_loss * x - insertion_loss, color="C0")
plt.title(f"loss = {component_loss:.2e} dB/component")
plt.xlabel("# components")
plt.ylabel("Power [dBm]")
plt.xlim(x.min() - 30, x.max() + 30)
plt.grid(visible=True)
plt.show()

png

Analysis function

We can create our own DataLab function for this visualization:

def cutback_die_analysis(
    files: list[Path],
    tags: list[list[str]],
    /,
    *,
    output_name: str = "cutback_die_analysis",
) -> tuple[Path, Path]:
    """Fit a cutback loss line for one die; save a plot and a JSON summary.

    For every input file the maximum of its ``"power [dB]"`` column is paired
    with the component count taken from that file's ``components:<n>`` tag.
    A first-degree polynomial is fitted through these points; the negated
    slope is reported as the per-component loss and the negated intercept as
    the insertion loss.

    The function is deliberately self-contained (no module-level helpers)
    because it is wrapped in ``gfhub.Function`` and uploaded on its own.

    Args:
        files: Parquet files, one per measured device. Both outputs are
            written into the directory of the first file.
        tags: One list of tag strings per file, in the same order as
            ``files``. Every list needs a ``components:<n>`` tag; the first
            list also needs a ``die:<x>,<y>`` tag.
        output_name: File stem shared by the PNG and JSON outputs.

    Returns:
        ``(path_plot, path_json)``.

    Raises:
        ValueError: If ``files`` is empty, if ``files`` and ``tags`` differ
            in length, or if a tag list has no ``components:<n>`` tag.
        KeyError: If the first tag list has no ``die:<x>,<y>`` tag.
    """
    if not files:
        raise ValueError("cutback_die_analysis needs at least one input file")
    if len(files) != len(tags):
        raise ValueError(
            f"got {len(files)} files but {len(tags)} tag lists; "
            "they must correspond one-to-one"
        )

    dfs = [pd.read_parquet(file) for file in files]

    prefix = "components:"
    num_comps = []
    for ts in tags:
        comp_tag = next((t for t in ts if t.startswith(prefix)), None)
        if comp_tag is None:
            # Previously this surfaced as a bare StopIteration.
            raise ValueError(f"no 'components:<n>' tag found in {ts!r}")
        num_comps.append(int(comp_tag[len(prefix):]))

    powers = [df["power [dB]"].max() for df in dfs]
    slope, intercept = np.polyfit(num_comps, powers, deg=1)
    component_loss, insertion_loss = -float(slope), -float(intercept)

    x = np.arange(0, max(num_comps) + 99, 100)
    path_plot = files[0].parent / f"{output_name}.png"

    # Draw on a private figure and always close it: using pyplot's implicit
    # current figure would overlay data from earlier calls made in the same
    # process and leak one figure per call.
    fig, ax = plt.subplots()
    try:
        ax.scatter(num_comps, powers, color="C1")
        ax.plot(x, -component_loss * x - insertion_loss, color="C0")
        ax.grid(visible=True)
        ax.set_xlim(x.min() - 30, x.max() + 30)
        ax.set_title(f"loss = {component_loss:.2e} dB/component")
        ax.set_xlabel("# components")
        # NOTE(review): the data column is named "power [dB]" while this axis
        # label says dBm -- confirm which unit is correct.
        ax.set_ylabel("Power [dBm]")
        fig.savefig(path_plot, bbox_inches="tight")
    finally:
        plt.close(fig)

    # The die coordinates are read from the first file's tags only.
    tag_values = dict(t.split(":", 1) for t in tags[0] if ":" in t)
    die_x, die_y = (int(xy) for xy in tag_values["die"].split(","))

    path_json = files[0].parent / f"{output_name}.json"
    output = {
        "die_x": die_x,
        "die_y": die_y,
        # NaN / inf are not valid JSON, so a failed fit is stored as null.
        "component_loss": component_loss if np.isfinite(component_loss) else None,
        "insertion_loss": insertion_loss if np.isfinite(insertion_loss) else None,
    }
    path_json.write_text(json.dumps(output))
    return path_plot, path_json
# Wrap the analysis function so it can be evaluated and uploaded.
# NOTE(review): `dependencies` appears to map a package requirement to the
# import statement the function body relies on -- confirm against gfhub docs.
func_def = gfhub.Function(
    cutback_die_analysis,
    dependencies={
        "numpy": "import numpy as np",
        "pandas[pyarrow]": "import pandas as pd",
        "json": "import json",
        "matplotlib": "import matplotlib.pyplot as plt",
    },
)
# Try the wrapped function on the files and tags loaded earlier.
result = func_def.eval(paths, tags)
# The first output is the plot path; opening it as the cell's last
# expression displays the image in the notebook.
Image.open(result["output"][0])

png

The function works, so let's upload it:

# Upload the wrapped function; the returned record (echoed below) lists its
# id, name, parameters, inputs and outputs.
client.add_function(func_def)
{'id': '019bb969-762c-7a20-921f-380bc86513dc',
 'name': 'cutback_die_analysis',
 'parameters': {'output_name': {'default': '"cutback_die_analysis"',
   'type': 'str'}},
 'inputs': {'files': {'type': 'list[Path]'},
  'tags': {'type': 'list[list[str]]'}},
 'outputs': {'0': {'type': 'Path'}, '1': {'type': 'Path'}},
 'created_at': '2026-01-13T22:10:55.148981Z',
 'updated_at': '2026-01-14T10:28:57.512623Z'}

Tag aggregation

To tag the output files accurately, we create a simple function that extracts the tags shared across a list of tag lists:

def find_common_tags(
    tags: list[list[str]],
    /,
) -> list[str]:
    """Return the tags that every tag list shares with one identical value.

    Each tag is either a bare ``"key"`` or a ``"key:value"`` string (split on
    the first colon only, so values may themselves contain colons). A key is
    kept when it occurs in *every* inner list and always with the same value.
    Keys starting with ``"."`` (such as ``".parquet"``) are dropped.

    Args:
        tags: One list of tag strings per file.

    Returns:
        The shared tags, formatted like the inputs and ordered by first
        appearance. Empty when ``tags`` is empty.
    """
    values_by_key: dict[str, set[str]] = {}
    lists_with_key: dict[str, int] = {}
    for file_tags in tags:
        keys_here = set()
        for tag in file_tags:
            # partition splits on the first ":"; a bare tag gets value "".
            key, _, value = tag.partition(":")
            values_by_key.setdefault(key, set()).add(value)
            keys_here.add(key)
        # Count each key once per list, even if the list repeats the tag.
        for key in keys_here:
            lists_with_key[key] = lists_with_key.get(key, 0) + 1

    common_tags = {
        key: next(iter(values))
        for key, values in values_by_key.items()
        # A single distinct value is not enough: the key must also be
        # present in every list, otherwise it is not actually common.
        if len(values) == 1 and lists_with_key[key] == len(tags)
    }
    return [
        f"{key}:{value}" if value else key
        for key, value in common_tags.items()
        if not key.startswith(".")
    ]

Let's test this on the tags we loaded earlier:

# `tags` is the per-file tag list built in the die-analysis cell above.
find_common_tags(tags)
['project:cutback', 'wafer:wafer1', 'die:-2,-3', 'T:25.0', 'runner']
# Upload the plain function as-is; it uses only builtins, so no dependency
# mapping is passed here.
client.add_function(find_common_tags)
{'id': '019bb946-7cb1-7f63-aa27-98c892cf1558',
 'name': 'find_common_tags',
 'parameters': {},
 'inputs': {'tags': {'type': 'list[list[str]]'}},
 'outputs': {'0': {'type': 'Path'}},
 'created_at': '2026-01-13T21:32:43.057625Z',
 'updated_at': '2026-01-14T10:28:58.073228Z'}

A simple pipeline:

We can wrap these functions in a pipeline:

# Build the pipeline graph: nodes are attached as attributes of `p`, and
# edges are added with `p += source >> target`.
p = gfhub.Pipeline()

# a pipeline that takes a list of input paths (as opposed to a single input path)
# cannot be configured to auto-trigger on upload. Therefore we only add a manual trigger:
p.trigger = nodes.on_manual_trigger()

# the trigger kicks off a load from S3
p.load_file = nodes.load()
p += p.trigger >> p.load_file

# it also kicks off a load of the tags
p.load_tags = nodes.load_tags()
p += p.trigger >> p.load_tags

# the data file path (now on the local filesystem) as well as the
# tags get passed to the analysis function
p.cutback_die_analysis = nodes.function(function="cutback_die_analysis")
p += p.load_file >> p.cutback_die_analysis[0]
p += p.load_tags >> p.cutback_die_analysis[1]

# we also determine which tags all the data files have in common
p.common_tags = nodes.function(function="find_common_tags")
p += p.load_tags >> p.common_tags

# we save the plot with the common tags
p.save_plot = nodes.save()
p += p.cutback_die_analysis[0] >> p.save_plot[0]
p += p.common_tags >> p.save_plot[1]

# we save the json with the common tags
p.save_json = nodes.save()
p += p.cutback_die_analysis[1] >> p.save_json[0]
p += p.common_tags >> p.save_json[1]

# once the pipeline is defined, we can upload it:
confirmation = client.add_pipeline(name="cutback_die_analysis", schema=p)

You can inspect the pipeline here after upload:

# Build the web URL of the uploaded pipeline from the id in the upload
# confirmation.
client.pipeline_url(confirmation["id"])
'https://dpd.hub.gdsfactory.com/pipelines/019bbc0d-2e66-7d91-9927-ef32f0db252b'

Trigger Pipeline

To trigger the pipeline, we group the files by die and trigger it once for each die's group of files.

# Query the cutback device files for all dies (the "die" tag is given without
# a value here) and group the hits by their die tag.
entries = client.query_files(
    tags=[user, "die", "device", "project:cutback", "cell", "wafer", ".parquet"]
).groupby("die")
# Trigger the pipeline once per die group and collect the ids of all jobs.
job_ids = []
for _tag, group in tqdm(entries.items()):
    # The pipeline is triggered with the file ids of this die's group.
    input_ids = [props["id"] for props in group]
    triggered_jobs = client.trigger_pipeline("cutback_die_analysis", input_ids)
    job_ids.extend(triggered_jobs["job_ids"])
  0%|          | 0/45 [00:00<?, ?it/s]
# Wait on the jobs triggered above (NOTE(review): presumably blocks until all
# of them finish -- confirm). The trailing semicolon keeps the notebook from
# echoing the return value.
client.wait_for_jobs(job_ids);
  0%|          | 0/45 [00:00<?, ?it/s]