UDTF (Beta)
geneva.udtf
udtf(
    func: Callable | type | None = None,
    *,
    output_schema: Schema,
    input_columns: list[str] | None = None,
    partition_by: str | None = None,
    partition_by_indexed_column: str | None = None,
    num_cpus: float = 1.0,
    num_gpus: float = 0.0,
    memory: int | None = None,
    version: str = "",
    on_error: list[ExceptionMatcher] | ErrorHandlingConfig | None = None,
) -> UDTF | partial
Decorator to create a User-Defined Table Function (UDTF).
.. warning:: This API is in beta and may change in future releases.
A UDTF transforms a Geneva table into another table. Unlike UDFs, which operate row by row, UDTFs can perform cross-row operations such as deduplication, clustering, or aggregation.
Parameters:
- func (Callable | type, default: None) – The function or class to be decorated. Must implement __call__(self, source) -> Iterator[pa.RecordBatch].
- output_schema (Schema) – The PyArrow schema for the output table (required).
- input_columns (list[str], default: None) – Columns required from the source table. If None, the UDTF can access all columns.
- partition_by (str, default: None) – Column name for parallel partition execution via Ray actors. Each distinct value of this column is processed independently.
- partition_by_indexed_column (str, default: None) – Name of a column that has an existing IVF vector index to dispatch partitions from. Works with any IVF-family index (IVF_FLAT, IVF_PQ, IVF_HNSW_SQ, etc.). Mutually exclusive with partition_by.
- num_cpus (float, default: 1.0) – Number of CPUs to request for the Ray task.
- num_gpus (float, default: 0.0) – Number of GPUs to request for the Ray task.
- memory (int, default: None) – Memory in bytes to request for the Ray task.
- version (str, default: '') – Version string for cache invalidation. If not provided, a hash of the serialized function is used.
- on_error (list[ExceptionMatcher] | ErrorHandlingConfig, default: None) – Error handling configuration for the UDTF.
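To make the partition_by semantics concrete, the following is a minimal pure-Python sketch of what "each distinct value is processed independently" means. It does not use Geneva or Ray: rows are plain dicts standing in for Arrow data, the groups run sequentially rather than in parallel, and `run_partitioned` and `count_rows` are hypothetical names introduced only for illustration.

```python
from collections import defaultdict
from typing import Callable, Iterable, Iterator

Row = dict  # stand-in for a table row; Geneva itself works with Arrow data


def run_partitioned(
    rows: Iterable[Row],
    partition_by: str,
    fn: Callable[[list[Row]], Iterator[Row]],
) -> Iterator[Row]:
    """Group rows by one column and call fn once per distinct value."""
    groups: dict[object, list[Row]] = defaultdict(list)
    for row in rows:
        groups[row[partition_by]].append(row)
    # Each group is handled in isolation. A real engine could dispatch
    # these calls in parallel; this sketch runs them one after another.
    for key in groups:
        yield from fn(groups[key])


def count_rows(partition: list[Row]) -> Iterator[Row]:
    """Toy cross-row operation: one summary row per partition."""
    yield {"label": partition[0]["label"], "n": len(partition)}


rows = [
    {"row_id": 1, "label": "cat"},
    {"row_id": 2, "label": "dog"},
    {"row_id": 3, "label": "cat"},
]
summary = list(run_partitioned(rows, "label", count_rows))
```

Because `count_rows` only ever sees one group at a time, it cannot compare rows across different `label` values, which is the trade-off that partitioned execution implies.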
Examples:
Class-based UDTF::

    from typing import Iterator

    import geneva
    import pyarrow as pa

    @geneva.udtf(
        output_schema=pa.schema([
            pa.field("row_id", pa.int64()),
            pa.field("cluster_id", pa.int64()),
            pa.field("duplicate_row_ids", pa.list_(pa.int64())),
        ]),
        input_columns=["row_id", "phash"],
    )
    class PHashIvfFlatHammingDedupe:
        def __init__(self, threshold: int = 4):
            self.threshold = threshold

        def __call__(self, source) -> Iterator[pa.RecordBatch]:
            tbl = source.to_arrow()
            # ... compute clusters ...
            yield pa.RecordBatch.from_pydict({...})

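Function-based UDTF::

    import geneva
    import pyarrow as pa

    @geneva.udtf(
        output_schema=pa.schema([...]),
        input_columns=["row_id", "text"],
    )
    def compute_scores(source, model_name: str = "default"):
        for batch in source.to_batches():
            # process() is a placeholder for user logic that returns
            # a pa.RecordBatch matching output_schema.
            result = process(batch)
            yield result
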
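The class-based example above elides its clustering step. As a rough illustration only, and not the library's or the author's algorithm, the sketch below groups integer perceptual hashes greedily by Hamming distance and emits rows shaped like that example's output_schema. Treating phash as a Python int, comparing with `<=`, and the names `hamming` and `greedy_dedupe` are all assumptions made for this sketch; plain dicts stand in for pa.RecordBatch.

```python
from typing import Iterator


def hamming(a: int, b: int) -> int:
    """Number of differing bits between two integer hashes."""
    return bin(a ^ b).count("1")


def greedy_dedupe(
    rows: list[tuple[int, int]], threshold: int = 4
) -> Iterator[dict]:
    """Yield one output row per cluster of near-duplicate hashes.

    rows holds (row_id, phash) pairs. A row joins the first cluster whose
    representative hash is within `threshold` bits; otherwise it starts a
    new cluster. The result depends on input order.
    """
    # Each cluster is (representative row_id, representative hash, duplicates).
    clusters: list[tuple[int, int, list[int]]] = []
    for row_id, phash in rows:
        for _rep_id, rep_hash, dups in clusters:
            if hamming(phash, rep_hash) <= threshold:
                dups.append(row_id)
                break
        else:
            clusters.append((row_id, phash, []))
    for cluster_id, (rep_id, _rep_hash, dups) in enumerate(clusters):
        yield {
            "row_id": rep_id,
            "cluster_id": cluster_id,
            "duplicate_row_ids": dups,
        }


rows = [(10, 0b1111_0000), (11, 0b1111_0001), (12, 0b0000_1111)]
result = list(greedy_dedupe(rows))
```

Here rows 10 and 11 differ by one bit and fall into the same cluster, while row 12 differs from row 10 by eight bits and starts its own. Every row is compared against existing cluster representatives, so the approach needs the whole input at once, which is the kind of cross-row access a UDTF provides.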
geneva.transformer.UDTF
User-Defined Table Function (UDTF) to transform a Geneva Table.
.. warning:: This API is in beta and may change in future releases.
Unlike UDFs, which operate row by row and produce a single column, UDTFs can:
- Access the entire table (or a filtered subset)
- Produce multiple output columns
- Change the number of rows (filter, expand, aggregate)
- Perform cross-row operations (e.g., deduplication, clustering)
The UDTF yields pa.RecordBatch objects via a generator interface.
geneva.batch_udtf
batch_udtf(
    func: Callable | type | None = None, **kwargs: Any
) -> UDTF | partial
Alias for udtf, the N:M batch UDTF variant.
.. warning:: This API is in beta and may change in future releases.
Takes an entire table/partition as input and yields RecordBatches.
Use @scalar_udtf instead for per-row 1:N expansion.
geneva.scalar_udtf
scalar_udtf(
    func: Callable | None = None,
    *,
    output_schema: Schema | None = None,
    batch: bool = False,
    input_columns: list[str] | None = None,
    num_cpus: float = 1.0,
    num_gpus: float = 0.0,
    memory: int | None = None,
    version: str = "",
    on_error: list[ExceptionMatcher] | ErrorHandlingConfig | None = None,
) -> ScalarUDTF | partial
Decorator to create a Scalar UDTF for 1:N row expansion.
.. warning:: This API is in beta and may change in future releases.
A scalar UDTF operates per-row: for each input row, it yields zero or more output rows. Input columns are bound by parameter name (same as UDFs).
Parameters:
- func (Callable, default: None) – The generator function. Parameters map to input columns by name. Return type should be Iterator[NamedTuple].
- output_schema (Schema, default: None) – Output schema. If not provided, it is inferred from the return type annotation (which must be a NamedTuple).
- batch (bool, default: False) – If True, the function receives Arrow Arrays and returns a RecordBatch (vectorized variant).
- input_columns (list[str], default: None) – Input column names. If not provided, they are inferred from the function parameter names.
- num_cpus (float, default: 1.0) – CPUs per Ray task.
- num_gpus (float, default: 0.0) – GPUs per Ray task.
- memory (int, default: None) – Memory in bytes per Ray task.
- version (str, default: '') – Version string for cache invalidation.
- on_error (list[ExceptionMatcher] | ErrorHandlingConfig, default: None) – Error handling configuration.
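The output_schema entry says the schema can be inferred from a NamedTuple return annotation. The sketch below shows one way such inference could work using only the standard typing module; it is not Geneva's implementation. `infer_fields`, `Token`, and `tokenize` are hypothetical names, and the mapping from Python types to Arrow types is deliberately left out.

```python
from typing import Iterator, NamedTuple, get_args, get_type_hints


class Token(NamedTuple):
    token: str
    position: int


def tokenize(text: str) -> Iterator[Token]:
    for position, token in enumerate(text.split()):
        yield Token(token=token, position=position)


def infer_fields(fn) -> dict[str, type]:
    """Read field names and Python types from an Iterator[NamedTuple] annotation."""
    return_hint = get_type_hints(fn)["return"]
    (row_type,) = get_args(return_hint)  # Iterator[Token] -> (Token,)
    is_namedtuple = (
        isinstance(row_type, type)
        and issubclass(row_type, tuple)
        and hasattr(row_type, "_fields")
    )
    if not is_namedtuple:
        raise TypeError("return annotation must be Iterator[<NamedTuple>]")
    return get_type_hints(row_type)


fields = infer_fields(tokenize)
```

A function annotated with something other than a NamedTuple element type, such as Iterator[int], is rejected by the check, which is the situation where an explicit output_schema would be needed.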
Examples:
Generator-based scalar UDTF::

    from typing import Iterator, NamedTuple

    from geneva import scalar_udtf

    class Clip(NamedTuple):
        clip_start: float
        clip_end: float

    @scalar_udtf
    def extract_clips(duration: float) -> Iterator[Clip]:
        for start in range(0, int(duration), 10):
            yield Clip(clip_start=start, clip_end=min(start + 10, duration))

Batched scalar UDTF::

    import pyarrow as pa

    from geneva import scalar_udtf

    # clip_schema is a pa.Schema defined elsewhere.
    @scalar_udtf(batch=True, output_schema=clip_schema)
    def extract_clips(
        duration: pa.Array,
        __source_row_id: pa.Array,
    ) -> pa.RecordBatch:
        # Return expanded RecordBatch with __source_row_id
        ...

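The batched example leaves its body elided. The sketch below illustrates the general shape of a columnar 1:N expansion, under the assumption that each output row carries the id of the input row it came from in a __source_row_id column. It uses plain lists and a dict of columns instead of pa.Array and pa.RecordBatch so that it runs without PyArrow; `expand_clips_columnar` and its `clip_len` parameter are hypothetical.

```python
def expand_clips_columnar(
    duration: list[float],
    source_row_id: list[int],
    clip_len: float = 10.0,
) -> dict[str, list]:
    """Expand each duration into fixed-length clips, column by column."""
    out: dict[str, list] = {
        "clip_start": [],
        "clip_end": [],
        "__source_row_id": [],
    }
    for dur, rid in zip(duration, source_row_id):
        start = 0.0
        while start < dur:
            out["clip_start"].append(start)
            out["clip_end"].append(min(start + clip_len, dur))
            # Repeat the input row id once per emitted output row.
            out["__source_row_id"].append(rid)
            start += clip_len
    return out


columns = expand_clips_columnar([25.0, 0.0, 8.0], [100, 101, 102])
```

The input row with id 101 has zero duration and contributes no output rows, so its id does not appear in the result; the other two ids are repeated once per clip.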
geneva.transformer.ScalarUDTF
Scalar User-Defined Table Function for 1:N row expansion.
.. warning:: This API is in beta and may change in future releases.
Unlike batch UDTFs, which operate on entire tables or partitions, scalar UDTFs operate per row: for each input row, the function yields zero or more output rows. Input columns are bound by parameter name (same as UDFs).
The function should be a generator that yields NamedTuples, dicts, or tuples.
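As an illustration of the two ideas above (binding input columns by parameter name, and accepting NamedTuples, dicts, or tuples as yielded values), here is a minimal pure-Python driver. It is a sketch under stated assumptions rather than Geneva's execution path: `to_row`, `expand_rows`, and `split_words` are hypothetical, and plain tuples are assumed to match output fields by position.

```python
import inspect
from typing import Any, Callable, Iterable, Iterator


def to_row(value: Any, field_names: list[str]) -> dict[str, Any]:
    """Normalize a yielded NamedTuple, dict, or plain tuple into a dict."""
    if isinstance(value, dict):
        return {name: value[name] for name in field_names}
    if hasattr(value, "_asdict"):  # NamedTuple instances expose _asdict()
        as_dict = value._asdict()
        return {name: as_dict[name] for name in field_names}
    if isinstance(value, tuple):
        if len(value) != len(field_names):
            raise ValueError("tuple length does not match output fields")
        return dict(zip(field_names, value))
    raise TypeError(f"unsupported yield type: {type(value).__name__}")


def expand_rows(
    fn: Callable[..., Iterable[Any]],
    rows: Iterable[dict[str, Any]],
    field_names: list[str],
) -> Iterator[dict[str, Any]]:
    """Call fn once per input row, binding columns by parameter name."""
    params = list(inspect.signature(fn).parameters)
    for row in rows:
        kwargs = {name: row[name] for name in params}
        for value in fn(**kwargs):
            yield to_row(value, field_names)


def split_words(text: str):
    for pos, word in enumerate(text.split()):
        yield (word, pos)  # plain tuple, matched to fields by position


inputs = [{"id": 1, "text": "a b"}, {"id": 2, "text": ""}]
out = list(expand_rows(split_words, inputs, ["word", "pos"]))
```

Only the `text` column is passed to `split_words` because that is its sole parameter, and the second input row yields nothing, so it produces zero output rows.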