Now we will run an analysis using the device data we uploaded in the previous notebook.
As before, make sure you have the following environment variables set or added to a .env file:
Imports¶
import getpass
import json
from pathlib import Path
import gfhub
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from gfhub import nodes
from PIL import Image
from tqdm.notebook import tqdm
# Login name of the current OS user; it is passed as one of the tags in the
# file queries further down.
user = getpass.getuser()
Client¶
Die analysis¶
You can trigger an analysis in one of three ways: automatically by defining it in the design manifest, through the UI, or with the Python DoData library.
# Look up the parquet files that carry every one of these tags: the device
# measurements of die (-2, -3) in the cutback project for the current user.
entries = client.query_files(
    tags=[user, "die:-2,-3", "device", "project:cutback", "cell", "wafer", ".parquet"]
)

# Fetch each match under the name file_<index>.parquet and load it.
paths = [
    client.download_file(item["id"], f"file_{idx}.parquet")
    for idx, item in enumerate(entries)
]
dfs = [pd.read_parquet(parquet_path) for parquet_path in paths]

# Flatten every entry's tag mapping into plain strings: "key" when the tag has
# no (truthy) parameter value, "key:value" otherwise.
tags = [
    [
        f"{k}:{p}" if (p := v.get("parameter_value")) else k
        for k, v in entry["tags"].items()
    ]
    for entry in entries
]

# Component count of each structure, read from its "components:<n>" tag.
num_comps = [
    next(
        int(tag.replace("components:", ""))
        for tag in file_tags
        if tag.startswith("components:")
    )
    for file_tags in tags
]

# Peak power of each measurement.
powers = [frame["power [dB]"].max() for frame in dfs]

# Straight-line fit of power versus component count; both coefficients are
# negated so that losses come out as positive numbers.
component_loss, insertion_loss = (
    -float(coef) for coef in np.polyfit(num_comps, powers, deg=1)
)

# Measured points together with the fitted line.
x = np.arange(0, max(num_comps) + 99, 100)
plt.scatter(num_comps, powers, color="C1")
plt.plot(x, -component_loss * x - insertion_loss, color="C0")
plt.grid(visible=True)
plt.xlim(x.min() - 30, x.max() + 30)
plt.title(f"loss = {component_loss:.2e} dB/component")
plt.xlabel("# components")
plt.ylabel("Power [dBm]")
plt.show()

