
Table

geneva.table.Table

Bases: Table

Table in Geneva.

A Table is a Lance dataset.

name

name: str

Get the name of the table.

version

version: int

Get the current version of the table.

schema

schema: Schema

The Arrow Schema of the Table.

uri

uri: str

embedding_functions

embedding_functions: Never

tags

tags: Tags

get_reference

get_reference() -> TableReference

get_fragments

get_fragments() -> list[LanceFragment]

add

add(
    data,
    mode: str = "append",
    on_bad_vectors: str = "error",
    fill_value: float = 0.0,
) -> None
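
A minimal sketch of appending rows, assuming data may be a list of dicts or a pyarrow Table as in LanceDB; the column names are hypothetical:

# Append two rows; mode="overwrite" would replace existing data instead
table.add(
    [
        {"id": 1, "text": "hello"},
        {"id": 2, "text": "world"},
    ],
    mode="append",
)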

checkout

checkout(version: int) -> None

checkout_latest

checkout_latest() -> None
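
A short time-travel sketch using only the version property and the two checkout methods above, assuming an earlier version exists:

v = table.version          # current version number
table.checkout(v - 1)      # pin the table to an older snapshot
table.checkout_latest()    # move back to the newest version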

add_columns

add_columns(
    transforms: dict[
        str, str | UDF | tuple[UDF, list[str]]
    ],
    *args,
    **kwargs,
) -> None

Add columns or UDF-based columns to the Geneva table.

For UDF columns, this method validates that:

  • All input columns exist in the table schema
  • Column types are compatible with UDF type annotations (if present)
  • RecordBatch UDFs do not have input_columns defined

This early validation helps catch configuration errors before job execution.

Parameters:

  • transforms (dict[str, str | UDF | tuple[UDF, list[str]]]) –

    The key is the column name to add and the value is a specification of the column type/value.

    • If the spec is a string, it is expected to be a DataFusion SQL expression (e.g. "cast(null as string)").
    • If the spec is a UDF, a virtual column is added with input columns inferred from the UDF's argument names.
    • If the spec is a tuple, the first element is a UDF and the second element is a list of input column names.

Raises:

  • ValueError

    If UDF validation fails (missing columns, type mismatches, etc.)

Warns:

  • UserWarning

    If type validation is skipped due to missing type annotations

Examples:

>>> @udf(data_type=pa.int32())
... def double(a: int) -> int:
...     return a * 2
>>> table.add_columns({"doubled": double})  # Validates 'a' column exists
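
The string and tuple spec forms follow the same pattern; the column names "note", "doubled_b", and "b" below are hypothetical:

>>> # String spec: a DataFusion SQL expression adds a nullable string column
>>> table.add_columns({"note": "cast(null as string)"})
>>> # Tuple spec: name the UDF's input columns explicitly
>>> table.add_columns({"doubled_b": (double, ["b"])})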

refresh

refresh(
    *,
    where: str | None = None,
    src_version: int | None = None,
    max_rows_per_fragment: int | None = None,
    concurrency: int = 8,
    intra_applier_concurrency: int = 1,
    _admission_check: bool | None = None,
    _admission_strict: bool | None = None,
    **kwargs,
) -> None

Refresh the specified materialized view.

Parameters:

  • where (str | None, default: None ) –

    TODO: SQL expression filter used to backfill only selected rows.

  • src_version (int | None, default: None ) –

    Optional source table version to refresh from. If None (default), uses the latest version of the source table.

  • max_rows_per_fragment (int | None, default: None ) –

    Optional maximum number of rows per destination fragment when adding placeholder rows for new source data. If None, uses LanceDB's default (1 million rows). Use smaller values to control fragment granularity.

  • concurrency (int, default: 8 ) –

    This controls the number of processes that run tasks concurrently. For maximum throughput, this should ideally be larger than the number of nodes in the k8s cluster. It is the number of Ray actor processes that are started.

  • intra_applier_concurrency (int, default: 1 ) –

    This controls the number of threads used to execute tasks within each process. Multiplying this by concurrency roughly corresponds to the number of CPUs in use.

  • _admission_check (bool | None, default: None ) –

    Whether to run admission control to validate cluster resources before starting the job. If None, uses config (default: true). Set to False to skip the check. Experimental: Parameters starting with _ are subject to change.

  • _admission_strict (bool | None, default: None ) –

    If True, raises ResourcesUnavailableError when resources are insufficient. If False, logs a warning but allows the job to proceed. If None, uses config (default: true). Experimental: Parameters starting with _ are subject to change.

Raises:

  • RuntimeError

    If attempting to refresh to a different version without stable row IDs enabled on the source table. This is because compaction may have invalidated the __source_row_id values, breaking incremental refresh.
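
A minimal usage sketch, assuming mv is a materialized-view Table and that source version 42 exists:

# Refresh the view from a pinned source version, writing smaller fragments
mv.refresh(src_version=42, max_rows_per_fragment=100_000, concurrency=4)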

backfill_async

backfill_async(
    col_name: str,
    *,
    udf: UDF | None = None,
    where: str | None = None,
    concurrency: int = 8,
    intra_applier_concurrency: int = 1,
    _admission_check: bool | None = None,
    _admission_strict: bool | None = None,
    min_checkpoint_size: int | None = None,
    max_checkpoint_size: int | None = None,
    _enable_job_tracker_saves: bool = True,
    **kwargs,
) -> JobFuture

Backfills the specified column asynchronously.

Returns a job future. Call .result() to wait for completion.

Parameters:

  • col_name (str) –

    Target column name to backfill

  • udf (UDF | None, default: None ) –

    Optionally override the UDF used to backfill the column.

  • where (str | None, default: None ) –

    SQL expression filter to select rows to backfill. Defaults to filtering for rows where the target column IS NULL, skipping already-computed rows. Use where="1=1" to force reprocessing of all rows.

  • concurrency (int, default: 8 ) –

    This controls the number of processes that run tasks concurrently. For maximum throughput, this should ideally be larger than the number of nodes in the k8s cluster. It is the number of Ray actor processes that are started.

  • intra_applier_concurrency (int, default: 1 ) –

    This controls the number of threads used to execute tasks within each process. Multiplying this by concurrency roughly corresponds to the number of CPUs in use.

  • _admission_check (bool | None, default: None ) –

    Whether to run admission control to validate cluster resources before starting the job. If None, uses GENEVA_ADMISSION__CHECK env var (default: true). Set to False to skip the check. Experimental: Parameters starting with _ are subject to change.

  • _admission_strict (bool | None, default: None ) –

    If True, raises ResourcesUnavailableError when resources are insufficient. If False, logs a warning but allows the job to proceed. If None, uses GENEVA_ADMISSION__STRICT env var (default: true). Experimental: Parameters starting with _ are subject to change.

  • commit_granularity

    (default = 64) Commit a partial result every time this many fragments complete. If None, the entire result is committed at once.

  • read_version

    (default = None) The version of the table to read from. If None, the latest version is used.

  • task_shuffle_diversity

    (default = 8) ??

  • batch_size

    (default = 10240) Legacy alias for checkpoint_size. Prefer checkpoint_size.

  • checkpoint_size

    The maximum number of rows per checkpoint. This influences how often progress and proof of life are reported. When adaptive sizing is enabled, an explicit checkpoint_size seeds the initial checkpoint size; otherwise the initial size defaults to min_checkpoint_size.

  • min_checkpoint_size (int | None, default: None ) –

    Minimum adaptive checkpoint size (lower bound).

  • max_checkpoint_size (int | None, default: None ) –

    Maximum adaptive checkpoint size (upper bound). This also caps the largest read batch and thus the maximum memory footprint per batch.

  • task_size

    Controls read-task sizing (rows per worker task). Defaults to table.count_rows() // num_workers // 2 when omitted.

  • num_frags

    (default = None) The number of table fragments to process. If None, process all fragments.

  • _enable_job_tracker_saves (bool, default: True ) –

    Experimental: enables persistence of job metrics to the database. When disabled, metrics are tracked in-memory only.
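
A brief sketch of launching a backfill asynchronously and waiting on the returned JobFuture; the "embedding" column is hypothetical:

fut = table.backfill_async("embedding", concurrency=16)
print(fut.job_id)      # job identifier, useful for get_errors(job_id=...)
fut.result()           # block until the backfill job completes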

backfill

backfill(
    col_name,
    *,
    udf: UDF | None = None,
    where: str | None = None,
    concurrency: int = 8,
    intra_applier_concurrency: int = 1,
    _admission_check: bool | None = None,
    _admission_strict: bool | None = None,
    refresh_status_secs: float = 2.0,
    min_checkpoint_size: int | None = None,
    max_checkpoint_size: int | None = None,
    _enable_job_tracker_saves: bool = True,
    **kwargs,
) -> str

Backfills the specified column.

Returns the job_id string.

Parameters:

  • col_name

    Target column name to backfill

  • udf (UDF | None, default: None ) –

    Optionally override the UDF used to backfill the column.

  • where (str | None, default: None ) –

    SQL expression filter to select rows to backfill. Defaults to filtering for rows where the target column IS NULL, skipping already-computed rows. Use where="1=1" to force reprocessing of all rows.

  • concurrency (int, default: 8 ) –

    This controls the number of processes that run tasks concurrently. For maximum throughput, this should ideally be larger than the number of nodes in the k8s cluster. It is the number of Ray actor processes that are started.

  • intra_applier_concurrency (int, default: 1 ) –

    This controls the number of threads used to execute tasks within each process. Multiplying this by concurrency roughly corresponds to the number of CPUs in use.

  • _admission_check (bool | None, default: None ) –

    Whether to run admission control to validate cluster resources before starting the job. If None, uses GENEVA_ADMISSION__CHECK env var (default: true). Set to False to skip the check. Experimental: Parameters starting with _ are subject to change.

  • _admission_strict (bool | None, default: None ) –

    If True, raises ResourcesUnavailableError when resources are insufficient. If False, logs a warning but allows the job to proceed. If None, uses GENEVA_ADMISSION__STRICT env var (default: true). Experimental: Parameters starting with _ are subject to change.

  • commit_granularity

    (default = 64) Commit a partial result every time this many fragments complete. If None, the entire result is committed at once.

  • read_version

    (default = None) The version of the table to read from. If None, the latest version is used.

  • task_shuffle_diversity

    (default = 8) ??

  • batch_size

    (default = 100) Legacy alias for checkpoint_size; prefer checkpoint_size. If 0, each batch contains all rows of a fragment.

  • checkpoint_size

    The maximum number of rows per checkpoint. This influences how often progress and proof of life are reported. When adaptive sizing is enabled, an explicit checkpoint_size seeds the initial checkpoint size; otherwise the initial size defaults to min_checkpoint_size.

  • min_checkpoint_size (int | None, default: None ) –

    Minimum adaptive checkpoint size (lower bound).

  • max_checkpoint_size (int | None, default: None ) –

    Maximum adaptive checkpoint size (upper bound). This also caps the largest read batch and thus the maximum memory footprint per batch.

  • task_size

    Controls read-task sizing (rows per worker task). Defaults to table.count_rows() // num_workers // 2 when omitted.

  • num_frags

    (default = None) The number of table fragments to process. If None, process all fragments.

  • _enable_job_tracker_saves (bool, default: True ) –

    Experimental: enables persistence of job metrics to the database. When disabled, metrics are tracked in-memory only.
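
A short sketch, assuming the hypothetical column "my_col" already has a UDF attached; the where filter forces reprocessing of all rows, as described above:

# Recompute every row rather than only rows where the column is NULL
job_id = table.backfill("my_col", where="1=1", concurrency=8)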

alter_columns

alter_columns(
    *alterations: dict[str, Any], **kwargs
) -> None

Alter columns in the table. This can change a computed column's UDF.

Parameters:

  • alterations (dict[str, Any], default: () ) –

    This is a list of alterations to apply to the table.

Examples:

table.alter_columns(
    {"path": "col1", "udf": col1_udf_v2},
    {"path": "col2", "udf": col2_udf},
)

create_index

create_index(
    metric: str = "L2",
    num_partitions: int | None = None,
    num_sub_vectors: int | None = None,
    vector_column_name: str = VECTOR_COLUMN_NAME,
    replace: bool = True,
    accelerator=None,
    index_cache_size=None,
    *,
    index_type: Literal[
        "IVF_FLAT", "IVF_PQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
    ] = "IVF_PQ",
    num_bits: int = 8,
    max_iterations: int = 50,
    sample_rate: int = 256,
    m: int = 20,
    ef_construction: int = 300,
) -> None

Create a vector index.
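
A hedged example using only the parameters listed below in the signature; the "vector" column and the partition/sub-vector counts are illustrative:

# Build (or replace) an IVF_PQ index on a hypothetical "vector" column
table.create_index(
    metric="cosine",
    vector_column_name="vector",
    index_type="IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
)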

create_fts_index

create_fts_index(
    field_names: str | list[str],
    *,
    ordering_field_names: str | list[str] | None = None,
    replace: bool = False,
    writer_heap_size: int | None = None,
    tokenizer_name: str | None = None,
    with_position: bool = True,
    base_tokenizer: Literal[
        "simple", "raw", "whitespace"
    ] = "simple",
    language: str = "English",
    max_token_length: int | None = 40,
    lower_case: bool = True,
    stem: bool = False,
    remove_stop_words: bool = False,
    ascii_folding: bool = False,
    **_kwargs,
) -> None
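
A sketch of building a full-text index on a hypothetical "text" column, using only parameters from the signature above:

table.create_fts_index(
    "text",
    base_tokenizer="simple",
    stem=True,
    remove_stop_words=True,
    replace=True,
)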

create_scalar_index

create_scalar_index(
    column: str,
    *,
    replace: bool = True,
    index_type: Literal[
        "BTREE", "BITMAP", "LABEL_LIST"
    ] = "BTREE",
) -> None
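
A one-line sketch; the "id" column is hypothetical:

# BTREE index on a scalar column to speed up equality and range filters
table.create_scalar_index("id", index_type="BTREE")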

list_versions

list_versions() -> list[dict[str, Any]]

cleanup_old_versions

cleanup_old_versions(
    older_than: timedelta | None = None,
    *,
    delete_unverified=False,
) -> Any

to_batches

to_batches(
    batch_size: int | None = None,
) -> Iterator[RecordBatch]

search

search(
    query: list
    | Array
    | ChunkedArray
    | ndarray
    | None = None,
    vector_column_name: str | None = None,
    query_type: Literal[
        "vector", "fts", "hybrid", "auto"
    ] = "auto",
    ordering_field_name: str | None = None,
    fts_columns: str | list[str] | None = None,
) -> GenevaQueryBuilder | LanceQueryBuilder
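
A vector-search sketch; the query vector and "vector" column are hypothetical, and the chained limit()/to_pandas() calls assume the usual LanceDB query-builder interface:

# Nearest-neighbor search, top 10 rows returned as a pandas DataFrame
results = (
    table.search([0.1, 0.2, 0.3], vector_column_name="vector")
    .limit(10)
    .to_pandas()
)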

drop_columns

drop_columns(columns: Iterable[str]) -> None

to_arrow

to_arrow() -> Table

count_rows

count_rows(filter: str | None = None) -> int

update

update(
    where: str | None = None,
    values: dict | None = None,
    *,
    values_sql: dict[str, str] | None = None,
) -> None
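
A sketch of both value styles on hypothetical columns; values takes literals, while values_sql takes SQL expressions:

# Set a literal value on matching rows
table.update(where="id = 1", values={"flag": True})

# Or compute the new value with a SQL expression via values_sql
table.update(where="flag = true", values_sql={"counter": "counter + 1"})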

delete

delete(where: str) -> None

list_indices

list_indices() -> Iterable[IndexConfig]

index_stats

index_stats(index_name: str) -> IndexStatistics | None

optimize

optimize(
    *,
    cleanup_older_than: timedelta | None = None,
    delete_unverified: bool = False,
) -> None

compact_files

compact_files() -> None

restore

restore(*args, **kwargs) -> None

take_blobs

take_blobs(indices: list[int] | Array, column: str)

to_lance

to_lance() -> LanceDataset

uses_v2_manifest_paths

uses_v2_manifest_paths() -> bool

migrate_v2_manifest_paths

migrate_v2_manifest_paths() -> None

stats

stats() -> TableStatistics

take_offsets

take_offsets(offsets: list[int]) -> LanceTakeQueryBuilder

take_row_ids

take_row_ids(row_ids: list[int]) -> LanceTakeQueryBuilder

get_errors

get_errors(
    job_id: str | None = None,
    column_name: str | None = None,
    error_type: str | None = None,
) -> list[Any]

Get error records for this table.

Parameters:

  • job_id (str, default: None ) –

    Filter errors by job ID

  • column_name (str, default: None ) –

    Filter errors by column name

  • error_type (str, default: None ) –

    Filter errors by exception type

Returns:

  • list[ErrorRecord]

    List of error records matching the filters

Examples:

>>> # Get all errors for this table
>>> errors = table.get_errors()
>>>
>>> # Get errors for a specific job
>>> errors = table.get_errors(job_id="abc123")
>>>
>>> # Get errors for a specific column
>>> errors = table.get_errors(column_name="my_column")

get_failed_row_addresses

get_failed_row_addresses(
    job_id: str, column_name: str
) -> list[int]

Get row addresses for all failed rows in a job.

Parameters:

  • job_id (str) –

    Job ID to query

  • column_name (str) –

    Column name to filter by

Returns:

  • list[int]

    List of row addresses that failed

Examples:

# Get failed row addresses
failed_rows = table.get_failed_row_addresses(
    job_id="abc123", column_name="my_col"
)

# Retry processing only failed rows
row_ids = ','.join(map(str, failed_rows))
table.backfill("my_col", where=f"_rowaddr IN ({row_ids})")

geneva.table.TableReference

Serializable reference to a Geneva Table.

Used to pass a table through ray.remote calls.

table_id

table_id: list[str]

version

version: int | None

db_uri

db_uri: str | None

namespace_impl

namespace_impl: str | None = field(default=None)

namespace_properties

namespace_properties: dict[str, str] | None = field(
    default=None
)

system_namespace

system_namespace: list[str] | None = field(default=None)

table_name

table_name: str

Return the table name (last element of table_id).

open_checkpoint_store

open_checkpoint_store() -> CheckpointStore

Open a Lance checkpoint store for this table.

open_db

open_db() -> Connection

Open a connection to the Lance database. The read consistency interval is set to 0 for strongly consistent reads.

open_db_async

open_db_async() -> (
    AsyncConnection | AsyncLanceNamespaceDBConnection
)

Open an async connection to the Lance database. This uses the native lancedb AsyncConnection and does not support the checkpoint store. Currently used only by JobTracker.

open

open() -> Table

connect_namespace

connect_namespace() -> Optional[LanceNamespace]

Connect using the Lance namespace, if configured.
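
A sketch of how a reference might be shipped to a Ray task and re-opened there; only get_reference() and open() come from this API, the ray.remote wiring is an assumption:

import ray

@ray.remote
def row_count(ref) -> int:
    # Re-open the table inside the worker process
    return ref.open().count_rows()

ref = table.get_reference()
n = ray.get(row_count.remote(ref))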

geneva.table.JobFuture

job_id

job_id: str

done

done(timeout: float | None = None) -> bool

result

result(timeout: float | None = None) -> Any

status

status(timeout: float | None = None) -> None