Now we will create wafer maps using the die analysis JSON outputs from the previous notebook.
Before running this notebook, make sure that: (1) you have run the die analysis pipeline from notebook 4, (2) all die analysis jobs have completed successfully, and (3) the JSON output files are tagged appropriately.
Imports¶
# Standard library: username lookup and filesystem paths.
import getpass
from pathlib import Path
# DataLab client, pipeline builder, and node factories (project library).
from gfhub import Client, Pipeline, nodes
# Pillow, used to open and display the generated wafer-map PNGs.
from PIL import Image
# Notebook-friendly progress bars for the per-wafer trigger loop.
from tqdm.notebook import tqdm
# Current OS username — used to tag and filter files owned by this user.
user = getpass.getuser()
Client¶
NOTE(review): the cell instantiating the DataLab client appears to be missing from this export — the following cells assume a connected `client` object (presumably `client = Client(...)`); confirm against the original notebook.
Wafer Analysis Function¶
We reuse the aggregate_die_analyses function from the cutback examples. This function reads JSON files containing die_x, die_y, and a metric to aggregate.
# Register the reusable wafer-aggregation function (shared with the
# cutback examples) in DataLab so pipelines can invoke it by name.
function_source = Path("../cutback/aggregate_die_analyses.py").read_text()
client.add_function(name="aggregate_die_analyses", function=function_source)
{'id': '019bb958-85ca-76d0-b97f-72e663048c04',
'name': 'aggregate_die_analyses',
'parameters': {'max_output': {'default': '0.115', 'type': 'float'},
'min_output': {'default': '0', 'type': 'float'},
'output_key': {'default': '"component_loss"', 'type': 'str'},
'output_name': {'default': '"wafer_map"', 'type': 'str'}},
'inputs': {'files': {'type': 'list[Path]'}},
'outputs': {'0': {'type': 'Path'}},
'created_at': '2026-01-13T21:52:25.034842Z',
'updated_at': '2026-01-14T10:30:17.389254Z'}
Create Wafer Analysis Pipeline¶
Create a pipeline that will aggregate die analysis JSON files into a wafer map.
# Build the wafer-aggregation pipeline: load the die JSON files and their
# tags, aggregate the files into a wafer map, and save the result together
# with the tags common to all inputs.
pipeline = Pipeline()
pipeline.trigger = nodes.on_manual_trigger()
pipeline.load_file = nodes.load()
pipeline.load_tags = nodes.load_tags()
# "find_common_tags" was registered in DataLab by the previous notebook.
pipeline.find_common_tags = nodes.function(function="find_common_tags")
pipeline.aggregate = nodes.function(
    function="aggregate_die_analyses",
    kwargs={
        "output_key": "sheet_resistance",
        "min_output": 80.0,
        "max_output": 120.0,
        "output_name": "wafer_sheet_resistance",
    },
)
pipeline.save = nodes.save()

# Wire the graph. The manual trigger feeds both loader nodes.
pipeline += pipeline.trigger >> pipeline.load_file
pipeline += pipeline.trigger >> pipeline.load_tags
# Loaded files flow into the aggregation function; its output is saved.
pipeline += pipeline.load_file >> pipeline.aggregate
pipeline += pipeline.aggregate >> pipeline.save[0]
# The common tags of all inputs are attached to the saved wafer map.
pipeline += pipeline.load_tags >> pipeline.find_common_tags
pipeline += pipeline.find_common_tags >> pipeline.save[1]

client.add_pipeline(name="aggregate_die_analyses", schema=pipeline)["updated_at"]
'2026-01-14T10:30:18.442961Z'
Trigger Pipeline for Each Wafer¶
Query die analysis JSON files grouped by wafer and trigger the wafer analysis pipeline.
# Group the die-analysis JSON outputs (from the previous notebook) by
# their wafer tag, then launch one aggregation job per wafer.
entries = client.query_files(
    name="die_sheet_resistance.json", tags=[user, "project:resistance", "wafer"]
).groupby("wafer")

job_ids = []
for _wafer_tag, group in tqdm(entries.items()):
    # Collect the file IDs belonging to this wafer and pass them as the
    # pipeline's input list.
    file_ids = [entry["id"] for entry in group]
    response = client.trigger_pipeline("aggregate_die_analyses", file_ids)
    job_ids.extend(response["job_ids"])

print(f"\nTriggered {len(job_ids)} wafer analysis jobs")
0%| | 0/3 [00:00<?, ?it/s]
Triggered 3 wafer analysis jobs
Wait for Completion¶
Wait for all wafer analysis jobs to complete (e.g. by polling the status of each ID in `job_ids`). NOTE(review): the code cell for this step appears to be missing from this export — confirm against the original notebook.
0%| | 0/3 [00:00<?, ?it/s]
View Wafer Maps¶
Query and display the generated wafer maps.
# Fetch the wafer-map PNGs written by the aggregation jobs, filtered to
# this user's project files.
map_query_tags = ["project:resistance", user]
wafer_maps = client.query_files(
    name="wafer_sheet_resistance.png",
    tags=map_query_tags,
)
print(f"Found {len(wafer_maps)} wafer maps")
Found 3 wafer maps
# Display the first wafer map, if any were found.
# Idiom fix: use the container's truthiness instead of `len(...) > 0`.
if wafer_maps:
    # download_file returns something Image.open accepts (a path or
    # file-like object — presumably a local path; confirm against the
    # gfhub client API).
    wafer_map_im = Image.open(client.download_file(wafer_maps[0]["id"]))
    display(wafer_map_im)
else:
    print("No wafer maps found")
