UDF
geneva.udf
udf(
func: Callable | None = None,
*,
data_type: DataType | None = None,
version: str | None = None,
cuda: bool = False,
field_metadata: dict[str, str] | None = None,
input_columns: list[str] | None = None,
num_cpus: int | float | None = None,
num_gpus: int | float | None = None,
batch_size: int | None = None,
checkpoint_size: int | None = None,
min_checkpoint_size: int | None = 1,
max_checkpoint_size: int | None = None,
task_size: int | None = None,
on_error: list[ExceptionMatcher] | ErrorHandlingConfig | None = None,
error_handling: Optional[ErrorHandlingConfig] = None,
**kwargs,
) -> UDF | partial
Decorator for a User Defined Function (UDF).
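A minimal usage sketch of the bare decorator form (the import path, column names, and inferred Arrow type are assumptions for illustration; data_type is inferred here from the return annotation):

    from geneva import udf  # assumed import path


    # Bare decorator form: data_type is inferred from the `-> int` annotation,
    # and the parameters "width" and "height" map to (hypothetical) table
    # columns of the same names.
    @udf
    def area(width: int, height: int) -> int:
        return width * height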
Parameters:
- func (Callable | None, default: None): The callable to be decorated. If None, returns a partial function.
- data_type (DataType | None, default: None): The data type of the output PyArrow Array from the UDF. If None, it is inferred from the function signature.
- version (str | None, default: None): A version string used to track changes to the function. If not provided, the hash of the serialized function is used.
- cuda (bool, default: False): If true, load CUDA-optimized kernels. Equivalent to num_gpus=1.
- field_metadata (dict[str, str] | None, default: None): A dictionary of metadata to be attached to the output pyarrow.Field.
- input_columns (list[str] | None, default: None): A list of input column names for the UDF. If not provided, it is inferred from the function signature, or all columns are scanned.
- num_cpus (int | float | None, default: None): The (fractional) number of CPUs to acquire to run the job.
- num_gpus (int | float | None, default: None): The (fractional) number of GPUs to acquire to run the job. Default 0.
- batch_size (int | None, default: None): Legacy parameter controlling map/read batch size. Prefer checkpoint_size.
- checkpoint_size (int | None, default: None): Alias for batch_size; preferred for overriding map-task batch size. When adaptive sizing is enabled, an explicit checkpoint_size seeds the initial checkpoint size; otherwise the initial size defaults to min_checkpoint_size. See the sketch after this parameter list.
- min_checkpoint_size (int | None, default: 1): Minimum adaptive checkpoint size (lower bound). Defaults to 1.
- max_checkpoint_size (int | None, default: None): Maximum adaptive checkpoint size (upper bound). This also caps the largest read batch and thus the maximum memory footprint per batch.
- task_size (int | None, default: None): Preferred read-task size for jobs that don't specify an explicit task_size. This is advisory and may be overridden by job-level parameters.
- on_error (list[ExceptionMatcher] | ErrorHandlingConfig | None, default: None): Simplified error handling configuration. Can be:
  - A factory function: retry_transient(), retry_all(), skip_on_error()
  - A list of matchers: [Retry(...), Skip(...), Fail(...)]
  Examples::

      @udf(data_type=pa.int32(), on_error=retry_transient())
      def my_udf(x: int) -> int: ...

      @udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))
      def my_udf(x: int) -> int: ...

      @udf(
          data_type=pa.int32(),
          on_error=[
              Retry(ConnectionError, TimeoutError, max_attempts=3),
              Retry(ValueError, match="rate limit", max_attempts=5),
              Skip(ValueError),
          ]
      )
      def my_udf(x: int) -> int: ...
- error_handling (Optional[ErrorHandlingConfig], default: None): Advanced error handling configuration using tenacity. Use this for full control over retry behavior with custom callbacks. Cannot be used together with on_error.
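The resource and checkpoint parameters compose with the rest of the decorator. The sketch below is illustrative only: the model call is a placeholder, the column name is hypothetical, and `from geneva import udf` is an assumed import path.

    import numpy as np
    import pyarrow as pa

    from geneva import udf  # assumed import path


    # Hypothetical GPU-backed UDF: one GPU per worker, with an initial
    # checkpoint size of 64 rows and read batches capped at 256 rows.
    # data_type is explicit because the function returns a numpy.ndarray.
    @udf(
        data_type=pa.list_(pa.float32()),
        num_gpus=1,
        checkpoint_size=64,
        max_checkpoint_size=256,
    )
    def embed(text: str) -> np.ndarray:
        # placeholder for a real embedding-model call
        return np.zeros(384, dtype=np.float32)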
Notes
- Column/parameter mapping: For scalar and array UDFs, parameter names map directly to input column names. If you want a column to be delivered as a numpy.ndarray without extra copies, annotate the parameter as numpy.ndarray and ensure the column's Arrow type is a list (pa.list_ / pa.large_list / pa.fixed_size_list). Other column types continue to be passed as Python scalars/objects.
- Python lists: When a parameter is annotated as list[...], the column must be an Arrow list/large_list/fixed_size_list. In that case each value is delivered to the UDF as a Python list instead of a numpy array.
- Return type with numpy.ndarray: If your function returns a numpy.ndarray, you must provide an explicit data_type (for example, pa.list_(pa.float32())); the ndarray shape/dtype cannot be inferred automatically from the annotation alone (see the sketch below).
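A minimal sketch of these mapping rules, assuming an input column named embedding stored as pa.list_(pa.float32()); the column name and the normalization logic are illustrative, not part of the API:

    import numpy as np
    import pyarrow as pa

    from geneva import udf  # assumed import path


    # The parameter name "embedding" maps to an Arrow list-typed column, so the
    # numpy.ndarray annotation delivers the value without extra copies. Because
    # the function returns a numpy.ndarray, data_type must be given explicitly.
    @udf(data_type=pa.list_(pa.float32()))
    def normalize(embedding: np.ndarray) -> np.ndarray:
        norm = np.linalg.norm(embedding)
        return embedding if norm == 0 else embedding / norm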
geneva.transformer.UDF
Bases: Callable[[RecordBatch], Array]
User-defined function (UDF) to be applied to a Lance Table.
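Since a UDF instance is callable on a RecordBatch and yields an Array, a batch-style function can be written against whole batches. The sketch below assumes the decorator accepts a pa.RecordBatch-annotated parameter and that a "price" column exists; verify both against the Geneva docs for your version.

    import pyarrow as pa
    import pyarrow.compute as pc

    from geneva import udf  # assumed import path


    # Batch-style sketch: the function receives a whole RecordBatch and returns
    # a pyarrow Array of the same length. data_type is explicit because the
    # output type cannot be inferred from the pa.Array annotation alone.
    @udf(data_type=pa.float64())
    def price_with_tax(batch: pa.RecordBatch) -> pa.Array:
        return pc.multiply(batch["price"], 1.2)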