UDF
geneva.udf
udf(
func: Callable | None = None,
*,
data_type: DataType | None = None,
version: str | None = None,
cuda: bool = False,
field_metadata: dict[str, str] | None = None,
input_columns: list[str] | None = None,
num_cpus: int | float | None = None,
num_gpus: int | float | None = None,
memory: int | None = None,
batch_size: int | None = None,
checkpoint_size: int | None = None,
min_checkpoint_size: int | None = 1,
max_checkpoint_size: int | None = None,
task_size: int | None = None,
timeout: float | None = None,
on_error: list[ExceptionMatcher] | ErrorHandlingConfig | None = None,
error_handling: Optional[ErrorHandlingConfig] = None,
auto_backfill: bool = False,
manifest: GenevaManifest | None = None,
**kwargs,
) -> UDF | partial
Decorator of a User Defined Function (UDF).
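The return type `UDF | partial` in the signature above is the usual shape of a decorator that works both bare and with keyword arguments. The following is a minimal stdlib-only sketch of that pattern. It assumes nothing about Geneva's internals: `sketch_udf` and `SketchUDF` are hypothetical stand-ins, not the library's classes.

```python
import functools


class SketchUDF:
    """Hypothetical stand-in for a UDF object; not Geneva's class."""

    def __init__(self, func, version=None, num_cpus=None):
        self.func = func
        self.name = func.__name__
        self.version = version
        self.num_cpus = num_cpus

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)


def sketch_udf(func=None, *, version=None, num_cpus=None):
    """Decorator usable as @sketch_udf or @sketch_udf(version=...)."""
    if func is None:
        # Called with keyword arguments only: return a partial that
        # waits for the function to decorate.
        return functools.partial(sketch_udf, version=version, num_cpus=num_cpus)
    return SketchUDF(func, version=version, num_cpus=num_cpus)


@sketch_udf
def plus_one(x: int) -> int:
    return x + 1


@sketch_udf(version="v2", num_cpus=0.5)
def double(x: int) -> int:
    return x * 2
```

Both forms end up as the same kind of object. The second goes through one extra call that only records the keyword arguments.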
Parameters:
-
func (Callable | None, default: None) – The callable to be decorated. If None, returns a partial function.
-
data_type (DataType | None, default: None) – The data type of the output PyArrow Array from the UDF. If None, it will be inferred from the function signature.
-
version (str | None, default: None) – A version string used to track changes to the function. If not provided, the hash of the serialized function is used.
-
cuda (bool, default: False) – If true, load CUDA-optimized kernels. Equivalent to num_gpus=1.
-
field_metadata (dict[str, str] | None, default: None) – A dictionary of metadata to be attached to the output pyarrow.Field.
-
input_columns (list[str] | None, default: None) – A list of input column names for the UDF. If not provided, the names are inferred from the function signature, or all columns are scanned. Names may also refer to columns produced by an optional preprocess() step (see Notes); the validator skips the source-schema existence check for those.
-
num_cpus (int | float | None, default: None) – The number of CPUs (may be fractional) to acquire to run the job.
-
num_gpus (int | float | None, default: None) – The number of GPUs (may be fractional) to acquire to run the job. Default 0.
-
memory (int | None, default: None) – The amount of memory in bytes to acquire to run the job. Used by admission control to validate cluster resources before starting.
-
batch_size (int | None, default: None) – Legacy parameter controlling map/read batch size. Prefer checkpoint_size.
-
checkpoint_size (int | None, default: None) – Alias for batch_size; preferred for overriding map-task batch size. When adaptive sizing is enabled, an explicit checkpoint_size seeds the initial checkpoint size; otherwise the initial size defaults to min_checkpoint_size.
-
min_checkpoint_size (int | None, default: 1) – Minimum adaptive checkpoint size (lower bound). Defaults to 1.
-
max_checkpoint_size (int | None, default: None) – Maximum adaptive checkpoint size (upper bound). This also caps the largest read batch and thus the maximum memory footprint per batch.
-
task_size (int | None, default: None) – Preferred read-task size for jobs that don't specify an explicit task_size. This is advisory and may be overridden by job-level parameters.
-
timeout (float | None, default: None) – Per-row scalar UDF timeout in seconds. Each retry attempt gets a fresh timeout budget. Only supported for scalar UDFs, and requires execution on the main thread of a Unix-like worker process. This uses process-global SIGALRM/ITIMER_REAL state, so UDFs or libraries that install their own signal handlers/timers, or that depend on main-thread signal behavior while coordinating work in background threads, may be incompatible.
-
on_error (list[ExceptionMatcher] | ErrorHandlingConfig | None, default: None) – Simplified error handling configuration. Can be:
  - A factory function: retry_transient(), retry_all(), skip_on_error()
  - A list of matchers: [Retry(...), Skip(...), Fail(...)]
Scalar UDF backfill also exposes fatal worker-loss exceptions such as FatalWorkerTransientError. By default, transient fatal worker loss is retried up to 3 task-level attempts. If you explicitly match FatalWorkerTransientError in on_error, your matcher overrides that default.
Examples:
    @udf(data_type=pa.int32(), on_error=retry_transient())
    def my_udf(x: int) -> int: ...

    @udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))
    def my_udf(x: int) -> int: ...

    @udf(
        data_type=pa.int32(),
        on_error=[
            Retry(ConnectionError, TimeoutError, max_attempts=3),
            Retry(ValueError, match="rate limit", max_attempts=5),
            Skip(ValueError),
        ]
    )
    def my_udf(x: int) -> int: ...

    @udf(
        data_type=pa.int32(),
        on_error=[Retry(FatalWorkerTransientError, max_attempts=5)],
    )
    def my_udf(x: int) -> int: ...
-
error_handling (Optional[ErrorHandlingConfig], default: None) – Advanced error handling configuration using tenacity. Use this for full control over retry behavior with custom callbacks. Cannot be used together with on_error.
-
auto_backfill (bool, default: False) – Automatically backfill this column asynchronously in LanceDB Enterprise when the data or the UDF version changes. Default: False.
-
manifest (GenevaManifest | None, default: None) – Optional execution-environment spec (image, pip deps, py_modules, captured workspace zips). Built via GenevaManifest.create_pip(), .create_conda(), or Connection.capture_local_environment(). When set, the manifest is snapshotted into the column's field metadata at add_columns time so the backfill executor can reconstruct the same environment without consulting any external registry. When omitted, native columns fall back to the embedded image/tag in the UDFSpec envelope, and remote columns fall back to the deployment-default manifest resolved server-side. Default: None.
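The timeout parameter above is described as relying on process-global SIGALRM/ITIMER_REAL state, with a fresh budget per retry attempt. The sketch below shows how such a per-call timeout can be built from the standard library alone. It is an illustration under stated assumptions, not Geneva's implementation: `RowTimeout`, `call_with_timeout`, and `call_with_retries` are hypothetical names.

```python
import signal
import threading
import time


class RowTimeout(Exception):
    """Raised when one call exceeds its time budget (hypothetical name)."""


def _on_alarm(signum, frame):
    raise RowTimeout("call exceeded its time budget")


def call_with_timeout(func, timeout, *args):
    """Run func(*args) under a SIGALRM-based timer (Unix, main thread only)."""
    if not hasattr(signal, "SIGALRM"):
        raise RuntimeError("SIGALRM is unavailable on this platform")
    if threading.current_thread() is not threading.main_thread():
        raise RuntimeError("signal-based timeouts need the main thread")
    # The handler and timer are process-global, so any handler the UDF
    # installed for SIGALRM would be replaced here.
    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, timeout)
    try:
        return func(*args)
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer
        if previous is not None:
            signal.signal(signal.SIGALRM, previous)


def call_with_retries(func, timeout, max_attempts, *args):
    """Retry on timeout; every attempt re-arms the timer with a full budget."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call_with_timeout(func, timeout, *args)
        except RowTimeout:
            if attempt == max_attempts:
                raise
```

Because the timer and handler belong to the whole process, code that sets its own alarm inside `func` would interfere with this scheme, which matches the incompatibility the parameter description warns about.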
Notes
- Column/parameter mapping: For scalar and array UDFs, parameter names map directly to input column names. If you want a column to be delivered as a numpy.ndarray without extra copies, annotate the parameter as numpy.ndarray and ensure the column's Arrow type is a list (pa.list_/pa.large_list/pa.fixed_size_list). Other column types continue to be passed as Python scalars/objects.
- Python lists: When a parameter is annotated as list[...], the column must be an Arrow list/large_list/fixed_size_list. In that case each value is delivered to the UDF as a Python list instead of a numpy array.
- Return type with numpy.ndarray: If your function returns a numpy.ndarray, you must provide an explicit data_type (for example, pa.list_(pa.float32())); the ndarray shape/dtype cannot be inferred automatically from the annotation alone.
- Optional preprocess() hook: Stateful (class-based) UDFs may declare a preprocess(self, batch: pa.RecordBatch) -> pa.RecordBatch method. When GPU pipelining is enabled, the framework runs preprocess() in a pool of reader threads before dispatching __call__, letting CPU-side decode / transform / tokenize overlap with GPU compute on previous batches. The contract:
  - preprocess() returns a RecordBatch whose columns include every name listed in input_columns for __call__.
  - Coupling between preprocess() and __call__ is by column name only: the framework dispatches __call__ by pulling input_columns out of the post-preprocess batch. There is no type/shape metadata flowing between the two; if names mismatch, the failure is a runtime KeyError on the first batch.
  - Names introduced by preprocess() are user-chosen. Conventional practice is a non-user-facing prefix (_pp_*) to avoid colliding with persisted columns.
preprocess() runs only when GPU pipelining is enabled (config
enable_gpu_pipelining / env JOB__ENABLE_GPU_PIPELINING=true).
For best throughput the hot path inside preprocess() should
release the GIL — use native libraries (cv2, HF tokenizers,
torchaudio, numpy vector ops) rather than per-element Python loops.
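The name-only coupling between preprocess() and __call__ described in the notes above can be illustrated with a small stdlib sketch. A dict of lists stands in for pa.RecordBatch, and `LengthUDF` and `run_batch` are hypothetical names used only for illustration; the real dispatch logic may differ.

```python
class LengthUDF:
    """Hypothetical stateful UDF; a dict of lists stands in for a RecordBatch."""

    input_columns = ["_pp_length"]

    def preprocess(self, batch):
        # Add a derived column under a non-user-facing "_pp_" prefix.
        out = dict(batch)
        out["_pp_length"] = [len(text) for text in batch["text"]]
        return out

    def __call__(self, _pp_length):
        # The parameter name matches the column name it receives.
        return _pp_length * 2


def run_batch(udf, batch):
    """Dispatch __call__ by pulling input_columns out of the preprocessed batch."""
    prepared = udf.preprocess(batch)
    # Coupling is by name only: a name missing from `prepared` raises KeyError.
    columns = [prepared[name] for name in udf.input_columns]
    return [
        udf(**dict(zip(udf.input_columns, row)))
        for row in zip(*columns)
    ]
```

For example, `run_batch(LengthUDF(), {"text": ["a", "abcd"]})` yields one value per row, while listing a name in input_columns that preprocess() never produces fails with KeyError on the first batch.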
geneva.transformer.UDF
Bases: Callable[[RecordBatch], Array]
User-defined function (UDF) to be applied to a Lance Table.
geneva.transformer.Columns
Bases: Generic[_ColumnsT]
Return annotation marker for UDFs that produce multiple columns.
Annotating a UDF's return type as Columns[T] declares that the UDF
emits a struct whose top-level fields are unpacked into sibling table
columns at add_columns time. T must currently be a
NamedTuple; the framework infers the output struct schema from its
field type annotations. Other types raise ValueError at decoration
time.
Each top-level struct field becomes its own table column with the same
name. To attach a Columns[T] UDF, pass it directly to
Table.add_columns (not wrapped in a
dict). The sibling columns share a backfill group and must be
backfilled and dropped together.
Supply an explicit data_type=pa.struct([...]) on @udf when a
field needs Arrow metadata (e.g. lance-encoding:blob); T is
still a NamedTuple in that case — data_type only overrides the
inferred schema, it does not unlock non-NamedTuple T.
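To make the inference step described above concrete, here is a stdlib-only sketch of reading a NamedTuple's field annotations and unpacking rows into sibling columns. It is an assumption-laden illustration, not the framework's code: `ARROW_NAME`, `infer_struct_fields`, and `unpack_columns` are hypothetical, and the Python-to-Arrow type mapping shown is a guess for demonstration purposes.

```python
from typing import NamedTuple, get_type_hints


class Dimensions(NamedTuple):
    height: int
    width: int


# Hypothetical mapping from Python annotations to Arrow type names.
ARROW_NAME = {int: "int64", float: "float64", str: "string", bool: "bool"}


def infer_struct_fields(row_type):
    """Return [(field_name, arrow_type_name)] from a NamedTuple's annotations."""
    is_named_tuple = (
        isinstance(row_type, type)
        and issubclass(row_type, tuple)
        and hasattr(row_type, "_fields")
    )
    if not is_named_tuple:
        raise ValueError("T must be a NamedTuple")
    hints = get_type_hints(row_type)
    # _fields preserves declaration order, which becomes the column order.
    return [(name, ARROW_NAME[hints[name]]) for name in row_type._fields]


def unpack_columns(rows):
    """Turn a list of NamedTuple rows into one list per top-level field."""
    fields = type(rows[0])._fields
    return {name: [getattr(row, name) for row in rows] for name in fields}
```

Here each top-level field name becomes a column name, mirroring how the sibling columns are described as taking the struct field names.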
Examples:
>>> from typing import NamedTuple
>>> import geneva
>>> from geneva import udf
>>>
>>> class Dimensions(NamedTuple):
... height: int
... width: int
>>>
>>> @udf
... def dimensions(image_id: int) -> geneva.Columns[Dimensions]:
... return Dimensions(image_id + 10, image_id + 20)
>>>
>>> table.add_columns(dimensions) # adds "height" and "width" columns