Now we will run a sheet resistance analysis using the device analyses we triggered in the device analysis notebook. Make sure all the analyses from the previous notebook have finished!
import getpass
import json
from pathlib import Path
import gfhub
import matplotlib.pyplot as plt
import numpy as np
from gfhub import nodes
from PIL import Image
from tqdm.auto import tqdm
# Login name of the current user; used below as a tag filter when querying files.
user = getpass.getuser()
Client¶
Die Analysis¶
This function will aggregate device-level resistance measurements to calculate sheet resistance.
def die_sheet_resistance(
files: list[Path],
tags: list[list[str]],
/,
*,
width_key: str = "width",
length_key: str = "length",
) -> tuple[Path, Path, list[str]]:
# Load resistance data
resistances = []
widths = []
lengths = []
for file, file_tags in zip(files, tags, strict=False):
data = json.loads(file.read_text())
# Extract resistance
resistance = data.get("resistance")
if resistance is None:
continue
# Extract width and length from tags
width = None
length = None
for tag in file_tags:
if tag.startswith(f"{width_key}:"):
width = float(tag.split(":", 1)[1])
elif tag.startswith(f"{length_key}:"):
length = float(tag.split(":", 1)[1])
if width is not None and length is not None:
resistances.append(resistance)
widths.append(width)
lengths.append(length)
if len(resistances) == 0:
msg = "No valid resistance measurements found"
raise ValueError(msg)
# Convert to numpy arrays
resistances = np.array(resistances)
widths = np.array(widths)
lengths = np.array(lengths)
# Calculate R * W / L for each device
# This should be constant and equal to sheet resistance
rw_over_l = resistances * widths / lengths
# Calculate sheet resistance as mean
sheet_resistance = np.mean(rw_over_l)
sheet_resistance_std = np.std(rw_over_l)
# Create plot
# Plot 2: Calculated sheet resistance for each device
plt.scatter(range(len(rw_over_l)), rw_over_l)
plt.axhline(
sheet_resistance,
color="r",
linestyle="--",
label=f"Mean = {sheet_resistance:.2e}",
)
plt.axhline(
sheet_resistance + sheet_resistance_std,
color="orange",
linestyle=":",
alpha=0.7,
)
plt.axhline(
sheet_resistance - sheet_resistance_std,
color="orange",
linestyle=":",
alpha=0.7,
label=f"±1σ = {sheet_resistance_std:.2e}",
)
plt.xlabel("Device Index")
plt.ylabel("Sheet Resistance (Ω/sq)")
plt.legend()
plt.grid(True)
plot_path = files[0].parent / "die_sheet_resistance.png"
plt.savefig(plot_path, bbox_inches="tight", dpi=100)
plt.close()
# Extract die coordinates from tags (format: "die:x,y")
die_x, die_y = None, None
for tag in tags[0]:
if tag.startswith("die:"):
coords = tag.split(":", 1)[1]
die_x, die_y = [int(c) for c in coords.split(",")]
break
# Save results
results = {
"die_x": die_x,
"die_y": die_y,
"sheet_resistance": float(sheet_resistance),
"sheet_resistance_std": float(sheet_resistance_std),
"num_devices": len(resistances),
}
results_path = files[0].parent / "die_sheet_resistance.json"
results_path.write_text(json.dumps(results, indent=2))
return plot_path, results_path
# Wrap the analysis function as a gfhub.Function. `dependencies` maps each
# package name to the import statement that provides the module-level names
# the function body uses (np, json, plt).
# NOTE(review): presumably these imports are replayed wherever the hub runs
# the function — confirm against the gfhub documentation.
func_def = gfhub.Function(
    die_sheet_resistance,
    dependencies={
        "numpy": "import numpy as np",
        "json": "import json",
        "matplotlib": "import matplotlib.pyplot as plt",
    },
)
Test function¶
# Query the "*_linear_fit.json" files tagged with this project and the current
# user, grouped by the "wafer" and "die" tags.
analysis_results = client.query_files(
    name="*_linear_fit.json", tags=["project:resistance", user]
).groupby("wafer", "die")
# Use the first (wafer, die) group as the test case.
key = (wafer, die) = next(iter(analysis_results))
results = analysis_results[key]
# Download every result file of that group to ./download_<i>.json.
paths = [
    client.download_file(r["id"], f"./download_{i}.json") for i, r in enumerate(results)
]
# Convert each file's tags to strings, one list of tag strings per file.
tags = [[gfhub.tags.into_string(t) for t in r["tags"].values()] for r in results]
# Call the plain Python function directly; the commented-out line is the
# alternative that goes through func_def.eval instead.
# plot_path, _ = func_def.eval(paths, tags)
plot_path, _ = die_sheet_resistance(paths, tags)
Image.open(plot_path)