Analysis function¶
We can create our own DataLab function for this visualization:
def cutback_die_analysis(
    files: list[Path],
    tags: list[list[str]],
    /,
    *,
    output_name: str = "cutback_die_analysis",
) -> tuple[Path, Path]:
    """Fit a straight line through the cutback measurements of one die.

    For every parquet file the maximum of its ``"power [dB]"`` column is
    paired with the component count read from that file's ``components:<n>``
    tag. A first-order ``np.polyfit`` through those points yields the
    per-component loss (negated slope) and the insertion loss (negated
    intercept).

    Args:
        files: Parquet files, one per measured structure.
        tags: One tag list per file, in the same order as ``files``. Every
            list must contain a ``components:<n>`` tag; the first list must
            also contain a ``die:<x>,<y>`` tag.
        output_name: File name stem shared by the two output files.

    Returns:
        ``(path_plot, path_json)``: a PNG showing the points and the fit, and
        a JSON file with ``die_x``, ``die_y``, ``component_loss`` and
        ``insertion_loss`` (``None`` when a value is not finite). Both files
        are written into the directory of ``files[0]``.

    Raises:
        ValueError: If ``files`` is empty, if ``files`` and ``tags`` differ in
            length, or if a required tag is missing or malformed.
    """
    if not files:
        raise ValueError("cutback_die_analysis needs at least one input file")
    if len(files) != len(tags):
        raise ValueError(
            f"got {len(files)} files but {len(tags)} tag lists; "
            "expected exactly one tag list per file"
        )

    def tag_value(file_tags: list[str], key: str) -> str:
        # Value of the first "key:value" tag. Only the prefix is stripped, so
        # the value itself may contain further ":" characters.
        prefix = f"{key}:"
        for tag in file_tags:
            if tag.startswith(prefix):
                return tag[len(prefix):]
        raise ValueError(f"no '{prefix}<value>' tag found in {file_tags}")

    # Parse all tags up front so that bad input fails before any file is
    # read or written.
    num_comps = [int(tag_value(file_tags, "components")) for file_tags in tags]
    die_x, die_y = (int(xy) for xy in tag_value(tags[0], "die").split(","))

    powers = [pd.read_parquet(file)["power [dB]"].max() for file in files]

    # Negate both coefficients so that losses are reported as positive numbers.
    component_loss, insertion_loss = (
        -float(coef) for coef in np.polyfit(num_comps, powers, deg=1)
    )

    path_plot = files[0].parent / f"{output_name}.png"
    path_json = files[0].parent / f"{output_name}.json"

    # Draw on a dedicated figure and always close it: the implicit pyplot
    # figure would otherwise accumulate the data of earlier calls made in the
    # same process and never be released.
    x = np.arange(0, max(num_comps) + 99, 100)
    fig, ax = plt.subplots()
    try:
        ax.scatter(num_comps, powers, color="C1")
        ax.plot(x, -component_loss * x - insertion_loss, color="C0")
        ax.grid(visible=True)
        ax.set_xlim(x.min() - 30, x.max() + 30)
        ax.set_title(f"loss = {component_loss:.2e} dB/component")
        ax.set_xlabel("# components")
        ax.set_ylabel("Power [dBm]")
        fig.savefig(path_plot, bbox_inches="tight")
    finally:
        plt.close(fig)

    # NaN / inf are not valid JSON, so non-finite results are stored as null.
    output = {
        "die_x": die_x,
        "die_y": die_y,
        "component_loss": None if not np.isfinite(component_loss) else component_loss,
        "insertion_loss": None if not np.isfinite(insertion_loss) else insertion_loss,
    }
    path_json.write_text(json.dumps(output))
    return path_plot, path_json
# Wrap the analysis function in a gfhub.Function together with the
# dependencies it declares.
# NOTE(review): each key looks like a package requirement and each value like
# the import statement that makes it available inside the function — confirm
# against the gfhub documentation.
# NOTE(review): "json" is part of the Python standard library; verify that
# listing it here does not make gfhub try to install a package of that name.
func_def = gfhub.Function(
cutback_die_analysis,
dependencies={
"numpy": "import numpy as np",
"pandas[pyarrow]": "import pandas as pd",
"json": "import json",
"matplotlib": "import matplotlib.pyplot as plt",
},
)

The function works, so let's upload it:
{'id': '019bb969-762c-7a20-921f-380bc86513dc',
'name': 'cutback_die_analysis',
'parameters': {'output_name': {'default': '"cutback_die_analysis"',
'type': 'str'}},
'inputs': {'files': {'type': 'list[Path]'},
'tags': {'type': 'list[list[str]]'}},
'outputs': {'0': {'type': 'Path'}, '1': {'type': 'Path'}},
'created_at': '2026-01-13T22:10:55.148981Z',
'updated_at': '2026-01-14T10:28:57.512623Z'}
Tag aggregation¶
To accurately tag the output files, we create a simple function that finds the tags shared by a list of tag lists:
def find_common_tags(
    tags: list[list[str]],
    /,
) -> list[str]:
    """Return the tags that every tag list shares with an identical value.

    Each tag is either a bare ``"key"`` or a ``"key:value"`` string; only the
    first ``":"`` separates key from value. A tag is kept when its key occurs
    in *every* inner list and always with the same value. Keys starting with
    ``"."`` (such as ``".parquet"``) are dropped from the result.

    Args:
        tags: One list of tag strings per file.

    Returns:
        The shared tags in order of first appearance, formatted as ``"key"``
        or ``"key:value"``. An empty input gives an empty list.
    """
    values_by_key: dict[str, set[str]] = {}
    lists_with_key: dict[str, int] = {}
    for file_tags in tags:
        keys_in_list = set()
        for tag in file_tags:
            # partition yields an empty value for bare tags without ":".
            key, _, value = tag.partition(":")
            values_by_key.setdefault(key, set()).add(value)
            keys_in_list.add(key)
        # Count each key once per list so repeated tags do not inflate it.
        for key in keys_in_list:
            lists_with_key[key] = lists_with_key.get(key, 0) + 1

    common_tags = []
    for key, values in values_by_key.items():
        if key.startswith("."):
            continue
        # A key seen in only some lists is not common, even with one value.
        if len(values) != 1 or lists_with_key[key] != len(tags):
            continue
        (value,) = values
        common_tags.append(f"{key}:{value}" if value else key)
    return common_tags
