Now we will create wafer maps using the die analysis JSON outputs from the previous notebook.

Make sure:

1. You've run the die analysis pipeline from notebook 3.
2. All die analysis jobs have completed successfully.
3. The JSON output files are tagged appropriately (a quick sanity-check query is sketched after the client is created below).

Imports

import getpass
from pathlib import Path

from gfhub import Client, Pipeline, nodes
from PIL import Image
from tqdm.notebook import tqdm

# Current OS username; used as a tag when querying files later in this notebook
user = getpass.getuser()

Client

client = Client()
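
As a quick sanity check for the prerequisites listed above, you can query for the tagged die analysis outputs before building anything. This is a minimal sketch, reusing the same tags as the trigger step later in this notebook:

# Optional sanity check: the JSON outputs from notebook 3 should already
# exist and carry the expected tags.
die_jsons = client.query_files(tags=[user, "project:cutback", "wafer", ".json"])
print(f"Found {len(die_jsons)} tagged die analysis files")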

Wafer Analysis Function

When a function grows large, it can be more convenient to write the uv script yourself, which is what we do here. The script's contents can be passed directly to add_function instead of supplying a gfhub.Function object.

# Read the wafer analysis function
script = Path("aggregate_die_analyses.py").read_text()

# Create the function in DataLab
client.add_function(name="aggregate_die_analyses", function=script)
{'id': '019bb958-85ca-76d0-b97f-72e663048c04',
 'name': 'aggregate_die_analyses',
 'parameters': {'max_output': {'default': '0.115', 'type': 'float'},
  'min_output': {'default': '0', 'type': 'float'},
  'output_key': {'default': '"component_loss"', 'type': 'str'},
  'output_name': {'default': '"wafer_map"', 'type': 'str'}},
 'inputs': {'files': {'type': 'list[Path]'}},
 'outputs': {'0': {'type': 'Path'}},
 'created_at': '2026-01-13T21:52:25.034842Z',
 'updated_at': '2026-01-14T10:29:47.113166Z'}
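
The registered parameters above mirror the script's signature. For orientation, here is a heavily simplified sketch of what such a uv script could look like; the inline dependency block and the per-die JSON layout are assumptions, and the real aggregate_die_analyses.py is not reproduced here.

# /// script
# dependencies = ["numpy"]  # hypothetical; the real script's deps may differ
# ///
import json
from pathlib import Path


def aggregate_die_analyses(
    files: list[Path],
    output_key: str = "component_loss",
    min_output: float = 0.0,
    max_output: float = 0.115,
    output_name: str = "wafer_map",
) -> Path:
    """Aggregate per-die JSON results into a single wafer map image."""
    # Assumed JSON layout: one scalar per die stored under `output_key`.
    values = {f.stem: json.loads(f.read_text())[output_key] for f in files}
    ...  # render `values` as a wafer map, clipped to [min_output, max_output]
    return Path(f"{output_name}.png")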

Create Wafer Analysis Pipeline

Now create a pipeline that aggregates the die analysis JSON files for a single wafer into a wafer map.

p = Pipeline()
p.trigger = nodes.on_manual_trigger()
p.load_file = nodes.load()
p.load_tags = nodes.load_tags()
p.find_common_tags = nodes.function(
    function="find_common_tags"
)  # this function was defined in the previous notebook
p.aggregate = nodes.function(
    function="aggregate_die_analyses",
    kwargs={
        "output_key": "component_loss",
        "min_output": 0.0,
        "max_output": 0.115,
        "output_name": "wafer_map",
    },
)
p.save = nodes.save()
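# Wire up the DAG: the trigger fans out to the file loader and the tag
# loader; the aggregated wafer map goes to save slot 0 and the common
# tags to save slot 1.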
p += p.trigger >> p.load_file
p += p.trigger >> p.load_tags
p += p.load_file >> p.aggregate
p += p.aggregate >> p.save[0]
p += p.load_tags >> p.find_common_tags
p += p.find_common_tags >> p.save[1]
client.add_pipeline(name="aggregate_die_analyses", schema=p)["updated_at"]
'2026-01-14T10:29:48.262660Z'

Trigger Pipeline for Each Wafer

Query the die analysis JSON files, group them by wafer, and trigger the wafer analysis pipeline once per wafer.

# Query all die analysis JSON outputs (from previous notebook)
# These should be tagged with: project:cutback, wafer, .json
entries = client.query_files(tags=[user, "project:cutback", "wafer", ".json"]).groupby(
    "wafer"
)
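# `entries` presumably maps each wafer tag to a list of file-property dicts,
# e.g. {"wafer:wafer1": [{"id": ...}, ...]} (inferred from the loop below).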
# Trigger wafer analysis pipeline for each wafer
job_ids = []
for wafer_tag, group in tqdm(entries.items()):
    print(f"Processing {wafer_tag}: {len(group)} die analyses")

    # Get file IDs for this wafer
    input_ids = [props["id"] for props in group]

    # Trigger pipeline with list of file IDs
    triggered_jobs = client.trigger_pipeline("aggregate_die_analyses", input_ids)
    job_ids.extend(triggered_jobs["job_ids"])

print(f"\nTriggered {len(job_ids)} wafer analysis job(s)")
Processing wafer:wafer1: 45 die analyses

Triggered 1 wafer analysis job(s)

Wait for Completion

Wait for all wafer analysis jobs to complete.

# Wait for all jobs to complete
jobs = client.wait_for_jobs(job_ids)
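
The return value of wait_for_jobs isn't used above. If you want to detect failures explicitly, something like the sketch below may work; the record shape and the "status"/"completed" values are assumptions about the gfhub API, not documented behavior.

# Optional failure check -- assumes each job record is a dict with a
# "status" field whose success value is "completed".
failed = [job for job in jobs if job.get("status") != "completed"]
print(f"{len(failed)} of {len(jobs)} job(s) failed or are still incomplete")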

View Wafer Maps

Query and display the generated wafer maps.

# Query the wafer map images
wafer_maps = client.query_files(
    name="wafer_map.png",
    tags=["project:cutback"],
)

print(f"Found {len(wafer_maps)} wafer maps")
Found 2 wafer maps
# Display the first wafer map
if len(wafer_maps) > 0:
    wafer_map_im = Image.open(client.download_file(wafer_maps[0]["id"]))
    display(wafer_map_im)
else:
    print("No wafer maps found")

[image: wafer_map.png, the generated wafer map]
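
To open every wafer map rather than only the first, the same download_file call can be applied to each query result:

# Open all wafer maps; download_file returns an object Image.open accepts,
# as seen above.
images = [Image.open(client.download_file(wm["id"])) for wm in wafer_maps]
print(f"Opened {len(images)} wafer map image(s)")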