Error Handling

Scalar UDFs can set @geneva.udf(timeout=...) to enforce a per-row timeout, specified in seconds. A timeout surfaces as an ordinary TimeoutError, so existing on_error, retry, and skip_on_error() behavior applies unchanged.

This timeout support is scalar-only and relies on Unix SIGALRM / setitimer semantics at runtime, so the UDF must execute on the main thread of the worker process. It works inside MultiProcessBatchApplier worker subprocesses because each child executes batches on its own main thread, but it still uses process-global signal state. UDFs or libraries that install their own SIGALRM handlers or timers, or that depend on main-thread signal behavior while coordinating work in background threads, may be incompatible.
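
The mechanism described above can be illustrated with a minimal standard-library sketch. This is not Geneva's implementation: row_timeout, double_after, and double_with_timeout are hypothetical names, and the block only shows why a SIGALRM-based timeout runs on the main thread and touches process-global signal state. It assumes a Unix platform.

```python
import signal
import time
from contextlib import contextmanager


@contextmanager
def row_timeout(seconds: float):
    """Hypothetical helper: raise TimeoutError if the body runs longer than `seconds`."""

    def _on_alarm(signum, frame):
        raise TimeoutError(f"row exceeded {seconds}s")

    # signal.signal() may only be called from the main thread, and it replaces
    # the single process-wide SIGALRM handler.
    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Cancel the timer and restore whatever handler was installed before.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def double_after(x: int, delay: float) -> int:
    """Stand-in for a scalar UDF body that may be slow."""
    time.sleep(delay)
    return x * 2


def double_with_timeout(x: int, delay: float, timeout: float) -> int:
    with row_timeout(timeout):
        return double_after(x, delay)
```

In this sketch, a library that installs its own SIGALRM handler inside the with block would silently replace the timeout handler, which is the kind of incompatibility noted above. Calling row_timeout from a background thread fails because CPython only allows signal handlers to be set from the main thread.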

Fatal Worker Errors

Scalar UDF backfill also classifies fatal worker loss into public Geneva exception types:

  • FatalWorkerTransientError
  • FatalWorkerOOMError
  • FatalWorkerCrashError
  • FatalWorkerExitError

By default, FatalWorkerTransientError is retried for up to 3 task-level attempts, even if the user does not specify an on_error policy.

If the user provides on_error and does not explicitly match FatalWorkerTransientError, Geneva still applies that default retry behavior. If the user does explicitly match it, the user configuration wins.

Examples:

import pyarrow as pa

from geneva import Retry, Fail, udf
from geneva.errors import FatalWorkerTransientError

@udf(
    data_type=pa.int64(),
    on_error=[Retry(FatalWorkerTransientError, max_attempts=5)],
)
def retry_infra_failures(x: int) -> int:
    return x * 2


@udf(
    data_type=pa.int64(),
    on_error=[Fail(FatalWorkerTransientError)],
)
def fail_on_infra_failures(x: int) -> int:
    return x * 2
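
The precedence rule described before these examples can be sketched without Geneva installed. Everything here is a stand-in: Rule, DEFAULT_RULE, and resolve are hypothetical names, the local FatalWorkerTransientError class only mimics the real exception, and the sketch assumes that "explicitly match" means a user rule names that exact exception type.

```python
from dataclasses import dataclass
from typing import List, Optional, Type


class FatalWorkerTransientError(Exception):
    """Local stand-in for the Geneva exception of the same name."""


@dataclass(frozen=True)
class Rule:
    """Hypothetical, simplified matcher: an action bound to one exception type."""

    action: str  # "retry" or "fail"
    exc_type: Type[BaseException]
    max_attempts: int = 1


# Default described in the text: retry for up to 3 task-level attempts.
DEFAULT_RULE = Rule("retry", FatalWorkerTransientError, max_attempts=3)


def resolve(on_error: Optional[List[Rule]]) -> Rule:
    """Return the rule that governs FatalWorkerTransientError."""
    for rule in on_error or []:
        # Assumption: only a rule naming this exact type counts as explicit.
        if rule.exc_type is FatalWorkerTransientError:
            return rule
    return DEFAULT_RULE
```

Under this reading, resolve(None) and resolve([Rule("retry", ValueError, 5)]) both fall back to the default of 3 attempts, while a user rule for FatalWorkerTransientError is returned unchanged.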

Exception Matchers

geneva.debug.error_store.Retry

Bases: ExceptionMatcher

Retry on matching exceptions with backoff

Parameters:

  • *exceptions (type[Exception], default: () ) –

    Exception types to match

  • match (str, default: None ) –

    Regex pattern to match in exception message. Simple strings work as substring matches (e.g., "rate limit"). Use (?i) for case-insensitive matching.

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts (default: 3)

  • backoff (str, default: 'exponential' ) –

    Backoff strategy: "exponential" (default), "fixed", or "linear"

Examples:

Retry(ConnectionError, TimeoutError, max_attempts=3)
Retry(ValueError, match="rate limit", max_attempts=5)
Retry(APIError, match=r"429|rate.?limit")
Retry(APIError, match=r"(?i)rate limit")  # case-insensitive
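
The match parameter's behavior (substring-style matching, (?i) for case-insensitivity) is consistent with Python's re.search applied to the exception message. The sketch below assumes exactly that, and also assumes that an empty exception tuple matches any exception type. The function name matches is hypothetical and is not part of Geneva.

```python
import re
from typing import Optional, Tuple, Type


def matches(
    exc: BaseException,
    exc_types: Tuple[Type[BaseException], ...] = (),
    pattern: Optional[str] = None,
) -> bool:
    """Hypothetical matcher: type check first, then regex search on the message."""
    # Assumption: an empty tuple of types places no restriction on the type.
    if exc_types and not isinstance(exc, exc_types):
        return False
    # re.search finds the pattern anywhere in the message, so a plain string
    # such as "rate limit" behaves like a substring test.
    if pattern is not None and re.search(pattern, str(exc)) is None:
        return False
    return True
```

With these assumptions, "rate limit" does not match the message "Rate limit hit" because the search is case-sensitive by default, whereas "(?i)rate limit" does.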

max_attempts

max_attempts: int = field(default=3)

backoff

backoff: str = field(
    default="exponential", validator=_validate_backoff
)

geneva.debug.error_store.Skip

Bases: ExceptionMatcher

Skip row (return None) on matching exceptions

Parameters:

  • *exceptions (type[Exception], default: () ) –

    Exception types to match

  • match (str, default: None ) –

    Regex pattern to match in exception message

  • max_skip_count (int, default: None ) –

    Maximum number of rows that can be skipped before failing the job. Only used when passed via skip_on_error().

  • max_skip_fraction (float, default: None ) –

    Maximum fraction of rows (0.0-1.0) that can be skipped before failing the job. Only used when passed via skip_on_error().

Examples:

Skip(ValueError, KeyError)
Skip(ValueError, match="invalid input")

max_skip_count

max_skip_count: int | None = field(default=None)

max_skip_fraction

max_skip_fraction: float | None = field(default=None)

geneva.debug.error_store.Fail

Bases: ExceptionMatcher

Fail job immediately on matching exceptions

Parameters:

  • *exceptions (type[Exception], default: () ) –

    Exception types to match

  • match (str, default: None ) –

    Regex pattern to match in exception message

Examples:

Fail(AuthError)
Fail(ValueError, match="fatal")

Helper Functions

geneva.debug.error_store.retry_transient

retry_transient(
    max_attempts: int = 3, backoff: str = "exponential"
) -> list[ExceptionMatcher]

Retry transient network errors (ConnectionError, TimeoutError, OSError).

Parameters:

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts (default: 3)

  • backoff (str, default: 'exponential' ) –

    Backoff strategy: "exponential" (default), "fixed", or "linear"

Returns:

  • list[ExceptionMatcher]

    Matcher list for use with on_error parameter

Examples:

@udf(data_type=pa.int32(), on_error=retry_transient())
@udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))

geneva.debug.error_store.retry_all

retry_all(
    max_attempts: int = 3, backoff: str = "exponential"
) -> list[ExceptionMatcher]

Retry any exception.

Parameters:

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts (default: 3)

  • backoff (str, default: 'exponential' ) –

    Backoff strategy: "exponential" (default), "fixed", or "linear"

Returns:

  • list[ExceptionMatcher]

    Matcher list for use with on_error parameter

Examples:

@udf(data_type=pa.int32(), on_error=retry_all())
@udf(data_type=pa.int32(), on_error=retry_all(max_attempts=5))

geneva.debug.error_store.skip_on_error

skip_on_error(
    max_skip_count: int | None = None,
    max_skip_fraction: float | None = None,
) -> list[ExceptionMatcher]

Skip (return None) for any exception.

Parameters:

  • max_skip_count (int, default: None ) –

    Maximum number of rows that can be skipped before the job fails. If both max_skip_count and max_skip_fraction are set, whichever threshold is hit first triggers failure.

  • max_skip_fraction (float, default: None ) –

    Maximum fraction of rows (0.0-1.0) that can be skipped before the job fails. The fraction is computed as skipped / total_processed_so_far.

Returns:

  • list[ExceptionMatcher]

    Matcher list for use with on_error parameter

Examples:

@udf(data_type=pa.int32(), on_error=skip_on_error())
@udf(data_type=pa.int32(), on_error=skip_on_error(max_skip_count=100))
@udf(data_type=pa.int32(), on_error=skip_on_error(max_skip_fraction=0.05))
@udf(
    data_type=pa.int32(),
    on_error=skip_on_error(max_skip_count=100, max_skip_fraction=0.05),
)
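
The two thresholds can be sketched as a running counter. SkipBudget and first_failing_row are hypothetical names, and the sketch assumes a threshold is "hit" when the skipped count or fraction strictly exceeds the limit; the text above does not say whether the comparison is strict. The fraction follows the documented formula, skipped / total_processed_so_far.

```python
from typing import Iterable, Optional


class SkipBudget:
    """Hypothetical tracker for max_skip_count and max_skip_fraction."""

    def __init__(
        self,
        max_skip_count: Optional[int] = None,
        max_skip_fraction: Optional[float] = None,
    ) -> None:
        self.max_skip_count = max_skip_count
        self.max_skip_fraction = max_skip_fraction
        self.processed = 0
        self.skipped = 0

    def record(self, skipped: bool) -> bool:
        """Record one processed row; return True if the job should now fail."""
        self.processed += 1
        if skipped:
            self.skipped += 1
        return self.exceeded()

    def exceeded(self) -> bool:
        over_count = (
            self.max_skip_count is not None
            and self.skipped > self.max_skip_count
        )
        over_fraction = (
            self.max_skip_fraction is not None
            and self.processed > 0
            and self.skipped / self.processed > self.max_skip_fraction
        )
        # Whichever threshold is crossed first triggers failure.
        return over_count or over_fraction


def first_failing_row(outcomes: Iterable[bool], budget: SkipBudget) -> Optional[int]:
    """Return the index of the row at which the budget is exceeded, if any."""
    for index, skipped in enumerate(outcomes):
        if budget.record(skipped):
            return index
    return None
```

Because the denominator is the number of rows processed so far, a skip very early in the job yields a high fraction under this reading. Whether Geneva applies any minimum sample size before enforcing max_skip_fraction is not stated here.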

geneva.debug.error_store.fail_fast

fail_fast() -> list[ExceptionMatcher]

Fail immediately on any exception (default behavior).

Returns:

  • list[ExceptionMatcher]

    Empty matcher list (no special handling)

Examples:

@udf(data_type=pa.int32(), on_error=fail_fast())

Configuration

geneva.debug.error_store.ErrorHandlingConfig

Configuration for UDF error handling behavior

retry_config

retry_config: UDFRetryConfig = field(factory=no_retry)

fault_isolation

fault_isolation: FaultIsolation = field(default=FAIL_BATCH)

log_errors

log_errors: bool = field(default=True)

log_retry_attempts

log_retry_attempts: bool = field(default=False)

max_skip_count

max_skip_count: int | None = field(default=None)

max_skip_fraction

max_skip_fraction: float | None = field(default=None)

validate_compatibility

validate_compatibility(map_task) -> None

Validate that this error config is compatible with the given task

Parameters:

  • map_task

    The MapTask to validate against

Raises:

  • ValueError

    If SKIP_ROWS is used with a RecordBatch UDF

geneva.debug.error_store.UDFRetryConfig

Retry configuration for UDF execution using tenacity semantics

retry

retry: retry_base = field(
    factory=lambda: retry_if_exception_type(())
)

stop

stop: stop_base = field(
    factory=lambda: stop_after_attempt(1)
)

wait

wait: wait_base = field(
    factory=lambda: wait_exponential(
        multiplier=1, min=1, max=60
    )
)
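
To give a feel for the default wait, the following standard-library sketch approximates the delays that wait_exponential(multiplier=1, min=1, max=60) would produce. It assumes tenacity computes multiplier * 2 ** (attempt_number - 1) and clamps the result between min and max; the exact formula may differ between tenacity versions, and exponential_wait is a hypothetical name.

```python
def exponential_wait(
    attempt_number: int,
    multiplier: float = 1.0,
    min_wait: float = 1.0,
    max_wait: float = 60.0,
    exp_base: float = 2.0,
) -> float:
    """Approximate wait in seconds before the retry that follows `attempt_number`."""
    # Assumed formula: grow geometrically with the attempt number...
    raw = multiplier * exp_base ** (attempt_number - 1)
    # ...then clamp into the [min_wait, max_wait] range.
    return max(max(0.0, min_wait), min(raw, max_wait))


# Delays after attempts 1 through 8 under the assumed formula.
delays = [exponential_wait(n) for n in range(1, 9)]
```

Under these assumptions the delays double from 1 second until they reach the 60-second cap.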

before_sleep

before_sleep: Callable[[RetryCallState], None] | None = (
    field(default=None)
)

after_attempt

after_attempt: Callable[[RetryCallState], None] | None = (
    field(default=None)
)

reraise

reraise: bool = field(default=True)

no_retry

no_retry() -> UDFRetryConfig

No retries - fail immediately (default behavior)

retry_transient

retry_transient(max_attempts: int = 3) -> UDFRetryConfig

Retry common transient errors (network, timeouts)

Parameters:

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts including the initial try
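
Since max_attempts counts the initial try, max_attempts=3 means one initial call plus at most two retries. The loop below is a hedged illustration of that counting only: call_with_attempts and Flaky are hypothetical names, backoff is omitted, and the default exception tuple is borrowed from the retry_transient helper description rather than from UDFRetryConfig itself.

```python
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_attempts(
    fn: Callable[[], T],
    max_attempts: int = 3,
    retry_on: Tuple[Type[BaseException], ...] = (
        ConnectionError,
        TimeoutError,
        OSError,
    ),
) -> T:
    """Call fn up to max_attempts times, counting the first call as attempt 1."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            # Re-raise the original error once the final attempt has failed.
            if attempt == max_attempts:
                raise
    raise AssertionError("unreachable")


class Flaky:
    """Test double that fails a fixed number of times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def __call__(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("temporary failure")
        return "ok"
```

A Flaky(2) callable succeeds on its third call, so it passes with max_attempts=3 and raises ConnectionError with max_attempts=2.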

geneva.debug.error_store.ErrorRecord

UDF execution error record, stored in geneva_errors table

error_id

error_id: str = field(factory=lambda: str(uuid4()))

error_type

error_type: str = field()

error_message

error_message: str = field()

error_trace

error_trace: str = field()

job_id

job_id: str = field()

table_uri

table_uri: str = field()

table_name

table_name: str = field()

table_version

table_version: Optional[int] = field(default=None)

column_name

column_name: str = field()

udf_name

udf_name: str = field()

udf_version

udf_version: str = field()

actor_id

actor_id: Optional[str] = field(default=None)

fragment_id

fragment_id: Optional[int] = field(default=None)

batch_index

batch_index: int = field()

row_address

row_address: Optional[int] = field(default=None)

attempt

attempt: int = field(default=1)

max_attempts

max_attempts: int = field(default=1)

timestamp

timestamp: datetime = field(
    factory=dt_now_utc,
    metadata={"pa_type": timestamp("us", tz="UTC")},
)