Let's test this on the tags we loaded earlier:
['project:cutback', 'wafer:wafer1', 'die:-2,-3', 'T:25.0', 'runner']
{'id': '019bb946-7cb1-7f63-aa27-98c892cf1558',
'name': 'find_common_tags',
'parameters': {},
'inputs': {'tags': {'type': 'list[list[str]]'}},
'outputs': {'0': {'type': 'Path'}},
'created_at': '2026-01-13T21:32:43.057625Z',
'updated_at': '2026-01-14T10:28:58.073228Z'}
A simple pipeline:¶
We can wrap this function in a pipeline:
# Build the pipeline: nodes are attached as attributes of `p`, and each
# `p += a >> b` statement adds a connection from node `a` to node `b`.
p = gfhub.Pipeline()
# A pipeline that takes a list of input paths (as opposed to a single input path)
# cannot be configured to auto-trigger on upload. Therefore we only add a manual trigger:
p.trigger = nodes.on_manual_trigger()
# The trigger kicks off a load of the data files from S3 ...
p.load_file = nodes.load()
p += p.trigger >> p.load_file
# ... and it also kicks off a load of their tags.
p.load_tags = nodes.load_tags()
p += p.trigger >> p.load_tags
# The data file paths (now on the local filesystem) go to input 0 of the
# analysis function and the tags go to input 1.
p.cutback_die_analysis = nodes.function(function="cutback_die_analysis")
p += p.load_file >> p.cutback_die_analysis[0]
p += p.load_tags >> p.cutback_die_analysis[1]
# We also determine which tags all the data files have in common.
p.common_tags = nodes.function(function="find_common_tags")
p += p.load_tags >> p.common_tags
# Save the plot (output 0 of the analysis) with the common tags.
p.save_plot = nodes.save()
p += p.cutback_die_analysis[0] >> p.save_plot[0]
p += p.common_tags >> p.save_plot[1]
# Save the JSON (output 1 of the analysis) with the common tags.
p.save_json = nodes.save()
p += p.cutback_die_analysis[1] >> p.save_json[0]
p += p.common_tags >> p.save_json[1]
# Once the pipeline is defined, we can upload it under the name
# "cutback_die_analysis":
confirmation = client.add_pipeline(name="cutback_die_analysis", schema=p)
You can inspect the pipeline here after upload:
'https://dpd.hub.gdsfactory.com/pipelines/019bbc0d-2e66-7d91-9927-ef32f0db252b'
Trigger Pipeline¶
To trigger the pipeline, we group the files by die and trigger it once for each die.
# Query every cutback device measurement of the current user and group the
# resulting entries by their "die" tag.
entries = client.query_files(
    tags=[user, "die", "device", "project:cutback", "cell", "wafer", ".parquet"]
).groupby("die")

# Trigger the pipeline once per die, passing the IDs of all files in that
# group, and collect the job IDs reported for each trigger.
job_ids = []
for _tag, group in tqdm(entries.items()):
    input_ids = [file_props["id"] for file_props in group]
    triggered_jobs = client.trigger_pipeline("cutback_die_analysis", input_ids)
    job_ids += triggered_jobs["job_ids"]
0%| | 0/45 [00:00<?, ?it/s]
0%| | 0/45 [00:00<?, ?it/s]