In the previous notebook each spiral device got a single number: the transmitted power at 1550 nm. Now we aggregate those per-device measurements to the die level and extract the propagation loss. Assembling the right files for each fit group is the step that normally requires either careful filename parsing or a separate lookup table. Here we query by tags and let the grouping happen on the server.
The physical model is straightforward. For a waveguide of length L:
P(L) [dBm] = P0 [dBm] - alpha [dB/cm] * L [cm]
where P0 is the power at zero length (set entirely by the grating coupler insertion loss) and alpha is the propagation loss we want to measure. A linear fit of P_dBm vs. L gives alpha as the magnitude of the slope.
The grouping key is (die, waveguide type, width). We cannot mix widths, because each width has a different loss. We cannot mix waveguide types either (rib and ridge have different etch depths and different loss values). Each group provides six points (one per spiral length, from 0 to 25 mm), which is enough for a reliable linear fit.
The function also extracts the grating coupler insertion loss from the fit intercept at L = 0, which is useful for process monitoring independent of waveguide loss.
Setup¶
import getpass
import json
import shutil
from pathlib import Path
import gfhub
import matplotlib.pyplot as plt
import numpy as np
from gfhub import nodes
from PIL import Image
from tqdm.auto import tqdm
client = gfhub.Client()
user = getpass.getuser()
print(f"Running as user: {user}")
Running as user: runner
Load sample data¶
We download the device analysis JSONs for one (die, type, width) group to verify the data before defining the pipeline function.
entries = client.query_files(
name="*_analysis.json",
tags=["project:tutorial_spirals", user, "die:0,0", "waveguide_type:rib", "width_nm:500"],
)
tags = [
[gfhub.tags.into_string(t) for t in entry["tags"].values()]
for entry in entries
]
sample_dir = Path("sample_die")
shutil.rmtree(sample_dir, ignore_errors=True)
sample_dir.mkdir()
sample_paths = []
for entry in entries:
dest = sample_dir / f"{entry['id']}.json"
client.download_file(entry["id"], dest)
sample_paths.append(dest)
print(f"Downloaded {len(sample_paths)} files for die (0,0), rib, 500 nm")
print("Sample tags:", tags[0])
Downloaded 12 files for die (0,0), rib, 500 nm
Sample tags: ['project:tutorial_spirals', 'wafer:wafer_tutorial', 'die:0,0', 'cell:cutback_rib_assembled_MFalse_W0p5_L0', '.json', 'runner', 'waveguide_type:rib', 'width_nm:500', 'length_um:0']
Defining the die-level propagation loss function¶
propagation_loss_from_cutback_spirals receives all the device analysis JSON files for one (die, waveguide type, width) group. It reads power_dBm from each JSON and reads length_um from the file's tags (not from inside the JSON), then performs a linear fit.
The slope of the fit is in dB/µm. We multiply by 1e4 to convert to dB/cm, then negate to get a positive loss value.
The waveguide type and width are also read from tags so the output filenames and plot titles are descriptive.
def propagation_loss_from_cutback_spirals(
files: list[Path],
tags: list[list[str]],
/,
) -> tuple[Path, Path]:
"""Fit propagation loss from spiral measurements at different lengths.
Reads power_dBm from each JSON file and length_um from the corresponding tags,
then fits P_dBm = P0 - alpha * L to extract alpha in dB/cm.
"""
# Collect (length_um, power_dBm) pairs from files and their tags
lengths_um = []
powers_dBm = []
for file, file_tags in zip(files, tags):
data = json.loads(file.read_text())
power_dBm = data.get("power_dBm")
if power_dBm is None:
continue
length_um = next(
(float(t.split(":", 1)[1]) for t in file_tags if t.startswith("length_um:")),
None,
)
if length_um is not None:
lengths_um.append(length_um)
powers_dBm.append(power_dBm)
if len(lengths_um) < 2:
msg = "Need at least 2 measurements for a propagation loss fit"
raise ValueError(msg)
lengths_um = np.array(lengths_um)
powers_dBm = np.array(powers_dBm)
# Sort by length so the plot looks clean
order = np.argsort(lengths_um)
lengths_um = lengths_um[order]
powers_dBm = powers_dBm[order]
# Linear fit: slope is in dB/µm, convert to dB/cm
coeffs = np.polyfit(lengths_um, powers_dBm, deg=1)
slope_dB_per_um = coeffs[0] # negative value
intercept_dBm = coeffs[1] # grating coupler reference power at L=0
alpha_dBcm = -slope_dB_per_um * 1e4 # propagation loss in dB/cm
# R-squared of the fit
fitted = np.polyval(coeffs, lengths_um)
ss_res = float(np.sum((powers_dBm - fitted) ** 2))
ss_tot = float(np.sum((powers_dBm - powers_dBm.mean()) ** 2))
r_squared = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0
# Read die coordinates, width, and waveguide type from the first file's tags
die_x, die_y = None, None
width_nm = None
waveguide_type = None
for t in tags[0]:
if t.startswith("die:"):
die_x, die_y = [int(v) for v in t.split(":", 1)[1].split(",")]
elif t.startswith("width_nm:"):
width_nm = int(t.split(":", 1)[1])
elif t.startswith("waveguide_type:"):
waveguide_type = t.split(":", 1)[1]
fig, ax = plt.subplots(figsize=(8, 5))
ax.scatter(lengths_um * 1e-4, powers_dBm, s=60, zorder=5, label="measured")
length_fit = np.linspace(0, lengths_um.max(), 100)
ax.plot(
length_fit * 1e-4,
np.polyval(coeffs, length_fit),
"r--",
label=f"fit: {alpha_dBcm:.2f} dB/cm (R²={r_squared:.3f})",
)
ax.set_xlabel("Length (cm)")
ax.set_ylabel("Power (dBm)")
ax.set_title(
f"Die ({die_x}, {die_y}), {waveguide_type}, {width_nm} nm — propagation loss"
)
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
stem = f"propagation_loss_{waveguide_type}_{width_nm}nm"
plot_path = files[0].parent / f"{stem}.png"
plt.savefig(plot_path, bbox_inches="tight", dpi=100)
plt.close()
results = {
"die_x": die_x,
"die_y": die_y,
"propagation_loss": float(alpha_dBcm),
"r_squared": float(r_squared),
"grating_coupler_loss_dBm": float(intercept_dBm),
"num_devices": len(lengths_um),
"width_nm": width_nm,
"waveguide_type": waveguide_type,
}
results_path = files[0].parent / f"{stem}.json"
results_path.write_text(json.dumps(results, indent=2))
return plot_path, results_path
func_def = gfhub.Function(
propagation_loss_from_cutback_spirals,
dependencies={
"json": "import json",
"numpy": "import numpy as np",
"matplotlib": "import matplotlib.pyplot as plt",
},
)
Testing locally¶
We test the function on the sample files downloaded above. This runs the function in a sandboxed uv environment with func_def.eval(), the same way it will run on the server. For a tuple-return function, result["output"] is a list with one element per return value. The plot should show a clear downward trend of power vs. length, with the fit line capturing the propagation loss slope.
result = func_def.eval(sample_paths, tags)
plot_path, json_path = result["output"]
print(json_path.read_text())
Image.open(plot_path)
{
"die_x": 0,
"die_y": 0,
"propagation_loss": 2.2581782174984606,
"r_squared": 0.996783162539795,
"grating_coupler_loss_dBm": -6.140679357450991,
"num_devices": 12,
"width_nm": 500,
"waveguide_type": "rib"
}