{'id': '019bb97d-488f-7c11-82d3-3c084bd31ae5',
'name': 'die_sheet_resistance',
'parameters': {'length_key': {'default': '"length"', 'type': 'str'},
'width_key': {'default': '"width"', 'type': 'str'}},
'inputs': {'files': {'type': 'list[Path]'},
'tags': {'type': 'list[list[str]]'}},
'outputs': {'0': {'type': 'Path'}, '1': {'type': 'Path'}},
'created_at': '2026-01-13T22:32:34.191069Z',
'updated_at': '2026-01-14T10:29:17.763536Z'}
Tag aggregation¶
To accurately tag the output files, we create a simple function that merges the common tags from a list of tag lists:
def find_common_tags(
    tags: list[list[str]],
    /,
) -> list[str]:
    """Return the tags that every tag list shares with the same value.

    A tag is either ``"key:value"`` or a bare ``"key"`` (treated as having an
    empty value). A key is kept only if it occurs in *every* inner list and
    always with the same value. Keys starting with ``"."`` are dropped.

    Args:
        tags: One list of tag strings per file.

    Returns:
        The shared tags, in order of first appearance, formatted as
        ``"key:value"`` or as ``"key"`` when the value is empty.
    """
    values_by_key: dict[str, set[str]] = {}
    lists_with_key: dict[str, int] = {}
    for file_tags in tags:
        keys_here = set()
        for tag in file_tags:
            # Split on the first ":" only, so values may contain ":" themselves.
            key, _, value = tag.partition(":")
            values_by_key.setdefault(key, set()).add(value)
            keys_here.add(key)
        # Count each key once per list, even if the list repeats it.
        for key in keys_here:
            lists_with_key[key] = lists_with_key.get(key, 0) + 1

    total = len(tags)
    common: list[str] = []
    for key, values in values_by_key.items():
        if key.startswith("."):
            continue
        # A tag is only common if all lists carry it and agree on its value.
        if len(values) != 1 or lists_with_key[key] != total:
            continue
        value = next(iter(values))
        common.append(f"{key}:{value}" if value else key)
    return common
{'id': '019bb946-7cb1-7f63-aa27-98c892cf1558',
'name': 'find_common_tags',
'parameters': {},
'inputs': {'tags': {'type': 'list[list[str]]'}},
'outputs': {'0': {'type': 'Path'}},
'created_at': '2026-01-13T21:32:43.057625Z',
'updated_at': '2026-01-14T10:29:18.337410Z'}
Create pipeline¶
We can now create a pipeline which brings this all together:
# Build the pipeline: nodes are attached as attributes of `p`, and connections
# between them are added with `p += source >> target`.
p = gfhub.Pipeline()
# a pipeline that takes a list of input paths (as opposed to a single input path)
# cannot be configured to auto-trigger on upload. Therefore we only add a manual trigger:
p.trigger = nodes.on_manual_trigger()
# the trigger kicks off a load from S3
p.load_file = nodes.load()
p += p.trigger >> p.load_file
# it also kicks off a load of the tags
p.load_tags = nodes.load_tags()
p += p.trigger >> p.load_tags
# the data file path (now on the local filesystem) as well as the
# tags get passed to the analysis function
# (input 0 receives the files, input 1 receives the tags)
p.sheet_resistance = nodes.function(function="die_sheet_resistance")
p += p.load_file >> p.sheet_resistance[0]
p += p.load_tags >> p.sheet_resistance[1]
# we also determine which tags all the data files have in common
p.common_tags = nodes.function(function="find_common_tags")
p += p.load_tags >> p.common_tags
# we save the plot (output 0 of the analysis) with the common tags
p.save_plot = nodes.save()
p += p.sheet_resistance[0] >> p.save_plot[0]
p += p.common_tags >> p.save_plot[1]
# we save the json (output 1 of the analysis) with the common tags
p.save_json = nodes.save()
p += p.sheet_resistance[1] >> p.save_json[0]
p += p.common_tags >> p.save_json[1]
# once the pipeline is defined, we can upload it:
confirmation = client.add_pipeline("die_sheet_resistance", p)
Let's upload this pipeline
https://dpd.hub.gdsfactory.com/pipelines/019bbc0d-7dee-79f1-b3e0-ad217e94151a
Trigger pipeline for all dies¶
# Query the "*_linear_fit.json" files for this project and user, grouped by
# the "wafer" and "die" tags, so that each group is one pipeline run.
analysis_results = client.query_files(
    name="*_linear_fit.json", tags=["project:resistance", user]
).groupby("wafer", "die")
# Job IDs returned by all triggers, accumulated across groups.
job_ids = []
for _die_tag, files in tqdm(analysis_results.items()):
    # Get file IDs for this die
    input_ids = [f["id"] for f in files]
    # Trigger pipeline
    triggered = client.trigger_pipeline("die_sheet_resistance", input_ids)
    job_ids.extend(triggered["job_ids"])
print(f"Triggered {len(job_ids)} die analysis jobs")
0%| | 0/62 [00:00<?, ?it/s]
Triggered 62 die analysis jobs
Wait for completion¶
0%| | 0/62 [00:00<?, ?it/s]
Final plot¶
# Query die analysis plots
die_plots = client.query_files(
    name="die_sheet_resistance.png", tags=["project:resistance", user]
)
print(f"Found {len(die_plots)} die analysis plots")
# Display the first plot (skipped when the query returned nothing)
if die_plots:
    img = Image.open(client.download_file(die_plots[0]["id"]))
    # NOTE(review): `display` is not imported in this file; presumably the
    # IPython/Jupyter built-in — confirm.
    display(img.resize((530, 400)))
Found 62 die analysis plots
