Skip to content

Error Handling

Exception Matchers

geneva.debug.error_store.Retry

Bases: ExceptionMatcher

Retry on matching exceptions with backoff

Parameters:

  • *exceptions (type[Exception], default: () ) –

    Exception types to match

  • match (str, default: None ) –

    Regex pattern to match in exception message. Simple strings work as substring matches (e.g., "rate limit"). Use (?i) for case-insensitive matching.

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts (default: 3)

  • backoff (str, default: 'exponential' ) –

    Backoff strategy: "exponential" (default), "fixed", or "linear"

Examples:

Retry(ConnectionError, TimeoutError, max_attempts=3)
Retry(ValueError, match="rate limit", max_attempts=5)
Retry(APIError, match=r"429|rate.?limit")
Retry(APIError, match=r"(?i)rate limit")  # case-insensitive

max_attempts

max_attempts: int = field(default=3)

backoff

backoff: str = field(
    default="exponential", validator=_validate_backoff
)

geneva.debug.error_store.Skip

Bases: ExceptionMatcher

Skip row (return None) on matching exceptions

Parameters:

  • *exceptions (type[Exception], default: () ) –

    Exception types to match

  • match (str, default: None ) –

    Regex pattern to match in exception message

Examples:

Skip(ValueError, KeyError)
Skip(ValueError, match="invalid input")

geneva.debug.error_store.Fail

Bases: ExceptionMatcher

Fail job immediately on matching exceptions

Parameters:

  • *exceptions (type[Exception], default: () ) –

    Exception types to match

  • match (str, default: None ) –

    Regex pattern to match in exception message

Examples:

Fail(AuthError)
Fail(ValueError, match="fatal")

Helper Functions

geneva.debug.error_store.retry_transient

retry_transient(
    max_attempts: int = 3, backoff: str = "exponential"
) -> list[ExceptionMatcher]

Retry transient network errors (ConnectionError, TimeoutError, OSError).

Parameters:

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts (default: 3)

  • backoff (str, default: 'exponential' ) –

    Backoff strategy: "exponential" (default), "fixed", or "linear"

Returns:

  • list[ExceptionMatcher]

    Matcher list for use with on_error parameter

Examples:

@udf(data_type=pa.int32(), on_error=retry_transient())
@udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))

geneva.debug.error_store.retry_all

retry_all(
    max_attempts: int = 3, backoff: str = "exponential"
) -> list[ExceptionMatcher]

Retry any exception.

Parameters:

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts (default: 3)

  • backoff (str, default: 'exponential' ) –

    Backoff strategy: "exponential" (default), "fixed", or "linear"

Returns:

  • list[ExceptionMatcher]

    Matcher list for use with on_error parameter

Examples:

@udf(data_type=pa.int32(), on_error=retry_all())
@udf(data_type=pa.int32(), on_error=retry_all(max_attempts=5))

geneva.debug.error_store.skip_on_error

skip_on_error() -> list[ExceptionMatcher]

Skip (return None) for any exception.

Returns:

  • list[ExceptionMatcher]

    Matcher list for use with on_error parameter

Examples:

@udf(data_type=pa.int32(), on_error=skip_on_error())

geneva.debug.error_store.fail_fast

fail_fast() -> list[ExceptionMatcher]

Fail immediately on any exception (default behavior).

Returns:

  • list[ExceptionMatcher]

    Empty matcher list (no special handling)

Examples:

@udf(data_type=pa.int32(), on_error=fail_fast())

Configuration

geneva.debug.error_store.ErrorHandlingConfig

Configuration for UDF error handling behavior

retry_config

retry_config: UDFRetryConfig = field(factory=no_retry)

fault_isolation

fault_isolation: FaultIsolation = field(default=FAIL_BATCH)

log_errors

log_errors: bool = field(default=True)

log_retry_attempts

log_retry_attempts: bool = field(default=False)

validate_compatibility

validate_compatibility(map_task) -> None

Validate that this error config is compatible with the given task

Parameters:

  • map_task

    The MapTask to validate against

Raises:

  • ValueError

    If SKIP_ROWS is used with RecordBatch UDF

geneva.debug.error_store.UDFRetryConfig

Retry configuration for UDF execution using tenacity semantics

retry

retry: retry_base = field(
    factory=lambda: retry_if_exception_type(())
)

stop

stop: stop_base = field(
    factory=lambda: stop_after_attempt(1)
)

wait

wait: wait_base = field(
    factory=lambda: wait_exponential(
        multiplier=1, min=1, max=60
    )
)

before_sleep

before_sleep: Callable[[RetryCallState], None] | None = (
    field(default=None)
)

after_attempt

after_attempt: Callable[[RetryCallState], None] | None = (
    field(default=None)
)

reraise

reraise: bool = field(default=True)

no_retry

no_retry() -> UDFRetryConfig

No retries - fail immediately (default behavior)

retry_transient

retry_transient(max_attempts: int = 3) -> UDFRetryConfig

Retry common transient errors (network, timeouts)

Parameters:

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts including the initial try

geneva.debug.error_store.ErrorRecord

UDF execution error record, stored in geneva_errors table

error_id

error_id: str = field(factory=lambda: str(uuid4()))

error_type

error_type: str = field()

error_message

error_message: str = field()

error_trace

error_trace: str = field()

job_id

job_id: str = field()

table_uri

table_uri: str = field()

table_name

table_name: str = field()

table_version

table_version: Optional[int] = field(default=None)

column_name

column_name: str = field()

udf_name

udf_name: str = field()

udf_version

udf_version: str = field()

actor_id

actor_id: Optional[str] = field(default=None)

fragment_id

fragment_id: Optional[int] = field(default=None)

batch_index

batch_index: int = field()

row_address

row_address: Optional[int] = field(default=None)

attempt

attempt: int = field(default=1)

max_attempts

max_attempts: int = field(default=1)

timestamp

timestamp: datetime = field(
    factory=dt_now_utc,
    metadata={"pa_type": timestamp("us", tz="UTC")},
)