Table
geneva.table.Table
Bases: Table
Table in Geneva.
A Table is a Lance dataset.
The NativeTable and RemoteTable subclasses distinguish
object-storage-backed tables from Phalanx-backed tables. For
historical reasons, Table itself remains instantiable so
existing callers and helpers continue to work. New code should
expect NativeTable from native-mode connections and
RemoteTable from db:// connections.
add
add_columns
add_columns(
transforms: dict[str, str | UDF | tuple[UDF, list[str]]]
| UDF
| UnpackedUDF,
*args,
**kwargs,
) -> None
Add columns or UDF-based columns to the Geneva table.
For UDF columns, this method validates that:
- All input columns exist in the table schema
- Column types are compatible with UDF type annotations (if present)
- RecordBatch UDFs do not have input_columns defined
This early validation helps catch configuration errors before job execution.
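The validation rules above can be modeled in a few lines. This is a minimal, dependency-free sketch, not Geneva's implementation. `validate_udf_inputs`, `COMPATIBLE_TYPES`, and the string type names stand in for the real Arrow schema check and are hypothetical. The sketch covers the listed rules plus the documented UserWarning when an annotation is missing.

```python
import inspect
import warnings
from typing import Callable, Dict, List, Optional, get_type_hints

# Hypothetical map from Python annotations to schema type names. A real
# check would compare Arrow types; strings keep this sketch dependency-free.
COMPATIBLE_TYPES = {
    int: {"int32", "int64"},
    float: {"float32", "float64"},
    str: {"string"},
}


def validate_udf_inputs(
    func: Callable,
    schema: Dict[str, str],
    input_columns: Optional[List[str]] = None,
    is_record_batch_udf: bool = False,
) -> List[str]:
    """Return the UDF's input columns, or raise ValueError (illustrative only)."""
    if is_record_batch_udf:
        # RecordBatch UDFs receive whole batches, so explicit inputs are rejected.
        if input_columns:
            raise ValueError("RecordBatch UDFs must not define input_columns")
        return []
    params = list(inspect.signature(func).parameters)
    columns = input_columns if input_columns is not None else params
    missing = [c for c in columns if c not in schema]
    if missing:
        raise ValueError(f"input columns not in table schema: {missing}")
    hints = get_type_hints(func)
    for param, col in zip(params, columns):
        expected = hints.get(param)
        if expected is None:
            # Mirrors the documented UserWarning for missing annotations.
            warnings.warn(f"{param!r} has no type annotation; type check skipped", UserWarning)
            continue
        allowed = COMPATIBLE_TYPES.get(expected)
        if allowed is not None and schema[col] not in allowed:
            raise ValueError(f"column {col!r} has type {schema[col]}, UDF expects {expected.__name__}")
    return columns
```

Failing before any job is dispatched is the point: a typo in a column name surfaces as a `ValueError` at `add_columns` time rather than inside a Ray worker.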
Parameters:
-
transforms(dict[str, str | UDF | tuple[UDF, list[str]]] | UDF | UnpackedUDF) –How to add the new column(s). Several forms are accepted:
- SQL expression: {"name": "<datafusion sql>"}. A string value is interpreted as a DataFusion expression (e.g. "cast(null as string)").
- Single-column UDF: {"name": udf}. Input columns are inferred from the UDF's parameter names, which must match existing table column names.
- Single-column UDF with input column override: {"name": (udf, ["src_a", "src_b"])}. Use the tuple form to feed the UDF's positional arguments from columns whose names differ from the parameter names. The list length must match the UDF's parameter count and is mapped positionally. See the Overriding input column names example below.
- Multi-column UDF: udf (not wrapped in a dict). A UDF annotated as returning Columns[T] produces one sibling column per top-level struct field. Pass the UDF directly, not inside a dict; the dict form is reserved for single-column UDFs. See the Multi-column output example below.
- Multi-column UDF with prefixed output names: UnpackedUDF(udf, prefix="img_"). Wraps a Columns[T] UDF to add a prefix to each output column name.
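The single-column forms above can be pictured as a small dispatch over value types. This is a hypothetical sketch: `resolve_transforms` and its `(kind, payload, input_columns)` plan tuples are illustrative names, and plain Python functions stand in for Geneva UDF objects. It shows a string becoming a SQL expression, a bare callable taking its inputs from parameter names, and a tuple binding columns positionally after a length check.

```python
import inspect
from typing import Any, Dict, List, Tuple


def resolve_transforms(transforms: Dict[str, Any]) -> Dict[str, Tuple[str, Any, List[str]]]:
    """Map each new column to (kind, payload, input_columns). Illustrative only."""
    plan = {}
    for name, value in transforms.items():
        if isinstance(value, str):
            # A string value is treated as a SQL expression with no UDF inputs.
            plan[name] = ("sql", value, [])
        elif isinstance(value, tuple):
            func, columns = value
            params = list(inspect.signature(func).parameters)
            if len(columns) != len(params):
                raise ValueError(
                    f"{name}: UDF takes {len(params)} inputs, got {len(columns)} columns"
                )
            # Override form: columns bind to parameters by position, not by name.
            plan[name] = ("udf", func, list(columns))
        elif callable(value):
            # Plain form: input columns are the UDF's parameter names.
            plan[name] = ("udf", value, list(inspect.signature(value).parameters))
        else:
            raise TypeError(f"{name}: unsupported transform {value!r}")
    return plan
```

With `add(a, b)` from the examples, `{"total": (add, ["price", "tax"])}` resolves to inputs `["price", "tax"]`, while `{"total": add}` would read columns `a` and `b`.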
Raises:
-
ValueError–If UDF validation fails (missing columns, type mismatches, multi-output UDF passed inside a dict, output column already exists, etc.)
Warns:
-
UserWarning–If type validation is skipped due to missing type annotations
Examples:
Basic single-column UDF (parameter names map directly to columns):
>>> import pyarrow as pa
>>> from geneva import udf
>>> @udf(data_type=pa.int32())
... def double(a: int) -> int:
...     return a * 2
>>> table.add_columns({"doubled": double})  # reads from column 'a'
Overriding input column names — useful when the UDF's parameter names don't match the columns you want to feed it (e.g. a reusable UDF, or a column whose name isn't a valid identifier):
>>> @udf(data_type=pa.int32())
... def add(a: int, b: int) -> int:
...     return a + b
>>> # 'a' is bound to column 'price', 'b' to column 'tax':
>>> table.add_columns({"total": (add, ["price", "tax"])})
Multi-column output via Columns — one UDF emits
several sibling columns in a single backfill:
>>> from typing import NamedTuple
>>> import geneva
>>> class Dimensions(NamedTuple):
...     height: int
...     width: int
>>> @udf
... def dimensions(image_id: int) -> geneva.Columns[Dimensions]:
...     return Dimensions(image_id + 10, image_id + 20)
>>> table.add_columns(dimensions) # adds 'height' AND 'width'
Multi-column output with prefixed names (e.g. to avoid collisions when the same UDF is applied to multiple inputs):
>>> table.add_columns(UnpackedUDF(dimensions, prefix="img_"))  # adds 'img_height' AND 'img_width'
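The naming rule for Columns[T] output (one sibling column per top-level field, with an optional prefix) can be modeled with a plain NamedTuple. This stdlib sketch only illustrates the naming. `output_columns` and `unpack_row` are hypothetical helpers, not Geneva APIs.

```python
from typing import Any, Dict, List, NamedTuple


class Dimensions(NamedTuple):
    height: int
    width: int


def output_columns(row_type: type, prefix: str = "") -> List[str]:
    """One output column per top-level field, optionally prefixed."""
    return [f"{prefix}{name}" for name in row_type._fields]


def unpack_row(row: Any, prefix: str = "") -> Dict[str, Any]:
    """Split one returned row into {output_column: value}."""
    return {f"{prefix}{name}": value for name, value in row._asdict().items()}
```

Applying the same UDF twice with different prefixes (say `img_` and `thumb_`) yields disjoint column names, which is the collision case the prefix exists for.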
refresh
refresh(
*,
where: str | None = None,
src_version: int | None = None,
max_rows_per_fragment: int | None = None,
concurrency: int = 8,
intra_applier_concurrency: int = 1,
_admission_check: bool | None = None,
_admission_strict: bool | None = None,
**kwargs,
) -> RefreshJobResult
Refresh the specified materialized view.
Parameters:
-
where(str | None, default:None) –TODO: SQL expression filter used to backfill only selected rows.
-
src_version(int | None, default:None) –Optional source table version to refresh from. If None (default), uses the latest version of the source table.
-
max_rows_per_fragment(int | None, default:None) –Optional maximum number of rows per destination fragment when adding placeholder rows for new source data. If None, uses LanceDB's default (1 million rows). Use smaller values to control fragment granularity.
-
concurrency(int, default:8) –(default = 8) The number of processes that run tasks concurrently; this is the number of Ray actor processes that are started. For maximum throughput, this should ideally be larger than the number of nodes in the k8s cluster.
-
intra_applier_concurrency(int, default:1) –(default = 1) The number of threads used to execute tasks within a process. This value multiplied by concurrency roughly corresponds to the number of CPUs being used.
-
_admission_check(bool | None, default:None) –Whether to run admission control to validate cluster resources before starting the job. If None, uses config (default: true). Set to False to skip the check. Experimental: parameters starting with _ are subject to change.
-
_admission_strict(bool | None, default:None) –If True, raises ResourcesUnavailableError when resources are insufficient. If False, logs a warning but allows the job to proceed. If None, uses config (default: true). Experimental: parameters starting with _ are subject to change.
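The sizing guidance for concurrency and intra_applier_concurrency reduces to simple arithmetic. In this back-of-envelope sketch, `parallelism_summary` is a hypothetical helper that restates the two documented rules of thumb and nothing more. It does not query Ray or the cluster.

```python
from typing import Any, Dict


def parallelism_summary(
    concurrency: int, intra_applier_concurrency: int, num_nodes: int
) -> Dict[str, Any]:
    """Restate the documented rules of thumb for a given setting (illustrative only)."""
    if concurrency < 1 or intra_applier_concurrency < 1:
        raise ValueError("concurrency values must be >= 1")
    return {
        # concurrency == number of Ray actor processes started
        "actor_processes": concurrency,
        # processes x threads per process ~ CPUs in use
        "approx_cpus": concurrency * intra_applier_concurrency,
        # the docs suggest more actors than k8s nodes for max throughput
        "more_actors_than_nodes": concurrency > num_nodes,
    }
```

With the defaults (8 actors, 1 thread each) on a 4-node cluster, this estimates about 8 CPUs in use and satisfies the "more actors than nodes" suggestion.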
Raises:
-
RuntimeError–If attempting to refresh to a different version without stable row IDs enabled on the source table. This is because compaction may have invalidated the __source_row_id values, breaking incremental refresh.
refresh_async
refresh_async(
*,
where: str | None = None,
src_version: int | None = None,
max_rows_per_fragment: int | None = None,
concurrency: int = 8,
intra_applier_concurrency: int = 1,
_admission_check: bool | None = None,
_admission_strict: bool | None = None,
**kwargs,
) -> Job
Refresh the materialized view asynchronously.
refresh runs synchronously under the hood; refresh_async
wraps it in a background thread and returns a
Job whose .result() blocks and
yields a RefreshJobResult.
Threading semantics: the worker is a non-daemon thread, so
Python will wait for an in-flight refresh before exiting the
process — preventing a partially-applied refresh on abrupt
shutdown. Callers who do not want to block on completion should
either call .result() with a timeout,
or use the synchronous refresh directly.
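The threading pattern described above (a blocking call moved onto a non-daemon thread, with a handle whose result() blocks) can be sketched with the standard library. `run_in_background` is a hypothetical helper, and `concurrent.futures.Future` stands in for Geneva's Job. This shows the pattern only; it is not the refresh_async implementation.

```python
import threading
from concurrent.futures import Future
from typing import Any, Callable


def run_in_background(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future:
    """Run a blocking call on a non-daemon thread and return a Future."""
    future: Future = Future()

    def worker() -> None:
        if not future.set_running_or_notify_cancel():
            return  # cancelled before it started
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:  # surface failures through .result()
            future.set_exception(exc)

    # daemon=False: the interpreter waits for this thread before exiting,
    # so an in-flight call is not abandoned halfway through at shutdown.
    threading.Thread(target=worker, daemon=False).start()
    return future
```

Calling `future.result(timeout=...)` bounds how long the caller waits. It raises a timeout error without stopping the worker thread, which keeps running to completion.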
plan_refresh
Plan a refresh without dispatching: count new source fragments.
Returns a RefreshPlan describing what work a refresh() call
would perform. Does not require a Ray cluster.
backfill_async
backfill_async(
columns: str | list[str],
*,
udf: UDF | None = None,
where: str | None = None,
concurrency: int = 8,
intra_applier_concurrency: int = 1,
_admission_check: bool | None = None,
_admission_strict: bool | None = None,
min_checkpoint_size: int | None = None,
max_checkpoint_size: int | None = None,
batch_checkpoint_flush_interval_seconds: float
| None = None,
blob_read_strategy: Literal[
"auto", "legacy", "range"
] = "auto",
blob_read_buffer_size: int | None = None,
_enable_job_tracker_saves: bool = True,
job_id: str | None = None,
_return_future: bool = False,
**kwargs,
) -> JobFuture | Job
Backfills the specified column asynchronously.
Returns a Job whose .result() blocks
and yields a BackfillJobResult.
Parameters:
-
columns(str | list[str]) –Target column name to backfill. A single-element list is equivalent to a string. Multi-column backfill is not yet supported and raises NotImplementedError.
-
udf(UDF | None, default:None) –Optionally override the UDF used to backfill the column.
-
where(str | None, default:None) –SQL expression filter to select rows to backfill. Defaults to '<column> IS NULL' (where <column> is the target column) to skip already-computed rows. Use where="1=1" to force reprocessing all rows.
-
concurrency(int, default:8) –(default = 8) The number of processes that run tasks concurrently; this is the number of Ray actor processes that are started. For maximum throughput, this should ideally be larger than the number of nodes in the k8s cluster.
-
intra_applier_concurrency(int, default:1) –(default = 1) The number of threads used to execute tasks within a process. This value multiplied by concurrency roughly corresponds to the number of CPUs being used.
-
_admission_check(bool | None, default:None) –Whether to run admission control to validate cluster resources before starting the job. If None, uses the GENEVA_ADMISSION__CHECK env var (default: true). Set to False to skip the check. Experimental: parameters starting with _ are subject to change.
-
_admission_strict(bool | None, default:None) –If True, raises ResourcesUnavailableError when resources are insufficient. If False, logs a warning but allows the job to proceed. If None, uses the GENEVA_ADMISSION__STRICT env var (default: true). Experimental: parameters starting with _ are subject to change.
-
min_checkpoint_size(int | None, default:None) –Minimum adaptive checkpoint size (lower bound).
-
max_checkpoint_size(int | None, default:None) –Maximum adaptive checkpoint size (upper bound). This also caps the largest read batch and thus the maximum memory footprint per batch.
-
batch_checkpoint_flush_interval_seconds(float | None, default:None) –Controls how frequently in-progress results are persisted as batch checkpoints. Larger values usually improve throughput, but if the job stops unexpectedly, more recently computed work may need to be redone. Smaller values checkpoint progress sooner, which improves durability and resume behavior, but can reduce throughput. Set to 0 to persist every batch as soon as it is produced.
-
_enable_job_tracker_saves(bool, default:True) –Experimentally enable persistence of job metrics to the database. When disabled, metrics are tracked in memory only.
-
blob_read_strategy(Literal['auto', 'legacy', 'range'], default:'auto') –Controls how Lance blob input columns are materialized for UDF map tasks. auto uses contiguous range reads for top-level blob UDF inputs when possible, legacy uses Lance take_blobs(), and range requires the native range path. The native range path handles Azure az:// URIs; other Azure schemes such as abfs:// or adls:// fall back in auto mode or fail in range mode.
-
blob_read_buffer_size(int | None, default:None) –Maximum blob file byte span to materialize in one range-read batch. When omitted, uses GENEVA_RANGE_BLOB_READ_BUFFER_SIZE or 512 MiB.
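The blob_read_strategy fallback rules and the buffer-size default can be written as a small decision function. This is a hypothetical sketch. `choose_blob_read_path` and `resolve_blob_read_buffer_size` are illustrative names. Only the URI-scheme rule is modeled, not the "top-level input when possible" check. The sketch also assumes GENEVA_RANGE_BLOB_READ_BUFFER_SIZE holds a plain integer byte count.

```python
import os
from typing import Optional
from urllib.parse import urlparse

# 512 MiB fallback, as documented for blob_read_buffer_size.
DEFAULT_BLOB_READ_BUFFER_SIZE = 512 * 1024 * 1024
# Azure schemes the docs name as unsupported by the native range path
# (the docs say "such as", so this set is not exhaustive).
RANGE_UNSUPPORTED_SCHEMES = {"abfs", "adls"}


def choose_blob_read_path(strategy: str, uri: str) -> str:
    """Return "range" or "legacy" for one blob input URI (hypothetical helper)."""
    if strategy not in {"auto", "legacy", "range"}:
        raise ValueError(f"unknown blob_read_strategy: {strategy!r}")
    if strategy == "legacy":
        return "legacy"  # Lance take_blobs()
    range_ok = urlparse(uri).scheme not in RANGE_UNSUPPORTED_SCHEMES
    if strategy == "range":
        if not range_ok:
            raise ValueError(f"native range path cannot read {uri!r}")
        return "range"
    # "auto": prefer range reads, fall back to take_blobs() when unsupported.
    return "range" if range_ok else "legacy"


def resolve_blob_read_buffer_size(explicit: Optional[int] = None) -> int:
    """Explicit value, else the env var (assumed to hold bytes), else 512 MiB."""
    if explicit is not None:
        return explicit
    env_value = os.environ.get("GENEVA_RANGE_BLOB_READ_BUFFER_SIZE")
    return int(env_value) if env_value else DEFAULT_BLOB_READ_BUFFER_SIZE
```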
Other Parameters:
-
commit_granularity(int | None) –(default = 64) Commit a partial result each time this number of fragments completes. If None, the entire result is committed at once.
-
read_version(int | None) –(default = None) The version of the table to read from. If None, the latest version is used.
-
task_shuffle_diversity(int | None) –(default = 8) ??
-
batch_size(int | None(deprecated)) –(default = 10240) Legacy alias for checkpoint_size. Prefer checkpoint_size.
-
checkpoint_size(int | None) –The max number of rows per checkpoint. This influences how often progress and proof of life are reported. When adaptive sizing is enabled, an explicit checkpoint_size seeds the initial checkpoint size; otherwise the initial size defaults to min_checkpoint_size.
-
task_size(int | None) –Controls read-task sizing (rows per worker task). Defaults to table.count_rows() // num_workers // 2 when omitted.
-
num_frags(int | None) –(default = None) The number of table fragments to process. If None, process all fragments.
-
skip_frags(int) –(default = 0) Number of fragments to skip before processing. Combined with num_frags, this allows batching through a large dataset in manageable chunks. For example, skip_frags=100, num_frags=50 processes fragments 100–149.
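The skip_frags/num_frags window and the default task_size formula reduce to a slice and an integer division. In this sketch, `select_fragments` and `default_task_size` are hypothetical helpers. The sketch assumes fragments are taken in list order, and unlike a real implementation it does not clamp, so a tiny table could produce a task size of 0.

```python
from typing import List, Optional, Sequence


def select_fragments(
    fragment_ids: Sequence[int], skip_frags: int = 0, num_frags: Optional[int] = None
) -> List[int]:
    """Fragments covered by a skip_frags/num_frags window, in list order."""
    end = None if num_frags is None else skip_frags + num_frags
    return list(fragment_ids[skip_frags:end])


def default_task_size(count_rows: int, num_workers: int) -> int:
    """The documented default: table.count_rows() // num_workers // 2."""
    return count_rows // num_workers // 2
```

To walk a large table in windows of 50 fragments, a caller could step skip_frags through `range(0, total_frags, 50)` with `num_frags=50` on successive backfill calls (`total_frags` is a placeholder here).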
backfill
backfill(
columns: str | list[str],
*,
udf: UDF | None = None,
where: str | None = None,
concurrency: int = 8,
intra_applier_concurrency: int = 1,
_admission_check: bool | None = None,
_admission_strict: bool | None = None,
refresh_status_secs: float = 2.0,
timeout: timedelta | None = None,
min_checkpoint_size: int | None = None,
max_checkpoint_size: int | None = None,
batch_checkpoint_flush_interval_seconds: float
| None = None,
blob_read_strategy: Literal[
"auto", "legacy", "range"
] = "auto",
blob_read_buffer_size: int | None = None,
_enable_job_tracker_saves: bool = True,
job_id: str | None = None,
**kwargs,
) -> BackfillJobResult
Backfills the specified column synchronously and returns a
BackfillJobResult once the job
reaches a terminal state. Use
backfill_async for a
non-blocking handle.
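The blocking behavior of backfill (poll every refresh_status_secs, and on timeout stop waiting without cancelling) follows a standard polling pattern. The stdlib sketch below shows that pattern. `wait_for_terminal`, `get_status`, and the terminal state names are hypothetical, since the real job status values are not documented here.

```python
import time
from datetime import timedelta
from typing import Callable, Optional

# Hypothetical terminal state names; the real job status values may differ.
TERMINAL_STATES = {"DONE", "FAILED", "CANCELLED"}


def wait_for_terminal(
    get_status: Callable[[], str],
    refresh_status_secs: float = 2.0,
    timeout: Optional[timedelta] = None,
) -> str:
    """Poll get_status until a terminal state; raise TimeoutError at the deadline."""
    deadline = None if timeout is None else time.monotonic() + timeout.total_seconds()
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                # Only the wait ends; nothing here cancels the underlying job.
                raise TimeoutError(f"job still {status!r} after {timeout}")
            time.sleep(min(refresh_status_secs, remaining))
        else:
            time.sleep(refresh_status_secs)
```

Because the timeout only abandons the wait, a caller that catches TimeoutError should assume the job may still commit results later.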
Parameters:
-
columns(str | list[str]) –Target column name to backfill. A single-element list is equivalent to a string. Multi-column backfill is not yet supported and raises NotImplementedError.
-
udf(UDF | None, default:None) –Optionally override the UDF used to backfill the column.
-
where(str | None, default:None) –SQL expression filter to select rows to backfill. Defaults to '<column> IS NULL' (where <column> is the target column) to skip already-computed rows. Use where="1=1" to force reprocessing all rows.
-
concurrency(int, default:8) –(default = 8) The number of processes that run tasks concurrently; this is the number of Ray actor processes that are started. For maximum throughput, this should ideally be larger than the number of nodes in the k8s cluster.
-
intra_applier_concurrency(int, default:1) –(default = 1) The number of threads used to execute tasks within a process. This value multiplied by concurrency roughly corresponds to the number of CPUs being used.
-
_admission_check(bool | None, default:None) –Whether to run admission control to validate cluster resources before starting the job. If None, uses the GENEVA_ADMISSION__CHECK env var (default: true). Set to False to skip the check. Experimental: parameters starting with _ are subject to change.
-
_admission_strict(bool | None, default:None) –If True, raises ResourcesUnavailableError when resources are insufficient. If False, logs a warning but allows the job to proceed. If None, uses the GENEVA_ADMISSION__STRICT env var (default: true). Experimental: parameters starting with _ are subject to change.
-
timeout(timedelta | None, default:None) –(default = None) Maximum time to wait for the job to reach a terminal state. When the deadline elapses, the call raises TimeoutError; note that the job itself is not cancelled and may continue running. None waits indefinitely.
-
min_checkpoint_size(int | None, default:None) –Minimum adaptive checkpoint size (lower bound).
-
max_checkpoint_size(int | None, default:None) –Maximum adaptive checkpoint size (upper bound). This also caps the largest read batch and thus the maximum memory footprint per batch.
-
batch_checkpoint_flush_interval_seconds(float | None, default:None) –Controls how frequently in-progress results are persisted as batch checkpoints. Larger values usually improve throughput, but if the job stops unexpectedly, more recently computed work may need to be redone. Smaller values checkpoint progress sooner, which improves durability and resume behavior, but can reduce throughput. Set to 0 to persist every batch as soon as it is produced.
-
_enable_job_tracker_saves(bool, default:True) –Experimentally enable persistence of job metrics to the database. When disabled, metrics are tracked in memory only.
-
blob_read_strategy(Literal['auto', 'legacy', 'range'], default:'auto') –Controls how Lance blob input columns are materialized for UDF map tasks. auto uses contiguous range reads for top-level blob UDF inputs when possible, legacy uses Lance take_blobs(), and range requires the native range path. The native range path handles Azure az:// URIs; other Azure schemes such as abfs:// or adls:// fall back in auto mode or fail in range mode.
-
blob_read_buffer_size(int | None, default:None) –Maximum blob file byte span to materialize in one range-read batch. When omitted, uses GENEVA_RANGE_BLOB_READ_BUFFER_SIZE or 512 MiB.
Other Parameters:
-
commit_granularity(int | None) –(default = 64) Commit a partial result each time this number of fragments completes. If None, the entire result is committed at once.
-
read_version(int | None) –(default = None) The version of the table to read from. If None, the latest version is used.
-
task_shuffle_diversity(int | None) –(default = 8) ??
-
batch_size(int | None(deprecated)) –(default = 100) Legacy alias for checkpoint_size. Prefer checkpoint_size. If 0, the batch will be the total number of rows from a fragment.
-
checkpoint_size(int | None) –The max number of rows per checkpoint. This influences how often progress and proof of life are reported. When adaptive sizing is enabled, an explicit checkpoint_size seeds the initial checkpoint size; otherwise the initial size defaults to min_checkpoint_size.
-
task_size(int | None) –Controls read-task sizing (rows per worker task). Defaults to table.count_rows() // num_workers // 2 when omitted.
-
num_frags(int | None) –(default = None) The number of table fragments to process. If None, process all fragments.
-
skip_frags(int) –(default = 0) Number of fragments to skip before processing. Combined with num_frags, this allows batching through a large dataset in manageable chunks. For example, skip_frags=100, num_frags=50 processes fragments 100–149.
load_columns_async
load_columns_async(
source: str | list[str],
pk: str,
columns: list[str],
*,
source_format: str | None = None,
on_missing: str = "carry",
concurrency: int = 8,
task_size: int | None = None,
checkpoint_size: int | None = None,
min_checkpoint_size: int | None = None,
max_checkpoint_size: int | None = None,
checkpoint_interval_seconds: float | None = None,
_loader_cpus: float | None = None,
_loader_memory: int | None = None,
commit_granularity: int | None = None,
enable_job_tracker_saves: bool = True,
job_id: str | None = None,
) -> JobFuture
Load pre-computed column data from an external source by primary key.
Joins value columns from an external dataset (Parquet, Lance, or IPC)
into this table using a primary-key lookup. Returns a JobFuture
immediately; call .result() to block until completion.
Examples:
Basic single-source load:
>>> table.load_columns(
... source="s3://bucket/embeddings/",
... pk="document_id",
... columns=["embedding"],
... )
Multi-pass load when the source is too large for a single in-memory index. Split the source files into N chunks and run N sequential calls — each call must finish before the next starts. Carry semantics make later passes preserve earlier passes' values, so the end state is correct after all passes complete:
>>> import pyarrow.dataset as pads
>>> source_files = pads.dataset(
...     "s3://bucket/embeddings/", format="parquet"
... ).files
>>> N = 4
>>> for i in range(N):
...     # balanced contiguous split: no file is dropped when N does not divide evenly
...     start = i * len(source_files) // N
...     end = (i + 1) * len(source_files) // N
...     # blocks until this pass commits before starting the next
...     table.load_columns(
...         source=source_files[start:end],
...         pk="document_id",
...         columns=["embedding"],
...     )
Each pass reads only its assigned files, so the total source scan I/O
across all passes stays at 1× full scan. Per-pass memory cost is
source_size / N.
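Why sequential passes with on_missing="carry" converge to the right end state can be checked with an in-memory model of one primary-key join pass. In this sketch, `load_pass` is a hypothetical helper operating on Python dicts. It models only the documented on_missing semantics, not the distributed loader.

```python
from typing import Any, Dict, Hashable, List


def load_pass(
    dest: List[Dict[str, Any]],
    source: Dict[Hashable, Any],
    pk: str,
    column: str,
    on_missing: str = "carry",
) -> List[Dict[str, Any]]:
    """One primary-key join pass over in-memory rows (models on_missing only)."""
    out = []
    for row in dest:
        new = dict(row)
        key = row[pk]
        if key in source:
            new[column] = source[key]
        elif on_missing == "carry":
            new.setdefault(column, None)  # keep existing value; NULL for a new column
        elif on_missing == "null":
            new[column] = None
        elif on_missing == "error":
            raise KeyError(f"no source row for {pk}={key!r}")
        else:
            raise ValueError(f"unknown on_missing: {on_missing!r}")
        out.append(new)
    return out
```

Running two passes over disjoint halves of the source with "carry" fills every row. With "null", the second pass would erase the first pass's values, which is why the multi-pass pattern depends on carry.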
Warning
Multi-pass loads must run sequentially, not concurrently. Two
load_columns calls running at the same time against the same
column produce an interleaved end state (last-writer-wins per
fragment). Use a plain for loop, not concurrent.futures.
Parameters:
-
source(str | list[str]) –URI of the external dataset (local path or cloud storage), or a list of file paths. Passing a list of paths enables file-level partitioning for the multi-pass load pattern: each call reads only its assigned files. Lance sources must be a single URI.
-
pk(str) –Primary key column name. Must exist in both source and destination.
-
columns(list[str]) –Value column names to load from the source dataset.
-
source_format(str | None, default:None) –One of "parquet", "lance", or "ipc". Auto-detected from the URI suffix when omitted.
-
on_missing(str, default:'carry') –How to handle destination rows with no source match:
- "carry" (default): keep the existing value (NULL for new columns).
- "null": set to NULL.
- "error": raise on the first unmatched row.
-
concurrency(int, default:8) –Number of Ray worker actors (default 8).
-
task_size(int | None, default:None) –Rows per worker task. Auto-sized when omitted.
-
checkpoint_size(int | None, default:None) –Rows per checkpoint batch. When omitted, uses the job config default. When min_checkpoint_size and max_checkpoint_size are also set, this becomes the initial size for adaptive sizing.
-
min_checkpoint_size(int | None, default:None) –Minimum checkpoint batch size for adaptive sizing.
-
max_checkpoint_size(int | None, default:None) –Maximum checkpoint batch size for adaptive sizing.
-
checkpoint_interval_seconds(float | None, default:None) –Target seconds per adaptive checkpoint batch. The adaptive sizer grows or shrinks batch sizes to hit this target. Defaults to 60 s for bulk_load (longer than the 10 s UDF default because bulk_load is I/O-bound and benefits from larger batches that amortize GCS write overhead).
-
_loader_cpus(float | None, default:None) –CPU reservation per loader actor (Ray scheduling). Defaults to None (Ray default of 1.0 CPU per actor).
-
_loader_memory(int | None, default:None) –Memory reservation in bytes per loader actor. Defaults to None (no explicit reservation). Set this when loading wide columns (large strings, embeddings) to prevent Ray from oversubscribing worker nodes.
-
commit_granularity(int | None, default:None) –Number of fragments per intermediate commit.
-
enable_job_tracker_saves(bool, default:True) –Enable persistence of job metrics to the database.
-
job_id(str | None, default:None) –Reuse an existing job ID instead of generating a new one.
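The adaptive checkpoint sizer is described as growing or shrinking batches to hit checkpoint_interval_seconds within the min/max bounds. The update rule itself is not documented. The sketch below assumes a simple proportional rule: scale the batch by target time over observed time, then clamp. `next_checkpoint_size` is a hypothetical helper, and the real sizer may smooth or limit steps differently.

```python
def next_checkpoint_size(
    current: int,
    elapsed_seconds: float,
    min_size: int,
    max_size: int,
    target_seconds: float = 60.0,
) -> int:
    """Assumed proportional resize toward target_seconds, clamped to [min_size, max_size]."""
    if elapsed_seconds <= 0:
        # No usable timing signal: keep the current size (still clamped).
        return max(min_size, min(max_size, current))
    proposed = int(current * target_seconds / elapsed_seconds)
    return max(min_size, min(max_size, proposed))
```

The upper clamp is what max_checkpoint_size contributes. However fast batches complete, the batch (and so its memory footprint) never grows past it.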
load_columns
load_columns(
source: str | list[str],
pk: str,
columns: list[str],
*,
source_format: str | None = None,
on_missing: str = "carry",
concurrency: int = 8,
task_size: int | None = None,
checkpoint_size: int | None = None,
min_checkpoint_size: int | None = None,
max_checkpoint_size: int | None = None,
checkpoint_interval_seconds: float | None = None,
_loader_cpus: float | None = None,
_loader_memory: int | None = None,
commit_granularity: int | None = None,
refresh_status_secs: float = 2.0,
enable_job_tracker_saves: bool = True,
job_id: str | None = None,
) -> str
Load pre-computed column data from an external source by primary key.
Synchronous wrapper around
load_columns_async.
Returns the job ID string on success.
See load_columns_async for
parameter documentation.
plan_backfill
plan_backfill(
col_name: str,
*,
udf: UDF | None = None,
where: str | None = None,
read_version: int | None = None,
num_frags: int | None = None,
skip_frags: int = 0,
task_size: int | None = None,
) -> BackfillPlan
Plan a backfill without dispatching: count tasks and rows.
Returns a BackfillPlan describing what work a backfill() call
would perform. Does not require a Ray cluster.
Warning
Evaluates where per fragment via count_rows(filter=...)
on the driver — serial, can take many minutes for selective
predicates over wide columns. backfill() itself avoids
this cost.
alter_columns
Alter columns in the table. This can change the UDF of a computed column.
Parameters:
-
alterations(dict[str, Any], default:()) –One or more alteration dicts to apply to the table. Each dict names the target column under "path" and the settings to change (for example, "udf").
Examples:
table.alter_columns(
    {"path": "col1", "udf": col1_udf_v2},
    {"path": "col2", "udf": col2_udf},
)
create_index
create_index(
metric: str = "L2",
num_partitions: int | None = None,
num_sub_vectors: int | None = None,
vector_column_name: str = VECTOR_COLUMN_NAME,
replace: bool = True,
accelerator=None,
index_cache_size=None,
*,
index_type: Literal[
"IVF_FLAT", "IVF_PQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
] = "IVF_PQ",
num_bits: int = 8,
max_iterations: int = 50,
sample_rate: int = 256,
m: int = 20,
ef_construction: int = 300,
) -> None
Create Vector Index
create_fts_index
create_fts_index(
field_names: str | list[str],
*,
ordering_field_names: str | list[str] | None = None,
replace: bool = False,
writer_heap_size: int | None = 1024 * 1024 * 1024,
tokenizer_name: str | None = None,
with_position: bool = True,
base_tokenizer: Literal[
"simple", "raw", "whitespace"
] = "simple",
language: str = "English",
max_token_length: int | None = 40,
lower_case: bool = True,
stem: bool = False,
remove_stop_words: bool = False,
ascii_folding: bool = False,
**_kwargs,
) -> None
create_scalar_index
create_scalar_index(
column: str,
*,
replace: bool = True,
index_type: Literal[
"BTREE", "BITMAP", "LABEL_LIST"
] = "BTREE",
) -> None
cleanup_old_versions
search
search(
query: list
| Array
| ChunkedArray
| ndarray
| None = None,
vector_column_name: str | None = None,
query_type: Literal[
"vector", "fts", "hybrid", "auto"
] = "auto",
ordering_field_name: str | None = None,
fts_columns: str | list[str] | None = None,
) -> GenevaQueryBuilder | LanceQueryBuilder
update
update(
where: str | None = None,
values: dict | None = None,
*,
values_sql: dict[str, str] | None = None,
) -> None
optimize
optimize(
*,
cleanup_older_than: timedelta | None = None,
delete_unverified: bool = False,
) -> None
get_errors
get_errors(
job_id: str | None = None,
column_name: str | None = None,
error_type: str | None = None,
) -> list[Any]
Get error records for this table.
Parameters:
-
job_id(str | None, default:None) –Filter errors by job ID.
-
column_name(str | None, default:None) –Filter errors by column name.
-
error_type(str | None, default:None) –Filter errors by exception type.
Returns:
-
list[ErrorRecord]–List of error records matching the filters
Examples:
errors = table.get_errors(job_id="abc123", column_name="my_col")
get_failed_row_addresses
Get row addresses for all failed rows in a job.
Parameters:
-
job_id(str) –Job ID to query
-
column_name(str) –Column name to filter by
Returns:
-
list[int]–List of row addresses that failed
Examples:
# Get failed row addresses
failed_rows = table.get_failed_row_addresses(
    job_id="abc123", column_name="my_col"
)
# Retry processing only the failed rows. Skip when there is nothing to retry,
# since an empty "IN ()" list is not valid SQL.
if failed_rows:
    row_addrs = ",".join(map(str, failed_rows))
    table.backfill("my_col", where=f"_rowaddr IN ({row_addrs})")
geneva.table.TableReference
Serializable reference to a Geneva Table.
Used to pass a table through ray.remote calls.
namespace_client_properties
namespace_client_pushdown_operations
storage_options
as_system_table
as_system_table(table_name: str) -> TableReference
Create a sibling reference that targets a system table.
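The point of a reference type is that a worker receives plain data and reopens the table itself, instead of receiving a live connection. In this sketch, `TableRefSketch` and its fields `db_uri`, `table_name`, and `storage_options` are hypothetical stand-ins, not the real TableReference layout. The sibling method mirrors what as_system_table is described as doing: same database, different target table.

```python
from dataclasses import asdict, dataclass, field, replace
from typing import Dict


@dataclass(frozen=True)
class TableRefSketch:
    """Hypothetical stand-in for a serializable table reference.

    Holds only plain data (strings and a dict), so it can be pickled and
    shipped to workers, which then open their own database connection.
    """

    db_uri: str
    table_name: str
    storage_options: Dict[str, str] = field(default_factory=dict)

    def as_system_table(self, table_name: str) -> "TableRefSketch":
        # Sibling reference: same database and storage options, other table.
        return replace(self, table_name=table_name)
```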
open_checkpoint_store
Open a Lance checkpoint store for this table.
open_db
open_db() -> Connection
Open a connection to the Lance database. Set read consistency interval to 0 for strongly consistent reads.
When called from a worker context with worker_uri configured, the worker endpoint is used instead of the external endpoint.
open_db_async
Open an async connection to the Lance database. This uses native lancedb AsyncConnection and doesn't support checkpoint store. Currently used by JobTracker only.
open_system_db_async
Open an async connection suitable for system-table operations.