UDTF (Beta)

geneva.udtf

udtf(
    func: Callable | type | None = None,
    *,
    output_schema: Schema,
    input_columns: list[str] | None = None,
    partition_by: str | None = None,
    partition_by_indexed_column: str | None = None,
    num_cpus: float = 1.0,
    num_gpus: float = 0.0,
    memory: int | None = None,
    version: str = "",
    on_error: list[ExceptionMatcher]
    | ErrorHandlingConfig
    | None = None,
) -> UDTF | partial

Decorator to create a User-Defined Table Function (UDTF).

Warning: This API is in beta and may change in future releases.

A UDTF transforms a Geneva table into another table. Unlike UDFs, which operate row by row, UDTFs can perform cross-row operations such as deduplication, clustering, or aggregation.
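To make the distinction concrete, the sketch below contrasts a per-row function with a table-level function in plain Python. It does not use Geneva: rows are plain dicts, and the names `normalize_row` and `dedupe_table` are hypothetical, chosen only for illustration.

```python
from typing import Iterable, Iterator


def normalize_row(row: dict) -> dict:
    """UDF-style: sees one row at a time and cannot compare rows."""
    return {**row, "text": row["text"].strip().lower()}


def dedupe_table(rows: Iterable[dict]) -> Iterator[dict]:
    """UDTF-style: sees the whole input, so it can drop duplicate rows."""
    seen = set()
    for row in rows:
        key = row["text"].strip().lower()
        if key not in seen:
            seen.add(key)
            yield row


rows = [
    {"row_id": 1, "text": "Hello"},
    {"row_id": 2, "text": "hello "},
    {"row_id": 3, "text": "World"},
]
per_row = [normalize_row(r) for r in rows]  # one output row per input row
deduped = list(dedupe_table(rows))          # row count can change
```

The per-row function always returns as many rows as it receives, while the table-level function needs state that spans rows (`seen`), which is the kind of work a UDTF is meant for.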

Parameters:

  • func (Callable | type, default: None ) –

    The function or class to be decorated. A class must implement __call__(self, source) -> Iterator[pa.RecordBatch]; a function takes source as its first parameter and yields pa.RecordBatch objects.

  • output_schema (Schema) –

    The PyArrow schema for the output table (required).

  • input_columns (list[str], default: None ) –

    Columns required from the source table. If None, the UDTF can access all columns.

  • partition_by (str, default: None ) –

    Column name for parallel partition execution via Ray actors. Each distinct value of this column is processed independently.

  • partition_by_indexed_column (str, default: None ) –

    Column name that has an existing IVF vector index to dispatch partitions from. Works with any IVF-family index (IVF_FLAT, IVF_PQ, IVF_HNSW_SQ, etc.). Mutually exclusive with partition_by.

  • num_cpus (float, default: 1.0 ) –

    Number of CPUs to request for the Ray task. Default 1.0.

  • num_gpus (float, default: 0.0 ) –

    Number of GPUs to request for the Ray task. Default 0.0.

  • memory (int, default: None ) –

    Memory in bytes to request for the Ray task.

  • version (str, default: '' ) –

    Version string for cache invalidation. If not provided, a hash of the serialized function is used.

  • on_error (list[ExceptionMatcher] | ErrorHandlingConfig, default: None ) –

    Error handling configuration for the UDTF.
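The partition_by description above says each distinct value of the column is processed independently. A minimal sketch of that grouping idea in plain Python follows. It is not Geneva's implementation (which dispatches partitions to Ray actors), and `run_partitioned` and `count_rows` are hypothetical names.

```python
from collections import defaultdict
from typing import Callable, Iterable, Iterator


def run_partitioned(
    rows: Iterable[dict],
    partition_by: str,
    table_fn: Callable[[list], Iterator[dict]],
) -> list:
    """Group rows by the partition column, then run table_fn once per group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_by]].append(row)

    out = []
    # Groups share no state, so an engine is free to run them in parallel.
    for group in groups.values():
        out.extend(table_fn(group))
    return out


def count_rows(group: list) -> Iterator[dict]:
    """Hypothetical aggregation: one output row per partition."""
    yield {"label": group[0]["label"], "n": len(group)}


rows = [
    {"row_id": 1, "label": "cat"},
    {"row_id": 2, "label": "dog"},
    {"row_id": 3, "label": "cat"},
]
counts = run_partitioned(rows, "label", count_rows)
```

Because the function only ever sees one partition at a time, any cross-row logic (such as deduplication) is scoped to rows that share the same partition value.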

Examples:

Class-based UDTF:

from typing import Iterator

import geneva
import pyarrow as pa

@geneva.udtf(
    output_schema=pa.schema([
        pa.field("row_id", pa.int64()),
        pa.field("cluster_id", pa.int64()),
        pa.field("duplicate_row_ids", pa.list_(pa.int64())),
    ]),
    input_columns=["row_id", "phash"],
)
class PHashIvfFlatHammingDedupe:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source) -> Iterator[pa.RecordBatch]:
        tbl = source.to_arrow()
        # ... compute clusters ...
        yield pa.RecordBatch.from_pydict({...})

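Function-based UDTF:

import geneva
import pyarrow as pa

@geneva.udtf(
    output_schema=pa.schema([...]),
    input_columns=["row_id", "text"],
)
def compute_scores(source, model_name: str = "default"):
    # Stream the source in batches instead of materializing the whole table.
    for batch in source.to_batches():
        # process() stands in for user code that returns a pa.RecordBatch
        # matching output_schema.
        result = process(batch)
        yield result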

geneva.transformer.UDTF

User-Defined Table Function (UDTF) to transform a Geneva Table.

Warning: This API is in beta and may change in future releases.

Unlike UDFs, which operate row by row and produce a single column, UDTFs can:

  • Access the entire table (or a filtered subset)

  • Produce multiple output columns

  • Change the number of rows (filter, expand, aggregate)

  • Perform cross-row operations (e.g., deduplication, clustering)

The UDTF yields pa.RecordBatch objects via a generator interface.

func

func: Callable = field()

output_schema

output_schema: Schema = field()

input_columns

input_columns: list[str] | None = field(default=None)

partition_by

partition_by: str | None = field(default=None)

partition_by_indexed_column

partition_by_indexed_column: str | None = field(
    default=None
)

num_cpus

num_cpus: float | None = field(
    default=1.0,
    converter=lambda v: None if v is None else float(v),
    validator=optional(ge(0.0)),
)

num_gpus

num_gpus: float | None = field(
    default=0.0,
    converter=lambda v: None if v is None else float(v),
    validator=optional(ge(0.0)),
)
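The converter and validator on num_cpus and num_gpus can be read as "pass None through, coerce anything else to float, and reject negative values". A plain-Python sketch of that behavior follows. It is an equivalent written for illustration, not the library's code, and `normalize_resource` is a hypothetical name.

```python
from typing import Optional


def normalize_resource(value) -> Optional[float]:
    """Mirror the converter (None passes through, else float) and the >= 0 check."""
    if value is None:
        return None
    value = float(value)
    if value < 0.0:
        raise ValueError(f"resource request must be >= 0, got {value}")
    return value


cpus = normalize_resource(2)       # ints are coerced to float
unset = normalize_resource(None)   # None is allowed and left as-is
```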

memory

memory: int | None = field(default=None)

version

version: str = field(default='')

geneva.batch_udtf

batch_udtf(
    func: Callable | type | None = None, **kwargs: Any
) -> UDTF | partial

Alias for udtf: the N:M batch UDTF variant.

Warning: This API is in beta and may change in future releases.

Takes an entire table/partition as input and yields RecordBatches. Use @scalar_udtf instead for per-row 1:N expansion.

geneva.scalar_udtf

scalar_udtf(
    func: Callable | None = None,
    *,
    output_schema: Schema | None = None,
    batch: bool = False,
    input_columns: list[str] | None = None,
    num_cpus: float = 1.0,
    num_gpus: float = 0.0,
    memory: int | None = None,
    version: str = "",
    on_error: list[ExceptionMatcher]
    | ErrorHandlingConfig
    | None = None,
) -> ScalarUDTF | partial

Decorator to create a Scalar UDTF for 1:N row expansion.

Warning: This API is in beta and may change in future releases.

A scalar UDTF operates per-row: for each input row, it yields zero or more output rows. Input columns are bound by parameter name (same as UDFs).
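The per-row contract can be sketched without Geneva as a small driver that calls a generator once per input row and collects whatever it yields. This is only an illustration of the 1:N semantics described above. `expand_rows`, `Token`, and `split_tokens` are hypothetical names, and rows are plain dicts rather than Arrow data.

```python
import inspect
from typing import Callable, Iterator, NamedTuple


class Token(NamedTuple):
    position: int
    token: str


def split_tokens(text: str) -> Iterator[Token]:
    """Yield zero or more output rows for a single input row."""
    for position, token in enumerate(text.split()):
        yield Token(position=position, token=token)


def expand_rows(rows: list, fn: Callable) -> list:
    """Call fn once per row, binding columns to parameters by name."""
    params = list(inspect.signature(fn).parameters)
    out = []
    for row in rows:
        kwargs = {name: row[name] for name in params}
        for item in fn(**kwargs):
            out.append(item._asdict())
    return out


rows = [
    {"doc_id": 1, "text": "a b c"},
    {"doc_id": 2, "text": ""},  # yields nothing, so it adds no output rows
]
expanded = expand_rows(rows, split_tokens)
```

Only the `text` column is passed to the function because it is the only parameter name; `doc_id` is ignored by the binding step.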

Parameters:

  • func (Callable, default: None ) –

    The generator function. Parameters map to input columns by name. Return type should be Iterator[NamedTuple].

  • output_schema (Schema, default: None ) –

    Output schema. If not provided, inferred from the return type annotation (must be a NamedTuple).

  • batch (bool, default: False ) –

    If True, the function receives Arrow Arrays and returns a RecordBatch (vectorized variant). Default False.

  • input_columns (list[str], default: None ) –

    Input column names. If not provided, inferred from function parameter names.

  • num_cpus (float, default: 1.0 ) –

    CPUs per Ray task. Default 1.0.

  • num_gpus (float, default: 0.0 ) –

    GPUs per Ray task. Default 0.0.

  • memory (int, default: None ) –

    Memory in bytes per Ray task.

  • version (str, default: '' ) –

    Version string for cache invalidation.

  • on_error (list[ExceptionMatcher] | ErrorHandlingConfig, default: None ) –

    Error handling configuration.
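The parameter list above mentions two kinds of inference: input columns from parameter names, and the output schema from the return annotation. One way such inference could work with the standard library is sketched below. This is an assumption about the general technique, not Geneva's actual code, and `Span`, `find_spans`, `infer_input_columns`, and `infer_output_fields` are hypothetical names.

```python
import inspect
from typing import Iterator, NamedTuple, get_args, get_type_hints


class Span(NamedTuple):
    start: int
    end: int


def find_spans(text: str, min_len: int) -> Iterator[Span]:
    """Hypothetical generator; its body is irrelevant to the inference."""
    if len(text) >= min_len:
        yield Span(0, len(text))


def infer_input_columns(fn) -> list:
    """Parameter names, in declaration order, double as column names."""
    return list(inspect.signature(fn).parameters)


def infer_output_fields(fn) -> dict:
    """Unwrap Iterator[RowType] and read the NamedTuple's field annotations."""
    hints = get_type_hints(fn)
    (row_type,) = get_args(hints["return"])  # Iterator[Span] -> Span
    return get_type_hints(row_type)


columns = infer_input_columns(find_spans)
fields = infer_output_fields(find_spans)
```

A real implementation would still need to map the Python types (`int`, `float`, ...) to Arrow types, which is why passing output_schema explicitly remains an option.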

Examples:

Generator-based scalar UDTF:

import math
from typing import Iterator, NamedTuple

from geneva import scalar_udtf

class Clip(NamedTuple):
    clip_start: float
    clip_end: float

@scalar_udtf
def extract_clips(duration: float) -> Iterator[Clip]:
    # math.ceil keeps a trailing partial clip (e.g. duration=20.5).
    for start in range(0, math.ceil(duration), 10):
        yield Clip(clip_start=float(start), clip_end=min(float(start + 10), duration))

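Batched scalar UDTF:

import pyarrow as pa

from geneva import scalar_udtf

# clip_schema is a pa.Schema for the output columns, defined elsewhere.
@scalar_udtf(batch=True, output_schema=clip_schema)
def extract_clips(
    duration: pa.Array,
    __source_row_id: pa.Array,
) -> pa.RecordBatch:
    # Return expanded RecordBatch with __source_row_id
    ...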
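The body of the batched example is elided, so the following is only a guess at its shape: a columnar expansion in which every output row records the input row that produced it. Plain Python lists stand in for Arrow arrays, a dict of columns stands in for the RecordBatch, and `extract_clips_columns` is a hypothetical name.

```python
def extract_clips_columns(duration: list, source_row_id: list) -> dict:
    """Columnar 1:N expansion that tags each output row with its source row."""
    out = {"__source_row_id": [], "clip_start": [], "clip_end": []}
    for row_id, total in zip(source_row_id, duration):
        start = 0.0
        while start < total:
            out["__source_row_id"].append(row_id)
            out["clip_start"].append(start)
            out["clip_end"].append(min(start + 10.0, total))
            start += 10.0
    return out


expanded = extract_clips_columns([25.0, 5.0], [100, 101])
```

In this sketch the first input row expands to three clips and the second to one, and the `__source_row_id` column is what lets the four output rows be traced back to their inputs.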

geneva.transformer.ScalarUDTF

Scalar User-Defined Table Function for 1:N row expansion.

Warning: This API is in beta and may change in future releases.

Unlike batch UDTFs, which operate on entire tables or partitions, scalar UDTFs operate per row: for each input row, the function yields zero or more output rows. Input columns are bound by parameter name (same as UDFs).

The function should be a generator that yields NamedTuples, dicts, or tuples.
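Since three shapes of yielded value are accepted, a consumer has to normalize them to one row representation. A minimal sketch of such a normalization step follows, assuming the output field names are known from the schema. `to_row` and `Clip` here are hypothetical and not part of Geneva's API.

```python
from typing import Any, NamedTuple, Sequence


class Clip(NamedTuple):
    clip_start: float
    clip_end: float


def to_row(item: Any, field_names: Sequence[str]) -> dict:
    """Normalize a yielded NamedTuple, dict, or plain tuple to a dict row."""
    if isinstance(item, dict):
        return {name: item[name] for name in field_names}
    if hasattr(item, "_asdict"):  # NamedTuple: match fields by name
        as_dict = item._asdict()
        return {name: as_dict[name] for name in field_names}
    if isinstance(item, tuple):   # plain tuple: match fields by position
        if len(item) != len(field_names):
            raise ValueError("tuple length does not match the output fields")
        return dict(zip(field_names, item))
    raise TypeError(f"unsupported row type: {type(item).__name__}")


names = ["clip_start", "clip_end"]
from_named = to_row(Clip(0.0, 10.0), names)
from_dict = to_row({"clip_start": 0.0, "clip_end": 10.0}, names)
from_tuple = to_row((0.0, 10.0), names)
```

The NamedTuple check comes before the plain-tuple check because a NamedTuple is also a tuple; testing in the other order would silently fall back to positional matching.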

func

func: Callable = field()

output_schema

output_schema: Schema = field()

batch

batch: bool = field(default=False)

input_columns

input_columns: list[str] | None = field(default=None)

num_cpus

num_cpus: float | None = field(
    default=1.0,
    converter=lambda v: None if v is None else float(v),
    validator=optional(ge(0.0)),
)

num_gpus

num_gpus: float | None = field(
    default=0.0,
    converter=lambda v: None if v is None else float(v),
    validator=optional(ge(0.0)),
)

memory

memory: int | None = field(default=None)

version

version: str = field(default='')