gfhub

DataLab.

Modules:

Name Description
client

DataLab Python SDK.

entry

Utilities.

function

Function class for defining DataLab functions from Python callables.

nodes

Helper functions for creating pipeline nodes.

pipeline

Friendly pipeline module.

tags

Utilities for working with tags.

Classes:

Name Description
Client

DataLab client for managing files, functions, pipelines, and tags.

Function

A DataLab function defined from a Python callable.

Pipeline

A friendly pipeline representation.

Functions:

Name Description
get_settings

Get settings from config files and environment variables.

Client

Client(host: str | None = None, api_key: str | None = None)

DataLab client for managing files, functions, pipelines, and tags.

Parameters:

Name Type Description Default
host str | None

The host URL of the DataLab server. If not provided, reads from settings (pyproject.toml or ~/.gdsfactory/gdsfactoryplus.toml).

None
api_key str | None

Optional API key for authentication. If not provided, reads from settings (only ~/.gdsfactory/gdsfactoryplus.toml, not local config).

None

Methods:

Name Description
add_file

Upload a file to DataLab.

add_function

Add or update a Python function.

add_pipeline

Add or update a pipeline.

add_tag

Add or update a tag.

delete_file

Delete a file by ID.

disable_pipeline

Disable a pipeline.

download_file

Download a file by upload ID.

enable_pipeline

Enable a pipeline.

get_job

Get job details by ID.

get_jobs

Get multiple jobs by IDs (batch).

list_functions

List all functions in the organization.

pipeline_url

Get the pipeline URL for a given pipeline ID.

query_files

Query files by name pattern and/or tags.

trigger_pipeline

Trigger a pipeline manually with one or more files.

url

Get the full URL for a given path.

wait_for_job

Wait for a job to complete (SUCCESS or FAILED status).

wait_for_jobs

Wait for multiple jobs to complete.

Attributes:

Name Type Description
host str

Get the host URL of the DataLab server.

Source code in python/gfhub/client.py
def __init__(
    self,
    host: str | None = None,
    api_key: str | None = None,
) -> None:
    """Initialize the DataLab client.

    Args:
        host: The host URL of the DataLab server. Falls back to settings.
        api_key: Optional API key. Falls back to settings.
    """
    # Get settings if not provided
    if host is None or api_key is None:
        settings_api_key, settings_host = get_settings()
        if host is None:
            host = settings_host
        if api_key is None and settings_api_key:
            api_key = settings_api_key

    self._host = host or "http://localhost:8080"
    self._client = _RustClient(host, api_key)

host property

host: str

Get the host URL of the DataLab server.

add_file

add_file(data: str | Path | BinaryIO | DataFrame, tags: Iterable[str] = (), *, filename: str | None = None) -> dict

Upload a file to DataLab.

Parameters:

Name Type Description Default
data str | Path | BinaryIO | DataFrame

The data to upload. Can be: - str/Path: Path to a file to upload - BinaryIO: File-like object (e.g., io.BytesIO) - pandas.DataFrame: Will be converted to Parquet format

required
tags Iterable[str]

Optional list of tags to apply to the file. Tags can be simple names (e.g., "raw") or parameter tags with "key:value" format (e.g., "raw:3").

()
filename str | None

Optional filename to use on the server. Required when uploading from BinaryIO or DataFrame. Optional when uploading from a path (defaults to the actual filename).

None

Returns:

Type Description
dict

Dictionary containing the upload response with file metadata.

Raises:

Type Description
RuntimeError

If the file upload fails.

ValueError

If filename is not provided when uploading from BinaryIO or DataFrame.

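Examples:

Building a tag list before upload. Plain tags are bare names and parameter tags use the "key:value" convention described above; the wafer id and filename here are illustrative:

```python
wafer_id = "wafer1"  # illustrative value
# A plain tag, a parameter tag, and an extension tag for the upload.
tags = ["raw", f"wafer_id:{wafer_id}", ".csv"]
print(tags)  # ['raw', 'wafer_id:wafer1', '.csv']

# The list would then be passed along with the data, e.g.:
# client.add_file("measurements.csv", tags=tags)
```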
Source code in python/gfhub/client.py
def add_file(
    self,
    data: str | Path | BinaryIO | pd.DataFrame,
    tags: Iterable[str] = (),
    *,
    filename: str | None = None,
) -> dict:
    """Upload a file to DataLab.

    Args:
        data: The data to upload. Can be:
            - str/Path: Path to a file to upload
            - BinaryIO: File-like object (e.g., io.BytesIO)
            - pandas.DataFrame: Will be converted to Parquet format
        tags: Optional list of tags to apply to the file. Tags can be simple names
            (e.g., "raw") or parameter tags with "key:value" format (e.g., "raw:3").
        filename: Optional filename to use on the server. Required when uploading
            from BinaryIO or DataFrame. Optional when uploading from a path
            (defaults to the actual filename).

    Returns:
        Dictionary containing the upload response with file metadata.

    Raises:
        RuntimeError: If the file upload fails.
        ValueError: If filename is not provided when uploading
            from BinaryIO or DataFrame.

    """
    tags_lst = None if not tags else [str(t) for t in tags]

    # Handle different input types
    if isinstance(data, (str, Path)):
        # Upload from file path
        path_obj = Path(data).resolve()
        if not path_obj.exists():
            msg = f"File not found: {path_obj}"
            raise FileNotFoundError(msg)

        if filename is not None:
            # Custom filename provided - read file and upload as bytes
            file_bytes = path_obj.read_bytes()
            mime_type = None  # Let server guess from filename
            return json.loads(
                self._client.add_file_from_bytes(
                    file_bytes,
                    filename,
                    mime_type,
                    tags_lst,
                )
            )
        # Use original filename
        path_str = str(path_obj)
        return json.loads(self._client.add_file(path_str, tags_lst))

    # Handle pandas DataFrame
    if isinstance(data, pd.DataFrame):
        if filename is None:
            msg = "filename parameter is required when uploading a DataFrame"
            raise ValueError(msg)
        # Convert DataFrame to Parquet bytes
        buffer = io.BytesIO()
        data.to_parquet(buffer, index=False)
        buffer.seek(0)
        file_bytes = buffer.read()
        mime_type = "application/octet-stream"

        # Ensure filename has .parquet extension
        if not filename.endswith(".parquet"):
            filename = f"{filename}.parquet"

        return json.loads(
            self._client.add_file_from_bytes(
                file_bytes,
                filename,
                mime_type,
                tags_lst,
            )
        )

    # Handle BinaryIO (file-like object)
    if hasattr(data, "read"):
        if filename is None:
            msg = "filename parameter is required when uploading a buffer"
            raise ValueError(msg)
        file_bytes = data.read()
        if isinstance(file_bytes, str):
            file_bytes = file_bytes.encode("utf-8")
        mime_type = None  # Let server guess from filename
        return json.loads(
            self._client.add_file_from_bytes(
                file_bytes,
                filename,
                mime_type,
                tags_lst,
            )
        )

    msg = f"Unsupported data type: {type(data)}"
    raise TypeError(msg)

add_function

add_function(function: str | Path | Function | Callable, *, name: str = '', update: bool = True) -> dict

Add or update a Python function.

Parameters:

Name Type Description Default
function str | Path | Function | Callable

The function to upload. One of: - str: Python script content (if contains newlines) or path string - Path: Path to a Python script file - Function: A Function instance created from a Python callable - Callable: A callable to be uploaded as function (no import dependencies allowed!)

required
name str

Override the name of the function. Required when function is passed as a Callable or as script content.

''
update bool

If True, updates the function if it already exists. If False, raises an error on conflict. Defaults to True.

True

Returns:

Type Description
dict

Dictionary containing the function response with metadata.

Raises:

Type Description
RuntimeError

If the function validation or upload fails.

Examples:

Upload from a file path:

client.add_function("path/to/script.py")

Upload from a Function instance:

from gfhub import Function

def analyze(input_path: Path, /, *, threshold: float = 0.5) -> dict:
    df = pd.read_parquet(input_path)
    result = df[df["value"] > threshold]
    output = input_path.with_suffix(".filtered.parquet")
    result.to_parquet(output)
    return {"output": output}

func = Function(
    analyze, dependencies={"pandas>=2.0": "import pandas as pd"}
)
client.add_function(func)
Source code in python/gfhub/client.py
def add_function(  # noqa: PLR0912,C901
    self,
    function: str | Path | Function | Callable,
    *,
    name: str = "",
    update: bool = True,
) -> dict:
    """Add or update a Python function.

    Args:
        function: The function to upload. One of:
            - str: Python script content (if contains newlines) or path string
            - Path: Path to a Python script file
            - Function: A Function instance created from a Python callable
            - Callable: A callable to be uploaded as function
                (no import dependencies allowed!)
        name: Override the name of the function. Required when function is
            passed as a Callable or as script content.
        update: If True, updates the function if it already exists. If False,
            raises an error on conflict. Defaults to True.

    Returns:
        Dictionary containing the function response with metadata.

    Raises:
        RuntimeError: If the function validation or upload fails.

    Examples:
        Upload from a file path:

        ```python
        client.add_function("path/to/script.py")
        ```

        Upload from a Function instance:

        ```python
        from gfhub import Function

        def analyze(input_path: Path, /, *, threshold: float = 0.5) -> dict:
            df = pd.read_parquet(input_path)
            result = df[df["value"] > threshold]
            output = input_path.with_suffix(".filtered.parquet")
            result.to_parquet(output)
            return {"output": output}

        func = Function(
            analyze, dependencies={"pandas>=2.0": "import pandas as pd"}
        )
        client.add_function(func)
        ```
    """
    # Import here to avoid circular import
    from .function import Function

    if isinstance(function, str) and "\n" in function:
        script = function
        if not name:
            msg = (
                "Client.add_function expects a 'name' argument when the "
                "function is specified as script content."
            )
            raise ValueError(msg)
    elif isinstance(function, str):
        script = Path(function).read_text()
        if not name:
            name = Path(function).stem
    elif isinstance(function, Path):
        script = function.read_text()
        if not name:
            name = function.stem
    elif isinstance(function, Function):
        script = function.to_script()
        if not name:
            name = function.name
    elif callable(function):
        try:
            script = Function(func=function, dependencies={}).to_script()
        except ValueError as e:
            msg = (
                "The function you're trying to upload has import dependencies. "
                f"Please upload a gfhub.Function instead. {e}"
            )
            raise ValueError(msg) from e
        if not name:
            name = getattr(function, "__name__", "")
            if not name:
                msg = (
                    "Client.add_function expects a 'name' argument when "
                    "the function passed does not have a __name__."
                )
                raise ValueError(msg)
    else:
        msg = (
            "Invalid value for argument 'function' in Client.add_function. "
            f"Expected str | Path | Function | Callable. Got: {function}."
        )
        raise ValueError(msg)

    return json.loads(self._client.add_function(str(name), script, bool(update)))
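The dispatch at the top of the method can be mirrored by a small standalone helper. `classify` is illustrative only (it omits the `Function` branch to stay self-contained) and is not part of the SDK:

```python
from pathlib import Path

def classify(function) -> str:
    """Mirror how Client.add_function interprets its argument."""
    if isinstance(function, str) and "\n" in function:
        return "script content"  # requires an explicit name=
    if isinstance(function, (str, Path)):
        return "path to a script file"  # name defaults to the file stem
    if callable(function):
        return "callable"  # name defaults to __name__
    return "unsupported"

print(classify("def f():\n    return 1"))  # script content
print(classify("path/to/script.py"))       # path to a script file
print(classify(len))                       # callable
```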

add_pipeline

add_pipeline(name: str, schema: dict | str | Pipeline, *, update: bool = True) -> dict

Add or update a pipeline.

Parameters:

Name Type Description Default
name str

Name of the pipeline.

required
schema dict | str | Pipeline

Either a dict or JSON string containing the pipeline schema. The schema uses JsonNode/JsonEdge format: - nodes: List of nodes with name, type, and settings - edges: List of edges connecting nodes

required
update bool

If True, updates the pipeline if it already exists. If False, raises an error on conflict. Defaults to True.

True

Returns:

Type Description
dict

Dictionary containing the pipeline response with metadata.

Raises:

Type Description
RuntimeError

If the pipeline creation or update fails.

Examples:

schema = {
    "nodes": [
        {
            "name": "to_parquet",
            "type": "function",
            "settings": {
                "function": "csv2parquet",
                "settings": {}
            }
        }
    ],
    "edges": []
}
client.add_pipeline("csv_converter", schema)
Source code in python/gfhub/client.py
def add_pipeline(
    self,
    name: str,
    schema: dict | str | Pipeline,
    *,
    update: bool = True,
) -> dict:
    """Add or update a pipeline.

    Args:
        name: Name of the pipeline.
        schema: Either a dict or JSON string containing the pipeline schema.
            The schema uses JsonNode/JsonEdge format:
            - nodes: List of nodes with name, type, and settings
            - edges: List of edges connecting nodes
        update: If True, updates the pipeline if it already exists. If False,
            raises an error on conflict. Defaults to True.

    Returns:
        Dictionary containing the pipeline response with metadata.

    Raises:
        RuntimeError: If the pipeline creation or update fails.

    Examples:
        ```python
        schema = {
            "nodes": [
                {
                    "name": "to_parquet",
                    "type": "function",
                    "settings": {
                        "function": "csv2parquet",
                        "settings": {}
                    }
                }
            ],
            "edges": []
        }
        client.add_pipeline("csv_converter", schema)
        ```
    """
    # Parse schema if it's a string
    if isinstance(schema, str):
        schema = json.loads(schema)
    elif isinstance(schema, Pipeline):
        schema = schema.to_dict()
    schema = cast(dict, schema)

    # Resolve function names to IDs for function nodes
    if "nodes" in schema:
        functions = self.list_functions()
        function_name_to_id = {f["name"]: f["id"] for f in functions}

        for node in schema["nodes"]:
            if node.get("type") == "function" and node.get("config"):
                config = node["config"]
                if "function" in config:
                    function_name = config["function"]
                    if function_name in function_name_to_id:
                        # Replace function name with ID
                        config["id"] = function_name_to_id[function_name]
                        del config["function"]
                    else:
                        msg = f"Function '{function_name}' not found"
                        raise ValueError(msg)

    # Convert to JSON string
    schema_str = json.dumps(schema)

    return json.loads(
        self._client.add_pipeline(str(name), schema_str, bool(update))
    )

add_tag

add_tag(name: str, color: str, *, update: bool = True) -> dict

Add or update a tag.

Parameters:

Name Type Description Default
name str

Name of the tag.

required
color str

Hex color code for the tag (e.g., "#ef4444").

required
update bool

If True, updates the tag if it already exists. If False, raises an error on conflict. Defaults to True.

True

Returns:

Type Description
dict

Dictionary containing the tag response with metadata.

Raises:

Type Description
RuntimeError

If the tag creation or update fails.

Source code in python/gfhub/client.py
def add_tag(
    self,
    name: str,
    color: str,
    *,
    update: bool = True,
) -> dict:
    """Add or update a tag.

    Args:
        name: Name of the tag.
        color: Hex color code for the tag (e.g., "#ef4444").
        update: If True, updates the tag if it already exists. If False,
            raises an error on conflict. Defaults to True.

    Returns:
        Dictionary containing the tag response with metadata.

    Raises:
        RuntimeError: If the tag creation or update fails.
    """
    return json.loads(self._client.add_tag(str(name), str(color), bool(update)))

delete_file

delete_file(file_id: str) -> None

Delete a file by ID.

Parameters:

Name Type Description Default
file_id str

ID of the file to delete.

required

Raises:

Type Description
RuntimeError

If the deletion fails.

Examples:

client.delete_file("upload_123")
Source code in python/gfhub/client.py
def delete_file(self, file_id: str) -> None:
    """Delete a file by ID.

    Args:
        file_id: ID of the file to delete.

    Raises:
        RuntimeError: If the deletion fails.

    Examples:
        ```python
        client.delete_file("upload_123")
        ```
    """
    self._client.delete_file(str(file_id))

disable_pipeline

disable_pipeline(pipeline_id: str) -> None

Disable a pipeline.

Parameters:

Name Type Description Default
pipeline_id str

ID of the pipeline.

required

Raises:

Type Description
RuntimeError

If the operation fails.

Examples:

client.disable_pipeline("pipeline-uuid")
Source code in python/gfhub/client.py
def disable_pipeline(self, pipeline_id: str) -> None:
    """Disable a pipeline.

    Args:
        pipeline_id: ID of the pipeline.

    Raises:
        RuntimeError: If the operation fails.

    Examples:
        ```python
        client.disable_pipeline("pipeline-uuid")
        ```
    """
    self._client.disable_pipeline(str(pipeline_id))

download_file

download_file(upload_id: str, output: str | Path | BinaryIO | None = None) -> BinaryIO | Path

Download a file by upload ID.

Parameters:

Name Type Description Default
upload_id str

ID of the file to download.

required
output str | Path | BinaryIO | None

Where to write the file. Can be: - str/Path: File path to write to - File handle opened in binary mode (e.g., open('file', 'wb')) - io.BytesIO: BytesIO buffer to write to - None: Return new BytesIO buffer with file contents

None

Returns:

Type Description
BinaryIO | Path

Path to the written file when output is a str/Path; the given file handle or buffer when one is passed; a new io.BytesIO with the file contents when output is None.

Raises:

Type Description
RuntimeError

If the download fails.

Examples:

Download to file path:

client.download_file("upload_123", "downloaded_file.csv")

Download to file handle:

with open("output.csv", "wb") as f:
    client.download_file("upload_123", f)

Download to BytesIO:

import io
buffer = io.BytesIO()
client.download_file("upload_123", buffer)
buffer.seek(0)

Get BytesIO directly:

buffer = client.download_file("upload_123")
data = buffer.read()
Source code in python/gfhub/client.py
def download_file(
    self,
    upload_id: str,
    output: str | Path | BinaryIO | None = None,
) -> BinaryIO | Path:
    """Download a file by upload ID.

    Args:
        upload_id: ID of the file to download.
        output: Where to write the file. Can be:
            - str/Path: File path to write to
            - File handle opened in binary mode (e.g., open('file', 'wb'))
            - io.BytesIO: BytesIO buffer to write to
            - None: Return new BytesIO buffer with file contents

    Returns:
        Path to the written file when output is a str/Path; the given file
        handle or buffer when one is passed; a new io.BytesIO with the file
        contents when output is None.

    Raises:
        RuntimeError: If the download fails.

    Examples:
        Download to file path:

        ```python
        client.download_file("upload_123", "downloaded_file.csv")
        ```

        Download to file handle:

        ```python
        with open("output.csv", "wb") as f:
            client.download_file("upload_123", f)
        ```

        Download to BytesIO:

        ```python
        import io
        buffer = io.BytesIO()
        client.download_file("upload_123", buffer)
        buffer.seek(0)
        ```

        Get BytesIO directly:

        ```python
        buffer = client.download_file("upload_123")
        data = buffer.read()
        ```
    """
    file_bytes = bytes(self._client.download_file_bytes(str(upload_id)))

    if output is None:
        return io.BytesIO(file_bytes)
    if isinstance(output, (str, Path)):
        output_path = Path(output).resolve()
        with output_path.open("wb") as f:
            f.write(file_bytes)
        return output_path

    # File handle or BytesIO
    output.write(file_bytes)
    return output

enable_pipeline

enable_pipeline(pipeline_id: str) -> None

Enable a pipeline.

Parameters:

Name Type Description Default
pipeline_id str

ID of the pipeline.

required

Raises:

Type Description
RuntimeError

If the operation fails.

Examples:

client.enable_pipeline("pipeline-uuid")
Source code in python/gfhub/client.py
def enable_pipeline(self, pipeline_id: str) -> None:
    """Enable a pipeline.

    Args:
        pipeline_id: ID of the pipeline.

    Raises:
        RuntimeError: If the operation fails.

    Examples:
        ```python
        client.enable_pipeline("pipeline-uuid")
        ```
    """
    self._client.enable_pipeline(str(pipeline_id))

get_job

get_job(job_id: str) -> dict

Get job details by ID.

Parameters:

Name Type Description Default
job_id str

The job ID to retrieve

required

Returns:

Type Description
dict

Job details including status, inputs, outputs, timestamps, etc.

Examples:

job = client.get_job("job_123")
print(job["status"])  # QUEUED, RUNNING, SUCCESS, or FAILED
print(job["pipeline_name"])  # Name of the pipeline
Source code in python/gfhub/client.py
def get_job(self, job_id: str) -> dict:
    """Get job details by ID.

    Args:
        job_id: The job ID to retrieve

    Returns:
        Job details including status, inputs, outputs, timestamps, etc.

    Examples:
        ```python
        job = client.get_job("job_123")
        print(job["status"])  # QUEUED, RUNNING, SUCCESS, or FAILED
        print(job["pipeline_name"])  # Name of the pipeline
        ```
    """
    return json.loads(self._client.get_job(str(job_id)))

get_jobs

get_jobs(job_ids: list[str]) -> list[dict]

Get multiple jobs by IDs (batch).

Parameters:

Name Type Description Default
job_ids list[str]

List of job IDs to retrieve

required

Returns:

Type Description
list[dict]

List of job details including status, inputs, outputs, timestamps, etc.

Examples:

jobs = client.get_jobs(["job_123", "job_456"])
for job in jobs:
    print(job["status"])  # QUEUED, RUNNING, SUCCESS, or FAILED
Source code in python/gfhub/client.py
def get_jobs(self, job_ids: list[str]) -> list[dict]:
    """Get multiple jobs by IDs (batch).

    Args:
        job_ids: List of job IDs to retrieve

    Returns:
        List of job details including status, inputs, outputs, timestamps, etc.

    Examples:
        ```python
        jobs = client.get_jobs(["job_123", "job_456"])
        for job in jobs:
            print(job["status"])  # QUEUED, RUNNING, SUCCESS, or FAILED
        ```
    """
    if not job_ids:
        return []
    return json.loads(self._client.get_jobs([str(jid) for jid in job_ids]))

list_functions

list_functions() -> list[dict]

List all functions in the organization.

Returns:

Type Description
list[dict]

List of function dictionaries with id, name, parameters, etc.

Raises:

Type Description
RuntimeError

If listing functions fails.

Source code in python/gfhub/client.py
def list_functions(self) -> list[dict]:
    """List all functions in the organization.

    Returns:
        List of function dictionaries with id, name, parameters, etc.

    Raises:
        RuntimeError: If listing functions fails.
    """
    return json.loads(self._client.list_functions())

pipeline_url

pipeline_url(pipeline_id: str) -> str

Get the pipeline URL for a given pipeline ID.

Source code in python/gfhub/client.py
def pipeline_url(self, pipeline_id: str) -> str:
    """Get the pipeline url for a certain pipeline id."""
    return os.path.join(self.host, "pipelines", str(pipeline_id))  # noqa: PTH118

query_files

query_files(*, name: str | None = None, tags: Iterable[str] = ()) -> Entries

Query files by name pattern and/or tags.

Parameters:

Name Type Description Default
name str | None

Optional filename pattern to filter by. Supports glob patterns: - Exact match: "lattice.gds" (case-insensitive) - Glob pattern: "*.csv", "data*.parquet", "lattice*"

None
tags Iterable[str]

Optional list of tags to filter by. Files must have ALL given tags. Supports wildcards (e.g., "wafer_id:*") to match any parameter value.

()

Returns:

Type Description
Entries

Dictionary containing a list of matching files with their metadata.

Raises:

Type Description
RuntimeError

If the query fails.

Examples:

# Find all CSV files by extension tag
client.query_files(tags=[".csv"])

# Find files by exact name (case-insensitive)
client.query_files(name="lattice.gds")

# Find files by glob pattern
client.query_files(name="*.csv")
client.query_files(name="data*.parquet")

# Find files with specific parameter values
client.query_files(tags=["wafer_id:wafer1", ".parquet"])

# Combine name pattern and tags
client.query_files(name="*.parquet", tags=["wafer_id:*"])

# Get all files
client.query_files()
Source code in python/gfhub/client.py
def query_files(
    self,
    *,
    name: str | None = None,
    tags: Iterable[str] = (),
) -> Entries:
    """Query files by name pattern and/or tags.

    Args:
        name: Optional filename pattern to filter by. Supports glob patterns:
            - Exact match: "lattice.gds" (case-insensitive)
            - Glob pattern: "*.csv", "data*.parquet", "lattice*"
        tags: Optional list of tags to filter by. Files must have ALL given tags.
            Supports wildcards (e.g., "wafer_id:*") to match any parameter value.

    Returns:
        Dictionary containing a list of matching files with their metadata.

    Raises:
        RuntimeError: If the query fails.

    Examples:
        ```python
        # Find all CSV files by extension tag
        client.query_files(tags=[".csv"])

        # Find files by exact name (case-insensitive)
        client.query_files(name="lattice.gds")

        # Find files by glob pattern
        client.query_files(name="*.csv")
        client.query_files(name="data*.parquet")

        # Find files with specific parameter values
        client.query_files(tags=["wafer_id:wafer1", ".parquet"])

        # Combine name pattern and tags
        client.query_files(name="*.parquet", tags=["wafer_id:*"])

        # Get all files
        client.query_files()
        ```
    """
    tags_list = None if not tags else [str(t) for t in tags]
    entries = Entries(json.loads(self._client.query_files(name, tags_list)))
    for entry in entries:
        if "tags" not in entry:
            entry["tags"] = {}
        else:
            # entry names are unique, so this is more convenient:
            entry["tags"] = {t["name"]: t for t in entry["tags"]}
    return entries
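The loop at the end of the method re-keys each entry's tag list into a dict keyed by tag name, so tags can be looked up directly. The same normalization on a standalone entry (the entry and its tag fields are illustrative):

```python
# An entry as returned by the server carries tags as a list of objects.
entry = {
    "name": "data.parquet",
    "tags": [
        {"name": "raw", "color": "#ef4444"},
        {"name": "wafer_id", "value": "wafer1"},
    ],
}

# Re-key the list by tag name, as query_files does for each entry.
entry["tags"] = {t["name"]: t for t in entry["tags"]}
print(entry["tags"]["raw"])
print("wafer_id" in entry["tags"])  # True
```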

trigger_pipeline

trigger_pipeline(pipeline_name: str, upload_ids: str | Iterable[str]) -> dict

Trigger a pipeline manually with one or more files.

Parameters:

Name Type Description Default
pipeline_name str

Name of the pipeline to trigger.

required
upload_ids str | Iterable[str]

Single upload id or list of upload IDs to process.

required

Returns:

Type Description
dict

Dictionary containing the job metadata with job ID.

Raises:

Type Description
RuntimeError

If the pipeline trigger fails or pipeline not found.

Examples:

Trigger with single file:

job = client.trigger_pipeline("csv2json", "upload_123")
print(job["id"])  # Job ID

Trigger with multiple files:

job = client.trigger_pipeline("csv2json", ["upload_1", "upload_2"])
print(job["id"])  # Job ID
Source code in python/gfhub/client.py
def trigger_pipeline(
    self,
    pipeline_name: str,
    upload_ids: str | Iterable[str],
) -> dict:
    """Trigger a pipeline manually with one or more files.

    Args:
        pipeline_name: Name of the pipeline to trigger.
        upload_ids: Single upload id or list of upload IDs to process.

    Returns:
        Dictionary containing the job metadata with job ID.

    Raises:
        RuntimeError: If the pipeline trigger fails or pipeline not found.

    Examples:
        Trigger with single file:

        ```python
        job = client.trigger_pipeline("csv2json", "upload_123")
        print(job["id"])  # Job ID
        ```

        Trigger with multiple files:

        ```python
        job = client.trigger_pipeline("csv2json", ["upload_1", "upload_2"])
        print(job["id"])  # Job ID
        ```
    """
    return json.loads(self._client.trigger_pipeline(str(pipeline_name), upload_ids))

url

url(*parts: str) -> str

Get the full URL for a given path.

Source code in python/gfhub/client.py
def url(self, *parts: str) -> str:
    """Get the full URL for a given path."""
    return urljoin(self._host, "/".join(parts))

wait_for_job

wait_for_job(job_id: str, poll_interval: float = 1.0) -> dict

Wait for a job to complete (SUCCESS or FAILED status).

Parameters:

Name Type Description Default
job_id str

The job ID to wait for

required
poll_interval float

Seconds between polls (default: 1.0)

1.0

Returns:

Type Description
dict

Final job details with status SUCCESS or FAILED

Raises:

Type Description
RuntimeError

If job is not found

Examples:

job = client.trigger_pipeline("csv2json", "upload_123")
final_job = client.wait_for_job(job["id"])
print(final_job["status"])  # SUCCESS or FAILED
if final_job["status"] == "SUCCESS":
    print(final_job["output_filenames"])
Source code in python/gfhub/client.py
def wait_for_job(
    self,
    job_id: str,
    poll_interval: float = 1.0,
) -> dict:
    """Wait for a job to complete (SUCCESS or FAILED status).

    Args:
        job_id: The job ID to wait for
        poll_interval: Seconds between polls (default: 1.0)

    Returns:
        Final job details with status SUCCESS or FAILED

    Raises:
        RuntimeError: If job is not found

    Examples:
        ```python
        job = client.trigger_pipeline("csv2json", "upload_123")
        final_job = client.wait_for_job(job["id"])
        print(final_job["status"])  # SUCCESS or FAILED
        if final_job["status"] == "SUCCESS":
            print(final_job["output_filenames"])
        ```
    """
    while True:
        job = self.get_job(job_id)
        status = str(job["status"]).upper()
        if status in ("SUCCESS", "FAILED"):
            return job
        time.sleep(poll_interval)

wait_for_jobs

wait_for_jobs(job_ids: list[str], poll_interval: float = 1.0) -> list[dict]

Wait for multiple jobs to complete.

Parameters:

Name Type Description Default
job_ids list[str]

List of job IDs to wait for

required
poll_interval float

Seconds between polling cycles (default: 1.0)

1.0

Returns:

Type Description
list[dict]

List of final job details with status SUCCESS or FAILED

Raises:

Type Description
RuntimeError

If any job is not found

Examples:

jobs = client.wait_for_jobs(["job_123", "job_456"])
for job in jobs:
    print(job["status"])  # SUCCESS or FAILED
Source code in python/gfhub/client.py
def wait_for_jobs(
    self, job_ids: list[str], poll_interval: float = 1.0
) -> list[dict]:
    """Wait for multiple jobs to complete.

    Args:
        job_ids: List of job IDs to wait for
        poll_interval: Seconds between polling cycles (default: 1.0)

    Returns:
        List of final job details with status SUCCESS or FAILED

    Raises:
        RuntimeError: If any job is not found

    Examples:
        ```python
        jobs = client.wait_for_jobs(["job_123", "job_456"])
        for job in jobs:
            print(job["status"])  # SUCCESS or FAILED
        ```
    """
    completed: dict[str, dict] = {}
    remaining = set(job_ids)

    with tqdm(total=len(job_ids)) as pbar:
        while remaining:
            # Batch fetch all remaining jobs in a single request
            jobs = self.get_jobs(list(remaining))

            # Process results
            for job in jobs:
                job_id = job["id"]
                status = str(job["status"]).upper()
                if status in ("SUCCESS", "FAILED"):
                    completed[job_id] = job
                    remaining.discard(job_id)
                    pbar.update(1)

            # Sleep once per cycle
            if remaining:
                time.sleep(poll_interval)

    # Return in original order
    return [completed[job_id] for job_id in job_ids]
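A useful property of the loop above is that results come back in the order of `job_ids`, even when jobs finish out of order. A simplified stand-alone version of the same loop (no progress bar; `get_jobs` here is any callable mimicking `Client.get_jobs`, taking a list of IDs and returning job dicts) makes this easy to see:

```python
import time


def wait_for_jobs_simple(get_jobs, job_ids, poll_interval=0.01):
    """Minimal version of the batch-polling loop above, without tqdm."""
    completed, remaining = {}, set(job_ids)
    while remaining:
        for job in get_jobs(list(remaining)):
            if str(job["status"]).upper() in ("SUCCESS", "FAILED"):
                completed[job["id"]] = job
                remaining.discard(job["id"])
        if remaining:
            time.sleep(poll_interval)
    # Return in original order, regardless of completion order
    return [completed[j] for j in job_ids]
```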

Function

Function(func: Callable, dependencies: dict[str, str | list[str]] | None = None)

A DataLab function defined from a Python callable.

This class wraps a Python function and its dependencies, validates that all undefined globals are covered by the provided imports, and generates a uv-style script for upload to DataLab.

Parameters:

Name Type Description Default
func Callable

A Python function to upload. Must have a valid signature with positional-only input parameters and keyword-only config parameters: def func(input1: Path, input2: Path, /, *, param: float = 1.0) -> dict

required
dependencies dict[str, str | list[str]] | None

A dict mapping package specs to import statement(s). The package spec can include version constraints (e.g., "pandas>=2.0"). The import statement(s) define what names become available.

None

Raises:

Type Description
ValueError

If the dependencies don't cover all undefined globals used in the function body.

Examples:

def analyze(input_path: Path, /, *, threshold: float = 0.5) -> dict:
    df = pd.read_parquet(input_path)
    result = df[df["value"] > threshold]
    output = input_path.with_suffix(".filtered.parquet")
    result.to_parquet(output)
    return {"output": output}

func = Function(
    analyze,
    dependencies={"pandas>=2.0": "import pandas as pd"},
)

client.add_function("filter_data", func)

Parameters:

Name Type Description Default
func Callable

The Python function to wrap.

required
dependencies dict[str, str | list[str]] | None

Dict mapping package specs to import statement(s).

None

Methods:

Name Description
eval

Evaluate the function locally using uv run.

to_script

Generate a uv-style Python script.

Attributes:

Name Type Description
dependencies dict[str, list[str]]

The normalized dependencies dict.

func Callable

The wrapped Python function.

name str

The function name.

undefined_globals set[str]

The set of undefined globals found in the function.

Source code in python/gfhub/function.py
def __init__(
    self,
    func: Callable,
    dependencies: dict[str, str | list[str]] | None = None,
) -> None:
    """Initialize a Function from a callable.

    Args:
        func: The Python function to wrap.
        dependencies: Dict mapping package specs to import statement(s).
    """
    self._func = func
    self._func_name = func.__name__

    # Get and dedent source code
    self._source = textwrap.dedent(inspect.getsource(func))

    # Normalize dependencies to dict[str, list[str]]
    if dependencies is None:
        dependencies = {}
    self._dependencies: dict[str, list[str]] = {}
    for pkg, imports in dependencies.items():
        if isinstance(imports, str):
            self._dependencies[pkg] = [imports]
        else:
            self._dependencies[pkg] = list(imports)

    # Analyze and validate
    self._validate()
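The constructor accepts either a single import statement or a list of them per package spec. The normalization step above can be sketched as a stand-alone helper (an illustration of the same logic, not part of the SDK):

```python
def normalize_dependencies(dependencies):
    """Normalize dict[str, str | list[str]] to dict[str, list[str]]."""
    normalized = {}
    for pkg, imports in (dependencies or {}).items():
        # A bare string becomes a one-element list; any other iterable is copied
        normalized[pkg] = [imports] if isinstance(imports, str) else list(imports)
    return normalized
```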

dependencies property

dependencies: dict[str, list[str]]

The normalized dependencies dict.

func property

func: Callable

The wrapped Python function.

name property

name: str

The function name.

undefined_globals property

undefined_globals: set[str]

The set of undefined globals found in the function.

eval

eval(*inputs: Any, **kwargs: Any) -> dict[str, Any]

Evaluate the function locally using uv run.

This runs the function in a subprocess with uv run --script, which will automatically install the required dependencies. The execution mirrors how the backend runs functions.

Parameters:

Name Type Description Default
*inputs Any

Positional inputs to pass to the function. Path objects will be resolved to absolute paths. Other types (int, float, str, dict, list) are passed as-is.

()
**kwargs Any

Keyword parameters to pass to the function.

{}

Returns:

Type Description
dict[str, Any]

A dictionary mapping output names to output values. Path strings in the output are converted back to Path objects.

Raises:

Type Description
RuntimeError

If the function execution fails.

Examples:

func = Function(analyze, dependencies={"pandas": "import pandas as pd"})
result = func.eval(Path("input.parquet"), threshold=0.5)
print(result)
# {"output": Path("/tmp/.../output.parquet")}
Source code in python/gfhub/function.py
def eval(self, *inputs: Any, **kwargs: Any) -> dict[str, Any]:
    """Evaluate the function locally using uv run.

    This runs the function in a subprocess with `uv run --script`, which
    will automatically install the required dependencies. The execution
    mirrors how the backend runs functions.

    Args:
        *inputs: Positional inputs to pass to the function. Path objects
            will be resolved to absolute paths. Other types (int, float,
            str, dict, list) are passed as-is.
        **kwargs: Keyword parameters to pass to the function.

    Returns:
        A dictionary mapping output names to output values. Path strings
        in the output are converted back to Path objects.

    Raises:
        RuntimeError: If the function execution fails.

    Examples:
        ```python
        func = Function(analyze, dependencies={"pandas": "import pandas as pd"})
        result = func.eval(Path("input.parquet"), threshold=0.5)
        print(result)
        # {"output": Path("/tmp/.../output.parquet")}
        ```
    """
    input_json = json.dumps(_serialize_inputs(inputs))
    kwargs_json = json.dumps(kwargs)

    # Write script and run
    with tempfile.TemporaryDirectory() as tmpdir:
        tmpdir_path = Path(tmpdir)
        script_path = tmpdir_path / "script.py"
        output_path = tmpdir_path / "output.json"

        # Generate script with injected inputs/kwargs
        script = self._create_eval_script(
            output_file=str(output_path),
            input_json=input_json,
            kwargs_json=kwargs_json,
        )
        script_path.write_text(script)

        # Run with uv run --script
        uv = shutil.which("uv")
        if uv is None:
            msg = "Failed to execute function: uv executable not found."
            raise FileNotFoundError(msg)
        uv = str(Path(uv).resolve())
        result = subprocess.run(  # noqa: S603
            [uv, "run", "--script", str(script_path)],
            capture_output=True,
            text=True,
            check=True,
        )

        # Read result from output file
        if not output_path.exists():
            msg = (
                f"Function did not produce output.\n"
                f"stdout:\n{result.stdout}\n"
                f"stderr:\n{result.stderr}"
            )
            raise RuntimeError(msg)

        output = json.loads(output_path.read_text())

        return_annot = inspect.signature(self._func).return_annotation
        if return_annot is Path:
        output["output"] = Path(output["output"]).resolve()
        elif get_origin(return_annot) is tuple:
            outs = list(output["output"])
            args = get_args(return_annot)
            for i, (out, arg) in enumerate(zip(outs, args, strict=True)):
                if arg is Path:
                    outs[i] = Path(out).resolve()
            output["output"] = tuple(outs)
        return output

to_script

to_script() -> str

Generate a uv-style Python script.

Returns:

Type Description
str

A string containing the complete uv script with dependency metadata, imports, and the function definition.

Source code in python/gfhub/function.py
def to_script(self) -> str:
    """Generate a uv-style Python script.

    Returns:
        A string containing the complete uv script with dependency
        metadata, imports, and the function definition.
    """
    # Build import statements
    all_imports = []
    for imports in self._dependencies.values():
        all_imports.extend(imports)

    # Extract package specs for uv dependency list (exclude stdlib)
    pkg_specs = [pkg for pkg in self._dependencies if not _is_stdlib(pkg)]

    # Generate uv script
    script_lines = [
        "# /// script",
        "# dependencies = [",
    ]
    script_lines.extend([f'#   "{pkg}",' for pkg in pkg_specs])
    script_lines.append("# ]")
    script_lines.append("# ///")
    script_lines.append("")

    # Make annotations lazy so type hints don't need runtime imports
    script_lines.append("from __future__ import annotations")
    script_lines.append("")

    # Add imports
    script_lines.append("from pathlib import Path")
    if "matplotlib" in self._dependencies:
        script_lines.append("import os")
        script_lines.append("os.environ['MPLBACKEND'] = 'Agg'")
    script_lines.extend(all_imports)
    script_lines.append("")

    # Add the function source, renaming to main if needed
    source = self._source
    if self._func_name != "main":
        source = source.replace(f"def {self._func_name}(", "def main(", 1)
    script_lines.append(source)

    return "\n".join(script_lines)
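The comment block emitted at the top follows uv's inline script metadata format (PEP 723). A stand-alone sketch of just the header generation (stdlib filtering omitted; `uv_script_header` is an illustrative helper, not an SDK function):

```python
def uv_script_header(pkg_specs):
    """Build the PEP 723 inline-metadata block that to_script emits."""
    lines = ["# /// script", "# dependencies = ["]
    lines += [f'#   "{pkg}",' for pkg in pkg_specs]
    lines += ["# ]", "# ///"]
    return "\n".join(lines)
```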

Pipeline

Pipeline()

A friendly pipeline representation.

Methods:

Name Description
on_file_upload

Create a pipeline that triggers on file upload.

to_dict

Convert pipeline to dict representation.

Source code in python/gfhub/pipeline.py
def __init__(self) -> None:
    """Create an empty pipeline."""
    super().__setattr__("_nodes", {})
    super().__setattr__("_edges", [])

on_file_upload classmethod

on_file_upload(function_name: str, *, tags: Iterable[str], kwargs: dict | None = None) -> Self

Create a pipeline that triggers on file upload.

Parameters:

Name Type Description Default
function_name str

The name of the function to run when the pipeline triggers.

required
tags Iterable[str]

The tags to filter file uploads on.

required
kwargs dict | None

Additional keyword arguments to pass to the function node.

None

Returns:

Type Description
Self

A pipeline that triggers on file upload.

Source code in python/gfhub/pipeline.py
@classmethod
def on_file_upload(
    cls, function_name: str, *, tags: Iterable[str], kwargs: dict | None = None
) -> Self:
    """Create a pipeline that triggers on file upload.

    Args:
        function_name: The name of the function to run when the pipeline triggers.
        tags: The tags to filter file uploads on.
        kwargs: Additional keyword arguments to pass to the function node.

    Returns:
        A pipeline that triggers on file upload.
    """
    kwargs = kwargs or {}
    p = cls()
    p.manual_trigger = nodes.on_manual_trigger()
    p.auto_trigger = nodes.on_file_upload(tags=list(tags))
    p.load = nodes.load()
    p += p.auto_trigger >> p.load
    p += p.manual_trigger >> p.load
    p.plot = nodes.function(function=str(function_name), kwargs=dict(kwargs))
    p += p.load >> p.plot
    p.save = nodes.save()
    p += p.plot >> p.save[0]
    p.load_tags = nodes.load_tags()
    p += p.auto_trigger >> p.load_tags
    p += p.manual_trigger >> p.load_tags
    p += p.load_tags >> p.save[1]
    return p
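The `p += a >> b` lines above build pipeline edges via operator overloading. The real node classes live in `gfhub.nodes`; as a rough illustration of the pattern only (a toy stand-in, not the actual implementation):

```python
class ToyNode:
    """Stand-in for a pipeline node; `a >> b` yields an edge tuple."""
    def __init__(self, name):
        self.name = name
    def __rshift__(self, other):
        return (self.name, other.name)


class ToyPipeline:
    """Stand-in pipeline that collects edges via `p += edge`."""
    def __init__(self):
        self.edges = []
    def __iadd__(self, edge):
        self.edges.append(edge)
        return self
```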

to_dict

to_dict() -> dict

Convert pipeline to dict representation.

Source code in python/gfhub/pipeline.py
def to_dict(self) -> dict:
    """Convert pipeline to dict representation."""
    return {
        "nodes": [n.to_dict() for n in cast(dict, self._nodes).values()],
        "edges": [e.to_dict() for e in cast(list, self._edges)],
    }

get_settings

get_settings() -> tuple[str, str]

Get settings from config files and environment variables.

Returns:

Type Description
tuple[str, str]

Tuple of (api_key, host) read from:

  • ~/.gdsfactory/gdsfactoryplus.toml (global)
  • pyproject.toml (local, host only)
  • Environment variables (GFP_API_KEY, GFH_HOST)

Priority: env vars > local > global (api_key only from global/env)

Source code in python/gfhub/client.py
def get_settings() -> tuple[str, str]:
    """Get settings from config files and environment variables.

    Returns:
        Tuple of (api_key, host) read from:
        - ~/.gdsfactory/gdsfactoryplus.toml (global)
        - pyproject.toml (local, host only)
        - Environment variables (GFP_API_KEY, GFH_HOST)

    Priority: env vars > local > global (api_key only from global/env)
    """
    return get_settings_py()