
UDF

geneva.udf

udf(
    func: Callable | None = None,
    *,
    data_type: DataType | None = None,
    version: str | None = None,
    cuda: bool = False,
    field_metadata: dict[str, str] | None = None,
    input_columns: list[str] | None = None,
    num_cpus: int | float | None = None,
    num_gpus: int | float | None = None,
    memory: int | None = None,
    batch_size: int | None = None,
    checkpoint_size: int | None = None,
    min_checkpoint_size: int | None = 1,
    max_checkpoint_size: int | None = None,
    task_size: int | None = None,
    timeout: float | None = None,
    on_error: list[ExceptionMatcher]
    | ErrorHandlingConfig
    | None = None,
    error_handling: Optional[ErrorHandlingConfig] = None,
    auto_backfill: bool = False,
    manifest: GenevaManifest | None = None,
    **kwargs,
) -> UDF | partial

Decorator for defining a User Defined Function (UDF).
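Because func defaults to None, the same decorator can be used bare (@udf) or with options (@udf(data_type=...)), which is why the return type is UDF | partial. The sketch below shows that general Python pattern with a hypothetical toy_udf / ToyUDF pair (not geneva's implementation). It also mimics the documented default of inferring input column names from the function's parameter names.

```python
from __future__ import annotations

import functools
import inspect
from typing import Callable


class ToyUDF:
    """Hypothetical stand-in for geneva's UDF object (illustration only)."""

    def __init__(self, func: Callable, input_columns: list[str] | None, version: str | None):
        self.func = func
        # Mimic the documented default: column names come from parameter names.
        self.input_columns = input_columns or list(inspect.signature(func).parameters)
        self.version = version

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)


def toy_udf(func: Callable | None = None, *, input_columns: list[str] | None = None,
            version: str | None = None):
    if func is None:
        # Used as @toy_udf(...): return a partial that waits for the function
        # and carries the keyword options along.
        return functools.partial(toy_udf, input_columns=input_columns, version=version)
    # Used as bare @toy_udf (or toy_udf(fn)): wrap the function right away.
    return ToyUDF(func, input_columns, version)


@toy_udf
def area(height: int, width: int) -> int:
    return height * width


@toy_udf(version="v2")
def perimeter(height: int, width: int) -> int:
    return 2 * (height + width)


print(area.input_columns)  # ['height', 'width']
print(perimeter.version)   # v2
print(area(3, 4))          # 12
```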

Parameters:

  • func (Callable | None, default: None ) –

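    The callable to be decorated. If None (as when the decorator is called with keyword arguments, e.g. @udf(data_type=...)), returns a partial that accepts the function.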

  • data_type (DataType | None, default: None ) –

    The data type of the output PyArrow Array from the UDF. If None, it will be inferred from the function signature.

  • version (str | None, default: None ) –

    A version string used to track changes to the function. If not provided, the hash of the serialized function is used.

  • cuda (bool, default: False ) –

    If True, load CUDA-optimized kernels. Equivalent to num_gpus=1.

  • field_metadata (dict[str, str] | None, default: None ) –

    A dictionary of metadata to be attached to the output pyarrow.Field.

  • input_columns (list[str] | None, default: None ) –

    A list of input column names for the UDF. If not provided, the names are inferred from the function signature, or all columns are scanned when they cannot be inferred. Names may also refer to columns produced by an optional preprocess() step (see Notes); the validator skips the source-schema existence check for those.

  • num_cpus (int | float | None, default: None ) –

    The number of CPUs (fractional values allowed) to acquire to run the job.

  • num_gpus (int | float | None, default: None ) –

    The number of GPUs (fractional values allowed) to acquire to run the job. Defaults to 0.

  • memory (int | None, default: None ) –

    The amount of memory in bytes to acquire to run the job. Used by admission control to validate cluster resources before starting.

  • batch_size (int | None, default: None ) –

    Legacy parameter controlling map/read batch size. Prefer checkpoint_size.

  • checkpoint_size (int | None, default: None ) –

    Alias for batch_size; preferred for overriding map-task batch size. When adaptive sizing is enabled, an explicit checkpoint_size seeds the initial checkpoint size; otherwise the initial size defaults to min_checkpoint_size.

  • min_checkpoint_size (int | None, default: 1 ) –

    Minimum adaptive checkpoint size (lower bound). Defaults to 1.

  • max_checkpoint_size (int | None, default: None ) –

    Maximum adaptive checkpoint size (upper bound). This also caps the largest read batch and thus the maximum memory footprint per batch.

  • task_size (int | None, default: None ) –

    Preferred read-task size for jobs that don't specify an explicit task_size. This is advisory and may be overridden by job-level parameters.

  • timeout (float | None, default: None ) –

    Per-row scalar UDF timeout in seconds. Each retry attempt gets a fresh timeout budget. Only supported for scalar UDFs, and requires execution on a Unix-like worker process main thread. This uses process-global SIGALRM / ITIMER_REAL state, so UDFs or libraries that install their own signal handlers/timers, or that depend on main-thread signal behavior while coordinating work in background threads, may be incompatible.

  • on_error (list[ExceptionMatcher] | ErrorHandlingConfig | None, default: None ) –

    Simplified error handling configuration. Can be one of:

      ◦ A factory function: retry_transient(), retry_all(), skip_on_error()
      ◦ A list of matchers: [Retry(...), Skip(...), Fail(...)]

    Scalar UDF backfill also exposes fatal worker-loss exceptions such as FatalWorkerTransientError. By default, transient fatal worker loss is retried for up to 3 task-level attempts. If you explicitly match FatalWorkerTransientError in on_error, your matcher overrides that default.

    Examples
    @udf(data_type=pa.int32(), on_error=retry_transient())
    def my_udf(x: int) -> int: ...
    
    @udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))
    def my_udf(x: int) -> int: ...
    
    @udf(
        data_type=pa.int32(),
        on_error=[
            Retry(ConnectionError, TimeoutError, max_attempts=3),
            Retry(ValueError, match="rate limit", max_attempts=5),
            Skip(ValueError),
        ]
    )
    def my_udf(x: int) -> int: ...
    
    @udf(
        data_type=pa.int32(),
        on_error=[Retry(FatalWorkerTransientError, max_attempts=5)],
    )
    def my_udf(x: int) -> int: ...
    
  • error_handling (Optional[ErrorHandlingConfig], default: None ) –

    Advanced error handling configuration using tenacity. Use this for full control over retry behavior with custom callbacks. Cannot be used together with on_error.

  • auto_backfill (bool, default: False ) –

    Automatically backfill this column asynchronously in LanceDB Enterprise when data or UDF version changes. Default: False

  • manifest (GenevaManifest | None, default: None ) –

    Optional execution-environment spec (image, pip deps, py_modules, captured workspace zips). Built via GenevaManifest.create_pip(), .create_conda(), or Connection.capture_local_environment(). When set, the manifest is snapshotted into the column's field metadata at add_columns time so the backfill executor can reconstruct the same environment without consulting any external registry. When omitted, native columns fall back to the embedded image/tag in the UDFSpec envelope, and remote columns fall back to the deployment-default manifest resolved server-side. Default: None.

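The on_error example that lists Retry(ValueError, match="rate limit", ...) before Skip(ValueError) suggests ordered matching. The sketch below models that reading with a hypothetical Rule class and run_row helper; these are not geneva's Retry/Skip/Fail classes, and "first matching rule wins, in list order" is an assumption rather than documented behavior.

```python
from __future__ import annotations

import re
from dataclasses import dataclass


@dataclass
class Rule:
    """Hypothetical matcher; action is "retry", "skip" or "fail"."""
    action: str
    exc_types: tuple
    match: str | None = None  # optional regex searched in str(exc)
    max_attempts: int = 1     # only used by "retry"

    def matches(self, exc: BaseException) -> bool:
        if not isinstance(exc, self.exc_types):
            return False
        return self.match is None or re.search(self.match, str(exc)) is not None


def run_row(fn, value, rules: list[Rule]):
    """Apply fn to one value; returns None for a skipped row."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return fn(value)
        except Exception as exc:
            # Assumption: rules are checked in list order and the first match wins.
            rule = next((r for r in rules if r.matches(exc)), None)
            if rule is None or rule.action == "fail":
                raise
            if rule.action == "skip":
                return None
            if attempt >= rule.max_attempts:  # retry budget used up
                raise


calls = {"n": 0}


def flaky(x):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("rate limit exceeded")
    return x + 1


rules = [
    Rule("retry", (ValueError,), match="rate limit", max_attempts=5),
    Rule("skip", (ValueError,)),
]
print(run_row(flaky, 41, rules))    # 42 (third attempt succeeds)
print(run_row(int, "oops", rules))  # None (ValueError without "rate limit" is skipped)
```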
Notes
  • Column/parameter mapping: For scalar and array UDFs, parameter names map directly to input column names. If you want a column to be delivered as a numpy.ndarray without extra copies, annotate the parameter as numpy.ndarray and ensure the column's Arrow type is a list (pa.list_/pa.large_list/pa.fixed_size_list). Other column types continue to be passed as Python scalars/objects.
  • Python lists: When a parameter is annotated as list[...], the column must be an Arrow list/large_list/fixed_size_list. In that case each value is delivered to the UDF as a Python list instead of a numpy array.
  • Return type with numpy.ndarray: If your function returns a numpy.ndarray, you must provide an explicit data_type (for example, pa.list_(pa.float32())); the ndarray shape/dtype cannot be inferred automatically from the annotation alone.
  • Optional preprocess() hook: Stateful (class-based) UDFs may declare a preprocess(self, batch: pa.RecordBatch) -> pa.RecordBatch method. When GPU pipelining is enabled, the framework runs preprocess() in a pool of reader threads before dispatching __call__, letting CPU-side decode / transform / tokenize overlap with GPU compute on previous batches. The contract:

      ◦ preprocess() returns a RecordBatch whose columns include every name listed in input_columns for __call__.
      ◦ Coupling between preprocess() and __call__ is by column name only: the framework dispatches __call__ by pulling input_columns out of the post-preprocess batch. No type/shape metadata flows between the two; if the names mismatch, the failure is a runtime KeyError on the first batch.
      ◦ Names introduced by preprocess() are user-chosen. By convention, use a non-user-facing prefix (_pp_*) to avoid colliding with persisted columns.

preprocess() runs only when GPU pipelining is enabled (config enable_gpu_pipelining / env JOB__ENABLE_GPU_PIPELINING=true). For best throughput the hot path inside preprocess() should release the GIL — use native libraries (cv2, HF tokenizers, torchaudio, numpy vector ops) rather than per-element Python loops.
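The name-only coupling between preprocess() and __call__ can be illustrated with a toy driver. In this sketch a dict of lists stands in for pa.RecordBatch so it runs without pyarrow, and WordCounter, MisspelledCounter and dispatch are hypothetical; there are no reader threads or GPU overlap here, only the column-name contract and the KeyError a mismatch produces.

```python
class WordCounter:
    """Hypothetical stateful UDF that declares a preprocess() step."""

    # Consumed by __call__; produced by preprocess(), not stored in the table.
    input_columns = ["_pp_tokens"]

    def preprocess(self, batch: dict) -> dict:
        # CPU-side work: tokenize the raw "text" column into a private column.
        out = dict(batch)
        out["_pp_tokens"] = [s.lower().split() for s in batch["text"]]
        return out

    def __call__(self, _pp_tokens: list) -> list:
        return [len(tokens) for tokens in _pp_tokens]


def dispatch(udf, batch: dict) -> list:
    """Toy driver: preprocess, then pull input_columns out by name only."""
    batch = udf.preprocess(batch)
    args = [batch[name] for name in udf.input_columns]  # KeyError if a name is missing
    return udf(*args)


class MisspelledCounter(WordCounter):
    input_columns = ["_pp_token"]  # typo: preprocess() emits "_pp_tokens"


batch = {"text": ["A cat", "Two red dogs"]}
print(dispatch(WordCounter(), batch))  # [2, 3]
try:
    dispatch(MisspelledCounter(), batch)
except KeyError as err:
    print("KeyError:", err)  # KeyError: '_pp_token'
```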

geneva.transformer.UDF

Bases: Callable[[RecordBatch], Array]

User-defined function (UDF) to be applied to a Lance Table.

func

func: Callable = field()

name

name: str = field(default='')

data_type

data_type: DataType = field(default=None)

version

version: str = field(default='')

cuda

cuda: Optional[bool] = field(default=False)

num_cpus

num_cpus: Optional[float] = field(
    default=1.0,
    converter=lambda v: None if v is None else float(v),
    validator=optional(ge(0.0)),
    on_setattr=[convert, validate],
)

memory

memory: int | None = field(default=None)
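The num_cpus declaration matches attrs-style field options: the converter turns any number into float, optional(ge(0.0)) accepts None or a non-negative value, and on_setattr=[convert, validate] re-applies both on later assignments, not just at construction. The stdlib-only descriptor below approximates that behavior; OptionalNonNegativeFloat and Resources are hypothetical names used only for illustration.

```python
class OptionalNonNegativeFloat:
    """Descriptor approximating converter=float + optional(ge(0.0)), re-run on every set."""

    def __set_name__(self, owner, name):
        self.name = name
        self.private = "_" + name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return getattr(obj, self.private)

    def __set__(self, obj, value):
        value = None if value is None else float(value)  # converter
        if value is not None and value < 0.0:            # validator: optional(ge(0.0))
            raise ValueError(f"{self.name} must be >= 0.0, got {value}")
        setattr(obj, self.private, value)


class Resources:
    """Hypothetical holder for the num_cpus field."""
    num_cpus = OptionalNonNegativeFloat()

    def __init__(self, num_cpus=1.0):
        self.num_cpus = num_cpus  # routed through __set__, like on_setattr=[convert, validate]


r = Resources(2)
print(r.num_cpus)  # 2.0
r.num_cpus = None  # allowed: the optional(...) wrapper accepts None
try:
    r.num_cpus = -1
except ValueError as err:
    print(err)  # num_cpus must be >= 0.0, got -1.0
```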

geneva.transformer.Columns

Bases: Generic[_ColumnsT]

Return annotation marker for UDFs that produce multiple columns.

Annotating a UDF's return type as Columns[T] declares that the UDF emits a struct whose top-level fields are unpacked into sibling table columns at add_columns time. T must currently be a NamedTuple; the framework infers the output struct schema from its field type annotations. Other types raise ValueError at decoration time.

Each top-level struct field becomes its own table column with the same name. To attach a Columns[T] UDF, pass it directly to Table.add_columns (not wrapped in a dict). The sibling columns share a backfill group and must be backfilled and dropped together.

Supply an explicit data_type=pa.struct([...]) on @udf when a field needs Arrow metadata (e.g. lance-encoding:blob); T is still a NamedTuple in that case — data_type only overrides the inferred schema, it does not unlock non-NamedTuple T.

Examples:

>>> from typing import NamedTuple
>>> import geneva
>>> from geneva import udf
>>>
>>> class Dimensions(NamedTuple):
...     height: int
...     width: int
>>>
>>> @udf
... def dimensions(image_id: int) -> geneva.Columns[Dimensions]:
...     return Dimensions(image_id + 10, image_id + 20)
>>>
>>> table.add_columns(dimensions)  # adds "height" and "width" columns
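The two steps described for Columns[T] (reading field names and types from a NamedTuple, then splitting each struct value into sibling columns) can be sketched with the standard library alone. struct_fields and unpack_rows are hypothetical helpers, not geneva APIs, and geneva's own mapping from Python annotations to Arrow types is not modelled here.

```python
from typing import NamedTuple, get_type_hints


class Dimensions(NamedTuple):
    height: int
    width: int


def struct_fields(tp) -> dict:
    """Field name -> annotated type, accepting only NamedTuple classes."""
    if not (isinstance(tp, type) and issubclass(tp, tuple) and hasattr(tp, "_fields")):
        raise ValueError(f"{tp!r} is not a NamedTuple")
    hints = get_type_hints(tp)
    return {name: hints[name] for name in tp._fields}


def unpack_rows(tp, rows) -> dict:
    """Split per-row struct values into one list per sibling column."""
    return {name: [getattr(row, name) for row in rows] for name in tp._fields}


rows = [Dimensions(11, 21), Dimensions(12, 22)]  # results for image_id 1 and 2
print(struct_fields(Dimensions))      # {'height': <class 'int'>, 'width': <class 'int'>}
print(unpack_rows(Dimensions, rows))  # {'height': [11, 12], 'width': [21, 22]}
```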

geneva.transformer.UDFArgType

Bases: Enum

The type of arguments that the UDF expects.

SCALAR

SCALAR = 0

ARRAY

ARRAY = 1

RECORD_BATCH

RECORD_BATCH = 2
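The three argument types differ in how often the function is called and what it receives. The toy dispatcher below illustrates that difference over a dict-of-lists batch (standing in for pa.RecordBatch, with lists standing in for pa.Array); ArgKind and apply are hypothetical, and how geneva chooses the kind for a given UDF is not modelled.

```python
from enum import Enum


class ArgKind(Enum):
    """Hypothetical mirror of the three UDFArgType members."""
    SCALAR = 0
    ARRAY = 1
    RECORD_BATCH = 2


def apply(kind, fn, batch, input_columns):
    """Toy dispatch over a dict-of-lists batch; returns one output value per row."""
    if kind is ArgKind.SCALAR:
        # One call per row; each argument is a single value.
        columns = [batch[name] for name in input_columns]
        return [fn(*row) for row in zip(*columns)]
    if kind is ArgKind.ARRAY:
        # One call per batch; each argument is a whole column.
        return list(fn(*(batch[name] for name in input_columns)))
    # RECORD_BATCH: one call per batch with the whole batch as the only argument.
    return list(fn(batch))


batch = {"x": [1, 2, 3], "y": [10, 20, 30]}
print(apply(ArgKind.SCALAR, lambda x, y: x + y, batch, ["x", "y"]))  # [11, 22, 33]
print(apply(ArgKind.ARRAY, lambda xs, ys: [a * b for a, b in zip(xs, ys)], batch, ["x", "y"]))  # [10, 40, 90]
print(apply(ArgKind.RECORD_BATCH, lambda b: [len(b)] * len(b["x"]), batch, []))  # [2, 2, 2]
```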