lance package

Subpackages

Submodules

lance.arrow module

Extensions to PyArrow.

class lance.arrow.BFloat16Array

Bases: ExtensionArray

Bfloat16 PyArrow Array.

classmethod from_numpy(array: ndarray)

Create a BFloat16Array from a NumPy array.

Can only convert from a NumPy array of dtype bfloat16 from the ml_dtypes module.

Examples

>>> import numpy as np
>>> from ml_dtypes import bfloat16
>>> from lance.arrow import BFloat16Array
>>> arr = np.array([1.0, 2.0, 3.0], dtype=bfloat16)
>>> print(BFloat16Array.from_numpy(arr))
[
  1,
  2,
  3
]
to_numpy(zero_copy_only=False)

Convert to a NumPy array.

This will do a zero-copy conversion.

The conversion will fail if the array contains null values.

class lance.arrow.BFloat16Type

Bases: ExtensionType

to_pandas_dtype(self)

Return the equivalent NumPy / Pandas dtype.

Examples

>>> import pyarrow as pa
>>> pa.int64().to_pandas_dtype()
<class 'numpy.int64'>
class lance.arrow.EncodedImageType(storage_type: DataType = DataType(binary))

Bases: ExtensionType

class lance.arrow.FixedShapeImageTensorType(arrow_type: DataType, shape)

Bases: ExtensionType

class lance.arrow.ImageArray

Bases: ExtensionArray

classmethod from_array(images)

Create an instance of one of the ImageArray subclasses from the input data.

Parameters:

images (Union[pa.StringArray, pa.BinaryArray, pa.FixedShapeTensorArray, Iterable]) –

Return type:

Union[ImageURIArray, EncodedImageArray, FixedShapeImageTensorArray]

class lance.arrow.ImageScalar

Bases: ExtensionScalar

as_py(self)

Return this scalar as a Python object.

class lance.arrow.ImageURIArray

Bases: ImageArray

Array of image URIs. URIs may represent local files or remote files on the web, S3 or GCS if they are accessible by the machine executing this.

classmethod from_uris(uris: StringArray | LargeStringArray | Iterable[str | Path])

Create an ImageURIArray from an array or iterable of URIs (such as a list).

Parameters:

uris (Union[pa.StringArray, pa.LargeStringArray, Iterable[Union[str, Path]]]) –

Returns:

Array of image URIs

Return type:

ImageURIArray

Examples

>>> from lance.arrow import ImageURIArray
>>> uris = ["file:///tmp/1.png"]
>>> ImageURIArray.from_uris(uris)
<lance.arrow.ImageURIArray object at 0x...>
['file:///tmp/1.png']
read_uris(storage_type=DataType(binary)) EncodedImageArray

Read the images from the URIs into memory and return an EncodedImageArray.

Parameters:

storage_type (pa.DataType, optional) – The storage type to use for the encoded images. Default is pa.binary(). To support arrays with more than 2GiB of data, use pa.large_binary().

Returns:

Array of encoded images

Return type:

EncodedImageArray

Examples

>>> import os
>>> uris = [os.path.join(os.path.dirname(__file__), "../tests/images/1.png")]
>>> uri_array = ImageURIArray.from_uris(uris)
>>> uri_array.read_uris()
<lance.arrow.EncodedImageArray object at 0x...>
...
class lance.arrow.ImageURIType(storage_type: DataType = DataType(string))

Bases: ExtensionType

lance.arrow.bfloat16_array(values)
lance.arrow.cast(arr: Array, target_type: DataType | str, *args, **kwargs) Array

Cast an array to another data type.

Extends pyarrow.compute.cast() to handle the extension types defined by lance. When pyarrow can handle the cast natively, this function falls back to pyarrow.

Supported operations:

  • Cast between floating (float16, float32, float64) arrays and bfloat16 arrays.

  • Cast between FixedSizeListArray of floats (float16, float32, float64, bfloat16) with the same list size.

Parameters:
  • arr (pyarrow.Array) – Array to cast.

  • target_type (pyarrow.DataType or str) – Target data type. Accepts anything pyarrow.compute.cast() accepts. Additionally accepts the strings "bfloat16" and "bf16", or a BFloat16Type.
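To make the float-to-bfloat16 cast concrete: bfloat16 keeps the sign bit, the 8 exponent bits, and the top 7 mantissa bits of a float32. The stdlib-only sketch below models that conversion on single values. The helper names are hypothetical, and round-to-nearest-even is an assumption; this section does not state which rounding mode lance.arrow.cast uses.

```python
import math
import struct


def float32_to_bfloat16_bits(x: float) -> int:
    """Return the 16-bit bfloat16 pattern for x (round-to-nearest-even, assumed)."""
    if math.isnan(x):
        return 0x7FC0  # a quiet NaN pattern
    bits = struct.unpack("<I", struct.pack("<f", x))[0]
    # Bias so that dropping the low 16 bits rounds to nearest, ties to even.
    bias = 0x7FFF + ((bits >> 16) & 1)
    return ((bits + bias) >> 16) & 0xFFFF


def bfloat16_bits_to_float(b: int) -> float:
    """Widen a bfloat16 bit pattern back to a Python float (always exact)."""
    return struct.unpack("<f", struct.pack("<I", (b & 0xFFFF) << 16))[0]


def roundtrip(x: float) -> float:
    return bfloat16_bits_to_float(float32_to_bfloat16_bits(x))


print(roundtrip(1.0))      # small integers survive unchanged
print(roundtrip(3.14159))  # only about three significant digits remain
```

Widening bfloat16 to float32 is lossless, while narrowing discards 16 mantissa bits, which is why values such as 3.14159 come back slightly changed.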

lance.blob module

class lance.blob.BlobColumn(blob_column: Array | ChunkedArray)

Bases: object

A utility to wrap a PyArrow binary column and iterate over the rows as file-like objects.

This can be useful for working with medium-to-small binary objects that need to interface with APIs that expect file-like objects. For very large binary objects (4-8MB or more per value) you might be better off creating a blob column and using lance.Dataset.take_blobs to access the blob data.

class lance.blob.BlobFile(inner: LanceBlobFile)

Bases: RawIOBase

Represents a blob in a Lance dataset as a file-like object.

close() None

Flush and close the IO object.

This method has no effect if the file is already closed.

property closed: bool
readable() bool

Return whether object was opened for reading.

If False, read() will raise OSError.

readall() bytes

Read until EOF, using multiple read() calls.

readinto(b: bytearray) int
seek(offset: int, whence: int = 0) int

Change the stream position to the given byte offset.

offset

The stream position, relative to ‘whence’.

whence

The relative position to seek from.

The offset is interpreted relative to the position indicated by whence. Values for whence are:

  • os.SEEK_SET or 0 – start of stream (the default); offset should be zero or positive

  • os.SEEK_CUR or 1 – current stream position; offset may be negative

  • os.SEEK_END or 2 – end of stream; offset is usually negative

Return the new absolute position.
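Because BlobFile derives from RawIOBase, seek follows the standard io protocol. The sketch below shows the three whence modes on an in-memory stream; io.BytesIO is only a stand-in here, used so the example runs without a Lance dataset.

```python
import io
import os

# io.BytesIO stands in for a BlobFile; both follow the io seek protocol.
stream = io.BytesIO(b"0123456789")

pos_from_start = stream.seek(3, os.SEEK_SET)    # absolute offset 3
pos_from_current = stream.seek(2, os.SEEK_CUR)  # 3 + 2
pos_from_end = stream.seek(-4, os.SEEK_END)     # 10 - 4
tail = stream.read()                            # reads from offset 6 to EOF

print(pos_from_start, pos_from_current, pos_from_end, tail)
```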

seekable() bool

Return whether object supports random access.

If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().

size() int

Returns the size of the blob in bytes.

tell() int

Return current stream position.

class lance.blob.BlobIterator(binary_iter: Iterator[BinaryScalar])

Bases: object

lance.commit module

exception lance.commit.CommitConflictError

Bases: Exception

lance.conftest module

lance.dataset module

class lance.dataset.BulkCommitResult

Bases: TypedDict

dataset: LanceDataset
merged: Transaction
class lance.dataset.DatasetOptimizer(dataset: LanceDataset)

Bases: object

compact_files(*, target_rows_per_fragment: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int | None = None, materialize_deletions: bool = True, materialize_deletions_threshold: float = 0.1, num_threads: int | None = None, batch_size: int | None = None) CompactionMetrics

Compacts small files in the dataset, reducing total number of files.

This does a few things:
  • Removes deleted rows from fragments

  • Removes dropped columns from fragments

  • Merges small fragments into larger ones

This method preserves the insertion order of the dataset. This may mean it leaves small fragments in the dataset if they are not adjacent to other fragments that need compaction. For example, if you have fragments with row counts 5 million, 100, and 5 million, the middle fragment will not be compacted because the fragments it is adjacent to do not need compaction.
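The adjacency rule can be illustrated with a deliberately simplified model. The function below is hypothetical and is not Lance's planner: it treats a fragment as a candidate only when it has fewer rows than the target, ignores deletions and dropped columns, and merges only runs of two or more adjacent candidates.

```python
from typing import List


def plan_merges(row_counts: List[int], target_rows: int = 1024 * 1024) -> List[List[int]]:
    """Group indices of adjacent undersized fragments that could be merged.

    Simplified model: only runs of two or more adjacent candidates are merged,
    so the insertion order of the rows is preserved.
    """
    groups: List[List[int]] = []
    run: List[int] = []
    for idx, rows in enumerate(row_counts):
        if rows < target_rows:
            run.append(idx)
            continue
        # A large fragment ends the current run of small fragments.
        if len(run) >= 2:
            groups.append(run)
        run = []
    if len(run) >= 2:
        groups.append(run)
    return groups


print(plan_merges([5_000_000, 100, 5_000_000]))         # the lone small fragment stays
print(plan_merges([100, 200, 5_000_000, 50, 60, 70]))  # two runs can be merged
```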

Parameters:
  • target_rows_per_fragment (int, default 1024*1024) – The target number of rows per fragment. This is the number of rows that will be in each fragment after compaction.

  • max_rows_per_group (int, default 1024) –

    Max number of rows per group. This does not affect which fragments need compaction, but does affect how they are re-written if selected.

    This setting only affects datasets using the legacy storage format. The newer format does not require row groups.

  • max_bytes_per_file (Optional[int], default None) –

    Max number of bytes in a single file. This does not affect which fragments need compaction, but does affect how they are re-written if selected. If this value is too small you may end up with fragments that are smaller than target_rows_per_fragment.

    The default will use the default from write_dataset.

  • materialize_deletions (bool, default True) – Whether to compact fragments with soft deleted rows so they are no longer present in the file.

  • materialize_deletions_threshold (float, default 0.1) – The fraction of original rows that are soft deleted in a fragment before the fragment is a candidate for compaction.

  • num_threads (int, optional) – The number of threads to use when performing compaction. If not specified, defaults to the number of cores on the machine.

  • batch_size (int, optional) –

    The batch size to use when scanning input fragments. You may want to reduce this if you are running out of memory during compaction.

    The default will use the same default from scanner.

Returns:

Metrics about the compaction process

Return type:

CompactionMetrics

optimize_indices(**kwargs)

Optimizes index performance.

As new data arrives, it is not added to existing indexes automatically. A search must then combine an indexed search of the old data with an expensive unindexed search of the new data, so search latency grows as the amount of unindexed data grows. This function adds the new data to the existing indexes, restoring performance. It does not retrain the index; it only assigns the new data to existing partitions. An update is therefore much quicker than retraining the entire index, but may be less accurate (especially if the new data exhibits new patterns, concepts, or trends).
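The idea of assigning new data to existing partitions without retraining can be sketched as follows. This is a generic IVF-style illustration with hypothetical names and an assumed L2 metric, not Lance's implementation: the centroids stay fixed and each new vector is appended to the partition of its nearest centroid.

```python
import math
from typing import Dict, List, Sequence

Vector = Sequence[float]


def nearest_centroid(vec: Vector, centroids: List[Vector]) -> int:
    """Index of the centroid with the smallest L2 distance to vec."""
    return min(range(len(centroids)), key=lambda i: math.dist(vec, centroids[i]))


def assign_to_partitions(
    partitions: Dict[int, List[int]],
    centroids: List[Vector],
    new_rows: Dict[int, Vector],
) -> None:
    """Append new row ids to existing partitions; centroids are left untouched."""
    for row_id, vec in new_rows.items():
        partitions.setdefault(nearest_centroid(vec, centroids), []).append(row_id)


centroids = [(0.0, 0.0), (10.0, 10.0)]  # trained earlier on the old data
partitions = {0: [0, 1], 1: [2]}        # row ids that are already indexed
assign_to_partitions(partitions, centroids, {3: (9.0, 11.0), 4: (1.0, -1.0)})
print(partitions)
```

If the new vectors cluster far from every existing centroid, they still land in some partition, which is one way to picture the accuracy loss mentioned above.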

Parameters:
  • num_indices_to_merge (int, default 1) – The number of indices to merge. If set to 0, a new delta index will be created.

  • index_names (List[str], default None) – The names of the indices to optimize. If None, all indices will be optimized.

class lance.dataset.DatasetStats

Bases: TypedDict

num_deleted_rows: int
num_fragments: int
num_small_files: int
class lance.dataset.LanceDataset(uri: str | Path, version: int | str | None = None, block_size: int | None = None, index_cache_size: int | None = None, metadata_cache_size: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, serialized_manifest: bytes | None = None, default_scan_options: Dict[str, Any] | None = None)

Bases: Dataset

A Lance Dataset in Lance format where the data is stored at the given uri.

add_columns(transforms: Dict[str, str] | BatchUDF | ReaderLike, read_columns: List[str] | None = None, reader_schema: pa.Schema | None = None, batch_size: int | None = None)

Add new columns with defined values.

There are several ways to specify the new columns. First, you can provide SQL expressions for each new column. Second, you can provide a UDF that takes a batch of existing data and returns a new batch with the new columns. These new columns will be appended to the dataset.

You can also provide a RecordBatchReader which will read the new column values from some external source. This is often useful when the new column values have already been staged to files (often by some distributed process).

See the lance.batch_udf() decorator for more information on writing UDFs.

Parameters:
  • transforms (dict or BatchUDF or ReaderLike) – If this is a dictionary, then the keys are the names of the new columns and the values are SQL expression strings. These strings can reference existing columns in the dataset. If this is a BatchUDF, then it is a UDF that takes a batch of existing data and returns a new batch with the new columns. If this is a ReaderLike, the new column values are read from it.

  • read_columns (list of str, optional) – The names of the columns that the UDF will read. If None, then the UDF will read all columns. This is only used when transforms is a UDF. Otherwise, the read columns are inferred from the SQL expressions.

  • reader_schema (pa.Schema, optional) – Only valid if transforms is a ReaderLike object. This will be used to determine the schema of the reader.

  • batch_size (int, optional) – The number of rows to read at a time from the source dataset when applying the transform. This is ignored if the dataset is a v1 dataset.

Examples

>>> import lance
>>> import pandas as pd
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3]})
>>> dataset = lance.write_dataset(table, "my_dataset")
>>> @lance.batch_udf()
... def double_a(batch):
...     df = batch.to_pandas()
...     return pd.DataFrame({'double_a': 2 * df['a']})
>>> dataset.add_columns(double_a)
>>> dataset.to_table().to_pandas()
   a  double_a
0  1         2
1  2         4
2  3         6
>>> dataset.add_columns({"triple_a": "a * 3"})
>>> dataset.to_table().to_pandas()
   a  double_a  triple_a
0  1         2         3
1  2         4         6
2  3         6         9

See also

LanceDataset.merge

Merge a pre-computed set of columns into the dataset.

alter_columns(*alterations: Iterable[Dict[str, Any]])

Alter column name, data type, and nullability.

Columns that are renamed can keep any indices that are on them. If a column is cast to a different type, its indices are dropped, with one exception: an IVF_PQ index can be kept. Other index types do not support casting at this time.

Column types can be upcast (such as int32 to int64) or downcast (such as int64 to int32). However, downcasting will fail if there are any values that cannot be represented in the new type. In general, columns can be cast within the same general type: integers to integers, floats to floats, and strings to strings. In addition, string, binary, and list columns can be cast between their size variants, for example string to large string, binary to large binary, and list to large list.
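Whether an integer downcast can succeed depends only on the range of the stored values. The helper below is a hypothetical stdlib-only check of that condition, useful as a mental model before calling alter_columns; it does not reproduce how Lance or PyArrow performs the cast.

```python
from typing import Iterable, Optional


def fits_signed_int(values: Iterable[Optional[int]], bits: int) -> bool:
    """True if every non-null value is representable as a signed `bits`-bit integer."""
    lo, hi = -(1 << (bits - 1)), (1 << (bits - 1)) - 1
    return all(v is None or lo <= v <= hi for v in values)


print(fits_signed_int([1, 2, 3], 32))    # an int64 -> int32 downcast would be safe
print(fits_signed_int([1, 2**31], 32))   # 2**31 does not fit in int32
```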

Parameters:

alterations (Iterable[Dict[str, Any]]) –

A sequence of dictionaries, each with the following keys:

  • “path”: str

    The column path to alter. For a top-level column, this is the name. For a nested column, this is the dot-separated path, e.g. “a.b.c”.

  • “name”: str, optional

    The new name of the column. If not specified, the column name is not changed.

  • “nullable”: bool, optional

    Whether the column should be nullable. If not specified, the column nullability is not changed. Only non-nullable columns can be changed to nullable. Currently, you cannot change a nullable column to non-nullable.

  • “data_type”: pyarrow.DataType, optional

    The new data type to cast the column to. If not specified, the column data type is not changed.

Examples

>>> import lance
>>> import pyarrow as pa
>>> schema = pa.schema([pa.field('a', pa.int64()),
...                     pa.field('b', pa.string(), nullable=False)])
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}, schema=schema)
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.alter_columns({"path": "a", "name": "x"},
...                       {"path": "b", "nullable": True})
>>> dataset.to_table().to_pandas()
   x  b
0  1  a
1  2  b
2  3  c
>>> dataset.alter_columns({"path": "x", "data_type": pa.int32()})
>>> dataset.schema
x: int32
b: string
checkout_version(version: int | str) LanceDataset

Load the given version of the dataset.

Unlike the dataset() constructor, this will re-use the current cache. This is a no-op if the dataset is already at the given version.

Parameters:

version (int | str) – The version to check out. A version number (int) or a tag (str) can be provided.

Return type:

LanceDataset

cleanup_old_versions(older_than: timedelta | None = None, *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True) CleanupStats

Cleans up old versions of the dataset.

Some dataset changes, such as overwriting, leave behind data that is not referenced by the latest dataset version. The old data is left in place to allow the dataset to be restored back to an older version.

This method will remove older versions and any data files they reference. Once this cleanup task has run, you will not be able to check out or restore these older versions.

Parameters:
  • older_than (timedelta, optional) – Only versions older than this will be removed. If not specified, this will default to two weeks.

  • delete_unverified (bool, default False) –

    Files leftover from a failed transaction may appear to be part of an in-progress operation (e.g. appending new data) and these files will not be deleted unless they are at least 7 days old. If delete_unverified is True then these files will be deleted regardless of their age.

    This should only be set to True if you can guarantee that no other process is currently working on this dataset. Otherwise the dataset could be put into a corrupted state.

  • error_if_tagged_old_versions (bool, default True) – Some versions may have tags associated with them. Tagged versions will not be cleaned up, regardless of how old they are. If this argument is set to True (the default), an exception will be raised if any tagged versions match the parameters. Otherwise, tagged versions will be ignored without any error and only untagged versions will be cleaned up.
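The interaction between older_than and error_if_tagged_old_versions can be modeled in a few lines. Everything below is a hypothetical sketch: VersionInfo and versions_to_remove are not Lance types or functions, the sketch does not model delete_unverified, and it assumes the latest version is always kept.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class VersionInfo:  # hypothetical record, not a Lance type
    number: int
    created: datetime
    tag: Optional[str] = None


def versions_to_remove(
    versions: List[VersionInfo],
    now: datetime,
    older_than: timedelta = timedelta(weeks=2),
    error_if_tagged_old_versions: bool = True,
) -> List[int]:
    """Return the version numbers a cleanup pass would drop in this model."""
    latest = max(v.number for v in versions)  # assumption: the latest version is kept
    old = [v for v in versions if v.number != latest and now - v.created > older_than]
    tagged = [v for v in old if v.tag is not None]
    if tagged and error_if_tagged_old_versions:
        raise ValueError(f"tagged versions match the cleanup window: {[v.tag for v in tagged]}")
    # Tagged versions are never cleaned up, regardless of age.
    return [v.number for v in old if v.tag is None]


now = datetime(2024, 6, 1)
history = [
    VersionInfo(1, now - timedelta(days=60)),
    VersionInfo(2, now - timedelta(days=30), tag="release"),
    VersionInfo(3, now - timedelta(days=1)),
]
print(versions_to_remove(history, now, error_if_tagged_old_versions=False))
```

With the default error_if_tagged_old_versions=True, the same call raises in this model because version 2 is both tagged and older than two weeks.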

static commit(base_uri: str | Path | LanceDataset, operation: LanceOperation.BaseOperation, read_version: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) LanceDataset

Create a new version of dataset

This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset().)

Its current purpose is to allow for changes being made in a distributed environment where no single process is doing all of the work. For example, a distributed bulk update or a distributed bulk modify operation.

Once all of the changes have been made, this method can be called to make the changes visible by updating the dataset manifest.

Warning

This is an advanced API and doesn’t provide the same level of validation as the other APIs. For example, it’s the responsibility of the caller to ensure that the fragments are valid for the schema.

Parameters:
  • base_uri (str, Path, or LanceDataset) – The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.

  • operation (BaseOperation) – The operation to apply to the dataset. This describes what changes have been made. See available operations under LanceOperation.

  • read_version (int, optional) – The version of the dataset that was used as the base for the changes. This is not needed for overwrite or restore operations.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the migrate_manifest_paths_v2() method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).

  • detached (bool, optional) – If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.

  • max_retries (int) – The maximum number of retries to perform when committing the dataset.

Returns:

A new version of Lance Dataset.

Return type:

LanceDataset

Examples

Creating a new dataset with the LanceOperation.Overwrite operation:

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
static commit_batch(dest: str | Path | LanceDataset, transactions: Sequence[Transaction], commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) BulkCommitResult

Create a new version of dataset with multiple transactions.

This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset().)

Parameters:
  • dest (str, Path, or LanceDataset) – The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.

  • transactions (Iterable[Transaction]) – The transactions to apply to the dataset. These will be merged into a single transaction and applied to the dataset. Note: Only append transactions are currently supported. Other transaction types will be supported in the future.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the migrate_manifest_paths_v2() method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).

  • detached (bool, optional) – If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.

  • max_retries (int) – The maximum number of retries to perform when committing the dataset.

Returns:

A dict with the following keys:

  • dataset (LanceDataset) – A new version of the Lance dataset.

  • merged (Transaction) – The merged transaction that was applied to the dataset.

Return type:

BulkCommitResult

count_rows(filter: str | Expression | None = None, **kwargs) int

Count rows matching the scanner filter.

Parameters:

**kwargs (dict, optional) – See the scanner() method for a full parameter description.

Returns:

count – The number of rows matching the filter, or the total number of rows in the dataset if no filter is given.

Return type:

int

create_index(column: str | List[str], index_type: str, name: str | None = None, metric: str = 'L2', replace: bool = False, num_partitions: int | None = None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, num_sub_vectors: int | None = None, accelerator: str | 'torch.Device' | None = None, index_cache_size: int | None = None, shuffle_partition_batches: int | None = None, shuffle_partition_concurrency: int | None = None, ivf_centroids_file: str | None = None, precomputed_partition_dataset: str | None = None, storage_options: Dict[str, str] | None = None, filter_nan: bool = True, one_pass_ivfpq: bool = False, **kwargs) LanceDataset

Create index on column.

Experimental API

Parameters:
  • column (str) – The column to be indexed.

  • index_type (str) – The type of the index. "IVF_PQ", "IVF_HNSW_PQ" and "IVF_HNSW_SQ" are currently supported.

  • name (str, optional) – The index name. If not provided, it will be generated from the column name.

  • metric (str) – The distance metric type: “L2” (alias of “euclidean”), “cosine” or “dot” (dot product). Default is “L2”.

  • replace (bool) – Replace the existing index if it exists.

  • num_partitions (int, optional) – The number of partitions of IVF (Inverted File Index).

  • ivf_centroids (optional) – It can be np.ndarray, pyarrow.FixedSizeListArray, or pyarrow.FixedShapeTensorArray. A num_partitions x dimension array of existing K-means centroids for IVF clustering. If not provided, a new KMeans model will be trained.

  • pq_codebook (optional) –

    It can be np.ndarray, pyarrow.FixedSizeListArray, or pyarrow.FixedShapeTensorArray. A num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors) array of K-means centroids for the PQ codebook.

    Note: nbits is always 8 for now. If not provided, a new PQ model will be trained.

  • num_sub_vectors (int, optional) – The number of sub-vectors for PQ (Product Quantization).

  • accelerator (str or torch.Device, optional) – If set, use an accelerator to speed up the training process. Accepted accelerator: “cuda” (Nvidia GPU) and “mps” (Apple Silicon GPU). If not set, use the CPU.

  • index_cache_size (int, optional) – The size of the index cache in number of entries. Default value is 256.

  • shuffle_partition_batches (int, optional) –

    The number of batches, using the row group size of the dataset, to include in each shuffle partition. Default value is 10240.

    Assuming the row group size is 1024, each shuffle partition will hold 10240 * 1024 = 10,485,760 rows. By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.

  • shuffle_partition_concurrency (int, optional) –

    The number of shuffle partitions to process concurrently. Default value is 2.

    By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • filter_nan (bool) – Defaults to True. False is UNSAFE, and will cause a crash if any null/nan values are present (and otherwise will not). Disables the null filter used for nullable columns. Obtains a small speed boost.

  • one_pass_ivfpq (bool) – Defaults to False. If enabled, index type must be “IVF_PQ”. Reduces disk IO.

  • kwargs – Parameters passed to the index building process.
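Two of the sizes quoted in the parameter list above are plain arithmetic, and the sketch below simply evaluates them. The function names are hypothetical; the formulas are the ones stated for pq_codebook and shuffle_partition_batches, and the 128-dimensional vectors with 16 sub-vectors are illustrative inputs.

```python
from typing import Tuple


def pq_codebook_shape(dimension: int, num_sub_vectors: int, nbits: int = 8) -> Tuple[int, int]:
    """Documented shape: num_sub_vectors x (2 ^ nbits * dimension // num_sub_vectors)."""
    return (num_sub_vectors, (2 ** nbits) * dimension // num_sub_vectors)


def rows_per_shuffle_partition(shuffle_partition_batches: int = 10240,
                               row_group_size: int = 1024) -> int:
    """Rows held by one shuffle partition: batches times the row group size."""
    return shuffle_partition_batches * row_group_size


print(pq_codebook_shape(dimension=128, num_sub_vectors=16))
print(rows_per_shuffle_partition())
```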

SQ (Scalar Quantization) is available only for the IVF_HNSW_SQ index type. This quantization method reduces the memory usage of the index by mapping float vectors to integer vectors in which each integer has num_bits bits. Currently only 8 bits are supported.
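As a rough illustration of scalar quantization, the sketch below maps floats to 8-bit codes with min-max linear scaling. The scaling scheme, the per-vector bounds, and the function names are all assumptions made for the example; the exact scheme Lance uses is not described in this section.

```python
from typing import List, Tuple


def sq_encode(vec: List[float], num_bits: int = 8) -> Tuple[List[int], float, float]:
    """Quantize floats to integers in [0, 2**num_bits - 1] via min-max scaling."""
    lo, hi = min(vec), max(vec)
    levels = (1 << num_bits) - 1
    scale = (hi - lo) / levels if hi > lo else 1.0
    codes = [round((v - lo) / scale) for v in vec]
    return codes, lo, scale


def sq_decode(codes: List[int], lo: float, scale: float) -> List[float]:
    """Approximate reconstruction; the error per value is at most scale / 2."""
    return [lo + c * scale for c in codes]


codes, lo, scale = sq_encode([0.0, 0.2, 1.0])
print(codes)
print(sq_decode(codes, lo, scale))
```

Each value shrinks from a 4-byte float32 to a 1-byte code, which is where the memory saving comes from.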

If index_type is “IVF_*”, then the following parameters are required:

num_partitions

If index_type contains “PQ”, then the following parameters are required:

num_sub_vectors

Optional parameters for IVF_PQ:

  • ivf_centroids

    Existing K-mean centroids for IVF clustering.

  • num_bits

    The number of bits for PQ (Product Quantization). Default is 8. Only 4, 8 are supported.

Optional parameters for IVF_HNSW_*:

  • max_level

    Int, the maximum number of levels in the graph.

  • m

    Int, the number of edges per node in the graph.

  • ef_construction

    Int, the number of nodes to examine during the construction.

Examples

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_HNSW_SQ",
    num_partitions=256,
)

Experimental Accelerator (GPU) support:

  • accelerator: use GPU to train IVF partitions.

    Only supports CUDA (Nvidia) or MPS (Apple) currently. Requires PyTorch to be installed.

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
    accelerator="cuda"
)

create_scalar_index(column: str, index_type: Literal['BTREE'] | Literal['BITMAP'] | Literal['LABEL_LIST'] | Literal['INVERTED'] | Literal['FTS'], name: str | None = None, *, replace: bool = True, **kwargs)

Create a scalar index on a column.

Scalar indices, like vector indices, can be used to speed up scans. A scalar index can speed up scans that contain filter expressions on the indexed column. For example, the following scan will be faster if the column my_col has a scalar index:

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(filter="my_col != 7").to_table()

Vector search with prefilters can also benefit from scalar indices. For example,

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(
    nearest=dict(
       column="vector",
       q=[1, 2, 3, 4],
       k=10,
    ),
    filter="my_col != 7",
    prefilter=True
).to_table()

There are 4 types of scalar indices available today.

  • BTREE. The most common type is BTREE. This index is inspired by the btree data structure although only the first few layers of the btree are cached in memory. It will perform well on columns with a large number of unique values and few rows per value.

  • BITMAP. This index stores a bitmap for each unique value in the column. This index is useful for columns with a small number of unique values and many rows per value.

  • LABEL_LIST. A special index that is used to index list columns whose values have small cardinality. For example, a column that contains lists of tags (e.g. ["tag1", "tag2", "tag3"]) can be indexed with a LABEL_LIST index. This index can only speed up queries with array_has_any or array_has_all filters.

  • FTS/INVERTED. Used to index document columns. This index supports full-text search; for example, it can find the rows whose document contains any word of the query string “hello world”. The results are ranked by BM25.
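To picture why a BITMAP index suits columns with few distinct values, the sketch below stores one bitmap per unique value, using a Python int as the bitset. It is a hypothetical in-memory model and does not reflect how Lance stores or reads the index.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List


def build_bitmap_index(column: Iterable[Hashable]) -> Dict[Hashable, int]:
    """Map each distinct value to an int whose set bits are the matching row ids."""
    index: Dict[Hashable, int] = defaultdict(int)
    for row_id, value in enumerate(column):
        index[value] |= 1 << row_id
    return dict(index)


def rows_in(index: Dict[Hashable, int], values: Iterable[Hashable]) -> List[int]:
    """Row ids matching `col IN (values)`: OR the per-value bitmaps together."""
    bitmap = 0
    for value in values:
        bitmap |= index.get(value, 0)
    return [i for i in range(bitmap.bit_length()) if (bitmap >> i) & 1]


index = build_bitmap_index(["cat", "dog", "cat", "bird", "dog"])
print(rows_in(index, ["cat"]))
print(rows_in(index, ["cat", "bird"]))
```

With many distinct values the number of bitmaps grows accordingly, which is the situation where the BTREE description above is the better fit.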

Note that the LANCE_BYPASS_SPILLING environment variable can be used to bypass spilling to disk. Setting this to true can avoid memory exhaustion issues (see https://github.com/apache/datafusion/issues/10073 for more info).

Experimental API

Parameters:
  • column (str) – The column to be indexed. Must be a boolean, integer, float, or string column.

  • index_type (str) – The type of the index. One of "BTREE", "BITMAP", "LABEL_LIST", "FTS" or "INVERTED".

  • name (str, optional) – The index name. If not provided, it will be generated from the column name.

  • replace (bool, default True) – Replace the existing index if it exists.

Optional parameters:

  • with_position (bool, default True) – This is for the INVERTED index. If True, the index will store the positions of the words in the document, so that you can conduct phrase query. This will significantly increase the index size. It won’t impact the performance of non-phrase queries even if it is set to True.

  • base_tokenizer (str, default "simple") – This is for the INVERTED index. The base tokenizer to use. The value can be:

      • “simple”: splits tokens on whitespace and punctuation.

      • “whitespace”: splits tokens on whitespace.

      • “raw”: no tokenization.

  • language (str, default "English") – This is for the INVERTED index. The language for stemming and stop words. This is only used when stem or remove_stop_words is true.

  • max_token_length (Optional[int], default 40) – This is for the INVERTED index. The maximum token length. Any token longer than this will be removed.

  • lower_case (bool, default True) – This is for the INVERTED index. If True, the index will convert all text to lowercase.

  • stem (bool, default False) – This is for the INVERTED index. If True, the index will stem the tokens.

  • remove_stop_words (bool, default False) – This is for the INVERTED index. If True, the index will remove stop words.

  • ascii_folding (bool, default False) – This is for the INVERTED index. If True, the index will convert non-ascii characters to ascii characters if possible. This would remove accents like “é” -> “e”.

Examples

import lance

dataset = lance.dataset("/tmp/images.lance")
dataset.create_index(
    "category",
    "BTREE",
)
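The tokenizer parameters listed above can be pictured with a small pure-Python model. This is only a sketch and not Lance's implementation: the order in which the options are applied, the regular expression used for "simple", and the STOP_WORDS set are assumptions made for illustration, and stemming is left out.

```python
import re
import unicodedata

# Hypothetical stop-word list; the real list depends on `language`.
STOP_WORDS = {"the", "a", "an", "and", "of"}


def tokenize(text, base_tokenizer="simple", max_token_length=40,
             lower_case=True, remove_stop_words=False, ascii_folding=False):
    """Toy model of the INVERTED index tokenizer options (stemming omitted)."""
    if base_tokenizer == "simple":
        tokens = re.findall(r"\w+", text)   # split on whitespace and punctuation
    elif base_tokenizer == "whitespace":
        tokens = text.split()               # split on whitespace only
    elif base_tokenizer == "raw":
        tokens = [text]                     # no tokenization
    else:
        raise ValueError(f"unknown base_tokenizer: {base_tokenizer}")

    if max_token_length is not None:
        # Tokens longer than the limit are removed, not truncated.
        tokens = [t for t in tokens if len(t) <= max_token_length]
    if lower_case:
        tokens = [t.lower() for t in tokens]
    if remove_stop_words:
        tokens = [t for t in tokens if t.lower() not in STOP_WORDS]
    if ascii_folding:
        # Decompose accented characters and drop the non-ASCII marks.
        tokens = [
            unicodedata.normalize("NFKD", t).encode("ascii", "ignore").decode("ascii")
            for t in tokens
        ]
    return tokens


print(tokenize("The café, re-opened!"))
print(tokenize("The café, re-opened!", base_tokenizer="whitespace",
               remove_stop_words=True, ascii_folding=True))
```

The second call shows why "whitespace" keeps punctuation attached to the tokens, while "simple" splits "re-opened" into two tokens.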

Scalar indices can only speed up scans for basic filters using equality, comparison, range (e.g. my_col BETWEEN 0 AND 100), and set membership (e.g. my_col IN (0, 1, 2)).

Scalar indices can be used if the filter contains multiple indexed columns and the filter criteria are AND’d or OR’d together (e.g. my_col < 0 AND other_col > 100).

Scalar indices may be used if the filter contains non-indexed columns but, depending on the structure of the filter, they may not be usable. For example, if the column not_indexed does not have a scalar index then the filter my_col = 0 OR not_indexed = 1 will not be able to use any scalar index on my_col.

To determine if a scan is making use of a scalar index you can use explain_plan to look at the query plan that Lance has created. Queries that use scalar indices will either have a ScalarIndexQuery relation or a MaterializeIndex operator.
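The rules above can be summarised with a simplified model of a filter tree. The sketch below is an illustration only, not how Lance plans queries: the Leaf, And and Or classes are hypothetical, and the rule that an AND needs only one indexed branch is an assumption consistent with, but not stated by, the text above.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Leaf:
    column: str          # column referenced by a simple comparison


@dataclass(frozen=True)
class And:
    children: Tuple["Node", ...]


@dataclass(frozen=True)
class Or:
    children: Tuple["Node", ...]


Node = Union[Leaf, And, Or]


def can_use_scalar_index(node: Node, indexed: set) -> bool:
    """Simplified rule: AND needs one indexed branch, OR needs all of them."""
    if isinstance(node, Leaf):
        return node.column in indexed
    if isinstance(node, And):
        # An index can narrow the candidates; the rest is rechecked afterwards.
        return any(can_use_scalar_index(c, indexed) for c in node.children)
    # OR: a branch without an index could match any row, forcing a full scan.
    return all(can_use_scalar_index(c, indexed) for c in node.children)


indexed = {"my_col", "other_col"}
both_indexed = And((Leaf("my_col"), Leaf("other_col")))
mixed_or = Or((Leaf("my_col"), Leaf("not_indexed")))
print(can_use_scalar_index(both_indexed, indexed))   # True
print(can_use_scalar_index(mixed_or, indexed))       # False
```

In practice, explain_plan remains the reliable way to check what a given scan actually does.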

property data_storage_version: str

The version of the data storage format this dataset is using

delete(predicate: str | Expression)

Delete rows from the dataset.

This marks rows as deleted but does not physically remove them from the files, so existing indexes remain valid.

Parameters:

predicate (str or pa.compute.Expression) – The predicate to use to select rows to delete. May either be a SQL string or a pyarrow Expression.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.delete("a = 1 or b in ('a', 'b')")
>>> dataset.to_table()
pyarrow.Table
a: int64
b: string
----
a: [[3]]
b: [["c"]]
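Why marking rows as deleted keeps indexes valid can be shown with a toy model. The Fragment class below is hypothetical and much simpler than Lance's storage: it keeps the rows untouched and records deleted offsets in a set, which stands in for a deletion vector.

```python
class Fragment:
    """Toy fragment: immutable rows plus a set of deleted row offsets."""

    def __init__(self, rows):
        self.rows = list(rows)      # data file contents, never rewritten here
        self.deleted = set()        # stands in for a deletion vector

    def delete(self, predicate):
        """Mark matching rows as deleted; return how many were newly marked."""
        hits = {i for i, row in enumerate(self.rows)
                if i not in self.deleted and predicate(row)}
        self.deleted |= hits
        return len(hits)

    def scan(self):
        """Yield (offset, row) for rows that are still visible."""
        for i, row in enumerate(self.rows):
            if i not in self.deleted:
                yield i, row


frag = Fragment([{"a": 1, "b": "a"}, {"a": 2, "b": "b"}, {"a": 3, "b": "c"}])
# An index built before the delete: value of "b" -> row offset.
index_on_b = {row["b"]: i for i, row in enumerate(frag.rows)}

frag.delete(lambda r: r["a"] == 1 or r["b"] in ("a", "b"))
print([row for _, row in frag.scan()])   # only the row with a == 3 remains
print(frag.rows[index_on_b["c"]])        # offsets in the old index still resolve
```

Because no row changes position, offsets stored in an index built before the delete still point at the right rows.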
static drop(base_uri: str | Path, storage_options: Dict[str, str] | None = None) None
drop_columns(columns: List[str])

Drop one or more columns from the dataset

This is a metadata-only operation and does not remove the data from the underlying storage. In order to remove the data, you must subsequently call compact_files to rewrite the data without the removed columns and then call cleanup_old_versions to remove the old files.

Parameters:

columns (list of str) – The names of the columns to drop. These can be nested column references (e.g. “a.b.c”) or top-level column names (e.g. “a”).

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.drop_columns(["a"])
>>> dataset.to_table().to_pandas()
   b
0  a
1  b
2  c
get_fragment(fragment_id: int) LanceFragment | None

Get the fragment with fragment id.

get_fragments(filter: Expression | None = None) List[LanceFragment]

Get all fragments from the dataset.

Note: filter is not supported yet.

property has_index
head(num_rows, **kwargs)

Load the first N rows of the dataset.

Parameters:
  • num_rows (int) – The number of rows to load.

  • **kwargs (dict, optional) – See scanner() method for full parameter description.

Returns:

table

Return type:

Table

index_statistics(index_name: str) Dict[str, Any]
insert(data: ReaderLike, *, mode='append', **kwargs)

Insert data into the dataset.

Parameters:
  • data (Reader-like) – The data to be written. Acceptable types are:

    • Pandas DataFrame, Pyarrow Table, Dataset, Scanner, or RecordBatchReader

    • Huggingface dataset

  • mode (str, default 'append') –

    The mode to use when writing the data. Options are:

    • create - create a new dataset (raises if uri already exists).

    • overwrite - create a new snapshot version.

    • append - create a new version that is the concatenation of the latest version and the input (raises if uri does not exist).

  • **kwargs (dict, optional) – Additional keyword arguments to pass to write_dataset().
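The three write modes can be modelled with a toy version history. The ToyStore class is hypothetical and keeps everything in memory; it only mirrors the behaviour described above, and the exception types are chosen for illustration.

```python
class ToyStore:
    """Maps a URI to a list of versions; each version is a list of rows."""

    def __init__(self):
        self.versions = {}

    def write(self, uri, rows, mode="append"):
        exists = uri in self.versions
        if mode == "create":
            if exists:
                raise FileExistsError(uri)
            self.versions[uri] = [list(rows)]
        elif mode == "overwrite":
            # New snapshot; earlier versions stay in the history.
            self.versions.setdefault(uri, []).append(list(rows))
        elif mode == "append":
            if not exists:
                raise FileNotFoundError(uri)
            latest = self.versions[uri][-1]
            self.versions[uri].append(latest + list(rows))
        else:
            raise ValueError(f"unknown mode: {mode}")
        return len(self.versions[uri])  # version number of the new snapshot


store = ToyStore()
v1 = store.write("ds", [1, 2], mode="create")
v2 = store.write("ds", [3])                      # append is the default
v3 = store.write("ds", [9], mode="overwrite")
print(v1, v2, v3)                                # 1 2 3
print(store.versions["ds"])                      # [[1, 2], [1, 2, 3], [9]]
```

Every mode produces a new version rather than changing an existing one, which is what makes older versions available for checkout later.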

join(right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)

Not implemented (this only overrides the pyarrow dataset method to prevent a segfault).

property lance_schema: LanceSchema

The LanceSchema for this dataset

property latest_version: int

Returns the latest version of the dataset.

list_indices() List[Dict[str, Any]]
merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None)

Merge another dataset into this one.

Performs a left join, where the dataset is the left side and data_obj is the right side. Rows that exist in the dataset but have no match in data_obj will be filled with null values, unless Lance doesn’t support null values for some types, in which case an error will be raised.

Parameters:
  • data_obj (Reader-like) – The data to be merged. Acceptable types are: Pandas DataFrame, Pyarrow Table, Dataset, Scanner, Iterator[RecordBatch], or RecordBatchReader.

  • left_on (str) – The name of the column in the dataset to join on.

  • right_on (str or None) – The name of the column in data_obj to join on. If None, defaults to left_on.

Examples

>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> dataset.merge(new_df, 'x')
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f
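The left-join behaviour, including the null fill for unmatched rows, can be sketched in plain Python. The left_join helper below is hypothetical and works on lists of dicts; it assumes the join key is unique on the right side.

```python
def left_join(left_rows, right_rows, left_on, right_on=None):
    """Left join of two lists of dicts; unmatched left rows get None values."""
    right_on = right_on or left_on
    lookup = {row[right_on]: row for row in right_rows}
    # Columns contributed by the right side, excluding the join key.
    new_columns = [c for c in (right_rows[0] if right_rows else {}) if c != right_on]
    merged = []
    for row in left_rows:
        match = lookup.get(row[left_on])
        extra = {c: (match[c] if match is not None else None) for c in new_columns}
        merged.append({**row, **extra})
    return merged


left = [{"x": 1, "y": "a"}, {"x": 2, "y": "b"}, {"x": 3, "y": "c"}]
right = [{"x": 1, "z": "d"}, {"x": 3, "z": "f"}]   # no row for x == 2
joined = left_join(left, right, "x")
for row in joined:
    print(row)
```

The row with x == 2 has no partner on the right side, so its z value is None.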

See also

LanceDataset.add_columns

Add new columns by computing batch-by-batch.

merge_insert(on: str | Iterable[str])

Returns a builder that can be used to create a “merge insert” operation

This operation can add rows, update rows, and remove rows in a single transaction. It is a very generic tool that can be used to create behaviors like “insert if not exists”, “update or insert (i.e. upsert)”, or even replace a portion of existing data with new data (e.g. replace all data where month="january").

The merge insert operation works by combining new data from a source table with existing data in a target table by using a join. There are three categories of records.

“Matched” records are records that exist in both the source table and the target table. “Not matched” records exist only in the source table (e.g. these are new data). “Not matched by source” records exist only in the target table (this is old data).

The builder returned by this method can be used to customize what should happen for each category of data.

Please note that the data will be reordered as part of this operation. This is because updated rows will be deleted from the dataset and then reinserted at the end with the new values. The order of the newly inserted rows may fluctuate randomly because a hash-join operation is used internally.
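The three record categories and the reordering described above can be modelled in plain Python. This is a sketch, not Lance's implementation: the merge_insert function and its keyword arguments are hypothetical, tables are lists of dicts, and the join key is assumed to be unique on both sides.

```python
def merge_insert(target, source, on, update_matched=True,
                 insert_not_matched=True, delete_not_matched_by_source=False):
    """Toy merge insert; returns the new rows and the merge statistics."""
    target_by_key = {row[on]: row for row in target}
    source_by_key = {row[on]: row for row in source}
    stats = {"num_inserted_rows": 0, "num_updated_rows": 0, "num_deleted_rows": 0}

    kept, rewritten = [], []
    for key, row in target_by_key.items():
        if key in source_by_key:                       # "matched"
            if update_matched:
                rewritten.append({**row, **source_by_key[key]})
                stats["num_updated_rows"] += 1
            else:
                kept.append(row)
        elif delete_not_matched_by_source:             # "not matched by source"
            stats["num_deleted_rows"] += 1
        else:
            kept.append(row)

    for key, row in source_by_key.items():
        if key not in target_by_key and insert_not_matched:   # "not matched"
            rewritten.append(row)
            stats["num_inserted_rows"] += 1

    # Updated rows are removed and re-added at the end, so the order changes.
    return kept + rewritten, stats


target = [{"a": 2, "b": "a"}, {"a": 1, "b": "b"}, {"a": 3, "b": "c"}]
source = [{"a": 2, "b": "x"}, {"a": 3, "b": "y"}, {"a": 4, "b": "z"}]
rows, stats = merge_insert(target, source, on="a")
print(stats)
print(rows)
```

The untouched row (a == 1) stays in place while updated and inserted rows end up after it, which is why results are usually sorted before they are compared.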

Parameters:

on (Union[str, Iterable[str]]) – A column (or columns) to join on. This is how records from the source table and target table are matched. Typically this is some kind of key or id column.

Examples

Use when_matched_update_all() and when_not_matched_insert_all() to perform an “upsert” operation. This will update rows that already exist in the dataset and insert rows that do not exist.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation
>>> dataset.merge_insert("a")     \
...             .when_matched_update_all()     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  b
1  2  x
2  3  y
3  4  z

Use when_not_matched_insert_all() to perform an “insert if not exists” operation. This will only insert rows that do not already exist in the dataset.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example2")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "insert if not exists" operation
>>> dataset.merge_insert("a")     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  z

You are not required to provide all the columns. If you only want to update a subset of columns, you can omit columns you don’t want to update. Omitted columns will keep their existing values if they are updated, or will be null if they are inserted.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"], \
...                   "c": ["x", "y", "z"]})
>>> dataset = lance.write_dataset(table, "example3")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation, only updating column "b"
>>> dataset.merge_insert("a")     \
...             .when_matched_update_all()     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b     c
0  1  a     x
1  2  x     y
2  3  y     z
3  4  z  None
migrate_manifest_paths_v2()

Migrate the manifest paths to the new format.

This will update the manifest to use the new v2 format for paths.

This function is idempotent, and can be run multiple times without changing the state of the object store.

DANGER: this should not be run while other concurrent operations are happening, and it must run to completion before other operations resume.

property optimize: DatasetOptimizer
property partition_expression

Not implemented (this only overrides the pyarrow dataset property to prevent a segfault).

replace_field_metadata(field_name: str, new_metadata: Dict[str, str])

Replace the metadata of a field in the schema

Parameters:
  • field_name (str) – The name of the field to replace the metadata for

  • new_metadata (dict) – The new metadata to set

replace_schema(schema: Schema)

Not implemented (this only overrides the pyarrow dataset method to prevent a segfault).

See replace_schema_metadata() or replace_field_metadata().

replace_schema_metadata(new_metadata: Dict[str, str])

Replace the schema metadata of the dataset

Parameters:

new_metadata (dict) – The new metadata to set

restore()

Restore the currently checked out version as the latest version of the dataset.

This creates a new commit.
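That restoring adds a commit instead of rewinding history can be shown with a toy version list. The restore function below is hypothetical and only models the version numbering.

```python
versions = [["a"], ["a", "b"], ["a", "b", "c"]]   # versions 1, 2, 3


def restore(history, checked_out_version):
    """Append a copy of the checked-out version as the new latest version."""
    history.append(list(history[checked_out_version - 1]))
    return len(history)   # number of the new latest version


latest = restore(versions, checked_out_version=1)
print(latest, versions[-1])   # 4 ['a']
```

Versions 2 and 3 remain in the history after the restore, so they can still be checked out.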

sample(num_rows: int, columns: List[str] | Dict[str, str] | None = None, randomize_order: bool = True, **kwargs) Table

Select a random sample of data

Parameters:
  • num_rows (int) – number of rows to retrieve

  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • **kwargs (dict, optional) – see scanner() method for full parameter description.

Returns:

table

Return type:

Table

scanner(columns: List[str] | Dict[str, str] | None = None, filter: str | Expression | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = None, fragments: Iterable[LanceFragment] | None = None, full_text_query: str | dict | None = None, *, prefilter: bool = None, with_row_id: bool = None, with_row_address: bool = None, use_stats: bool = None, fast_search: bool = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None) LanceScanner

Return a Scanner that can support various pushdowns.

Parameters:
  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • filter (pa.compute.Expression or str) – Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.

  • limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.

  • offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.

  • nearest (dict, default None) –

    Get the rows corresponding to the K most similar vectors. Example:

    {
        "column": <embedding col name>,
        "q": <query vector as pa.Float32Array>,
        "k": 10,
        "nprobes": 1,
        "refine_factor": 1
    }
    

  • batch_size (int, default None) – The target size of batches returned. Batches can be smaller than this size and, in some cases, up to twice this size, but never larger than that.

  • io_buffer_size (int, default None) – The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.

  • batch_readahead (int, optional) – The number of batches to read ahead.

  • fragment_readahead (int, optional) – The number of fragments to read ahead.

  • scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.

  • fragments (iterable of LanceFragment, default None) – If specified, only scan these fragments. If scan_in_order is True, then the fragments will be scanned in the order given.

  • prefilter (bool, default False) –

    If True then the filter will be applied before the vector query is run. This will generate more correct results but it may be a more costly query. It’s generally good when the filter is highly selective.

    If False then the filter will be applied after the vector query is run. This will perform well but the results may have fewer than the requested number of rows (or be empty) if the rows closest to the query do not match the filter. It’s generally good when the filter is not very selective.

  • use_scalar_index (bool, default True) – Lance will automatically use scalar indices to optimize a query. In some corner cases this can make query performance worse and this parameter can be used to disable scalar indices in these cases.

  • late_materialization (bool or List[str], default None) –

    Allows custom control over late materialization. Late materialization fetches non-query columns using a take operation after the filter. This is useful when there are few results or columns are very large.

    Early materialization can be better when there are many results or the columns are very narrow.

    If True, then all columns are late materialized. If False, then all columns are early materialized. If a list of strings, then only the columns in the list are late materialized.

    The default uses a heuristic that assumes filters will select about 0.1% of the rows. If your filter is more selective (e.g. find by id) you may want to set this to True. If your filter is not very selective (e.g. matches 20% of the rows) you may want to set this to False.

  • full_text_query (str or dict, optional) –

    The query string to search for; the results will be ranked by BM25. For example, “hello world” would match documents containing “hello” or “world”. Alternatively, a dictionary with the following keys:

    • columns: list[str]

      The columns to search, currently only supports a single column in the columns list.

    • query: str

      The query string to search for.

  • fast_search (bool, default False) – If True, then the search will only be performed on the indexed data, which yields faster search time.
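The "any term matches, ranked by BM25" behaviour of full_text_query can be sketched with a textbook BM25 scorer. This is an illustration, not Lance's scoring code: the bm25_search helper, the whitespace tokenization, and the k1 and b constants are assumptions.

```python
import math
from collections import Counter


def bm25_search(docs, query, k1=1.2, b=0.75):
    """Rank documents containing any query term by a textbook BM25 score."""
    tokenized = [doc.lower().split() for doc in docs]
    avg_len = sum(len(t) for t in tokenized) / len(tokenized)
    terms = query.lower().split()
    # Document frequency: number of documents containing each query term.
    df = {term: sum(1 for t in tokenized if term in t) for term in terms}

    results = []
    for doc_id, tokens in enumerate(tokenized):
        counts = Counter(tokens)
        score = 0.0
        for term in terms:
            tf = counts[term]
            if tf == 0:
                continue
            idf = math.log(1 + (len(docs) - df[term] + 0.5) / (df[term] + 0.5))
            norm = tf + k1 * (1 - b + b * len(tokens) / avg_len)
            score += idf * tf * (k1 + 1) / norm
        if score > 0:
            results.append((doc_id, score))
    # Highest score first.
    return sorted(results, key=lambda item: item[1], reverse=True)


docs = ["hello world", "hello there general", "goodbye moon"]
results = bm25_search(docs, "hello world")
print([doc_id for doc_id, _ in results])   # [0, 1]
```

The first document matches both terms and ranks first, the second matches only "hello", and the third matches neither term and is not returned.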

Notes

For now, if BOTH filter and nearest are specified, then:

  1. nearest is executed first.

  2. The results are filtered afterwards, unless prefilter is set to True.
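The difference between filtering after and before the vector search can be seen with an exact nearest-neighbour search over a few rows. The knn and search helpers below are hypothetical and use brute force rather than an index.

```python
def knn(rows, query, k):
    """Exact nearest neighbours by squared Euclidean distance."""
    def dist(row):
        return sum((x - y) ** 2 for x, y in zip(row["vector"], query))
    return sorted(rows, key=dist)[:k]


def search(rows, query, k, predicate, prefilter=False):
    if prefilter:
        # Filter first, then search among the survivors.
        return knn([r for r in rows if predicate(r)], query, k)
    # Search first, then drop neighbours that fail the filter.
    return [r for r in knn(rows, query, k) if predicate(r)]


rows = [
    {"id": 0, "vector": [0.0, 0.0], "label": "cat"},
    {"id": 1, "vector": [0.1, 0.0], "label": "cat"},
    {"id": 2, "vector": [0.2, 0.0], "label": "dog"},
    {"id": 3, "vector": [5.0, 5.0], "label": "dog"},
]


def is_dog(row):
    return row["label"] == "dog"


post = search(rows, [0.0, 0.0], k=2, predicate=is_dog)
pre = search(rows, [0.0, 0.0], k=2, predicate=is_dog, prefilter=True)
print([r["id"] for r in post])   # [] because both nearest rows are cats
print([r["id"] for r in pre])    # [2, 3]
```

Filtering afterwards can return fewer than k rows, or none, when the closest vectors fail the filter; filtering first always searches among rows that already satisfy it.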

For debugging ANN results, you can choose to not use the index even if present by specifying use_index=False. For example, the following will always return exact KNN results:

dataset.to_table(nearest={
    "column": "vector",
    "k": 10,
    "q": <query vector>,
    "use_index": False
})
property schema: Schema

The pyarrow Schema for this dataset

session() _Session

Return the dataset session, which holds the dataset’s state.

property stats: LanceStats

Experimental API

property tags: Tags
take(indices: List[int] | Array, columns: List[str] | Dict[str, str] | None = None, **kwargs) Table

Select rows of data by index.

Parameters:
  • indices (Array or array-like) – indices of rows to select in the dataset.

  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • **kwargs (dict, optional) – See the scanner() method for full parameter description.

Returns:

table

Return type:

pyarrow.Table

take_blobs(row_ids: List[int] | Array, blob_column: str) List[BlobFile]

Select blobs by row IDs.

Instead of loading large binary blob data into memory before processing it, this API allows you to open binary blob data as a regular Python file-like object. For more details, see lance.BlobFile.

Parameters:
  • row_ids (List[int], Array, or array-like) – row IDs to select in the dataset.

  • blob_column (str) – The name of the blob column to select.

Returns:

blob_files

Return type:

List[BlobFile]

to_batches(columns: List[str] | Dict[str, str] | None = None, filter: str | Expression | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, with_row_address: bool = False, use_stats: bool = True, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None, **kwargs) Iterator[RecordBatch]

Read the dataset as materialized record batches.

Parameters:

**kwargs (dict, optional) – Arguments for Scanner.from_dataset.

Returns:

record_batches

Return type:

Iterator of RecordBatch

to_table(columns: List[str] | Dict[str, str] | None = None, filter: str | Expression | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, with_row_address: bool = False, use_stats: bool = True, fast_search: bool = False, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None) Table

Read the data into memory as a pyarrow.Table

Parameters:
  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • filter (pa.compute.Expression or str) –

    Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.

  • limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.

  • offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.

  • nearest (dict, default None) –

    Get the rows corresponding to the K most similar vectors. Example:

    {
        "column": <embedding col name>,
        "q": <query vector as pa.Float32Array>,
        "k": 10,
        "metric": "cosine",
        "nprobes": 1,
        "refine_factor": 1
    }
    

  • batch_size (int, optional) – The number of rows to read at a time.

  • io_buffer_size (int, default None) – The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.

  • batch_readahead (int, optional) – The number of batches to read ahead.

  • fragment_readahead (int, optional) – The number of fragments to read ahead.

  • scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.

  • prefilter (bool, default False) – Run filter before the vector search.

  • late_materialization (bool or List[str], default None) – Allows custom control over late materialization. See ScannerBuilder.late_materialization for more information.

  • use_scalar_index (bool, default True) – Allows custom control over scalar index usage. See ScannerBuilder.use_scalar_index for more information.

  • with_row_id (bool, default False) – Return row ID.

  • with_row_address (bool, default False) – Return row address

  • use_stats (bool, default True) – Use stats pushdown during filters.

  • full_text_query (str or dict, optional) –

    The query string to search for; the results will be ranked by BM25. For example, “hello world” would match documents containing “hello” or “world”. Alternatively, a dictionary with the following keys:

    • columns: list[str]

      The columns to search, currently only supports a single column in the columns list.

    • query: str

      The query string to search for.

Notes

If BOTH filter and nearest are specified, then:

  1. nearest is executed first.

  2. The results are filtered afterward, unless prefilter is set to True.

update(updates: Dict[str, str], where: str | None = None) Dict[str, Any]

Update column values for rows matching where.

Parameters:
  • updates (dict of str to str) – A mapping of column names to a SQL expression.

  • where (str, optional) – A SQL predicate indicating which rows should be updated.

Returns:

updates – A dictionary containing the number of rows updated.

Return type:

dict

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> update_stats = dataset.update(dict(a = 'a + 2'), where="b != 'a'")
>>> update_stats["num_updated_rows"]
2
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  4  b
2  5  c
property uri: str

The location of the data

validate()

Validate the dataset.

This checks the integrity of the dataset and will raise an exception if the dataset is corrupted.

property version: int

Returns the currently checked out version of the dataset

versions()

Return all versions in this dataset.

class lance.dataset.LanceOperation

Bases: object

class Append(fragments: Iterable[FragmentMetadata])

Bases: BaseOperation

Append new rows to the dataset.

fragments

The fragments that contain the new rows.

Type:

list[FragmentMetadata]

Warning

This is an advanced API for distributed operations. To append to a dataset on a single machine, use lance.write_dataset().

Examples

To append new rows to a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(tab1, "example")
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment = lance.fragment.LanceFragment.create("example", tab2)
>>> operation = lance.LanceOperation.Append([fragment])
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments: Iterable[FragmentMetadata]
class BaseOperation

Bases: ABC

Base class for operations that can be applied to a dataset.

See available operations under LanceOperation.

class CreateIndex(uuid: str, name: str, fields: List[int], dataset_version: int, fragment_ids: Set[int])

Bases: BaseOperation

Operation that creates an index on the dataset.

dataset_version: int
fields: List[int]
fragment_ids: Set[int]
name: str
uuid: str
class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)

Bases: BaseOperation

Remove fragments or rows from the dataset.

updated_fragments

The fragments that have been updated with new deletion vectors.

Type:

list[FragmentMetadata]

deleted_fragment_ids

The ids of the fragments that have been deleted entirely. These are the fragments where LanceFragment.delete() returned None.

Type:

list[int]

predicate

The original SQL predicate used to select the rows to delete.

Type:

str

Warning

This is an advanced API for distributed operations. To delete rows from a dataset on a single machine, use lance.LanceDataset.delete().

Examples

To delete rows from a dataset, call lance.fragment.LanceFragment.delete() on each of the fragments. If that returns a new fragment, add that to the updated_fragments list. If it returns None, that means the whole fragment was deleted, so add the fragment id to the deleted_fragment_ids. Finally, pass the operation to the LanceDataset.commit() method to complete the deletion operation.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(table, "example")
>>> table = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example", mode="append")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> predicate = "a >= 2"
>>> updated_fragments = []
>>> deleted_fragment_ids = []
>>> for fragment in dataset.get_fragments():
...     new_fragment = fragment.delete(predicate)
...     if new_fragment is not None:
...         updated_fragments.append(new_fragment)
...     else:
...         deleted_fragment_ids.append(fragment.fragment_id)
>>> operation = lance.LanceOperation.Delete(updated_fragments,
...                                         deleted_fragment_ids,
...                                         predicate)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
deleted_fragment_ids: Iterable[int]
predicate: str
updated_fragments: Iterable[FragmentMetadata]
class Merge(fragments: Iterable[FragmentMetadata], schema: LanceSchema | Schema)

Bases: BaseOperation

Operation that adds columns. Unlike Overwrite, this should not change the structure of the fragments, allowing existing indices to be kept.

fragments

The fragments that make up the new dataset.

Type:

iterable of FragmentMetadata

schema

The schema of the new dataset. Passing a LanceSchema is preferred, and passing a pyarrow.Schema is deprecated.

Type:

LanceSchema or pyarrow.Schema

Warning

This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().

Examples

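To add new columns to a dataset, first define a method that will create the new columns based on the existing columns. Then call lance.fragment.LanceFragment.merge_columns() on each fragment and collect the returned fragment metadata. Finally, pass the fragments and the new schema to this class, and pass the operation to the LanceDataset.commit() method.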

>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch:
...     doubled = pc.multiply(batch["a"], 2)
...     return pa.record_batch([doubled], ["a_doubled"])
>>> fragments = []
>>> for fragment in dataset.get_fragments():
...     new_fragment, new_schema = fragment.merge_columns(double_a,
...                                                       columns=['a'])
...     fragments.append(new_fragment)
>>> operation = lance.LanceOperation.Merge(fragments, new_schema)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b  a_doubled
0  1  a          2
1  2  b          4
2  3  c          6
3  4  d          8
fragments: Iterable[FragmentMetadata]
schema: LanceSchema | Schema
class Overwrite(new_schema: LanceSchema | Schema, fragments: Iterable[FragmentMetadata])

Bases: BaseOperation

Overwrite or create a new dataset.

new_schema

The schema of the new dataset.

Type:

LanceSchema or pyarrow.Schema

fragments

The fragments that make up the new dataset.

Type:

list[FragmentMetadata]

Warning

This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().

Examples

To create or overwrite a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it along with the schema to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments: Iterable[FragmentMetadata]
new_schema: LanceSchema | Schema
class Restore(version: int)

Bases: BaseOperation

Operation that restores a previous version of the dataset.

version: int
class Rewrite(groups: Iterable[RewriteGroup], rewritten_indices: Iterable[RewrittenIndex])

Bases: BaseOperation

Operation that rewrites one or more files and indices into one or more files and indices.

groups

Groups of files that have been rewritten.

Type:

list[RewriteGroup]

rewritten_indices

Indices that have been rewritten.

Type:

list[RewrittenIndex]

Warning

This is an advanced API not intended for general use.

groups: Iterable[RewriteGroup]
rewritten_indices: Iterable[RewrittenIndex]
class RewriteGroup(old_fragments: Iterable[FragmentMetadata], new_fragments: Iterable[FragmentMetadata])

Bases: object

Collection of rewritten files

new_fragments: Iterable[FragmentMetadata]
old_fragments: Iterable[FragmentMetadata]
class RewrittenIndex(old_id: str, new_id: str)

Bases: object

An index that has been rewritten

new_id: str
old_id: str
class lance.dataset.LanceScanner(scanner: _Scanner, dataset: LanceDataset)

Bases: Scanner

count_rows()

Count rows matching the scanner filter.

Returns:

count

Return type:

int

property dataset_schema: Schema

The schema with which batches will be read from fragments.

explain_plan(verbose=False) str

Return the execution plan for this scanner.

Parameters:

verbose (bool, default False) – Use a verbose output format.

Returns:

plan

Return type:

str

static from_batches(*args, **kwargs)

Not implemented

static from_dataset(*args, **kwargs)

Not implemented

static from_fragment(*args, **kwargs)

Not implemented

head(num_rows)

Load the first N rows of the dataset.

Parameters:

num_rows (int) – The number of rows to load.

Return type:

Table

property projected_schema: Schema

The materialized schema of the data, accounting for projections.

This is the schema of any data returned from the scanner.

scan_batches()

Consume a Scanner in record batches with corresponding fragments.

Returns:

record_batches

Return type:

iterator of TaggedRecordBatch

take(indices)

Not implemented

to_batches(self)

Consume a Scanner in record batches.

Returns:

record_batches

Return type:

iterator of RecordBatch

to_reader(self)

Consume this scanner as a RecordBatchReader.

Return type:

RecordBatchReader

to_table() Table

Read the data into memory and return a pyarrow Table.

class lance.dataset.LanceStats(dataset: _Dataset)

Bases: object

Statistics about a LanceDataset.

dataset_stats(max_rows_per_group: int = 1024) DatasetStats

Statistics about the dataset.

index_stats(index_name: str) Dict[str, Any]

Statistics about an index.

Parameters:

index_name (str) – The name of the index to get statistics for.

class lance.dataset.MergeInsertBuilder(dataset, on)

Bases: _MergeInsertBuilder

execute(data_obj: ReaderLike, *, schema: pa.Schema | None = None)

Executes the merge insert operation

This function updates the original dataset and returns a dictionary with information about merge statistics - i.e. the number of inserted, updated, and deleted rows.

Parameters:
  • data_obj (ReaderLike) – The new data to use as the source table for the operation. This parameter can be any source of data (e.g. table / dataset) that write_dataset() accepts.

  • schema (Optional[pa.Schema]) – The schema of the data. This only needs to be supplied whenever the data source is some kind of generator.

when_matched_update_all(condition: str | None = None) MergeInsertBuilder

Configure the operation to update matched rows

After this method is called, when the merge insert operation executes, any rows that match both the source table and the target table will be updated. The rows from the target table will be removed and the rows from the source table will be added.

An optional condition may be specified. This should be an SQL filter and, if present, then only matched rows that also satisfy this filter will be updated. The SQL filter should use the prefix target. to refer to columns in the target table and the prefix source. to refer to columns in the source table. For example, source.last_update < target.last_update.

If a condition is specified and rows do not satisfy the condition then these rows will not be updated. Failure to satisfy the filter does not cause a “matched” row to become a “not matched” row.

when_not_matched_by_source_delete(expr: str | None = None) MergeInsertBuilder

Configure the operation to delete target rows that have no match in the source

After this method is called, when the merge insert operation executes, any rows that exist only in the target table will be deleted. An optional filter can be specified to limit the scope of the delete operation. If given (as an SQL filter) then only rows which match the filter will be deleted.

when_not_matched_insert_all() MergeInsertBuilder

Configure the operation to insert not matched rows

After this method is called, when the merge insert operation executes, any rows that exist only in the source table will be inserted into the target table.
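The interaction of the three clauses above can be sketched in plain Python. This is a toy model over dictionaries keyed by the join column, not Lance's implementation: the function name merge_insert, its keyword flags, and the statistics keys are hypothetical, and conditions are Python callables rather than SQL filters.

```python
from typing import Callable, Dict, Optional

Row = Dict[str, object]


def merge_insert(
    target: Dict[object, Row],
    source: Dict[object, Row],
    *,
    update_matched: bool = False,
    update_condition: Optional[Callable[[Row, Row], bool]] = None,
    insert_not_matched: bool = False,
    delete_not_matched_by_source: bool = False,
    delete_filter: Optional[Callable[[Row], bool]] = None,
):
    """Toy model of merge insert over rows keyed by the join column."""
    result = dict(target)
    stats = {"inserted": 0, "updated": 0, "deleted": 0}

    for key, src_row in source.items():
        if key in target:
            # Matched row: replace it only if updates are enabled and the
            # optional condition holds. A failed condition leaves the row
            # untouched; it does not become a "not matched" row.
            if update_matched and (
                update_condition is None or update_condition(src_row, target[key])
            ):
                result[key] = src_row
                stats["updated"] += 1
        elif insert_not_matched:
            # Row exists only in the source table.
            result[key] = src_row
            stats["inserted"] += 1

    if delete_not_matched_by_source:
        for key, tgt_row in target.items():
            # Row exists only in the target table.
            if key not in source and (delete_filter is None or delete_filter(tgt_row)):
                del result[key]
                stats["deleted"] += 1

    return result, stats


target = {
    1: {"id": 1, "v": "old", "last_update": 5},
    2: {"id": 2, "v": "old", "last_update": 9},
    3: {"id": 3, "v": "old", "last_update": 1},
}
source = {
    1: {"id": 1, "v": "new", "last_update": 7},
    2: {"id": 2, "v": "new", "last_update": 8},
    4: {"id": 4, "v": "new", "last_update": 7},
}

merged, stats = merge_insert(
    target,
    source,
    update_matched=True,
    # Only accept source rows that are newer than the target row.
    update_condition=lambda s, t: s["last_update"] > t["last_update"],
    insert_not_matched=True,
    delete_not_matched_by_source=True,
)
```

In this sketch row 2 is matched but fails the condition, so it keeps its old value and is not inserted again. Row 3 exists only in the target and is deleted, and row 4 exists only in the source and is inserted.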

class lance.dataset.ScannerBuilder(ds: LanceDataset)

Bases: object

apply_defaults(default_opts: Dict[str, Any]) ScannerBuilder
batch_readahead(nbatches: int | None = None) ScannerBuilder

This parameter is ignored when reading v2 files

batch_size(batch_size: int) ScannerBuilder

Set batch size for Scanner

columns(cols: List[str] | Dict[str, str] | None = None) ScannerBuilder

Enable fast search, which only searches the indexed data.

Use Table::optimize() or create_index() to add new data to the index and make it searchable.

filter(filter: str | Expression) ScannerBuilder
fragment_readahead(nfragments: int | None = None) ScannerBuilder

Filter rows by full-text search. Experimental API; it may be removed once this is supported within SQL-like filter expressions.

An inverted index must be created on the given column before searching.

io_buffer_size(io_buffer_size: int) ScannerBuilder

Set the I/O buffer size for the Scanner

This is the amount of RAM that will be reserved for holding I/O received from storage before it is processed. This is used to control the amount of memory used by the scanner. If the buffer is full then the scanner will block until the buffer is processed.

Generally this should scale with the number of concurrent I/O threads. The default is 2GiB which comfortably provides enough space for somewhere between 32 and 256 concurrent I/O threads.

This value is not a hard cap on the amount of RAM the scanner will use. Some space is used for the compute (which can be controlled by the batch size) and Lance does not keep track of memory after it is returned to the user.

Currently, if there is a single batch of data which is larger than the io buffer size then the scanner will deadlock. This is a known issue and will be fixed in a future release.

This parameter is only used when reading v2 files
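The sizing guidance above comes down to simple arithmetic. The sketch below only divides the stated 2 GiB default across the stated range of I/O threads and flags the single-oversized-batch case. The helper names are hypothetical and nothing here calls Lance.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2

DEFAULT_IO_BUFFER_SIZE = 2 * GIB  # default stated in the text above


def per_thread_budget(io_buffer_size: int, io_threads: int) -> int:
    """Average buffer space (bytes) available to each concurrent I/O thread."""
    if io_threads <= 0:
        raise ValueError("io_threads must be positive")
    return io_buffer_size // io_threads


def batch_fits(io_buffer_size: int, largest_batch_bytes: int) -> bool:
    """False when a single batch alone exceeds the buffer (the deadlock case)."""
    return largest_batch_bytes <= io_buffer_size


budget_32 = per_thread_budget(DEFAULT_IO_BUFFER_SIZE, 32) // MIB
budget_256 = per_thread_budget(DEFAULT_IO_BUFFER_SIZE, 256) // MIB
oversized_ok = batch_fits(DEFAULT_IO_BUFFER_SIZE, 3 * GIB)
```

With the default buffer, each thread gets roughly 64 MiB at 32 threads and 8 MiB at 256 threads. A check like batch_fits can be run on an estimate of the largest batch before lowering io_buffer_size.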

late_materialization(late_materialization: bool | List[str]) ScannerBuilder
limit(n: int | None = None) ScannerBuilder
nearest(column: str, q: QueryVectorLike, k: int | None = None, metric: str | None = None, nprobes: int | None = None, refine_factor: int | None = None, use_index: bool = True, ef: int | None = None) ScannerBuilder
offset(n: int | None = None) ScannerBuilder
prefilter(prefilter: bool) ScannerBuilder
scan_in_order(scan_in_order: bool = True) ScannerBuilder

Whether to scan the dataset in order of fragments and batches.

If set to False, the scanner may read fragments concurrently and yield batches out of order. This may improve performance since it allows more concurrency in the scan, but can also use more memory.

This parameter is ignored when using v2 files. In the v2 file format there is no penalty to scanning in order and so all scans will scan in order.

to_scanner() LanceScanner
use_scalar_index(use_scalar_index: bool = True) ScannerBuilder

Set whether scalar indices should be used in a query

Scans will use scalar indices, when available, to optimize queries with filters. However, in some corner cases, scalar indices may make performance worse. This parameter allows users to disable scalar indices in these cases.

use_stats(use_stats: bool = True) ScannerBuilder

Enable use of statistics for query planning.

Disabling statistics is used for debugging and benchmarking purposes. This should be left on for normal use.

with_fragments(fragments: Iterable[LanceFragment] | None) ScannerBuilder
with_row_address(with_row_address: bool = True) ScannerBuilder

Enable returning row addresses in the results.

Row addresses are a unique but unstable identifier for each row in the dataset that consists of the fragment id (upper 32 bits) and the row offset in the fragment (lower 32 bits). Row IDs are generally preferred since they do not change when a row is modified or compacted. However, row addresses may be useful in some advanced use cases.
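The bit layout described above (fragment id in the upper 32 bits, row offset in the lower 32 bits) can be illustrated with two small helpers. The function names are hypothetical and the code is plain integer arithmetic, independent of Lance.

```python
from typing import Tuple

_MASK_32 = 0xFFFFFFFF


def make_row_address(fragment_id: int, row_offset: int) -> int:
    """Pack a fragment id and a row offset into one 64-bit row address."""
    if not (0 <= fragment_id <= _MASK_32 and 0 <= row_offset <= _MASK_32):
        raise ValueError("fragment_id and row_offset must each fit in 32 bits")
    return (fragment_id << 32) | row_offset


def split_row_address(address: int) -> Tuple[int, int]:
    """Unpack a row address into (fragment_id, row_offset)."""
    return address >> 32, address & _MASK_32


address = make_row_address(3, 10)
fragment_id, row_offset = split_row_address(address)
```

Because the fragment id is part of the value, the address of a row changes whenever the row moves to a different fragment, for example during compaction.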

with_row_id(with_row_id: bool = True) ScannerBuilder

Enable returning row IDs in the results.

class lance.dataset.Tags(dataset: _Dataset)

Bases: object

Dataset tag manager.

create(tag: str, version: int) None

Create a tag for a given dataset version.

Parameters:
  • tag (str) – The name of the tag to create. This name must be unique among all tag names for the dataset.

  • version (int) – The dataset version to tag.

delete(tag: str) None

Delete tag from the dataset.

Parameters:

tag (str) – The name of the tag to delete.

list() dict[str, int]

List all dataset tags.

Returns:

A dictionary mapping tag names to version numbers.

Return type:

dict[str, int]

update(tag: str, version: int) None

Update tag to a new version.

Parameters:
  • tag (str) – The name of the tag to update.

  • version (int) – The new dataset version to tag.
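The four tag operations above behave like a small name-to-version mapping. The following in-memory model is only an illustration: the class TagsModel is hypothetical, and the exception types it raises are assumptions, since the text states only that tag names must be unique.

```python
from typing import Dict


class TagsModel:
    """In-memory stand-in for a tag manager: tag name -> dataset version."""

    def __init__(self) -> None:
        self._tags: Dict[str, int] = {}

    def create(self, tag: str, version: int) -> None:
        if tag in self._tags:
            # Assumed error type; the docs only say names must be unique.
            raise ValueError(f"tag {tag!r} already exists")
        self._tags[tag] = version

    def update(self, tag: str, version: int) -> None:
        if tag not in self._tags:
            raise KeyError(tag)  # assumed behaviour for unknown tags
        self._tags[tag] = version

    def delete(self, tag: str) -> None:
        del self._tags[tag]

    def list(self) -> Dict[str, int]:
        # Return a copy so callers cannot mutate the manager's state.
        return dict(self._tags)


tags = TagsModel()
tags.create("stable", 1)
tags.create("nightly", 3)
tags.update("stable", 2)
tags.delete("nightly")
snapshot = tags.list()
```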

class lance.dataset.Transaction(read_version: 'int', operation: 'LanceOperation.BaseOperation', uuid: 'str' = <factory>, blobs_op: 'Optional[LanceOperation.BaseOperation]' = None)

Bases: object

blobs_op: BaseOperation | None = None
operation: BaseOperation
read_version: int
uuid: str
lance.dataset.write_dataset(data_obj: ReaderLike, uri: str | Path | LanceDataset, schema: pa.Schema | None = None, mode: str = 'create', *, max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, commit_lock: CommitLock | None = None, progress: FragmentWriteProgress | None = None, storage_options: Dict[str, str] | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, enable_v2_manifest_paths: bool = False, enable_move_stable_row_ids: bool = False) LanceDataset

Write a given data_obj to the given uri

Parameters:
  • data_obj (Reader-like) – The data to be written. Acceptable types are: a Pandas DataFrame, PyArrow Table, Dataset, Scanner, or RecordBatchReader; or a Hugging Face dataset.

  • uri (str, Path, or LanceDataset) – Where to write the dataset to (directory). If a LanceDataset is passed, the session will be reused.

  • schema (Schema, optional) – If specified and the input is a pandas DataFrame, use this schema instead of the default pandas to arrow table conversion.

  • mode (str) – One of: create - create a new dataset (raises if uri already exists); overwrite - create a new snapshot version; append - create a new version that is the concatenation of the input and the latest version (raises if uri does not exist).

  • max_rows_per_file (int, default 1024 * 1024) – The max number of rows to write before starting a new file

  • max_rows_per_group (int, default 1024) – The max number of rows before starting a new group (in the same file)

  • max_bytes_per_file (int, default 90 * 1024 * 1024 * 1024) – The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.

  • use_legacy_format (optional, bool, default None) – Deprecated method for setting the data storage version. Use the data_storage_version parameter instead.

  • enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the LanceDataset.migrate_manifest_paths_v2() method. Default is False.

  • enable_move_stable_row_ids (bool, optional) – Experimental parameter: if set to true, the writer will use move-stable row ids. These row ids are stable after compaction operations, but not after updates. This makes compaction more efficient, since with stable row ids no secondary indices need to be updated to point to new row ids.
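As a rough illustration of the row and byte limits listed above, the sketch below estimates how a write would be split across files when only max_rows_per_file applies. It deliberately ignores max_bytes_per_file, which is a soft limit that can start a new file earlier. The helper rows_per_file is hypothetical and does not call Lance.

```python
from typing import List

DEFAULT_MAX_ROWS_PER_FILE = 1024 * 1024
DEFAULT_MAX_BYTES_PER_FILE = 90 * 1024 * 1024 * 1024  # matches the signature default


def rows_per_file(
    num_rows: int, max_rows_per_file: int = DEFAULT_MAX_ROWS_PER_FILE
) -> List[int]:
    """Row count of each file, assuming only the row limit triggers a new file."""
    if num_rows < 0 or max_rows_per_file <= 0:
        raise ValueError("num_rows must be >= 0 and max_rows_per_file > 0")
    full_files, remainder = divmod(num_rows, max_rows_per_file)
    return [max_rows_per_file] * full_files + ([remainder] if remainder else [])


layout = rows_per_file(2_500_000)
```

Under this assumption, 2,500,000 rows with the default limit produce two full files and one partial file.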

lance.debug module

Debug utilities for Lance.

lance.debug.format_fragment(fragment, dataset)

Debug print a LanceFragment.

lance.debug.format_manifest(dataset)

Print the full Lance manifest of the dataset.

lance.debug.format_schema(dataset)

Format the Lance schema of a dataset as a string.

This can be used to view the field ids and types in the schema.

lance.debug.list_transactions(dataset, /, max_transactions=10)

Return a string representation of each transaction in the dataset, in reverse chronological order.

If max_transactions is provided, only the most recent max_transactions transactions will be returned. Defaults to 10.

lance.dependencies module

lance.file module

class lance.file.LanceBufferDescriptor

Bases: object

position

The byte offset of the buffer in the file

size

The size (in bytes) of the buffer

class lance.file.LanceColumnMetadata

Bases: object

column_buffers

The column-wide buffers

pages

The data pages in the column

class lance.file.LanceFileMetadata

Bases: object

columns

The column metadata. An entry might be None if the metadata for a column was not loaded into memory when the file was opened.

global_buffers

The global buffers

major_version

The major version of the file

minor_version

The minor version of the file

num_column_metadata_bytes

The number of bytes in the column metadata section of the file

num_data_bytes

The number of bytes in the data section of the file

num_global_buffer_bytes

The number of bytes in the global buffer section of the file

num_rows

The number of rows in the file

schema

The schema of the file

class lance.file.LanceFileReader(path: str, storage_options: Dict[str, str] | None = None)

Bases: object

A file reader for reading Lance files

This class is used to read Lance data files, a low level structure optimized for storing multi-modal tabular data. If you are working with Lance datasets then you should use the LanceDataset class instead.

file_statistics() LanceFileStatistics

Return file statistics of the file

metadata() LanceFileMetadata

Return metadata describing the file contents

read_all(*, batch_size: int = 1024, batch_readahead=16) ReaderResults

Reads the entire file

Parameters:

batch_size (int, default 1024) –

The file will be read in batches. This parameter controls how many rows will be in each batch (except the final batch)

Smaller batches will use less memory but might be slightly slower because there is more per-batch overhead

read_global_buffer(index: int) bytes

Read a global buffer from the file at a given index

Parameters:

index (int) – The index of the global buffer to read

Returns:

The contents of the global buffer

Return type:

bytes

read_range(start: int, num_rows: int, *, batch_size: int = 1024, batch_readahead=16) ReaderResults

Read a range of rows from the file

Parameters:
  • start (int) – The offset of the first row to start reading

  • num_rows (int) – The number of rows to read from the file

  • batch_size (int, default 1024) –

    The file will be read in batches. This parameter controls how many rows will be in each batch (except the final batch)

    Smaller batches will use less memory but might be slightly slower because there is more per-batch overhead
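The batching rule described for batch_size (every batch is full except possibly the last) can be sketched as follows. The helper batch_lengths is hypothetical and only computes the expected batch sizes; it does not read a file.

```python
from typing import List


def batch_lengths(num_rows: int, batch_size: int = 1024) -> List[int]:
    """Sizes of the batches produced when num_rows are read batch_size at a time."""
    if num_rows < 0 or batch_size <= 0:
        raise ValueError("num_rows must be >= 0 and batch_size > 0")
    full_batches, remainder = divmod(num_rows, batch_size)
    return [batch_size] * full_batches + ([remainder] if remainder else [])


lengths = batch_lengths(2500, batch_size=1024)
```

Halving batch_size roughly doubles the number of batches, which is the source of the extra per-batch overhead mentioned above.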

take_rows(indices, *, batch_size: int = 1024, batch_readahead=16) ReaderResults

Read a specific set of rows from the file

Parameters:
  • indices (List[int]) – The indices of the rows to read from the file in ascending order

  • batch_size (int, default 1024) –

    The file will be read in batches. This parameter controls how many rows will be in each batch (except the final batch)

    Smaller batches will use less memory but might be slightly slower because there is more per-batch overhead
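Since take_rows expects indices in ascending order, a caller holding indices in arbitrary order may want to sort them first and then restore the requested order afterwards. The sketch below shows one way to do that. The helper prepare_take is hypothetical, the list named fetched stands in for rows a reader would return, and removing duplicates is a conservative assumption rather than a documented requirement.

```python
from typing import List, Sequence, Tuple


def prepare_take(indices: Sequence[int]) -> Tuple[List[int], List[int]]:
    """Return (ascending unique indices, positions that restore request order)."""
    ascending = sorted(set(indices))
    position = {idx: pos for pos, idx in enumerate(ascending)}
    restore = [position[idx] for idx in indices]
    return ascending, restore


requested = [42, 7, 19, 7]
ascending, restore = prepare_take(requested)

# Stand-in for the rows a reader would return for `ascending`.
fetched = [f"row-{i}" for i in ascending]

# Reorder (and re-duplicate) the fetched rows to match the original request.
in_request_order = [fetched[pos] for pos in restore]
```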

class lance.file.LanceFileStatistics

Bases: object

Statistics that summarize some of the file metadata for quick reference

columns

Statistics about each of the columns in the file

class lance.file.LanceFileWriter(path: str, schema: Schema | None = None, *, data_cache_bytes: int | None = None, version: str | None = None, storage_options: Dict[str, str] | None = None, **kwargs)

Bases: object

A file writer for writing Lance data files

This class is used to write Lance data files, a low level structure optimized for storing multi-modal tabular data. If you are working with Lance datasets then you should use the LanceDataset class instead.

add_global_buffer(data: bytes) int

Add a global buffer to the file. The global buffer can contain any arbitrary bytes.

Parameters:

data (bytes) – The data to write to the file.

Returns:

The index of the global buffer. This will always start at 1 and increment by 1 each time this method is called.

Return type:

int

add_schema_metadata(key: str, value: str) None

Add a metadata (key/value pair) entry to the schema. This method allows you to alter the schema metadata. It must be called before close is called.

Parameters:
  • key (str) – The key to add.

  • value (str) – The value to add.

close() int

Write the file metadata and close the file

Returns the number of rows written to the file

write_batch(batch: RecordBatch | Table) None

Write a batch of data to the file

Parameters:

batch (Union[pa.RecordBatch, pa.Table]) – The data to write to the file

class lance.file.LancePageMetadata

Bases: object

buffers

The buffers in the page

encoding

A description of the encoding used to encode the page

lance.fragment module

Dataset Fragment

class lance.fragment.DataFile(path: str, fields: List[int], column_indices: List[int] = None, file_major_version: int = 0, file_minor_version: int = 0)

Bases: object

A data file in a fragment.

path

The path to the data file.

Type:

str

fields

The field ids of the columns in this file.

Type:

List[int]

column_indices

The column indices where the fields are stored in the file. Will have the same length as fields.

Type:

List[int]

file_major_version

The major version of the data storage format.

Type:

int

file_minor_version

The minor version of the data storage format.

Type:

int

column_indices: List[int]
field_ids() List[int]
fields: List[int]
file_major_version: int = 0
file_minor_version: int = 0
property path: str
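Because column_indices has the same length as fields, the two lists can be read pairwise as a mapping from field id to the column position in the file. The dataclass below is a hypothetical stand-in that mirrors only those attributes; it is not the lance.fragment.DataFile class.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class DataFileModel:
    """Minimal stand-in mirroring the path, fields and column_indices attributes."""

    path: str
    fields: List[int]
    column_indices: List[int] = field(default_factory=list)

    def field_to_column(self) -> Dict[int, int]:
        """Map each field id to the column index where it is stored."""
        if len(self.fields) != len(self.column_indices):
            raise ValueError("fields and column_indices must have the same length")
        return dict(zip(self.fields, self.column_indices))


data_file = DataFileModel(path="example.lance", fields=[0, 1, 4], column_indices=[0, 1, 2])
mapping = data_file.field_to_column()
```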
class lance.fragment.DeletionFile(read_version, id, file_type, num_deleted_rows)

Bases: object

asdict()
file_type
id
num_deleted_rows
path(fragment_id, base_uri=None)
read_version
class lance.fragment.FragmentMetadata(id: int, files: List[DataFile], physical_rows: int, deletion_file: DeletionFile | None = None, row_id_meta: RowIdMeta | None = None)

Bases: object

Metadata for a fragment.

id

The ID of the fragment.

Type:

int

files

The data files of the fragment. Each data file must have the same number of rows. Each file stores a different subset of the columns.

Type:

List[DataFile]

physical_rows

The number of rows originally in this fragment. This is the number of rows in the data files before deletions.

Type:

int

deletion_file

The deletion file, if any.

Type:

Optional[DeletionFile]

row_id_meta

The row id metadata, if any.

Type:

Optional[RowIdMeta]

data_files() List[DataFile]
deletion_file: DeletionFile | None = None
files: List[DataFile]
static from_json(json_data: str) FragmentMetadata
id: int
property num_deletions: int

The number of rows that have been deleted from this fragment.

property num_rows: int

The number of rows in this fragment after deletions.

physical_rows: int
row_id_meta: RowIdMeta | None = None
to_json() dict

Get this as a simple JSON-serializable dictionary.

class lance.fragment.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, fragment: _Fragment | None = None)

Bases: Fragment

count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Count rows matching the scanner filter.

Parameters:
  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

count

Return type:

int

static create(dataset_uri: str | Path, data: Table | RecordBatchReader, fragment_id: int | None = None, schema: Schema | None = None, max_rows_per_group: int = 1024, progress: FragmentWriteProgress | None = None, mode: str = 'append', *, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: Dict[str, str] | None = None) FragmentMetadata

Create a FragmentMetadata from the given data.

This can be used if the dataset is not yet created.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • dataset_uri (str) – The URI of the dataset.

  • fragment_id (int) – The ID of the fragment.

  • data (pa.Table or pa.RecordBatchReader) – The data to be written to the fragment.

  • schema (pa.Schema, optional) – The schema of the data. If not specified, the schema will be inferred from the data.

  • max_rows_per_group (int, default 1024) – The maximum number of rows per group in the data file.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • mode (str, default "append") – The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.

  • data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.

  • use_legacy_format (bool, default None) – Deprecated parameter. Use data_storage_version instead.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

See also

lance.dataset.LanceOperation.Overwrite

The operation used to create a new dataset or overwrite one using fragments created with this API. See the doc page for an example of using this API.

lance.dataset.LanceOperation.Append

The operation used to append fragments created with this API to an existing dataset. See the doc page for an example of using this API.

Return type:

FragmentMetadata

static create_from_file(filename: str | Path, dataset: LanceDataset, fragment_id: int) FragmentMetadata

Create a fragment from the given datafile uri.

This can be used if the data file has been lost from the dataset.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • filename (str) – The filename of the datafile.

  • dataset (LanceDataset) – The dataset that the fragment belongs to.

  • fragment_id (int) – The ID of the fragment.

data_files()

Return the data files of this fragment.

delete(predicate: str) FragmentMetadata | None

Delete rows from this Fragment.

This will add or update the deletion file of this fragment. It does not modify or delete the data files of this fragment. If no rows are left after the deletion, this method will return None.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:

predicate (str) – A SQL predicate that specifies the rows to delete.

Returns:

A new fragment containing the new deletion file, or None if no rows are left.

Return type:

FragmentMetadata or None

Examples

>>> import lance
>>> import pyarrow as pa
>>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> dataset = lance.write_dataset(tab, "dataset")
>>> frag = dataset.get_fragment(0)
>>> frag.delete("a > 1")
FragmentMetadata(id=0, files=[DataFile(path='...', fields=[0, 1], ...), ...)
>>> frag.delete("a > 0") is None
True

See also

lance.dataset.LanceOperation.Delete

The operation used to commit these changes to a dataset. See the doc page for an example of using this API.
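The behaviour described for delete (data files untouched, deletions recorded separately, None when nothing is left) can be modelled in a few lines of plain Python. FragmentModel is a hypothetical stand-in, predicates are Python callables instead of SQL strings, and the sample rows mirror the doctest above.

```python
from dataclasses import dataclass
from typing import Callable, Dict, FrozenSet, Optional, Tuple

Row = Dict[str, int]


@dataclass
class FragmentModel:
    rows: Tuple[Row, ...]                   # stand-in for the immutable data files
    deleted: FrozenSet[int] = frozenset()   # stand-in for the deletion file

    @property
    def physical_rows(self) -> int:
        return len(self.rows)

    @property
    def num_deletions(self) -> int:
        return len(self.deleted)

    @property
    def num_rows(self) -> int:
        return self.physical_rows - self.num_deletions

    def delete(self, predicate: Callable[[Row], bool]) -> Optional["FragmentModel"]:
        """Return a new fragment with more rows marked deleted, or None if empty."""
        hits = {i for i, row in enumerate(self.rows) if predicate(row)}
        deleted = self.deleted | hits
        if len(deleted) == len(self.rows):
            return None
        # The rows themselves are reused unchanged; only the deletion set grows.
        return FragmentModel(self.rows, frozenset(deleted))


rows = tuple({"a": a, "b": b} for a, b in zip([1, 2, 3], [4, 5, 6]))
frag = FragmentModel(rows)

after = frag.delete(lambda r: r["a"] > 1)
gone = frag.delete(lambda r: r["a"] > 0)
```

In the model, after keeps all three physical rows but reports two deletions, while gone is None because the predicate removes every row.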

deletion_file()

Return the deletion file, if any

property fragment_id
head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Load the first N rows of the fragment.

Parameters:
  • num_rows (int) – The number of rows to load.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Return type:

Table

merge_columns(value_func: Dict[str, str] | BatchUDF | ReaderLike | Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None = None, batch_size: int | None = None, reader_schema: pa.Schema | None = None) Tuple[FragmentMetadata, LanceSchema]

Add columns to this Fragment.

Warning

Internal API. This method is not intended to be used by end users.

The parameters and their interpretation are the same as in the lance.dataset.LanceDataset.add_columns() operation.

The only difference is that, instead of modifying the dataset, a new fragment is created. The new schema of the fragment is returned as well. These can be used in a later operation to commit the changes to the dataset.

See also

lance.dataset.LanceOperation.Merge

The operation used to commit these changes to the dataset. See the doc page for an example of using this API.

Returns:

A new fragment with the added column(s) and the final schema.

Return type:

Tuple[FragmentMetadata, LanceSchema]

property metadata: FragmentMetadata

Return the metadata of this fragment.

Return type:

FragmentMetadata

property num_deletions: int

Return the number of deleted rows in this fragment.

property partition_expression: Schema

An Expression which evaluates to true for all data viewed by this Fragment.

property physical_rows: int

Return the number of rows originally in this fragment.

To get the number of rows after deletions, use count_rows() instead.

property physical_schema: Schema

Return the physical schema of this Fragment. This schema can be different from the dataset read schema.

scanner(*, columns: List[str] | Dict[str, str] | None = None, batch_size: int | None = None, filter: str | pa.compute.Expression | None = None, limit: int | None = None, offset: int | None = None, with_row_id: bool = False, batch_readahead: int = 16) LanceScanner

See LanceDataset.scanner() for details.

property schema: Schema

Return the schema of this fragment.

take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Select rows of data by index.

Parameters:
  • indices (Array or array-like) – The indices of the rows to select in the dataset.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Return type:

Table

to_batches(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Read the fragment as materialized record batches.

Parameters:
  • schema (Schema, optional) – Concrete schema to use for scanning.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

record_batches

Return type:

iterator of RecordBatch

to_table(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Convert this Fragment into a Table.

Use this convenience utility with care. This will serially materialize the Scan result in memory before creating the Table.

Parameters:
  • schema (Schema, optional) – Concrete schema to use for scanning.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

table

Return type:

Table

class lance.fragment.RowIdMeta

Bases: object

asdict()
lance.fragment.write_fragments(data: ReaderLike, dataset_uri: str | Path | LanceDataset, schema: pa.Schema | None = None, *, mode: str = 'append', max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, progress: FragmentWriteProgress | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: Dict[str, str] | None = None) List[FragmentMetadata]

Write data into one or more fragments.

Warning

This is a low-level API intended for manually implementing distributed writes. For most users, lance.write_dataset() is the recommended API.

Parameters:
  • data (pa.Table or pa.RecordBatchReader) – The data to be written to the fragment.

  • dataset_uri (str, Path, or LanceDataset) – The URI of the dataset or the dataset object.

  • schema (pa.Schema, optional) – The schema of the data. If not specified, the schema will be inferred from the data.

  • mode (str, default "append") – The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.

  • max_rows_per_file (int, default 1024 * 1024) – The maximum number of rows per data file.

  • max_rows_per_group (int, default 1024) – The maximum number of rows per group in the data file.

  • max_bytes_per_file (int, default 90 * 1024 * 1024 * 1024) – The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the 2.0 version. See the user guide for more details.

  • use_legacy_format (optional, bool, default None) – Deprecated method for setting the data storage version. Use the data_storage_version parameter instead.

  • storage_options (Optional[Dict[str, str]]) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

Returns:

A list of FragmentMetadata for the fragments written. The fragment ids are left as zero meaning they are not yet specified. They will be assigned when the fragments are committed to a dataset.

Return type:

List[FragmentMetadata]
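The soft-limit behavior of max_bytes_per_file can be illustrated with a small simulation. This is only a sketch in plain Python and not Lance's writer: split_into_files and the group sizes are hypothetical, and the exact comparison the real writer uses is an assumption.

```python
# Default limit from the signature above: 90 GiB expressed in bytes.
DEFAULT_MAX_BYTES_PER_FILE = 90 * 1024 * 1024 * 1024  # 96636764160


def split_into_files(group_sizes, max_bytes_per_file):
    """Group byte sizes into files, checking the limit after each group."""
    files, current, current_bytes = [], [], 0
    for size in group_sizes:
        current.append(size)
        current_bytes += size
        # The limit is only checked once the group is already written,
        # so a file can end up larger than max_bytes_per_file.
        if current_bytes >= max_bytes_per_file:
            files.append(current)
            current, current_bytes = [], 0
    if current:
        files.append(current)
    return files


files = split_into_files([40, 40, 40, 40, 40], max_bytes_per_file=100)
print(files)  # [[40, 40, 40], [40, 40]]
```

The first simulated file holds 120 bytes even though the limit is 100, which is the overshoot the parameter description warns about when groups are large.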

lance.hf module

class lance.hf.HuggingFaceConverter(ds_info: dict[str, Any])

Bases: object

Utility class for converting a PyArrow RecordBatch to Hugging Face internal types.

to_pytorch(col: str, array: Array) torch.Tensor | list['PIL.Image.Image'] | None

lance.indices module

class lance.indices.IndicesBuilder(dataset, column: str)

Bases: object

A class with helper functions for building indices on a dataset.

The methods in this class can break down the process of building indices into smaller steps. This can be useful for debugging and checkpointing when building indices for extremely large datasets.

This class is intended for advanced users that need to create vector indices at large scales.

The methods in this class are experimental and may change in future versions.

For datasets with tens of millions of rows or fewer, it will likely be simpler to use the create_index method on the dataset object.

assign_ivf_partitions(ivf_model: IvfModel, accelerator: str | torch.Device, *, output_uri: str | None = None) str

Calculates which IVF partition each vector belongs to. This searches the IVF centroids and assigns the closest centroid to the vector. The result is stored in a Lance dataset located at output_uri. The schema of the partition assignment dataset is:

row_id: uint64
partition: uint32

Note: There is no advantage to separately computing the partition assignment without an accelerator. If you are not using an accelerator then you should skip this method and proceed without precomputed partition assignments.

Parameters:
  • ivf_model (IvfModel) – An IvfModel, previously created by train_ivf which the data will be assigned to.

  • accelerator (Union[str, torch.Device]) – An optional accelerator to use to offload computation to specialized hardware. Currently supported values are the same as those in train_ivf.

  • output_uri (Optional[str], default None) – Destination Lance dataset where the partition assignments will be written. Can be None, in which case a random directory will be used.

Returns:

The path of the partition assignment dataset (will be equal to output_uri unless the value is None)

Return type:

str
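The assignment step described for assign_ivf_partitions amounts to a nearest-centroid search. The following is a minimal plain-Python sketch of that idea, assuming L2 distance. The function assign_partitions is hypothetical, and enumerate() stands in for real row ids, which need not be consecutive.

```python
def assign_partitions(vectors, centroids):
    """Return (row_id, partition) pairs using squared L2 distance."""

    def sq_l2(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))

    assignments = []
    for row_id, vec in enumerate(vectors):
        # Pick the index of the closest centroid for this vector.
        partition = min(
            range(len(centroids)), key=lambda i: sq_l2(vec, centroids[i])
        )
        assignments.append((row_id, partition))
    return assignments


centroids = [[0.0, 0.0], [10.0, 10.0]]
vectors = [[1.0, 0.5], [9.0, 11.0], [4.0, 4.0]]
print(assign_partitions(vectors, centroids))  # [(0, 0), (1, 1), (2, 0)]
```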

load_shuffled_vectors(filenames: list[str], dir_path: str, ivf: IvfModel, pq: PqModel, index_name: str | None = None)

Takes filenames of the sorted, transformed vector files as input. Loads these sorted files and commits the index into the dataset.

Parameters:
  • filenames (list[str]) – The filenames of the sorted storage files.

  • dir_path (str) – Path of the directory where all the files are located.

  • index_name (Optional[str]) – The name of the index to be created. If not provided, the default name will be “{column_name}_idx”.

  • ivf (IvfModel) – The IVF model used to create the inputs.

  • pq (PqModel) – The PQ model used to create the inputs.

shuffle_transformed_vectors(unsorted_filenames: list[str], dir_path: str, ivf: IvfModel, shuffle_output_root_filename: str | None = 'sorted') list[str]

Take the transformed, unsorted vector files as input, and create sorted storage files. Sorting is done based on the partition id. This function only makes sense if the transformed vector file contains a partition_id column.

Parameters:
  • unsorted_filenames (list[str]) – The filenames of the unsorted files.

  • dir_path (str) – Directory where all the files are located, and where output files will be placed.

  • ivf (IvfModel) – The IVF model used for the transformations (e.g. partition assignment)

  • shuffle_output_root_filename (Optional[str]) – The root filename for the sorted output files. If not provided, the root filename used will be “sorted”.

Returns:

The file paths of the sorted transformed vector files. These will be of the form shuffle_output_root_filename_i.lance.

Return type:

list[str]

train_ivf(num_partitions=None, *, distance_type='l2', accelerator: str | torch.Device | None = None, sample_rate: int = 256, max_iters: int = 50) IvfModel

Train IVF centroids for the given vector column.

This will run k-means clustering on the given vector column to train the IVF centroids. This is the first step in several vector indices. The centroids will be used to partition the vectors into different clusters.

IVF centroids are trained from a sample of the data (determined by the sample_rate). While this sample is not huge it might still be quite large.

K-means is an iterative algorithm that can be computationally expensive. The accelerator argument can be used to offload the computation to a hardware accelerator such as a GPU or TPU.

Parameters:
  • num_partitions (int) – The number of partitions to train. Large values are more expensive to train and can lead to longer search times. Smaller values could lead to overtraining, reduced recall, and require large nprobes values. If not specified the default will be the integer nearest the square root of the number of rows.

  • distance_type ("l2" | "dot" | "cosine") – The distance type to use. This is defined in more detail in the LanceDB documentation on creating indices.

  • accelerator (str | torch.Device) – An optional accelerator to use to offload computation to specialized hardware. Currently supported values are “cuda” and “mps”.

  • sample_rate (int) – IVF is trained on a random sample of the dataset. The sample_rate determines the size of this sample. There will be sample_rate rows loaded for each partition for a total of sample_rate * num_partitions rows. If the dataset does not contain enough rows an error will be raised.

  • max_iters (int) – K-means is an iterative algorithm that is run until it converges. In some cases, k-means will not converge but will cycle between various possible minima. In these cases we must terminate or run forever. The max_iters parameter defines a cutoff at which we terminate training.
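The relationship between num_partitions and sample_rate described above can be worked through with a little arithmetic. This is a sketch of the documented rules only: the helper names are hypothetical, and the actual rounding and error handling inside Lance may differ.

```python
import math


def default_num_partitions(num_rows):
    """Integer nearest the square root of the row count."""
    return round(math.sqrt(num_rows))


def ivf_training_sample_size(num_rows, num_partitions=None, sample_rate=256):
    """Rows loaded for IVF training: sample_rate rows per partition."""
    if num_partitions is None:
        num_partitions = default_num_partitions(num_rows)
    needed = sample_rate * num_partitions
    if needed > num_rows:
        # Mirrors the documented behavior for datasets that are too small.
        raise ValueError(
            f"need {needed} rows for training but dataset has {num_rows}"
        )
    return needed


print(default_num_partitions(1_000_000))    # 1000
print(ivf_training_sample_size(1_000_000))  # 256000
```

With the defaults, a dataset of 10,000 rows would need 100 * 256 = 25,600 training rows, so in this sketch a smaller num_partitions or sample_rate has to be chosen.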

train_pq(ivf_model: IvfModel, num_subvectors=None, *, sample_rate: int = 256, max_iters: int = 50) PqModel

Train a PQ model for a given column.

This will run k-means clustering on each subvector to determine the centroids that will be used to quantize the subvectors. This step runs against a randomly chosen sample of the data. The sample size is typically quite small and PQ training is relatively fast regardless of dataset scale. As a result, accelerators are not needed here.

Parameters:
  • ivf_model (IvfModel) – The IVF model to use to partition the vectors into clusters. This is needed because PQ is trained on residuals from the IVF model.

  • num_subvectors (int) –

    The number of subvectors to divide the source vectors into. This must be a divisor of the vector dimension. If not specified the default will be the vector dimension divided by 16 if the dimension is divisible by 16, otherwise the vector dimension divided by 8 if the dimension is divisible by 8.

    Automatic calculation of num_subvectors will fail if the vector dimension is not divisible by 16 or 8. In this case you must specify num_subvectors manually (though any value you choose is likely to lead to poor performance).

  • sample_rate (int) – This parameter is used in the same way as in the IVF model.

  • max_iters (int) – This parameter is used in the same way as in the IVF model.
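The default rule for num_subvectors can be written out directly. The function below is a hypothetical helper that follows the description above; it is not the library's implementation.

```python
def default_num_subvectors(dimension):
    """Apply the documented default: dimension / 16, else dimension / 8."""
    if dimension % 16 == 0:
        return dimension // 16
    if dimension % 8 == 0:
        return dimension // 8
    # Neither rule applies, so the caller has to choose a value explicitly.
    raise ValueError(
        f"dimension {dimension} is not divisible by 16 or 8; "
        "num_subvectors must be given manually"
    )


print(default_num_subvectors(128))  # 8
print(default_num_subvectors(24))   # 3
```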

transform_vectors(ivf: IvfModel, pq: PqModel, dest_uri: str, fragments: list[lance.fragment.LanceFragment] | None = None, partition_ds_uri: str | None = None)

Apply transformations to the vectors in the dataset and create an unsorted storage file. The unsorted storage file is a Lance file that will have at least a row id column. Normally it will also have other columns containing the transform outputs (such as partition id and PQ code).

Parameters:
  • ivf (IvfModel) – The IVF model to use for the transformations (e.g. partition assignment)

  • pq (PqModel) – The PQ model to use for the transformations (e.g. quantization)

  • dest_uri (str) – The URI to save the transformed vectors to. The URI can be a local file path or a cloud storage path.

  • fragments (list[LanceFragment]) – The list of data fragments to use when computing the transformed vectors. This is an optional parameter (the default uses all fragments).

  • partition_ds_uri (str) – The URI of a precomputed partitions dataset. This allows the partition transform to be skipped, using the precomputed value instead. This is optional.

class lance.indices.IvfModel(centroids: Array, distance_type: str)

Bases: object

A class that represents a trained IVF model.

centroids

The centroids of the IVF clusters

distance_type

The distance type used to train the IVF model

classmethod load(uri: str)

Load an IVF model from a lance file.

Parameters:

uri (str) – The URI to load the model from. The URI can be a local file path or a cloud storage path.

property num_partitions: int

The number of partitions / centroids in the IVF model

save(uri: str)

Save the IVF model to a lance file.

Parameters:

uri (str) – The URI to save the model to. The URI can be a local file path or a cloud storage path.

class lance.indices.PqModel(num_subvectors: int, codebook: FixedSizeListArray)

Bases: object

A class that represents a trained PQ model

Can be saved / loaded to checkpoint progress.

codebook

The centroids of the PQ clusters

property dimension

The dimension of the vectors this model was trained on

classmethod load(uri: str)

Load a PQ model from a lance file.

Parameters:

uri (str) – The URI to load the model from. The URI can be a local file path or a cloud storage path.

num_subvectors

The number of subvectors to divide source vectors into

save(uri: str)

Save the PQ model to a lance file.

Parameters:

uri (str) – The URI to save the model to. The URI can be a local file path or a cloud storage path.

lance.log module

lance.log.get_log_level()
lance.log.get_python_log_level(rust_log_level: str) str
lance.log.set_logger(file_path: str | None = 'pylance.log', name='pylance', level=20, format_string=None, log_handler=None)

lance.optimize module

class lance.optimize.Compaction

Bases: object

File compaction operation.

To run with multiple threads in a single process, just use execute().

To run with multiple processes, first use plan() to construct a plan, then execute the tasks in parallel, and finally use commit(). The CompactionPlan contains many CompactionTask objects, which can be pickled and sent to other processes. The tasks produce RewriteResult objects, which can be pickled and sent back to the main process to be passed to commit().
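The plan / execute / commit sequence can be sketched as a single function. This is an illustration under assumptions: compact_in_steps is a hypothetical helper, and the tasks run one after another in the same process instead of being pickled and sent to workers.

```python
def compact_in_steps(dataset, options):
    """Plan, execute, and commit a compaction as separate steps."""
    # Imported lazily so the sketch can be defined without lance installed.
    from lance.optimize import Compaction

    plan = Compaction.plan(dataset, options)
    # In a distributed setup each task would be pickled and executed in
    # another process; here they simply run sequentially.
    rewrites = [task.execute(dataset) for task in plan.tasks]
    # Only the results that actually finished need to be passed to commit().
    return Compaction.commit(dataset, rewrites)
```

A call might look like compact_in_steps(lance.dataset(uri), {"target_rows_per_fragment": 1024 * 1024}), where uri is a placeholder for an existing dataset. The return value is the CompactionMetrics produced by commit().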

static commit(dataset, rewrites)

Commit a compaction operation.

Once tasks from plan() have been executed, the results can be passed to this method to commit the compaction. It is not required that all of the original tasks are passed. For example, if only a subset were successful or completed before a deadline, you can pass just those.

Parameters:
  • dataset (lance.Dataset) – The dataset to compact. The dataset instance will be updated to the new version once committed.

  • rewrites (List[RewriteResult]) – The results of the compaction tasks to include in the commit.

Return type:

CompactionMetrics

static execute(dataset, options)

Execute a full compaction operation.

Parameters:
  • dataset (lance.Dataset) – The dataset to compact. The dataset instance will be updated to the new version once complete.

  • options (CompactionOptions) – The compaction options.

Returns:

The metrics from the compaction operation.

Return type:

CompactionMetrics

static plan(dataset, options)

Plan a compaction operation.

This is intended for users who want to run compaction in a distributed fashion. For running on a single process, use execute() instead.

Parameters:
  • dataset (lance.Dataset) – The dataset to compact.

  • options (CompactionOptions) – The compaction options.

Return type:

CompactionPlan

class lance.optimize.CompactionMetrics

Bases: object

files_added

The number of files that have been added, which is always equal to the number of fragments.

Type:

int

files_removed

The number of files that have been removed, including deletion files.

Type:

int

fragments_added

The number of new fragments that have been added.

Type:

int

fragments_removed

The number of fragments that have been overwritten.

Type:

int

class lance.optimize.CompactionOptions

Bases: TypedDict

Options for compaction.

batch_size: int | None

The batch size to use when scanning input fragments. You may want to reduce this if you are running out of memory during compaction.

The default will use the same default from scanner.

materialize_deletions: bool | None

Whether to compact fragments with soft deleted rows so they are no longer present in the file. (default: True)

materialize_deletions_threadhold: float | None

The fraction of original rows that are soft deleted in a fragment before the fragment is a candidate for compaction. (default: 0.1 = 10%)

max_bytes_per_file: int | None

Max number of bytes in a single file. This does not affect which fragments need compaction, but does affect how they are re-written if selected. If this value is too small you may end up with fragments that are smaller than target_rows_per_fragment.

The default will use the default from write_dataset.

max_rows_per_group: int | None

Max number of rows per group. This does not affect which fragments need compaction, but does affect how they are re-written if selected. (default: 1024)

num_threads: int | None

The number of threads to use when performing compaction. If not specified, defaults to the number of cores on the machine.

target_rows_per_fragment: int | None

The target number of rows per fragment. This is the number of rows that will be in each fragment after compaction. (default: 1024*1024)

class lance.optimize.CompactionPlan

Bases: object

A plan to compact small dataset fragments into larger ones.

Created by lance.optimize.Compaction.plan().

static from_json(json)

Load a plan from a JSON representation.

Parameters:

json (str) – The JSON representation of the plan.

Return type:

CompactionPlan

json()

Get a JSON representation of the plan.

Return type:

str

Warning

The JSON representation is not guaranteed to be stable across versions.

num_tasks()

The number of compaction tasks in the plan.

Return type:

int

read_version

The read version of the dataset that this plan was created from.

Type:

int

tasks

The individual tasks in the plan.

Type:

List[CompactionTask]

class lance.optimize.CompactionTask

Bases: object

execute(dataset)

Execute the compaction task and return the RewriteResult.

The rewrite result should be passed onto lance.optimize.Compaction.commit().

fragments

The fragments that will be compacted.

Type:

List[lance.fragment.FragmentMetadata]

static from_json(json)

Load a task from a JSON representation.

Parameters:

json (str) – The JSON representation of the task.

Return type:

CompactionTask

json()

Get a JSON representation of the task.

Return type:

str

Warning

The JSON representation is not guaranteed to be stable across versions.

read_version

The read version of the dataset that this task was created from.

Type:

int

class lance.optimize.RewriteResult

Bases: object

The result of a single compaction task.

Created by lance.optimize.CompactionTask.execute().

This result is pickle-able, so it can be serialized and sent back to the main process to be passed to lance.optimize.Compaction.commit().

static from_json(json)

Load a result from a JSON representation.

json()

Get a JSON representation of the result.

Return type:

str

Warning

The JSON representation is not guaranteed to be stable across versions.

metrics

The metrics from this compaction task.

Type:

CompactionMetrics

new_fragments

The metadata for fragments that are being added.

Type:

List[lance.fragment.FragmentMetadata]

original_fragments

The metadata for fragments that are being replaced.

Type:

List[lance.fragment.FragmentMetadata]

read_version

The version of the dataset the optimize operation is based on.

Type:

int

lance.progress module

class lance.progress.FileSystemFragmentWriteProgress(base_uri: str, metadata: Dict[str, str] | None = None)

Bases: FragmentWriteProgress

Progress tracking for Writing a Dataset or Fragment.

Warns:
  • This tracking class is experimental and will change in the future.

  • This implementation writes a JSON file to track in-progress state to the filesystem for each fragment.

PROGRESS_EXT: str = '.in_progress'
begin(fragment: FragmentMetadata, **kwargs)

Called when a new fragment is created.

Parameters:

fragment (FragmentMetadata) – The fragment that is open to write to.

complete(fragment: FragmentMetadata, **kwargs)

Called when a fragment is completed.

class lance.progress.FragmentWriteProgress

Bases: ABC

Progress tracking for Writing a Dataset or Fragment.

Warns:

This tracking class is experimental and may change in the future.

abstract begin(fragment: FragmentMetadata, **kwargs) None

Called when a new fragment is about to be written.

Parameters:
  • fragment (FragmentMetadata) – The fragment that is open to write to. The fragment id might not yet be assigned at this point.

  • kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.

Return type:

None

abstract complete(fragment: FragmentMetadata, **kwargs) None

Callback when a fragment is completely written.

Parameters:
  • fragment (FragmentMetadata) – The fragment that is open to write to.

  • kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.

class lance.progress.NoopFragmentWriteProgress

Bases: FragmentWriteProgress

No-op implementation of FragmentWriteProgress.

This is the default implementation.

begin(fragment: FragmentMetadata, **kargs)

Called when a new fragment is about to be written.

Parameters:
  • fragment (FragmentMetadata) – The fragment that is open to write to. The fragment id might not yet be assigned at this point.

  • kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.

Return type:

None

complete(fragment: FragmentMetadata, **kwargs)

Callback when a fragment is completely written.

Parameters:
  • fragment (FragmentMetadata) – The fragment that is open to write to.

  • kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.

lance.sampler module

class lance.sampler.FragmentSampler

Bases: Sampler

Sampling over Fragments.

To implement a new FragmentSampler, you can implement the iter_fragments method to yield fragments in desired order.

abstract iter_fragments(ds: lance.LanceDataset, *args, **kwargs) Generator[lance.LanceFragment, None, None]

Iterate over data fragments.

class lance.sampler.FullScanSampler

Bases: FragmentSampler

Default Sampler, which scans the entire dataset sequentially.

iter_fragments(dataset: lance.LanceDataset, **kwargs) Generator[lance.LanceFragment, None, None]

Iterate over data fragments.

class lance.sampler.Sampler

Bases: ABC

Sampler over LanceDataset.

To implement a new Sampler, you can implement the __call__ method to yield a pyarrow.RecordBatch.

class lance.sampler.ShardedBatchSampler(rank: int, world_size: int, randomize: bool = False, seed: int = 0)

Bases: Sampler

Sharded batch sampler.

Each rank / process will process a subset of the batches.

The input is subdivided into batches (of size batch_size). Each rank / process takes every Nth batch (where N is the world size). The order in which batches are loaded is randomized.

When there is no filter then each process only needs to load the rows assigned to it but this process is still slightly less efficient than ShardedFragmentSampler since it requires loading rows by range instead of loading all rows for a given fragment.

If there is a filter then we cannot divide the row ids ahead of time. Instead, each process will load the entire filtered dataset and discard the rows that are not assigned to it. The resulting stream is then randomized via a reservoir sampler. This does not perfectly randomize the stream but it should generate a stream that is random enough for many use cases.
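For the unfiltered case, the "every Nth batch" rule can be sketched with plain row ranges. The helper batches_for_rank is hypothetical and ignores both randomization and filters.

```python
def batches_for_rank(num_rows, batch_size, rank, world_size):
    """Return the (start, end) row ranges handled by one rank."""
    starts = range(0, num_rows, batch_size)
    # The last batch may be shorter than batch_size.
    ranges = [(s, min(s + batch_size, num_rows)) for s in starts]
    # Each rank takes every world_size-th batch, starting at its own rank.
    return ranges[rank::world_size]


print(batches_for_rank(num_rows=10, batch_size=3, rank=0, world_size=2))
# [(0, 3), (6, 9)]
print(batches_for_rank(num_rows=10, batch_size=3, rank=1, world_size=2))
# [(3, 6), (9, 10)]
```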

static from_torch(randomize: bool = False, seed: int = 0) ShardedBatchSampler

Use it from a PyTorch distributed environment.

Automatically infer rank and world_size from torch.distributed.

class lance.sampler.ShardedFragmentSampler(rank: int, world_size: int, randomize: bool = False, seed: int = 0)

Bases: FragmentSampler

Sharded fragments by rank and world_size.

Each rank / process will process a subset of the fragments. It yields batches from ds.fragments[rank::world_size].

This sampler is more efficient than ShardedBatchSampler when the dataset is large.

Parameters:
  • rank (int) – The rank of the process in the distributed cluster.

  • world_size (int) – The total number of processes in the distributed cluster.

  • randomize (bool) – If set to true, randomize the order of the fragments.

  • seed (int) – The random seed to use when randomize is set true.
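The slicing expression ds.fragments[rank::world_size] can be tried on a plain list to see how fragments are distributed. The fragment ids below are made up for illustration.

```python
fragment_ids = list(range(10))  # stand-in for ds.fragments
world_size = 4

# Each rank takes every world_size-th fragment, starting at its own rank.
shards = {rank: fragment_ids[rank::world_size] for rank in range(world_size)}
print(shards)
# {0: [0, 4, 8], 1: [1, 5, 9], 2: [2, 6], 3: [3, 7]}
```

Every fragment lands on exactly one rank, but ranks can receive different numbers of fragments when the fragment count is not a multiple of world_size.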

static from_torch(randomize: bool = False, seed: int = 0) ShardedFragmentSampler

Use from a PyTorch distributed environment.

Automatically infer rank and world_size from torch.distributed.

iter_fragments(dataset: lance.LanceDataset, **kwargs) Generator[lance.LanceFragment, None, None]

Iterate over data fragments.

lance.sampler.maybe_sample(dataset: str | Path | lance.LanceDataset, n: int, columns: list[str] | dict[str, str] | str, batch_size: int = 10240, max_takes: int = 2048, filt: str | None = None) Generator[pa.RecordBatch, None, None]

Sample n records from the dataset.

Parameters:
  • dataset (Union[str, Path, lance.LanceDataset]) – The dataset to sample from.

  • n (int) – The number of records to sample.

  • columns (Union[list[str], dict[str, str], str]) – The columns to load.

  • batch_size (int, optional) – The batch size to use when loading the data, by default 10240.

  • max_takes (int, optional) – The maximum number of takes to perform, by default 2048. This is employed to minimize the number of random reads necessary for sampling. A sufficiently large value can provide an effective random sample without the need for excessive random reads.

  • filt (str, optional) – The filter to apply to the dataset, by default None. If a filter is provided, then we will first load all row ids in memory and then batch through the ids in random order until enough matches have been found.

Returns:

A generator that yields RecordBatch objects containing the sampled data.

Return type:

Generator[pa.RecordBatch]

lance.schema module

class lance.schema.LanceSchema

Bases: object

A Lance Schema.

Unlike a PyArrow schema, a Lance schema assigns every field an integer id. This is used to track fields across versions. This assignment of fields to ids is initially done in depth-first order, but as a schema evolves the assignment may change.

The assignment of field ids is particular to each dataset, so these schemas cannot be used interchangeably between datasets.
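Depth-first id assignment can be illustrated on a small nested schema. This sketch assumes a pre-order traversal (a parent receives its id before its children), and the (name, children) tuple representation is hypothetical and not a Lance or PyArrow type.

```python
def assign_field_ids(fields, next_id=0, prefix=""):
    """Assign ids to (name, children) pairs in depth-first pre-order."""
    ids = {}
    for name, children in fields:
        path = prefix + name
        ids[path] = next_id
        next_id += 1
        # Descend into the children before moving on to the next sibling.
        child_ids, next_id = assign_field_ids(children, next_id, path + ".")
        ids.update(child_ids)
    return ids, next_id


schema = [
    ("id", []),
    ("location", [("lat", []), ("lon", [])]),
    ("label", []),
]
ids, _ = assign_field_ids(schema)
print(ids)
# {'id': 0, 'location': 1, 'location.lat': 2, 'location.lon': 3, 'label': 4}
```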

static from_pyarrow(schema)

Create a Lance schema from a PyArrow schema.

This will assign field ids in depth-first order. Be aware this may not match the correct schema for a particular table.

to_pyarrow()

Convert the schema to a PyArrow schema.

lance.schema.json_to_schema(schema_json: Dict[str, Any]) Schema

Converts a JSON-compatible dictionary to a PyArrow schema.

Parameters:

schema_json (Dict[str, Any]) – The JSON payload to convert to a PyArrow Schema.

lance.schema.schema_to_json(schema: Schema) Dict[str, Any]

Converts a PyArrow schema to a JSON-compatible dictionary.

lance.tracing module

lance.tracing.lance_trace_to_chrome(path=None, level=None)
lance.tracing.trace_to_chrome(*, file: str = None, level: str = None)

Begins tracing lance events to a chrome trace file.

The trace file can be opened with chrome://tracing or with the Perfetto UI.

The file will be finished (and closed) when the python process exits.

Parameters:
  • file (str, optional) – The file to write the trace to. If None, then a file in the current directory will be created named ./trace-{unix epoch in micros}.json

  • level (str, optional) – The level of detail to trace. One of “trace”, “debug”, “info”, “warn” or “error”. If None, then “info” is used.

lance.types module

lance.udf module

class lance.udf.BatchUDF(func, output_schema=None, checkpoint_file=None)

Bases: object

A user-defined function that can be passed to LanceDataset.add_columns().

Use the lance.batch_udf() decorator to wrap a function with this class.

class lance.udf.BatchUDFCheckpoint(path)

Bases: object

A cache for BatchUDF results to avoid recomputation.

This is backed by a SQLite database.

class BatchInfo(fragment_id, batch_index)

Bases: NamedTuple

batch_index: int

Alias for field number 1

fragment_id: int

Alias for field number 0

cleanup()
get_batch(info: BatchInfo) pa.RecordBatch | None
get_fragment(fragment_id: int) str | None

Retrieves a fragment as a JSON string.

insert_batch(info: BatchInfo, batch: pa.RecordBatch)
insert_fragment(fragment_id: int, fragment: str)

Save a JSON string of a fragment to the cache.
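
To make the idea of a SQLite-backed batch cache concrete, here is a minimal sketch built on the standard sqlite3 module. The class name SqliteBatchCache, the table layout, and the use of raw bytes for the batch payload are all assumptions for illustration; the real checkpoint's storage format is not described here.

```python
import sqlite3
from typing import NamedTuple, Optional


class BatchInfo(NamedTuple):
    fragment_id: int
    batch_index: int


class SqliteBatchCache:
    """Hypothetical cache keyed by (fragment_id, batch_index)."""

    def __init__(self, path: str) -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS batches ("
            "fragment_id INTEGER, batch_index INTEGER, data BLOB, "
            "PRIMARY KEY (fragment_id, batch_index))"
        )

    def insert_batch(self, info: BatchInfo, data: bytes) -> None:
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT OR REPLACE INTO batches VALUES (?, ?, ?)",
                (info.fragment_id, info.batch_index, data),
            )

    def get_batch(self, info: BatchInfo) -> Optional[bytes]:
        row = self.conn.execute(
            "SELECT data FROM batches WHERE fragment_id = ? AND batch_index = ?",
            (info.fragment_id, info.batch_index),
        ).fetchone()
        return None if row is None else row[0]


cache = SqliteBatchCache(":memory:")
cache.insert_batch(BatchInfo(0, 1), b"serialized batch")
hit = cache.get_batch(BatchInfo(0, 1))   # the stored payload
miss = cache.get_batch(BatchInfo(0, 2))  # None: nothing cached for this batch
```

A lookup that returns None signals that the batch still has to be computed, which is what lets an interrupted job resume without redoing finished work.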

lance.udf.batch_udf(output_schema=None, checkpoint_file=None)

Create a user defined function (UDF) that adds columns to a dataset.

This function is used to add columns to a dataset. It takes a function that takes a single argument, a RecordBatch, and returns a RecordBatch. The function is called once for each batch in the dataset. The function should not modify the input batch, but instead create a new batch with the new columns added.

Parameters:
  • output_schema (Schema, optional) – The schema of the output RecordBatch. This is used to validate the output of the function. If not provided, the schema of the first output RecordBatch will be used.

  • checkpoint_file (str or Path, optional) – If specified, this file will be used as a cache for unsaved results of this UDF. If the process fails, and you call add_columns again with this same file, it will resume from the last saved state. This is useful for long-running processes that may fail and need to be resumed. This file may get very large: it will hold up to an entire data file’s worth of results on disk, which can be multiple gigabytes of data.

Return type:

AddColumnsUDF
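
The per-batch contract described above can be sketched without Lance or PyArrow. In the example below a batch is a plain dict of column lists, batch_udf_sketch is a hypothetical decorator factory, and output_columns is a simplified stand-in for output_schema that only checks column names.

```python
from typing import Callable, Dict, List, Optional

Batch = Dict[str, list]  # stand-in for a RecordBatch: column name -> values


def batch_udf_sketch(output_columns: Optional[List[str]] = None):
    """Hypothetical decorator factory: wrap func and check its output columns."""

    def decorator(func: Callable[[Batch], Batch]) -> Callable[[Batch], Batch]:
        expected = output_columns

        def wrapper(batch: Batch) -> Batch:
            nonlocal expected
            result = func(batch)
            if expected is None:
                # No schema given: the first output defines it.
                expected = list(result)
            elif list(result) != expected:
                raise ValueError(f"expected columns {expected}, got {list(result)}")
            return result

        return wrapper

    return decorator


@batch_udf_sketch(output_columns=["double_a"])
def double_a(batch: Batch) -> Batch:
    # Build a new batch; the input batch is left untouched.
    return {"double_a": [2 * v for v in batch["a"]]}


batches = [{"a": [1, 2]}, {"a": [3]}]
outputs = [double_a(b) for b in batches]  # called once per batch
```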

lance.udf.normalize_transform(udf_like: Dict[str, str] | BatchUDF | ReaderLike, data_source: LanceDataset | LanceFragment, read_columns: List[str] | None = None, reader_schema: pa.Schema | None = None)

lance.util module

class lance.util.HNSW(hnsw)

Bases: object

static build(vectors_array: Iterator[Array], max_level=7, m=20, ef_construction=100) HNSW
to_lance_file(file_path)
vectors() Array
class lance.util.KMeans(k: int, metric_type: Literal['l2', 'dot', 'cosine'] = 'l2', max_iters: int = 50, centroids: FixedSizeListArray | None = None)

Bases: object

KMeans model for clustering.

It works with 2-D arrays of float32 type and supports the distance metrics “l2”, “cosine”, and “dot”.

Note, you must train the kmeans model by calling fit() before calling predict(). Calling fit() again will reset the model.

Currently, the initial centroids are initialized randomly. kmeans++ is implemented but not exposed yet.

Examples

>>> import numpy as np
>>> import lance
>>> data = np.random.randn(1000, 128).astype(np.float32)
>>> kmeans = lance.util.KMeans(8, metric_type="l2")
>>> kmeans.fit(data)
>>> centroids = np.stack(kmeans.centroids.to_numpy(zero_copy_only=False))
>>> clusters = kmeans.predict(data)
property centroids: FixedShapeTensorType | None

Returns the centroids of the model.

Returns None if the model is not trained.

fit(data: FixedSizeListArray | FixedShapeTensorArray | ndarray)

Fit the model to the data.

Parameters:

data (pa.FixedSizeListArray, pa.FixedShapeTensorArray, np.ndarray) – The data to fit the model to. Must be a 2-D array of float32 type.

predict(data: FixedSizeListArray | FixedShapeTensorArray | ndarray) UInt32Array

Predict the cluster for each vector in the data.
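
For readers who want to see the fit/predict contract spelled out, the following is a small pure-Python version of Lloyd's algorithm with random initial centroids and the l2 metric. It is an illustrative sketch only: TinyKMeans and its seed parameter are invented for the example, and it makes no claim about how lance.util.KMeans is implemented.

```python
import random
from typing import List, Optional, Sequence

Vector = Sequence[float]


def l2_squared(a: Vector, b: Vector) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


class TinyKMeans:
    """Illustrative k-means (Lloyd's algorithm) with random initial centroids."""

    def __init__(self, k: int, max_iters: int = 50, seed: int = 0) -> None:
        self.k = k
        self.max_iters = max_iters
        self.seed = seed
        self.centroids: Optional[List[List[float]]] = None  # None until fit()

    def fit(self, data: Sequence[Vector]) -> None:
        # Calling fit() again discards any previous centroids.
        rng = random.Random(self.seed)
        self.centroids = [list(v) for v in rng.sample(list(data), self.k)]
        for _ in range(self.max_iters):
            labels = self.predict(data)
            updated = []
            for c in range(self.k):
                members = [v for v, label in zip(data, labels) if label == c]
                if members:
                    updated.append([sum(col) / len(members) for col in zip(*members)])
                else:
                    updated.append(self.centroids[c])  # keep an empty cluster's centroid
            if updated == self.centroids:
                break  # converged
            self.centroids = updated

    def predict(self, data: Sequence[Vector]) -> List[int]:
        if self.centroids is None:
            raise RuntimeError("call fit() before predict()")
        return [
            min(range(self.k), key=lambda c: l2_squared(v, self.centroids[c]))
            for v in data
        ]


data = [[0.0, 0.0], [0.1, 0.0], [0.0, 0.1], [10.0, 10.0], [10.1, 10.0], [10.0, 10.1]]
model = TinyKMeans(k=2)
model.fit(data)
clusters = model.predict(data)
```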

lance.util.sanitize_ts(ts: ts_types) datetime

Returns a python datetime object from various timestamp input types.

lance.util.td_to_micros(td: timedelta) int

Returns the number of microseconds in a timedelta object.
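
One straightforward way to compute this with the standard library is integer division by a one-microsecond timedelta, which avoids floating-point rounding. The function name td_to_micros_sketch is used here to make clear that this is an illustration rather than the library's source.

```python
from datetime import timedelta


def td_to_micros_sketch(td: timedelta) -> int:
    """Whole microseconds in td, computed with exact integer arithmetic."""
    return td // timedelta(microseconds=1)


micros = td_to_micros_sketch(timedelta(days=1, seconds=2, microseconds=3))
```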

lance.util.validate_vector_index(dataset, column: str, refine_factor: int = 5, sample_size: int | None = None, pass_threshold: float = 1.0)

Runs in-sample queries and checks that the recall for k=1 is very high (it should be near 100%).

Parameters:
  • dataset (LanceDataset) – The dataset to sanity check.

  • column (str) – The column name of the vector column.

  • refine_factor (int, default=5) – The refine factor to use for the nearest neighbor query.

  • sample_size (int, optional) – The number of vectors to sample from the dataset. If None, the entire dataset will be used.

  • pass_threshold (float, default=1.0) – The minimum fraction of vectors that must pass the sanity check. If less than this fraction of vectors pass, a ValueError will be raised.
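
The check can be sketched in plain Python: every stored vector is used as a query, and the query passes if its own row comes back as the nearest neighbour. The names check_in_sample_recall and search_top1 are hypothetical, and the brute-force search below merely stands in for an index lookup.

```python
from typing import Callable, List, Sequence

Vector = Sequence[float]


def l2_squared(a: Vector, b: Vector) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


def check_in_sample_recall(
    vectors: List[Vector],
    search_top1: Callable[[Vector], int],
    pass_threshold: float = 1.0,
) -> float:
    """Query with each stored vector and require that it finds itself at k=1."""
    hits = sum(1 for i, v in enumerate(vectors) if search_top1(v) == i)
    recall = hits / len(vectors)
    if recall < pass_threshold:
        raise ValueError(f"recall@1 {recall:.2f} is below {pass_threshold:.2f}")
    return recall


vectors = [[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [5.0, 5.0]]


def exact_search(q: Vector) -> int:
    # Brute-force nearest neighbour, standing in for an index lookup.
    return min(range(len(vectors)), key=lambda i: l2_squared(q, vectors[i]))


recall = check_in_sample_recall(vectors, exact_search)
```

An index that fails this test on its own training data is unlikely to perform well on unseen queries, which is why the check is a useful smoke test.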

lance.vector module

Embedding vector utilities

lance.vector.compute_partitions(dataset: LanceDataset, column: str, kmeans: Any, batch_size: int = 40960, dst_dataset_uri: str | Path | None = None, allow_cuda_tf32: bool = True, num_sub_vectors: int | None = None, filter_nan: bool = True, sample_size: int | None = None) str

Compute partitions for each row using GPU kmeans and spill to disk.

Parameters:
  • dataset (LanceDataset) – Dataset to compute partitions for.

  • column (str) – Column name of the vector column.

  • kmeans (lance.torch.kmeans.KMeans) – KMeans model to use to compute partitions.

  • batch_size (int, default 40960) – The batch size used to read the dataset.

  • dst_dataset_uri (Union[str, Path], optional) – The path to store the partitions. If not specified, a random directory is used instead.

  • allow_cuda_tf32 (bool, default True) – Whether to allow tf32 for matmul on CUDA.

Returns:

The absolute path of the partition dataset.

Return type:

str

lance.vector.compute_pq_codes(dataset: LanceDataset, kmeans_list: List[Any], batch_size: int = 40960, dst_dataset_uri: str | Path | None = None, allow_cuda_tf32: bool = True) str

Compute pq codes for each row using GPU kmeans and spill to disk.

Parameters:
  • dataset (LanceDataset) – Dataset to compute pq codes for.

  • kmeans_list (List[lance.torch.kmeans.KMeans]) – KMeans models to use to compute pq (one per subspace)

  • batch_size (int, default 40960) – The batch size used to read the dataset.

  • dst_dataset_uri (Union[str, Path], optional) – The path to store the pq codes. If not specified, a random directory is used instead.

  • allow_cuda_tf32 (bool, default True) – Whether to allow tf32 for matmul on CUDA.

Returns:

The absolute path of the pq codes dataset.

Return type:

str

lance.vector.one_pass_assign_ivf_pq_on_accelerator(dataset: LanceDataset, column: str, metric_type: Literal['l2', 'cosine', 'dot'], accelerator: str | 'torch.Device', ivf_kmeans: Any, pq_kmeans_list: List[Any], dst_dataset_uri: str | Path | None = None, batch_size: int = 40960, *, filter_nan: bool = True, allow_cuda_tf32: bool = True)

Compute partitions for each row using GPU kmeans and spill to disk.

Returns:

The absolute path of the ivfpq codes dataset, as precomputed partition buffers.

Return type:

str

lance.vector.one_pass_train_ivf_pq_on_accelerator(dataset: LanceDataset, column: str, k: int, metric_type: Literal['l2', 'cosine', 'dot'], accelerator: str | 'torch.Device', num_sub_vectors: int, batch_size: int = 40960, *, sample_rate: int = 256, max_iters: int = 50, filter_nan: bool = True)
lance.vector.train_ivf_centroids_on_accelerator(dataset: LanceDataset, column: str, k: int, metric_type: Literal['l2', 'cosine', 'dot'], accelerator: str | 'torch.Device', batch_size: int = 40960, *, sample_rate: int = 256, max_iters: int = 50, filter_nan: bool = True) Tuple[np.ndarray, Any]

Use accelerator (GPU or MPS) to train kmeans.

lance.vector.train_pq_codebook_on_accelerator(dataset: LanceDataset, metric_type: Literal['l2', 'cosine', 'dot'], accelerator: str | 'torch.Device', num_sub_vectors: int, batch_size: int = 40960) Tuple[np.ndarray, List[Any]]

Use accelerator (GPU or MPS) to train pq codebook.

lance.vector.vec_to_table(data: dict | list | ndarray, names: str | list | None = None, ndim: int | None = None, check_ndim: bool = True) Table

Create a pyarrow Table containing vectors. Vectors are created as FixedSizeListArrays in pyarrow with Float32 values.

Examples

>>> import numpy as np
>>> np.random.seed(0)
>>> from lance.vector import vec_to_table
>>> dd = {"vector0": np.random.randn(10), "vector1": np.random.randn(10)}
>>> vec_to_table(dd)
pyarrow.Table
id: string
vector: fixed_size_list<item: float>[10]
  child 0, item: float
----
id: [["vector0","vector1"]]
vector: [[[1.7640524,0.4001572,0.978738,2.2408931,1.867558,-0.9772779,0.95008844,-0.1513572,-0.10321885,0.41059852],[0.14404356,1.4542735,0.7610377,0.121675014,0.44386324,0.33367434,1.4940791,-0.20515826,0.3130677,-0.85409576]]]
>>> vec_to_table(dd).to_pandas()
        id                                             vector
0  vector0  [1.7640524, 0.4001572, 0.978738, 2.2408931, 1....
1  vector1  [0.14404356, 1.4542735, 0.7610377, 0.121675014...
Parameters:
  • data (dict, list, or np.ndarray) – If dict, the keys are added as the “id” column. If list, each element is assumed to be a vector. If ndarray, each row is assumed to be a vector.

  • names (str or list, optional) – If data is a dict, names should be a list of 2 str; default [“id”, “vector”]. If data is a list or ndarray, names should be a str; default “vector”.

  • ndim (int, optional) – Number of dimensions of the vectors. Inferred if omitted.

  • check_ndim (bool, default True) – Whether to verify that all vectors have the same length

Returns:

tbl – A pyarrow Table with vectors converted to appropriate types

Return type:

pa.Table

Module contents

class lance.BlobColumn(blob_column: Array | ChunkedArray)

Bases: object

A utility to wrap a Pyarrow binary column and iterate over the rows as file-like objects.

This can be useful for working with medium-to-small binary objects that need to interface with APIs that expect file-like objects. For very large binary objects (4-8MB or more per value) you might be better off creating a blob column and using LanceDataset.take_blobs to access the blob data.

class lance.BlobFile(inner: LanceBlobFile)

Bases: RawIOBase

Represents a blob in a Lance dataset as a file-like object.

close() None

Flush and close the IO object.

This method has no effect if the file is already closed.

property closed: bool
readable() bool

Return whether object was opened for reading.

If False, read() will raise OSError.

readall() bytes

Read until EOF, using multiple read() calls.

readinto(b: bytearray) int
seek(offset: int, whence: int = 0) int

Change the stream position to the given byte offset.

offset

The stream position, relative to ‘whence’.

whence

The relative position to seek from.

The offset is interpreted relative to the position indicated by whence. Values for whence are:

  • os.SEEK_SET or 0 – start of stream (the default); offset should be zero or positive

  • os.SEEK_CUR or 1 – current stream position; offset may be negative

  • os.SEEK_END or 2 – end of stream; offset is usually negative

Return the new absolute position.
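
The three whence modes behave the same way on any seekable binary stream from the standard io module, so they can be tried out on io.BytesIO. The ten-byte buffer below is only a stand-in for a blob; no Lance object is involved.

```python
import io
import os

stream = io.BytesIO(b"0123456789")  # stand-in for a 10-byte blob

pos_set = stream.seek(3, os.SEEK_SET)   # 3 bytes from the start
first = stream.read(2)                  # reads b"34", position is now 5
pos_cur = stream.seek(-1, os.SEEK_CUR)  # one byte back from the current position
pos_end = stream.seek(-2, os.SEEK_END)  # two bytes before the end
tail = stream.read()                    # reads the last two bytes
```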

seekable() bool

Return whether object supports random access.

If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().

size() int

Returns the size of the blob in bytes.

tell() int

Return current stream position.

class lance.FragmentMetadata(id: int, files: List[DataFile], physical_rows: int, deletion_file: DeletionFile | None = None, row_id_meta: RowIdMeta | None = None)

Bases: object

Metadata for a fragment.

id

The ID of the fragment.

Type:

int

files

The data files of the fragment. Each data file must have the same number of rows. Each file stores a different subset of the columns.

Type:

List[DataFile]

physical_rows

The number of rows originally in this fragment. This is the number of rows in the data files before deletions.

Type:

int

deletion_file

The deletion file, if any.

Type:

Optional[DeletionFile]

row_id_meta

The row id metadata, if any.

Type:

Optional[RowIdMeta]

data_files() List[DataFile]
deletion_file: DeletionFile | None = None
files: List[DataFile]
static from_json(json_data: str) FragmentMetadata
id: int
property num_deletions: int

The number of rows that have been deleted from this fragment.

property num_rows: int

The number of rows in this fragment after deletions.

physical_rows: int
row_id_meta: RowIdMeta | None = None
to_json() dict

Get this as a simple JSON-serializable dictionary.
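
The relationship between physical_rows, num_deletions, and num_rows can be summarised with a small dataclass sketch. FragmentSketch and DeletionFileSketch are hypothetical stand-ins, and the field num_deleted_rows is an assumed name used only to carry a deletion count.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeletionFileSketch:
    num_deleted_rows: int  # hypothetical field name


@dataclass
class FragmentSketch:
    id: int
    physical_rows: int
    deletion_file: Optional[DeletionFileSketch] = None

    @property
    def num_deletions(self) -> int:
        return 0 if self.deletion_file is None else self.deletion_file.num_deleted_rows

    @property
    def num_rows(self) -> int:
        # Rows still visible after deletions are applied.
        return self.physical_rows - self.num_deletions


intact = FragmentSketch(id=0, physical_rows=100)
pruned = FragmentSketch(id=1, physical_rows=100, deletion_file=DeletionFileSketch(25))
```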

class lance.LanceDataset(uri: str | Path, version: int | str | None = None, block_size: int | None = None, index_cache_size: int | None = None, metadata_cache_size: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, serialized_manifest: bytes | None = None, default_scan_options: Dict[str, Any] | None = None)

Bases: Dataset

A Lance Dataset in Lance format where the data is stored at the given uri.

add_columns(transforms: Dict[str, str] | BatchUDF | ReaderLike, read_columns: List[str] | None = None, reader_schema: pa.Schema | None = None, batch_size: int | None = None)

Add new columns with defined values.

There are several ways to specify the new columns. First, you can provide SQL expressions for each new column. Second you can provide a UDF that takes a batch of existing data and returns a new batch with the new columns. These new columns will be appended to the dataset.

You can also provide a RecordBatchReader which will read the new column values from some external source. This is often useful when the new column values have already been staged to files (often by some distributed process)

See the lance.batch_udf() decorator for more information on writing UDFs.

Parameters:
  • transforms (dict or BatchUDF or ReaderLike) – If this is a dictionary, then the keys are the names of the new columns and the values are SQL expression strings. These strings can reference existing columns in the dataset. If this is a BatchUDF, then it is a UDF that takes a batch of existing data and returns a new batch with the new columns. If this is a ReaderLike, then the new column values are read from it.

  • read_columns (list of str, optional) – The names of the columns that the UDF will read. If None, then the UDF will read all columns. This is only used when transforms is a UDF. Otherwise, the read columns are inferred from the SQL expressions.

  • reader_schema (pa.Schema, optional) – Only valid if transforms is a ReaderLike object. This will be used to determine the schema of the reader.

  • batch_size (int, optional) – The number of rows to read at a time from the source dataset when applying the transform. This is ignored if the dataset is a v1 dataset.

Examples

>>> import lance
>>> import pandas as pd
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3]})
>>> dataset = lance.write_dataset(table, "my_dataset")
>>> @lance.batch_udf()
... def double_a(batch):
...     df = batch.to_pandas()
...     return pd.DataFrame({'double_a': 2 * df['a']})
>>> dataset.add_columns(double_a)
>>> dataset.to_table().to_pandas()
   a  double_a
0  1         2
1  2         4
2  3         6
>>> dataset.add_columns({"triple_a": "a * 3"})
>>> dataset.to_table().to_pandas()
   a  double_a  triple_a
0  1         2         3
1  2         4         6
2  3         6         9

See also

LanceDataset.merge

Merge a pre-computed set of columns into the dataset.

alter_columns(*alterations: Iterable[Dict[str, Any]])

Alter column name, data type, and nullability.

Columns that are renamed can keep any indices that are on them. If a column has an IVF_PQ index, the index can be kept when the column is cast to another type. Other index types don’t support casting at this time.

Column types can be upcast (such as int32 to int64) or downcast (such as int64 to int32). However, downcasting will fail if there are any values that cannot be represented in the new type. In general, columns can be cast within the same general type: integers to integers, floats to floats, and strings to strings. In addition, string, binary, and list columns can be cast between their size variants: for example, string to large string, binary to large binary, and list to large list.

If a column is cast to a different type and its index does not support casting, that index will be dropped.
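
The downcasting rule (the cast fails if any value cannot be represented in the new type) can be illustrated for int64 to int32 with a range check. The helper downcast_to_int32 is hypothetical and works on plain Python integers rather than Arrow arrays.

```python
from typing import Iterable, List

INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def downcast_to_int32(values: Iterable[int]) -> List[int]:
    """Return the values unchanged if all fit in int32, else raise ValueError."""
    checked = list(values)
    for v in checked:
        if not INT32_MIN <= v <= INT32_MAX:
            raise ValueError(f"{v} cannot be represented as int32")
    return checked


ok = downcast_to_int32([1, 2, 3])
try:
    downcast_to_int32([1, 2**40])
    failed = False
except ValueError:
    failed = True  # 2**40 is outside the int32 range
```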

Parameters:

alterations (Iterable[Dict[str, Any]]) –

A sequence of dictionaries, each with the following keys:

  • ”path”: str

    The column path to alter. For a top-level column, this is the name. For a nested column, this is the dot-separated path, e.g. “a.b.c”.

  • ”name”: str, optional

    The new name of the column. If not specified, the column name is not changed.

  • ”nullable”: bool, optional

    Whether the column should be nullable. If not specified, the column nullability is not changed. Only non-nullable columns can be changed to nullable. Currently, you cannot change a nullable column to non-nullable.

  • ”data_type”: pyarrow.DataType, optional

    The new data type to cast the column to. If not specified, the column data type is not changed.

Examples

>>> import lance
>>> import pyarrow as pa
>>> schema = pa.schema([pa.field('a', pa.int64()),
...                     pa.field('b', pa.string(), nullable=False)])
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}, schema=schema)
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.alter_columns({"path": "a", "name": "x"},
...                       {"path": "b", "nullable": True})
>>> dataset.to_table().to_pandas()
   x  b
0  1  a
1  2  b
2  3  c
>>> dataset.alter_columns({"path": "x", "data_type": pa.int32()})
>>> dataset.schema
x: int32
b: string
checkout_version(version: int | str) LanceDataset

Load the given version of the dataset.

Unlike the dataset() constructor, this will re-use the current cache. This is a no-op if the dataset is already at the given version.

Parameters:

version (int | str) – The version to check out. A version number (int) or a tag (str) can be provided.

Return type:

LanceDataset

cleanup_old_versions(older_than: timedelta | None = None, *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True) CleanupStats

Cleans up old versions of the dataset.

Some dataset changes, such as overwriting, leave behind data that is not referenced by the latest dataset version. The old data is left in place to allow the dataset to be restored back to an older version.

This method will remove older versions and any data files they reference. Once this cleanup task has run you will not be able to checkout or restore these older versions.

Parameters:
  • older_than (timedelta, optional) – Only versions older than this will be removed. If not specified, this will default to two weeks.

  • delete_unverified (bool, default False) –

    Files leftover from a failed transaction may appear to be part of an in-progress operation (e.g. appending new data) and these files will not be deleted unless they are at least 7 days old. If delete_unverified is True then these files will be deleted regardless of their age.

    This should only be set to True if you can guarantee that no other process is currently working on this dataset. Otherwise the dataset could be put into a corrupted state.

  • error_if_tagged_old_versions (bool, default True) – Some versions may have tags associated with them. Tagged versions will not be cleaned up, regardless of how old they are. If this argument is set to True (the default), an exception will be raised if any tagged versions match the parameters. Otherwise, tagged versions will be ignored without any error and only untagged versions will be cleaned up.
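
The interaction between older_than and error_if_tagged_old_versions can be sketched as a selection function over version timestamps. Everything here is illustrative: versions_to_remove is a hypothetical helper, the inputs are plain dictionaries and sets, and keeping the latest version unconditionally is an assumption of the sketch.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set


def versions_to_remove(
    created_at: Dict[int, datetime],
    tagged: Set[int],
    now: datetime,
    older_than: Optional[timedelta] = None,
    error_if_tagged_old_versions: bool = True,
) -> List[int]:
    """Pick the versions a cleanup pass would delete (illustrative only)."""
    if older_than is None:
        older_than = timedelta(weeks=2)  # documented default
    cutoff = now - older_than
    latest = max(created_at)  # assumption: the latest version is always kept

    old = [v for v, ts in created_at.items() if ts < cutoff and v != latest]
    old_tagged = [v for v in old if v in tagged]
    if old_tagged and error_if_tagged_old_versions:
        raise ValueError(f"tagged versions are old enough to clean up: {old_tagged}")
    # Tagged versions are never removed, whatever their age.
    return sorted(v for v in old if v not in tagged)


now = datetime(2024, 3, 1)
created_at = {
    1: datetime(2024, 1, 1),
    2: datetime(2024, 1, 15),
    3: datetime(2024, 2, 28),
}
removed = versions_to_remove(
    created_at, tagged={2}, now=now, error_if_tagged_old_versions=False
)
```

With the flag left at its default of True, the same call raises instead, because version 2 is both tagged and older than the cutoff.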

static commit(base_uri: str | Path | LanceDataset, operation: LanceOperation.BaseOperation, read_version: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) LanceDataset

Create a new version of the dataset.

This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset().)

Its current purpose is to allow for changes being made in a distributed environment where no single process is doing all of the work. For example, a distributed bulk update or a distributed bulk modify operation.

Once all of the changes have been made, this method can be called to make the changes visible by updating the dataset manifest.

Warning

This is an advanced API and doesn’t provide the same level of validation as the other APIs. For example, it’s the responsibility of the caller to ensure that the fragments are valid for the schema.

Parameters:
  • base_uri (str, Path, or LanceDataset) – The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.

  • operation (BaseOperation) – The operation to apply to the dataset. This describes what changes have been made. See available operations under LanceOperation.

  • read_version (int, optional) – The version of the dataset that was used as the base for the changes. This is not needed for overwrite or restore operations.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the migrate_manifest_paths_v2() method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).

  • detached (bool, optional) – If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.

  • max_retries (int) – The maximum number of retries to perform when committing the dataset.

Returns:

A new version of Lance Dataset.

Return type:

LanceDataset

Examples

Creating a new dataset with the LanceOperation.Overwrite operation:

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
static commit_batch(dest: str | Path | LanceDataset, transactions: Sequence[Transaction], commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) BulkCommitResult

Create a new version of dataset with multiple transactions.

This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset().)

Parameters:
  • dest (str, Path, or LanceDataset) – The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.

  • transactions (Iterable[Transaction]) – The transactions to apply to the dataset. These will be merged into a single transaction and applied to the dataset. Note: Only append transactions are currently supported. Other transaction types will be supported in the future.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the migrate_manifest_paths_v2() method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).

  • detached (bool, optional) – If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.

  • max_retries (int) – The maximum number of retries to perform when committing the dataset.

Returns:

dataset: LanceDataset

A new version of Lance Dataset.

merged: Transaction

The merged transaction that was applied to the dataset.

Return type:

dict with keys

count_rows(filter: Expression | str | None = None, **kwargs) int

Count rows matching the scanner filter.

Parameters:

**kwargs (dict, optional) – See the scanner() method for a full description of the parameters.

Returns:

count – The number of rows matching the filter, or the total number of rows in the dataset if no filter is given.

Return type:

int

create_index(column: str | List[str], index_type: str, name: str | None = None, metric: str = 'L2', replace: bool = False, num_partitions: int | None = None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, num_sub_vectors: int | None = None, accelerator: str | 'torch.Device' | None = None, index_cache_size: int | None = None, shuffle_partition_batches: int | None = None, shuffle_partition_concurrency: int | None = None, ivf_centroids_file: str | None = None, precomputed_partition_dataset: str | None = None, storage_options: Dict[str, str] | None = None, filter_nan: bool = True, one_pass_ivfpq: bool = False, **kwargs) LanceDataset

Create index on column.

Experimental API

Parameters:
  • column (str) – The column to be indexed.

  • index_type (str) – The type of the index. "IVF_PQ", "IVF_HNSW_PQ" and "IVF_HNSW_SQ" are currently supported.

  • name (str, optional) – The index name. If not provided, it will be generated from the column name.

  • metric (str) – The distance metric type, i.e., “L2” (alias to “euclidean”), “cosine” or “dot” (dot product). Default is “L2”.

  • replace (bool) – Replace the existing index if it exists.

  • num_partitions (int, optional) – The number of partitions of IVF (Inverted File Index).

  • ivf_centroids (optional) – It can be either np.ndarray, pyarrow.FixedSizeListArray or pyarrow.FixedShapeTensorArray. A num_partitions x dimension array of existing K-means centroids for IVF clustering. If not provided, a new KMeans model will be trained.

  • pq_codebook (optional) –

    It can be np.ndarray, pyarrow.FixedSizeListArray, or pyarrow.FixedShapeTensorArray. A num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors) array of K-means centroids for PQ codebook.

    Note: nbits is always 8 for now. If not provided, a new PQ model will be trained.

  • num_sub_vectors (int, optional) – The number of sub-vectors for PQ (Product Quantization).

  • accelerator (str or torch.Device, optional) – If set, use an accelerator to speed up the training process. Accepted accelerator: “cuda” (Nvidia GPU) and “mps” (Apple Silicon GPU). If not set, use the CPU.

  • index_cache_size (int, optional) – The size of the index cache in number of entries. Default value is 256.

  • shuffle_partition_batches (int, optional) –

    The number of batches, using the row group size of the dataset, to include in each shuffle partition. Default value is 10240.

    Assuming the row group size is 1024, each shuffle partition will hold 10240 * 1024 = 10,485,760 rows. By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.

  • shuffle_partition_concurrency (int, optional) –

    The number of shuffle partitions to process concurrently. Default value is 2.

    By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • filter_nan (bool) – Defaults to True. False is UNSAFE, and will cause a crash if any null/nan values are present (and otherwise will not). Disables the null filter used for nullable columns. Obtains a small speed boost.

  • one_pass_ivfpq (bool) – Defaults to False. If enabled, index type must be “IVF_PQ”. Reduces disk IO.

  • kwargs – Parameters passed to the index building process.
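
Two of the parameter descriptions above involve small calculations that are easy to check by hand. The sketch below reproduces the shuffle partition size and evaluates the pq_codebook shape expression, reading it as 2^nbits centroids of dimension // num_sub_vectors values each. The helper names and the 128-dimensional example are assumptions made for illustration.

```python
from typing import Tuple


def rows_per_shuffle_partition(shuffle_partition_batches: int, row_group_size: int) -> int:
    """Rows held by one shuffle partition."""
    return shuffle_partition_batches * row_group_size


def pq_codebook_shape(dimension: int, num_sub_vectors: int, nbits: int = 8) -> Tuple[int, int]:
    """Evaluate num_sub_vectors x (2 ^ nbits * dimension // num_sub_vectors)."""
    return (num_sub_vectors, (2**nbits) * dimension // num_sub_vectors)


rows = rows_per_shuffle_partition(10240, 1024)
shape = pq_codebook_shape(dimension=128, num_sub_vectors=16)
# rows is 10240 * 1024; shape[1] is 256 centroids times 8 values per sub-vector.
```

Halving shuffle_partition_batches halves the rows held per partition, which is the memory-versus-time trade-off described for that parameter.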

SQ (Scalar Quantization) is available only for the IVF_HNSW_SQ index type. It reduces the memory usage of the index by mapping float vectors to integer vectors, where each integer has num_bits bits. Only 8 bits are supported for now.

If index_type is “IVF_*”, then the following parameters are required:

num_partitions

If index_type contains “PQ”, then the following parameters are required:

num_sub_vectors

Optional parameters for IVF_PQ:

  • ivf_centroids

    Existing K-means centroids for IVF clustering.

  • num_bits

    The number of bits for PQ (Product Quantization). Default is 8. Only 4, 8 are supported.

Optional parameters for IVF_HNSW_*:

  • max_level

    Int, the maximum number of levels in the graph.

  • m

    Int, the number of edges per node in the graph.

  • ef_construction

    Int, the number of nodes to examine during the construction.
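
The required-parameter rules stated above (num_partitions for IVF_* index types, num_sub_vectors for index types that use PQ) can be written as a small check. The function missing_index_params is a hypothetical helper for illustration and is not part of Lance.

```python
from typing import Any, Dict, List


def missing_index_params(index_type: str, params: Dict[str, Any]) -> List[str]:
    """List the required parameters that params does not supply."""
    required = []
    if index_type.startswith("IVF_"):
        required.append("num_partitions")
    if "PQ" in index_type.split("_"):
        required.append("num_sub_vectors")
    return [name for name in required if params.get(name) is None]


missing_pq = missing_index_params("IVF_PQ", {"num_partitions": 256})
missing_sq = missing_index_params("IVF_HNSW_SQ", {"num_partitions": 256})
missing_all = missing_index_params("IVF_HNSW_PQ", {})
```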

Examples

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_HNSW_SQ",
    num_partitions=256,
)

Experimental Accelerator (GPU) support:

  • accelerator: use GPU to train IVF partitions.

    Only supports CUDA (Nvidia) or MPS (Apple) currently. Requires PyTorch to be installed.

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
    accelerator="cuda"
)

create_scalar_index(column: str, index_type: Literal['BTREE'] | Literal['BITMAP'] | Literal['LABEL_LIST'] | Literal['INVERTED'] | Literal['FTS'], name: str | None = None, *, replace: bool = True, **kwargs)

Create a scalar index on a column.

Scalar indices, like vector indices, can be used to speed up scans. A scalar index can speed up scans that contain filter expressions on the indexed column. For example, the following scan will be faster if the column my_col has a scalar index:

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(filter="my_col != 7").to_table()

Vector search with pre-filters can also benefit from scalar indices. For example,

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(
    nearest=dict(
       column="vector",
       q=[1, 2, 3, 4],
       k=10,
    ),
    filter="my_col != 7",
    prefilter=True
).to_table()

There are 4 types of scalar indices available today.

  • BTREE. The most common type is BTREE. This index is inspired by the btree data structure although only the first few layers of the btree are cached in memory. It will perform well on columns with a large number of unique values and few rows per value.

  • BITMAP. This index stores a bitmap for each unique value in the column. This index is useful for columns with a small number of unique values and many rows per value.

  • LABEL_LIST. A special index that is used to index list columns whose values have small cardinality. For example, a column that contains lists of tags (e.g. ["tag1", "tag2", "tag3"]) can be indexed with a LABEL_LIST index. This index can only speed up queries with array_has_any or array_has_all filters.

  • FTS/INVERTED. It is used to index document columns and enables full-text search. For example, it can find the documents that contain any word of the query string “hello world”. The results will be ranked by BM25.
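
The BITMAP and LABEL_LIST descriptions above can be sketched with plain Python integers used as bitmaps. This is a conceptual illustration with hypothetical helper names, not Lance's on-disk format.

```python
from collections import defaultdict


def build_bitmap_index(values):
    """Map each distinct value to an integer whose set bits mark matching rows."""
    index = defaultdict(int)
    for row_id, value in enumerate(values):
        index[value] |= 1 << row_id
    return dict(index)


def build_label_index(label_lists):
    """Like a bitmap index, but every label in a row's list marks that row."""
    index = defaultdict(int)
    for row_id, labels in enumerate(label_lists):
        for label in labels:
            index[label] |= 1 << row_id
    return dict(index)


def rows(bitmap):
    """Expand a bitmap into a sorted list of row ids."""
    return [i for i in range(bitmap.bit_length()) if (bitmap >> i) & 1]


colors = ["red", "blue", "red", "green", "blue", "red"]
color_index = build_bitmap_index(colors)
# Equivalent of: color IN ('red', 'green')
red_or_green = rows(color_index["red"] | color_index["green"])

tags = [["a", "b"], ["b"], ["a", "c"], []]
tag_index = build_label_index(tags)
has_any = rows(tag_index["a"] | tag_index["b"])  # array_has_any(tags, ['a', 'b'])
has_all = rows(tag_index["a"] & tag_index["b"])  # array_has_all(tags, ['a', 'b'])
```

Because one bitmap is kept per distinct value, this layout pays off when there are few distinct values and many rows per value.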

Note that the LANCE_BYPASS_SPILLING environment variable can be used to bypass spilling to disk. Setting this to true can avoid memory exhaustion issues (see https://github.com/apache/datafusion/issues/10073 for more info).

Experimental API

Parameters:
  • column (str) – The column to be indexed. Must be a boolean, integer, float, or string column.

  • index_type (str) – The type of the index. One of "BTREE", "BITMAP", "LABEL_LIST", “FTS” or "INVERTED".

  • name (str, optional) – The index name. If not provided, it will be generated from the column name.

  • replace (bool, default True) – Replace the existing index if it exists.

Optional parameters:

  • with_position (bool, default True) – This is for the INVERTED index. If True, the index will store the positions of the words in the document so that you can run phrase queries. This significantly increases the index size, but it does not affect the performance of non-phrase queries.

  • base_tokenizer (str, default "simple") – This is for the INVERTED index. The base tokenizer to use. The value can be:

    • “simple”: splits tokens on whitespace and punctuation.

    • “whitespace”: splits tokens on whitespace.

    • “raw”: no tokenization.

  • language (str, default "English") – This is for the INVERTED index. The language for stemming and stop words. This is only used when stem or remove_stop_words is True.

  • max_token_length (Optional[int], default 40) – This is for the INVERTED index. The maximum token length. Any token longer than this will be removed.

  • lower_case (bool, default True) – This is for the INVERTED index. If True, the index will convert all text to lowercase.

  • stem (bool, default False) – This is for the INVERTED index. If True, the index will stem the tokens.

  • remove_stop_words (bool, default False) – This is for the INVERTED index. If True, the index will remove stop words.

  • ascii_folding (bool, default False) – This is for the INVERTED index. If True, the index will convert non-ASCII characters to ASCII characters where possible. For example, this removes accents, converting “é” to “e”.
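
A small sketch may help show how these tokenizer options interact. The function below is a hypothetical stand-in written with the standard library only: the order in which the steps are applied and the tiny stop word list are assumptions, and stemming is left out.

```python
import re
import unicodedata

STOP_WORDS = {"a", "an", "and", "the", "of", "to"}  # tiny illustrative list


def tokenize(text, base_tokenizer="simple", max_token_length=40,
             lower_case=True, remove_stop_words=False, ascii_folding=False):
    if base_tokenizer == "simple":
        tokens = re.findall(r"\w+", text)   # split on whitespace and punctuation
    elif base_tokenizer == "whitespace":
        tokens = text.split()               # split on whitespace only
    elif base_tokenizer == "raw":
        tokens = [text]                     # no tokenization
    else:
        raise ValueError(f"unknown tokenizer: {base_tokenizer}")

    # Tokens longer than max_token_length are removed, not truncated.
    if max_token_length is not None:
        tokens = [t for t in tokens if len(t) <= max_token_length]
    if lower_case:
        tokens = [t.lower() for t in tokens]
    if ascii_folding:
        # Decompose accented characters and drop the combining marks.
        tokens = [
            unicodedata.normalize("NFKD", t).encode("ascii", "ignore").decode("ascii")
            for t in tokens
        ]
    if remove_stop_words:
        tokens = [t for t in tokens if t not in STOP_WORDS]
    return tokens


text = "The Caf\u00e9, re-opened!"
default_tokens = tokenize(text)
folded_tokens = tokenize(text, ascii_folding=True, remove_stop_words=True)
whitespace_tokens = tokenize(text, base_tokenizer="whitespace")
```

With the defaults, punctuation is dropped and text is lowercased; the “whitespace” tokenizer keeps punctuation attached to the tokens.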

Examples

import lance

dataset = lance.dataset("/tmp/images.lance")
dataset.create_scalar_index(
    "category",
    "BTREE",
)

Scalar indices can only speed up scans for basic filters using equality, comparison, range (e.g. my_col BETWEEN 0 AND 100), and set membership (e.g. my_col IN (0, 1, 2)).

Scalar indices can be used if the filter contains multiple indexed columns and the filter criteria are AND’d or OR’d together (e.g. my_col < 0 AND other_col > 100).

Scalar indices may be used if the filter contains non-indexed columns but, depending on the structure of the filter, they may not be usable. For example, if the column not_indexed does not have a scalar index then the filter my_col = 0 OR not_indexed = 1 will not be able to use any scalar index on my_col.

To determine if a scan is making use of a scalar index you can use explain_plan to look at the query plan that lance has created. Queries that use scalar indices will either have a ScalarIndexQuery relation or a MaterializeIndex operator.

property data_storage_version: str

The version of the data storage format this dataset is using

delete(predicate: str | Expression)

Delete rows from the dataset.

This marks rows as deleted but does not physically remove them from the files, so existing indexes remain valid.
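
The general technique can be sketched as a set of deleted row offsets kept next to an unchanged data file. The class below is a toy model with hypothetical names; it only illustrates why the positions of surviving rows do not change.

```python
class SoftDeleteFile:
    """A toy data file whose rows are hidden by a deletion set, never rewritten."""

    def __init__(self, rows):
        self.rows = list(rows)   # treated as immutable once written
        self.deleted = set()     # row offsets marked as deleted

    def delete(self, predicate):
        """Mark rows matching predicate as deleted; return how many were marked."""
        matches = {
            i for i, row in enumerate(self.rows)
            if i not in self.deleted and predicate(row)
        }
        self.deleted |= matches
        return len(matches)

    def scan(self):
        """Yield (offset, row) for live rows; offsets of survivors do not change."""
        for i, row in enumerate(self.rows):
            if i not in self.deleted:
                yield i, row


data = SoftDeleteFile([{"a": 1, "b": "a"}, {"a": 2, "b": "b"}, {"a": 3, "b": "c"}])
num_deleted = data.delete(lambda row: row["a"] == 1 or row["b"] in ("a", "b"))
live = list(data.scan())
```

The surviving row keeps offset 2, so anything that refers to rows by position, such as an index, still points at the right data.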

Parameters:

predicate (str or pa.compute.Expression) – The predicate to use to select rows to delete. May either be a SQL string or a pyarrow Expression.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.delete("a = 1 or b in ('a', 'b')")
>>> dataset.to_table()
pyarrow.Table
a: int64
b: string
----
a: [[3]]
b: [["c"]]
static drop(base_uri: str | Path, storage_options: Dict[str, str] | None = None) None
drop_columns(columns: List[str])

Drop one or more columns from the dataset

This is a metadata-only operation and does not remove the data from the underlying storage. In order to remove the data, you must subsequently call compact_files to rewrite the data without the removed columns and then call cleanup_old_versions to remove the old files.

Parameters:

columns (list of str) – The names of the columns to drop. These can be nested column references (e.g. “a.b.c”) or top-level column names (e.g. “a”).

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.drop_columns(["a"])
>>> dataset.to_table().to_pandas()
   b
0  a
1  b
2  c
get_fragment(fragment_id: int) LanceFragment | None

Get the fragment with fragment id.

get_fragments(filter: Expression | None = None) List[LanceFragment]

Get all fragments from the dataset.

Note: filter is not supported yet.

property has_index
head(num_rows, **kwargs)

Load the first N rows of the dataset.

Parameters:
  • num_rows (int) – The number of rows to load.

  • **kwargs (dict, optional) – See scanner() method for full parameter description.

Returns:

table

Return type:

Table

index_statistics(index_name: str) Dict[str, Any]
insert(data: ReaderLike, *, mode='append', **kwargs)

Insert data into the dataset.

Parameters:
  • data (Reader-like) – The data to be written. Acceptable types are:

    • Pandas DataFrame, Pyarrow Table, Dataset, Scanner, or RecordBatchReader

    • Huggingface dataset

  • mode (str, default 'append') –

    The mode to use when writing the data. Options are:

    • create - create a new dataset (raises if uri already exists).

    • overwrite - create a new snapshot version.

    • append - create a new version that is the concatenation of the input and the latest version (raises if uri does not exist).

  • **kwargs (dict, optional) – Additional keyword arguments to pass to write_dataset().

join(right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)

Not implemented (just override pyarrow dataset to prevent segfault)

property lance_schema: LanceSchema

The LanceSchema for this dataset

property latest_version: int

Returns the latest version of the dataset.

list_indices() List[Dict[str, Any]]
merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None)

Merge another dataset into this one.

Performs a left join, where the dataset is the left side and data_obj is the right side. Rows that exist in the dataset but have no match in data_obj will be filled with null values in the new columns, unless Lance doesn’t support null values for some types, in which case an error will be raised.
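
The null-filling behavior of a left join can be sketched with plain dictionaries. The left_join helper is hypothetical and ignores details such as duplicate keys and column name clashes.

```python
def left_join(left_rows, right_rows, left_on, right_on=None):
    """Left join two lists of dicts; unmatched left rows get None for new columns."""
    right_on = right_on or left_on
    lookup = {row[right_on]: row for row in right_rows}
    new_columns = [c for c in right_rows[0] if c != right_on] if right_rows else []
    merged = []
    for row in left_rows:
        match = lookup.get(row[left_on])
        extra = {c: (match[c] if match else None) for c in new_columns}
        merged.append({**row, **extra})
    return merged


dataset_rows = [{"x": 1, "y": "a"}, {"x": 2, "y": "b"}, {"x": 3, "y": "c"}]
new_rows = [{"x": 1, "z": "d"}, {"x": 3, "z": "f"}]  # no row for x == 2
merged = left_join(dataset_rows, new_rows, "x")
```

Every row of the left side is kept; the row with x == 2 has no partner on the right, so its z value is None.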

Parameters:
  • data_obj (Reader-like) – The data to be merged. Acceptable types are: Pandas DataFrame, Pyarrow Table, Dataset, Scanner, Iterator[RecordBatch], or RecordBatchReader.

  • left_on (str) – The name of the column in the dataset to join on.

  • right_on (str or None) – The name of the column in data_obj to join on. If None, defaults to left_on.

Examples

>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> dataset.merge(new_df, 'x')
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f

See also

LanceDataset.add_columns

Add new columns by computing batch-by-batch.

merge_insert(on: str | Iterable[str])

Returns a builder that can be used to create a “merge insert” operation.

This operation can add rows, update rows, and remove rows in a single transaction. It is a very generic tool that can be used to create behaviors like “insert if not exists”, “update or insert (i.e. upsert)”, or even replace a portion of existing data with new data (e.g. replace all data where month="january").

The merge insert operation works by combining new data from a source table with existing data in a target table by using a join. There are three categories of records.

“Matched” records are records that exist in both the source table and the target table. “Not matched” records exist only in the source table (e.g. these are new data). “Not matched by source” records exist only in the target table (this is old data).

The builder returned by this method can be used to customize what should happen for each category of data.
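
The three categories can be illustrated by comparing the join keys of the two tables. The classify helper below is hypothetical and works on lists of dictionaries rather than on a dataset.

```python
def classify(source_rows, target_rows, on):
    """Split join keys into the three merge-insert categories."""
    source_keys = {row[on] for row in source_rows}
    target_keys = {row[on] for row in target_rows}
    return {
        "matched": sorted(source_keys & target_keys),
        "not_matched": sorted(source_keys - target_keys),
        "not_matched_by_source": sorted(target_keys - source_keys),
    }


target = [{"a": 1, "b": "b"}, {"a": 2, "b": "a"}, {"a": 3, "b": "c"}]
source = [{"a": 2, "b": "x"}, {"a": 3, "b": "y"}, {"a": 4, "b": "z"}]
categories = classify(source, target, on="a")
```

With this data, an upsert would update the two matched keys (2 and 3) and insert the one unmatched key (4); key 1 exists only in the target and is left alone unless the builder is told otherwise.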

Please note that the data will be reordered as part of this operation. This is because updated rows will be deleted from the dataset and then reinserted at the end with the new values. The order of the newly inserted rows may fluctuate randomly because a hash-join operation is used internally.

Parameters:

on (Union[str, Iterable[str]]) – A column (or columns) to join on. This is how records from the source table and target table are matched. Typically this is some kind of key or id column.

Examples

Use when_matched_update_all() and when_not_matched_insert_all() to perform an “upsert” operation. This will update rows that already exist in the dataset and insert rows that do not exist.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation
>>> dataset.merge_insert("a")     \
...             .when_matched_update_all()     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  b
1  2  x
2  3  y
3  4  z

Use when_not_matched_insert_all() to perform an “insert if not exists” operation. This will only insert rows that do not already exist in the dataset.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example2")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "insert if not exists" operation
>>> dataset.merge_insert("a")     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  z

You are not required to provide all the columns. If you only want to update a subset of columns, you can omit columns you don’t want to update. Omitted columns will keep their existing values if they are updated, or will be null if they are inserted.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"], \
...                   "c": ["x", "y", "z"]})
>>> dataset = lance.write_dataset(table, "example3")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation, only updating column "b"
>>> dataset.merge_insert("a")     \
...             .when_matched_update_all()     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b     c
0  1  a     x
1  2  x     y
2  3  y     z
3  4  z  None
migrate_manifest_paths_v2()

Migrate the manifest paths to the new format.

This will update the manifest to use the new v2 format for paths.

This function is idempotent, and can be run multiple times without changing the state of the object store.

DANGER: this should not be run while other concurrent operations are happening, and it should run to completion before other operations resume.

property optimize: DatasetOptimizer
property partition_expression

Not implemented (just override pyarrow dataset to prevent segfault)

replace_field_metadata(field_name: str, new_metadata: Dict[str, str])

Replace the metadata of a field in the schema

Parameters:
  • field_name (str) – The name of the field to replace the metadata for

  • new_metadata (dict) – The new metadata to set

replace_schema(schema: Schema)

Not implemented (just override pyarrow dataset to prevent segfault)

See replace_schema_metadata() or replace_field_metadata().

replace_schema_metadata(new_metadata: Dict[str, str])

Replace the schema metadata of the dataset

Parameters:

new_metadata (dict) – The new metadata to set

restore()

Restore the currently checked out version as the latest version of the dataset.

This creates a new commit.

sample(num_rows: int, columns: List[str] | Dict[str, str] | None = None, randomize_order: bool = True, **kwargs) Table

Select a random sample of data

Parameters:
  • num_rows (int) – number of rows to retrieve

  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • **kwargs (dict, optional) – see scanner() method for full parameter description.

Returns:

table

Return type:

Table

scanner(columns: List[str] | Dict[str, str] | None = None, filter: Expression | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = None, fragments: Iterable[LanceFragment] | None = None, full_text_query: str | dict | None = None, *, prefilter: bool = None, with_row_id: bool = None, with_row_address: bool = None, use_stats: bool = None, fast_search: bool = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None) LanceScanner

Return a Scanner that can support various pushdowns.

Parameters:
  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • filter (pa.compute.Expression or str) –

    Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.

  • limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.

  • offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.

  • nearest (dict, default None) –

    Get the rows corresponding to the K most similar vectors. Example:

    {
        "column": <embedding col name>,
        "q": <query vector as pa.Float32Array>,
        "k": 10,
        "nprobes": 1,
        "refine_factor": 1
    }
    

  • batch_size (int, default None) – The target size of batches returned. In some cases batches can be up to twice this size (but never larger than that). In some cases batches can be smaller than this size.

  • io_buffer_size (int, default None) – The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.

  • batch_readahead (int, optional) – The number of batches to read ahead.

  • fragment_readahead (int, optional) – The number of fragments to read ahead.

  • scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.

  • fragments (iterable of LanceFragment, default None) – If specified, only scan these fragments. If scan_in_order is True, then the fragments will be scanned in the order given.

  • prefilter (bool, default False) –

    If True then the filter will be applied before the vector query is run. This will generate more correct results but it may be a more costly query. It’s generally good when the filter is highly selective.

    If False then the filter will be applied after the vector query is run. This will perform well but the results may have fewer than the requested number of rows (or be empty) if the rows closest to the query do not match the filter. It’s generally good when the filter is not very selective.

  • use_scalar_index (bool, default True) – Lance will automatically use scalar indices to optimize a query. In some corner cases this can make query performance worse and this parameter can be used to disable scalar indices in these cases.

  • late_materialization (bool or List[str], default None) –

    Allows custom control over late materialization. Late materialization fetches non-query columns using a take operation after the filter. This is useful when there are few results or columns are very large.

    Early materialization can be better when there are many results or the columns are very narrow.

    If True, then all columns are late materialized. If False, then all columns are early materialized. If a list of strings, then only the columns in the list are late materialized.

    The default uses a heuristic that assumes filters will select about 0.1% of the rows. If your filter is more selective (e.g. find by id) you may want to set this to True. If your filter is not very selective (e.g. matches 20% of the rows) you may want to set this to False.

  • full_text_query (str or dict, optional) –

    The query string to search for. The results will be ranked by BM25. For example, “hello world” would match documents containing “hello” or “world”. Alternatively, a dictionary with the following keys:

    • columns: list[str]

      The columns to search, currently only supports a single column in the columns list.

    • query: str

      The query string to search for.

  • fast_search (bool, default False) – If True, then the search will only be performed on the indexed data, which yields faster search time.
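
The trade-off behind late_materialization can be sketched by counting how many column values each strategy reads. The two functions and the “values read” cost measure are assumptions made for illustration; real I/O costs depend on column widths and storage layout.

```python
def scan_early(table, predicate, filter_column, columns):
    """Read every requested column for every row, then filter."""
    values_read = len(table[filter_column])
    values_read += sum(len(table[c]) for c in columns)
    rows = [i for i, v in enumerate(table[filter_column]) if predicate(v)]
    return [{c: table[c][i] for c in columns} for i in rows], values_read


def scan_late(table, predicate, filter_column, columns):
    """Filter on the filter column first, then take other columns for matches only."""
    values_read = len(table[filter_column])
    rows = [i for i, v in enumerate(table[filter_column]) if predicate(v)]
    values_read += len(rows) * len(columns)
    return [{c: table[c][i] for c in columns} for i in rows], values_read


table = {
    "id": list(range(1000)),
    "image": [f"blob-{i}" for i in range(1000)],  # stands in for a wide column
}
match_id_7 = lambda value: value == 7
early_rows, early_cost = scan_early(table, match_id_7, "id", ["image"])
late_rows, late_cost = scan_late(table, match_id_7, "id", ["image"])
```

Both strategies return the same rows. With a highly selective filter the late variant touches far fewer values of the wide column, whereas a filter that matches most rows would gain little and pay for the extra take step.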

Notes

For now, if BOTH filter and nearest are specified, then:

  1. nearest is executed first.

  2. The results are filtered afterwards, unless prefilter is set to True.
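
The difference between filtering after and before the vector search can be shown with an exact nearest-neighbor search over a few rows. The helpers are hypothetical and use brute-force distances rather than an index.

```python
def knn(query, rows, k):
    """Exact k-nearest rows by squared distance on the 'vector' field."""
    def dist(row):
        return sum((a - b) ** 2 for a, b in zip(query, row["vector"]))
    return sorted(rows, key=dist)[:k]


def post_filter_search(query, rows, k, predicate):
    """nearest first, then filter: may return fewer than k rows."""
    return [row for row in knn(query, rows, k) if predicate(row)]


def pre_filter_search(query, rows, k, predicate):
    """filter first, then nearest: returns k rows if enough rows match."""
    return knn(query, [row for row in rows if predicate(row)], k)


rows = [
    {"id": 0, "vector": [0.0, 0.0], "my_col": 7},
    {"id": 1, "vector": [0.1, 0.0], "my_col": 7},
    {"id": 2, "vector": [1.0, 1.0], "my_col": 1},
    {"id": 3, "vector": [2.0, 2.0], "my_col": 2},
]
keep = lambda row: row["my_col"] != 7
post = [r["id"] for r in post_filter_search([0.0, 0.0], rows, 2, keep)]
pre = [r["id"] for r in pre_filter_search([0.0, 0.0], rows, 2, keep)]
```

Here the two rows closest to the query both fail the filter, so filtering afterwards returns nothing, while filtering first still returns two rows.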

For debugging ANN results, you can choose to not use the index even if present by specifying use_index=False. For example, the following will always return exact KNN results:

dataset.to_table(nearest={
    "column": "vector",
    "k": 10,
    "q": <query vector>,
    "use_index": False
})
property schema: Schema

The pyarrow Schema for this dataset

session() _Session

Return the dataset session, which holds the dataset’s state.

property stats: LanceStats

Experimental API

property tags: Tags
take(indices: List[int] | Array, columns: List[str] | Dict[str, str] | None = None, **kwargs) Table

Select rows of data by index.

Parameters:
  • indices (Array or array-like) – indices of rows to select in the dataset.

  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • **kwargs (dict, optional) – See the scanner() method for full parameter description.

Returns:

table

Return type:

pyarrow.Table

take_blobs(row_ids: List[int] | Array, blob_column: str) List[BlobFile]

Select blobs by row IDs.

Instead of loading large binary blob data into memory before processing it, this API allows you to open binary blob data as a regular Python file-like object. For more details, see lance.BlobFile.

Parameters:
  • row_ids (List, Array, or array-like) – row IDs to select in the dataset.

  • blob_column (str) – The name of the blob column to select.

Returns:

blob_files

Return type:

List[BlobFile]

to_batches(columns: List[str] | Dict[str, str] | None = None, filter: Expression | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, with_row_address: bool = False, use_stats: bool = True, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None, **kwargs) Iterator[RecordBatch]

Read the dataset as materialized record batches.

Parameters:

**kwargs (dict, optional) – Arguments for Scanner.from_dataset.

Returns:

record_batches

Return type:

Iterator of RecordBatch

to_table(columns: List[str] | Dict[str, str] | None = None, filter: Expression | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, with_row_address: bool = False, use_stats: bool = True, fast_search: bool = False, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None) Table

Read the data into memory as a pyarrow.Table

Parameters:
  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • filter (pa.compute.Expression or str) –

    Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.

  • limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.

  • offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.

  • nearest (dict, default None) –

    Get the rows corresponding to the K most similar vectors. Example:

    {
        "column": <embedding col name>,
        "q": <query vector as pa.Float32Array>,
        "k": 10,
        "metric": "cosine",
        "nprobes": 1,
        "refine_factor": 1
    }
    

  • batch_size (int, optional) – The number of rows to read at a time.

  • io_buffer_size (int, default None) – The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.

  • batch_readahead (int, optional) – The number of batches to read ahead.

  • fragment_readahead (int, optional) – The number of fragments to read ahead.

  • scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.

  • prefilter (bool, default False) – Run filter before the vector search.

  • late_materialization (bool or List[str], default None) – Allows custom control over late materialization. See ScannerBuilder.late_materialization for more information.

  • use_scalar_index (bool, default True) – Allows custom control over scalar index usage. See ScannerBuilder.use_scalar_index for more information.

  • with_row_id (bool, default False) – Return row ID.

  • with_row_address (bool, default False) – Return row address

  • use_stats (bool, default True) – Use stats pushdown during filters.

  • full_text_query (str or dict, optional) –

    The query string to search for. The results will be ranked by BM25. For example, “hello world” would match documents containing “hello” or “world”. Alternatively, a dictionary with the following keys:

    • columns: list[str]

      The columns to search, currently only supports a single column in the columns list.

    • query: str

      The query string to search for.
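
As a rough illustration of BM25 ranking with “any word matches” semantics, the sketch below scores a few documents with the common Okapi BM25 formula. The parameters k1=1.2 and b=0.75, the IDF variant, and whitespace tokenization are assumptions; the scores Lance computes may differ.

```python
import math
from collections import Counter


def bm25_search(query, documents, k1=1.2, b=0.75):
    """Rank documents containing any query term by Okapi BM25 score."""
    tokenized = [doc.lower().split() for doc in documents]
    avg_len = sum(len(t) for t in tokenized) / len(tokenized)
    terms = query.lower().split()
    # Document frequency: number of documents containing each query term.
    df = {term: sum(term in tokens for tokens in tokenized) for term in terms}
    results = []
    for doc_id, tokens in enumerate(tokenized):
        counts = Counter(tokens)
        score = 0.0
        for term in terms:
            tf = counts[term]
            if tf == 0:
                continue
            idf = math.log(1 + (len(tokenized) - df[term] + 0.5) / (df[term] + 0.5))
            norm = tf + k1 * (1 - b + b * len(tokens) / avg_len)
            score += idf * tf * (k1 + 1) / norm
        if score > 0:
            results.append((doc_id, score))
    return sorted(results, key=lambda item: item[1], reverse=True)


documents = ["hello world", "hello there", "goodbye world again", "nothing relevant here"]
ranked = bm25_search("hello world", documents)
```

Documents containing either word are returned, the document containing both ranks first, and the document with neither word is omitted.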

Notes

If BOTH filter and nearest are specified, then:

  1. nearest is executed first.

  2. The results are filtered afterward, unless prefilter is set to True.

update(updates: Dict[str, str], where: str | None = None) Dict[str, Any]

Update column values for rows matching where.

Parameters:
  • updates (dict of str to str) – A mapping of column names to a SQL expression.

  • where (str, optional) – A SQL predicate indicating which rows should be updated.

Returns:

updates – A dictionary containing the number of rows updated.

Return type:

dict

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> update_stats = dataset.update(dict(a = 'a + 2'), where="b != 'a'")
>>> update_stats["num_updated_rows"]
2
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  4  b
2  5  c
property uri: str

The location of the data

validate()

Validate the dataset.

This checks the integrity of the dataset and will raise an exception if the dataset is corrupted.

property version: int

Returns the currently checked out version of the dataset

versions()

Return all versions in this dataset.

class lance.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, fragment: _Fragment | None = None)

Bases: Fragment

count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Count rows matching the scanner filter.

Parameters:
  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

count

Return type:

int

static create(dataset_uri: str | Path, data: Table | RecordBatchReader, fragment_id: int | None = None, schema: Schema | None = None, max_rows_per_group: int = 1024, progress: FragmentWriteProgress | None = None, mode: str = 'append', *, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: Dict[str, str] | None = None) FragmentMetadata

Create a FragmentMetadata from the given data.

This can be used if the dataset is not yet created.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • dataset_uri (str) – The URI of the dataset.

  • fragment_id (int) – The ID of the fragment.

  • data (pa.Table or pa.RecordBatchReader) – The data to be written to the fragment.

  • schema (pa.Schema, optional) – The schema of the data. If not specified, the schema will be inferred from the data.

  • max_rows_per_group (int, default 1024) – The maximum number of rows per group in the data file.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • mode (str, default "append") – The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.

  • data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.

  • use_legacy_format (bool, default None) – Deprecated parameter. Use data_storage_version instead.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

See also

lance.dataset.LanceOperation.Overwrite

The operation used to create a new dataset or overwrite one using fragments created with this API. See the doc page for an example of using this API.

lance.dataset.LanceOperation.Append

The operation used to append fragments created with this API to an existing dataset. See the doc page for an example of using this API.

Return type:

FragmentMetadata

static create_from_file(filename: str | Path, dataset: LanceDataset, fragment_id: int) FragmentMetadata

Create a fragment from the given datafile uri.

This can be used if the data file has been lost from the dataset.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • filename (str) – The filename of the datafile.

  • dataset (LanceDataset) – The dataset that the fragment belongs to.

  • fragment_id (int) – The ID of the fragment.

data_files()

Return the data files of this fragment.

delete(predicate: str) FragmentMetadata | None

Delete rows from this Fragment.

This will add or update the deletion file of this fragment. It does not modify or delete the data files of this fragment. If no rows are left after the deletion, this method will return None.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:

predicate (str) – A SQL predicate that specifies the rows to delete.

Returns:

A new fragment containing the new deletion file, or None if no rows are left.

Return type:

FragmentMetadata or None

Examples

>>> import lance
>>> import pyarrow as pa
>>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> dataset = lance.write_dataset(tab, "dataset")
>>> frag = dataset.get_fragment(0)
>>> frag.delete("a > 1")
FragmentMetadata(id=0, files=[DataFile(path='...', fields=[0, 1], ...), ...)
>>> frag.delete("a > 0") is None
True

See also

lance.dataset.LanceOperation.Delete

The operation used to commit these changes to a dataset. See the doc page for an example of using this API.

deletion_file()

Return the deletion file, if any

property fragment_id
head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Load the first N rows of the fragment.

Parameters:
  • num_rows (int) – The number of rows to load.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Return type:

Table

merge_columns(value_func: Dict[str, str] | BatchUDF | ReaderLike | Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None = None, batch_size: int | None = None, reader_schema: pa.Schema | None = None) Tuple[FragmentMetadata, LanceSchema]

Add columns to this Fragment.

Warning

Internal API. This method is not intended to be used by end users.

The parameters and their interpretation are the same as in the lance.dataset.LanceDataset.add_columns() operation.

The only difference is that, instead of modifying the dataset, a new fragment is created. The new schema of the fragment is returned as well. These can be used in a later operation to commit the changes to the dataset.

See also

lance.dataset.LanceOperation.Merge

The operation used to commit these changes to the dataset. See the doc page for an example of using this API.

Returns:

A new fragment with the added column(s) and the final schema.

Return type:

Tuple[FragmentMetadata, LanceSchema]

property metadata: FragmentMetadata

Return the metadata of this fragment.

Return type:

FragmentMetadata

property num_deletions: int

Return the number of deleted rows in this fragment.

property partition_expression: Schema

An Expression which evaluates to true for all data viewed by this Fragment.

property physical_rows: int

Return the number of rows originally in this fragment.

To get the number of rows after deletions, use count_rows() instead.

property physical_schema: Schema

Return the physical schema of this Fragment. This schema can be different from the dataset read schema.

scanner(*, columns: List[str] | Dict[str, str] | None = None, batch_size: int | None = None, filter: str | pa.compute.Expression | None = None, limit: int | None = None, offset: int | None = None, with_row_id: bool = False, batch_readahead: int = 16) LanceScanner

See lance.LanceDataset.scanner() for details.

property schema: Schema

Return the schema of this fragment.

take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Select rows of data by index.

Parameters:
  • indices (Array or array-like) – The indices of the rows to select in the dataset.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Return type:

Table

to_batches(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Read the fragment as materialized record batches.

Parameters:
  • schema (Schema, optional) – Concrete schema to use for scanning.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

record_batches

Return type:

iterator of RecordBatch

to_table(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Convert this Fragment into a Table.

Use this convenience utility with care. This will serially materialize the Scan result in memory before creating the Table.

Parameters:
  • schema (Schema, optional) – Concrete schema to use for scanning.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

table

Return type:

Table

class lance.LanceOperation

Bases: object

class Append(fragments: Iterable[FragmentMetadata])

Bases: BaseOperation

Append new rows to the dataset.

fragments

The fragments that contain the new rows.

Type:

list[FragmentMetadata]

Warning

This is an advanced API for distributed operations. To append to a dataset on a single machine, use lance.write_dataset().

Examples

To append new rows to a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(tab1, "example")
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment = lance.fragment.LanceFragment.create("example", tab2)
>>> operation = lance.LanceOperation.Append([fragment])
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments: Iterable[FragmentMetadata]
class BaseOperation

Bases: ABC

Base class for operations that can be applied to a dataset.

See available operations under LanceOperation.

class CreateIndex(uuid: str, name: str, fields: List[int], dataset_version: int, fragment_ids: Set[int])

Bases: BaseOperation

Operation that creates an index on the dataset.

dataset_version: int
fields: List[int]
fragment_ids: Set[int]
name: str
uuid: str
class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)

Bases: BaseOperation

Remove fragments or rows from the dataset.

updated_fragments

The fragments that have been updated with new deletion vectors.

Type:

list[FragmentMetadata]

deleted_fragment_ids

The ids of the fragments that have been deleted entirely. These are the fragments where LanceFragment.delete() returned None.

Type:

list[int]

predicate

The original SQL predicate used to select the rows to delete.

Type:

str

Warning

This is an advanced API for distributed operations. To delete rows from a dataset on a single machine, use lance.LanceDataset.delete().

Examples

To delete rows from a dataset, call lance.fragment.LanceFragment.delete() on each of the fragments. If that returns a new fragment, add that to the updated_fragments list. If it returns None, that means the whole fragment was deleted, so add the fragment id to the deleted_fragment_ids. Finally, pass the operation to the LanceDataset.commit() method to complete the deletion operation.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(table, "example")
>>> table = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example", mode="append")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> predicate = "a >= 2"
>>> updated_fragments = []
>>> deleted_fragment_ids = []
>>> for fragment in dataset.get_fragments():
...     new_fragment = fragment.delete(predicate)
...     if new_fragment is not None:
...         updated_fragments.append(new_fragment)
...     else:
...         deleted_fragment_ids.append(fragment.fragment_id)
>>> operation = lance.LanceOperation.Delete(updated_fragments,
...                                         deleted_fragment_ids,
...                                         predicate)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
deleted_fragment_ids: Iterable[int]
predicate: str
updated_fragments: Iterable[FragmentMetadata]
class Merge(fragments: Iterable[FragmentMetadata], schema: LanceSchema | Schema)

Bases: BaseOperation

Operation that adds columns. Unlike Overwrite, this should not change the structure of the fragments, allowing existing indices to be kept.

fragments

The fragments that make up the new dataset.

Type:

iterable of FragmentMetadata

schema

The schema of the new dataset. Passing a LanceSchema is preferred, and passing a pyarrow.Schema is deprecated.

Type:

LanceSchema or pyarrow.Schema

Warning

This is an advanced API for distributed operations. To add columns to a dataset on a single machine, use lance.LanceDataset.add_columns().

Examples

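To add new columns to a dataset, first define a method that will create the new columns based on the existing columns. Then call lance.fragment.LanceFragment.merge_columns() on each fragment and collect the returned fragment metadata into a list. Finally, pass the list and the new schema to this class, and pass the operation to the LanceDataset.commit() method to commit the change.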

>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch:
...     doubled = pc.multiply(batch["a"], 2)
...     return pa.record_batch([doubled], ["a_doubled"])
>>> fragments = []
>>> for fragment in dataset.get_fragments():
...     new_fragment, new_schema = fragment.merge_columns(double_a,
...                                                       columns=['a'])
...     fragments.append(new_fragment)
>>> operation = lance.LanceOperation.Merge(fragments, new_schema)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b  a_doubled
0  1  a          2
1  2  b          4
2  3  c          6
3  4  d          8
fragments: Iterable[FragmentMetadata]
schema: LanceSchema | Schema
class Overwrite(new_schema: LanceSchema | Schema, fragments: Iterable[FragmentMetadata])

Bases: BaseOperation

Overwrite or create a new dataset.

new_schema

The schema of the new dataset.

Type:

LanceSchema or pyarrow.Schema

fragments

The fragments that make up the new dataset.

Type:

list[FragmentMetadata]

Warning

This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().

Examples

To create or overwrite a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it along with the schema to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments: Iterable[FragmentMetadata]
new_schema: LanceSchema | Schema
class Restore(version: int)

Bases: BaseOperation

Operation that restores a previous version of the dataset.

version: int
class Rewrite(groups: Iterable[RewriteGroup], rewritten_indices: Iterable[RewrittenIndex])

Bases: BaseOperation

Operation that rewrites one or more files and indices into one or more files and indices.

groups

Groups of files that have been rewritten.

Type:

list[RewriteGroup]

rewritten_indices

Indices that have been rewritten.

Type:

list[RewrittenIndex]

Warning

This is an advanced API not intended for general use.

groups: Iterable[RewriteGroup]
rewritten_indices: Iterable[RewrittenIndex]
class RewriteGroup(old_fragments: Iterable[FragmentMetadata], new_fragments: Iterable[FragmentMetadata])

Bases: object

Collection of rewritten files

new_fragments: Iterable[FragmentMetadata]
old_fragments: Iterable[FragmentMetadata]
class RewrittenIndex(old_id: str, new_id: str)

Bases: object

An index that has been rewritten

new_id: str
old_id: str
class lance.LanceScanner(scanner: _Scanner, dataset: LanceDataset)

Bases: Scanner

count_rows()

Count rows matching the scanner filter.

Returns:

count

Return type:

int

property dataset_schema: Schema

The schema with which batches will be read from fragments.

explain_plan(verbose=False) str

Return the execution plan for this scanner.

Parameters:

verbose (bool, default False) – Use a verbose output format.

Returns:

plan

Return type:

str

static from_batches(*args, **kwargs)

Not implemented

static from_dataset(*args, **kwargs)

Not implemented

static from_fragment(*args, **kwargs)

Not implemented

head(num_rows)

Load the first N rows of the dataset.

Parameters:

num_rows (int) – The number of rows to load.

Return type:

Table

property projected_schema: Schema

The materialized schema of the data, accounting for projections.

This is the schema of any data returned from the scanner.

scan_batches()

Consume a Scanner in record batches with corresponding fragments.

Returns:

record_batches

Return type:

iterator of TaggedRecordBatch

take(indices)

Not implemented

to_batches(self)

Consume a Scanner in record batches.

Returns:

record_batches

Return type:

iterator of RecordBatch

to_reader(self)

Consume this scanner as a RecordBatchReader.

Return type:

RecordBatchReader

to_table() Table

Read the data into memory and return a pyarrow Table.

class lance.MergeInsertBuilder(dataset, on)

Bases: _MergeInsertBuilder

execute(data_obj: ReaderLike, *, schema: pa.Schema | None = None)

Executes the merge insert operation

This function updates the original dataset and returns a dictionary of merge statistics, i.e., the number of inserted, updated, and deleted rows.

Parameters:
  • data_obj (ReaderLike) – The new data to use as the source table for the operation. This parameter can be any source of data (e.g. table / dataset) that write_dataset() accepts.

  • schema (Optional[pa.Schema]) – The schema of the data. This only needs to be supplied whenever the data source is some kind of generator.

when_matched_update_all(condition: str | None = None) MergeInsertBuilder

Configure the operation to update matched rows

After this method is called, when the merge insert operation executes, any rows that match both the source table and the target table will be updated. The rows from the target table will be removed and the rows from the source table will be added.

An optional condition may be specified. This should be an SQL filter and, if present, then only matched rows that also satisfy this filter will be updated. The SQL filter should use the prefix target. to refer to columns in the target table and the prefix source. to refer to columns in the source table. For example, source.last_update < target.last_update.

If a condition is specified and rows do not satisfy the condition then these rows will not be updated. Failure to satisfy the filter does not cause a “matched” row to become a “not matched” row.

when_not_matched_by_source_delete(expr: str | None = None) MergeInsertBuilder

Configure the operation to delete target rows that have no match in the source

After this method is called, when the merge insert operation executes, any rows that exist only in the target table will be deleted. An optional filter can be specified to limit the scope of the delete operation. If given (as an SQL filter) then only rows which match the filter will be deleted.

when_not_matched_insert_all() MergeInsertBuilder

Configure the operation to insert not matched rows

After this method is called, when the merge insert operation executes, any rows that exist only in the source table will be inserted into the target table.

class lance.Transaction(read_version: 'int', operation: 'LanceOperation.BaseOperation', uuid: 'str' = <factory>, blobs_op: 'Optional[LanceOperation.BaseOperation]' = None)

Bases: object

blobs_op: BaseOperation | None = None
operation: BaseOperation
read_version: int
uuid: str
lance.batch_udf(output_schema=None, checkpoint_file=None)

Create a user defined function (UDF) that adds columns to a dataset.

This function is used to add columns to a dataset. It takes a function that takes a single argument, a RecordBatch, and returns a RecordBatch. The function is called once for each batch in the dataset. The function should not modify the input batch, but instead create a new batch with the new columns added.

Parameters:
  • output_schema (Schema, optional) – The schema of the output RecordBatch. This is used to validate the output of the function. If not provided, the schema of the first output RecordBatch will be used.

  • checkpoint_file (str or Path, optional) – If specified, this file will be used as a cache for unsaved results of this UDF. If the process fails, and you call add_columns again with this same file, it will resume from the last saved state. This is useful for long running processes that may fail and need to be resumed. This file may get very large. It will hold up to an entire data file’s worth of results on disk, which can be multiple gigabytes of data.

Return type:

AddColumnsUDF

lance.dataset(uri: str | Path, version: int | str | None = None, asof: ts_types | None = None, block_size: int | None = None, commit_lock: CommitLock | None = None, index_cache_size: int | None = None, storage_options: Dict[str, str] | None = None, default_scan_options: Dict[str, str] | None = None) LanceDataset

Opens the Lance dataset from the address specified.

Parameters:
  • uri (str) – Address to the Lance dataset. It can be a local file path, e.g., /tmp/data.lance, or a cloud object store URI, e.g., s3://bucket/data.lance.

  • version (optional, int | str) – If specified, load a specific version of the Lance dataset. Else, loads the latest version. A version number (int) or a tag (str) can be provided.

  • asof (optional, datetime or str) – If specified, find the latest version created on or earlier than the given argument value. If a version is already specified, this arg is ignored.

  • block_size (optional, int) – Block size in bytes. Provide a hint for the size of the minimal I/O request.

  • commit_lock (optional, lance.commit.CommitLock) – If specified, use the provided commit lock to lock the table while committing a new version. Not necessary on object stores other than S3 or when there are no concurrent writers.

  • index_cache_size (optional, int) –

    Index cache size. The index cache is an LRU cache with TTL. This number specifies the number of index pages, for example, IVF partitions, to be cached in the host memory. Default value is 256.

    Roughly, for an IVF_PQ partition with n rows, the size of each index page equals the combination of the pq code (nd.array([n,pq], dtype=uint8)) and the row ids (nd.array([n], dtype=uint64)). Approximately, n = Total Rows / number of IVF partitions. pq = number of PQ sub-vectors.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • default_scan_options (optional, dict) –

    Default scan options that are used when scanning the dataset. This accepts the same arguments described in lance.LanceDataset.scanner(). The arguments will be applied to any scan operation.

    This can be useful to supply defaults for common parameters such as batch_size.

    It can also be used to create a view of the dataset that includes meta fields such as _rowid or _rowaddr. If default_scan_options is provided then the schema returned by lance.LanceDataset.schema() will include these fields if the appropriate scan options are set.

lance.json_to_schema(schema_json: Dict[str, Any]) Schema

Converts a JSON payload (given as a dict) to a PyArrow schema.

Parameters:

schema_json (Dict[str, Any]) – The JSON payload to convert to a PyArrow Schema.

lance.schema_to_json(schema: Schema) Dict[str, Any]

Converts a PyArrow schema to a JSON payload (returned as a dict).

lance.set_logger(file_path='pylance.log', name='pylance', level=20, format_string=None, log_handler=None)
lance.write_dataset(data_obj: ReaderLike, uri: str | Path | LanceDataset, schema: pa.Schema | None = None, mode: str = 'create', *, max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, commit_lock: CommitLock | None = None, progress: FragmentWriteProgress | None = None, storage_options: Dict[str, str] | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, enable_v2_manifest_paths: bool = False, enable_move_stable_row_ids: bool = False) LanceDataset

Write a given data_obj to the given uri

Parameters:
  • data_obj (Reader-like) – The data to be written. Acceptable types are a pandas DataFrame; a PyArrow Table, Dataset, Scanner, or RecordBatchReader; or a Hugging Face dataset.

  • uri (str, Path, or LanceDataset) – Where to write the dataset to (directory). If a LanceDataset is passed, the session will be reused.

  • schema (Schema, optional) – If specified and the input is a pandas DataFrame, use this schema instead of the default pandas to arrow table conversion.

  • mode (str) – One of: create, which creates a new dataset (raises if uri already exists); overwrite, which creates a new snapshot version; or append, which creates a new version that is the concatenation of the input and the latest version (raises if uri does not exist).

  • max_rows_per_file (int, default 1024 * 1024) – The max number of rows to write before starting a new file

  • max_rows_per_group (int, default 1024) – The max number of rows before starting a new group (in the same file)

  • max_bytes_per_file (int, default 90 * 1024 * 1024 * 1024) – The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.

  • use_legacy_format (optional, bool, default None) – Deprecated method for setting the data storage version. Use the data_storage_version parameter instead.

  • enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the LanceDataset.migrate_manifest_paths_v2() method. Default is False.

  • enable_move_stable_row_ids (bool, optional) – Experimental parameter: if set to true, the writer will use move-stable row ids. These row ids are stable after compaction operations, but not after updates. This makes compaction more efficient, since with stable row ids no secondary indices need to be updated to point to new row ids.