Skip to content

Error Handling

Exception Matchers

geneva.debug.error_store.Retry

Bases: ExceptionMatcher

Retry on matching exceptions with backoff

Parameters:

  • *exceptions (type[Exception], default: () ) –

    Exception types to match

  • match (str, default: None ) –

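    Regex pattern to match in exception message. Simple strings work as substring matches (e.g., "rate limit"), but regex metacharacters such as . or ( keep their special meaning — escape them (e.g. with re.escape) to match them literally. Use (?i) for case-insensitive matching.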

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts (default: 3)

  • backoff (str, default: 'exponential' ) –

    Backoff strategy: "exponential" (default), "fixed", or "linear"

Examples:

>>> Retry(ConnectionError, TimeoutError, max_attempts=3)
>>> Retry(ValueError, match="rate limit", max_attempts=5)
>>> Retry(APIError, match=r"429|rate.?limit")
>>> Retry(APIError, match=r"(?i)rate limit")  # case-insensitive
Source code in geneva/debug/error_store.py
@attrs.define
class Retry(ExceptionMatcher):
    """Retry on matching exceptions with backoff

    Parameters
    ----------
    *exceptions : type[Exception]
        Exception types to match. A single tuple or list of types is also
        accepted and unpacked, so ``Retry(E1, E2)`` and ``Retry((E1, E2))``
        are equivalent.
    match : str, optional
        Regex pattern to match in exception message.
        Simple strings work as substring matches (e.g., "rate limit"), but
        regex metacharacters such as ``.`` or ``(`` keep their special
        meaning; escape them (e.g. with ``re.escape``) to match literally.
        Use (?i) for case-insensitive matching.
    max_attempts : int
        Maximum number of attempts (default: 3)
    backoff : str
        Backoff strategy: "exponential" (default), "fixed", or "linear"

    Examples
    --------
    >>> Retry(ConnectionError, TimeoutError, max_attempts=3)
    >>> Retry(ValueError, match="rate limit", max_attempts=5)
    >>> Retry(APIError, match=r"429|rate.?limit")
    >>> Retry(APIError, match=r"(?i)rate limit")  # case-insensitive
    """

    max_attempts: int = attrs.field(default=3)
    backoff: str = attrs.field(default="exponential", validator=_validate_backoff)

    def __init__(
        self,
        *exceptions: type[Exception],
        match: Optional[str] = None,
        max_attempts: int = 3,
        backoff: str = "exponential",
    ) -> None:
        types = exceptions
        # A lone tuple/list argument is the alternative Retry((E1, E2))
        # spelling; flatten it so both call styles store the same tuple.
        if len(types) == 1:
            (candidate,) = types
            if isinstance(candidate, (tuple, list)):
                types = tuple(candidate)
        # attrs generates __attrs_init__ because this class defines its own
        # __init__; delegate field assignment/validation to it.
        self.__attrs_init__(  # pyright: ignore[reportAttributeAccessIssue]
            exceptions=types,
            match=match,
            max_attempts=max_attempts,
            backoff=backoff,
        )

max_attempts

max_attempts: int = field(default=3)

backoff

backoff: str = field(
    default="exponential", validator=_validate_backoff
)

geneva.debug.error_store.Skip

Bases: ExceptionMatcher

Skip the row (return None) when a matching exception is raised

Parameters:

  • *exceptions (type[Exception], default: () ) –

    Exception types to match

  • match (str, default: None ) –

    Regex pattern to match in exception message

Examples:

>>> Skip(ValueError, KeyError)
>>> Skip(ValueError, match="invalid input")
Source code in geneva/debug/error_store.py
@attrs.define
class Skip(ExceptionMatcher):
    """Skip the row (return None) when a matching exception is raised

    Parameters
    ----------
    *exceptions : type[Exception]
        Exception types to match. A single tuple or list of types is
        unpacked, so ``Skip((E1, E2))`` behaves like ``Skip(E1, E2)``.
    match : str, optional
        Regex pattern to match in exception message

    Examples
    --------
    >>> Skip(ValueError, KeyError)
    >>> Skip(ValueError, match="invalid input")
    """

    def __init__(
        self,
        *exceptions: type[Exception],
        match: Optional[str] = None,
    ) -> None:
        # Accept Skip((E1, E2)) as well as Skip(E1, E2).
        single_sequence = len(exceptions) == 1 and isinstance(
            exceptions[0], (tuple, list)
        )
        types = tuple(exceptions[0]) if single_sequence else exceptions
        self.__attrs_init__(  # pyright: ignore[reportAttributeAccessIssue]
            exceptions=types,
            match=match,
        )

geneva.debug.error_store.Fail

Bases: ExceptionMatcher

Fail the job immediately when a matching exception is raised

Parameters:

  • *exceptions (type[Exception], default: () ) –

    Exception types to match

  • match (str, default: None ) –

    Regex pattern to match in exception message

Examples:

>>> Fail(AuthError)
>>> Fail(ValueError, match="fatal")
Source code in geneva/debug/error_store.py
@attrs.define
class Fail(ExceptionMatcher):
    """Fail the job immediately when a matching exception is raised

    Parameters
    ----------
    *exceptions : type[Exception]
        Exception types to match. A single tuple or list of types is
        unpacked, so ``Fail((E1, E2))`` behaves like ``Fail(E1, E2)``.
    match : str, optional
        Regex pattern to match in exception message

    Examples
    --------
    >>> Fail(AuthError)
    >>> Fail(ValueError, match="fatal")
    """

    def __init__(
        self,
        *exceptions: type[Exception],
        match: Optional[str] = None,
    ) -> None:
        types = exceptions
        if len(types) == 1 and isinstance(types[0], (tuple, list)):
            # Fail((E1, E2)) is accepted as shorthand for Fail(E1, E2).
            (grouped,) = types
            types = tuple(grouped)
        self.__attrs_init__(  # pyright: ignore[reportAttributeAccessIssue]
            match=match,
            exceptions=types,
        )

Helper Functions

geneva.debug.error_store.retry_transient

retry_transient(
    max_attempts: int = 3, backoff: str = "exponential"
) -> list[ExceptionMatcher]

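Retry transient network errors (ConnectionError, TimeoutError, OSError). Because ConnectionError and TimeoutError are subclasses of OSError, every OSError is retried — including non-network errors such as FileNotFoundError and PermissionError.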

Parameters:

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts (default: 3)

  • backoff (str, default: 'exponential' ) –

    Backoff strategy: "exponential" (default), "fixed", or "linear"

Returns:

  • list[ExceptionMatcher]

    Matcher list for use with on_error parameter

Examples:

>>> @udf(data_type=pa.int32(), on_error=retry_transient())
>>> @udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))
Source code in geneva/debug/error_store.py
def retry_transient(
    max_attempts: int = 3,
    backoff: str = "exponential",
) -> list[ExceptionMatcher]:
    """Retry transient network errors (ConnectionError, TimeoutError, OSError).

    ``ConnectionError`` and ``TimeoutError`` are subclasses of ``OSError``,
    so every ``OSError`` is retried -- including non-network errors such as
    ``FileNotFoundError`` and ``PermissionError``.

    Parameters
    ----------
    max_attempts : int
        Maximum number of attempts (default: 3)
    backoff : str
        Backoff strategy: "exponential" (default), "fixed", or "linear"

    Returns
    -------
    list[ExceptionMatcher]
        A single-element matcher list for use with the on_error parameter

    Examples
    --------
    >>> @udf(data_type=pa.int32(), on_error=retry_transient())
    >>> @udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))
    """
    transient_types = (ConnectionError, TimeoutError, OSError)
    matcher = Retry(*transient_types, max_attempts=max_attempts, backoff=backoff)
    return [matcher]

geneva.debug.error_store.retry_all

retry_all(
    max_attempts: int = 3, backoff: str = "exponential"
) -> list[ExceptionMatcher]

Retry any exception.

Parameters:

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts (default: 3)

  • backoff (str, default: 'exponential' ) –

    Backoff strategy: "exponential" (default), "fixed", or "linear"

Returns:

  • list[ExceptionMatcher]

    Matcher list for use with on_error parameter

Examples:

>>> @udf(data_type=pa.int32(), on_error=retry_all())
>>> @udf(data_type=pa.int32(), on_error=retry_all(max_attempts=5))
Source code in geneva/debug/error_store.py
def retry_all(
    max_attempts: int = 3,
    backoff: str = "exponential",
) -> list[ExceptionMatcher]:
    """Retry any exception.

    The returned matcher is built for the ``Exception`` base type.

    Parameters
    ----------
    max_attempts : int
        Maximum number of attempts (default: 3)
    backoff : str
        Backoff strategy: "exponential" (default), "fixed", or "linear"

    Returns
    -------
    list[ExceptionMatcher]
        A single-element matcher list for use with the on_error parameter

    Examples
    --------
    >>> @udf(data_type=pa.int32(), on_error=retry_all())
    >>> @udf(data_type=pa.int32(), on_error=retry_all(max_attempts=5))
    """
    matcher = Retry(Exception, max_attempts=max_attempts, backoff=backoff)
    return [matcher]

geneva.debug.error_store.skip_on_error

skip_on_error() -> list[ExceptionMatcher]

Skip (return None) for any exception.

Returns:

  • list[ExceptionMatcher]

    Matcher list for use with on_error parameter

Examples:

>>> @udf(data_type=pa.int32(), on_error=skip_on_error())
Source code in geneva/debug/error_store.py
def skip_on_error() -> list[ExceptionMatcher]:
    """Skip (return None) for any exception.

    The returned matcher is built for the ``Exception`` base type.

    Returns
    -------
    list[ExceptionMatcher]
        A single-element matcher list for use with the on_error parameter

    Examples
    --------
    >>> @udf(data_type=pa.int32(), on_error=skip_on_error())
    """
    catch_all = Skip(Exception)
    return [catch_all]

geneva.debug.error_store.fail_fast

fail_fast() -> list[ExceptionMatcher]

Fail immediately on any exception (default behavior).

Returns:

  • list[ExceptionMatcher]

    Empty matcher list (no special handling)

Examples:

>>> @udf(data_type=pa.int32(), on_error=fail_fast())
Source code in geneva/debug/error_store.py
def fail_fast() -> list[ExceptionMatcher]:
    """Fail immediately on any exception (default behavior).

    Returns
    -------
    list[ExceptionMatcher]
        An empty matcher list (no special handling)

    Examples
    --------
    >>> @udf(data_type=pa.int32(), on_error=fail_fast())
    """
    # With no matchers there is no special handling, which is the
    # default fail-immediately behavior.
    matchers = []
    return matchers

Configuration

geneva.debug.error_store.ErrorHandlingConfig

Configuration for UDF error handling behavior

Source code in geneva/debug/error_store.py
@attrs.define
class ErrorHandlingConfig:
    """Configuration for UDF error handling behavior"""

    # Retry policy using tenacity
    retry_config: UDFRetryConfig = attrs.field(factory=UDFRetryConfig.no_retry)

    # How to isolate failures
    fault_isolation: FaultIsolation = attrs.field(default=FaultIsolation.FAIL_BATCH)

    # Whether to log errors to the error table
    log_errors: bool = attrs.field(default=True)

    # Whether to log all retry attempts (not just final failures)
    log_retry_attempts: bool = attrs.field(default=False)

    # Internal: Store matchers for runtime exception matching (set by resolve_on_error).
    # alias keeps the leading underscore in the __init__ argument name
    # (attrs strips it by default).
    _matchers: Optional[list["ExceptionMatcher"]] = attrs.field(
        default=None, repr=False, alias="_matchers"
    )

    def validate_compatibility(self, map_task) -> None:
        """Validate that this error config is compatible with the given task

        Parameters
        ----------
        map_task : MapTask
            The MapTask to validate against

        Raises
        ------
        ValueError
            If SKIP_ROWS is used with a RecordBatch UDF
        """
        # Only SKIP_ROWS has task-specific restrictions; return before doing
        # the project imports for every other isolation mode.
        if self.fault_isolation != FaultIsolation.SKIP_ROWS:
            return

        # Imported locally (presumably to avoid an import cycle -- verify).
        from geneva.apply.task import BackfillUDFTask
        from geneva.transformer import UDFArgType

        if not isinstance(map_task, BackfillUDFTask):
            return

        # SKIP_ROWS only works with scalar/array UDFs, not RecordBatch UDFs.
        # Check every UDF on the task rather than only the first one, and
        # tolerate an empty mapping instead of raising StopIteration.
        for _, udf in map_task.udfs.items():
            if getattr(udf, "arg_type", None) == UDFArgType.RECORD_BATCH:
                raise ValueError(
                    "SKIP_ROWS fault isolation cannot be used with "
                    "RecordBatch UDFs. RecordBatch UDFs process entire "
                    "batches and cannot skip individual rows. "
                    "Use FAIL_BATCH instead."
                )

retry_config

retry_config: UDFRetryConfig = field(factory=no_retry)

fault_isolation

fault_isolation: FaultIsolation = field(default=FAIL_BATCH)

log_errors

log_errors: bool = field(default=True)

log_retry_attempts

log_retry_attempts: bool = field(default=False)

validate_compatibility

validate_compatibility(map_task) -> None

Validate that this error config is compatible with the given task

Parameters:

  • map_task –

    The MapTask to validate against

Raises:

  • ValueError –

    If SKIP_ROWS is used with a RecordBatch UDF

Source code in geneva/debug/error_store.py
def validate_compatibility(self, map_task) -> None:
    """Validate that this error config is compatible with the given task

    Parameters
    ----------
    map_task : MapTask
        The MapTask to validate against

    Raises
    ------
    ValueError
        If SKIP_ROWS is used with a RecordBatch UDF
    """
    # Only SKIP_ROWS has task-specific restrictions; return before doing
    # the project imports for every other isolation mode.
    if self.fault_isolation != FaultIsolation.SKIP_ROWS:
        return

    # Imported locally (presumably to avoid an import cycle -- verify).
    from geneva.apply.task import BackfillUDFTask
    from geneva.transformer import UDFArgType

    if not isinstance(map_task, BackfillUDFTask):
        return

    # SKIP_ROWS only works with scalar/array UDFs, not RecordBatch UDFs.
    # Check every UDF on the task rather than only the first one, and
    # tolerate an empty mapping instead of raising StopIteration.
    for _, udf in map_task.udfs.items():
        if getattr(udf, "arg_type", None) == UDFArgType.RECORD_BATCH:
            raise ValueError(
                "SKIP_ROWS fault isolation cannot be used with "
                "RecordBatch UDFs. RecordBatch UDFs process entire "
                "batches and cannot skip individual rows. "
                "Use FAIL_BATCH instead."
            )

geneva.debug.error_store.UDFRetryConfig

Retry configuration for UDF execution using tenacity semantics

Source code in geneva/debug/error_store.py
@attrs.define
class UDFRetryConfig:
    """Retry configuration for UDF execution using tenacity semantics.

    The defaults describe a single attempt with no retries.
    """

    # Which exceptions trigger a retry; the empty tuple matches nothing.
    retry: retry_base = attrs.field(
        factory=lambda: retry_if_exception_type(())  # No retries by default
    )

    # When to give up; by default after the first attempt.
    stop: stop_base = attrs.field(factory=lambda: stop_after_attempt(1))

    # Delay between attempts: exponential backoff clamped to 1..60 seconds.
    wait: wait_base = attrs.field(
        factory=lambda: wait_exponential(multiplier=1, min=1, max=60)
    )

    # Optional hooks invoked with tenacity's RetryCallState.
    before_sleep: Callable[[RetryCallState], None] | None = attrs.field(default=None)
    after_attempt: Callable[[RetryCallState], None] | None = attrs.field(default=None)

    # Re-raise the last exception once retries are exhausted.
    reraise: bool = attrs.field(default=True)

    @classmethod
    def no_retry(cls) -> "UDFRetryConfig":
        """Build a config that never retries (the default behavior)."""
        # Every field default already encodes "no retries".
        default_config = cls()
        return default_config

    @classmethod
    def retry_transient(cls, max_attempts: int = 3) -> "UDFRetryConfig":
        """Retry common transient errors (network, timeouts)

        ``TimeoutError`` and ``ConnectionError`` are subclasses of
        ``OSError``, so any ``OSError`` is retried.

        Parameters
        ----------
        max_attempts : int
            Maximum number of attempts including the initial try
        """
        retry_on = retry_if_exception_type((OSError, TimeoutError, ConnectionError))
        give_up = stop_after_attempt(max_attempts)
        pause = wait_exponential(multiplier=1, min=1, max=60)
        return cls(retry=retry_on, stop=give_up, wait=pause)

retry

retry: retry_base = field(
    factory=lambda: retry_if_exception_type(())
)

stop

stop: stop_base = field(
    factory=lambda: stop_after_attempt(1)
)

wait

wait: wait_base = field(
    factory=lambda: wait_exponential(
        multiplier=1, min=1, max=60
    )
)

before_sleep

before_sleep: Callable[[RetryCallState], None] | None = (
    field(default=None)
)

after_attempt

after_attempt: Callable[[RetryCallState], None] | None = (
    field(default=None)
)

reraise

reraise: bool = field(default=True)

no_retry

no_retry() -> UDFRetryConfig

No retries - fail immediately (default behavior)

Source code in geneva/debug/error_store.py
@classmethod
def no_retry(cls) -> "UDFRetryConfig":
    """Build a config that never retries (the default behavior)."""
    # Every field default already encodes "no retries".
    default_config = cls()
    return default_config

retry_transient

retry_transient(max_attempts: int = 3) -> UDFRetryConfig

Retry common transient errors (network, timeouts). TimeoutError and ConnectionError are subclasses of OSError, so any OSError is retried.

Parameters:

  • max_attempts (int, default: 3 ) –

    Maximum number of attempts including the initial try

Source code in geneva/debug/error_store.py
@classmethod
def retry_transient(cls, max_attempts: int = 3) -> "UDFRetryConfig":
    """Retry common transient errors (network, timeouts)

    ``TimeoutError`` and ``ConnectionError`` are subclasses of
    ``OSError``, so any ``OSError`` is retried.

    Parameters
    ----------
    max_attempts : int
        Maximum number of attempts including the initial try
    """
    retry_on = retry_if_exception_type((OSError, TimeoutError, ConnectionError))
    give_up = stop_after_attempt(max_attempts)
    pause = wait_exponential(multiplier=1, min=1, max=60)
    return cls(retry=retry_on, stop=give_up, wait=pause)

geneva.debug.error_store.ErrorRecord

UDF execution error record, stored in the geneva_errors table

Source code in geneva/debug/error_store.py
@attrs.define(kw_only=True)
class ErrorRecord:
    """UDF execution error record, stored in the geneva_errors table.

    All fields are keyword-only (``kw_only=True``), which is what allows
    required fields (no default) to be declared after fields that have
    defaults. Field declaration order is preserved by attrs (repr, asdict),
    so do not reorder fields casually.
    """

    # Unique error ID: a fresh random UUID4 string per record
    error_id: str = attrs.field(factory=lambda: str(uuid.uuid4()))

    # Error details
    error_type: str = attrs.field()  # Exception.__class__.__name__
    error_message: str = attrs.field()
    error_trace: str = attrs.field()  # Full traceback

    # Job/Table context
    job_id: str = attrs.field()
    table_uri: str = attrs.field()  # Full URI to the table
    table_name: str = attrs.field()
    table_version: Optional[int] = attrs.field(default=None)  # Read version
    column_name: str = attrs.field()

    # UDF context
    udf_name: str = attrs.field()
    udf_version: str = attrs.field()

    # Execution context (Ray/distributed); both optional
    actor_id: Optional[str] = attrs.field(default=None)
    fragment_id: Optional[int] = attrs.field(default=None)
    batch_index: int = attrs.field()  # Sequence number within fragment

    # Row-level granularity (for scalar UDFs); None when not row-specific
    row_address: Optional[int] = attrs.field(default=None)

    # Retry context: attempt number and configured attempt limit (both default to 1)
    attempt: int = attrs.field(default=1)
    max_attempts: int = attrs.field(default=1)

    # Timestamp, defaulting to dt_now_utc() at construction. The "pa_type"
    # metadata presumably pins the Arrow column type (microsecond, UTC) when
    # the table schema is derived -- confirm against the schema builder.
    timestamp: datetime = attrs.field(
        factory=dt_now_utc, metadata={"pa_type": pa.timestamp("us", tz="UTC")}
    )

error_id

error_id: str = field(factory=lambda: str(uuid4()))

error_type

error_type: str = field()

error_message

error_message: str = field()

error_trace

error_trace: str = field()

job_id

job_id: str = field()

table_uri

table_uri: str = field()

table_name

table_name: str = field()

table_version

table_version: Optional[int] = field(default=None)

column_name

column_name: str = field()

udf_name

udf_name: str = field()

udf_version

udf_version: str = field()

actor_id

actor_id: Optional[str] = field(default=None)

fragment_id

fragment_id: Optional[int] = field(default=None)

batch_index

batch_index: int = field()

row_address

row_address: Optional[int] = field(default=None)

attempt

attempt: int = field(default=1)

max_attempts

max_attempts: int = field(default=1)

timestamp

timestamp: datetime = field(
    factory=dt_now_utc,
    metadata={"pa_type": timestamp("us", tz="UTC")},
)