UDF

geneva.udf

udf(
    func: Callable | None = None,
    *,
    data_type: DataType | None = None,
    version: str | None = None,
    cuda: bool = False,
    field_metadata: dict[str, str] | None = None,
    input_columns: list[str] | None = None,
    num_cpus: int | float | None = None,
    num_gpus: int | float | None = None,
    batch_size: int | None = None,
    checkpoint_size: int | None = None,
    min_checkpoint_size: int | None = 1,
    max_checkpoint_size: int | None = None,
    task_size: int | None = None,
    on_error: list[ExceptionMatcher]
    | ErrorHandlingConfig
    | None = None,
    error_handling: Optional[ErrorHandlingConfig] = None,
    **kwargs,
) -> UDF | partial

Decorator for defining a User-Defined Function (UDF).

Parameters:

  • func (Callable | None, default: None ) –

    The callable to be decorated. If None, returns a partial function.

  • data_type (DataType | None, default: None ) –

    The data type of the output PyArrow Array from the UDF. If None, it will be inferred from the function signature.

  • version (str | None, default: None ) –

    A version string used to track changes to the function. If not provided, the hash of the serialized function is used.

  • cuda (bool, default: False ) –

    If True, load CUDA-optimized kernels. Equivalent to num_gpus=1.

  • field_metadata (dict[str, str] | None, default: None ) –

    A dictionary of metadata to be attached to the output pyarrow.Field.

  • input_columns (list[str] | None, default: None ) –

    A list of input column names for the UDF. If not provided, it will be inferred from the function signature; if inference is not possible, all columns are scanned.

  • num_cpus (int | float | None, default: None ) –

    The number of CPUs (possibly fractional) to acquire to run the job.

  • num_gpus (int | float | None, default: None ) –

    The number of GPUs (possibly fractional) to acquire to run the job. Defaults to 0.

  • batch_size (int | None, default: None ) –

    Legacy parameter controlling map/read batch size. Prefer checkpoint_size.

  • checkpoint_size (int | None, default: None ) –

    Alias for batch_size; preferred for overriding map-task batch size. When adaptive sizing is enabled, an explicit checkpoint_size seeds the initial checkpoint size; otherwise the initial size defaults to min_checkpoint_size.

  • min_checkpoint_size (int | None, default: 1 ) –

    Minimum adaptive checkpoint size (lower bound). Defaults to 1.

  • max_checkpoint_size (int | None, default: None ) –

    Maximum adaptive checkpoint size (upper bound). This also caps the largest read batch and thus the maximum memory footprint per batch.

  • task_size (int | None, default: None ) –

    Preferred read-task size for jobs that don't specify an explicit task_size. This is advisory and may be overridden by job-level parameters.

  • on_error (list[ExceptionMatcher] | ErrorHandlingConfig | None, default: None ) –

    Simplified error handling configuration. Can be:

      • A factory function: retry_transient(), retry_all(), skip_on_error()
      • A list of matchers: [Retry(...), Skip(...), Fail(...)]

    Examples::

    @udf(data_type=pa.int32(), on_error=retry_transient())
    def my_udf(x: int) -> int: ...
    
    @udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))
    def my_udf(x: int) -> int: ...
    
    @udf(
        data_type=pa.int32(),
        on_error=[
            Retry(ConnectionError, TimeoutError, max_attempts=3),
            Retry(ValueError, match="rate limit", max_attempts=5),
            Skip(ValueError),
        ]
    )
    def my_udf(x: int) -> int: ...
    
  • error_handling (Optional[ErrorHandlingConfig], default: None ) –

    Advanced error handling configuration using tenacity. Use this for full control over retry behavior with custom callbacks. Cannot be used together with on_error.

Notes
  • Column/parameter mapping: For scalar and array UDFs, parameter names map directly to input column names. If you want a column to be delivered as a numpy.ndarray without extra copies, annotate the parameter as numpy.ndarray and ensure the column's Arrow type is a list (pa.list_/pa.large_list/pa.fixed_size_list). Other column types continue to be passed as Python scalars/objects.
  • Python lists: When a parameter is annotated as list[...], the column must be an Arrow list/large_list/fixed_size_list. In that case each value is delivered to the UDF as a Python list instead of a numpy array.
  • Return type with numpy.ndarray: If your function returns a numpy.ndarray, you must provide an explicit data_type (for example, pa.list_(pa.float32())); the ndarray shape/dtype cannot be inferred automatically from the annotation alone.

geneva.transformer.UDF

Bases: Callable[[RecordBatch], Array]

User-defined function (UDF) to be applied to a Lance Table.

func

func: Callable = field()

name

name: str = field(default='')

data_type

data_type: DataType = field(default=None)

version

version: str = field(default='')

cuda

cuda: Optional[bool] = field(default=False)

num_cpus

num_cpus: Optional[float] = field(
    default=1.0,
    converter=lambda v: None if v is None else float(v),
    validator=optional(ge(0.0)),
    on_setattr=[convert, validate],
)

memory

memory: int | None = field(default=None)

geneva.transformer.UDFArgType

Bases: Enum

The type of arguments that the UDF expects.

SCALAR

SCALAR = 0

ARRAY

ARRAY = 1

RECORD_BATCH

RECORD_BATCH = 2