Table

geneva.table.Table

Bases: Table

Table in Geneva.

A Table is a Lance dataset.
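
A minimal usage sketch (assuming the package exposes a lancedb-style
geneva.connect returning the Connection this class is built from; the
database path and table name are hypothetical):

import geneva

conn = geneva.connect("./mydb")   # hypothetical local database URI
tbl = conn.open_table("videos")   # resolves to <db uri>/videos.lance
print(tbl.schema)                 # the table's Arrow schema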

Source code in geneva/table.py
class Table(LanceTable):
    """Table in Geneva.

    A Table is a Lance dataset.
    """

    def __init__(
        self,
        conn: Connection,
        name: str,
        *,
        version: int | None = None,
        storage_options: dict[str, str] | None = None,
        index_cache_size: int | None = None,
        **kwargs,
    ) -> None:
        self._conn_uri = conn.uri
        self._name = name

        self._conn = conn

        base_uri = URL(conn.uri)
        self._uri = str(base_uri / f"{name}.lance")
        self._version: int | None = version
        self._index_cache_size = index_cache_size
        self._storage_options = storage_options

        # Load table
        self._ltbl  # noqa

    def __repr__(self) -> str:
        return f"<Table {self._name}>"

    # TODO: This annotation sucks
    def __reduce__(self):  # noqa: ANN204
        return (self.__class__, (self._conn, self._name))

    def get_reference(self) -> TableReference:
        return TableReference(
            db_uri=self._conn.uri, table_name=self._name, version=self._version
        )

    def get_fragments(self) -> list[lance.LanceFragment]:
        return self.to_lance().get_fragments()

    @cached_property
    def _ltbl(self) -> lancedb.table.Table:
        # remote db, open table directly
        if self._conn_uri.startswith("db://"):
            tbl = self._conn._connect.open_table(self._name)
            if self._version:
                tbl.checkout(self._version)
            return tbl

        return self._conn._connect.open_table(self.name)

    @property
    def name(self) -> str:
        """Get the name of the table."""
        return self._name

    @property
    def version(self) -> int:
        """Get the current version of the table"""
        return self._ltbl.version

    @property
    def schema(self) -> pa.Schema:
        """The Arrow Schema of the Table."""
        return self._ltbl.schema

    @property
    def uri(self) -> str:
        return self._uri

    @property
    def embedding_functions(self) -> Never:
        raise NotImplementedError("Embedding functions are not supported.")

    def add(
        self,
        data,
        mode: str = "append",
        on_bad_vectors: str = "error",
        fill_value: float = 0.0,
    ) -> None:
        self._ltbl.add(
            data,
            mode=mode,
            on_bad_vectors=on_bad_vectors,
            fill_value=fill_value,
        )

    def checkout(self, version: int) -> None:
        self._version = version
        self._ltbl.checkout(version)

    def checkout_latest(self) -> None:
        self._ltbl.checkout_latest()

    def add_columns(
        self, transforms: dict[str, str | UDF | tuple[UDF, list[str]]], *args, **kwargs
    ) -> None:
        """
        Add columns or UDF-based columns to the Geneva table.

        Parameters
        ----------
        transforms : dict[str, str | UDF | tuple[UDF, list[str]]]
            The key is the column name to add and the value is a
            specification of the column type/value.
            * If the spec is a string, it is expected to be a DataFusion
              SQL expression (e.g. "cast(null as string)").
            * If the spec is a UDF, a virtual column is added with input
              columns inferred from the UDF's argument names.
            * If the spec is a tuple, the first element is a UDF and the
              second element is a list of input column names.

        Raises
        ------
        ValueError

        """
        # handle basic columns
        basic_cols = {k: v for k, v in transforms.items() if isinstance(v, str)}
        if len(basic_cols) > 0:
            self._ltbl.add_columns(basic_cols, *args)

        # handle UDF virtual columns
        udf_cols = {k: v for k, v in transforms.items() if not isinstance(v, str)}
        for k, v in udf_cols.items():
            if isinstance(v, UDF):
                # infer column names from udf arguments
                udf = v
                self._add_virtual_columns(
                    {k: udf}, *args, input_columns=udf.input_columns, **kwargs
                )
            else:
                # explicitly specify input columns
                (udf, cols) = v
                self._add_virtual_columns({k: udf}, *args, input_columns=cols, **kwargs)

    def _add_virtual_columns(
        self,
        mapping: dict[str, UDF],  # this breaks the non udf mapping
        *args,
        input_columns: list[str] | None = None,
        **kwargs,
    ) -> None:
        """
        This is an internal method and not intended to be called directly.

        Add UDF-based virtual columns to the Geneva table.
        """

        if len(mapping) != 1:
            raise ValueError("Only one UDF is supported for now.")

        _LOG.info("Adding column: udf=%s", mapping)
        col_name = next(iter(mapping))
        udf = mapping[col_name]

        if not isinstance(udf, UDF):
            # Stateful UDFs are implemented as Callable classes, and look
            # like partial functions here.  Instantiate to get the return
            # data_type annotations.
            udf = udf()
        self._ltbl.add_columns(pa.field(col_name, udf.data_type))
        self._configure_virtual_column(col_name, udf, input_columns)

    def refresh(self, where: str | None = None) -> None:
        """
        Refresh the specified materialized view.

        Parameters
        ----------
        where: str | None
            TODO: SQL expression filter used to backfill only selected rows
        """
        if where:
            raise NotImplementedError(
                "where clauses on materialized view refresh not implemented yet."
            )

        from geneva.runners.ray.pipeline import run_ray_copy_table

        run_ray_copy_table(
            self.get_reference(), self._conn._packager, self._conn._checkpoint_store
        )
        self.checkout_latest()

    def backfill_async(
        self,
        col_name: str,
        *,
        input_columns: list[str] | None = None,
        udf: UDF | None = None,
        where: str | None = None,
        **kwargs,
    ) -> JobFuture:
        """Run backfill asynchronously by dispatching a driver task."""
        from geneva.runners.ray.pipeline import (
            dispatch_run_ray_add_column,
            validate_backfill_args,
        )

        validate_backfill_args(self, col_name, udf, input_columns)

        fut = dispatch_run_ray_add_column(
            self.get_reference(),
            col_name,
            udf=udf,
            input_columns=input_columns,
            where=where,
            **kwargs,
        )
        return fut

    def backfill(
        self,
        col_name,
        *,
        input_columns: list[str] | None = None,
        udf: UDF | None = None,
        where: str | None = None,
        concurrency: int = 8,
        intra_applier_concurrency: int = 1,
        refresh_status_secs: float = 2.0,
        **kwargs,
    ) -> str:
        """
        Backfills the specified column.

        Returns job_id string

        Parameters
        ----------
        col_name: str
            Target column name to backfill
        input_columns: list[str] | None
            Optionally override columns used as sources for scalar UDF input arguments
            or pa.Array batch UDF arguments.  Not valid for pa.RecordBatch UDFs.
        udf: UDF | None
            Optionally override the UDF used to backfill the column.
        where: str | None
            SQL expression filter used to select rows to backfill.
        concurrency: int
            (default = 8) The number of Ray actor processes started to run tasks
            concurrently. For max throughput, this should ideally be larger than
            the number of nodes in the k8s cluster.
        intra_applier_concurrency: int
            (default = 1) This controls the number of threads used to execute tasks
            within a process. Multiplying this by `concurrency` roughly corresponds
            to the number of CPUs being used.
        commit_granularity: int | None
            (default = 64) Commit a partial result every time this number of fragments
            completes. If None, the entire result is committed at once.
        read_version: int | None
            (default = None) The version of the table to read from.  If None, the
            latest version is used.
        task_shuffle_diversity: int | None
            (default = 8) ??
        batch_size: int | None
            (default = 10240) The number of rows per batch when reading data from the
            table. If None, the default value is used.

        """
        # Input validation
        from geneva.runners.ray.pipeline import validate_backfill_args

        validate_backfill_args(self, col_name, udf, input_columns)

        # get cluster status
        from geneva.runners.ray.raycluster import ClusterStatus

        cs = ClusterStatus(concurrency=concurrency)
        cs.get_status()

        # Kick off the job
        fut = self.backfill_async(
            col_name,
            input_columns=input_columns,
            udf=udf,
            where=where,
            concurrency=concurrency,
            intra_applier_concurrency=intra_applier_concurrency,
            **kwargs,
        )

        while not fut.done(timeout=refresh_status_secs):
            # wait for the backfill to complete, updating statuses
            cs.get_status()
            fut.status()

        cs.get_status()
        fut.status()

        # updates came from an external writer, so get the latest version.
        self._ltbl.checkout_latest()
        return fut.job_id

    def alter_columns(self, *alterations: Iterable[dict[str, Any]], **kwargs) -> None:
        basic_column_alterations = []
        for alter in alterations:
            if "virtual_column" in alter:
                if "path" not in alter:
                    raise ValueError("path is required to alter virtual virtual_column")
                if not isinstance(alter["virtual_column"], UDF):
                    raise ValueError("virtual_column must be a UDF")

                col_name = alter["path"]
                udf = alter["virtual_column"]

                input_cols = alter.get("input_columns", None)

                if input_cols is None:
                    input_cols = udf.input_columns

                self._configure_virtual_column(col_name, udf, input_cols)

            else:
                basic_column_alterations.append(alter)

        if len(basic_column_alterations) > 0:
            self._ltbl.alter_columns(*basic_column_alterations)

    def _configure_virtual_column(
        self,
        col_name: str,
        udf: UDF,
        input_cols: list[str] | None,
    ) -> None:
        """
        Configure a column to be a virtual column for the given UDF.

        This procedure includes:
        - Packaging the UDF
        - Uploading the UDF to the dataset
        - Updating the field metadata to include the UDF information

        Note that the column should already exist on the table.
        """
        udf_spec = self._conn._packager.marshal(udf)

        # upload the UDF to the dataset URL
        if not isinstance(self._ltbl, LanceLocalTable):
            raise TypeError(
                "adding udf column is currently only supported for local tables"
            )

        # upload the packaged UDF to some location inside the dataset:
        ds = self.to_lance()
        fs, root_uri = FileSystem.from_uri(ds.uri)
        checksum = hashlib.sha256(udf_spec.udf_payload).hexdigest()
        udf_location = f"_udfs/{checksum}"

        # TODO -- only upload the UDF if it doesn't exist
        if isinstance(fs, LocalFileSystem):
            # Object storage filesystems like GCS and S3 will create the directory
            # automatically, but local filesystem will not, so we create explicitly
            fs.create_dir(f"{root_uri}/_udfs")

        with fs.open_output_stream(f"{root_uri}/{udf_location}") as f:
            f.write(udf_spec.udf_payload)

        field_metadata = udf.field_metadata | {
            "virtual_column": "true",
            "virtual_column.udf_backend": udf_spec.backend,
            "virtual_column.udf_name": udf_spec.name,
            "virtual_column.udf": "_udfs/" + checksum,
            "virtual_column.udf_inputs": json.dumps(input_cols),
        }

        # Add the column metadata:
        self._ltbl.replace_field_metadata(col_name, field_metadata)

    def create_index(
        self,
        metric: str = "L2",
        num_partitions: int | None = None,
        num_sub_vectors: int | None = None,
        vector_column_name: str = VECTOR_COLUMN_NAME,
        replace: bool = True,
        accelerator=None,
        index_cache_size=None,
        *,
        index_type: Literal[
            "IVF_FLAT",
            "IVF_PQ",
            "IVF_HNSW_SQ",
            "IVF_HNSW_PQ",
        ] = "IVF_PQ",
        num_bits: int = 8,
        max_iterations: int = 50,
        sample_rate: int = 256,
        m: int = 20,
        ef_construction: int = 300,
    ) -> None:
        """Create Vector Index"""
        self._ltbl.create_index(
            metric,
            num_partitions or 256,
            num_sub_vectors or 96,
            vector_column_name,
            replace,
            accelerator,
            index_cache_size,
            index_type=index_type,
            num_bits=num_bits,
            max_iterations=max_iterations,
            sample_rate=sample_rate,
            m=m,
            ef_construction=ef_construction,
        )

    @override
    def create_fts_index(
        self,
        field_names: str | list[str],
        *,
        ordering_field_names: str | list[str] | None = None,
        replace: bool = False,
        writer_heap_size: int | None = None,
        tokenizer_name: str | None = None,
        with_position: bool = True,
        base_tokenizer: Literal["simple", "raw", "whitespace"] = "simple",
        language: str = "English",
        max_token_length: int | None = 40,
        lower_case: bool = True,
        stem: bool = False,
        remove_stop_words: bool = False,
        ascii_folding: bool = False,
        **_kwargs,
    ) -> None:
        self._ltbl.create_fts_index(
            field_names,
            ordering_field_names=ordering_field_names,
            replace=replace,
            writer_heap_size=writer_heap_size,
            tokenizer_name=tokenizer_name,
            with_position=with_position,
            base_tokenizer=base_tokenizer,
            language=language,
            max_token_length=max_token_length,
            lower_case=lower_case,
            stem=stem,
            remove_stop_words=remove_stop_words,
            ascii_folding=ascii_folding,
            use_tantivy=False,
        )

    @override
    def create_scalar_index(
        self,
        column: str,
        *,
        replace: bool = True,
        index_type: Literal["BTREE", "BITMAP", "LABEL_LIST"] = "BTREE",
    ) -> None:
        self._ltbl.create_scalar_index(
            column,
            replace=replace,
            index_type=index_type,
        )

    @override
    def _do_merge(self, other: "Table", on: list[str], how: str) -> Never:
        raise NotImplementedError("Merging tables is not supported.")

    @override
    def _execute_query(
        self,
        query: LanceQuery,
        batch_size: int | None = None,
    ) -> pa.RecordBatchReader:
        return self._ltbl._execute_query(query, batch_size=batch_size)

    def list_versions(self) -> list[dict[str, Any]]:
        return self._ltbl.list_versions()

    @override
    def cleanup_old_versions(
        self,
        older_than: timedelta | None = None,
        *,
        delete_unverified=False,
    ) -> "lance.CleanupStats":
        return self._ltbl.cleanup_old_versions(
            older_than,
            delete_unverified=delete_unverified,
        )

    def to_batches(self, batch_size: int | None = None) -> Iterator[pa.RecordBatch]:
        from .query import Query

        if isinstance(self._ltbl, Query):
            return self._ltbl.to_batches(batch_size)
        return self.to_lance().to_batches(batch_size)

    @overload
    def search(
        self,
        query: Union[VEC, str, "PIL.Image.Image", tuple, FullTextQuery],  # noqa: F821
        vector_column_name: str | None = None,
        query_type: QueryType = "auto",
        ordering_field_name: str | None = None,
        fts_columns: str | list[str] | None = None,
    ) -> LanceQueryBuilder: ...

    """This is the signature for the standard LanceDB table.search call"""

    def search(
        self,
        query: list | pa.Array | pa.ChunkedArray | np.ndarray | None = None,
        vector_column_name: str | None = None,
        query_type: Literal["vector", "fts", "hybrid", "auto"] = "auto",
        ordering_field_name: str | None = None,
        fts_columns: str | list[str] | None = None,
    ) -> GenevaQueryBuilder:
        if query is None:
            return GenevaQueryBuilder(self)
        else:
            return self._ltbl.search(
                query, vector_column_name, query_type, ordering_field_name, fts_columns
            )

    @override
    def drop_columns(self, columns: Iterable[str]) -> None:
        self._ltbl.drop_columns(columns)

    @override
    def to_arrow(self) -> pa.Table:
        return self._ltbl.to_arrow()

    @override
    def count_rows(self, filter: str | None = None) -> int:
        return self._ltbl.count_rows(filter)

    @override
    def update(
        self,
        where: str | None = None,
        values: dict | None = None,
        *,
        values_sql: dict[str, str] | None = None,
    ) -> None:
        self._ltbl.update(where, values, values_sql=values_sql)

    @override
    def delete(self, where: str) -> None:
        self._ltbl.delete(where)

    @override
    def list_indices(self) -> Iterable[IndexConfig]:
        return self._ltbl.list_indices()

    @override
    def index_stats(self, index_name: str) -> IndexStatistics | None:
        return self._ltbl.index_stats(index_name)

    @override
    def optimize(
        self,
        *,
        cleanup_older_than: timedelta | None = None,
        delete_unverified: bool = False,
    ) -> None:
        return self._ltbl.optimize(
            cleanup_older_than=cleanup_older_than,
            delete_unverified=delete_unverified,
        )

    @override
    def compact_files(self) -> None:
        self._ltbl.compact_files()

    @override
    def restore(self, *args, **kwargs) -> None:
        self._ltbl.restore(*args, **kwargs)

    # TODO: This annotation sucks
    def take_blobs(self, indices: list[int] | pa.Array, column: str):  # noqa: ANN201
        return self.to_lance().take_blobs(indices, column)

    def to_lance(self) -> lance.LanceDataset:
        return self._ltbl.to_lance()

    def uses_v2_manifest_paths(self) -> bool:
        return self._ltbl.uses_v2_manifest_paths()

    def migrate_v2_manifest_paths(self) -> None:
        return self._ltbl.migrate_v2_manifest_paths()

    def _analyze_plan(self, query: lancedb.query.Query) -> str:
        return self._ltbl._analyze_plan(query)

    def _explain_plan(
        self, query: lancedb.query.Query, verbose: bool | None = False
    ) -> str:
        return self._ltbl._explain_plan(query, verbose=verbose)

    def stats(self) -> TableStatistics:
        return self._ltbl.stats()

    @property
    def tags(self) -> Tags:
        return self._ltbl.tags

name

name: str

Get the name of the table.

version

version: int

Get the current version of the table.

schema

schema: Schema

The Arrow Schema of the Table.

uri

uri: str

embedding_functions

embedding_functions: Never

tags

tags: Tags

get_reference

get_reference() -> TableReference
Source code in geneva/table.py
def get_reference(self) -> TableReference:
    return TableReference(
        db_uri=self._conn.uri, table_name=self._name, version=self._version
    )

get_fragments

get_fragments() -> list[LanceFragment]
Source code in geneva/table.py
def get_fragments(self) -> list[lance.LanceFragment]:
    return self.to_lance().get_fragments()

add

add(
    data,
    mode: str = "append",
    on_bad_vectors: str = "error",
    fill_value: float = 0.0,
) -> None
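
For example, a sketch appending two hypothetical rows as LanceDB-style
dicts (mode="overwrite" would replace the existing data):

tbl.add([{"id": 1, "text": "hello"}, {"id": 2, "text": "world"}])
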
Source code in geneva/table.py
def add(
    self,
    data,
    mode: str = "append",
    on_bad_vectors: str = "error",
    fill_value: float = 0.0,
) -> None:
    self._ltbl.add(
        data,
        mode=mode,
        on_bad_vectors=on_bad_vectors,
        fill_value=fill_value,
    )

checkout

checkout(version: int) -> None
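
A sketch of version pinning (the version number is hypothetical):

tbl.checkout(5)         # pin this handle to table version 5
tbl.checkout_latest()   # move back to the newest version
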
Source code in geneva/table.py
def checkout(self, version: int) -> None:
    self._version = version
    self._ltbl.checkout(version)

checkout_latest

checkout_latest() -> None
Source code in geneva/table.py
def checkout_latest(self) -> None:
    self._ltbl.checkout_latest()

add_columns

add_columns(
    transforms: dict[
        str, str | UDF | tuple[UDF, list[str]]
    ],
    *args,
    **kwargs,
) -> None

Add columns or UDF-based columns to the Geneva table.

Parameters:

  • transforms (dict[str, str | UDF | tuple[UDF, list[str]]]) –

    The key is the column name to add and the value is a specification
    of the column type/value (see the sketch below).

      • If the spec is a string, it is expected to be a DataFusion SQL
        expression (e.g. "cast(null as string)").
      • If the spec is a UDF, a virtual column is added with input
        columns inferred from the UDF's argument names.
      • If the spec is a tuple, the first element is a UDF and the
        second element is a list of input column names.

Raises:

  • ValueError –
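
A sketch of the three spec styles, assuming a @udf-decorated function
word_count whose argument names match existing columns (the UDF and all
column names here are hypothetical):

# Plain column from a DataFusion SQL expression:
tbl.add_columns({"note": "cast(null as string)"})

# Virtual column; input columns inferred from the UDF's argument names:
tbl.add_columns({"n_words": word_count})

# Virtual column with explicit input columns:
tbl.add_columns({"n_words": (word_count, ["text"])})
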
Source code in geneva/table.py
def add_columns(
    self, transforms: dict[str, str | UDF | tuple[UDF, list[str]]], *args, **kwargs
) -> None:
    """
    Add columns or UDF-based columns to the Geneva table.

    Parameters
    ----------
    transforms : dict[str, str | UDF | tuple[UDF, list[str]]]
        The key is the column name to add and the value is a
        specification of the column type/value.
        * If the spec is a string, it is expected to be a DataFusion
          SQL expression (e.g. "cast(null as string)").
        * If the spec is a UDF, a virtual column is added with input
          columns inferred from the UDF's argument names.
        * If the spec is a tuple, the first element is a UDF and the
          second element is a list of input column names.

    Raises
    ------
    ValueError

    """
    # handle basic columns
    basic_cols = {k: v for k, v in transforms.items() if isinstance(v, str)}
    if len(basic_cols) > 0:
        self._ltbl.add_columns(basic_cols, *args)

    # handle UDF virtual columns
    udf_cols = {k: v for k, v in transforms.items() if not isinstance(v, str)}
    for k, v in udf_cols.items():
        if isinstance(v, UDF):
            # infer column names from udf arguments
            udf = v
            self._add_virtual_columns(
                {k: udf}, *args, input_columns=udf.input_columns, **kwargs
            )
        else:
            # explicitly specify input columns
            (udf, cols) = v
            self._add_virtual_columns({k: udf}, *args, input_columns=cols, **kwargs)

refresh

refresh(where: str | None = None) -> None

Refresh the specified materialized view.

Parameters:

  • where (str | None, default: None ) –

    TODO: SQL expression filter used to backfill only selected rows

Source code in geneva/table.py
def refresh(self, where: str | None = None) -> None:
    """
    Refresh the specified materialized view.

    Parameters
    ----------
    where: str | None
        TODO: SQL expression filter used to backfill only selected rows
    """
    if where:
        raise NotImplementedError(
            "where clauses on materialized view refresh not implemented yet."
        )

    from geneva.runners.ray.pipeline import run_ray_copy_table

    run_ray_copy_table(
        self.get_reference(), self._conn._packager, self._conn._checkpoint_store
    )
    self.checkout_latest()

backfill_async

backfill_async(
    col_name: str,
    *,
    input_columns: list[str] | None = None,
    udf: UDF | None = None,
    where: str | None = None,
    **kwargs,
) -> JobFuture

Run backfill asynchronously by dispatching a driver task.
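
A sketch of polling the returned JobFuture (its done/status/job_id
members mirror how backfill below consumes them; the column name is
hypothetical):

fut = tbl.backfill_async("n_words")
while not fut.done(timeout=2.0):
    fut.status()          # refresh and print job progress
print(fut.job_id)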

Source code in geneva/table.py
def backfill_async(
    self,
    col_name: str,
    *,
    input_columns: list[str] | None = None,
    udf: UDF | None = None,
    where: str | None = None,
    **kwargs,
) -> JobFuture:
    """Run backfill asynchronously by dispatching a driver task."""
    from geneva.runners.ray.pipeline import (
        dispatch_run_ray_add_column,
        validate_backfill_args,
    )

    validate_backfill_args(self, col_name, udf, input_columns)

    fut = dispatch_run_ray_add_column(
        self.get_reference(),
        col_name,
        udf=udf,
        input_columns=input_columns,
        where=where,
        **kwargs,
    )
    return fut

backfill

backfill(
    col_name,
    *,
    input_columns: list[str] | None = None,
    udf: UDF | None = None,
    where: str | None = None,
    concurrency: int = 8,
    intra_applier_concurrency: int = 1,
    refresh_status_secs: float = 2.0,
    **kwargs,
) -> str

Backfills the specified column.

Returns job_id string

Parameters:

  • col_name –

    Target column name to backfill

  • input_columns (list[str] | None, default: None ) –

    Optionally override columns used as sources for scalar UDF input arguments or pa.Array batch UDF arguments. Not valid for pa.RecordBatch UDFs.

  • udf (UDF | None, default: None ) –

    Optionally override the UDF used to backfill the column.

  • where (str | None, default: None ) –

    SQL expression filter used to select rows to backfill.

  • concurrency (int, default: 8 ) –

    (default = 8) The number of Ray actor processes started to run tasks concurrently. For max throughput, this should ideally be larger than the number of nodes in the k8s cluster.

  • intra_applier_concurrency (int, default: 1 ) –

    (default = 1) This controls the number of threads used to execute tasks within a process. Multiplying this by concurrency roughly corresponds to the number of CPUs being used.

  • commit_granularity –

    (default = 64) Commit a partial result every time this number of fragments completes. If None, the entire result is committed at once.

  • read_version –

    (default = None) The version of the table to read from. If None, the latest version is used.

  • task_shuffle_diversity –

    (default = 8) ??

  • batch_size –

    (default = 10240) The number of rows per batch when reading data from the table. If None, the default value is used.
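
A usage sketch (the column name and filter are hypothetical):

job_id = tbl.backfill(
    "n_words",
    where="text IS NOT NULL",         # only backfill matching rows
    concurrency=8,                    # Ray actor processes
    intra_applier_concurrency=2,      # threads per actor process
)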

Source code in geneva/table.py
def backfill(
    self,
    col_name,
    *,
    input_columns: list[str] | None = None,
    udf: UDF | None = None,
    where: str | None = None,
    concurrency: int = 8,
    intra_applier_concurrency: int = 1,
    refresh_status_secs: float = 2.0,
    **kwargs,
) -> str:
    """
    Backfills the specified column.

    Returns job_id string

    Parameters
    ----------
    col_name: str
        Target column name to backfill
    input_columns: list[str] | None
        Optionally override columns used as sources for scalar UDF input arguments
        or pa.Array batch UDF arguments.  Not valid for pa.RecordBatch UDFs.
    udf: UDF | None
        Optionally override the UDF used to backfill the column.
    where: str | None
        SQL expression filter used to select rows to backfill.
    concurrency: int
        (default = 8) The number of Ray actor processes started to run tasks
        concurrently. For max throughput, this should ideally be larger than
        the number of nodes in the k8s cluster.
    intra_applier_concurrency: int
        (default = 1) This controls the number of threads used to execute tasks
        within a process. Multiplying this by `concurrency` roughly corresponds
        to the number of CPUs being used.
    commit_granularity: int | None
        (default = 64) Commit a partial result every time this number of fragments
        completes. If None, the entire result is committed at once.
    read_version: int | None
        (default = None) The version of the table to read from.  If None, the
        latest version is used.
    task_shuffle_diversity: int | None
        (default = 8) ??
    batch_size: int | None
        (default = 10240) The number of rows per batch when reading data from the
        table. If None, the default value is used.

    """
    # Input validation
    from geneva.runners.ray.pipeline import validate_backfill_args

    validate_backfill_args(self, col_name, udf, input_columns)

    # get cluster status
    from geneva.runners.ray.raycluster import ClusterStatus

    cs = ClusterStatus(concurrency=concurrency)
    cs.get_status()

    # Kick off the job
    fut = self.backfill_async(
        col_name,
        input_columns=input_columns,
        udf=udf,
        where=where,
        concurrency=concurrency,
        intra_applier_concurrency=intra_applier_concurrency,
        **kwargs,
    )

    while not fut.done(timeout=refresh_status_secs):
        # wait for the backfill to complete, updating statuses
        cs.get_status()
        fut.status()

    cs.get_status()
    fut.status()

    # updates came from an external writer, so get the latest version.
    self._ltbl.checkout_latest()
    return fut.job_id

alter_columns

alter_columns(
    *alterations: Iterable[dict[str, Any]], **kwargs
) -> None
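
Based on the source below, a virtual-column alteration is a dict with
"path" and "virtual_column" keys; other alterations pass through to the
underlying LanceDB table. A sketch (the UDF and column names are
hypothetical):

# Re-bind an existing column to a UDF; "input_columns" is optional and
# defaults to the UDF's own input_columns:
tbl.alter_columns({
    "path": "n_words",
    "virtual_column": word_count,
    "input_columns": ["text"],
})

# Plain alterations go straight to LanceDB's alter_columns:
tbl.alter_columns({"path": "note", "rename": "comment"})
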
Source code in geneva/table.py
def alter_columns(self, *alterations: Iterable[dict[str, Any]], **kwargs) -> None:
    basic_column_alterations = []
    for alter in alterations:
        if "virtual_column" in alter:
            if "path" not in alter:
                raise ValueError("path is required to alter virtual virtual_column")
            if not isinstance(alter["virtual_column"], UDF):
                raise ValueError("virtual_column must be a UDF")

            col_name = alter["path"]
            udf = alter["virtual_column"]

            input_cols = alter.get("input_columns", None)

            if input_cols is None:
                input_cols = udf.input_columns

            self._configure_virtual_column(col_name, udf, input_cols)

        else:
            basic_column_alterations.append(alter)

    if len(basic_column_alterations) > 0:
        self._ltbl.alter_columns(*basic_column_alterations)

create_index

create_index(
    metric: str = "L2",
    num_partitions: int | None = None,
    num_sub_vectors: int | None = None,
    vector_column_name: str = VECTOR_COLUMN_NAME,
    replace: bool = True,
    accelerator=None,
    index_cache_size=None,
    *,
    index_type: Literal[
        "IVF_FLAT", "IVF_PQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
    ] = "IVF_PQ",
    num_bits: int = 8,
    max_iterations: int = 50,
    sample_rate: int = 256,
    m: int = 20,
    ef_construction: int = 300,
) -> None

Create Vector Index
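
A sketch for a table with a "vector" column (values are illustrative;
when unset, num_partitions and num_sub_vectors default to 256 and 96 in
the delegation below):

tbl.create_index(
    metric="cosine",
    vector_column_name="vector",
    index_type="IVF_PQ",
    num_partitions=256,
    num_sub_vectors=96,
)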

Source code in geneva/table.py
def create_index(
    self,
    metric: str = "L2",
    num_partitions: int | None = None,
    num_sub_vectors: int | None = None,
    vector_column_name: str = VECTOR_COLUMN_NAME,
    replace: bool = True,
    accelerator=None,
    index_cache_size=None,
    *,
    index_type: Literal[
        "IVF_FLAT",
        "IVF_PQ",
        "IVF_HNSW_SQ",
        "IVF_HNSW_PQ",
    ] = "IVF_PQ",
    num_bits: int = 8,
    max_iterations: int = 50,
    sample_rate: int = 256,
    m: int = 20,
    ef_construction: int = 300,
) -> None:
    """Create Vector Index"""
    self._ltbl.create_index(
        metric,
        num_partitions or 256,
        num_sub_vectors or 96,
        vector_column_name,
        replace,
        accelerator,
        index_cache_size,
        index_type=index_type,
        num_bits=num_bits,
        max_iterations=max_iterations,
        sample_rate=sample_rate,
        m=m,
        ef_construction=ef_construction,
    )

create_fts_index

create_fts_index(
    field_names: str | list[str],
    *,
    ordering_field_names: str | list[str] | None = None,
    replace: bool = False,
    writer_heap_size: int | None = None,
    tokenizer_name: str | None = None,
    with_position: bool = True,
    base_tokenizer: Literal[
        "simple", "raw", "whitespace"
    ] = "simple",
    language: str = "English",
    max_token_length: int | None = 40,
    lower_case: bool = True,
    stem: bool = False,
    remove_stop_words: bool = False,
    ascii_folding: bool = False,
    **_kwargs,
) -> None
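
A sketch building a full-text index over a hypothetical "text" column
(note that the delegation below always passes use_tantivy=False):

tbl.create_fts_index("text", replace=True, stem=True, remove_stop_words=True)
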
Source code in geneva/table.py
@override
def create_fts_index(
    self,
    field_names: str | list[str],
    *,
    ordering_field_names: str | list[str] | None = None,
    replace: bool = False,
    writer_heap_size: int | None = None,
    tokenizer_name: str | None = None,
    with_position: bool = True,
    base_tokenizer: Literal["simple", "raw", "whitespace"] = "simple",
    language: str = "English",
    max_token_length: int | None = 40,
    lower_case: bool = True,
    stem: bool = False,
    remove_stop_words: bool = False,
    ascii_folding: bool = False,
    **_kwargs,
) -> None:
    self._ltbl.create_fts_index(
        field_names,
        ordering_field_names=ordering_field_names,
        replace=replace,
        writer_heap_size=writer_heap_size,
        tokenizer_name=tokenizer_name,
        with_position=with_position,
        base_tokenizer=base_tokenizer,
        language=language,
        max_token_length=max_token_length,
        lower_case=lower_case,
        stem=stem,
        remove_stop_words=remove_stop_words,
        ascii_folding=ascii_folding,
        use_tantivy=False,
    )

create_scalar_index

create_scalar_index(
    column: str,
    *,
    replace: bool = True,
    index_type: Literal[
        "BTREE", "BITMAP", "LABEL_LIST"
    ] = "BTREE",
) -> None
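
A sketch over a hypothetical "id" column:

tbl.create_scalar_index("id", index_type="BTREE")
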
Source code in geneva/table.py
@override
def create_scalar_index(
    self,
    column: str,
    *,
    replace: bool = True,
    index_type: Literal["BTREE", "BITMAP", "LABEL_LIST"] = "BTREE",
) -> None:
    self._ltbl.create_scalar_index(
        column,
        replace=replace,
        index_type=index_type,
    )

list_versions

list_versions() -> list[dict[str, Any]]
Source code in geneva/table.py
def list_versions(self) -> list[dict[str, Any]]:
    return self._ltbl.list_versions()

cleanup_old_versions

cleanup_old_versions(
    older_than: timedelta | None = None,
    *,
    delete_unverified=False,
) -> CleanupStats
Source code in geneva/table.py
@override
def cleanup_old_versions(
    self,
    older_than: timedelta | None = None,
    *,
    delete_unverified=False,
) -> "lance.CleanupStats":
    return self._ltbl.cleanup_old_versions(
        older_than,
        delete_unverified=delete_unverified,
    )

to_batches

to_batches(
    batch_size: int | None = None,
) -> Iterator[RecordBatch]
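
A sketch of streaming the table as Arrow batches:

for batch in tbl.to_batches(batch_size=4096):
    ...  # each item is a pyarrow.RecordBatch
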
Source code in geneva/table.py
def to_batches(self, batch_size: int | None = None) -> Iterator[pa.RecordBatch]:
    from .query import Query

    if isinstance(self._ltbl, Query):
        return self._ltbl.to_batches(batch_size)
    return self.to_lance().to_batches(batch_size)

search

search(
    query: Union[VEC, str, Image, tuple, FullTextQuery],
    vector_column_name: str | None = None,
    query_type: QueryType = "auto",
    ordering_field_name: str | None = None,
    fts_columns: str | list[str] | None = None,
) -> LanceQueryBuilder
search(
    query: list
    | Array
    | ChunkedArray
    | ndarray
    | None = None,
    vector_column_name: str | None = None,
    query_type: Literal[
        "vector", "fts", "hybrid", "auto"
    ] = "auto",
    ordering_field_name: str | None = None,
    fts_columns: str | list[str] | None = None,
) -> GenevaQueryBuilder
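
With no query, search() returns a GenevaQueryBuilder over the whole
table; with a query it delegates to the underlying LanceDB search. A
sketch (the column name and vector are hypothetical, and the builder is
assumed to support the usual LanceDB limit/to_list chain):

rows = tbl.search().limit(10).to_list()    # scan-style query builder

hits = (
    tbl.search([0.1, 0.2, 0.3], vector_column_name="vector")
    .limit(5)
    .to_list()
)
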
Source code in geneva/table.py
def search(
    self,
    query: list | pa.Array | pa.ChunkedArray | np.ndarray | None = None,
    vector_column_name: str | None = None,
    query_type: Literal["vector", "fts", "hybrid", "auto"] = "auto",
    ordering_field_name: str | None = None,
    fts_columns: str | list[str] | None = None,
) -> GenevaQueryBuilder:
    if query is None:
        return GenevaQueryBuilder(self)
    else:
        return self._ltbl.search(
            query, vector_column_name, query_type, ordering_field_name, fts_columns
        )

drop_columns

drop_columns(columns: Iterable[str]) -> None
Source code in geneva/table.py
@override
def drop_columns(self, columns: Iterable[str]) -> None:
    self._ltbl.drop_columns(columns)

to_arrow

to_arrow() -> Table
Source code in geneva/table.py
@override
def to_arrow(self) -> pa.Table:
    return self._ltbl.to_arrow()

count_rows

count_rows(filter: str | None = None) -> int
Source code in geneva/table.py
@override
def count_rows(self, filter: str | None = None) -> int:
    return self._ltbl.count_rows(filter)

update

update(
    where: str | None = None,
    values: dict | None = None,
    *,
    values_sql: dict[str, str] | None = None,
) -> None
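
A sketch showing literal values vs. SQL-computed values (the column
names are hypothetical):

tbl.update(where="id = 1", values={"note": "seen"})
tbl.update(where="note IS NULL", values_sql={"note": "'n/a'"})
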
Source code in geneva/table.py
@override
def update(
    self,
    where: str | None = None,
    values: dict | None = None,
    *,
    values_sql: dict[str, str] | None = None,
) -> None:
    self._ltbl.update(where, values, values_sql=values_sql)

delete

delete(where: str) -> None
Source code in geneva/table.py
@override
def delete(self, where: str) -> None:
    self._ltbl.delete(where)

list_indices

list_indices() -> Iterable[IndexConfig]
Source code in geneva/table.py
@override
def list_indices(self) -> Iterable[IndexConfig]:
    return self._ltbl.list_indices()

index_stats

index_stats(index_name: str) -> IndexStatistics | None
Source code in geneva/table.py
@override
def index_stats(self, index_name: str) -> IndexStatistics | None:
    return self._ltbl.index_stats(index_name)

optimize

optimize(
    *,
    cleanup_older_than: timedelta | None = None,
    delete_unverified: bool = False,
) -> None
Source code in geneva/table.py
@override
def optimize(
    self,
    *,
    cleanup_older_than: timedelta | None = None,
    delete_unverified: bool = False,
) -> None:
    return self._ltbl.optimize(
        cleanup_older_than=cleanup_older_than,
        delete_unverified=delete_unverified,
    )

compact_files

compact_files() -> None
Source code in geneva/table.py
@override
def compact_files(self) -> None:
    self._ltbl.compact_files()

restore

restore(*args, **kwargs) -> None
Source code in geneva/table.py
@override
def restore(self, *args, **kwargs) -> None:
    self._ltbl.restore(*args, **kwargs)

take_blobs

take_blobs(indices: list[int] | Array, column: str)
Source code in geneva/table.py
def take_blobs(self, indices: list[int] | pa.Array, column: str):  # noqa: ANN201
    return self.to_lance().take_blobs(indices, column)

to_lance

to_lance() -> LanceDataset
Source code in geneva/table.py
def to_lance(self) -> lance.LanceDataset:
    return self._ltbl.to_lance()

uses_v2_manifest_paths

uses_v2_manifest_paths() -> bool
Source code in geneva/table.py
def uses_v2_manifest_paths(self) -> bool:
    return self._ltbl.uses_v2_manifest_paths()

migrate_v2_manifest_paths

migrate_v2_manifest_paths() -> None
Source code in geneva/table.py
def migrate_v2_manifest_paths(self) -> None:
    return self._ltbl.migrate_v2_manifest_paths()

stats

stats() -> TableStatistics
Source code in geneva/table.py
def stats(self) -> TableStatistics:
    return self._ltbl.stats()