Now we will create wafer maps using the die analysis JSON outputs from the previous notebook.
Make sure: 1. You've run the die analysis pipeline from notebook 3 2. All die analysis jobs have completed successfully 3. The JSON output files are tagged appropriately
Imports¶
import getpass
from pathlib import Path
from gfhub import Client, Pipeline, nodes
from PIL import Image
from tqdm.notebook import tqdm
user = getpass.getuser()
Client¶
Wafer Analysis Function¶
When a function gets large it might be useful to write the uv-script yourself. This is what we're doing here. The content of the uv script can just be passed directly instead of supplying a gfhub.Function object.
# Read the wafer analysis function
script = Path("aggregate_die_analyses.py").read_text()
# Create the function in DataLab
client.add_function(name="aggregate_die_analyses", function=script)
{'id': '019bb958-85ca-76d0-b97f-72e663048c04',
'name': 'aggregate_die_analyses',
'parameters': {'max_output': {'default': '0.115', 'type': 'float'},
'min_output': {'default': '0', 'type': 'float'},
'output_key': {'default': '"component_loss"', 'type': 'str'},
'output_name': {'default': '"wafer_map"', 'type': 'str'}},
'inputs': {'files': {'type': 'list[Path]'}},
'outputs': {'0': {'type': 'Path'}},
'created_at': '2026-01-13T21:52:25.034842Z',
'updated_at': '2026-01-14T10:29:47.113166Z'}
Create Wafer Analysis Pipeline¶
Now create a pipeline that will aggregate die analysis JSON files into a wafer map.
p = Pipeline()
p.trigger = nodes.on_manual_trigger()
p.load_file = nodes.load()
p.load_tags = nodes.load_tags()
p.find_common_tags = nodes.function(
function="find_common_tags"
) # this function was defined in the previous notebook
p.aggregate = nodes.function(
function="aggregate_die_analyses",
kwargs={
"output_key": "component_loss",
"min_output": 0.0,
"max_output": 0.115,
"output_name": "wafer_map",
},
)
p.save = nodes.save()
p += p.trigger >> p.load_file
p += p.trigger >> p.load_tags
p += p.load_file >> p.aggregate
p += p.aggregate >> p.save[0]
p += p.load_tags >> p.find_common_tags
p += p.find_common_tags >> p.save[1]
client.add_pipeline(name="aggregate_die_analyses", schema=p)["updated_at"]
'2026-01-14T10:29:48.262660Z'
Trigger Pipeline for Each Wafer¶
Query die analysis JSON files grouped by wafer and trigger the wafer analysis pipeline.
# Query all die analysis JSON outputs (from previous notebook)
# These should be tagged with: project:cutback, wafer, .json
entries = client.query_files(tags=[user, "project:cutback", "wafer", ".json"]).groupby(
"wafer"
)
# Trigger wafer analysis pipeline for each wafer
job_ids = []
for wafer_tag, group in tqdm(entries.items()):
print(f"Processing {wafer_tag}: {len(group)} die analyses")
# Get file IDs for this wafer
input_ids = [props["id"] for props in group]
# Trigger pipeline with list of file IDs
triggered_jobs = client.trigger_pipeline("aggregate_die_analyses", input_ids)
job_ids.extend(triggered_jobs["job_ids"])
print(f"\nTriggered {len(job_ids)} wafer analysis jobs")
0%| | 0/1 [00:00<?, ?it/s]
Processing wafer:wafer1: 45 die analyses
Triggered 1 wafer analysis jobs
Wait for Completion¶
Wait for all wafer analysis jobs to complete.
0%| | 0/1 [00:00<?, ?it/s]
View Wafer Maps¶
Query and display the generated wafer maps.
# Query the wafer map images
wafer_maps = client.query_files(
name="wafer_map.png",
tags=["project:cutback"],
)
print(f"Found {len(wafer_maps)} wafer maps")
Found 2 wafer maps
# Display the first wafer map
if len(wafer_maps) > 0:
wafer_map_im = Image.open(client.download_file(wafer_maps[0]["id"]))
display(wafer_map_im)
else:
print("No wafer maps found")