propagation_loss_from_cutback_spirals uploaded.
Common tags¶
The outputs for each (die, type, width) group should be tagged with the provenance shared by all devices in that group: die, waveguide_type, width_nm, wafer, and project. Tags that vary between devices — in particular length_um and cell — should not appear on the aggregated output. find_common_tags handles this by keeping only tags where every input file agrees on the same value. Extension tags (keys starting with .) are always excluded.
This is the same helper used in the cutback, resistance, and rings series. It is re-uploaded here so this tutorial can be followed independently.
def find_common_tags(tags: list[list[str]], /) -> list[str]:
"""Return only the tags that are identical across all input files."""
common: dict[str, set] = {}
for _tags in tags:
for t in _tags:
if ":" in t:
key, value = t.split(":", 1)
else:
key, value = t, ""
common.setdefault(key, set()).add(value)
agreed = {k: next(iter(v)) for k, v in common.items() if len(v) == 1}
return [k if not v else f"{k}:{v}" for k, v in agreed.items() if not k.startswith(".")]
client.add_function(find_common_tags)
print("find_common_tags uploaded.")
find_common_tags uploaded.
Creating the pipeline¶
This pipeline uses a manual trigger only. There is no meaningful auto-trigger here because the pipeline needs a group of files (all lengths for one die and width), not a single file. The function is registered server-side, so re-running the fit for a new wafer does not require anyone to re-upload or re-configure the analysis. Grouping happens in the trigger loop below, where we pass a list of file IDs per group.
p = gfhub.Pipeline()
p.trigger = nodes.on_manual_trigger()
p.load_file = nodes.load()
p.load_tags = nodes.load_tags()
p += p.trigger >> p.load_file
p += p.trigger >> p.load_tags
p.propagation_loss = nodes.function(function="propagation_loss_from_cutback_spirals")
p += p.load_file >> p.propagation_loss[0]
p += p.load_tags >> p.propagation_loss[1]
p.common_tags = nodes.function(function="find_common_tags")
p += p.load_tags >> p.common_tags
p.save_plot = nodes.save()
p.save_json = nodes.save()
p += p.propagation_loss[0] >> p.save_plot[0]
p += p.common_tags >> p.save_plot[1]
p += p.propagation_loss[1] >> p.save_json[0]
p += p.common_tags >> p.save_json[1]
confirmation = client.add_pipeline(name="spiral_propagation_loss", schema=p)
print(f"Pipeline ready: {client.pipeline_url(confirmation['id'])}")
Pipeline ready: https://api.dev.gdsfactory.com/pipelines/019df3c3-359a-7532-9703-0bc913a30bd1
Trigger per (die, waveguide type, width) group¶
We group the device analysis JSONs by all three dimensions at once. Grouping by (die, waveguide_type, width_nm) works because all three were set as tags at upload time. Without tags, assembling the right files for each fit group would require parsing filenames or querying a separate database. Each group has six files (one per spiral length), and each group gets its own pipeline job.
analysis_files = client.query_files(
name="*_analysis.json",
tags=["project:tutorial_spirals", user],
).groupby("die", "waveguide_type", "width_nm")
print(f"Found {len(analysis_files)} (die, type, width) groups")
job_ids = []
for group_key, files in tqdm(analysis_files.items()):
input_ids = [f["id"] for f in files]
triggered = client.trigger_pipeline("spiral_propagation_loss", input_ids)
job_ids.extend(triggered["job_ids"])
print(f"Triggered {len(job_ids)} die analysis jobs")
Found 24 (die, type, width) groups
0%| | 0/24 [00:00<?, ?it/s]
Triggered 24 die analysis jobs
jobs = client.wait_for_jobs(job_ids)
print(f"All jobs complete. Statuses: {set(j['status'] for j in jobs)}")
0%| | 0/24 [00:00<?, ?it/s]
All jobs complete. Statuses: {'success'}
View a sample result¶
die_plots = client.query_files(name="propagation_loss_*.png", tags=["project:tutorial_spirals", user])
print(f"Found {len(die_plots)} propagation loss plots")
if die_plots:
img = Image.open(client.download_file(die_plots[0]["id"]))
display(img.resize((530, 400)))
Found 24 propagation loss plots

What's next?¶
Each (die, waveguide type, width) combination now has a JSON with the fitted propagation loss in dB/cm and the die coordinates. Notebooks 5 and 6 aggregate these into spatial wafer maps — notebook 5 for rib waveguides, notebook 6 for ridge waveguides — so you can see whether loss is uniform across the wafer or shows a systematic gradient.