lance package

Subpackages

Submodules

lance.arrow module

Extensions to PyArrow.

class lance.arrow.BFloat16Array

Bases: ExtensionArray

Bfloat16 PyArrow Array.

classmethod from_numpy(array: ndarray)

Create a BFloat16Array from a NumPy array.

Can only convert from a NumPy array of dtype bfloat16 from the ml_dtypes module.

Examples

>>> import numpy as np
>>> from ml_dtypes import bfloat16
>>> from lance.arrow import BFloat16Array
>>> arr = np.array([1.0, 2.0, 3.0], dtype=bfloat16)
>>> print(BFloat16Array.from_numpy(arr))
[
  1,
  2,
  3
]
to_numpy(zero_copy_only=False)

Convert to a NumPy array.

This will do a zero-copy conversion.

The conversion will fail if the array contains null values.
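
A minimal round-trip sketch, assuming the ml_dtypes package is installed (the values are illustrative):

import numpy as np
from ml_dtypes import bfloat16

from lance.arrow import BFloat16Array

arr = BFloat16Array.from_numpy(np.array([1.0, 2.0, 3.0], dtype=bfloat16))
round_tripped = arr.to_numpy()  # NumPy array with dtype bfloat16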

class lance.arrow.BFloat16Type

Bases: ExtensionType

to_pandas_dtype(self)

Return the equivalent NumPy / Pandas dtype.

Examples

>>> import pyarrow as pa
>>> pa.int64().to_pandas_dtype()
<class 'numpy.int64'>
class lance.arrow.EncodedImageType(storage_type: DataType = DataType(binary))

Bases: ExtensionType

class lance.arrow.FixedShapeImageTensorType(arrow_type: DataType, shape)

Bases: ExtensionType

class lance.arrow.ImageArray

Bases: ExtensionArray

classmethod from_array(images)

Create one of the subclasses of ImageArray from the input data.

Parameters:

images (Union[pa.StringArray, pa.BinaryArray, pa.FixedShapeTensorArray, Iterable]) –

Return type:

Union[ImageURIArray, EncodedImageArray, FixedShapeImageTensorArray]
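
A minimal sketch; the URI is illustrative, and string input is assumed to produce an ImageURIArray:

from lance.arrow import ImageArray

# a list of URI strings; from_array dispatches to the appropriate subclass
images = ImageArray.from_array(["file:///tmp/1.png"])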

class lance.arrow.ImageScalar

Bases: ExtensionScalar

as_py(self)

Return this scalar as a Python object.

class lance.arrow.ImageURIArray

Bases: ImageArray

Array of image URIs. URIs may represent local files or remote files on the web, S3 or GCS if they are accessible by the machine executing this.

classmethod from_uris(uris: StringArray | LargeStringArray | Iterable[str | Path])

Create an ImageURIArray from an array or iterable of URIs (such as a list).

Parameters:

uris (Union[pa.StringArray, pa.LargeStringArray, Iterable[Union[str, Path]]]) –

Returns:

Array of image URIs

Return type:

ImageURIArray

Examples

>>> uris = ["file::///tmp/1.png"]
>>> ImageURIArray.from_uris(uris)
<lance.arrow.ImageURIArray object at 0x...>
['file::///tmp/1.png']
read_uris(storage_type=DataType(binary)) EncodedImageArray

Read the images from the URIs into memory and return an EncodedImageArray

Parameters:

storage_type (pa.DataType, optional) – The storage type to use for the encoded images. Default is pa.binary(). To support arrays with more than 2GiB of data, use pa.large_binary().

Returns:

Array of encoded images

Return type:

EncodedImageArray

Examples

>>> import os
>>> uris = [os.path.join(os.path.dirname(__file__), "../tests/images/1.png")]
>>> uri_array = ImageURIArray.from_uris(uris)
>>> uri_array.read_uris()
<lance.arrow.EncodedImageArray object at 0x...>
...
class lance.arrow.ImageURIType(storage_type: DataType = DataType(string))

Bases: ExtensionType

lance.arrow.bfloat16_array(values)
lance.arrow.cast(arr: Array, target_type: DataType | str, *args, **kwargs) Array

Cast an array to another data type.

Extends pyarrow.compute.cast() for Lance-defined extension types. In cases where the cast can be handled natively by pyarrow, it falls back to pyarrow.

Supported operations:

  • Cast between floating (float16, float32, float64) arrays and bfloat16 arrays.

  • Cast between FixedSizeListArray of floats (float16, float32, float64, bfloat16) with the same list size.

Parameters:
  • arr (pyarrow.Array) – Array to cast.

  • target_type (pyarrow.DataType or str) – Target data type. Accepts anything pyarrow.compute.cast() accepts. Additionally, accepts strings "bfloat16", "bf16" or BFloat16Type.
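
A short sketch of casting to and from bfloat16 (the values are illustrative):

import pyarrow as pa

from lance.arrow import cast

floats = pa.array([1.0, 2.0, 3.0], type=pa.float32())
bf16 = cast(floats, "bfloat16")  # handled by the lance extension
back = cast(bf16, pa.float32())  # cast back to a regular float array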

lance.cleanup module

lance.cleanup.cleanup_partial_writes(objects: List[Tuple[str, str]])

Cleans up partial writes from a list of objects.

These writes can be discovered using the lance.progress.FragmentWriteProgress class.

Parameters:

objects (List[Tuple[str, str]]) – A list of tuples of (fragment_id, multipart_id) to clean up.

lance.commit module

exception lance.commit.CommitConflictError

Bases: Exception

lance.conftest module

lance.dataset module

class lance.dataset.BatchUDF(func, output_schema=None, checkpoint_file=None)

Bases: object

A user-defined function that can be passed to LanceDataset.add_columns().

Use the lance.batch_udf() decorator to wrap a function with this class.

class lance.dataset.BatchUDFCheckpoint(path)

Bases: object

A cache for BatchUDF results to avoid recomputation.

This is backed by a SQLite database.

class BatchInfo(fragment_id, batch_index)

Bases: NamedTuple

batch_index: int

Alias for field number 1

fragment_id: int

Alias for field number 0

cleanup()
get_batch(info: BatchInfo) pa.RecordBatch | None
get_fragment(fragment_id: int) str | None

Retrieves a fragment as a JSON string.

insert_batch(info: BatchInfo, batch: pa.RecordBatch)
insert_fragment(fragment_id: int, fragment: str)

Save a JSON string of a fragment to the cache.

class lance.dataset.DatasetOptimizer(dataset: LanceDataset)

Bases: object

compact_files(*, target_rows_per_fragment: int = 1048576, max_rows_per_group: int = 1024, materialize_deletions: bool = True, materialize_deletions_threshold: float = 0.1, num_threads: int | None = None) CompactionMetrics

Compacts small files in the dataset, reducing total number of files.

This does a few things:
  • Removes deleted rows from fragments

  • Removes dropped columns from fragments

  • Merges small fragments into larger ones

This method preserves the insertion order of the dataset. This may mean it leaves small fragments in the dataset if they are not adjacent to other fragments that need compaction. For example, if you have fragments with row counts 5 million, 100, and 5 million, the middle fragment will not be compacted because the fragments it is adjacent to do not need compaction.

Parameters:
  • target_rows_per_fragment (int, default 1024*1024) – The target number of rows per fragment. This is the number of rows that will be in each fragment after compaction.

  • max_rows_per_group (int, default 1024) – Max number of rows per group. This does not affect which fragments need compaction, but does affect how they are re-written if selected.

  • materialize_deletions (bool, default True) – Whether to compact fragments with soft deleted rows so they are no longer present in the file.

  • materialize_deletions_threshold (float, default 0.1) – The fraction of original rows that are soft deleted in a fragment before the fragment is a candidate for compaction.

  • num_threads (int, optional) – The number of threads to use when performing compaction. If not specified, defaults to the number of cores on the machine.

Returns:

Metrics about the compaction process

Return type:

CompactionMetrics

optimize_indices(**kwargs)

Optimizes index performance.

As new data arrives, it is not added to existing indexes automatically. When searching, we need to perform an indexed search of the old data plus an expensive unindexed search on the new data. As the amount of new unindexed data grows, this can have an impact on search latency. This function will add the new data to existing indexes, restoring the performance. This function does not retrain the index; it only assigns the new data to existing partitions. This means an update is much quicker than retraining the entire index, but may have less accuracy (especially if the new data exhibits new patterns, concepts, or trends).
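
A sketch of invoking both operations through the dataset’s optimize property (the dataset path is illustrative):

import lance

dataset = lance.dataset("/tmp/my_dataset.lance")
metrics = dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)
dataset.optimize.optimize_indices()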

class lance.dataset.DatasetStats

Bases: TypedDict

num_deleted_rows: int
num_fragments: int
num_small_files: int
class lance.dataset.LanceDataset(uri: str | Path, version: int | None = None, block_size: int | None = None, index_cache_size: int | None = None, metadata_cache_size: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None)

Bases: Dataset

A dataset in Lance format where the data is stored at the given uri.

add_columns(transforms: Dict[str, str] | BatchUDF, read_columns: List[str] | None = None)

Add new columns with defined values.

There are two ways to specify the new columns. First, you can provide SQL expressions for each new column. Second, you can provide a UDF that takes a batch of existing data and returns a new batch with the new columns. These new columns will be appended to the dataset.

See the lance.batch_udf() decorator for more information on writing UDFs.

Parameters:
  • transforms (dict or AddColumnsUDF) – If this is a dictionary, then the keys are the names of the new columns and the values are SQL expression strings. These strings can reference existing columns in the dataset. If this is an AddColumnsUDF, then it is a UDF that takes a batch of existing data and returns a new batch with the new columns.

  • read_columns (list of str, optional) – The names of the columns that the UDF will read. If None, then the UDF will read all columns. This is only used when transforms is a UDF. Otherwise, the read columns are inferred from the SQL expressions.

Examples

>>> import lance
>>> import pyarrow as pa
>>> import pandas as pd
>>> table = pa.table({"a": [1, 2, 3]})
>>> dataset = lance.write_dataset(table, "my_dataset")
>>> @lance.batch_udf()
... def double_a(batch):
...     df = batch.to_pandas()
...     return pd.DataFrame({'double_a': 2 * df['a']})
>>> dataset.add_columns(double_a)
>>> dataset.to_table().to_pandas()
   a  double_a
0  1         2
1  2         4
2  3         6
>>> dataset.add_columns({"triple_a": "a * 3"})
>>> dataset.to_table().to_pandas()
   a  double_a  triple_a
0  1         2         3
1  2         4         6
2  3         6         9

See also

LanceDataset.merge

Merge a pre-computed set of columns into the dataset.

alter_columns(*alterations: Iterable[Dict[str, Any]])

Alter column name, data type, and nullability.

Columns that are renamed can keep any indices that are on them. If a column has an IVF_PQ index, it can be kept if the column is cast to another type. However, other index types don’t support casting at this time.

Column types can be upcast (such as int32 to int64) or downcast (such as int64 to int32). However, downcasting will fail if there are any values that cannot be represented in the new type. In general, columns can be cast to the same general type: integers to integers, floats to floats, and strings to strings. However, string, binary, and list columns can be cast between their size variants. For example, string to large string, binary to large binary, and list to large list.

Parameters:

alterations (Iterable[Dict[str, Any]]) –

A sequence of dictionaries, each with the following keys:

  • ”path”: str

    The column path to alter. For a top-level column, this is the name. For a nested column, this is the dot-separated path, e.g. “a.b.c”.

  • ”name”: str, optional

    The new name of the column. If not specified, the column name is not changed.

  • ”nullable”: bool, optional

    Whether the column should be nullable. If not specified, the column nullability is not changed. Only non-nullable columns can be changed to nullable. Currently, you cannot change a nullable column to non-nullable.

  • ”data_type”: pyarrow.DataType, optional

    The new data type to cast the column to. If not specified, the column data type is not changed.

Examples

>>> import lance
>>> import pyarrow as pa
>>> schema = pa.schema([pa.field('a', pa.int64()),
...                     pa.field('b', pa.string(), nullable=False)])
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.alter_columns({"path": "a", "name": "x"},
...                       {"path": "b", "nullable": True})
>>> dataset.to_table().to_pandas()
   x  b
0  1  a
1  2  b
2  3  c
>>> dataset.alter_columns({"path": "x", "data_type": pa.int32()})
>>> dataset.schema
x: int32
b: string
checkout_version(version) LanceDataset

Load the given version of the dataset.

Unlike the dataset() constructor, this will re-use the current cache. This is a no-op if the dataset is already at the given version.
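
A minimal sketch, assuming the dataset has at least two versions (the path is illustrative):

import lance

dataset = lance.dataset("/tmp/my_dataset.lance")
previous = dataset.checkout_version(dataset.version - 1)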

cleanup_old_versions(older_than: timedelta | None = None, *, delete_unverified: bool = False) CleanupStats

Cleans up old versions of the dataset.

Some dataset changes, such as overwriting, leave behind data that is not referenced by the latest dataset version. The old data is left in place to allow the dataset to be restored back to an older version.

This method will remove older versions and any data files they reference. Once this cleanup task has run you will not be able to checkout or restore these older versions.

Parameters:
  • older_than (timedelta, optional) – Only versions older than this will be removed. If not specified, this will default to two weeks.

  • delete_unverified (bool, default False) –

    Files leftover from a failed transaction may appear to be part of an in-progress operation (e.g. appending new data) and these files will not be deleted unless they are at least 7 days old. If delete_unverified is True then these files will be deleted regardless of their age.

    This should only be set to True if you can guarantee that no other process is currently working on this dataset. Otherwise the dataset could be put into a corrupted state.
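
A minimal sketch that removes versions older than 30 days (the path is illustrative):

from datetime import timedelta

import lance

dataset = lance.dataset("/tmp/my_dataset.lance")
stats = dataset.cleanup_old_versions(older_than=timedelta(days=30))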

static commit(base_uri: str | Path, operation: LanceOperation.BaseOperation, read_version: int | None = None, commit_lock: CommitLock | None = None) LanceDataset

Create a new version of the dataset.

This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset()).

Its current purpose is to allow for changes being made in a distributed environment where no single process is doing all of the work. For example, a distributed bulk update or a distributed bulk modify operation.

Once all of the changes have been made, this method can be called to make the changes visible by updating the dataset manifest.

Warning

This is an advanced API and doesn’t provide the same level of validation as the other APIs. For example, it’s the responsibility of the caller to ensure that the fragments are valid for the schema.

Parameters:
  • base_uri (str or Path) – The base uri of the dataset

  • operation (BaseOperation) – The operation to apply to the dataset. This describes what changes have been made. See available operations under LanceOperation.

  • read_version (int, optional) – The version of the dataset that was used as the base for the changes. This is not needed for overwrite or restore operations.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

Returns:

A new version of Lance Dataset.

Return type:

LanceDataset

Examples

Creating a new dataset with the LanceOperation.Overwrite operation:

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
count_rows(filter: str | Expression | None = None, **kwargs) int

Count rows matching the scanner filter.

Parameters:

**kwargs (dict, optional) – See scanner() method for full parameter description.

Returns:

count – The number of rows matching the filter, or the total number of rows in the dataset if no filter is given.

Return type:

int

create_index(column: str | List[str], index_type: str, name: str | None = None, metric: str = 'L2', replace: bool = False, num_partitions: int | None = None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, num_sub_vectors: int | None = None, accelerator: str | 'torch.Device' | None = None, index_cache_size: int | None = None, shuffle_partition_batches: int | None = None, shuffle_partition_concurrency: int | None = None, **kwargs) LanceDataset

Create index on column.

Experimental API

Parameters:
  • column (str) – The column to be indexed.

  • index_type (str) – The type of the index. "IVF_PQ", "IVF_HNSW_PQ" and "IVF_HNSW_SQ" are supported now.

  • name (str, optional) – The index name. If not provided, it will be generated from the column name.

  • metric (str) – The distance metric type, i.e., “L2” (alias to “euclidean”), “cosine” or “dot” (dot product). Default is “L2”.

  • replace (bool) – Replace the existing index if it exists.

  • num_partitions (int, optional) – The number of partitions of IVF (Inverted File Index).

  • ivf_centroids (np.ndarray, pyarrow.FixedSizeListArray or pyarrow.FixedShapeTensorArray, optional) – A num_partitions x dimension array of K-means centroids to use for IVF clustering. If not provided, a new K-means model will be trained.

  • pq_codebook (np.ndarray, pyarrow.FixedSizeListArray or pyarrow.FixedShapeTensorArray, optional) – A num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors) array of K-means centroids to use for the PQ codebook. Note: nbits is always 8 for now. If not provided, a new PQ model will be trained.

  • num_sub_vectors (int, optional) – The number of sub-vectors for PQ (Product Quantization).

  • accelerator (str or torch.Device, optional) – If set, use an accelerator to speed up the training process. Accepted accelerator: “cuda” (Nvidia GPU) and “mps” (Apple Silicon GPU). If not set, use the CPU.

  • index_cache_size (int, optional) – The size of the index cache in number of entries. Default value is 256.

  • shuffle_partition_batches (int, optional) –

    The number of batches, using the row group size of the dataset, to include in each shuffle partition. Default value is 10240.

    Assuming the row group size is 1024, each shuffle partition will hold 10240 * 1024 = 10,485,760 rows. By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.

  • shuffle_partition_concurrency (int, optional) –

    The number of shuffle partitions to process concurrently. Default value is 2

    By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.

  • kwargs – Parameters passed to the index building process.

The SQ (Scalar Quantization) is available only for the "IVF_HNSW_SQ" index type. This quantization method is used to reduce the memory usage of the index: it maps the float vectors to integer vectors, where each integer is of num_bits; for now only 8 bits are supported.

If index_type is "IVF_*", then the following parameter is required: num_partitions. If index_type ends with "PQ", then the following parameter is also required: num_sub_vectors.

Optional parameters for "IVF_PQ":

  • ivf_centroids – K-means centroids for IVF clustering.

Optional parameters for "IVF_HNSW_*":

  • max_level (int) – the maximum number of levels in the graph.

  • m (int) – the number of edges per node in the graph.

  • m_max (int) – the maximum number of edges per node in the graph.

  • ef_construction (int) – the number of nodes to examine during the construction.

Examples

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)
import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_HNSW_SQ",
    num_partitions=256,
)

Experimental Accelerator (GPU) support:

  • accelerate: use GPU to train IVF partitions.

    Only supports CUDA (Nvidia) or MPS (Apple) currently. Requires PyTorch being installed.

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
    accelerator="cuda"
)

create_scalar_index(column: str, index_type: Literal['BTREE'], name: str | None = None, *, replace: bool = True)

Create a scalar index on a column.

Scalar indices, like vector indices, can be used to speed up scans. A scalar index can speed up scans that contain filter expressions on the indexed column. For example, the following scan will be faster if the column my_col has a scalar index:

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(filter="my_col != 7").to_table()

Scalar indices can also speed up scans containing a vector search and a prefilter:
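
A sketch of such a scan, assuming a vector column "vector" and an indexed scalar column "my_col" (the path and query vector are illustrative):

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(
    nearest={"column": "vector", "q": [0.1, 0.2, 0.3, 0.4], "k": 10},
    prefilter=True,
    filter="my_col != 7",
).to_table()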

Scalar indices can only speed up scans for basic filters using equality, comparison, range (e.g. my_col BETWEEN 0 AND 100), and set membership (e.g. my_col IN (0, 1, 2)).

Scalar indices can be used if the filter contains multiple indexed columns and the filter criteria are AND’d or OR’d together (e.g. my_col < 0 AND other_col > 100).

Scalar indices may be used if the filter contains non-indexed columns but, depending on the structure of the filter, they may not be usable. For example, if the column not_indexed does not have a scalar index then the filter my_col = 0 OR not_indexed = 1 will not be able to use any scalar index on my_col.

To determine if a scan is making use of a scalar index you can use explain_plan to look at the query plan that lance has created. Queries that use scalar indices will either have a ScalarIndexQuery relation or a MaterializeIndex operator.

Currently, the only type of scalar index available is BTREE. This index is inspired by the btree data structure, although only the first few layers of the btree are cached in memory.

Note that the LANCE_BYPASS_SPILLING environment variable can be used to bypass spilling to disk. Setting this to true can avoid memory exhaustion issues (see https://github.com/apache/datafusion/issues/10073 for more info).

Experimental API

Parameters:
  • column (str) – The column to be indexed. Must be a boolean, integer, float, or string column.

  • index_type (str) – The type of the index. Only "BTREE" is supported now.

  • name (str, optional) – The index name. If not provided, it will be generated from the column name.

  • replace (bool, default True) – Replace the existing index if it exists.

Examples

import lance

dataset = lance.dataset("/tmp/images.lance")
dataset.create_scalar_index(
    "category",
    "BTREE",
)
delete(predicate: str | Expression)

Delete rows from the dataset.

This marks rows as deleted, but does not physically remove them from the files. This keeps the existing indexes still valid.

Parameters:

predicate (str or pa.compute.Expression) – The predicate to use to select rows to delete. May either be a SQL string or a pyarrow Expression.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.delete("a = 1 or b in ('a', 'b')")
>>> dataset.to_table()
pyarrow.Table
a: int64
b: string
----
a: [[3]]
b: [["c"]]
drop_columns(columns: List[str])

Drop one or more columns from the dataset

Parameters:
  • columns (list of str) – The names of the columns to drop. These can be nested column references (e.g. “a.b.c”) or top-level column names (e.g. “a”).

This is a metadata-only operation and does not remove the data from the underlying storage. In order to remove the data, you must subsequently call compact_files to rewrite the data without the removed columns and then call cleanup_files to remove the old files.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.drop_columns(["a"])
>>> dataset.to_table().to_pandas()
   b
0  a
1  b
2  c
get_fragment(fragment_id: int) LanceFragment | None

Get the fragment with fragment id.

get_fragments(filter: Expression | None = None) List[LanceFragment]

Get all fragments from the dataset.

Note: filter is not supported yet.

property has_index
head(num_rows, **kwargs)

Load the first N rows of the dataset.

Parameters:
  • num_rows (int) – The number of rows to load.

  • **kwargs (dict, optional) – See scanner() method for full parameter description.

Returns:

table

Return type:

Table

index_statistics(index_name: str) Dict[str, Any]
join(right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)

Not implemented (just override pyarrow dataset to prevent segfault)

property lance_schema: LanceSchema

The LanceSchema for this dataset

property latest_version: int

Returns the latest version of the dataset.

list_indices() List[Dict[str, Any]]
merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None)

Merge another dataset into this one.

Performs a left join, where the dataset is the left side and data_obj is the right side. Rows existing in the dataset but not matched in data_obj will have the new columns filled with null values, unless Lance doesn’t support null values for some types, in which case an error will be raised.

Parameters:
  • data_obj (Reader-like) – The data to be merged. Acceptable types are: Pandas DataFrame, Pyarrow Table, Dataset, Scanner, Iterator[RecordBatch], or RecordBatchReader.

  • left_on (str) – The name of the column in the dataset to join on.

  • right_on (str or None) – The name of the column in data_obj to join on. If None, defaults to left_on.

Examples

>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> dataset.merge(new_df, 'x')
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f

See also

LanceDataset.add_columns

Add new columns by computing batch-by-batch.

merge_insert(on: str | Iterable[str])

Returns a builder that can be used to create a “merge insert” operation

This operation can add rows, update rows, and remove rows in a single transaction. It is a very generic tool that can be used to create behaviors like “insert if not exists”, “update or insert (i.e. upsert)”, or even replace a portion of existing data with new data (e.g. replace all data where month=”january”)

The merge insert operation works by combining new data from a source table with existing data in a target table by using a join. There are three categories of records.

“Matched” records are records that exist in both the source table and the target table. “Not matched” records exist only in the source table (e.g. these are new data). “Not matched by source” records exist only in the target table (this is old data).

The builder returned by this method can be used to customize what should happen for each category of data.

Please note that the data will be reordered as part of this operation. This is because updated rows will be deleted from the dataset and then reinserted at the end with the new values. The order of the newly inserted rows may fluctuate randomly because a hash-join operation is used internally.

Parameters:

on (Union[str, Iterable[str]]) – A column (or columns) to join on. This is how records from the source table and target table are matched. Typically this is some kind of key or id column.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation
>>> dataset.merge_insert("a")             \
...        .when_matched_update_all()     \
...        .when_not_matched_insert_all() \
...        .execute(new_table)
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  b
1  2  x
2  3  y
3  4  z
property optimize: DatasetOptimizer
property partition_expression

Not implemented (just override pyarrow dataset to prevent segfault)

replace_schema(schema: Schema)

Not implemented (just override pyarrow dataset to prevent segfault)

restore()

Restore the currently checked out version as the latest version of the dataset.

This creates a new commit.

sample(num_rows: int, columns: List[str] | Dict[str, str] | None = None, randomize_order: bool = True, **kwargs) Table

Select a random sample of data

Parameters:
  • num_rows (int) – number of rows to retrieve

  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • **kwargs (dict, optional) – see scanner() method for full parameter description.

Returns:

table

Return type:

Table
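
A minimal sketch (the path and column names are illustrative):

import lance

dataset = lance.dataset("/tmp/my_dataset.lance")
subset = dataset.sample(100, columns=["a", "b"])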

scanner(columns: List[str] | Dict[str, str] | None = None, filter: str | Expression | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, fragments: Iterable[LanceFragment] | None = None, *, prefilter: bool = False, with_row_id: bool = False, use_stats: bool = True) LanceScanner

Return a Scanner that can support various pushdowns.

Parameters:
  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • filter (pa.compute.Expression or str) – Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.

  • limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.

  • offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.

  • nearest (dict, default None) –

    Get the rows corresponding to the K most similar vectors. Example:

    {
        "column": <embedding col name>,
        "q": <query vector as pa.Float32Array>,
        "k": 10,
        "nprobes": 1,
        "refine_factor": 1
    }
    

  • batch_size (int, default None) – The max size of batches returned.

  • batch_readahead (int, optional) – The number of batches to read ahead.

  • fragment_readahead (int, optional) – The number of fragments to read ahead.

  • scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.

  • fragments (iterable of LanceFragment, default None) – If specified, only scan these fragments. If scan_in_order is True, then the fragments will be scanned in the order given.

  • prefilter (bool, default False) –

    If True then the filter will be applied before the vector query is run. This will generate more correct results but it may be a more costly query. It’s generally good when the filter is highly selective.

    If False then the filter will be applied after the vector query is run. This will perform well but the results may have fewer than the requested number of rows (or be empty) if the rows closest to the query do not match the filter. It’s generally good when the filter is not very selective.

Notes

For now, if BOTH filter and nearest are specified, then: 1. nearest is executed first. 2. The results are filtered afterwards.

For debugging ANN results, you can choose to not use the index even if present by specifying use_index=False. For example, the following will always return exact KNN results:

dataset.to_table(nearest={
    "column": "vector",
    "k": 10,
    "q": <query vector>,
    "use_index": False
})
property schema: Schema

The pyarrow Schema for this dataset

property stats: LanceStats

Experimental API

take(indices: List[int] | Array, columns: List[str] | Dict[str, str] | None = None, **kwargs) Table

Select rows of data by index.

Parameters:
  • indices (Array or array-like) – indices of rows to select in the dataset.

  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • **kwargs (dict, optional) – See scanner() method for full parameter description.

Returns:

table

Return type:

Table

to_batches(columns: List[str] | Dict[str, str] | None = None, filter: str | Expression | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, use_stats: bool = True, **kwargs) Iterator[RecordBatch]

Read the dataset as materialized record batches.

Parameters:

**kwargs (dict, optional) – Arguments for Scanner.from_dataset.

Returns:

record_batches

Return type:

Iterator of RecordBatch
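
A minimal sketch of streaming the dataset batch by batch (the path and column name are illustrative):

import lance

dataset = lance.dataset("/tmp/my_dataset.lance")
for batch in dataset.to_batches(columns=["a"], batch_size=1024):
    pass  # each item is a pyarrow.RecordBatch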

to_table(columns: List[str] | Dict[str, str] | None = None, filter: str | Expression | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, use_stats: bool = True) Table

Read the data into memory as a pyarrow Table.

Parameters:
  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • filter (pa.compute.Expression or str) –

    Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.

  • limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.

  • offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.

  • nearest (dict, default None) –

    Get the rows corresponding to the K most similar vectors. Example:

    {
        "column": <embedding col name>,
        "q": <query vector as pa.Float32Array>,
        "k": 10,
        "metric": "cosine",
        "nprobes": 1,
        "refine_factor": 1
    }
    

  • batch_size (int, optional) – The number of rows to read at a time.

  • batch_readahead (int, optional) – The number of batches to read ahead.

  • fragment_readahead (int, optional) – The number of fragments to read ahead.

  • scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.

  • prefilter (bool, default False) – Run filter before the vector search.

  • with_row_id (bool, default False) – Return physical row ID.

  • use_stats (bool, default True) – Use stats pushdown during filters.

Notes

If BOTH filter and nearest are specified, then: 1. nearest is executed first. 2. The results are filtered afterward, unless prefilter is set to True.

update(updates: Dict[str, str], where: str | None = None)

Update column values for rows matching where.

Parameters:
  • updates (dict of str to str) – A mapping of column names to a SQL expression.

  • where (str, optional) – A SQL predicate indicating which rows should be updated.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.update(dict(a = 'a + 2'), where="b != 'a'")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  4  b
2  5  c
property uri: str

The location of the data

validate()

Validate the dataset.

This checks the integrity of the dataset and will raise an exception if the dataset is corrupted.

property version: int

Returns the currently checked out version of the dataset

versions()

Return all versions in this dataset.

class lance.dataset.LanceOperation

Bases: object

class Append(fragments: Iterable[FragmentMetadata])

Bases: BaseOperation

Append new rows to the dataset.

fragments

The fragments that contain the new rows.

Type:

list[FragmentMetadata]

Warning

This is an advanced API for distributed operations. To append to a dataset on a single machine, use lance.write_dataset().

Examples

To append new rows to a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(tab1, "example")
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment = lance.fragment.LanceFragment.create("example", tab2)
>>> operation = lance.LanceOperation.Append([fragment])
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments: Iterable[FragmentMetadata]
class BaseOperation

Bases: ABC

Base class for operations that can be applied to a dataset.

See available operations under LanceOperation.

class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)

Bases: BaseOperation

Remove fragments or rows from the dataset.

updated_fragments

The fragments that have been updated with new deletion vectors.

Type:

list[FragmentMetadata]

deleted_fragment_ids

The ids of the fragments that have been deleted entirely. These are the fragments where LanceFragment.delete() returned None.

Type:

list[int]

predicate

The original SQL predicate used to select the rows to delete.

Type:

str

Warning

This is an advanced API for distributed operations. To delete rows from a dataset on a single machine, use lance.LanceDataset.delete().

Examples

To delete rows from a dataset, call lance.fragment.LanceFragment.delete() on each of the fragments. If that returns a new fragment, add that to the updated_fragments list. If it returns None, that means the whole fragment was deleted, so add the fragment id to the deleted_fragment_ids. Finally, pass the operation to the LanceDataset.commit() method to complete the deletion operation.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(table, "example")
>>> table = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example", mode="append")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> predicate = "a >= 2"
>>> updated_fragments = []
>>> deleted_fragment_ids = []
>>> for fragment in dataset.get_fragments():
...     new_fragment = fragment.delete(predicate)
...     if new_fragment is not None:
...         updated_fragments.append(new_fragment)
...     else:
...         deleted_fragment_ids.append(fragment.fragment_id)
>>> operation = lance.LanceOperation.Delete(updated_fragments,
...                                         deleted_fragment_ids,
...                                         predicate)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
deleted_fragment_ids: Iterable[int]
predicate: str
updated_fragments: Iterable[FragmentMetadata]
class Merge(fragments: Iterable[FragmentMetadata], schema: LanceSchema | Schema)

Bases: BaseOperation

Operation that adds columns. Unlike Overwrite, this should not change the structure of the fragments, allowing existing indices to be kept.

fragments

The fragments that make up the new dataset.

Type:

iterable of FragmentMetadata

schema

The schema of the new dataset. Passing a LanceSchema is preferred, and passing a pyarrow.Schema is deprecated.

Type:

LanceSchema or pyarrow.Schema

Warning

This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().

Examples

To add new columns to a dataset, first define a method that will create the new columns based on the existing columns. Then use lance.fragment.LanceFragment.merge_columns() on each fragment, as shown in the example below.

>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch:
...     doubled = pc.multiply(batch["a"], 2)
...     return pa.record_batch([doubled], ["a_doubled"])
>>> fragments = []
>>> for fragment in dataset.get_fragments():
...     new_fragment, new_schema = fragment.merge_columns(double_a,
...                                                       columns=['a'])
...     fragments.append(new_fragment)
>>> operation = lance.LanceOperation.Merge(fragments, new_schema)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b  a_doubled
0  1  a          2
1  2  b          4
2  3  c          6
3  4  d          8
fragments: Iterable[FragmentMetadata]
schema: LanceSchema | Schema
class Overwrite(new_schema: Schema, fragments: Iterable[FragmentMetadata])

Bases: BaseOperation

Overwrite or create a new dataset.

new_schema

The schema of the new dataset.

Type:

pyarrow.Schema

fragments

The fragments that make up the new dataset.

Type:

list[FragmentMetadata]

Warning

This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().

Examples

To create or overwrite a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it along with the schema to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments: Iterable[FragmentMetadata]
new_schema: Schema
class Restore(version: int)

Bases: BaseOperation

Operation that restores a previous version of the dataset.

version: int
class lance.dataset.LanceScanner(scanner: _Scanner, dataset: LanceDataset)

Bases: Scanner

count_rows()

Count rows matching the scanner filter.

Returns:

count

Return type:

int

property dataset_schema: Schema

The schema with which batches will be read from fragments.

explain_plan(verbose=False) str

Return the execution plan for this scanner.

Parameters:

verbose (bool, default False) – Use a verbose output format.

Returns:

plan

Return type:

str

static from_batches(*args, **kwargs)

Not implemented

static from_dataset(*args, **kwargs)

Not implemented

static from_fragment(*args, **kwargs)

Not implemented

head(num_rows)

Load the first N rows of the dataset.

Parameters:

num_rows (int) – The number of rows to load.

Return type:

Table

property projected_schema: Schema

The materialized schema of the data, accounting for projections.

This is the schema of any data returned from the scanner.

scan_batches()

Consume a Scanner in record batches with corresponding fragments.

Returns:

record_batches

Return type:

iterator of TaggedRecordBatch

take(indices)

Not implemented

to_batches(self)

Consume a Scanner in record batches.

Returns:

record_batches

Return type:

iterator of RecordBatch

to_reader(self)

Consume this scanner as a RecordBatchReader.

Return type:

RecordBatchReader

to_table() Table

Read the data into memory and return a pyarrow Table.

class lance.dataset.LanceStats(dataset: _Dataset)

Bases: object

Statistics about a LanceDataset.

dataset_stats(max_rows_per_group: int = 1024) DatasetStats

Statistics about the dataset.

index_stats(index_name: str) Dict[str, Any]

Statistics about an index.

Parameters:

index_name (str) – The name of the index to get statistics for.

class lance.dataset.MergeInsertBuilder(dataset, on)

Bases: _MergeInsertBuilder

execute(data_obj: ReaderLike, *, schema: pa.Schema | None = None)

Executes the merge insert operation

There is no return value but the original dataset will be updated.

Parameters:
  • data_obj (ReaderLike) – The new data to use as the source table for the operation. This parameter can be any source of data (e.g. table / dataset) that write_dataset() accepts.

  • schema (Optional[pa.Schema]) – The schema of the data. This only needs to be supplied whenever the data source is some kind of generator.

when_matched_update_all(condition: str | None = None)

Configure the operation to update matched rows

After this method is called, when the merge insert operation executes, any rows that match both the source table and the target table will be updated. The rows from the target table will be removed and the rows from the source table will be added.

An optional condition may be specified. This should be an SQL filter and, if present, then only matched rows that also satisfy this filter will be updated. The SQL filter should use the prefix target. to refer to columns in the target table and the prefix source. to refer to columns in the source table. For example, source.last_update < target.last_update.

If a condition is specified and rows do not satisfy the condition then these rows will not be updated. Failure to satisfy the filter does not cause a “matched” row to become a “not matched” row.
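
A sketch of a conditional upsert, assuming an existing dataset and a new_table that both contain id and last_update columns:

dataset.merge_insert("id") \
    .when_matched_update_all(condition="source.last_update > target.last_update") \
    .when_not_matched_insert_all() \
    .execute(new_table)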

when_not_matched_by_source_delete(expr: str | None = None)

Configure the operation to delete target rows that have no match in the source

After this method is called, when the merge insert operation executes, any rows that exist only in the target table will be deleted. An optional filter can be specified to limit the scope of the delete operation. If given (as an SQL filter) then only rows which match the filter will be deleted.

when_not_matched_insert_all()

Configure the operation to insert not matched rows

After this method is called, when the merge insert operation executes, any rows that exist only in the source table will be inserted into the target table.

class lance.dataset.ScannerBuilder(ds: LanceDataset)

Bases: object

batch_readahead(nbatches: int | None = None) ScannerBuilder
batch_size(batch_size: int) ScannerBuilder

Set batch size for Scanner

columns(cols: List[str] | Dict[str, str] | None = None) ScannerBuilder
filter(filter: str | Expression) ScannerBuilder
fragment_readahead(nfragments: int | None = None) ScannerBuilder
limit(n: int | None = None) ScannerBuilder
nearest(column: str, q: QueryVectorLike, k: int | None = None, metric: str | None = None, nprobes: int | None = None, refine_factor: int | None = None, use_index: bool = True) ScannerBuilder
offset(n: int | None = None) ScannerBuilder
prefilter(prefilter: bool) ScannerBuilder
scan_in_order(scan_in_order: bool = True) ScannerBuilder

Whether to scan the dataset in order of fragments and batches.

If set to False, the scanner may read fragments concurrently and yield batches out of order. This may improve performance since it allows more concurrency in the scan, but can also use more memory.

to_scanner() LanceScanner
use_stats(use_stats: bool = True) ScannerBuilder

Enable use of statistics for query planning.

Disabling statistics is used for debugging and benchmarking purposes. This should be left on for normal use.

with_fragments(fragments: Iterable[LanceFragment] | None) ScannerBuilder
with_row_id(with_row_id: bool = True) ScannerBuilder

Enable returning physical row IDs with the results.

lance.dataset.batch_udf(output_schema=None, checkpoint_file=None)

Create a user defined function (UDF) that adds columns to a dataset.

This decorator is used to add columns to a dataset. The wrapped function takes a single argument, a RecordBatch, and returns a RecordBatch. It is called once for each batch in the dataset. The function should not modify the input batch, but instead create a new batch with the new columns added.

Parameters:
  • output_schema (Schema, optional) – The schema of the output RecordBatch. This is used to validate the output of the function. If not provided, the schema of the first output RecordBatch will be used.

  • checkpoint_file (str or Path, optional) – If specified, this file will be used as a cache for unsaved results of this UDF. If the process fails, and you call add_columns again with this same file, it will resume from the last saved state. This is useful for long running processes that may fail and need to be resumed. This file may get very large. It will hold up to an entire data file’s worth of results on disk, which can be multiple gigabytes of data.

Return type:

AddColumnsUDF
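
A sketch of a checkpointed UDF, assuming an existing dataset with a string column "text" (the checkpoint file name and path are illustrative):

import lance
import pyarrow as pa
import pyarrow.compute as pc

@lance.batch_udf(checkpoint_file="text_len_checkpoint.sqlite")
def add_text_len(batch: pa.RecordBatch) -> pa.RecordBatch:
    # compute a new column from the existing "text" column
    lengths = pc.utf8_length(batch["text"])
    return pa.record_batch([lengths], ["text_len"])

dataset = lance.dataset("/tmp/my_dataset.lance")
dataset.add_columns(add_text_len, read_columns=["text"])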

lance.dataset.write_dataset(data_obj: ReaderLike, uri: str | Path, schema: pa.Schema | None = None, mode: str = 'create', *, max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, commit_lock: CommitLock | None = None, progress: FragmentWriteProgress | None = None, storage_options: Dict[str, str] | None = None, use_experimental_writer: bool = False) LanceDataset

Write a given data_obj to the given uri

Parameters:
  • data_obj (Reader-like) – The data to be written. Acceptable types are: Pandas DataFrame, Pyarrow Table, Dataset, Scanner, RecordBatchReader, or Huggingface dataset.

  • uri (str or Path) – Where to write the dataset to (directory)

  • schema (Schema, optional) – If specified and the input is a pandas DataFrame, use this schema instead of the default pandas to arrow table conversion.

  • mode (str) – create - create a new dataset (raises if uri already exists). overwrite - create a new snapshot version. append - create a new version that is the concatenation of the input and the latest version (raises if uri does not exist).

  • max_rows_per_file (int, default 1024 * 1024) – The max number of rows to write before starting a new file

  • max_rows_per_group (int, default 1024) – The max number of rows before starting a new group (in the same file)

  • max_bytes_per_file (int, default 90 * 1024 * 1024 * 1024) – The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • use_experimental_writer (optional, bool) – Use the Lance v2 writer to write Lance v2 files. This is not recommended at this time as there are several known limitations in the v2 writer.
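
A minimal sketch of creating a dataset and then appending to it (the path is illustrative):

import lance
import pyarrow as pa

table = pa.table({"a": [1, 2, 3]})
dataset = lance.write_dataset(table, "/tmp/example.lance", mode="create")
lance.write_dataset(pa.table({"a": [4, 5]}), "/tmp/example.lance", mode="append")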

lance.debug module

Debug utilities for Lance.

lance.debug.format_fragment(fragment, dataset)

Debug print a LanceFragment.

lance.debug.format_manifest(dataset)

Print the full Lance manifest of the dataset.

lance.debug.format_schema(dataset)

Format the Lance schema of a dataset as a string.

This can be used to view the field ids and types in the schema.

lance.debug.list_transactions(dataset, /, max_transactions=10)

Return a string representation of each transaction in the dataset, in reverse chronological order.

If max_transactions is provided, only the most recent max_transactions transactions will be returned. Defaults to 10.

lance.dependencies module

lance.file module

class lance.file.LanceBufferDescriptor

Bases: object

position

The byte offset of the buffer in the file

size

The size (in bytes) of the buffer

class lance.file.LanceColumnMetadata

Bases: object

column_buffers

The column-wide buffers

pages

The data pages in the column

class lance.file.LanceFileMetadata

Bases: object

columns

The column metadata, an entry might be None if the metadata for a column was not loaded into memory when the file was opened.

global_buffers

The global buffers

major_version

The major version of the file

minor_version

The minor version of the file

num_column_metadata_bytes

The number of bytes in the column metadata section of the file

num_data_bytes

The number of bytes in the data section of the file

num_global_buffer_bytes

The number of bytes in the global buffer section of the file

num_rows

The number of rows in the file

schema

The schema of the file

class lance.file.LanceFileReader(path: str)

Bases: object

A file reader for reading Lance files

This class is used to read Lance data files, a low level structure optimized for storing multi-modal tabular data. If you are working with Lance datasets then you should use the LanceDataset class instead.

metadata() LanceFileMetadata

Return metadata describing the file contents

read_all(*, batch_size: int = 1024, batch_readahead=16) ReaderResults

Reads the entire file

Parameters:

batch_size (int, default 1024) –

The file will be read in batches. This parameter controls how many rows will be in each batch (except the final batch)

Smaller batches will use less memory but might be slightly slower because there is more per-batch overhead

read_range(start: int, num_rows: int, *, batch_size: int = 1024, batch_readahead=16) ReaderResults

Read a range of rows from the file

Parameters:
  • start (int) – The offset of the first row to start reading

  • num_rows (int) – The number of rows to read from the file

  • batch_size (int, default 1024) –

    The file will be read in batches. This parameter controls how many rows will be in each batch (except the final batch)

    Smaller batches will use less memory but might be slightly slower because there is more per-batch overhead

class lance.file.LanceFileWriter(path: str, schema: Schema, *, data_cache_bytes: int = None, **kwargs)

Bases: object

A file writer for writing Lance data files

This class is used to write Lance data files, a low level structure optimized for storing multi-modal tabular data. If you are working with Lance datasets then you should use the LanceDataset class instead.

close() int

Write the file metadata and close the file

Returns the number of rows written to the file

write_batch(batch: RecordBatch | Table) None

Write a batch of data to the file

Parameters:

batch (Union[pa.RecordBatch, pa.Table]) – The data to write to the file
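
A minimal sketch of writing a file and reading back its metadata (the path is illustrative; read_all returns a ReaderResults object):

import pyarrow as pa

from lance.file import LanceFileReader, LanceFileWriter

path = "/tmp/example_file.lance"
writer = LanceFileWriter(path, pa.schema([pa.field("a", pa.int64())]))
writer.write_batch(pa.table({"a": [1, 2, 3]}))
num_rows = writer.close()  # number of rows written

reader = LanceFileReader(path)
print(reader.metadata().num_rows)
results = reader.read_all(batch_size=1024)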

class lance.file.LancePageMetadata

Bases: object

buffers

The buffers in the page

encoding

A description of the encoding used to encode the page

lance.fragment module

Dataset Fragment

class lance.fragment.FragmentMetadata(metadata: str)

Bases: object

Metadata of a Fragment in the dataset.

data_files() Iterable[str]

Return the data files of the fragment

deletion_file()

Return the deletion file, if any

static from_json(json_data: str) FragmentMetadata

Reconstruct FragmentMetadata from a JSON blob

classmethod from_metadata(metadata: _FragmentMetadata)
property id: int
to_json() str

Serialize FragmentMetadata to a JSON blob

class lance.fragment.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, fragment: _Fragment | None = None)

Bases: Fragment

add_columns(value_func: Callable[[RecordBatch], RecordBatch], columns: list[str] | None = None) FragmentMetadata

Add columns to this Fragment.

Deprecated since version 0.10.14: Use merge_columns() instead.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • value_func (Callable.) – A function that takes a RecordBatch as input and returns a RecordBatch.

  • columns (Optional[list[str]].) – If specified, only the columns in this list will be passed to the value_func. Otherwise, all columns will be passed to the value_func.

See also

lance.dataset.LanceOperation.Merge

The operation used to commit these changes to the dataset. See the doc page for an example of using this API.

Returns:

A new fragment with the added column(s).

Return type:

FragmentMetadata

count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Count rows matching the scanner filter.

Parameters:
  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

count

Return type:

int

static create(dataset_uri: str | Path, data: Table | RecordBatchReader, fragment_id: int | None = None, schema: Schema | None = None, max_rows_per_group: int = 1024, progress: FragmentWriteProgress | None = None, mode: str = 'append', *, use_experimental_writer=False) FragmentMetadata

Create a FragmentMetadata from the given data.

This can be used if the dataset is not yet created.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • dataset_uri (str) – The URI of the dataset.

  • fragment_id (int) – The ID of the fragment.

  • data (pa.Table or pa.RecordBatchReader) – The data to be written to the fragment.

  • schema (pa.Schema, optional) – The schema of the data. If not specified, the schema will be inferred from the data.

  • max_rows_per_group (int, default 1024) – The maximum number of rows per group in the data file.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • mode (str, default "append") – The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.

See also

lance.dataset.LanceOperation.Overwrite

The operation used to create a new dataset or overwrite one using fragments created with this API. See the doc page for an example of using this API.

lance.dataset.LanceOperation.Append

The operation used to append fragments created with this API to an existing dataset. See the doc page for an example of using this API.

Return type:

FragmentMetadata
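
A minimal sketch of the intended flow: create fragments before a dataset exists, then commit them with an Overwrite operation (the dataset URI is hypothetical):

import lance
import pyarrow as pa
from lance.fragment import LanceFragment

tab = pa.table({"a": [1, 2], "b": ["x", "y"]})

# Each worker can create one or more fragments independently.
fragment = LanceFragment.create("fragments_demo", tab)

# A single coordinator then commits them as a new dataset version.
operation = lance.LanceOperation.Overwrite(tab.schema, [fragment])
dataset = lance.LanceDataset.commit("fragments_demo", operation)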

static create_from_file(filename: str | Path, schema: Schema, fragment_id: int) LanceFragment

Create a fragment from the given datafile uri.

This can be used if the datafile is lost from the dataset.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • filename (str) – The filename of the datafile.

  • schema (pa.Schema) – The schema for the new datafile.

  • fragment_id (int) – The ID of the fragment.

data_files()

Return the data files of this fragment.

delete(predicate: str) FragmentMetadata | None

Delete rows from this Fragment.

This will add or update the deletion file of this fragment. It does not modify or delete the data files of this fragment. If no rows are left after the deletion, this method will return None.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:

predicate (str) – A SQL predicate that specifies the rows to delete.

Returns:

A new fragment containing the new deletion file, or None if no rows left.

Return type:

FragmentMetadata or None

Examples

>>> import lance
>>> import pyarrow as pa
>>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> dataset = lance.write_dataset(tab, "dataset")
>>> frag = dataset.get_fragment(0)
>>> frag.delete("a > 1")
Fragment { id: 0, files: ..., deletion_file: Some(...), physical_rows: Some(3) }
>>> frag.delete("a > 0") is None
True

See also

lance.dataset.LanceOperation.Delete

The operation used to commit these changes to a dataset. See the doc page for an example of using this API.

deletion_file()

Return the deletion file, if any

property fragment_id
head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Load the first N rows of the fragment.

Parameters:
  • num_rows (int) – The number of rows to load.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Return type:

Table

merge_columns(value_func: Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None = None) Tuple[FragmentMetadata, LanceSchema]

Add columns to this Fragment.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • value_func (Callable.) – A function that takes a RecordBatch as input and returns a RecordBatch.

  • columns (Optional[list[str]].) – If specified, only the columns in this list will be passed to the value_func. Otherwise, all columns will be passed to the value_func.

See also

lance.dataset.LanceOperation.Merge

The operation used to commit these changes to the dataset. See the doc page for an example of using this API.

Returns:

A new fragment with the added column(s) and the final schema.

Return type:

Tuple[FragmentMetadata, LanceSchema]
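
A sketch of adding a derived column with merge_columns and committing it (internal API; this assumes LanceOperation.Merge accepts (fragments, schema) and that the dataset exposes a version property, and the helper and dataset names are illustrative):

import lance
import pyarrow as pa
import pyarrow.compute as pc

def add_doubled(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Return only the new column(s); they are merged into the fragment.
    doubled = pc.multiply(batch.column("a"), 2)
    return pa.record_batch([doubled], names=["a_doubled"])

dataset = lance.dataset("my_dataset")   # hypothetical dataset with column "a"
fragment = dataset.get_fragment(0)
new_fragment, new_schema = fragment.merge_columns(add_doubled, columns=["a"])

# Commit the change to the dataset with a Merge operation.
operation = lance.LanceOperation.Merge([new_fragment], new_schema)
dataset = lance.LanceDataset.commit("my_dataset", operation, read_version=dataset.version)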

property metadata: FragmentMetadata

Return the metadata of this fragment.

Return type:

FragmentMetadata

property num_deletions: int

Return the number of deleted rows in this fragment.

property partition_expression: Schema

An Expression which evaluates to true for all data viewed by this Fragment.

property physical_rows: int

Return the number of rows originally in this fragment.

To get the number of rows after deletions, use count_rows() instead.

property physical_schema: Schema

Return the physical schema of this Fragment. This schema can be different from the dataset read schema.

scanner(*, columns: List[str] | Dict[str, str] | None = None, batch_size: int | None = None, filter: str | pa.compute.Expression | None = None, limit: int | None = None, offset: int | None = None, with_row_id: bool = False, batch_readahead: int = 16) LanceScanner

See Dataset::scanner for details

property schema: Schema

Return the schema of this fragment.

take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Select rows of data by index.

Parameters:
  • indices (Array or array-like) – The indices of rows to select in the dataset.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Return type:

Table

to_batches(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Read the fragment as materialized record batches.

Parameters:
  • schema (Schema, optional) – Concrete schema to use for scanning.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

record_batches

Return type:

iterator of RecordBatch

to_table(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Convert this Fragment into a Table.

Use this convenience utility with care. This will serially materialize the Scan result in memory before creating the Table.

Parameters:
  • schema (Schema, optional) – Concrete schema to use for scanning.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

table

Return type:

Table

lance.fragment.write_fragments(data: ReaderLike, dataset_uri: str | Path, schema: pa.Schema | None = None, *, mode: str = 'append', max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, progress: FragmentWriteProgress | None = None, use_experimental_writer: bool = False, storage_options: Dict[str, str] | None = None) List[FragmentMetadata]

Write data into one or more fragments.

Warning

This is a low-level API intended for manually implementing distributed writes. For most users, lance.write_dataset() is the recommended API.

Parameters:
  • data (pa.Table or pa.RecordBatchReader) – The data to be written to the fragment.

  • dataset_uri (str) – The URI of the dataset.

  • schema (pa.Schema, optional) – The schema of the data. If not specified, the schema will be inferred from the data.

  • mode (str, default "append") – The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.

  • max_rows_per_file (int, default 1024 * 1024) – The maximum number of rows per data file.

  • max_rows_per_group (int, default 1024) – The maximum number of rows per group in the data file.

  • max_bytes_per_file (int, default 90 * 1024 * 1024 * 1024) – The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • use_experimental_writer (optional, bool) – Use the Lance v2 writer to write Lance v2 files. This is not recommended at this time as there are several known limitations in the v2 writer.

  • storage_options (Optional[Dict[str, str]]) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

Returns:

A list of FragmentMetadata for the fragments written. The fragment ids are left as zero meaning they are not yet specified. They will be assigned when the fragments are committed to a dataset.

Return type:

List[FragmentMetadata]
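
A sketch of the intended flow: each worker writes fragments, then a coordinator commits them in a single operation (the dataset URI is hypothetical; see also lance.LanceDataset.commit()):

import lance
import pyarrow as pa
from lance.fragment import write_fragments

data = pa.table({"a": list(range(1024))})

# Worker side: write data files, but don't commit anything yet.
fragments = write_fragments(data, "distributed_demo", mode="create")

# Coordinator side: commit all fragments as a new dataset.
operation = lance.LanceOperation.Overwrite(data.schema, fragments)
dataset = lance.LanceDataset.commit("distributed_demo", operation)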

lance.optimize module

class lance.optimize.Compaction

Bases: object

File compaction operation.

To run with multiple threads in a single process, just use execute().

To run with multiple processes, first use plan() to construct a plan, then execute the tasks in parallel, and finally use commit(). The CompactionPlan contains many CompactionTask objects, which can be pickled and sent to other processes. The tasks produce RewriteResult objects, which can be pickled and sent back to the main process to be passed to commit().

static commit(dataset, rewrites)

Commit a compaction operation.

Once tasks from plan() have been executed, the results can be passed to this method to commit the compaction. It is not required that all of the original tasks are passed. For example, if only a subset were successful or completed before a deadline, you can pass just those.

Parameters:
  • dataset (lance.Dataset) – The dataset to compact. The dataset instance will be updated to the new version once committed.

  • rewrites (List[RewriteResult]) – The results of the compaction tasks to include in the commit.

Return type:

CompactionMetrics

static execute(dataset, options)

Execute a full compaction operation.

Parameters:
  • dataset (lance.Dataset) – The dataset to compact. The dataset instance will be updated to the new version once complete.

  • options (CompactionOptions) – The compaction options.

Returns:

The metrics from the compaction operation.

Return type:

CompactionMetrics

static plan(dataset, options)

Plan a compaction operation.

This is intended for users who want to run compaction in a distributed fashion. For running on a single process, use execute() instead.

Parameters:
  • dataset (lance.Dataset) – The dataset to compact.

  • options (CompactionOptions) – The compaction options.

Return type:

CompactionPlan
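
For illustration, a sketch of both the single-process and the distributed flows (the options shown are a subset of CompactionOptions; the dataset path is hypothetical):

import lance
from lance.optimize import Compaction

dataset = lance.dataset("my_dataset")

# Single process: plan + execute + commit in one call.
metrics = Compaction.execute(dataset, {"target_rows_per_fragment": 1024 * 1024})

# Distributed: plan locally, run tasks anywhere (they pickle), commit the results.
plan = Compaction.plan(dataset, {"target_rows_per_fragment": 1024 * 1024})
results = [task.execute(dataset) for task in plan.tasks]   # e.g. on remote workers
metrics = Compaction.commit(dataset, results)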

class lance.optimize.CompactionMetrics

Bases: object

files_added

The number of files that have been added, which is always equal to the number of fragments.

Type:

int

files_removed

The number of files that have been removed, including deletion files.

Type:

int

fragments_added

The number of new fragments that have been added.

Type:

int

fragments_removed

The number of fragments that have been overwritten.

Type:

int

class lance.optimize.CompactionOptions

Bases: TypedDict

Options for compaction.

materialize_deletions: bool | None

Whether to compact fragments with soft deleted rows so they are no longer present in the file. (default: True)

materialize_deletions_threadhold: float | None

The fraction of original rows that are soft deleted in a fragment before the fragment is a candidate for compaction. (default: 0.1 = 10%)

max_rows_per_group: int | None

Max number of rows per group. This does not affect which fragments need compaction, but does affect how they are re-written if selected. (default: 1024)

num_threads: int | None

The number of threads to use when performing compaction. If not specified, defaults to the number of cores on the machine.

target_rows_per_fragment: int | None

The target number of rows per fragment. This is the number of rows that will be in each fragment after compaction. (default: 1024*1024)

class lance.optimize.CompactionPlan

Bases: object

A plan to compact small dataset fragments into larger ones.

Created by lance.optimize.Compaction.plan().

static from_json(json)

Load a plan from a JSON representation.

Parameters:

json (str) – The JSON representation of the plan.

Return type:

CompactionPlan

json()

Get a JSON representation of the plan.

Return type:

str

Warning

The JSON representation is not guaranteed to be stable across versions.

num_tasks()

int : The number of compaction tasks in the plan.

read_version

The read version of the dataset that this plan was created from.

Type:

int

tasks

The individual tasks in the plan.

Type:

List[CompactionTask]

class lance.optimize.CompactionTask

Bases: object

execute(dataset)

Execute the compaction task and return the RewriteResult.

The rewrite result should be passed onto lance.optimize.Compaction.commit().

fragments

The fragments that will be compacted.

Type:

List[lance.fragment.FragmentMetadata]

static from_json(json)

Load a task from a JSON representation.

Parameters:

json (str) – The JSON representation of the task.

Return type:

CompactionTask

json()

Get a JSON representation of the task.

Return type:

str

Warning

The JSON representation is not guaranteed to be stable across versions.

read_version

The read version of the dataset that this task was created from.

Type:

int

class lance.optimize.RewriteResult

Bases: object

The result of a single compaction task.

Created by lance.optimize.CompactionTask.execute().

This result is pickle-able, so it can be serialized and sent back to the main process to be passed to lance.optimize.Compaction.commit().

static from_json(json)

Load a result from a JSON representation.

json()

Get a JSON representation of the result.

Return type:

str

Warning

The JSON representation is not guaranteed to be stable across versions.

metrics

The metrics from this compaction task.

Type:

CompactionMetrics

new_fragments

The metadata for fragments that are being added.

Type:

List[lance.fragment.FragmentMetadata]

original_fragments

The metadata for fragments that are being replaced.

Type:

List[lance.fragment.FragmentMetadata]

read_version

The version of the dataset the optimize operation is based on.

Type:

int

lance.progress module

class lance.progress.FileSystemFragmentWriteProgress(base_uri: str, metadata: Dict[str, str] | None = None)

Bases: FragmentWriteProgress

Progress tracking for Writing a Dataset or Fragment.

Warns:
  • This tracking class is experimental and will change in the future.

  • This implementation writes a JSON file to the filesystem for each fragment to track in-progress state.

PROGRESS_EXT: str = '.in_progress'
begin(fragment: FragmentMetadata, multipart_id: str | None = None, **kwargs)

Called when a new fragment is created.

Parameters:
  • fragment (FragmentMetadata) – The fragment that is open to write to.

  • multipart_id (str, optional) – The multipart id to upload this fragment to cloud storage.

cleanup_partial_writes(dataset_uri: str) int

Finds all in-progress files and cleans up any partially written data files. This is useful for cleaning up after a failed write.

Parameters:

dataset_uri (str) – The URI of the table to clean up.

Returns:

The number of partial writes cleaned up.

Return type:

int

complete(fragment: FragmentMetadata, **kwargs)

Called when a fragment is completed

class lance.progress.FragmentWriteProgress

Bases: ABC

Progress tracking for Writing a Dataset or Fragment.

Warns:

This tracking class is experimental and may change in the future.

abstract begin(fragment: FragmentMetadata, multipart_id: str | None = None, **kwargs) None

Called when a new fragment is about to be written.

Parameters:
  • fragment (FragmentMetadata) – The fragment that is open to write to. The fragment id might not yet be assigned at this point.

  • multipart_id (str, optional) – The multipart id that will be uploaded to cloud storage. This may be used later to abort incomplete uploads if this fragment write fails.

  • kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.

Return type:

None

abstract complete(fragment: FragmentMetadata, **kwargs) None

Callback when a fragment is completely written.

Parameters:
  • fragment (FragmentMetadata) – The fragment that is open to write to.

  • kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.

class lance.progress.NoopFragmentWriteProgress

Bases: FragmentWriteProgress

No-op implementation of WriteProgressTracker.

This is the default implementation.

begin(fragment: FragmentMetadata, multipart_id: str | None = None, **kargs)

Called when a new fragment is about to be written.

Parameters:
  • fragment (FragmentMetadata) – The fragment that is open to write to. The fragment id might not yet be assigned at this point.

  • multipart_id (str, optional) – The multipart id that will be uploaded to cloud storage. This may be used later to abort incomplete uploads if this fragment write fails.

  • kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.

Return type:

None

complete(fragment: FragmentMetadata, **kwargs)

Callback when a fragment is completely written.

Parameters:
  • fragment (FragmentMetadata) – The fragment that is open to write to.

  • kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.

lance.sampler module

class lance.sampler.FragmentSampler

Bases: Sampler

Sampling over Fragments.

To implement a new FragmentSampler, you can implement the iter_fragments method to yield fragments in desired order.

abstract iter_fragments(ds: lance.LanceDataset, *args, **kwargs) Generator[lance.LanceFragment, None, None]

Iterate over data fragments.

class lance.sampler.FullScanSampler

Bases: FragmentSampler

Default sampler, which scans the entire dataset sequentially.

iter_fragments(dataset: lance.LanceDataset, **kwargs) Generator[lance.LanceFragment, None, None]

Iterate over data fragments.

class lance.sampler.Sampler

Bases: ABC

Sampler over LanceDataset.

To implement a new Sampler, you can implement the __call__ method to yield a pyarrow.RecordBatch.

class lance.sampler.ShardedBatchSampler(rank: int, world_size: int, randomize: bool = False, seed: int = 0)

Bases: Sampler

Sharded batch sampler.

Each rank / process will process a subset of the batches.

The input is subdivided into batches (of size batch_size). Each rank / process takes every Nth batch (where N is the world size). The order in which batches are loaded is randomized.

When there is no filter, each process only needs to load the rows assigned to it. This is still slightly less efficient than ShardedFragmentSampler, since it requires loading rows by range instead of loading all rows for a given fragment.

If there is a filter then we cannot divide the row ids ahead of time. Instead, each process will load the entire filtered dataset and discard the rows that are not assigned to it. The resulting stream is then randomized via a reservoir sampler. This does not perfectly randomize the stream but it should generate a stream that is random enough for many use cases.

static from_torch(randomize: bool = False, seed: int = 0) ShardedBatchSampler

Use it from a PyTorch distributed environment.

Automatically infer rank and world_size from torch.distributed.

class lance.sampler.ShardedFragmentSampler(rank: int, world_size: int, randomize: bool = False, seed: int = 0)

Bases: FragmentSampler

Sharded fragments by rank and world_size.

Each rank / process will process a subset of the fragments. It yields batches from ds.fragments[rank::world_size].

This sampler is more efficient than ShardedBatchSampler when the dataset is large.

Parameters:
  • rank (int) – The rank of the process in the distributed cluster.

  • world_size (int) – The total number of processes in the distributed cluster.

  • randomize (bool) – If set to true, the fragments are visited in a randomized order.

  • seed (int) – The random seed to use when randomize is set true.

static from_torch(randomize: bool = False, seed: int = 0) ShardedFragmentSampler

Use from a PyTorch distributed environment.

Automatically infer rank and world_size from torch.distributed.

iter_fragments(dataset: lance.LanceDataset, **kwargs) Generator[lance.LanceFragment, None, None]

Iterate over data fragments.
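
A sketch of iterating one shard of the dataset's fragments on a single worker (the rank, world size, and dataset path are illustrative; in a torch.distributed job, from_torch() can infer rank and world_size instead):

import lance
from lance.sampler import ShardedFragmentSampler

dataset = lance.dataset("my_dataset")
sampler = ShardedFragmentSampler(rank=0, world_size=4)

for fragment in sampler.iter_fragments(dataset):
    for batch in fragment.to_batches():
        ...  # feed the batch to training, etc.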

lance.sampler.maybe_sample(dataset: str | Path | lance.LanceDataset, n: int, columns: list[str] | dict[str, str] | str, batch_size: int = 10240, max_takes: int = 2048) Generator[pa.RecordBatch, None, None]

Sample n records from the dataset.

Parameters:
  • dataset (Union[str, Path, lance.LanceDataset]) – The dataset to sample from.

  • n (int) – The number of records to sample.

  • columns (Union[list[str], dict[str, str], str]) – The columns to load.

  • batch_size (int, optional) – The batch size to use when loading the data, by default 10240.

  • max_takes (int, optional) – The maximum number of takes to perform, by default 2048. This is employed to minimize the number of random reads necessary for sampling. A sufficiently large value can provide an effective random sample without the need for excessive random reads.

Returns:

A generator that yields [RecordBatch] of data.

Return type:

Generator[pa.RecordBatch]
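
For example, a sketch of drawing a sample of vectors from a dataset (the path and column name are hypothetical):

import pyarrow as pa
from lance.sampler import maybe_sample

batches = maybe_sample("my_dataset", n=4096, columns=["vector"], batch_size=1024)
sample = pa.Table.from_batches(batches)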

lance.schema module

class lance.schema.LanceSchema

Bases: object

A Lance Schema.

Unlike a PyArrow schema, a Lance schema assigns every field an integer id. This is used to track fields across versions. This assignment of fields to ids is initially done in depth-first order, but as a schema evolves the assignment may change.

The assignment of field ids is particular to each dataset, so these schemas cannot be used interchangeably between datasets.

static from_pyarrow(schema)

Create a Lance schema from a PyArrow schema.

This will assign field ids in depth-first order. Be aware this may not match the correct schema for a particular table.

to_pyarrow()

Convert the schema to a PyArrow schema.

lance.schema.json_to_schema(schema_json: Dict[str, Any]) Schema

Converts a JSON string to a PyArrow schema.

Parameters:

schema_json (Dict[str, Any]) – The JSON payload to convert to a PyArrow Schema.

lance.schema.schema_to_json(schema: Schema) Dict[str, Any]

Converts a pyarrow schema to a JSON string.
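
A small sketch of round-tripping a PyArrow schema through these helpers (the field names are illustrative):

import pyarrow as pa
from lance.schema import json_to_schema, schema_to_json

schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("vector", pa.list_(pa.float32(), 128)),
])
payload = schema_to_json(schema)    # JSON-compatible dict, per the signature
restored = json_to_schema(payload)  # back to a pyarrow.Schema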

lance.tracing module

lance.tracing.lance_trace_to_chrome(path=None, level=None)
lance.tracing.trace_to_chrome(*, file: str = None, level: str = None)

Begins tracing lance events to a chrome trace file.

The trace file can be opened with chrome://tracing or with the Perfetto UI.

The file will be finished (and closed) when the python process exits.

Parameters:
  • file (str, optional) – The file to write the trace to. If None, then a file in the current directory will be created named ./trace-{unix epoch in micros}.json

  • level (str, optional) – The level of detail to trace. One of “trace”, “debug”, “info”, “warn” or “error”. If None, then “info” is used.
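
For example (a sketch; the output filename and dataset path are arbitrary):

import lance
from lance.tracing import trace_to_chrome

# Start tracing before doing any work; the file is finalized at process exit.
trace_to_chrome(file="lance_trace.json", level="debug")

dataset = lance.dataset("my_dataset")
dataset.to_table()   # traced events are recorded while lance runs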

lance.util module

class lance.util.HNSW(hnsw)

Bases: object

static build(vectors_array: Iterator[Array], max_level=7, m=20, m_max=40, ef_construction=100, use_select_heuristic=True) HNSW
to_lance_file(file_path)
vectors() Array
class lance.util.KMeans(k: int, metric_type: Literal['l2', 'dot', 'cosine'] = 'l2', max_iters: int = 50)

Bases: object

KMean model for clustering.

It works with 2-D arrays of float32 type, and supports the distance metrics “l2”, “cosine”, and “dot”.

Note, you must train the kmeans model by calling fit() before calling predict(). Calling fit() again will reset the model.

Currently, the initial centroids are initialized randomly. kmeans++ is implemented but not exposed yet.

Examples

>>> import numpy as np
>>> import lance
>>> data = np.random.randn(1000, 128).astype(np.float32)
>>> kmeans = lance.util.KMeans(8, metric_type="l2")
>>> kmeans.fit(data)
>>> centroids = np.stack(kmeans.centroids.to_numpy(zero_copy_only=False))
>>> clusters = kmeans.predict(data)
property centroids: FixedShapeTensorType | None

Returns the centroids of the model, or None if the model is not trained.

fit(data: FixedSizeListArray | FixedShapeTensorArray | ndarray)

Fit the model to the data.

Parameters:

data (pa.FixedSizeListArray, pa.FixedShapeTensorArray, np.ndarray) – The data to fit the model to. Must be a 2-D array of float32 type.

predict(data: FixedSizeListArray | FixedShapeTensorArray | ndarray) UInt32Array

Predict the cluster for each vector in the data.

lance.util.build_sq_storage(row_ids_array: Iterator[Array], vectors_array: Array, dim, bounds: tuple) RecordBatch
lance.util.sanitize_ts(ts: ts_types) datetime

Returns a python datetime object from various timestamp input types.

lance.util.td_to_micros(td: timedelta) int

Returns the number of microseconds in a timedelta object.

lance.util.validate_vector_index(dataset, column: str, refine_factor: int = 5, sample_size: int | None = None, pass_threshold: float = 1.0)

Run in-sample queries and make sure that the recall for k=1 is very high (should be near 100%)

Parameters:
  • dataset (LanceDataset) – The dataset to sanity check.

  • column (str) – The column name of the vector column.

  • refine_factor (int, default=5) – The refine factor to use for the nearest neighbor query.

  • sample_size (int, optional) – The number of vectors to sample from the dataset. If None, the entire dataset will be used.

  • pass_threshold (float, default=1.0) – The minimum fraction of vectors that must pass the sanity check. If less than this fraction of vectors pass, a ValueError will be raised.
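
A sketch of sanity-checking a vector index after building it (the path, column, and threshold are illustrative):

import lance
from lance.util import validate_vector_index

dataset = lance.dataset("/tmp/sift.lance")
# Raises ValueError if fewer than 95% of the sampled in-sample queries
# return the query vector itself as the top hit.
validate_vector_index(dataset, "vector", sample_size=100, pass_threshold=0.95)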

lance.vector module

Embedding vector utilities

lance.vector.compute_partitions(dataset: LanceDataset, column: str, kmeans: Any, batch_size: int = 10240, spill_dir: str | Path = None) str

Compute partitions for each row using GPU kmeans and spill to disk.

Parameters:
  • dataset (LanceDataset) – Dataset to compute partitions for.

  • column (str) – Column name of the vector column.

  • kmeans (lance.torch.kmeans.KMeans) – KMeans model to use to compute partitions.

  • batch_size (int, default 10240) – The batch size used to read the dataset.

  • spill_dir (Path) – The path to store the partitions.

Returns:

The absolute path of the partition dataset.

Return type:

str

lance.vector.train_ivf_centroids_on_accelerator(dataset: LanceDataset, column: str, k: int, metric_type: Literal['l2', 'cosine', 'dot'], accelerator: str | 'torch.Device', *, sample_rate: int = 256, max_iters: int = 50)

Use accelerator (GPU or MPS) to train kmeans.

lance.vector.vec_to_table(data: dict | list | ndarray, names: str | list | None = None, ndim: int | None = None, check_ndim: bool = True) Table

Create a pyarrow Table containing vectors. Vectors are created as FixedSizeListArray’s in pyarrow with Float32 values.

Examples

>>> import numpy as np
>>> np.random.seed(0)
>>> from lance.vector import vec_to_table
>>> dd = {"vector0": np.random.randn(10), "vector1": np.random.randn(10)}
>>> vec_to_table(dd)
pyarrow.Table
id: string
vector: fixed_size_list<item: float>[10]
  child 0, item: float
----
id: [["vector0","vector1"]]
vector: [[[1.7640524,0.4001572,0.978738,2.2408931,1.867558,-0.9772779,0.95008844,-0.1513572,-0.10321885,0.41059852],[0.14404356,1.4542735,0.7610377,0.121675014,0.44386324,0.33367434,1.4940791,-0.20515826,0.3130677,-0.85409576]]]
>>> vec_to_table(dd).to_pandas()
        id                                             vector
0  vector0  [1.7640524, 0.4001572, 0.978738, 2.2408931, 1....
1  vector1  [0.14404356, 1.4542735, 0.7610377, 0.121675014...
Parameters:
  • data (dict, list, or np.ndarray) – If a dict, the keys are added as the “id” column. If a list, each element is assumed to be a vector. If an ndarray, each row is assumed to be a vector.

  • names (str or list, optional) – If data is a dict, names should be a list of 2 str (default [“id”, “vector”]). If data is a list or ndarray, names should be a str (default “vector”).

  • ndim (int, optional) – Number of dimensions of the vectors. Inferred if omitted.

  • check_ndim (bool, default True) – Whether to verify that all vectors have the same length

Returns:

tbl – A pyarrow Table with vectors converted to appropriate types

Return type:

pa.Table

Module contents

class lance.FragmentMetadata(metadata: str)

Bases: object

Metadata of a Fragment in the dataset.

data_files() Iterable[str]

Return the data files of the fragment

deletion_file()

Return the deletion file, if any

static from_json(json_data: str) FragmentMetadata

Reconstruct FragmentMetadata from a JSON blob

classmethod from_metadata(metadata: _FragmentMetadata)
property id: int
to_json() str

Serialize FragmentMetadata to a JSON blob

class lance.LanceDataset(uri: str | Path, version: int | None = None, block_size: int | None = None, index_cache_size: int | None = None, metadata_cache_size: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None)

Bases: Dataset

A dataset in Lance format where the data is stored at the given uri.

add_columns(transforms: Dict[str, str] | BatchUDF, read_columns: List[str] | None = None)

Add new columns with defined values.

There are two ways to specify the new columns. First, you can provide SQL expressions for each new column. Second you can provide a UDF that takes a batch of existing data and returns a new batch with the new columns. These new columns will be appended to the dataset.

See the lance.add_columns_udf() decorator for more information on writing UDFs.

Parameters:
  • transforms (dict or AddColumnsUDF) – If this is a dictionary, then the keys are the names of the new columns and the values are SQL expression strings. These strings can reference existing columns in the dataset. If this is a AddColumnsUDF, then it is a UDF that takes a batch of existing data and returns a new batch with the new columns.

  • read_columns (list of str, optional) – The names of the columns that the UDF will read. If None, then the UDF will read all columns. This is only used when transforms is a UDF. Otherwise, the read columns are inferred from the SQL expressions.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3]})
>>> dataset = lance.write_dataset(table, "my_dataset")
>>> @lance.batch_udf()
... def double_a(batch):
...     df = batch.to_pandas()
...     return pd.DataFrame({'double_a': 2 * df['a']})
>>> dataset.add_columns(double_a)
>>> dataset.to_table().to_pandas()
   a  double_a
0  1         2
1  2         4
2  3         6
>>> dataset.add_columns({"triple_a": "a * 3"})
>>> dataset.to_table().to_pandas()
   a  double_a  triple_a
0  1         2         3
1  2         4         6
2  3         6         9

See also

LanceDataset.merge

Merge a pre-computed set of columns into the dataset.

alter_columns(*alterations: Iterable[Dict[str, Any]])

Alter column name, data type, and nullability.

Columns that are renamed can keep any indices that are on them. If a column has an IVF_PQ index, it can be kept if the column is casted to another type. However, other index types don’t support casting at this time.

Column types can be upcasted (such as int32 to int64) or downcasted (such as int64 to int32). However, downcasting will fail if there are any values that cannot be represented in the new type. In general, columns can be casted to same general type: integers to integers, floats to floats, and strings to strings. However, strings, binary, and list columns can be casted between their size variants. For example, string to large string, binary to large binary, and list to large list.

Parameters:

alterations (Iterable[Dict[str, Any]]) –

A sequence of dictionaries, each with the following keys:

  • ”path”: str

    The column path to alter. For a top-level column, this is the name. For a nested column, this is the dot-separated path, e.g. “a.b.c”.

  • ”name”: str, optional

    The new name of the column. If not specified, the column name is not changed.

  • ”nullable”: bool, optional

    Whether the column should be nullable. If not specified, the column nullability is not changed. Only non-nullable columns can be changed to nullable. Currently, you cannot change a nullable column to non-nullable.

  • ”data_type”: pyarrow.DataType, optional

    The new data type to cast the column to. If not specified, the column data type is not changed.

Examples

>>> import lance
>>> import pyarrow as pa
>>> schema = pa.schema([pa.field('a', pa.int64()),
...                     pa.field('b', pa.string(), nullable=False)])
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.alter_columns({"path": "a", "name": "x"},
...                       {"path": "b", "nullable": True})
>>> dataset.to_table().to_pandas()
   x  b
0  1  a
1  2  b
2  3  c
>>> dataset.alter_columns({"path": "x", "data_type": pa.int32()})
>>> dataset.schema
x: int32
b: string
checkout_version(version) LanceDataset

Load the given version of the dataset.

Unlike the dataset() constructor, this will re-use the current cache. This is a no-op if the dataset is already at the given version.

cleanup_old_versions(older_than: timedelta | None = None, *, delete_unverified: bool = False) CleanupStats

Cleans up old versions of the dataset.

Some dataset changes, such as overwriting, leave behind data that is not referenced by the latest dataset version. The old data is left in place to allow the dataset to be restored back to an older version.

This method will remove older versions and any data files they reference. Once this cleanup task has run you will not be able to checkout or restore these older versions.

Parameters:
  • older_than (timedelta, optional) – Only versions older than this will be removed. If not specified, this will default to two weeks.

  • delete_unverified (bool, default False) –

    Files leftover from a failed transaction may appear to be part of an in-progress operation (e.g. appending new data) and these files will not be deleted unless they are at least 7 days old. If delete_unverified is True then these files will be deleted regardless of their age.

    This should only be set to True if you can guarantee that no other process is currently working on this dataset. Otherwise the dataset could be put into a corrupted state.
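
For example, a sketch of time travel plus cleanup (the version number and retention window are illustrative):

import lance
from datetime import timedelta

dataset = lance.dataset("my_dataset")

# Read the data as of an earlier version (re-uses the current cache).
old_version = dataset.checkout_version(1)
old_table = old_version.to_table()

# Remove versions older than 30 days along with unreferenced data files.
stats = dataset.cleanup_old_versions(older_than=timedelta(days=30))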

static commit(base_uri: str | Path, operation: LanceOperation.BaseOperation, read_version: int | None = None, commit_lock: CommitLock | None = None) LanceDataset

Create a new version of dataset

This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset().)

It’s current purpose is to allow for changes being made in a distributed environment where no single process is doing all of the work. For example, a distributed bulk update or a distributed bulk modify operation.

Once all of the changes have been made, this method can be called to make the changes visible by updating the dataset manifest.

Warning

This is an advanced API and doesn’t provide the same level of validation as the other APIs. For example, it’s the responsibility of the caller to ensure that the fragments are valid for the schema.

Parameters:
  • base_uri (str or Path) – The base uri of the dataset

  • operation (BaseOperation) – The operation to apply to the dataset. This describes what changes have been made. See available operations under LanceOperation.

  • read_version (int, optional) – The version of the dataset that was used as the base for the changes. This is not needed for overwrite or restore operations.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

Returns:

A new version of Lance Dataset.

Return type:

LanceDataset

Examples

Creating a new dataset with the LanceOperation.Overwrite operation:

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
count_rows(filter: Expression | str | None = None, **kwargs) int

Count rows matching the scanner filter.

Parameters:

**kwargs (dict, optional) – See scanner() method for full parameter description.

Returns:

count – The total number of rows in the dataset.

Return type:

int

create_index(column: str | List[str], index_type: str, name: str | None = None, metric: str = 'L2', replace: bool = False, num_partitions: int | None = None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, num_sub_vectors: int | None = None, accelerator: str | 'torch.Device' | None = None, index_cache_size: int | None = None, shuffle_partition_batches: int | None = None, shuffle_partition_concurrency: int | None = None, **kwargs) LanceDataset

Create index on column.

Experimental API

Parameters:
  • column (str) – The column to be indexed.

  • index_type (str) – The type of the index. "IVF_PQ, IVF_HNSW_PQ and IVF_HNSW_SQ" are supported now.

  • name (str, optional) – The index name. If not provided, it will be generated from the column name.

  • metric (str) – The distance metric type, i.e., “L2” (alias to “euclidean”), “cosine” or “dot” (dot product). Default is “L2”.

  • replace (bool) – Replace the existing index if it exists.

  • num_partitions (int, optional) – The number of partitions of IVF (Inverted File Index).

  • ivf_centroids (np.ndarray, pyarrow.FixedSizeListArray or pyarrow.FixedShapeTensorArray, optional) – A num_partitions x dimension array of K-mean centroids for IVF clustering. If not provided, a new KMeans model will be trained.

  • pq_codebook (np.ndarray, pyarrow.FixedSizeListArray or pyarrow.FixedShapeTensorArray, optional) – A num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors) array of K-mean centroids for the PQ codebook. Note: nbits is always 8 for now. If not provided, a new PQ model will be trained.

  • num_sub_vectors (int, optional) – The number of sub-vectors for PQ (Product Quantization).

  • accelerator (str or torch.Device, optional) – If set, use an accelerator to speed up the training process. Accepted accelerator: “cuda” (Nvidia GPU) and “mps” (Apple Silicon GPU). If not set, use the CPU.

  • index_cache_size (int, optional) – The size of the index cache in number of entries. Default value is 256.

  • shuffle_partition_batches (int, optional) –

    The number of batches, using the row group size of the dataset, to include in each shuffle partition. Default value is 10240.

    Assuming the row group size is 1024, each shuffle partition will hold 10240 * 1024 = 10,485,760 rows. By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.

  • shuffle_partition_concurrency (int, optional) –

    The number of shuffle partitions to process concurrently. Default value is 2

    By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.

  • kwargs – Parameters passed to the index building process.

The SQ (Scalar Quantization) is available only for the "IVF_HNSW_SQ" index type. This quantization method is used to reduce the memory usage of the index: it maps the float vectors to integer vectors, where each integer uses num_bits (only 8 bits are supported for now).

If index_type is "IVF_*", then num_partitions is required. If index_type includes "PQ", then num_sub_vectors is also required.

Optional parameters for "IVF_PQ":

  • ivf_centroids – K-mean centroids for IVF clustering.

Optional parameters for "IVF_HNSW_*":

  • max_level (int) – The maximum number of levels in the graph.

  • m (int) – The number of edges per node in the graph.

  • m_max (int) – The maximum number of edges per node in the graph.

  • ef_construction (int) – The number of nodes to examine during the construction.

Examples

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)
import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_HNSW_SQ",
    num_partitions=256,
)

Experimental Accelerator (GPU) support:

  • accelerate: use GPU to train IVF partitions.

    Only supports CUDA (Nvidia) or MPS (Apple) currently. Requires PyTorch being installed.

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
    accelerator="cuda"
)

References

create_scalar_index(column: str, index_type: Literal['BTREE'], name: str | None = None, *, replace: bool = True)

Create a scalar index on a column.

Scalar indices, like vector indices, can be used to speed up scans. A scalar index can speed up scans that contain filter expressions on the indexed column. For example, the following scan will be faster if the column my_col has a scalar index:

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(filter="my_col != 7").to_table()

Scalar indices can also speed up scans containing a vector search and a prefilter:
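
(The following is a sketch; the query vector, k, and column names are illustrative.)

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(
    nearest={"column": "vector", "q": [0.1] * 128, "k": 10},
    prefilter=True,
    filter="my_col != 7",
).to_table()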

Scalar indices can only speed up scans for basic filters using equality, comparison, range (e.g. my_col BETWEEN 0 AND 100), and set membership (e.g. my_col IN (0, 1, 2))

Scalar indices can be used if the filter contains multiple indexed columns and the filter criteria are AND’d or OR’d together (e.g. my_col < 0 AND other_col > 100)

Scalar indices may be used if the filter contains non-indexed columns but, depending on the structure of the filter, they may not be usable. For example, if the column not_indexed does not have a scalar index then the filter my_col = 0 OR not_indexed = 1 will not be able to use any scalar index on my_col.

To determine if a scan is making use of a scalar index you can use explain_plan to look at the query plan that lance has created. Queries that use scalar indices will either have a ScalarIndexQuery relation or a MaterializeIndex operator.

Currently, the only type of scalar index available is BTREE. This index is inspired by the btree data structure, although only the first few layers of the btree are cached in memory.

Note that the LANCE_BYPASS_SPILLING environment variable can be used to bypass spilling to disk. Setting this to true can avoid memory exhaustion issues (see https://github.com/apache/datafusion/issues/10073 for more info).

Experimental API

Parameters:
  • column (str) – The column to be indexed. Must be a boolean, integer, float, or string column.

  • index_type (str) – The type of the index. Only "BTREE" is supported now.

  • name (str, optional) – The index name. If not provided, it will be generated from the column name.

  • replace (bool, default True) – Replace the existing index if it exists.

Examples

import lance

dataset = lance.dataset("/tmp/images.lance")
dataset.create_index(
    "category",
    "BTREE",
)
delete(predicate: str | Expression)

Delete rows from the dataset.

This marks rows as deleted, but does not physically remove them from the files. This keeps the existing indexes still valid.

Parameters:

predicate (str or pa.compute.Expression) – The predicate to use to select rows to delete. May either be a SQL string or a pyarrow Expression.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.delete("a = 1 or b in ('a', 'b')")
>>> dataset.to_table()
pyarrow.Table
a: int64
b: string
----
a: [[3]]
b: [["c"]]
drop_columns(columns: List[str])

Drop one or more columns from the dataset

Parameters:
  • columns (list of str) – The names of the columns to drop. These can be nested column references (e.g. “a.b.c”) or top-level column names (e.g. “a”).

This is a metadata-only operation and does not remove the data from the underlying storage. In order to remove the data, you must subsequently call compact_files to rewrite the data without the removed columns and then call cleanup_files to remove the old files.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.drop_columns(["a"])
>>> dataset.to_table().to_pandas()
   b
0  a
1  b
2  c
get_fragment(fragment_id: int) LanceFragment | None

Get the fragment with fragment id.

get_fragments(filter: Expression | None = None) List[LanceFragment]

Get all fragments from the dataset.

Note: filter is not supported yet.

property has_index
head(num_rows, **kwargs)

Load the first N rows of the dataset.

Parameters:
  • num_rows (int) – The number of rows to load.

  • **kwargs (dict, optional) – See scanner() method for full parameter description.

Returns:

table

Return type:

Table
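
A minimal usage sketch (the dataset path and column name are illustrative; extra keyword arguments such as columns are passed through to scanner()):

import lance

dataset = lance.dataset("/tmp/images.lance")
first_rows = dataset.head(10, columns=["my_col"])  # pyarrow Table with the first 10 rows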

index_statistics(index_name: str) Dict[str, Any]
join(right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)

Not implemented (just override pyarrow dataset to prevent segfault)

property lance_schema: LanceSchema

The LanceSchema for this dataset

property latest_version: int

Returns the latest version of the dataset.

list_indices() List[Dict[str, Any]]
merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None)

Merge another dataset into this one.

Performs a left join, where the dataset is the left side and data_obj is the right side. Rows existing in the dataset but without a match in data_obj will have the new columns filled with null values, unless Lance doesn’t support null values for some types, in which case an error will be raised.

Parameters:
  • data_obj (Reader-like) – The data to be merged. Acceptable types are: Pandas DataFrame, Pyarrow Table, Dataset, Scanner, Iterator[RecordBatch], or RecordBatchReader.

  • left_on (str) – The name of the column in the dataset to join on.

  • right_on (str or None) – The name of the column in data_obj to join on. If None, defaults to left_on.

Examples

>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> dataset.merge(new_df, 'x')
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f

See also

LanceDataset.add_columns

Add new columns by computing batch-by-batch.

merge_insert(on: str | Iterable[str])

Returns a builder that can be used to create a “merge insert” operation

This operation can add rows, update rows, and remove rows in a single transaction. It is a very generic tool that can be used to create behaviors like “insert if not exists”, “update or insert (i.e. upsert)”, or even replace a portion of existing data with new data (e.g. replace all data where month="january").

The merge insert operation works by combining new data from a source table with existing data in a target table by using a join. There are three categories of records.

“Matched” records are records that exist in both the source table and the target table. “Not matched” records exist only in the source table (e.g. these are new data). “Not matched by source” records exist only in the target table (this is old data).

The builder returned by this method can be used to customize what should happen for each category of data.

Please note that the data will be reordered as part of this operation. This is because updated rows will be deleted from the dataset and then reinserted at the end with the new values. The order of the newly inserted rows may fluctuate randomly because a hash-join operation is used internally.

Parameters:

on (Union[str, Iterable[str]]) – A column (or columns) to join on. This is how records from the source table and target table are matched. Typically this is some kind of key or id column.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation
>>> dataset.merge_insert("a")             \
...        .when_matched_update_all()     \
...        .when_not_matched_insert_all() \
...        .execute(new_table)
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  b
1  2  x
2  3  y
3  4  z
property optimize: DatasetOptimizer
property partition_expression

Not implemented (just override pyarrow dataset to prevent segfault)

replace_schema(schema: Schema)

Not implemented (just override pyarrow dataset to prevent segfault)

restore()

Restore the currently checked out version as the latest version of the dataset.

This creates a new commit.
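
A minimal sketch (assuming the dataset has an older version to check out; the version number is hypothetical):

import lance

old = lance.dataset("/tmp/images.lance", version=1)
old.restore()  # version 1 becomes the latest version via a new commit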

sample(num_rows: int, columns: List[str] | Dict[str, str] | None = None, randomize_order: bool = True, **kwargs) Table

Select a random sample of data

Parameters:
  • num_rows (int) – number of rows to retrieve

  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • **kwargs (dict, optional) – see scanner() method for full parameter description.

Returns:

table

Return type:

Table
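
A minimal usage sketch (the dataset path and column name are illustrative):

import lance

dataset = lance.dataset("/tmp/images.lance")
sample_table = dataset.sample(100, columns=["my_col"])  # random 100-row pyarrow Table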

scanner(columns: List[str] | Dict[str, str] | None = None, filter: Expression | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, fragments: Iterable[LanceFragment] | None = None, *, prefilter: bool = False, with_row_id: bool = False, use_stats: bool = True) LanceScanner

Return a Scanner that can support various pushdowns.

Parameters:
  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • filter (pa.compute.Expression or str) –

    Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.

  • limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.

  • offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.

  • nearest (dict, default None) –

    Get the rows corresponding to the K most similar vectors. Example:

    {
        "column": <embedding col name>,
        "q": <query vector as pa.Float32Array>,
        "k": 10,
        "nprobes": 1,
        "refine_factor": 1
    }
    

  • batch_size (int, default None) – The max size of batches returned.

  • batch_readahead (int, optional) – The number of batches to read ahead.

  • fragment_readahead (int, optional) – The number of fragments to read ahead.

  • scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.

  • fragments (iterable of LanceFragment, default None) – If specified, only scan these fragments. If scan_in_order is True, then the fragments will be scanned in the order given.

  • prefilter (bool, default False) –

    If True then the filter will be applied before the vector query is run. This will generate more correct results but it may be a more costly query. It’s generally good when the filter is highly selective.

    If False then the filter will be applied after the vector query is run. This will perform well but the results may have fewer than the requested number of rows (or be empty) if the rows closest to the query do not match the filter. It’s generally good when the filter is not very selective.

Notes

For now, if BOTH filter and nearest are specified, then: 1. nearest is executed first. 2. The results are filtered afterwards.

For debugging ANN results, you can choose to not use the index even if present by specifying use_index=False. For example, the following will always return exact KNN results:

dataset.to_table(nearest={
    "column": "vector",
    "k": 10,
    "q": <query vector>,
    "use_index": False
})
property schema: Schema

The pyarrow Schema for this dataset

property stats: LanceStats

Experimental API

take(indices: List[int] | Array, columns: List[str] | Dict[str, str] | None = None, **kwargs) Table

Select rows of data by index.

Parameters:
  • indices (Array or array-like) – indices of rows to select in the dataset.

  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • **kwargs (dict, optional) – See scanner() method for full parameter description.

Returns:

table

Return type:

Table
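
A minimal usage sketch (the dataset path, indices, and column name are illustrative):

import lance

dataset = lance.dataset("/tmp/images.lance")
rows = dataset.take([0, 5, 10], columns=["my_col"])  # pyarrow Table with the selected rows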

to_batches(columns: List[str] | Dict[str, str] | None = None, filter: Expression | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, use_stats: bool = True, **kwargs) Iterator[RecordBatch]

Read the dataset as materialized record batches.

Parameters:

**kwargs (dict, optional) – Arguments for Scanner.from_dataset.

Returns:

record_batches

Return type:

Iterator of RecordBatch
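
A minimal streaming sketch (the dataset path and column name are illustrative):

import lance

dataset = lance.dataset("/tmp/images.lance")
for batch in dataset.to_batches(columns=["my_col"], batch_size=1024):
    ...  # process each pyarrow RecordBatch as it streams in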

to_table(columns: List[str] | Dict[str, str] | None = None, filter: Expression | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, use_stats: bool = True) Table

Read the data into memory as a pyarrow Table.

Parameters:
  • columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.

  • filter (pa.compute.Expression or str) –

    Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.

  • limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.

  • offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.

  • nearest (dict, default None) –

    Get the rows corresponding to the K most similar vectors. Example:

    {
        "column": <embedding col name>,
        "q": <query vector as pa.Float32Array>,
        "k": 10,
        "metric": "cosine",
        "nprobes": 1,
        "refine_factor": 1
    }
    

  • batch_size (int, optional) – The number of rows to read at a time.

  • batch_readahead (int, optional) – The number of batches to read ahead.

  • fragment_readahead (int, optional) – The number of fragments to read ahead.

  • scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.

  • prefilter (bool, default False) – Run filter before the vector search.

  • with_row_id (bool, default False) – Return physical row ID.

  • use_stats (bool, default True) – Use stats pushdown during filters.

Notes

If BOTH filter and nearest are specified, then: 1. nearest is executed first. 2. The results are filtered afterward, unless prefilter is set to True.

update(updates: Dict[str, str], where: str | None = None)

Update column values for rows matching where.

Parameters:
  • updates (dict of str to str) – A mapping of column names to a SQL expression.

  • where (str, optional) – A SQL predicate indicating which rows should be updated.

Examples

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.update(dict(a = 'a + 2'), where="b != 'a'")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  4  b
2  5  c
property uri: str

The location of the data

validate()

Validate the dataset.

This checks the integrity of the dataset and will raise an exception if the dataset is corrupted.

property version: int

Returns the currently checked out version of the dataset

versions()

Return all versions in this dataset.

class lance.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, fragment: _Fragment | None = None)

Bases: Fragment

add_columns(value_func: Callable[[RecordBatch], RecordBatch], columns: list[str] | None = None) FragmentMetadata

Add columns to this Fragment.

Deprecated since version 0.10.14: Use merge_columns() instead.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • value_func (Callable.) – A function that takes a RecordBatch as input and returns a RecordBatch.

  • columns (Optional[list[str]].) – If specified, only the columns in this list will be passed to the value_func. Otherwise, all columns will be passed to the value_func.

See also

lance.dataset.LanceOperation.Merge

The operation used to commit these changes to the dataset. See the doc page for an example of using this API.

Returns:

A new fragment with the added column(s).

Return type:

FragmentMetadata

count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Count rows matching the scanner filter.

Parameters:
  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used, as determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

count

Return type:

int

static create(dataset_uri: str | Path, data: Table | RecordBatchReader, fragment_id: int | None = None, schema: Schema | None = None, max_rows_per_group: int = 1024, progress: FragmentWriteProgress | None = None, mode: str = 'append', *, use_experimental_writer=False) FragmentMetadata

Create a FragmentMetadata from the given data.

This can be used if the dataset is not yet created.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • dataset_uri (str) – The URI of the dataset.

  • fragment_id (int) – The ID of the fragment.

  • data (pa.Table or pa.RecordBatchReader) – The data to be written to the fragment.

  • schema (pa.Schema, optional) – The schema of the data. If not specified, the schema will be inferred from the data.

  • max_rows_per_group (int, default 1024) – The maximum number of rows per group in the data file.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • mode (str, default "append") – The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.

See also

lance.dataset.LanceOperation.Overwrite

The operation used to create a new dataset or overwrite one using fragments created with this API. See the doc page for an example of using this API.

lance.dataset.LanceOperation.Append

The operation used to append fragments created with this API to an existing dataset. See the doc page for an example of using this API.

Return type:

FragmentMetadata

static create_from_file(filename: str | Path, schema: Schema, fragment_id: int) LanceFragment

Create a fragment from the given datafile uri.

This can be used if the data file has been lost from the dataset.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • filename (str) – The filename of the datafile.

  • schema (pa.Schema) – The schema for the new datafile.

  • fragment_id (int) – The ID of the fragment.

data_files()

Return the data files of this fragment.

delete(predicate: str) FragmentMetadata | None

Delete rows from this Fragment.

This will add or update the deletion file of this fragment. It does not modify or delete the data files of this fragment. If no rows are left after the deletion, this method will return None.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:

predicate (str) – A SQL predicate that specifies the rows to delete.

Returns:

A new fragment containing the new deletion file, or None if no rows left.

Return type:

FragmentMetadata or None

Examples

>>> import lance
>>> import pyarrow as pa
>>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> dataset = lance.write_dataset(tab, "dataset")
>>> frag = dataset.get_fragment(0)
>>> frag.delete("a > 1")
Fragment { id: 0, files: ..., deletion_file: Some(...), physical_rows: Some(3) }
>>> frag.delete("a > 0") is None
True

See also

lance.dataset.LanceOperation.Delete

The operation used to commit these changes to a dataset. See the doc page for an example of using this API.

deletion_file()

Return the deletion file, if any

property fragment_id
head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Load the first N rows of the fragment.

Parameters:
  • num_rows (int) – The number of rows to load.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used, as determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Return type:

Table

merge_columns(value_func: Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None = None) Tuple[FragmentMetadata, LanceSchema]

Add columns to this Fragment.

Warning

Internal API. This method is not intended to be used by end users.

Parameters:
  • value_func (Callable.) – A function that takes a RecordBatch as input and returns a RecordBatch.

  • columns (Optional[list[str]].) – If specified, only the columns in this list will be passed to the value_func. Otherwise, all columns will be passed to the value_func.

See also

lance.dataset.LanceOperation.Merge

The operation used to commit these changes to the dataset. See the doc page for an example of using this API.

Returns:

A new fragment with the added column(s) and the final schema.

Return type:

Tuple[FragmentMetadata, LanceSchema]

property metadata: FragmentMetadata

Return the metadata of this fragment.

Return type:

FragmentMetadata

property num_deletions: int

Return the number of deleted rows in this fragment.

property partition_expression: Schema

An Expression which evaluates to true for all data viewed by this Fragment.

property physical_rows: int

Return the number of rows originally in this fragment.

To get the number of rows after deletions, use count_rows() instead.

property physical_schema: Schema

Return the physical schema of this Fragment. This schema can be different from the dataset read schema.

scanner(*, columns: List[str] | Dict[str, str] | None = None, batch_size: int | None = None, filter: str | pa.compute.Expression | None = None, limit: int | None = None, offset: int | None = None, with_row_id: bool = False, batch_readahead: int = 16) LanceScanner

See Dataset::scanner for details

property schema: Schema

Return the schema of this fragment.

take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Select rows of data by index.

Parameters:
  • indices (Array or array-like) – The indices of rows to select in the dataset.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used, as determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Return type:

Table

to_batches(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Read the fragment as materialized record batches.

Parameters:
  • schema (Schema, optional) – Concrete schema to use for scanning.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used, as determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

record_batches

Return type:

iterator of RecordBatch

to_table(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)

Convert this Fragment into a Table.

Use this convenience utility with care. This will serially materialize the Scan result in memory before creating the Table.

Parameters:
  • schema (Schema, optional) – Concrete schema to use for scanning.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, then maximum parallelism will be used, as determined by the number of available CPU cores.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

table

Return type:

Table

class lance.LanceOperation

Bases: object

class Append(fragments: Iterable[FragmentMetadata])

Bases: BaseOperation

Append new rows to the dataset.

fragments

The fragments that contain the new rows.

Type:

list[FragmentMetadata]

Warning

This is an advanced API for distributed operations. To append to a dataset on a single machine, use lance.write_dataset().

Examples

To append new rows to a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(tab1, "example")
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment = lance.fragment.LanceFragment.create("example", tab2)
>>> operation = lance.LanceOperation.Append([fragment])
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments: Iterable[FragmentMetadata]
class BaseOperation

Bases: ABC

Base class for operations that can be applied to a dataset.

See available operations under LanceOperation.

class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)

Bases: BaseOperation

Remove fragments or rows from the dataset.

updated_fragments

The fragments that have been updated with new deletion vectors.

Type:

list[FragmentMetadata]

deleted_fragment_ids

The ids of the fragments that have been deleted entirely. These are the fragments where LanceFragment.delete() returned None.

Type:

list[int]

predicate

The original SQL predicate used to select the rows to delete.

Type:

str

Warning

This is an advanced API for distributed operations. To delete rows from a dataset on a single machine, use lance.LanceDataset.delete().

Examples

To delete rows from a dataset, call lance.fragment.LanceFragment.delete() on each of the fragments. If that returns a new fragment, add that to the updated_fragments list. If it returns None, that means the whole fragment was deleted, so add the fragment id to the deleted_fragment_ids. Finally, pass the operation to the LanceDataset.commit() method to complete the deletion operation.

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(table, "example")
>>> table = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example", mode="append")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> predicate = "a >= 2"
>>> updated_fragments = []
>>> deleted_fragment_ids = []
>>> for fragment in dataset.get_fragments():
...     new_fragment = fragment.delete(predicate)
...     if new_fragment is not None:
...         updated_fragments.append(new_fragment)
...     else:
...         deleted_fragment_ids.append(fragment.fragment_id)
>>> operation = lance.LanceOperation.Delete(updated_fragments,
...                                         deleted_fragment_ids,
...                                         predicate)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
deleted_fragment_ids: Iterable[int]
predicate: str
updated_fragments: Iterable[FragmentMetadata]
class Merge(fragments: Iterable[FragmentMetadata], schema: LanceSchema | Schema)

Bases: BaseOperation

Operation that adds columns. Unlike Overwrite, this should not change the structure of the fragments, allowing existing indices to be kept.

fragments

The fragments that make up the new dataset.

Type:

iterable of FragmentMetadata

schema

The schema of the new dataset. Passing a LanceSchema is preferred, and passing a pyarrow.Schema is deprecated.

Type:

LanceSchema or pyarrow.Schema

Warning

This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().

Examples

To add new columns to a dataset, first define a method that will create the new columns based on the existing columns. Then use lance.fragment.LanceFragment.merge_columns().

>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch:
...     doubled = pc.multiply(batch["a"], 2)
...     return pa.record_batch([doubled], ["a_doubled"])
>>> fragments = []
>>> for fragment in dataset.get_fragments():
...     new_fragment, new_schema = fragment.merge_columns(double_a,
...                                                       columns=['a'])
...     fragments.append(new_fragment)
>>> operation = lance.LanceOperation.Merge(fragments, new_schema)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b  a_doubled
0  1  a          2
1  2  b          4
2  3  c          6
3  4  d          8
fragments: Iterable[FragmentMetadata]
schema: LanceSchema | Schema
class Overwrite(new_schema: Schema, fragments: Iterable[FragmentMetadata])

Bases: BaseOperation

Overwrite or create a new dataset.

new_schema

The schema of the new dataset.

Type:

pyarrow.Schema

fragments

The fragments that make up the new dataset.

Type:

list[FragmentMetadata]

Warning

This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().

Examples

To create or overwrite a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it along with the schema to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments: Iterable[FragmentMetadata]
new_schema: Schema
class Restore(version: int)

Bases: BaseOperation

Operation that restores a previous version of the dataset.

version: int
class lance.LanceScanner(scanner: _Scanner, dataset: LanceDataset)

Bases: Scanner

count_rows()

Count rows matching the scanner filter.

Returns:

count

Return type:

int

property dataset_schema: Schema

The schema with which batches will be read from fragments.

explain_plan(verbose=False) str

Return the execution plan for this scanner.

Parameters:

verbose (bool, default False) – Use a verbose output format.

Returns:

plan

Return type:

str

static from_batches(*args, **kwargs)

Not implemented

static from_dataset(*args, **kwargs)

Not implemented

static from_fragment(*args, **kwargs)

Not implemented

head(num_rows)

Load the first N rows of the dataset.

Parameters:

num_rows (int) – The number of rows to load.

Return type:

Table

property projected_schema: Schema

The materialized schema of the data, accounting for projections.

This is the schema of any data returned from the scanner.

scan_batches()

Consume a Scanner in record batches with corresponding fragments.

Returns:

record_batches

Return type:

iterator of TaggedRecordBatch

take(indices)

Not implemented

to_batches(self)

Consume a Scanner in record batches.

Returns:

record_batches

Return type:

iterator of RecordBatch

to_reader(self)

Consume this scanner as a RecordBatchReader.

Return type:

RecordBatchReader

to_table() Table

Read the data into memory and return a pyarrow Table.

class lance.MergeInsertBuilder(dataset, on)

Bases: _MergeInsertBuilder

execute(data_obj: ReaderLike, *, schema: pa.Schema | None = None)

Executes the merge insert operation

There is no return value but the original dataset will be updated.

Parameters:
  • data_obj (ReaderLike) – The new data to use as the source table for the operation. This parameter can be any source of data (e.g. table / dataset) that write_dataset() accepts.

  • schema (Optional[pa.Schema]) – The schema of the data. This only needs to be supplied whenever the data source is some kind of generator.

when_matched_update_all(condition: str | None = None)

Configure the operation to update matched rows

After this method is called, when the merge insert operation executes, any rows that match both the source table and the target table will be updated. The rows from the target table will be removed and the rows from the source table will be added.

An optional condition may be specified. This should be an SQL filter and, if present, then only matched rows that also satisfy this filter will be updated. The SQL filter should use the prefix target. to refer to columns in the target table and the prefix source. to refer to columns in the source table. For example, source.last_update < target.last_update.

If a condition is specified and rows do not satisfy the condition then these rows will not be updated. Failure to satisfy the filter does not cause a “matched” row to become a “not matched” row.

when_not_matched_by_source_delete(expr: str | None = None)

Configure the operation to delete source rows that do not match

After this method is called, when the merge insert operation executes, any rows that exist only in the target table will be deleted. An optional filter can be specified to limit the scope of the delete operation. If given (as an SQL filter) then only rows which match the filter will be deleted.

when_not_matched_insert_all()

Configure the operation to insert not matched rows

After this method is called, when the merge insert operation executes, any rows that exist only in the source table will be inserted into the target table.
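
A sketch that keeps a target dataset fully in sync with a source table by combining the three clauses above (the dataset path, key column "id", and source table are hypothetical):

import lance
import pyarrow as pa

dataset = lance.dataset("/tmp/items.lance")
source = pa.table({"id": [1, 2], "value": ["a", "b"]})

(dataset.merge_insert("id")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .when_not_matched_by_source_delete()
    .execute(source))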

lance.batch_udf(output_schema=None, checkpoint_file=None)

Create a user defined function (UDF) that adds columns to a dataset.

This function is used to add columns to a dataset. It takes a function that takes a single argument, a RecordBatch, and returns a RecordBatch. The function is called once for each batch in the dataset. The function should not modify the input batch, but instead create a new batch with the new columns added.

Parameters:
  • output_schema (Schema, optional) – The schema of the output RecordBatch. This is used to validate the output of the function. If not provided, the schema of the first output RecordBatch will be used.

  • checkpoint_file (str or Path, optional) – If specified, this file will be used as a cache for unsaved results of this UDF. If the process fails, and you call add_columns again with this same file, it will resume from the last saved state. This is useful for long-running processes that may fail and need to be resumed. This file may get very large. It will hold up to an entire data file’s worth of results on disk, which can be multiple gigabytes of data.

Return type:

AddColumnsUDF
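
A minimal decorator-style sketch (assuming a dataset with an integer column "a"; the column names, dataset path, and checkpoint file are hypothetical):

import lance
import pyarrow as pa
import pyarrow.compute as pc

@lance.batch_udf(checkpoint_file="add_columns_checkpoint.sqlite")
def double_a(batch: pa.RecordBatch) -> pa.RecordBatch:
    # derive one new column from the existing "a" column
    return pa.record_batch([pc.multiply(batch["a"], 2)], ["a_doubled"])

dataset = lance.dataset("/tmp/example.lance")
dataset.add_columns(double_a)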

lance.dataset(uri: str | Path, version: int | None = None, asof: ts_types | None = None, block_size: int | None = None, commit_lock: CommitLock | None = None, index_cache_size: int | None = None, storage_options: Dict[str, str] | None = None) LanceDataset

Opens the Lance dataset from the address specified.

Parameters:
  • uri (str) – Address to the Lance dataset.

  • version (optional, int) – If specified, load a specific version of the Lance dataset. Else, loads the latest version.

  • asof (optional, datetime or str) – If specified, find the latest version created on or earlier than the given argument value. If a version is already specified, this arg is ignored.

  • block_size (optional, int) – Block size in bytes. Provide a hint for the size of the minimal I/O request.

  • commit_lock (optional, CommitLock) – If specified, use the provided commit lock to lock the table while committing a new version. Not necessary on object stores other than S3 or when there are no concurrent writers.

  • index_cache_size (optional, int) –

Index cache size. The index cache is an LRU cache with TTL. This number specifies the number of index pages, for example, IVF partitions, to be cached in the host memory. Default value is 256.

    Roughly, for an IVF_PQ partition with n rows, the size of each index page equals the combination of the pq code (nd.array([n,pq], dtype=uint8)) and the row ids (nd.array([n], dtype=uint64)). Approximately, n = Total Rows / number of IVF partitions. pq = number of PQ sub-vectors.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
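
A minimal sketch of opening the same dataset at different points in time (the path, version number, and timestamp are hypothetical):

import lance

ds_latest = lance.dataset("/tmp/images.lance")
ds_v1 = lance.dataset("/tmp/images.lance", version=1)
ds_asof = lance.dataset("/tmp/images.lance", asof="2024-01-01")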

lance.json_to_schema(schema_json: Dict[str, Any]) Schema

Converts a JSON payload (dict) to a PyArrow schema.

Parameters:

schema_json (Dict[str, Any]) – The JSON payload to convert to a PyArrow Schema.

lance.schema_to_json(schema: Schema) Dict[str, Any]

Converts a pyarrow schema to a JSON-compatible dict.
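
A minimal round-trip sketch:

import pyarrow as pa
import lance

schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.string())])
payload = lance.schema_to_json(schema)    # JSON-compatible dict
restored = lance.json_to_schema(payload)  # back to a pyarrow Schema
assert restored.equals(schema)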

lance.write_dataset(data_obj: ReaderLike, uri: str | Path, schema: pa.Schema | None = None, mode: str = 'create', *, max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, commit_lock: CommitLock | None = None, progress: FragmentWriteProgress | None = None, storage_options: Dict[str, str] | None = None, use_experimental_writer: bool = False) LanceDataset

Write a given data_obj to the given uri

Parameters:
  • data_obj (Reader-like) – The data to be written. Acceptable types are: Pandas DataFrame, Pyarrow Table, Dataset, Scanner, RecordBatchReader, or Huggingface dataset.

  • uri (str or Path) – Where to write the dataset to (directory)

  • schema (Schema, optional) – If specified and the input is a pandas DataFrame, use this schema instead of the default pandas to arrow table conversion.

  • mode (str) – create - create a new dataset (raises if uri already exists). overwrite - create a new snapshot version. append - create a new version that is the concatenation of the input and the latest version (raises if uri does not exist).

  • max_rows_per_file (int, default 1024 * 1024) – The max number of rows to write before starting a new file

  • max_rows_per_group (int, default 1024) – The max number of rows before starting a new group (in the same file)

  • max_bytes_per_file (int, default 90 * 1024 * 1024 * 1024) – The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.

  • commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

  • progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.

  • storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.

  • use_experimental_writer (optional, bool) – Use the Lance v2 writer to write Lance v2 files. This is not recommended at this time as there are several known limitations in the v2 writer.
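
A minimal sketch of creating a dataset and then appending to it (the path is hypothetical):

import pyarrow as pa
import lance

table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
ds = lance.write_dataset(table, "/tmp/example.lance", mode="create")
ds = lance.write_dataset(table, "/tmp/example.lance", mode="append")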