lance package¶
Subpackages¶
Submodules¶
lance.arrow module¶
Extensions to PyArrow.
- class lance.arrow.BFloat16Array¶
Bases:
ExtensionArray
Bfloat16 PyArrow Array.
- classmethod from_numpy(array: ndarray)¶
Create a BFloat16Array from a NumPy array. Can only convert from a NumPy array of dtype bfloat16 from the ml_dtypes module.
Examples
>>> import numpy as np
>>> from ml_dtypes import bfloat16
>>> from lance.arrow import BFloat16Array
>>> arr = np.array([1.0, 2.0, 3.0], dtype=bfloat16)
>>> print(BFloat16Array.from_numpy(arr))
[ 1, 2, 3 ]
- to_numpy(zero_copy_only=False)¶
Convert to a NumPy array.
This will do a zero-copy conversion.
The conversion will fail if the array contains null values.
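A brief round-trip sketch (not part of the original docstring; assumes ml_dtypes is installed):
import numpy as np
from ml_dtypes import bfloat16
from lance.arrow import BFloat16Array

arr = BFloat16Array.from_numpy(np.array([1.0, 2.0], dtype=bfloat16))
round_tripped = arr.to_numpy()   # back to a NumPy array of dtype bfloat16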
- class lance.arrow.BFloat16Type¶
Bases:
ExtensionType
- to_pandas_dtype(self)¶
Return the equivalent NumPy / Pandas dtype.
Examples
>>> import pyarrow as pa
>>> pa.int64().to_pandas_dtype()
<class 'numpy.int64'>
- class lance.arrow.EncodedImageType(storage_type: DataType = DataType(binary))¶
Bases:
ExtensionType
- class lance.arrow.FixedShapeImageTensorType(arrow_type: DataType, shape)¶
Bases:
ExtensionType
- class lance.arrow.ImageArray¶
Bases:
ExtensionArray
- classmethod from_array(images)¶
Create one of the subclasses of ImageArray from input data.
- Parameters:
images (Union[pa.StringArray, pa.BinaryArray, pa.FixedShapeTensorArray, Iterable]) –
- Return type:
Union[ImageURIArray, EncodedImageArray, FixedShapeImageTensorArray]
- class lance.arrow.ImageScalar¶
Bases:
ExtensionScalar
- as_py(self)¶
Return this scalar as a Python object.
- class lance.arrow.ImageURIArray¶
Bases:
ImageArray
Array of image URIs. URIs may represent local files or remote files on the web, S3 or GCS if they are accessible by the machine executing this.
- classmethod from_uris(uris: StringArray | LargeStringArray | Iterable[str | Path])¶
Create an ImageURIArray from an array or iterable of URIs (such as a list).
- Parameters:
uris (Union[pa.StringArray, pa.LargeStringArray, Iterable[Union[str, Path]]]) –
- Returns:
Array of image URIs
- Return type:
ImageURIArray
Examples
>>> uris = ["file::///tmp/1.png"]
>>> ImageURIArray.from_uris(uris)
<lance.arrow.ImageURIArray object at 0x...>
['file::///tmp/1.png']
- read_uris(storage_type=DataType(binary)) EncodedImageArray ¶
Read the images from the URIs into memory and return an EncodedImageArray
- Parameters:
storage_type (pa.DataType, optional) – The storage type to use for the encoded images. Default is pa.binary(). To support arrays with more than 2GiB of data, use pa.large_binary().
- Returns:
Array of encoded images
- Return type:
EncodedImageArray
Examples
>>> import os
>>> uris = [os.path.join(os.path.dirname(__file__), "../tests/images/1.png")]
>>> uri_array = ImageURIArray.from_uris(uris)
>>> uri_array.read_uris()
<lance.arrow.EncodedImageArray object at 0x...>
...
- class lance.arrow.ImageURIType(storage_type: DataType = DataType(string))¶
Bases:
ExtensionType
- lance.arrow.bfloat16_array(values)¶
- lance.arrow.cast(arr: Array, target_type: DataType | str, *args, **kwargs) Array ¶
Cast an array to another data type.
Extends pyarrow.compute.cast() for Lance-defined extension types. In cases where the casting can be handled by pyarrow natively, it falls back to pyarrow.
Supported operations:
Cast between floating-point (float16, float32, float64) arrays and bfloat16 arrays.
Cast between FixedSizeListArray of floats (float16, float32, float64, bfloat16) with the same list size.
- Parameters:
arr (pyarrow.Array) – Array to cast.
target_type (pyarrow.DataType or str) – Target data type. Accepts anything pyarrow.compute.cast() accepts. Additionally, accepts the strings "bfloat16", "bf16", or BFloat16Type.
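As a quick illustration (a sketch, not from the original docstring), casting a float32 array to bfloat16 and back might look like this:
import pyarrow as pa
from lance.arrow import cast

floats = pa.array([1.0, 2.0, 3.0], type=pa.float32())
bf16 = cast(floats, "bfloat16")    # handled by the Lance extension cast
back = cast(bf16, pa.float32())    # cast back to a regular float array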
lance.blob module¶
- class lance.blob.BlobColumn(blob_column: Array | ChunkedArray)¶
Bases:
object
A utility to wrap a Pyarrow binary column and iterate over the rows as file-like objects.
This can be useful for working with medium-to-small binary objects that need to interface with APIs that expect file-like objects. For very large binary objects (4-8MB or more per value) you might be better off creating a blob column and using lance.Dataset.take_blobs to access the blob data.
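A minimal sketch of wrapping a binary column (the values here are made up):
import pyarrow as pa
from lance.blob import BlobColumn

binary_col = pa.array([b"first blob", b"second blob"], type=pa.binary())
for fobj in BlobColumn(binary_col):
    header = fobj.read(4)   # each row is exposed as a file-like object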
- class lance.blob.BlobFile(inner: LanceBlobFile)¶
Bases:
RawIOBase
Represents a blob in a Lance dataset as a file-like object.
- close() None ¶
Flush and close the IO object.
This method has no effect if the file is already closed.
- property closed: bool¶
- readable() bool ¶
Return whether object was opened for reading.
If False, read() will raise OSError.
- readall() bytes ¶
Read until EOF, using multiple read() calls.
- readinto(b: bytearray) int ¶
- seek(offset: int, whence: int = 0) int ¶
Change the stream position to the given byte offset.
- offset
The stream position, relative to ‘whence’.
- whence
The relative position to seek from.
The offset is interpreted relative to the position indicated by whence. Values for whence are:
os.SEEK_SET or 0 – start of stream (the default); offset should be zero or positive
os.SEEK_CUR or 1 – current stream position; offset may be negative
os.SEEK_END or 2 – end of stream; offset is usually negative
Return the new absolute position.
- seekable() bool ¶
Return whether object supports random access.
If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().
- size() int ¶
Returns the size of the blob in bytes.
- tell() int ¶
Return current stream position.
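BlobFile objects are normally obtained from lance.Dataset.take_blobs. A sketch of the file-like usage, assuming a dataset with a blob column named "blob" (path and column name are hypothetical):
import lance

ds = lance.dataset("/tmp/blobs.lance")              # hypothetical path
blob = ds.take_blobs([0], blob_column="blob")[0]
print(blob.size())     # total size in bytes
blob.seek(16)          # skip a 16-byte header
payload = blob.read()  # read the remainder without loading other rows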
- class lance.blob.BlobIterator(binary_iter: Iterator[BinaryScalar])¶
Bases:
object
lance.commit module¶
- exception lance.commit.CommitConflictError¶
Bases:
Exception
lance.conftest module¶
lance.dataset module¶
- class lance.dataset.BulkCommitResult¶
Bases:
TypedDict
- dataset: LanceDataset¶
- merged: Transaction¶
- class lance.dataset.DatasetOptimizer(dataset: LanceDataset)¶
Bases:
object
- compact_files(*, target_rows_per_fragment: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int | None = None, materialize_deletions: bool = True, materialize_deletions_threshold: float = 0.1, num_threads: int | None = None, batch_size: int | None = None) CompactionMetrics ¶
Compacts small files in the dataset, reducing total number of files.
- This does a few things:
Removes deleted rows from fragments
Removes dropped columns from fragments
Merges small fragments into larger ones
This method preserves the insertion order of the dataset. This may mean it leaves small fragments in the dataset if they are not adjacent to other fragments that need compaction. For example, if you have fragments with row counts 5 million, 100, and 5 million, the middle fragment will not be compacted because the fragments it is adjacent to do not need compaction.
- Parameters:
target_rows_per_fragment (int, default 1024*1024) – The target number of rows per fragment. This is the number of rows that will be in each fragment after compaction.
max_rows_per_group (int, default 1024) –
Max number of rows per group. This does not affect which fragments need compaction, but does affect how they are re-written if selected.
This setting only affects datasets using the legacy storage format. The newer format does not require row groups.
max_bytes_per_file (Optional[int], default None) –
Max number of bytes in a single file. This does not affect which fragments need compaction, but does affect how they are re-written if selected. If this value is too small you may end up with fragments that are smaller than target_rows_per_fragment.
The default will use the default from write_dataset.
materialize_deletions (bool, default True) – Whether to compact fragments with soft deleted rows so they are no longer present in the file.
materialize_deletions_threshold (float, default 0.1) – The fraction of original rows that are soft deleted in a fragment before the fragment is a candidate for compaction.
num_threads (int, optional) – The number of threads to use when performing compaction. If not specified, defaults to the number of cores on the machine.
batch_size (int, optional) –
The batch size to use when scanning input fragments. You may want to reduce this if you are running out of memory during compaction.
The default will use the same default from scanner.
- Returns:
Metrics about the compaction process
- Return type:
CompactionMetrics
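For example (a sketch; the path is hypothetical), compaction is reached through the optimize property:
import lance

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
metrics = dataset.optimize.compact_files(
    target_rows_per_fragment=1024 * 1024,
    materialize_deletions=True,
)
print(metrics)   # CompactionMetrics describing what was rewritten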
- optimize_indices(**kwargs)¶
Optimizes index performance.
As new data arrives, it is not added to existing indexes automatically. When searching, we need to perform an indexed search of the old data plus an expensive unindexed search on the new data. As the amount of new unindexed data grows, this can have an impact on search latency. This function will add the new data to existing indexes, restoring the performance. This function does not retrain the index; it only assigns the new data to existing partitions. This means an update is much quicker than retraining the entire index, but may have less accuracy (especially if the new data exhibits new patterns, concepts, or trends).
- Parameters:
num_indices_to_merge (int, default 1) – The number of indices to merge. If set to 0, new delta index will be created.
index_names (List[str], default None) – The names of the indices to optimize. If None, all indices will be optimized.
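A minimal sketch (path and arguments are illustrative) of folding newly added rows into the existing indices:
import lance

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
dataset.optimize.optimize_indices(num_indices_to_merge=1)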
- class lance.dataset.DatasetStats¶
Bases:
TypedDict
- num_deleted_rows: int¶
- num_fragments: int¶
- num_small_files: int¶
- class lance.dataset.LanceDataset(uri: str | Path, version: int | str | None = None, block_size: int | None = None, index_cache_size: int | None = None, metadata_cache_size: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, serialized_manifest: bytes | None = None, default_scan_options: Dict[str, Any] | None = None)¶
Bases:
Dataset
A Lance Dataset in Lance format where the data is stored at the given uri.
- add_columns(transforms: Dict[str, str] | BatchUDF | ReaderLike, read_columns: List[str] | None = None, reader_schema: pa.Schema | None = None, batch_size: int | None = None)¶
Add new columns with defined values.
There are several ways to specify the new columns. First, you can provide SQL expressions for each new column. Second, you can provide a UDF that takes a batch of existing data and returns a new batch with the new columns. These new columns will be appended to the dataset.
You can also provide a RecordBatchReader which will read the new column values from some external source. This is often useful when the new column values have already been staged to files (often by some distributed process).
See the lance.add_columns_udf() decorator for more information on writing UDFs.
- Parameters:
transforms (dict or AddColumnsUDF or ReaderLike) – If this is a dictionary, then the keys are the names of the new columns and the values are SQL expression strings. These strings can reference existing columns in the dataset. If this is a AddColumnsUDF, then it is a UDF that takes a batch of existing data and returns a new batch with the new columns.
read_columns (list of str, optional) – The names of the columns that the UDF will read. If None, then the UDF will read all columns. This is only used when transforms is a UDF. Otherwise, the read columns are inferred from the SQL expressions.
reader_schema (pa.Schema, optional) – Only valid if transforms is a ReaderLike object. This will be used to determine the schema of the reader.
batch_size (int, optional) – The number of rows to read at a time from the source dataset when applying the transform. This is ignored if the dataset is a v1 dataset.
Examples
>>> import lance
>>> import pandas as pd
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3]})
>>> dataset = lance.write_dataset(table, "my_dataset")
>>> @lance.batch_udf()
... def double_a(batch):
...     df = batch.to_pandas()
...     return pd.DataFrame({'double_a': 2 * df['a']})
>>> dataset.add_columns(double_a)
>>> dataset.to_table().to_pandas()
   a  double_a
0  1         2
1  2         4
2  3         6
>>> dataset.add_columns({"triple_a": "a * 3"})
>>> dataset.to_table().to_pandas()
   a  double_a  triple_a
0  1         2         3
1  2         4         6
2  3         6         9
See also
LanceDataset.merge
Merge a pre-computed set of columns into the dataset.
- alter_columns(*alterations: Iterable[Dict[str, Any]])¶
Alter column name, data type, and nullability.
Columns that are renamed can keep any indices that are on them. If a column has an IVF_PQ index, it can be kept if the column is cast to another type. However, other index types don't support casting at this time.
Column types can be upcast (such as int32 to int64) or downcast (such as int64 to int32). However, downcasting will fail if there are any values that cannot be represented in the new type. In general, columns can be cast to the same general type: integers to integers, floats to floats, and strings to strings. However, strings, binary, and list columns can be cast between their size variants. For example, string to large string, binary to large binary, and list to large list.
Columns that are renamed can keep any indices that are on them. However, if the column is cast to a different type, its indices will be dropped.
- Parameters:
alterations (Iterable[Dict[str, Any]]) –
A sequence of dictionaries, each with the following keys:
- ”path”: str
The column path to alter. For a top-level column, this is the name. For a nested column, this is the dot-separated path, e.g. “a.b.c”.
- ”name”: str, optional
The new name of the column. If not specified, the column name is not changed.
- ”nullable”: bool, optional
Whether the column should be nullable. If not specified, the column nullability is not changed. Only non-nullable columns can be changed to nullable. Currently, you cannot change a nullable column to non-nullable.
- ”data_type”: pyarrow.DataType, optional
The new data type to cast the column to. If not specified, the column data type is not changed.
Examples
>>> import lance
>>> import pyarrow as pa
>>> schema = pa.schema([pa.field('a', pa.int64()),
...                     pa.field('b', pa.string(), nullable=False)])
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.alter_columns({"path": "a", "name": "x"},
...                       {"path": "b", "nullable": True})
>>> dataset.to_table().to_pandas()
   x  b
0  1  a
1  2  b
2  3  c
>>> dataset.alter_columns({"path": "x", "data_type": pa.int32()})
>>> dataset.schema
x: int32
b: string
- checkout_version(version: int | str) LanceDataset ¶
Load the given version of the dataset.
Unlike the dataset() constructor, this will re-use the current cache. This is a no-op if the dataset is already at the given version.
- Parameters:
version (int | str,) – The version to check out. A version number (int) or a tag (str) can be provided.
- Return type:
LanceDataset
- cleanup_old_versions(older_than: timedelta | None = None, *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True) CleanupStats ¶
Cleans up old versions of the dataset.
Some dataset changes, such as overwriting, leave behind data that is not referenced by the latest dataset version. The old data is left in place to allow the dataset to be restored back to an older version.
This method will remove older versions and any data files they reference. Once this cleanup task has run you will not be able to checkout or restore these older versions.
- Parameters:
older_than (timedelta, optional) – Only versions older than this will be removed. If not specified, this will default to two weeks.
delete_unverified (bool, default False) –
Files leftover from a failed transaction may appear to be part of an in-progress operation (e.g. appending new data) and these files will not be deleted unless they are at least 7 days old. If delete_unverified is True then these files will be deleted regardless of their age.
This should only be set to True if you can guarantee that no other process is currently working on this dataset. Otherwise the dataset could be put into a corrupted state.
error_if_tagged_old_versions (bool, default True) – Some versions may have tags associated with them. Tagged versions will not be cleaned up, regardless of how old they are. If this argument is set to True (the default), an exception will be raised if any tagged versions match the parameters. Otherwise, tagged versions will be ignored without any error and only untagged versions will be cleaned up.
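For example (a sketch; the path is hypothetical), to remove versions older than 30 days while skipping tagged versions instead of raising:
from datetime import timedelta

import lance

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
stats = dataset.cleanup_old_versions(
    older_than=timedelta(days=30),
    error_if_tagged_old_versions=False,   # ignore tagged versions silently
)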
- static commit(base_uri: str | Path | LanceDataset, operation: LanceOperation.BaseOperation, read_version: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) LanceDataset ¶
Create a new version of the dataset.
This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset()).
Its current purpose is to allow for changes being made in a distributed environment where no single process is doing all of the work. For example, a distributed bulk update or a distributed bulk modify operation.
Once all of the changes have been made, this method can be called to make the changes visible by updating the dataset manifest.
Warning
This is an advanced API and doesn’t provide the same level of validation as the other APIs. For example, it’s the responsibility of the caller to ensure that the fragments are valid for the schema.
- Parameters:
base_uri (str, Path, or LanceDataset) – The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.
operation (BaseOperation) – The operation to apply to the dataset. This describes what changes have been made. See available operations under LanceOperation.
read_version (int, optional) – The version of the dataset that was used as the base for the changes. This is not needed for overwrite or restore operations.
commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the migrate_manifest_paths_v2() method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).
detached (bool, optional) – If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.
max_retries (int) – The maximum number of retries to perform when committing the dataset.
- Returns:
A new version of Lance Dataset.
- Return type:
LanceDataset
Examples
Creating a new dataset with the LanceOperation.Overwrite operation:
>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
- static commit_batch(dest: str | Path | LanceDataset, transactions: Sequence[Transaction], commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) BulkCommitResult ¶
Create a new version of dataset with multiple transactions.
This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using LanceDataset or write_dataset()).
- Parameters:
dest (str, Path, or LanceDataset) – The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.
transactions (Iterable[Transaction]) – The transactions to apply to the dataset. These will be merged into a single transaction and applied to the dataset. Note: Only append transactions are currently supported. Other transaction types will be supported in the future.
commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the migrate_manifest_paths_v2() method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).
detached (bool, optional) – If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.
max_retries (int) – The maximum number of retries to perform when committing the dataset.
- Returns:
- dataset: LanceDataset
A new version of Lance Dataset.
- merged: Transaction
The merged transaction that was applied to the dataset.
- Return type:
dict with keys "dataset" and "merged" (see BulkCommitResult)
- count_rows(filter: str | Expression | None = None, **kwargs) int ¶
Count rows matching the scanner filter.
- Parameters:
**kwargs (dict, optional) – See the scanner() method for full parameter description.
- Returns:
count – The total number of rows in the dataset.
- Return type:
int
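A short sketch (path and filter are illustrative):
import lance

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
total = dataset.count_rows()
matching = dataset.count_rows(filter="a > 1")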
- create_index(column: str | List[str], index_type: str, name: str | None = None, metric: str = 'L2', replace: bool = False, num_partitions: int | None = None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, num_sub_vectors: int | None = None, accelerator: str | 'torch.Device' | None = None, index_cache_size: int | None = None, shuffle_partition_batches: int | None = None, shuffle_partition_concurrency: int | None = None, ivf_centroids_file: str | None = None, precomputed_partition_dataset: str | None = None, storage_options: Dict[str, str] | None = None, filter_nan: bool = True, one_pass_ivfpq: bool = False, **kwargs) LanceDataset ¶
Create index on column.
Experimental API
- Parameters:
column (str) – The column to be indexed.
index_type (str) – The type of the index. "IVF_PQ", "IVF_HNSW_PQ" and "IVF_HNSW_SQ" are supported now.
name (str, optional) – The index name. If not provided, it will be generated from the column name.
metric (str) – The distance metric type, i.e., “L2” (alias to “euclidean”), “cosine” or “dot” (dot product). Default is “L2”.
replace (bool) – Replace the existing index if it exists.
num_partitions (int, optional) – The number of partitions of IVF (Inverted File Index).
ivf_centroids (optional) – It can be either np.ndarray, pyarrow.FixedSizeListArray or pyarrow.FixedShapeTensorArray. A num_partitions x dimension array of existing K-means centroids for IVF clustering. If not provided, a new KMeans model will be trained.
pq_codebook (optional) – It can be np.ndarray, pyarrow.FixedSizeListArray, or pyarrow.FixedShapeTensorArray. A num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors) array of K-means centroids for the PQ codebook. Note: nbits is always 8 for now. If not provided, a new PQ model will be trained.
num_sub_vectors (int, optional) – The number of sub-vectors for PQ (Product Quantization).
accelerator (str or torch.Device, optional) – If set, use an accelerator to speed up the training process. Accepted accelerators: "cuda" (Nvidia GPU) and "mps" (Apple Silicon GPU). If not set, use the CPU.
index_cache_size (int, optional) – The size of the index cache in number of entries. Default value is 256.
shuffle_partition_batches (int, optional) –
The number of batches, using the row group size of the dataset, to include in each shuffle partition. Default value is 10240.
Assuming the row group size is 1024, each shuffle partition will hold 10240 * 1024 = 10,485,760 rows. By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.
shuffle_partition_concurrency (int, optional) –
The number of shuffle partitions to process concurrently. Default value is 2.
By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
filter_nan (bool) – Defaults to True. False is UNSAFE, and will cause a crash if any null/nan values are present (and otherwise will not). Disables the null filter used for nullable columns. Obtains a small speed boost.
one_pass_ivfpq (bool) – Defaults to False. If enabled, index type must be “IVF_PQ”. Reduces disk IO.
kwargs – Parameters passed to the index building process.
The SQ (Scalar Quantization) is available only for the IVF_HNSW_SQ index type. This quantization method is used to reduce the memory usage of the index: it maps the float vectors to integer vectors, where each integer is of num_bits. For now, only 8 bits are supported.
- If index_type is "IVF_*", then the following parameters are required: num_partitions
- If index_type is with "PQ", then the following parameters are required: num_sub_vectors
Optional parameters for IVF_PQ:
- ivf_centroids
Existing K-mean centroids for IVF clustering.
- num_bits
The number of bits for PQ (Product Quantization). Default is 8. Only 4, 8 are supported.
- Optional parameters for IVF_HNSW_*:
- max_level
Int, the maximum number of levels in the graph.
- m
Int, the number of edges per node in the graph.
- ef_construction
Int, the number of nodes to examine during the construction.
Examples
import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)
import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_HNSW_SQ",
    num_partitions=256,
)
Experimental Accelerator (GPU) support:
- accelerate: use GPU to train IVF partitions.
Only supports CUDA (Nvidia) or MPS (Apple) currently. Requires PyTorch to be installed.
import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
    accelerator="cuda"
)
References
- create_scalar_index(column: str, index_type: Literal['BTREE'] | Literal['BITMAP'] | Literal['LABEL_LIST'] | Literal['INVERTED'] | Literal['FTS'], name: str | None = None, *, replace: bool = True, **kwargs)¶
Create a scalar index on a column.
Scalar indices, like vector indices, can be used to speed up scans. A scalar index can speed up scans that contain filter expressions on the indexed column. For example, the following scan will be faster if the column my_col has a scalar index:
import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(filter="my_col != 7").to_table()
Vector search with pre-filters can also benefit from scalar indices. For example,
import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(
    nearest=dict(
        column="vector",
        q=[1, 2, 3, 4],
        k=10,
    ),
    filter="my_col != 7",
    prefilter=True,
)
There are 4 types of scalar indices available today.
BTREE. The most common type is BTREE. This index is inspired by the btree data structure, although only the first few layers of the btree are cached in memory. It will perform well on columns with a large number of unique values and few rows per value.
BITMAP. This index stores a bitmap for each unique value in the column. This index is useful for columns with a small number of unique values and many rows per value.
LABEL_LIST. A special index that is used to index list columns whose values have small cardinality. For example, a column that contains lists of tags (e.g. ["tag1", "tag2", "tag3"]) can be indexed with a LABEL_LIST index. This index can only speed up queries with array_has_any or array_has_all filters.
FTS/INVERTED. It is used to index document columns. This index can conduct full-text searches. For example, a query for "hello world" would match documents containing "hello" or "world". The results will be ranked by BM25.
Note that the LANCE_BYPASS_SPILLING environment variable can be used to bypass spilling to disk. Setting this to true can avoid memory exhaustion issues (see https://github.com/apache/datafusion/issues/10073 for more info).
Experimental API
- Parameters:
column (str) – The column to be indexed. Must be a boolean, integer, float, or string column.
index_type (str) – The type of the index. One of "BTREE", "BITMAP", "LABEL_LIST", "FTS" or "INVERTED".
name (str, optional) – The index name. If not provided, it will be generated from the column name.
replace (bool, default True) – Replace the existing index if it exists.
The following parameters are optional:
with_position (bool, default True) – This is for the INVERTED index. If True, the index will store the positions of the words in the document, so that you can conduct phrase queries. This will significantly increase the index size. It won't impact the performance of non-phrase queries even if it is set to True.
base_tokenizer (str, default "simple") – This is for the INVERTED index. The base tokenizer to use. The value can be:
* "simple": splits tokens on whitespace and punctuation.
* "whitespace": splits tokens on whitespace.
* "raw": no tokenization.
language (str, default "English") – This is for the INVERTED index. The language for stemming and stop words. This is only used when stem or remove_stop_words is true.
max_token_length (Optional[int], default 40) – This is for the INVERTED index. The maximum token length. Any token longer than this will be removed.
lower_case (bool, default True) – This is for the INVERTED index. If True, the index will convert all text to lowercase.
stem (bool, default False) – This is for the INVERTED index. If True, the index will stem the tokens.
remove_stop_words (bool, default False) – This is for the INVERTED index. If True, the index will remove stop words.
ascii_folding (bool, default False) – This is for the INVERTED index. If True, the index will convert non-ascii characters to ascii characters if possible. This would remove accents like "é" -> "e".
Examples
import lance

dataset = lance.dataset("/tmp/images.lance")
dataset.create_scalar_index(
    "category",
    "BTREE",
)
Scalar indices can only speed up scans for basic filters using equality, comparison, range (e.g. my_col BETWEEN 0 AND 100), and set membership (e.g. my_col IN (0, 1, 2)).
Scalar indices can be used if the filter contains multiple indexed columns and the filter criteria are AND'd or OR'd together (e.g. my_col < 0 AND other_col > 100).
Scalar indices may be used if the filter contains non-indexed columns but, depending on the structure of the filter, they may not be usable. For example, if the column not_indexed does not have a scalar index then the filter my_col = 0 OR not_indexed = 1 will not be able to use any scalar index on my_col.
To determine if a scan is making use of a scalar index you can use explain_plan to look at the query plan that lance has created. Queries that use scalar indices will either have a ScalarIndexQuery relation or a MaterializeIndex operator.
- property data_storage_version: str¶
The version of the data storage format this dataset is using
- delete(predicate: str | Expression)¶
Delete rows from the dataset.
This marks rows as deleted, but does not physically remove them from the files. This keeps the existing indexes still valid.
- Parameters:
predicate (str or pa.compute.Expression) – The predicate to use to select rows to delete. May either be a SQL string or a pyarrow Expression.
Examples
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.delete("a = 1 or b in ('a', 'b')")
>>> dataset.to_table()
pyarrow.Table
a: int64
b: string
----
a: [[3]]
b: [["c"]]
- static drop(base_uri: str | Path, storage_options: Dict[str, str] | None = None) None ¶
- drop_columns(columns: List[str])¶
Drop one or more columns from the dataset
This is a metadata-only operation and does not remove the data from the underlying storage. In order to remove the data, you must subsequently call compact_files to rewrite the data without the removed columns and then call cleanup_old_versions to remove the old files.
- Parameters:
columns (list of str) – The names of the columns to drop. These can be nested column references (e.g. “a.b.c”) or top-level column names (e.g. “a”).
Examples
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.drop_columns(["a"])
>>> dataset.to_table().to_pandas()
   b
0  a
1  b
2  c
- get_fragment(fragment_id: int) LanceFragment | None ¶
Get the fragment with fragment id.
- get_fragments(filter: Expression | None = None) List[LanceFragment] ¶
Get all fragments from the dataset.
Note: filter is not supported yet.
- property has_index¶
- head(num_rows, **kwargs)¶
Load the first N rows of the dataset.
- Parameters:
num_rows (int) – The number of rows to load.
**kwargs (dict, optional) – See scanner() method for full parameter description.
- Returns:
table
- Return type:
Table
- index_statistics(index_name: str) Dict[str, Any] ¶
- insert(data: ReaderLike, *, mode='append', **kwargs)¶
Insert data into the dataset.
- Parameters:
data_obj (Reader-like) – The data to be written. Acceptable types are: Pandas DataFrame, Pyarrow Table, Dataset, Scanner, RecordBatchReader, or Huggingface dataset.
mode (str, default 'append') –
The mode to use when writing the data. Options are:
create - create a new dataset (raises if uri already exists).
overwrite - create a new snapshot version.
append - create a new version that is the concatenation of the input and the latest version (raises if uri does not exist).
**kwargs (dict, optional) – Additional keyword arguments to pass to write_dataset().
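A minimal sketch (path is hypothetical; assumes the new rows match the dataset schema):
import lance
import pyarrow as pa

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
new_rows = pa.table({"a": [4, 5], "b": ["d", "e"]})
dataset.insert(new_rows)   # appends by default; pass mode="overwrite" to replace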
- join(right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)¶
Not implemented (just override pyarrow dataset to prevent segfault)
- property lance_schema: LanceSchema¶
The LanceSchema for this dataset
- property latest_version: int¶
Returns the latest version of the dataset.
- list_indices() List[Dict[str, Any]] ¶
- merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None)¶
Merge another dataset into this one.
Performs a left join, where the dataset is the left side and data_obj is the right side. Rows existing in the dataset but not in data_obj will be filled with null values, unless Lance doesn't support null values for some types, in which case an error will be raised.
- Parameters:
data_obj (Reader-like) – The data to be merged. Acceptable types are: - Pandas DataFrame, Pyarrow Table, Dataset, Scanner, Iterator[RecordBatch], or RecordBatchReader
left_on (str) – The name of the column in the dataset to join on.
right_on (str or None) – The name of the column in data_obj to join on. If None, defaults to left_on.
Examples
>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> dataset.merge(new_df, 'x')
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f
See also
LanceDataset.add_columns
Add new columns by computing batch-by-batch.
- merge_insert(on: str | Iterable[str])¶
Returns a builder that can be used to create a “merge insert” operation
This operation can add rows, update rows, and remove rows in a single transaction. It is a very generic tool that can be used to create behaviors like “insert if not exists”, “update or insert (i.e. upsert)”, or even replace a portion of existing data with new data (e.g. replace all data where month=”january”)
The merge insert operation works by combining new data from a source table with existing data in a target table by using a join. There are three categories of records.
“Matched” records are records that exist in both the source table and the target table. “Not matched” records exist only in the source table (e.g. these are new data). “Not matched by source” records exist only in the target table (this is old data).
The builder returned by this method can be used to customize what should happen for each category of data.
Please note that the data will be reordered as part of this operation. This is because updated rows will be deleted from the dataset and then reinserted at the end with the new values. The order of the newly inserted rows may fluctuate randomly because a hash-join operation is used internally.
- Parameters:
on (Union[str, Iterable[str]]) – A column (or columns) to join on. This is how records from the source table and target table are matched. Typically this is some kind of key or id column.
Examples
Use when_matched_update_all() and when_not_matched_insert_all() to perform an “upsert” operation. This will update rows that already exist in the dataset and insert rows that do not exist.
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> dataset.merge_insert("a") \
...     .when_matched_update_all() \
...     .when_not_matched_insert_all() \
...     .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  b
1  2  x
2  3  y
3  4  z
Use when_not_matched_insert_all() to perform an “insert if not exists” operation. This will only insert rows that do not already exist in the dataset.
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example2")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "insert if not exists" operation
>>> dataset.merge_insert("a") \
...     .when_not_matched_insert_all() \
...     .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  z
You are not required to provide all the columns. If you only want to update a subset of columns, you can omit columns you don’t want to update. Omitted columns will keep their existing values if they are updated, or will be null if they are inserted.
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"],
...                   "c": ["x", "y", "z"]})
>>> dataset = lance.write_dataset(table, "example3")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation, only updating column "a"
>>> dataset.merge_insert("a") \
...     .when_matched_update_all() \
...     .when_not_matched_insert_all() \
...     .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b     c
0  1  a     x
1  2  x     y
2  3  y     z
3  4  z  None
- migrate_manifest_paths_v2()¶
Migrate the manifest paths to the new format.
This will update the manifest to use the new v2 format for paths.
This function is idempotent, and can be run multiple times without changing the state of the object store.
DANGER: this should not be run while other concurrent operations are happening, and it should run to completion before other operations are resumed.
- property optimize: DatasetOptimizer¶
- property partition_expression¶
Not implemented (just override pyarrow dataset to prevent segfault)
- replace_field_metadata(field_name: str, new_metadata: Dict[str, str])¶
Replace the metadata of a field in the schema
- Parameters:
field_name (str) – The name of the field to replace the metadata for
new_metadata (dict) – The new metadata to set
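For instance (a sketch; the path, field name, and metadata key are made up):
import lance

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
dataset.replace_field_metadata("a", {"description": "primary key"})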
- replace_schema(schema: Schema)¶
Not implemented (just override pyarrow dataset to prevent segfault)
See replace_schema_metadata() or replace_field_metadata().
- replace_schema_metadata(new_metadata: Dict[str, str])¶
Replace the schema metadata of the dataset
- Parameters:
new_metadata (dict) – The new metadata to set
- restore()¶
Restore the currently checked out version as the latest version of the dataset.
This creates a new commit.
- sample(num_rows: int, columns: List[str] | Dict[str, str] | None = None, randomize_order: bool = True, **kwargs) Table ¶
Select a random sample of data
- Parameters:
num_rows (int) – number of rows to retrieve
columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
**kwargs (dict, optional) – see scanner() method for full parameter description.
- Returns:
table
- Return type:
Table
- scanner(columns: List[str] | Dict[str, str] | None = None, filter: str | Expression | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = None, fragments: Iterable[LanceFragment] | None = None, full_text_query: str | dict | None = None, *, prefilter: bool = None, with_row_id: bool = None, with_row_address: bool = None, use_stats: bool = None, fast_search: bool = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None) LanceScanner ¶
Return a Scanner that can support various pushdowns.
- Parameters:
columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
filter (pa.compute.Expression or str) – Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.
limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.
offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.
nearest (dict, default None) –
Get the rows corresponding to the K most similar vectors. Example:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "nprobes": 1, "refine_factor": 1 }
batch_size (int, default None) – The target size of batches returned. In some cases batches can be up to twice this size, but never larger than that. Batches can also be smaller than this size.
io_buffer_size (int, default None) – The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.
batch_readahead (int, optional) – The number of batches to read ahead.
fragment_readahead (int, optional) – The number of fragments to read ahead.
scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.
fragments (iterable of LanceFragment, default None) – If specified, only scan these fragments. If scan_in_order is True, then the fragments will be scanned in the order given.
prefilter (bool, default False) –
If True then the filter will be applied before the vector query is run. This will generate more correct results but it may be a more costly query. It’s generally good when the filter is highly selective.
If False then the filter will be applied after the vector query is run. This will perform well but the results may have fewer than the requested number of rows (or be empty) if the rows closest to the query do not match the filter. It’s generally good when the filter is not very selective.
use_scalar_index (bool, default True) – Lance will automatically use scalar indices to optimize a query. In some corner cases this can make query performance worse and this parameter can be used to disable scalar indices in these cases.
late_materialization (bool or List[str], default None) –
Allows custom control over late materialization. Late materialization fetches non-query columns using a take operation after the filter. This is useful when there are few results or columns are very large.
Early materialization can be better when there are many results or the columns are very narrow.
If True, then all columns are late materialized. If False, then all columns are early materialized. If a list of strings, then only the columns in the list are late materialized.
The default uses a heuristic that assumes filters will select about 0.1% of the rows. If your filter is more selective (e.g. find by id) you may want to set this to True. If your filter is not very selective (e.g. matches 20% of the rows) you may want to set this to False.
full_text_query (str or dict, optional) –
The query string to search for; the results will be ranked by BM25. For example, "hello world" would match documents containing "hello" or "world". Alternatively, a dictionary with the following keys:
- columns: list[str]
The columns to search, currently only supports a single column in the columns list.
- query: str
The query string to search for.
fast_search (bool, default False) – If True, then the search will only be performed on the indexed data, which yields faster search time.
Notes
For now, if BOTH filter and nearest is specified, then:
nearest is executed first.
The results are filtered afterwards.
For debugging ANN results, you can choose to not use the index even if present by specifying use_index=False. For example, the following will always return exact KNN results:
dataset.to_table(nearest={
    "column": "vector",
    "k": 10,
    "q": <query vector>,
    "use_index": False
})
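A short sketch (path, column names, and filter are illustrative) combining a projection, a filter, and a limit:
import lance

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
scanner = dataset.scanner(
    columns=["id", "vector"],
    filter="id > 100",
    limit=1000,
    batch_size=1024,
)
table = scanner.to_table()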
- property schema: Schema¶
The pyarrow Schema for this dataset
- session() _Session ¶
Return the dataset session, which holds the dataset’s state.
- property stats: LanceStats¶
Experimental API
- take(indices: List[int] | Array, columns: List[str] | Dict[str, str] | None = None, **kwargs) Table ¶
Select rows of data by index.
- Parameters:
indices (Array or array-like) – indices of rows to select in the dataset.
columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
**kwargs (dict, optional) – See the scanner() method for full parameter description.
- Returns:
table
- Return type:
pyarrow.Table
- take_blobs(row_ids: List[int] | Array, blob_column: str) List[BlobFile] ¶
Select blobs by row IDs.
Instead of loading large binary blob data into memory before processing it, this API allows you to open binary blob data as a regular Python file-like object. For more details, see
lance.BlobFile
.- Parameters:
row_ids (List Array or array-like) – row IDs to select in the dataset.
blob_column (str) – The name of the blob column to select.
- Returns:
blob_files
- Return type:
List[BlobFile]
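A short sketch (path, row ids, and column name are hypothetical) of streaming blob contents without materializing them:
import lance

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
blobs = dataset.take_blobs([1, 5, 7], blob_column="blob")
for f in blobs:
    size = 0
    while chunk := f.read(1 << 20):   # stream 1 MiB at a time
        size += len(chunk)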
- to_batches(columns: List[str] | Dict[str, str] | None = None, filter: str | Expression | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, with_row_address: bool = False, use_stats: bool = True, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None, **kwargs) Iterator[RecordBatch] ¶
Read the dataset as materialized record batches.
- Parameters:
**kwargs (dict, optional) – Arguments for Scanner.from_dataset.
- Returns:
record_batches
- Return type:
Iterator of RecordBatch
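A minimal sketch (path and column are illustrative) of streaming batches instead of materializing a whole table:
import lance

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
for batch in dataset.to_batches(columns=["a"], batch_size=4096):
    print(batch.num_rows)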
- to_table(columns: List[str] | Dict[str, str] | None = None, filter: str | Expression | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, with_row_address: bool = False, use_stats: bool = True, fast_search: bool = False, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None) Table ¶
Read the data into memory as a pyarrow.Table.
- Parameters:
columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
filter (pa.compute.Expression or str) –
Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.
limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.
offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.
nearest (dict, default None) –
Get the rows corresponding to the K most similar vectors. Example:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "metric": "cosine", "nprobes": 1, "refine_factor": 1 }
batch_size (int, optional) – The number of rows to read at a time.
io_buffer_size (int, default None) – The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.
batch_readahead (int, optional) – The number of batches to read ahead.
fragment_readahead (int, optional) – The number of fragments to read ahead.
scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.
prefilter (bool, default False) – Run filter before the vector search.
late_materialization (bool or List[str], default None) – Allows custom control over late materialization. See ScannerBuilder.late_materialization for more information.
use_scalar_index (bool, default True) – Allows custom control over scalar index usage. See ScannerBuilder.use_scalar_index for more information.
with_row_id (bool, default False) – Return row ID.
with_row_address (bool, default False) – Return row address
use_stats (bool, default True) – Use stats pushdown during filters.
full_text_query (str or dict, optional) –
The query string to search for; the results will be ranked by BM25. For example, "hello world" would match documents containing "hello" or "world". Alternatively, a dictionary with the following keys:
- columns: list[str]
The columns to search, currently only supports a single column in the columns list.
- query: str
The query string to search for.
Notes
If BOTH filter and nearest is specified, then:
nearest is executed first.
The results are filtered afterwards, unless prefilter is set to True.
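A short sketch (path and values are illustrative) of reading a filtered projection into memory:
import lance

dataset = lance.dataset("/tmp/my_data.lance")   # hypothetical path
tbl = dataset.to_table(columns=["a", "b"], filter="a > 1", limit=10)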
- update(updates: Dict[str, str], where: str | None = None) Dict[str, Any] ¶
Update column values for rows matching where.
- Parameters:
updates (dict of str to str) – A mapping of column names to a SQL expression.
where (str, optional) – A SQL predicate indicating which rows should be updated.
- Returns:
updates – A dictionary containing the number of rows updated.
- Return type:
dict
Examples
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> update_stats = dataset.update(dict(a = 'a + 2'), where="b != 'a'")
>>> update_stats["num_updated_rows"] = 2
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  4  b
2  5  c
- property uri: str¶
The location of the data
- validate()¶
Validate the dataset.
This checks the integrity of the dataset and will raise an exception if the dataset is corrupted.
- property version: int¶
Returns the currently checked out version of the dataset
- versions()¶
Return all versions in this dataset.
- class lance.dataset.LanceOperation¶
Bases:
object
- class Append(fragments: Iterable[FragmentMetadata])¶
Bases:
BaseOperation
Append new rows to the dataset.
- fragments¶
The fragments that contain the new rows.
- Type:
list[FragmentMetadata]
Warning
This is an advanced API for distributed operations. To append to a dataset on a single machine, use lance.write_dataset().
Examples
To append new rows to a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.
>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(tab1, "example")
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment = lance.fragment.LanceFragment.create("example", tab2)
>>> operation = lance.LanceOperation.Append([fragment])
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
- fragments: Iterable[FragmentMetadata]¶
- class BaseOperation¶
Bases:
ABC
Base class for operations that can be applied to a dataset.
See available operations under LanceOperation.
- class CreateIndex(uuid: str, name: str, fields: List[int], dataset_version: int, fragment_ids: Set[int])¶
Bases:
BaseOperation
Operation that creates an index on the dataset.
- dataset_version: int¶
- fields: List[int]¶
- fragment_ids: Set[int]¶
- name: str¶
- uuid: str¶
- class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)¶
Bases:
BaseOperation
Remove fragments or rows from the dataset.
- updated_fragments¶
The fragments that have been updated with new deletion vectors.
- Type:
list[FragmentMetadata]
- deleted_fragment_ids¶
The ids of the fragments that have been deleted entirely. These are the fragments where LanceFragment.delete() returned None.
- Type:
list[int]
- predicate¶
The original SQL predicate used to select the rows to delete.
- Type:
str
Warning
This is an advanced API for distributed operations. To delete rows from a dataset on a single machine, use lance.LanceDataset.delete().
Examples
To delete rows from a dataset, call lance.fragment.LanceFragment.delete() on each of the fragments. If that returns a new fragment, add that to the updated_fragments list. If it returns None, that means the whole fragment was deleted, so add the fragment id to deleted_fragment_ids. Finally, pass the operation to the LanceDataset.commit() method to complete the deletion operation.
>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(table, "example")
>>> table = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example", mode="append")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> predicate = "a >= 2"
>>> updated_fragments = []
>>> deleted_fragment_ids = []
>>> for fragment in dataset.get_fragments():
...     new_fragment = fragment.delete(predicate)
...     if new_fragment is not None:
...         updated_fragments.append(new_fragment)
...     else:
...         deleted_fragment_ids.append(fragment.fragment_id)
>>> operation = lance.LanceOperation.Delete(updated_fragments,
...                                         deleted_fragment_ids,
...                                         predicate)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
- deleted_fragment_ids: Iterable[int]¶
- predicate: str¶
- updated_fragments: Iterable[FragmentMetadata]¶
- class Merge(fragments: Iterable[FragmentMetadata], schema: LanceSchema | Schema)¶
Bases:
BaseOperation
Operation that adds columns. Unlike Overwrite, this should not change the structure of the fragments, allowing existing indices to be kept.
- fragments¶
The fragments that make up the new dataset.
- Type:
iterable of FragmentMetadata
- schema¶
The schema of the new dataset. Passing a LanceSchema is preferred, and passing a pyarrow.Schema is deprecated.
- Type:
LanceSchema or pyarrow.Schema
Warning
This is an advanced API for distributed operations. To overwrite or create new dataset on a single machine, use
lance.write_dataset()
.
Examples
To add new columns to a dataset, first define a method that will create the new columns based on the existing columns. Then use
lance.fragment.LanceFragment.add_columns()
>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch:
...     doubled = pc.multiply(batch["a"], 2)
...     return pa.record_batch([doubled], ["a_doubled"])
>>> fragments = []
>>> for fragment in dataset.get_fragments():
...     new_fragment, new_schema = fragment.merge_columns(double_a,
...                                                       columns=['a'])
...     fragments.append(new_fragment)
>>> operation = lance.LanceOperation.Merge(fragments, new_schema)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b  a_doubled
0  1  a          2
1  2  b          4
2  3  c          6
3  4  d          8
- fragments: Iterable[FragmentMetadata]¶
- schema: LanceSchema | Schema¶
- class Overwrite(new_schema: LanceSchema | Schema, fragments: Iterable[FragmentMetadata])¶
Bases:
BaseOperation
Overwrite or create a new dataset.
- new_schema¶
The schema of the new dataset.
- Type:
pyarrow.Schema
- fragments¶
The fragments that make up the new dataset.
- Type:
list[FragmentMetadata]
Warning
This is an advanced API for distributed operations. To overwrite or create new dataset on a single machine, use
lance.write_dataset()
.
Examples
To create or overwrite a dataset, first use
lance.fragment.LanceFragment.create()
to create fragments. Then collect the fragment metadata into a list and pass it along with the schema to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.
>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
- fragments: Iterable[FragmentMetadata]¶
- new_schema: LanceSchema | Schema¶
- class Restore(version: int)¶
Bases:
BaseOperation
Operation that restores a previous version of the dataset.
- version: int¶
- class Rewrite(groups: Iterable[RewriteGroup], rewritten_indices: Iterable[RewrittenIndex])¶
Bases:
BaseOperation
Operation that rewrites one or more files and indices into one or more files and indices.
- groups¶
Groups of files that have been rewritten.
- Type:
list[RewriteGroup]
- rewritten_indices¶
Indices that have been rewritten.
- Type:
list[RewrittenIndex]
Warning
This is an advanced API not intended for general use.
- groups: Iterable[RewriteGroup]¶
- rewritten_indices: Iterable[RewrittenIndex]¶
- class RewriteGroup(old_fragments: Iterable[FragmentMetadata], new_fragments: Iterable[FragmentMetadata])¶
Bases:
object
Collection of rewritten files
- new_fragments: Iterable[FragmentMetadata]¶
- old_fragments: Iterable[FragmentMetadata]¶
- class lance.dataset.LanceScanner(scanner: _Scanner, dataset: LanceDataset)¶
Bases:
Scanner
- count_rows()¶
Count rows matching the scanner filter.
- Returns:
count
- Return type:
int
- property dataset_schema: Schema¶
The schema with which batches will be read from fragments.
- explain_plan(verbose=False) str ¶
Return the execution plan for this scanner.
- Parameters:
verbose (bool, default False) – Use a verbose output format.
- Returns:
plan
- Return type:
str
- static from_batches(*args, **kwargs)¶
Not implemented
- static from_dataset(*args, **kwargs)¶
Not implemented
- static from_fragment(*args, **kwargs)¶
Not implemented
- head(num_rows)¶
Load the first N rows of the dataset.
- Parameters:
num_rows (int) – The number of rows to load.
- Return type:
Table
- property projected_schema: Schema¶
The materialized schema of the data, accounting for projections.
This is the schema of any data returned from the scanner.
- scan_batches()¶
Consume a Scanner in record batches with corresponding fragments.
- Returns:
record_batches
- Return type:
iterator of TaggedRecordBatch
- take(indices)¶
Not implemented
- to_batches(self)¶
Consume a Scanner in record batches.
- Returns:
record_batches
- Return type:
iterator of RecordBatch
- to_reader(self)¶
Consume this scanner as a RecordBatchReader.
- Return type:
RecordBatchReader
- to_table() Table ¶
Read the data into memory and return a pyarrow Table.
- class lance.dataset.LanceStats(dataset: _Dataset)¶
Bases:
object
Statistics about a LanceDataset.
- dataset_stats(max_rows_per_group: int = 1024) DatasetStats ¶
Statistics about the dataset.
- index_stats(index_name: str) Dict[str, Any] ¶
Statistics about an index.
- Parameters:
index_name (str) – The name of the index to get statistics for.
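A minimal sketch of reading these statistics. It assumes dataset is an already-open LanceDataset that exposes a LanceStats instance through its stats property, and "my_idx" is a hypothetical index name:
stats = dataset.stats                                  # LanceStats for the open dataset
print(stats.dataset_stats(max_rows_per_group=1024))    # dataset-level statistics
print(stats.index_stats("my_idx"))                     # per-index statistics (hypothetical name)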
- class lance.dataset.MergeInsertBuilder(dataset, on)¶
Bases:
_MergeInsertBuilder
- execute(data_obj: ReaderLike, *, schema: pa.Schema | None = None)¶
Executes the merge insert operation
This function updates the original dataset and returns a dictionary of merge statistics, i.e. the number of inserted, updated, and deleted rows.
- Parameters:
data_obj (ReaderLike) – The new data to use as the source table for the operation. This parameter can be any source of data (e.g. table / dataset) that
write_dataset()
accepts.
schema (Optional[pa.Schema]) – The schema of the data. This only needs to be supplied when the data source is some kind of generator.
- when_matched_update_all(condition: str | None = None) MergeInsertBuilder ¶
Configure the operation to update matched rows
After this method is called, when the merge insert operation executes, any rows that match both the source table and the target table will be updated. The rows from the target table will be removed and the rows from the source table will be added.
An optional condition may be specified. This should be an SQL filter; if present, only matched rows that also satisfy this filter will be updated. The SQL filter should use the prefix target. to refer to columns in the target table and the prefix source. to refer to columns in the source table. For example, source.last_update < target.last_update.
If a condition is specified and rows do not satisfy the condition then these rows will not be updated. Failure to satisfy the filter does not cause a “matched” row to become a “not matched” row.
- when_not_matched_by_source_delete(expr: str | None = None) MergeInsertBuilder ¶
Configure the operation to delete target rows that have no match in the source
After this method is called, when the merge insert operation executes, any rows that exist only in the target table will be deleted. An optional filter can be specified to limit the scope of the delete operation. If given (as an SQL filter) then only rows which match the filter will be deleted.
- when_not_matched_insert_all() MergeInsertBuilder ¶
Configure the operation to insert not matched rows
After this method is called, when the merge insert operation executes, any rows that exist only in the source table will be inserted into the target table.
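A minimal upsert sketch combining the builder methods above, assuming the builder is obtained from an already-open LanceDataset via its merge_insert() method and that both tables share an "id" key column:
import pyarrow as pa

new_data = pa.table({"id": [1, 5], "value": ["updated", "new"]})
merge_stats = (
    dataset.merge_insert("id")                                 # join source and target on "id"
    .when_matched_update_all("source.value != target.value")   # only update rows that changed
    .when_not_matched_insert_all()                             # insert brand-new ids
    .execute(new_data)
)
print(merge_stats)  # dictionary of inserted / updated / deleted row counts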
- class lance.dataset.ScannerBuilder(ds: LanceDataset)¶
Bases:
object
- apply_defaults(default_opts: Dict[str, Any]) ScannerBuilder ¶
- batch_readahead(nbatches: int | None = None) ScannerBuilder ¶
This parameter is ignored when reading v2 files
- batch_size(batch_size: int) ScannerBuilder ¶
Set batch size for Scanner
- columns(cols: List[str] | Dict[str, str] | None = None) ScannerBuilder ¶
- fast_search(flag: bool) ScannerBuilder ¶
Enable fast search, which only performs the search on indexed data.
Users can use Table::optimize() or create_index() to include new data in the index, making that data searchable.
- filter(filter: str | Expression) ScannerBuilder ¶
- fragment_readahead(nfragments: int | None = None) ScannerBuilder ¶
- full_text_search(query: str, columns: List[str] | None = None) ScannerBuilder ¶
Filter rows by full-text search. This is an experimental API and may be removed once this can be expressed within a SQL-like filter expression.
An inverted index must be created on the given column before searching.
- io_buffer_size(io_buffer_size: int) ScannerBuilder ¶
Set the I/O buffer size for the Scanner
This is the amount of RAM that will be reserved for holding I/O received from storage before it is processed. This is used to control the amount of memory used by the scanner. If the buffer is full then the scanner will block until the buffer is processed.
Generally this should scale with the number of concurrent I/O threads. The default is 2GiB which comfortably provides enough space for somewhere between 32 and 256 concurrent I/O threads.
This value is not a hard cap on the amount of RAM the scanner will use. Some space is used for the compute (which can be controlled by the batch size) and Lance does not keep track of memory after it is returned to the user.
Currently, if there is a single batch of data which is larger than the io buffer size then the scanner will deadlock. This is a known issue and will be fixed in a future release.
This parameter is only used when reading v2 files
- late_materialization(late_materialization: bool | List[str]) ScannerBuilder ¶
- limit(n: int | None = None) ScannerBuilder ¶
- nearest(column: str, q: QueryVectorLike, k: int | None = None, metric: str | None = None, nprobes: int | None = None, refine_factor: int | None = None, use_index: bool = True, ef: int | None = None) ScannerBuilder ¶
- offset(n: int | None = None) ScannerBuilder ¶
- prefilter(prefilter: bool) ScannerBuilder ¶
- scan_in_order(scan_in_order: bool = True) ScannerBuilder ¶
Whether to scan the dataset in order of fragments and batches.
If set to False, the scanner may read fragments concurrently and yield batches out of order. This may improve performance since it allows more concurrency in the scan, but can also use more memory.
This parameter is ignored when using v2 files. In the v2 file format there is no penalty to scanning in order and so all scans will scan in order.
- to_scanner() LanceScanner ¶
- use_scalar_index(use_scalar_index: bool = True) ScannerBuilder ¶
Set whether scalar indices should be used in a query
Scans will use scalar indices, when available, to optimize queries with filters. However, in some corner cases, scalar indices may make performance worse. This parameter allows users to disable scalar indices in these cases.
- use_stats(use_stats: bool = True) ScannerBuilder ¶
Enable use of statistics for query planning.
Disabling statistics is used for debugging and benchmarking purposes. This should be left on for normal use.
- with_fragments(fragments: Iterable[LanceFragment] | None) ScannerBuilder ¶
- with_row_address(with_row_address: bool = True) ScannerBuilder ¶
Enable returning row addresses with the results.
Row addresses are a unique but unstable identifier for each row in the dataset that consists of the fragment id (upper 32 bits) and the row offset in the fragment (lower 32 bits). Row IDs are generally preferred since they do not change when a row is modified or compacted. However, row addresses may be useful in some advanced use cases.
- with_row_id(with_row_id: bool = True) ScannerBuilder ¶
Enable returning row IDs with the results.
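A sketch of chaining the builder methods above; the same options are normally reached through the dataset's scanner method. The _rowaddr column name used to read back row addresses is an assumption for illustration:
from lance.dataset import ScannerBuilder

scanner = (
    ScannerBuilder(dataset)          # dataset is an already-open LanceDataset
    .columns(["id", "vector"])
    .filter("id > 100")
    .batch_size(4096)
    .with_row_address(True)
    .to_scanner()
)
table = scanner.to_table()
# decode a row address into its fragment id (upper 32 bits) and row offset (lower 32 bits)
addr = table["_rowaddr"][0].as_py()
fragment_id, row_offset = addr >> 32, addr & 0xFFFFFFFF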
- class lance.dataset.Tags(dataset: _Dataset)¶
Bases:
object
Dataset tag manager.
- create(tag: str, version: int) None ¶
Create a tag for a given dataset version.
- Parameters:
tag (str,) – The name of the tag to create. This name must be unique among all tag names for the dataset.
version (int,) – The dataset version to tag.
- delete(tag: str) None ¶
Delete tag from the dataset.
- Parameters:
tag (str,) – The name of the tag to delete.
- list() dict[str, int] ¶
List all dataset tags.
- Returns:
A dictionary mapping tag names to version numbers.
- Return type:
dict[str, int]
- update(tag: str, version: int) None ¶
Update tag to a new version.
- Parameters:
tag (str,) – The name of the tag to update.
version (int,) – The new dataset version to tag.
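A small tag-management sketch, assuming the manager is reached through an open dataset's tags property:
dataset.tags.create("prod", dataset.version)   # pin the current version under a name
print(dataset.tags.list())                     # e.g. {"prod": 3}
dataset.tags.update("prod", dataset.version)   # later, move the tag to a newer version
dataset.tags.delete("prod")                    # remove the tag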
- class lance.dataset.Transaction(read_version: 'int', operation: 'LanceOperation.BaseOperation', uuid: 'str' = <factory>, blobs_op: 'Optional[LanceOperation.BaseOperation]' = None)¶
Bases:
object
- blobs_op: BaseOperation | None = None¶
- operation: BaseOperation¶
- read_version: int¶
- uuid: str¶
- lance.dataset.write_dataset(data_obj: ReaderLike, uri: str | Path | LanceDataset, schema: pa.Schema | None = None, mode: str = 'create', *, max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, commit_lock: CommitLock | None = None, progress: FragmentWriteProgress | None = None, storage_options: Dict[str, str] | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, enable_v2_manifest_paths: bool = False, enable_move_stable_row_ids: bool = False) LanceDataset ¶
Write a given data_obj to the given uri
- Parameters:
data_obj (Reader-like) – The data to be written. Acceptable types are: Pandas DataFrame, PyArrow Table, Dataset, Scanner, RecordBatchReader, or Huggingface dataset.
uri (str, Path, or LanceDataset) – Where to write the dataset to (directory). If a LanceDataset is passed, the session will be reused.
schema (Schema, optional) – If specified and the input is a pandas DataFrame, use this schema instead of the default pandas to arrow table conversion.
mode (str) – create - create a new dataset (raises if uri already exists). overwrite - create a new snapshot version. append - create a new version that is the concatenation of the input and the latest version (raises if uri does not exist).
max_rows_per_file (int, default 1024 * 1024) – The max number of rows to write before starting a new file
max_rows_per_group (int, default 1024) – The max number of rows before starting a new group (in the same file)
max_bytes_per_file (int, default 90 * 1024 * 1024 * 1024) – The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.
commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.
use_legacy_format (optional, bool, default None) – Deprecated method for setting the data storage version. Use the data_storage_version parameter instead.
enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the
LanceDataset.migrate_manifest_paths_v2()
method. Default is False.
enable_move_stable_row_ids (bool, optional) – Experimental parameter: if set to true, the writer will use move-stable row ids. These row ids are stable after compaction operations, but not after updates. This makes compaction more efficient, since with stable row ids no secondary indices need to be updated to point to new row ids.
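A short sketch of the three write modes described above:
import lance
import pyarrow as pa

table = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})
ds = lance.write_dataset(table, "my_dataset")                    # mode="create" is the default
more = pa.table({"id": [4], "value": ["d"]})
ds = lance.write_dataset(more, "my_dataset", mode="append")      # new version with extra rows
ds = lance.write_dataset(table, "my_dataset", mode="overwrite")  # new snapshot version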
lance.debug module¶
Debug utilities for Lance.
- lance.debug.format_fragment(fragment, dataset)¶
Debug print a LanceFragment.
- lance.debug.format_manifest(dataset)¶
Print the full Lance manifest of the dataset.
- lance.debug.format_schema(dataset)¶
Format the Lance schema of a dataset as a string.
This can be used to view the field ids and types in the schema.
- lance.debug.list_transactions(dataset, /, max_transactions=10)¶
Return a string representation of each transaction in the dataset, in reverse chronological order.
If max_transactions is provided, only the most recent max_transactions transactions will be returned. Defaults to 10.
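An illustrative way to combine these helpers when inspecting an existing dataset (the path is hypothetical):
import lance
from lance.debug import format_manifest, format_schema, list_transactions

ds = lance.dataset("my_dataset")
print(format_schema(ds))                          # field ids and types
print(format_manifest(ds))                        # full manifest
print(list_transactions(ds, max_transactions=5))  # most recent transactions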
lance.dependencies module¶
lance.file module¶
- class lance.file.LanceBufferDescriptor¶
Bases:
object
- position¶
The byte offset of the buffer in the file
- size¶
The size (in bytes) of the buffer
- class lance.file.LanceColumnMetadata¶
Bases:
object
- column_buffers¶
The column-wide buffers
- pages¶
The data pages in the column
- class lance.file.LanceFileMetadata¶
Bases:
object
- columns¶
The column metadata; an entry may be None if the metadata for a column was not loaded into memory when the file was opened.
- global_buffers¶
The global buffers
- major_version¶
The major version of the file
- minor_version¶
The minor version of the file
- num_column_metadata_bytes¶
The number of bytes in the column metadata section of the file
- num_data_bytes¶
The number of bytes in the data section of the file
- num_global_buffer_bytes¶
The number of bytes in the global buffer section of the file
- num_rows¶
The number of rows in the file
- schema¶
The schema of the file
- class lance.file.LanceFileReader(path: str, storage_options: Dict[str, str] | None = None)¶
Bases:
object
A file reader for reading Lance files
This class is used to read Lance data files, a low level structure optimized for storing multi-modal tabular data. If you are working with Lance datasets then you should use the LanceDataset class instead.
- file_statistics() LanceFileStatistics ¶
Return file statistics of the file
- metadata() LanceFileMetadata ¶
Return metadata describing the file contents
- read_all(*, batch_size: int = 1024, batch_readahead=16) ReaderResults ¶
Reads the entire file
- Parameters:
batch_size (int, default 1024) –
The file will be read in batches. This parameter controls how many rows will be in each batch (except the final batch)
Smaller batches will use less memory but might be slightly slower because there is more per-batch overhead
- read_global_buffer(index: int) bytes ¶
Read a global buffer from the file at a given index
- Parameters:
index (int) – The index of the global buffer to read
- Returns:
The contents of the global buffer
- Return type:
bytes
- read_range(start: int, num_rows: int, *, batch_size: int = 1024, batch_readahead=16) ReaderResults ¶
Read a range of rows from the file
- Parameters:
start (int) – The offset of the first row to start reading
num_rows (int) – The number of rows to read from the file
batch_size (int, default 1024) –
The file will be read in batches. This parameter controls how many rows will be in each batch (except the final batch)
Smaller batches will use less memory but might be slightly slower because there is more per-batch overhead
- take_rows(indices, *, batch_size: int = 1024, batch_readahead=16) ReaderResults ¶
Read a specific set of rows from the file
- Parameters:
indices (List[int]) – The indices of the rows to read from the file in ascending order
batch_size (int, default 1024) –
The file will be read in batches. This parameter controls how many rows will be in each batch (except the final batch)
Smaller batches will use less memory but might be slightly slower because there is more per-batch overhead
- class lance.file.LanceFileStatistics¶
Bases:
object
Statistics summarize some of the file metadata for quick summary info
- columns¶
Statistics about each of the columns in the file
- class lance.file.LanceFileWriter(path: str, schema: Schema | None = None, *, data_cache_bytes: int | None = None, version: str | None = None, storage_options: Dict[str, str] | None = None, **kwargs)¶
Bases:
object
A file writer for writing Lance data files
This class is used to write Lance data files, a low level structure optimized for storing multi-modal tabular data. If you are working with Lance datasets then you should use the LanceDataset class instead.
- add_global_buffer(data: bytes) int ¶
Add a global buffer to the file. The global buffer can contain any arbitrary bytes.
- Parameters:
data (bytes) – The data to write to the file.
- Returns:
The index of the global buffer. This will always start at 1 and increment by 1 each time this method is called.
- Return type:
int
- add_schema_metadata(key: str, value: str) None ¶
Add a metadata (key/value pair) entry to the schema. This method allows you to alter the schema metadata. It must be called before close is called.
- Parameters:
key (str) – The key to add.
value (str) – The value to add.
- close() int ¶
Write the file metadata and close the file
Returns the number of rows written to the file
- write_batch(batch: RecordBatch | Table) None ¶
Write a batch of data to the file
- Parameters:
batch (Union[pa.RecordBatch, pa.Table]) – The data to write to the file
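A round-trip sketch using the writer and reader above; the path is illustrative and it assumes the ReaderResults returned by read_all() exposes a to_table() method:
import pyarrow as pa
from lance.file import LanceFileReader, LanceFileWriter

table = pa.table({"a": [1, 2, 3]})
writer = LanceFileWriter("/tmp/example.lance", table.schema)
writer.write_batch(table)
buffer_index = writer.add_global_buffer(b"arbitrary bytes")   # optional extra payload
num_rows = writer.close()

reader = LanceFileReader("/tmp/example.lance")
print(reader.metadata().num_rows)                             # file-level metadata
round_tripped = reader.read_all(batch_size=1024).to_table()   # assumes ReaderResults.to_table()
payload = reader.read_global_buffer(buffer_index)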
lance.fragment module¶
Dataset Fragment
- class lance.fragment.DataFile(path: str, fields: List[int], column_indices: List[int] = None, file_major_version: int = 0, file_minor_version: int = 0)¶
Bases:
object
A data file in a fragment.
- path¶
The path to the data file.
- Type:
str
- fields¶
The field ids of the columns in this file.
- Type:
List[int]
- column_indices¶
The column indices where the fields are stored in the file. Will have the same length as fields.
- Type:
List[int]
- file_major_version¶
The major version of the data storage format.
- Type:
int
- file_minor_version¶
The minor version of the data storage format.
- Type:
int
- column_indices: List[int]¶
- field_ids() List[int] ¶
- fields: List[int]¶
- file_major_version: int = 0¶
- file_minor_version: int = 0¶
- property path: str¶
- class lance.fragment.DeletionFile(read_version, id, file_type, num_deleted_rows)¶
Bases:
object
- asdict()¶
- file_type¶
- id¶
- num_deleted_rows¶
- path(fragment_id, base_uri=None)¶
- read_version¶
- class lance.fragment.FragmentMetadata(id: int, files: List[DataFile], physical_rows: int, deletion_file: DeletionFile | None = None, row_id_meta: RowIdMeta | None = None)¶
Bases:
object
Metadata for a fragment.
- id¶
The ID of the fragment.
- Type:
int
- files¶
The data files of the fragment. Each data file must have the same number of rows. Each file stores a different subset of the columns.
- Type:
List[DataFile]
- physical_rows¶
The number of rows originally in this fragment. This is the number of rows in the data files before deletions.
- Type:
int
- deletion_file¶
The deletion file, if any.
- Type:
Optional[DeletionFile]
- deletion_file: DeletionFile | None = None¶
- static from_json(json_data: str) FragmentMetadata ¶
- id: int¶
- property num_deletions: int¶
The number of rows that have been deleted from this fragment.
- property num_rows: int¶
The number of rows in this fragment after deletions.
- physical_rows: int¶
- to_json() dict ¶
Get this as a simple JSON-serializable dictionary.
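Fragment metadata is designed to be shipped between processes. A small round-trip sketch, assuming fragment_meta was produced by an earlier step such as write_fragments(); note that to_json() returns a plain dictionary while from_json() expects a JSON string:
import json
from lance.fragment import FragmentMetadata

payload = json.dumps(fragment_meta.to_json())    # dict -> JSON string for the wire
restored = FragmentMetadata.from_json(payload)   # rebuild on the coordinating process
assert restored.id == fragment_meta.id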
- class lance.fragment.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, fragment: _Fragment | None = None)¶
Bases:
Fragment
- count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Count rows matching the scanner filter.
- Parameters:
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Returns:
count
- Return type:
int
- static create(dataset_uri: str | Path, data: Table | RecordBatchReader, fragment_id: int | None = None, schema: Schema | None = None, max_rows_per_group: int = 1024, progress: FragmentWriteProgress | None = None, mode: str = 'append', *, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: Dict[str, str] | None = None) FragmentMetadata ¶
Create a
FragmentMetadata
from the given data. This can be used if the dataset is not yet created.
Warning
Internal API. This method is not intended to be used by end users.
- Parameters:
dataset_uri (str) – The URI of the dataset.
fragment_id (int) – The ID of the fragment.
data (pa.Table or pa.RecordBatchReader) – The data to be written to the fragment.
schema (pa.Schema, optional) – The schema of the data. If not specified, the schema will be inferred from the data.
max_rows_per_group (int, default 1024) – The maximum number of rows per group in the data file.
progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.
mode (str, default "append") – The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.
data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.
use_legacy_format (bool, default None) – Deprecated parameter. Use data_storage_version instead.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
See also
lance.dataset.LanceOperation.Overwrite
The operation used to create a new dataset or overwrite one using fragments created with this API. See the doc page for an example of using this API.
lance.dataset.LanceOperation.Append
The operation used to append fragments created with this API to an existing dataset. See the doc page for an example of using this API.
- Return type:
FragmentMetadata
- static create_from_file(filename: str | Path, dataset: LanceDataset, fragment_id: int) FragmentMetadata ¶
Create a fragment from the given datafile uri.
This can be used if the data file is lost from the dataset.
Warning
Internal API. This method is not intended to be used by end users.
- Parameters:
filename (str) – The filename of the datafile.
dataset (LanceDataset) – The dataset that the fragment belongs to.
fragment_id (int) – The ID of the fragment.
- data_files()¶
Return the data files of this fragment.
- delete(predicate: str) FragmentMetadata | None ¶
Delete rows from this Fragment.
This will add or update the deletion file of this fragment. It does not modify or delete the data files of this fragment. If no rows are left after the deletion, this method will return None.
Warning
Internal API. This method is not intended to be used by end users.
- Parameters:
predicate (str) – A SQL predicate that specifies the rows to delete.
- Returns:
A new fragment containing the new deletion file, or None if no rows left.
- Return type:
FragmentMetadata or None
Examples
>>> import lance
>>> import pyarrow as pa
>>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> dataset = lance.write_dataset(tab, "dataset")
>>> frag = dataset.get_fragment(0)
>>> frag.delete("a > 1")
FragmentMetadata(id=0, files=[DataFile(path='...', fields=[0, 1], ...), ...)
>>> frag.delete("a > 0") is None
True
See also
lance.dataset.LanceOperation.Delete
The operation used to commit these changes to a dataset. See the doc page for an example of using this API.
- deletion_file()¶
Return the deletion file, if any
- property fragment_id¶
- head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Load the first N rows of the fragment.
- Parameters:
num_rows (int) – The number of rows to load.
columns (list of str, default None) –
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Return type:
Table
- merge_columns(value_func: Dict[str, str] | BatchUDF | ReaderLike | Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None = None, batch_size: int | None = None, reader_schema: pa.Schema | None = None) Tuple[FragmentMetadata, LanceSchema] ¶
Add columns to this Fragment.
Warning
Internal API. This method is not intended to be used by end users.
The parameters and their interpretation are the same as in the
lance.dataset.LanceDataset.add_columns()
operation. The only difference is that, instead of modifying the dataset, a new fragment is created. The new schema of the fragment is returned as well. These can be used in a later operation to commit the changes to the dataset.
See also
lance.dataset.LanceOperation.Merge
The operation used to commit these changes to the dataset. See the doc page for an example of using this API.
- Returns:
A new fragment with the added column(s) and the final schema.
- Return type:
Tuple[FragmentMetadata, LanceSchema]
- property metadata: FragmentMetadata¶
Return the metadata of this fragment.
- Return type:
FragmentMetadata
- property num_deletions: int¶
Return the number of deleted rows in this fragment.
- property partition_expression: Schema¶
An Expression which evaluates to true for all data viewed by this Fragment.
- property physical_rows: int¶
Return the number of rows originally in this fragment.
To get the number of rows after deletions, use
count_rows()
instead.
- property physical_schema: Schema¶
Return the physical schema of this Fragment. This schema can be different from the dataset read schema.
- scanner(*, columns: List[str] | Dict[str, str] | None = None, batch_size: int | None = None, filter: str | pa.compute.Expression | None = None, limit: int | None = None, offset: int | None = None, with_row_id: bool = False, batch_readahead: int = 16) LanceScanner ¶
See Dataset::scanner for details
- property schema: Schema¶
Return the schema of this fragment.
- take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Select rows of data by index.
- Parameters:
indices (Array or array-like) – The indices of row to select in the dataset.
columns (list of str, default None) –
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Return type:
Table
- to_batches(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Read the fragment as materialized record batches.
- Parameters:
schema (Schema, optional) – Concrete schema to use for scanning.
columns (list of str, default None) –
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Returns:
record_batches
- Return type:
iterator of RecordBatch
- to_table(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Convert this Fragment into a Table.
Use this convenience utility with care. This will serially materialize the Scan result in memory before creating the Table.
- Parameters:
schema (Schema, optional) – Concrete schema to use for scanning.
columns (list of str, default None) –
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Returns:
table
- Return type:
Table
- lance.fragment.write_fragments(data: ReaderLike, dataset_uri: str | Path | LanceDataset, schema: pa.Schema | None = None, *, mode: str = 'append', max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, progress: FragmentWriteProgress | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: Dict[str, str] | None = None) List[FragmentMetadata] ¶
Write data into one or more fragments.
Warning
This is a low-level API intended for manually implementing distributed writes. For most users,
lance.write_dataset()
is the recommended API.
- Parameters:
data (pa.Table or pa.RecordBatchReader) – The data to be written to the fragment.
dataset_uri (str, Path, or LanceDataset) – The URI of the dataset or the dataset object.
schema (pa.Schema, optional) – The schema of the data. If not specified, the schema will be inferred from the data.
mode (str, default "append") – The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.
max_rows_per_file (int, default 1024 * 1024) – The maximum number of rows per data file.
max_rows_per_group (int, default 1024) – The maximum number of rows per group in the data file.
max_bytes_per_file (int, default 90 * 1024 * 1024 * 1024) – The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.
progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.
data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the 2.0 version. See the user guide for more details.
use_legacy_format (optional, bool, default None) – Deprecated method for setting the data storage version. Use the data_storage_version parameter instead.
storage_options (Optional[Dict[str, str]]) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
- Returns:
A list of
FragmentMetadata
for the fragments written. The fragment ids are left as zero, meaning they are not yet specified. They will be assigned when the fragments are committed to a dataset.
- Return type:
List[FragmentMetadata]
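A sketch of a distributed write assembled from the pieces documented here: workers call write_fragments() without committing, then a coordinator commits the collected metadata (the path and the choice of the Overwrite operation are illustrative):
import lance
import pyarrow as pa
from lance.fragment import write_fragments

shard = pa.table({"a": [1, 2, 3]})
# each worker writes its shard of data without committing anything
fragments = write_fragments(shard, "distributed_example", mode="create")
# a single coordinator gathers all fragment metadata and commits once
operation = lance.LanceOperation.Overwrite(shard.schema, fragments)
dataset = lance.LanceDataset.commit("distributed_example", operation)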
lance.hf module¶
lance.indices module¶
- class lance.indices.IndicesBuilder(dataset, column: str)¶
Bases:
object
A class with helper functions for building indices on a dataset.
The methods in this class break down the process of building indices into smaller steps. This can be useful for debugging and checkpointing when building indices for extremely large datasets.
This class is intended for advanced users that need to create vector indices at large scales.
The methods in this class are experimental and may change in future versions.
For datasets with 10s of millions or fewer rows it will likely be simpler to just use the create_index method on the dataset object.
- assign_ivf_partitions(ivf_model: IvfModel, accelerator: str | torch.Device, *, output_uri: str | None = None) str ¶
Calculates which IVF partition each vector belongs to. This searches the IVF centroids and assigns the closest centroid to the vector. The result is stored in a Lance dataset located at output_uri. The schema of the partition assignment dataset is:
row_id: uint64
partition: uint32
Note: There is no advantage to separately computing the partition assignment without an accelerator. If you are not using an accelerator then you should skip this method and proceed without precomputed partition assignments.
- Parameters:
ivf_model (IvfModel) – An IvfModel, previously created by
train_ivf
which the data will be assigned to.
accelerator (Union[str, torch.Device]) – An optional accelerator to use to offload computation to specialized hardware. Currently supported values are the same as those in
train_ivf
output_uri (Optional[str], default None) – Destination Lance dataset where the partition assignments will be written. Can be None, in which case a random directory will be used.
- Returns:
The path of the partition assignment dataset (will be equal to output_uri unless the value is None)
- Return type:
str
- load_shuffled_vectors(filenames: list[str], dir_path: str, ivf: IvfModel, pq: PqModel, index_name: str | None = None)¶
Takes filenames of the sorted, transformed vector files as input. Loads these sorted files and commits the index into the dataset.
- Parameters:
filenames (list[str]) – The filenames of the sorted storage files.
dir_path (str) – Path of the directory where all the files are located.
index_name (Optional[str]) – The name of the index to be created. If not provided, the default name will be “{column_name}_idx”.
ivf (IvfModel) – The IVF model used to create the inputs.
pq (PqModel) – The PQ model used to create the inputs.
- shuffle_transformed_vectors(unsorted_filenames: list[str], dir_path: str, ivf: IvfModel, shuffle_output_root_filename: str | None = 'sorted') list[str] ¶
Take the transformed, unsorted vector files as input, and create sorted storage files. Sorting is done based on the partition id. This function only makes sense if the transformed vector file contains a partition_id column.
- Parameters:
unsorted_filenames (list[str]) – The filenames of the unsorted files.
dir_path (str) – Directory where all the files are located, and where output files will be placed.
ivf (IvfModel) – The IVF model used for the transformations (e.g. partition assignment)
shuffle_output_root_filename (Optional[str]) – The root filename for the sorted output files. If not provided, the root filename used will be “sorted”.
- Returns:
The file paths of the sorted transformed vector files. These will be of the form shuffle_output_root_filename_i.lance.
- Return type:
list[str]
- train_ivf(num_partitions=None, *, distance_type='l2', accelerator: str | torch.Device | None = None, sample_rate: int = 256, max_iters: int = 50) IvfModel ¶
Train IVF centroids for the given vector column.
This will run k-means clustering on the given vector column to train the IVF centroids. This is the first step in several vector indices. The centroids will be used to partition the vectors into different clusters.
IVF centroids are trained from a sample of the data (determined by the sample_rate). While this sample is not huge it might still be quite large.
K-means is an iterative algorithm that can be computationally expensive. The accelerator argument can be used to offload the computation to a hardware accelerator such as a GPU or TPU.
- Parameters:
num_partitions (int) – The number of partitions to train. Large values are more expensive to train and can lead to longer search times. Smaller values could lead to overtraining, reduced recall, and require large nprobes values. If not specified the default will be the integer nearest the square root of the number of rows.
distance_type ("l2" | "dot" | "cosine") – The distance type to used. This is defined in more detail in the LanceDB documentation on creating indices.
accelerator (str | torch.Device) – An optional accelerator to use to offload computation to specialized hardware. Currently supported values are “cuda” and “mps”.
sample_rate (int) – IVF is trained on a random sample of the dataset. The sample_rate determines the size of this sample. There will be sample_rate rows loaded for each partition for a total of sample_rate * num_partitions rows. If the dataset does not contain enough rows an error will be raised.
max_iters (int) – K-means is an iterative algorithm that is run until it converges. In some cases, k-means will not converge but will instead cycle between possible minima; training must then be cut off rather than run forever. The max_iters parameter defines the cutoff at which training is terminated.
- train_pq(ivf_model: IvfModel, num_subvectors=None, *, sample_rate: int = 256, max_iters: int = 50) PqModel ¶
Train a PQ model for a given column.
This will run k-means clustering on each subvector to determine the centroids that will be used to quantize the subvectors. This step runs against a randomly chosen sample of the data. The sample size is typically quite small and PQ training is relatively fast regardless of dataset scale. As a result, accelerators are not needed here.
- Parameters:
ivf_model (IvfModel) – The IVF model to use to partition the vectors into clusters. This is needed because PQ is trained on residuals from the IVF model.
num_subvectors (int) –
The number of subvectors to divide the source vectors into. This must be a divisor of the vector dimension. If not specified the default will be the vector dimension divided by 16 if the dimension is divisible by 16, otherwise the vector dimension divided by 8 if the dimension is divisible by 8.
Automatic calculation of num_subvectors will fail if the vector dimension is not divisible by 16 or 8. In this case you must specify num_subvectors manually (though any value you choose is likely to lead to poor performance)
sample_rate (int) – This parameter is used in the same way as in the IVF model.
max_iters (int) – This parameter is used in the same way as in the IVF model.
- transform_vectors(ivf: IvfModel, pq: PqModel, dest_uri: str, fragments: list[lance.fragment.LanceFragment] | None = None, partition_ds_uri: str | None = None)¶
Apply transformations to the vectors in the dataset and create an unsorted storage file. The unsorted storage file is a lance file that will at least have a row id column. Normally it will have other columns containing the transform outputs (such as partition id and PQ code)
- Parameters:
ivf (IvfModel) – The IVF model to use for the transformations (e.g. partition assignment)
pq (PqModel) – The PQ model to use for the transformations (e.g. quantization)
dest_uri (str) – The URI to save the transformed vectors to. The URI can be a local file path or a cloud storage path.
fragments (list[LanceFragment]) – The list of data fragments to use when computing the transformed vectors. This is an optional parameter (the default uses all fragments).
partition_ds_uri (str) – The URI of a precomputed partitions dataset. This allows the partition transform to be skipped, using the precomputed value instead. This is optional.
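A rough end-to-end sketch of the staged workflow above; the column name, file names, and the hand-off between steps are illustrative rather than prescriptive:
from lance.indices import IndicesBuilder

builder = IndicesBuilder(dataset, "vector")             # "vector" is the column being indexed
ivf = builder.train_ivf(num_partitions=256)             # step 1: train IVF centroids
pq = builder.train_pq(ivf, num_subvectors=16)           # step 2: train PQ codebooks
builder.transform_vectors(ivf, pq, "unsorted.lance")    # step 3: transform the vectors
sorted_files = builder.shuffle_transformed_vectors(     # step 4: sort by partition id
    ["unsorted.lance"], dir_path=".", ivf=ivf)
builder.load_shuffled_vectors(sorted_files, ".", ivf=ivf, pq=pq)   # step 5: commit the index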
- class lance.indices.IvfModel(centroids: Array, distance_type: str)¶
Bases:
object
A class that represents a trained IVF model.
- centroids¶
The centroids of the IVF clusters
- distance_type¶
The distance type used to train the IVF model
- classmethod load(uri: str)¶
Load an IVF model from a lance file.
- Parameters:
uri (str) – The URI to load the model from. The URI can be a local file path or a cloud storage path.
- property num_partitions: int¶
The number of partitions / centroids in the IVF model
- save(uri: str)¶
Save the IVF model to a lance file.
- Parameters:
uri (str) – The URI to save the model to. The URI can be a local file path or a cloud storage path.
- class lance.indices.PqModel(num_subvectors: int, codebook: FixedSizeListArray)¶
Bases:
object
A class that represents a trained PQ model
Can be saved / loaded to checkpoint progress.
- codebook¶
The centroids of the PQ clusters
- property dimension¶
The dimension of the vectors this model was trained on
- classmethod load(uri: str)¶
Load a PQ model from a lance file.
- Parameters:
uri (str) – The URI to load the model from. The URI can be a local file path or a cloud storage path.
- num_subvectors¶
The number of subvectors to divide source vectors into
- save(uri: str)¶
Save the PQ model to a lance file.
- Parameters:
uri (str) – The URI to save the model to. The URI can be a local file path or a cloud storage path.
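Both models can be checkpointed between the steps of a long index build. A small sketch, assuming ivf and pq come from a previous training step and the paths are illustrative:
from lance.indices import IvfModel, PqModel

ivf.save("checkpoints/ivf.lance")          # persist right after training
pq.save("checkpoints/pq.lance")
# ...later, possibly in a different process...
ivf = IvfModel.load("checkpoints/ivf.lance")
pq = PqModel.load("checkpoints/pq.lance")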
lance.log module¶
- lance.log.get_log_level()¶
- lance.log.get_python_log_level(rust_log_level: str) str ¶
- lance.log.set_logger(file_path: str | None = 'pylance.log', name='pylance', level=20, format_string=None, log_handler=None)¶
lance.optimize module¶
- class lance.optimize.Compaction¶
Bases:
object
File compaction operation.
To run with multiple threads in a single process, just use
execute()
.To run with multiple processes, first use
plan()
to construct a plan, then execute the tasks in parallel, and finally use commit(). The CompactionPlan contains many CompactionTask objects, which can be pickled and sent to other processes. The tasks produce RewriteResult objects, which can be pickled and sent back to the main process to be passed to commit().
- static commit(dataset, rewrites)¶
Commit a compaction operation.
Once tasks from
plan()
have been executed, the results can be passed to this method to commit the compaction. It is not required that all of the original tasks are passed. For example, if only a subset were successful or completed before a deadline, you can pass just those.
- Parameters:
dataset (lance.Dataset) – The dataset to compact. The dataset instance will be updated to the new version once committed.
rewrites (List[RewriteResult]) – The results of the compaction tasks to include in the commit.
- Return type:
- static execute(dataset, options)¶
Execute a full compaction operation.
- Parameters:
dataset (lance.Dataset) – The dataset to compact. The dataset instance will be updated to the new version once complete.
options (CompactionOptions) – The compaction options.
- Returns:
The metrics from the compaction operation.
- Return type:
CompactionMetrics
- static plan(dataset, options)¶
Plan a compaction operation.
This is intended for users who want to run compaction in a distributed fashion. For running on a single process, use
execute()
instead.
- Parameters:
dataset (lance.Dataset) – The dataset to compact.
options (CompactionOptions) – The compaction options.
- Return type:
CompactionPlan
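A sketch of the distributed flow described above; in practice the task executions would be shipped to worker processes rather than run in a local loop, and dataset is an already-open LanceDataset:
from lance.optimize import Compaction

options = {"target_rows_per_fragment": 1024 * 1024, "materialize_deletions": True}
plan = Compaction.plan(dataset, options)
# CompactionTask objects are picklable, so they can be sent to workers
results = [task.execute(dataset) for task in plan.tasks]
metrics = Compaction.commit(dataset, results)   # apply all rewrites in a single commit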
- class lance.optimize.CompactionMetrics¶
Bases:
object
- files_added¶
The number of files that have been added, which is always equal to the number of fragments.
- Type:
int
- files_removed¶
The number of files that have been removed, including deletion files.
- Type:
int
- fragments_added¶
The number of new fragments that have been added.
- Type:
int
- fragments_removed¶
The number of fragments that have been overwritten.
- Type:
int
- class lance.optimize.CompactionOptions¶
Bases:
TypedDict
Options for compaction.
- batch_size: int | None¶
The batch size to use when scanning input fragments. You may want to reduce this if you are running out of memory during compaction.
The default will use the same default from
scanner
.
- materialize_deletions: bool | None¶
Whether to compact fragments with soft deleted rows so they are no longer present in the file. (default: True)
- materialize_deletions_threshold: float | None¶
The fraction of original rows that are soft deleted in a fragment before the fragment is a candidate for compaction. (default: 0.1 = 10%)
- max_bytes_per_file: int | None¶
Max number of bytes in a single file. This does not affect which fragments need compaction, but does affect how they are re-written if selected. If this value is too small you may end up with fragments that are smaller than target_rows_per_fragment.
The default will use the default from
write_dataset
.
- max_rows_per_group: int | None¶
Max number of rows per group. This does not affect which fragments need compaction, but does affect how they are re-written if selected. (default: 1024)
- num_threads: int | None¶
The number of threads to use when performing compaction. If not specified, defaults to the number of cores on the machine.
- target_rows_per_fragment: int | None¶
The target number of rows per fragment. This is the number of rows that will be in each fragment after compaction. (default: 1024*1024)
- class lance.optimize.CompactionPlan¶
Bases:
object
A plan to compact small dataset fragments into larger ones.
Created by
lance.optimize.Compaction.plan()
.- static from_json(json)¶
Load a plan from a JSON representation.
- Parameters:
json (str) – The JSON representation of the plan.
- Return type:
- json()¶
Get a JSON representation of the plan.
- Return type:
str
Warning
The JSON representation is not guaranteed to be stable across versions.
- num_tasks()¶
int : The number of compaction tasks in the plan.
- read_version¶
The read version of the dataset that this plan was created from.
- Type:
int
- tasks¶
The individual tasks in the plan.
- Type:
List[CompactionTask]
- class lance.optimize.CompactionTask¶
Bases:
object
- execute(dataset)¶
Execute the compaction task and return the
RewriteResult
. The rewrite result should be passed on to
lance.optimize.Compaction.commit()
.
- fragments¶
The fragments that will be compacted.
- Type:
- static from_json(json)¶
Load a task from a JSON representation.
- Parameters:
json (str) – The JSON representation of the task.
- Return type:
- json()¶
Get a JSON representation of the task.
- Return type:
str
Warning
The JSON representation is not guaranteed to be stable across versions.
- read_version¶
The read version of the dataset that this task was created from.
- Type:
int
- class lance.optimize.RewriteResult¶
Bases:
object
The result of a single compaction task.
Created by
lance.optimize.CompactionTask.execute()
.This result is pickle-able, so it can be serialized and sent back to the main process to be passed to
lance.optimize.Compaction.commit()
.- static from_json(json)¶
Load a result from a JSON representation.
- json()¶
Get a JSON representation of the result.
- Return type:
str
Warning
The JSON representation is not guaranteed to be stable across versions.
- metrics¶
The metrics from this compaction task.
- Type:
CompactionMetrics
- new_fragments¶
The metadata for fragments that are being added.
- Type:
- original_fragments¶
The metadata for fragments that are being replaced.
- Type:
- read_version¶
The version of the dataset the optimize operation is based on.
- Type:
int
lance.progress module¶
- class lance.progress.FileSystemFragmentWriteProgress(base_uri: str, metadata: Dict[str, str] | None = None)¶
Bases:
FragmentWriteProgress
Progress tracking for Writing a Dataset or Fragment.
- Warns:
This tracking class is experimental and will change in the future.
This implementation writes a JSON file to track in-progress state
to the filesystem for each fragment.
- PROGRESS_EXT: str = '.in_progress'¶
- begin(fragment: FragmentMetadata, **kwargs)¶
Called when a new fragment is created.
- Parameters:
fragment (FragmentMetadata) – The fragment that is open to write to.
- complete(fragment: FragmentMetadata, **kwargs)¶
Called when a fragment is completed
- class lance.progress.FragmentWriteProgress¶
Bases:
ABC
Progress tracking for Writing a Dataset or Fragment.
- Warns:
This tracking class is experimental and may change in the future.
- abstract begin(fragment: FragmentMetadata, **kwargs) None ¶
Called when a new fragment is about to be written.
- Parameters:
fragment (FragmentMetadata) – The fragment that is open to write to. The fragment id might not yet be assigned at this point.
kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.
- Return type:
None
- abstract complete(fragment: FragmentMetadata, **kwargs) None ¶
Callback when a fragment is completely written.
- Parameters:
fragment (FragmentMetadata) – The fragment that is open to write to.
kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.
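A minimal sketch of a custom implementation (the print statements stand in for whatever bookkeeping an application actually needs; the class name is hypothetical):

from lance import FragmentMetadata
from lance.progress import FragmentWriteProgress

class PrintingWriteProgress(FragmentWriteProgress):
    """Log fragment lifecycle events to stdout."""

    def begin(self, fragment: FragmentMetadata, **kwargs) -> None:
        print(f"starting a fragment with {fragment.physical_rows} physical rows")

    def complete(self, fragment: FragmentMetadata, **kwargs) -> None:
        print(f"finished fragment {fragment.id}")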
- class lance.progress.NoopFragmentWriteProgress¶
Bases:
FragmentWriteProgress
No-op implementation of FragmentWriteProgress.
This is the default implementation.
- begin(fragment: FragmentMetadata, **kargs)¶
Called when a new fragment is about to be written.
- Parameters:
fragment (FragmentMetadata) – The fragment that is open to write to. The fragment id might not yet be assigned at this point.
kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.
- Return type:
None
- complete(fragment: FragmentMetadata, **kwargs)¶
Callback when a fragment is completely written.
- Parameters:
fragment (FragmentMetadata) – The fragment that is open to write to.
kwargs (dict, optional) – Extra keyword arguments to pass to the implementation.
lance.sampler module¶
- class lance.sampler.FragmentSampler¶
Bases:
Sampler
Sampling over Fragments.
To implement a new FragmentSampler, you can implement the iter_fragments method to yield fragments in the desired order.
- abstract iter_fragments(ds: lance.LanceDataset, *args, **kwargs) Generator[lance.LanceFragment, None, None] ¶
Iterate over data fragments.
- class lance.sampler.FullScanSampler¶
Bases:
FragmentSampler
Default sampler, which scans the entire dataset sequentially.
- iter_fragments(dataset: lance.LanceDataset, **kwargs) Generator[lance.LanceFragment, None, None] ¶
Iterate over data fragments.
- class lance.sampler.Sampler¶
Bases:
ABC
Sampler over LanceDataset.
To implement a new Sampler, you can implement the __call__ method to yield a pyarrow.RecordBatch.
- class lance.sampler.ShardedBatchSampler(rank: int, world_size: int, randomize: bool = False, seed: int = 0)¶
Bases:
Sampler
Sharded batch sampler.
Each rank / process will process a subset of the batches.
The input is subdivided into batches (of size batch_size). Each rank / process takes every Nth batch (where N is the world size). The order in which batches are loaded is randomized when randomize is set.
When there is no filter, each process only needs to load the rows assigned to it. This is still slightly less efficient than ShardedFragmentSampler, since it loads rows by range instead of loading all rows for a given fragment.
If there is a filter then we cannot divide the row ids ahead of time. Instead, each process will load the entire filtered dataset and discard the rows that are not assigned to it. The resulting stream is then randomized via a reservoir sampler. This does not perfectly randomize the stream but it should generate a stream that is random enough for many use cases.
- static from_torch(randomize: bool = False, seed: int = 0) ShardedBatchSampler ¶
Construct the sampler from a PyTorch distributed environment.
Automatically infer rank and world_size from torch.distributed.
- class lance.sampler.ShardedFragmentSampler(rank: int, world_size: int, randomize: bool = False, seed: int = 0)¶
Bases:
FragmentSampler
Sharded fragments by rank and world_size.
Each rank / process will process a subset of the fragments. It yields batches from
ds.fragments[rank::world_size]
. This sampler is more efficient than ShardedBatchSampler when the dataset is large.
- Parameters:
rank (int) – The rank of the process in the distributed cluster.
world_size (int) – The total number of processes in the distributed cluster.
randomize (bool) – If set to true, randomize the order in which fragments are yielded.
seed (int) – The random seed to use when randomize is set true.
- static from_torch(randomize: bool = False, seed: int = 0) ShardedFragmentSampler ¶
Construct the sampler from a PyTorch distributed environment.
Automatically infer rank and world_size from
torch.distributed
.
- iter_fragments(dataset: lance.LanceDataset, **kwargs) Generator[lance.LanceFragment, None, None] ¶
Iterate over data fragments.
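A hedged sketch: each worker in a four-process job builds a sampler with its own rank and visits only its share of fragments (the dataset path is an assumption, and how the fragments are consumed downstream is up to the caller):

import lance
from lance.sampler import ShardedFragmentSampler

ds = lance.dataset("/tmp/my_dataset.lance")  # assumed existing dataset
sampler = ShardedFragmentSampler(rank=0, world_size=4)

# This rank sees ds.fragments[0::4].
num_local = sum(1 for _ in sampler.iter_fragments(ds))
print(f"rank 0 will scan {num_local} fragments")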
- lance.sampler.maybe_sample(dataset: str | Path | lance.LanceDataset, n: int, columns: list[str] | dict[str, str] | str, batch_size: int = 10240, max_takes: int = 2048, filt: str | None = None) Generator[pa.RecordBatch, None, None] ¶
Sample n records from the dataset.
- Parameters:
dataset (Union[str, Path, lance.LanceDataset]) – The dataset to sample from.
n (int) – The number of records to sample.
columns (Union[list[str], dict[str, str], str]) – The columns to load.
batch_size (int, optional) – The batch size to use when loading the data, by default 10240.
max_takes (int, optional) – The maximum number of takes to perform, by default 2048. This is employed to minimize the number of random reads necessary for sampling. A sufficiently large value can provide an effective random sample without the need for excessive random reads.
filt (str, optional) – The filter to apply to the dataset, by default None. If a filter is provided, then we will first load all row ids in memory and then batch through the ids in random order until enough matches have been found.
- Returns:
A generator that yields [RecordBatch] of data.
- Return type:
Generator[pa.RecordBatch]
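For example, a sketch of sampling a few thousand vectors from a dataset (the dataset path and the column name "vector" are assumptions about your data):

import pyarrow as pa
from lance.sampler import maybe_sample

batches = maybe_sample("/tmp/my_dataset.lance", n=4096, columns=["vector"])
sample = pa.Table.from_batches(batches)
print(sample.num_rows)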
lance.schema module¶
- class lance.schema.LanceSchema¶
Bases:
object
A Lance Schema.
Unlike a PyArrow schema, a Lance schema assigns every field an integer id. This is used to track fields across versions. This assignment of fields to ids is initially done in depth-first order, but as a schema evolves the assignment may change.
The assignment of field ids is particular to each dataset, so these schemas cannot be used interchangeably between datasets.
- static from_pyarrow(schema)¶
Create a Lance schema from a PyArrow schema.
This will assign field ids in depth-first order. Be aware this may not match the correct schema for a particular table.
- to_pyarrow()¶
Convert the schema to a PyArrow schema.
- lance.schema.json_to_schema(schema_json: Dict[str, Any]) Schema ¶
Converts a JSON dictionary to a PyArrow schema.
- Parameters:
schema_json (Dict[str, Any]) – The JSON payload to convert to a PyArrow Schema.
- lance.schema.schema_to_json(schema: Schema) Dict[str, Any] ¶
Converts a PyArrow schema to a JSON-serializable dictionary.
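A sketch of round-tripping a schema through these helpers (the field ids assigned by from_pyarrow() are depth-first and may not match those of an existing dataset, as noted above):

import pyarrow as pa
from lance.schema import LanceSchema, json_to_schema, schema_to_json

arrow_schema = pa.schema([("id", pa.int64()), ("vector", pa.list_(pa.float32(), 128))])

# PyArrow schema <-> Lance schema
lance_schema = LanceSchema.from_pyarrow(arrow_schema)
print(lance_schema.to_pyarrow())

# PyArrow schema <-> JSON-serializable dict
payload = schema_to_json(arrow_schema)
print(json_to_schema(payload))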
lance.tracing module¶
- lance.tracing.lance_trace_to_chrome(path=None, level=None)¶
- lance.tracing.trace_to_chrome(*, file: str = None, level: str = None)¶
Begins tracing lance events to a chrome trace file.
The trace file can be opened with chrome://tracing or with the Perfetto UI.
The file will be finished (and closed) when the python process exits.
- Parameters:
file (str, optional) – The file to write the trace to. If None, then a file in the current directory will be created named ./trace-{unix epoch in micros}.json
level (str, optional) – The level of detail to trace. One of “trace”, “debug”, “info”, “warn” or “error”. If None, then “info” is used.
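A sketch of enabling tracing at the start of a script (the trace file is finalized when the process exits; the paths are arbitrary):

import lance
from lance.tracing import trace_to_chrome

trace_to_chrome(file="/tmp/lance-trace.json", level="debug")

# Any lance work done after this point is captured in the trace.
ds = lance.dataset("/tmp/my_dataset.lance")  # assumed existing dataset
ds.count_rows()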
lance.types module¶
lance.udf module¶
- class lance.udf.BatchUDF(func, output_schema=None, checkpoint_file=None)¶
Bases:
object
A user-defined function that can be passed to
LanceDataset.add_columns()
. Use the
lance.add_columns_udf()
decorator to wrap a function with this class.
- class lance.udf.BatchUDFCheckpoint(path)¶
Bases:
object
A cache for BatchUDF results to avoid recomputation.
This is backed by a SQLite database.
- class BatchInfo(fragment_id, batch_index)¶
Bases:
NamedTuple
- batch_index: int¶
Alias for field number 1
- fragment_id: int¶
Alias for field number 0
- cleanup()¶
- get_fragment(fragment_id: int) str | None ¶
Retrieves a fragment as a JSON string.
- insert_fragment(fragment_id: int, fragment: str)¶
Save a JSON string of a fragment to the cache.
- lance.udf.batch_udf(output_schema=None, checkpoint_file=None)¶
Create a user defined function (UDF) that adds columns to a dataset.
The decorated function takes a single argument, a RecordBatch, and returns a RecordBatch. It is called once for each batch in the dataset and should not modify the input batch, but instead create a new batch with the new columns added.
- Parameters:
output_schema (Schema, optional) – The schema of the output RecordBatch. This is used to validate the output of the function. If not provided, the schema of the first output RecordBatch will be used.
checkpoint_file (str or Path, optional) – If specified, this file will be used as a cache for unsaved results of this UDF. If the process fails, and you call add_columns again with this same file, it will resume from the last saved state. This is useful for long running processes that may fail and need to be resumed. This file may get very large. It will hold up to an entire data file's worth of results on disk, which can be multiple gigabytes of data.
- Return type:
AddColumnsUDF
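A hedged sketch of a checkpointed UDF (the column names "text" and "text_len", the file paths, and the dataset itself are assumptions about your data):

import lance
import pyarrow as pa
import pyarrow.compute as pc

@lance.batch_udf(checkpoint_file="/tmp/text_len_checkpoint.sqlite")
def add_text_len(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Derive a new column from the existing "text" column.
    lengths = pc.utf8_length(batch.column("text"))
    return pa.RecordBatch.from_arrays([lengths], names=["text_len"])

dataset = lance.dataset("/tmp/my_dataset.lance")  # assumed existing dataset
dataset.add_columns(add_text_len, read_columns=["text"])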
- lance.udf.normalize_transform(udf_like: Dict[str, str] | BatchUDF | ReaderLike, data_source: LanceDataset | LanceFragment, read_columns: List[str] | None = None, reader_schema: pa.Schema | None = None)¶
lance.util module¶
- class lance.util.KMeans(k: int, metric_type: Literal['l2', 'dot', 'cosine'] = 'l2', max_iters: int = 50, centroids: FixedSizeListArray | None = None)¶
Bases:
object
KMeans model for clustering.
It works with 2-D arrays of float32 type, and supports the distance metrics: "l2", "cosine", "dot".
Note, you must train the kmeans model by calling fit() before calling predict(). Calling fit() again will reset the model.
Currently, the initial centroids are initialized randomly. kmeans++ is implemented but not exposed yet.
>>> import numpy as np >>> import lance >>> data = np.random.randn(1000, 128).astype(np.float32) >>> kmeans = lance.util.KMeans(8, metric_type="l2") >>> kmeans.fit(data) >>> centroids = np.stack(kmeans.centroids.to_numpy(zero_copy_only=False)) >>> clusters = kmeans.predict(data)
- property centroids: FixedShapeTensorType | None¶
Returns the centroids of the model, or None if the model is not trained.
- fit(data: FixedSizeListArray | FixedShapeTensorArray | ndarray)¶
Fit the model to the data.
- Parameters:
data (pa.FixedSizeListArray, pa.FixedShapeTensorArray, np.ndarray) – The data to fit the model to. Must be a 2-D array of float32 type.
- predict(data: FixedSizeListArray | FixedShapeTensorArray | ndarray) UInt32Array ¶
Predict the cluster for each vector in the data.
- lance.util.sanitize_ts(ts: ts_types) datetime ¶
Returns a python datetime object from various timestamp input types.
- lance.util.td_to_micros(td: timedelta) int ¶
Returns the number of microseconds in a timedelta object.
- lance.util.validate_vector_index(dataset, column: str, refine_factor: int = 5, sample_size: int | None = None, pass_threshold: float = 1.0)¶
Run in-sample queries and make sure that the recall for k=1 is very high (should be near 100%)
- Parameters:
dataset (LanceDataset) – The dataset to sanity check.
column (str) – The column name of the vector column.
refine_factor (int, default=5) – The refine factor to use for the nearest neighbor query.
sample_size (int, optional) – The number of vectors to sample from the dataset. If None, the entire dataset will be used.
pass_threshold (float, default=1.0) – The minimum fraction of vectors that must pass the sanity check. If less than this fraction of vectors pass, a ValueError will be raised.
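For example, a sketch of sanity-checking an index on a column named "vector" (an assumed column name), sampling 1,000 vectors and tolerating up to 5% misses:

import lance
from lance.util import validate_vector_index

dataset = lance.dataset("/tmp/my_dataset.lance")  # assumed dataset with a vector index
validate_vector_index(dataset, "vector", sample_size=1000, pass_threshold=0.95)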
lance.vector module¶
Embedding vector utilities
- lance.vector.compute_partitions(dataset: LanceDataset, column: str, kmeans: Any, batch_size: int = 40960, dst_dataset_uri: str | Path | None = None, allow_cuda_tf32: bool = True, num_sub_vectors: int | None = None, filter_nan: bool = True, sample_size: int | None = None) str ¶
Compute partitions for each row using GPU kmeans and spill to disk.
- Parameters:
dataset (LanceDataset) – Dataset to compute partitions for.
column (str) – Column name of the vector column.
kmeans (lance.torch.kmeans.KMeans) – KMeans model to use to compute partitions.
batch_size (int, default 40960) – The batch size used to read the dataset.
dst_dataset_uri (Union[str, Path], optional) – The path to store the partitions. If not specified a random directory is used instead
allow_cuda_tf32 (bool, default True) – Whether to allow tf32 for matmul on CUDA.
- Returns:
The absolute path of the partition dataset.
- Return type:
str
- lance.vector.compute_pq_codes(dataset: LanceDataset, kmeans_list: List[Any], batch_size: int = 40960, dst_dataset_uri: str | Path | None = None, allow_cuda_tf32: bool = True) str ¶
Compute pq codes for each row using GPU kmeans and spill to disk.
- Parameters:
dataset (LanceDataset) – Dataset to compute pq codes for.
kmeans_list (List[lance.torch.kmeans.KMeans]) – KMeans models to use to compute pq (one per subspace)
batch_size (int, default 40960) – The batch size used to read the dataset.
dst_dataset_uri (Union[str, Path], optional) – The path to store the partitions. If not specified a random directory is used instead
allow_cuda_tf32 (bool, default True) – Whether to allow tf32 for matmul on CUDA.
- Returns:
The absolute path of the pq codes dataset.
- Return type:
str
- lance.vector.one_pass_assign_ivf_pq_on_accelerator(dataset: LanceDataset, column: str, metric_type: Literal['l2', 'cosine', 'dot'], accelerator: str | 'torch.Device', ivf_kmeans: Any, pq_kmeans_list: List[Any], dst_dataset_uri: str | Path | None = None, batch_size: int = 40960, *, filter_nan: bool = True, allow_cuda_tf32: bool = True)¶
Compute partitions for each row using GPU kmeans and spill to disk.
- Returns:
The absolute path of the ivfpq codes dataset, as precomputed partition buffers.
- Return type:
str
- lance.vector.one_pass_train_ivf_pq_on_accelerator(dataset: LanceDataset, column: str, k: int, metric_type: Literal['l2', 'cosine', 'dot'], accelerator: str | 'torch.Device', num_sub_vectors: int, batch_size: int = 40960, *, sample_rate: int = 256, max_iters: int = 50, filter_nan: bool = True)¶
- lance.vector.train_ivf_centroids_on_accelerator(dataset: LanceDataset, column: str, k: int, metric_type: Literal['l2', 'cosine', 'dot'], accelerator: str | 'torch.Device', batch_size: int = 40960, *, sample_rate: int = 256, max_iters: int = 50, filter_nan: bool = True) Tuple[np.ndarray, Any] ¶
Use accelerator (GPU or MPS) to train kmeans.
- lance.vector.train_pq_codebook_on_accelerator(dataset: LanceDataset, metric_type: Literal['l2', 'cosine', 'dot'], accelerator: str | 'torch.Device', num_sub_vectors: int, batch_size: int = 40960) Tuple[np.ndarray, List[Any]] ¶
Use accelerator (GPU or MPS) to train pq codebook.
- lance.vector.vec_to_table(data: dict | list | ndarray, names: str | list | None = None, ndim: int | None = None, check_ndim: bool = True) Table ¶
Create a pyarrow Table containing vectors. Vectors are created as FixedSizeListArray columns in pyarrow with float32 values.
Examples
>>> import numpy as np >>> np.random.seed(0) >>> from lance.vector import vec_to_table >>> dd = {"vector0": np.random.randn(10), "vector1": np.random.randn(10)} >>> vec_to_table(dd) pyarrow.Table id: string vector: fixed_size_list<item: float>[10] child 0, item: float ---- id: [["vector0","vector1"]] vector: [[[1.7640524,0.4001572,0.978738,2.2408931,1.867558,-0.9772779,0.95008844,-0.1513572,-0.10321885,0.41059852],[0.14404356,1.4542735,0.7610377,0.121675014,0.44386324,0.33367434,1.4940791,-0.20515826,0.3130677,-0.85409576]]] >>> vec_to_table(dd).to_pandas() id vector 0 vector0 [1.7640524, 0.4001572, 0.978738, 2.2408931, 1.... 1 vector1 [0.14404356, 1.4542735, 0.7610377, 0.121675014...
- Parameters:
data (dict, list, or np.ndarray) – If dict, the keys are added as “id” column If list, then each element is assumed to be a vector If ndarray, then each row is assumed to be a vector
names (str or list, optional) – If data is dict, then names should be a list of 2 str; default [“id”, “vector”] If data is list or ndarray, then names should be str; default “vector”
ndim (int, optional) – Number of dimensions of the vectors. Inferred if omitted.
check_ndim (bool, default True) – Whether to verify that all vectors have the same length
- Returns:
tbl – A pyarrow Table with vectors converted to appropriate types
- Return type:
pa.Table
Module contents¶
- class lance.BlobColumn(blob_column: Array | ChunkedArray)¶
Bases:
object
A utility to wrap a PyArrow binary column and iterate over the rows as file-like objects.
This can be useful for working with medium-to-small binary objects that need to interface with APIs that expect file-like objects. For very large binary objects (4-8MB or more per value) you might be better off creating a blob column and using lance.Dataset.take_blobs to access the blob data.
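A hedged sketch (the dataset path and the binary column name "image_bytes" are assumptions about your data):

import lance
from lance import BlobColumn

dataset = lance.dataset("/tmp/my_dataset.lance")  # assumed existing dataset
table = dataset.to_table(columns=["image_bytes"])

for fobj in BlobColumn(table["image_bytes"]):
    header = fobj.read(8)  # read the first few bytes of each value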
- class lance.BlobFile(inner: LanceBlobFile)¶
Bases:
RawIOBase
Represents a blob in a Lance dataset as a file-like object.
- close() None ¶
Flush and close the IO object.
This method has no effect if the file is already closed.
- property closed: bool¶
- readable() bool ¶
Return whether object was opened for reading.
If False, read() will raise OSError.
- readall() bytes ¶
Read until EOF, using multiple read() call.
- readinto(b: bytearray) int ¶
- seek(offset: int, whence: int = 0) int ¶
Change the stream position to the given byte offset.
- offset
The stream position, relative to ‘whence’.
- whence
The relative position to seek from.
The offset is interpreted relative to the position indicated by whence. Values for whence are:
os.SEEK_SET or 0 – start of stream (the default); offset should be zero or positive
os.SEEK_CUR or 1 – current stream position; offset may be negative
os.SEEK_END or 2 – end of stream; offset is usually negative
Return the new absolute position.
- seekable() bool ¶
Return whether object supports random access.
If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().
- size() int ¶
Returns the size of the blob in bytes.
- tell() int ¶
Return current stream position.
- class lance.FragmentMetadata(id: int, files: List[DataFile], physical_rows: int, deletion_file: DeletionFile | None = None, row_id_meta: RowIdMeta | None = None)¶
Bases:
object
Metadata for a fragment.
- id¶
The ID of the fragment.
- Type:
int
- files¶
The data files of the fragment. Each data file must have the same number of rows. Each file stores a different subset of the columns.
- Type:
List[DataFile]
- physical_rows¶
The number of rows originally in this fragment. This is the number of rows in the data files before deletions.
- Type:
int
- deletion_file¶
The deletion file, if any.
- Type:
Optional[DeletionFile]
- deletion_file: DeletionFile | None = None¶
- static from_json(json_data: str) FragmentMetadata ¶
- id: int¶
- property num_deletions: int¶
The number of rows that have been deleted from this fragment.
- property num_rows: int¶
The number of rows in this fragment after deletions.
- physical_rows: int¶
- to_json() dict ¶
Get this as a simple JSON-serializable dictionary.
- class lance.LanceDataset(uri: str | Path, version: int | str | None = None, block_size: int | None = None, index_cache_size: int | None = None, metadata_cache_size: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, serialized_manifest: bytes | None = None, default_scan_options: Dict[str, Any] | None = None)¶
Bases:
Dataset
A dataset in Lance format where the data is stored at the given uri.
- add_columns(transforms: Dict[str, str] | BatchUDF | ReaderLike, read_columns: List[str] | None = None, reader_schema: pa.Schema | None = None, batch_size: int | None = None)¶
Add new columns with defined values.
There are several ways to specify the new columns. First, you can provide SQL expressions for each new column. Second, you can provide a UDF that takes a batch of existing data and returns a new batch with the new columns. These new columns will be appended to the dataset.
You can also provide a RecordBatchReader which will read the new column values from some external source. This is often useful when the new column values have already been staged to files (often by some distributed process).
See the
lance.add_columns_udf()
decorator for more information on writing UDFs.- Parameters:
transforms (dict or AddColumnsUDF or ReaderLike) – If this is a dictionary, then the keys are the names of the new columns and the values are SQL expression strings. These strings can reference existing columns in the dataset. If this is an AddColumnsUDF, then it is a UDF that takes a batch of existing data and returns a new batch with the new columns.
read_columns (list of str, optional) – The names of the columns that the UDF will read. If None, then the UDF will read all columns. This is only used when transforms is a UDF. Otherwise, the read columns are inferred from the SQL expressions.
reader_schema (pa.Schema, optional) – Only valid if transforms is a ReaderLike object. This will be used to determine the schema of the reader.
batch_size (int, optional) – The number of rows to read at a time from the source dataset when applying the transform. This is ignored if the dataset is a v1 dataset.
Examples
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3]}) >>> dataset = lance.write_dataset(table, "my_dataset") >>> @lance.batch_udf() ... def double_a(batch): ... df = batch.to_pandas() ... return pd.DataFrame({'double_a': 2 * df['a']}) >>> dataset.add_columns(double_a) >>> dataset.to_table().to_pandas() a double_a 0 1 2 1 2 4 2 3 6 >>> dataset.add_columns({"triple_a": "a * 3"}) >>> dataset.to_table().to_pandas() a double_a triple_a 0 1 2 3 1 2 4 6 2 3 6 9
See also
LanceDataset.merge
Merge a pre-computed set of columns into the dataset.
- alter_columns(*alterations: Iterable[Dict[str, Any]])¶
Alter column name, data type, and nullability.
Columns that are renamed can keep any indices that are on them. If a column has an IVF_PQ index, the index can be kept if the column is cast to another type. However, other index types do not support casting at this time.
Column types can be upcast (such as int32 to int64) or downcast (such as int64 to int32). However, downcasting will fail if there are any values that cannot be represented in the new type. In general, columns can be cast to the same general type: integers to integers, floats to floats, and strings to strings. However, strings, binary, and list columns can be cast between their size variants. For example, string to large string, binary to large binary, and list to large list.
If a column is cast to a type its index does not support, the index will be dropped.
- Parameters:
alterations (Iterable[Dict[str, Any]]) –
A sequence of dictionaries, each with the following keys:
- ”path”: str
The column path to alter. For a top-level column, this is the name. For a nested column, this is the dot-separated path, e.g. “a.b.c”.
- ”name”: str, optional
The new name of the column. If not specified, the column name is not changed.
- ”nullable”: bool, optional
Whether the column should be nullable. If not specified, the column nullability is not changed. Only non-nullable columns can be changed to nullable. Currently, you cannot change a nullable column to non-nullable.
- ”data_type”: pyarrow.DataType, optional
The new data type to cast the column to. If not specified, the column data type is not changed.
Examples
>>> import lance >>> import pyarrow as pa >>> schema = pa.schema([pa.field('a', pa.int64()), ... pa.field('b', pa.string(), nullable=False)]) >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.alter_columns({"path": "a", "name": "x"}, ... {"path": "b", "nullable": True}) >>> dataset.to_table().to_pandas() x b 0 1 a 1 2 b 2 3 c >>> dataset.alter_columns({"path": "x", "data_type": pa.int32()}) >>> dataset.schema x: int32 b: string
- checkout_version(version: int | str) LanceDataset ¶
Load the given version of the dataset.
Unlike the
dataset()
constructor, this will re-use the current cache. This is a no-op if the dataset is already at the given version.- Parameters:
version (int | str,) – The version to check out. A version number (int) or a tag (str) can be provided.
- Return type:
LanceDataset
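A small sketch of checking out earlier versions (version 1 and the tag name are assumptions about the dataset's history):

import lance

dataset = lance.dataset("/tmp/my_dataset.lance")  # assumed existing dataset
old = dataset.checkout_version(1)
tagged = dataset.checkout_version("stable")  # assumes a tag named "stable" exists
print(dataset.latest_version)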
- cleanup_old_versions(older_than: timedelta | None = None, *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True) CleanupStats ¶
Cleans up old versions of the dataset.
Some dataset changes, such as overwriting, leave behind data that is not referenced by the latest dataset version. The old data is left in place to allow the dataset to be restored back to an older version.
This method will remove older versions and any data files they reference. Once this cleanup task has run you will not be able to checkout or restore these older versions.
- Parameters:
older_than (timedelta, optional) – Only versions older than this will be removed. If not specified, this will default to two weeks.
delete_unverified (bool, default False) –
Files leftover from a failed transaction may appear to be part of an in-progress operation (e.g. appending new data) and these files will not be deleted unless they are at least 7 days old. If delete_unverified is True then these files will be deleted regardless of their age.
This should only be set to True if you can guarantee that no other process is currently working on this dataset. Otherwise the dataset could be put into a corrupted state.
error_if_tagged_old_versions (bool, default True) – Some versions may have tags associated with them. Tagged versions will not be cleaned up, regardless of how old they are. If this argument is set to True (the default), an exception will be raised if any tagged versions match the parameters. Otherwise, tagged versions will be ignored without any error and only untagged versions will be cleaned up.
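A sketch of a typical maintenance call, removing versions older than 30 days (the dataset path is an assumption):

import lance
from datetime import timedelta

dataset = lance.dataset("/tmp/my_dataset.lance")  # assumed existing dataset
dataset.cleanup_old_versions(older_than=timedelta(days=30))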
- static commit(base_uri: str | Path | LanceDataset, operation: LanceOperation.BaseOperation, read_version: int | None = None, commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) LanceDataset ¶
Create a new version of dataset
This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using
LanceDataset
or write_dataset().)
Its current purpose is to allow for changes being made in a distributed environment where no single process is doing all of the work. For example, a distributed bulk update or a distributed bulk modify operation.
Once all of the changes have been made, this method can be called to make the changes visible by updating the dataset manifest.
Warning
This is an advanced API and doesn’t provide the same level of validation as the other APIs. For example, it’s the responsibility of the caller to ensure that the fragments are valid for the schema.
- Parameters:
base_uri (str, Path, or LanceDataset) – The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.
operation (BaseOperation) – The operation to apply to the dataset. This describes what changes have been made. See available operations under
LanceOperation
.
read_version (int, optional) – The version of the dataset that was used as the base for the changes. This is not needed for overwrite or restore operations.
commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the
migrate_manifest_paths_v2()
method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).
detached (bool, optional) – If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.
max_retries (int) – The maximum number of retries to perform when committing the dataset.
- Returns:
A new version of Lance Dataset.
- Return type:
Examples
Creating a new dataset with the
LanceOperation.Overwrite
operation:>>> import lance >>> import pyarrow as pa >>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> fragment1 = lance.fragment.LanceFragment.create("example", tab1) >>> fragment2 = lance.fragment.LanceFragment.create("example", tab2) >>> fragments = [fragment1, fragment2] >>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments) >>> dataset = lance.LanceDataset.commit("example", operation) >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d
- static commit_batch(dest: str | Path | LanceDataset, transactions: Sequence[Transaction], commit_lock: CommitLock | None = None, storage_options: Dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, detached: bool | None = False, max_retries: int = 20) BulkCommitResult ¶
Create a new version of dataset with multiple transactions.
This method is an advanced method which allows users to describe a change that has been made to the data files. This method is not needed when using Lance to apply changes (e.g. when using
LanceDataset
or write_dataset().)
- Parameters:
dest (str, Path, or LanceDataset) – The base uri of the dataset, or the dataset object itself. Using the dataset object can be more efficient because it can re-use the file metadata cache.
transactions (Iterable[Transaction]) – The transactions to apply to the dataset. These will be merged into a single transaction and applied to the dataset. Note: Only append transactions are currently supported. Other transaction types will be supported in the future.
commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the
migrate_manifest_paths_v2()
method. Default is False. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).
detached (bool, optional) – If True, then the commit will not be part of the dataset lineage. It will never show up as the latest dataset and the only way to check it out in the future will be to specifically check it out by version. The version will be a random version that is only unique amongst detached commits. The caller should store this somewhere as there will be no other way to obtain it in the future.
max_retries (int) – The maximum number of retries to perform when committing the dataset.
- Returns:
- dataset: LanceDataset
A new version of Lance Dataset.
- merged: Transaction
The merged transaction that was applied to the dataset.
- Return type:
dict with keys
- count_rows(filter: Expression | str | None = None, **kwargs) int ¶
Count rows matching the scanner filter.
- Parameters:
**kwargs (dict, optional) – See the scanner() method for a full parameter description.
- Returns:
count – The number of rows matching the filter, or the total number of rows in the dataset if no filter is provided.
- Return type:
int
- create_index(column: str | List[str], index_type: str, name: str | None = None, metric: str = 'L2', replace: bool = False, num_partitions: int | None = None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, num_sub_vectors: int | None = None, accelerator: str | 'torch.Device' | None = None, index_cache_size: int | None = None, shuffle_partition_batches: int | None = None, shuffle_partition_concurrency: int | None = None, ivf_centroids_file: str | None = None, precomputed_partition_dataset: str | None = None, storage_options: Dict[str, str] | None = None, filter_nan: bool = True, one_pass_ivfpq: bool = False, **kwargs) LanceDataset ¶
Create index on column.
Experimental API
- Parameters:
column (str) – The column to be indexed.
index_type (str) – The type of the index.
"IVF_PQ, IVF_HNSW_PQ and IVF_HNSW_SQ"
are supported now.
name (str, optional) – The index name. If not provided, it will be generated from the column name.
metric (str) – The distance metric type, i.e., “L2” (alias to “euclidean”), “cosine” or “dot” (dot product). Default is “L2”.
replace (bool) – Replace the existing index if it exists.
num_partitions (int, optional) – The number of partitions of IVF (Inverted File Index).
ivf_centroids (optional) – It can be either
np.ndarray
, pyarrow.FixedSizeListArray
or pyarrow.FixedShapeTensorArray
. A num_partitions x dimension
array of existing K-means centroids for IVF clustering. If not provided, a new KMeans model will be trained.
pq_codebook (optional,) –
It can be
np.ndarray
, pyarrow.FixedSizeListArray
, or pyarrow.FixedShapeTensorArray
. A num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors)
array of K-means centroids for the PQ codebook. Note:
nbits
is always 8 for now. If not provided, a new PQ model will be trained.
num_sub_vectors (int, optional) – The number of sub-vectors for PQ (Product Quantization).
accelerator (str or
torch.Device
, optional) – If set, use an accelerator to speed up the training process. Accepted accelerators: "cuda" (Nvidia GPU) and "mps" (Apple Silicon GPU). If not set, use the CPU.
index_cache_size (int, optional) – The size of the index cache in number of entries. Default value is 256.
shuffle_partition_batches (int, optional) –
The number of batches, using the row group size of the dataset, to include in each shuffle partition. Default value is 10240.
Assuming the row group size is 1024, each shuffle partition will hold 10240 * 1024 = 10,485,760 rows. By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.
shuffle_partition_concurrency (int, optional) –
The number of shuffle partitions to process concurrently. Default value is 2.
By making this value smaller, this shuffle will consume less memory but will take longer to complete, and vice versa.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
filter_nan (bool) – Defaults to True. Setting it to False disables the null filter used for nullable columns, which gives a small speed boost, but is UNSAFE: it will cause a crash if any null/nan values are present.
one_pass_ivfpq (bool) – Defaults to False. If enabled, index type must be “IVF_PQ”. Reduces disk IO.
kwargs – Parameters passed to the index building process.
SQ (Scalar Quantization) is only available for the IVF_HNSW_SQ index type. This quantization method reduces the memory usage of the index by mapping the float vectors to integer vectors, where each integer uses num_bits; only 8 bits are supported for now.
- If index_type is "IVF_*", then the following parameter is required: num_partitions
- If index_type contains "PQ", then the following parameter is required: num_sub_vectors
Optional parameters for IVF_PQ:
- ivf_centroids
Existing K-means centroids for IVF clustering.
- num_bits
The number of bits for PQ (Product Quantization). Default is 8. Only 4, 8 are supported.
- Optional parameters for IVF_HNSW_*:
- max_level
Int, the maximum number of levels in the graph.
- m
Int, the number of edges per node in the graph.
- ef_construction
Int, the number of nodes to examine during the construction.
Examples
import lance
dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)
import lance
dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_HNSW_SQ",
    num_partitions=256,
)
Experimental Accelerator (GPU) support:
- accelerate: use GPU to train IVF partitions.
Only CUDA (Nvidia) and MPS (Apple) are currently supported. Requires PyTorch to be installed.
import lance
dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
    accelerator="cuda"
)
- create_scalar_index(column: str, index_type: Literal['BTREE'] | Literal['BITMAP'] | Literal['LABEL_LIST'] | Literal['INVERTED'] | Literal['FTS'], name: str | None = None, *, replace: bool = True, **kwargs)¶
Create a scalar index on a column.
Scalar indices, like vector indices, can be used to speed up scans. A scalar index can speed up scans that contain filter expressions on the indexed column. For example, the following scan will be faster if the column
my_col
has a scalar index:import lance dataset = lance.dataset("/tmp/images.lance") my_table = dataset.scanner(filter="my_col != 7").to_table()
Vector search with pre-filters can also benefit from scalar indices. For example,
import lance
dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(
    nearest=dict(
        column="vector",
        q=[1, 2, 3, 4],
        k=10,
    ),
    filter="my_col != 7",
    prefilter=True
).to_table()
There are 4 types of scalar indices available today.
BTREE
. The most common type isBTREE
. This index is inspired by the btree data structure although only the first few layers of the btree are cached in memory. It will perform well on columns with a large number of unique values and few rows per value.BITMAP
. This index stores a bitmap for each unique value in the column. This index is useful for columns with a small number of unique values and many rows per value.LABEL_LIST
. A special index that is used to index list columns whose values have small cardinality. For example, a column that contains lists of tags (e.g.["tag1", "tag2", "tag3"]
) can be indexed with aLABEL_LIST
index. This index can only speedup queries witharray_has_any
orarray_has_all
filters.FTS/INVERTED
. It is used to index document columns and supports full-text search, for example finding rows whose document contains any word of the query string "hello world". The results will be ranked by BM25.
Note that the
LANCE_BYPASS_SPILLING
environment variable can be used to bypass spilling to disk. Setting this to true can avoid memory exhaustion issues (see https://github.com/apache/datafusion/issues/10073 for more info).
Experimental API
- Parameters:
column (str) – The column to be indexed. Must be a boolean, integer, float, or string column.
index_type (str) – The type of the index. One of
"BTREE"
,"BITMAP"
,"LABEL_LIST"
, “FTS” or"INVERTED"
.name (str, optional) – The index name. If not provided, it will be generated from the column name.
replace (bool, default True) – Replace the existing index if it exists.
Optional keyword arguments:
with_position (bool, default True) – This is for the
INVERTED
index. If True, the index will store the positions of the words in the document, so that you can conduct phrase query. This will significantly increase the index size. It won’t impact the performance of non-phrase queries even if it is set to True.base_tokenizer (str, default "simple") – This is for the
INVERTED
index. The base tokenizer to use. The value can be: * “simple”: splits tokens on whitespace and punctuation. * “whitespace”: splits tokens on whitespace. * “raw”: no tokenization.language (str, default "English") – This is for the
INVERTED
index. The language for stemming and stop words. This is only used when stem or remove_stop_words is truemax_token_length (Optional[int], default 40) – This is for the
INVERTED
index. The maximum token length. Any token longer than this will be removed.lower_case (bool, default True) – This is for the
INVERTED
index. If True, the index will convert all text to lowercase.stem (bool, default False) – This is for the
INVERTED
index. If True, the index will stem the tokens.remove_stop_words (bool, default False) – This is for the
INVERTED
index. If True, the index will remove stop words.ascii_folding (bool, default False) – This is for the
INVERTED
index. If True, the index will convert non-ascii characters to ascii characters if possible. This would remove accents like “é” -> “e”.
Examples
import lance
dataset = lance.dataset("/tmp/images.lance")
dataset.create_index(
    "category",
    "BTREE",
)
Scalar indices can only speed up scans for basic filters using equality, comparison, range (e.g.
my_col BETWEEN 0 AND 100
), and set membership (e.g. my_col IN (0, 1, 2)).
Scalar indices can be used if the filter contains multiple indexed columns and the filter criteria are AND'd or OR'd together (e.g.
my_col < 0 AND other_col > 100
). Scalar indices may be used if the filter contains non-indexed columns but, depending on the structure of the filter, they may not be usable. For example, if the column
not_indexed
does not have a scalar index then the filtermy_col = 0 OR not_indexed = 1
will not be able to use any scalar index onmy_col
.To determine if a scan is making use of a scalar index you can use
explain_plan
to look at the query plan that lance has created. Queries that use scalar indices will either have aScalarIndexQuery
relation or aMaterializeIndex
operator.
- property data_storage_version: str¶
The version of the data storage format this dataset is using
- delete(predicate: str | Expression)¶
Delete rows from the dataset.
This marks rows as deleted, but does not physically remove them from the files. This keeps the existing indexes valid.
- Parameters:
predicate (str or pa.compute.Expression) – The predicate to use to select rows to delete. May either be a SQL string or a pyarrow Expression.
Examples
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.delete("a = 1 or b in ('a', 'b')") >>> dataset.to_table() pyarrow.Table a: int64 b: string ---- a: [[3]] b: [["c"]]
- static drop(base_uri: str | Path, storage_options: Dict[str, str] | None = None) None ¶
- drop_columns(columns: List[str])¶
Drop one or more columns from the dataset
This is a metadata-only operation and does not remove the data from the underlying storage. In order to remove the data, you must subsequently call
compact_files
to rewrite the data without the removed columns and then call cleanup_old_versions
to remove the old files.
- Parameters:
columns (list of str) – The names of the columns to drop. These can be nested column references (e.g. “a.b.c”) or top-level column names (e.g. “a”).
Examples
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.drop_columns(["a"]) >>> dataset.to_table().to_pandas() b 0 a 1 b 2 c
- get_fragment(fragment_id: int) LanceFragment | None ¶
Get the fragment with fragment id.
- get_fragments(filter: Expression | None = None) List[LanceFragment] ¶
Get all fragments from the dataset.
Note: filter is not supported yet.
- property has_index¶
- head(num_rows, **kwargs)¶
Load the first N rows of the dataset.
- Parameters:
num_rows (int) – The number of rows to load.
**kwargs (dict, optional) – See scanner() method for full parameter description.
- Returns:
table
- Return type:
Table
- index_statistics(index_name: str) Dict[str, Any] ¶
- insert(data: ReaderLike, *, mode='append', **kwargs)¶
Insert data into the dataset.
- Parameters:
data_obj (Reader-like) – The data to be written. Acceptable types are: - Pandas DataFrame, Pyarrow Table, Dataset, Scanner, or RecordBatchReader - Huggingface dataset
mode (str, default 'append') –
- The mode to use when writing the data. Options are:
create - create a new dataset (raises if uri already exists).
overwrite - create a new snapshot version.
append - create a new version that is the concatenation of the input and the latest version (raises if uri does not exist).
**kwargs (dict, optional) – Additional keyword arguments to pass to
write_dataset()
.
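A minimal sketch of appending new rows (the schema of new_rows must match the existing dataset; the dataset path and column name are assumptions):

import lance
import pyarrow as pa

dataset = lance.dataset("/tmp/my_dataset.lance")  # assumed dataset with an int64 column "a"
new_rows = pa.table({"a": [4, 5, 6]})
dataset.insert(new_rows)  # mode="append" by default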
- join(right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)¶
Not implemented (just override pyarrow dataset to prevent segfault)
- property lance_schema: LanceSchema¶
The LanceSchema for this dataset
- property latest_version: int¶
Returns the latest version of the dataset.
- list_indices() List[Dict[str, Any]] ¶
- merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None)¶
Merge another dataset into this one.
Performs a left join, where the dataset is the left side and data_obj is the right side. Rows existing in the dataset but not matched in data_obj will be filled with null values, unless Lance doesn't support null values for some types, in which case an error will be raised.
- Parameters:
data_obj (Reader-like) – The data to be merged. Acceptable types are: - Pandas DataFrame, Pyarrow Table, Dataset, Scanner, Iterator[RecordBatch], or RecordBatchReader
left_on (str) – The name of the column in the dataset to join on.
right_on (str or None) – The name of the column in data_obj to join on. If None, defaults to left_on.
Examples
>>> import lance >>> import pyarrow as pa >>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']}) >>> dataset = lance.write_dataset(df, "dataset") >>> dataset.to_table().to_pandas() x y 0 1 a 1 2 b 2 3 c >>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']}) >>> dataset.merge(new_df, 'x') >>> dataset.to_table().to_pandas() x y z 0 1 a d 1 2 b e 2 3 c f
See also
LanceDataset.add_columns
Add new columns by computing batch-by-batch.
- merge_insert(on: str | Iterable[str])¶
Returns a builder that can be used to create a “merge insert” operation
This operation can add rows, update rows, and remove rows in a single transaction. It is a very generic tool that can be used to create behaviors like “insert if not exists”, “update or insert (i.e. upsert)”, or even replace a portion of existing data with new data (e.g. replace all data where month=”january”)
The merge insert operation works by combining new data from a source table with existing data in a target table by using a join. There are three categories of records.
“Matched” records are records that exist in both the source table and the target table. “Not matched” records exist only in the source table (e.g. these are new data). “Not matched by source” records exist only in the target table (this is old data).
The builder returned by this method can be used to customize what should happen for each category of data.
Please note that the data will be reordered as part of this operation. This is because updated rows will be deleted from the dataset and then reinserted at the end with the new values. The order of the newly inserted rows may fluctuate randomly because a hash-join operation is used internally.
- Parameters:
on (Union[str, Iterable[str]]) – A column (or columns) to join on. This is how records from the source table and target table are matched. Typically this is some kind of key or id column.
Examples
Use when_matched_update_all() and when_not_matched_insert_all() to perform an “upsert” operation. This will update rows that already exist in the dataset and insert rows that do not exist.
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) >>> # Perform a "upsert" operation >>> dataset.merge_insert("a") \ ... .when_matched_update_all() \ ... .when_not_matched_insert_all() \ ... .execute(new_table) {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0} >>> dataset.to_table().sort_by("a").to_pandas() a b 0 1 b 1 2 x 2 3 y 3 4 z
Use when_not_matched_insert_all() to perform an “insert if not exists” operation. This will only insert rows that do not already exist in the dataset.
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example2") >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) >>> # Perform an "insert if not exists" operation >>> dataset.merge_insert("a") \ ... .when_not_matched_insert_all() \ ... .execute(new_table) {'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0} >>> dataset.to_table().sort_by("a").to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 z
You are not required to provide all the columns. If you only want to update a subset of columns, you can omit columns you don't want to update. Omitted columns will keep their existing values for rows that are updated, and will be null for rows that are inserted.
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"], \ ... "c": ["x", "y", "z"]}) >>> dataset = lance.write_dataset(table, "example3") >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) >>> # Perform an "upsert" operation, only updating column "a" >>> dataset.merge_insert("a") \ ... .when_matched_update_all() \ ... .when_not_matched_insert_all() \ ... .execute(new_table) {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0} >>> dataset.to_table().sort_by("a").to_pandas() a b c 0 1 a x 1 2 x y 2 3 y z 3 4 z None
- migrate_manifest_paths_v2()¶
Migrate the manifest paths to the new format.
This will update the manifest to use the new v2 format for paths.
This function is idempotent, and can be run multiple times without changing the state of the object store.
DANGER: this should not be run while other concurrent operations are happening, and it should be allowed to run to completion before other operations are resumed.
- property optimize: DatasetOptimizer¶
- property partition_expression¶
Not implemented (just override pyarrow dataset to prevent segfault)
- replace_field_metadata(field_name: str, new_metadata: Dict[str, str])¶
Replace the metadata of a field in the schema
- Parameters:
field_name (str) – The name of the field to replace the metadata for
new_metadata (dict) – The new metadata to set
- replace_schema(schema: Schema)¶
Not implemented (just override pyarrow dataset to prevent segfault)
See replace_schema_metadata() or replace_field_metadata().
- replace_schema_metadata(new_metadata: Dict[str, str])¶
Replace the schema metadata of the dataset
- Parameters:
new_metadata (dict) – The new metadata to set
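A sketch of attaching metadata (the keys, values, and the column name "vector" are arbitrary examples):

import lance

dataset = lance.dataset("/tmp/my_dataset.lance")  # assumed existing dataset
dataset.replace_schema_metadata({"source": "ingest-job-42"})
dataset.replace_field_metadata("vector", {"embedding_model": "example-model"})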
- restore()¶
Restore the currently checked out version as the latest version of the dataset.
This creates a new commit.
- sample(num_rows: int, columns: List[str] | Dict[str, str] | None = None, randomize_order: bool = True, **kwargs) Table ¶
Select a random sample of data
- Parameters:
num_rows (int) – number of rows to retrieve
columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
**kwargs (dict, optional) – see scanner() method for full parameter description.
- Returns:
table
- Return type:
Table
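For illustration, a small sampling call (the row count and column name are assumptions):

subset = dataset.sample(10, columns=["a"])  # pyarrow.Table with 10 random rows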
- scanner(columns: List[str] | Dict[str, str] | None = None, filter: Expression | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = None, fragments: Iterable[LanceFragment] | None = None, full_text_query: str | dict | None = None, *, prefilter: bool = None, with_row_id: bool = None, with_row_address: bool = None, use_stats: bool = None, fast_search: bool = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None) LanceScanner ¶
Return a Scanner that can support various pushdowns.
- Parameters:
columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
filter (pa.compute.Expression or str) –
Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.
limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.
offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.
nearest (dict, default None) –
Get the rows corresponding to the K most similar vectors. Example:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "nprobes": 1, "refine_factor": 1 }
batch_size (int, default None) – The target size of batches returned. In some cases batches can be up to twice this size (but never larger than this). In some cases batches can be smaller than this size.
io_buffer_size (int, default None) – The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.
batch_readahead (int, optional) – The number of batches to read ahead.
fragment_readahead (int, optional) – The number of fragments to read ahead.
scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.
fragments (iterable of LanceFragment, default None) – If specified, only scan these fragments. If scan_in_order is True, then the fragments will be scanned in the order given.
prefilter (bool, default False) –
If True then the filter will be applied before the vector query is run. This will generate more correct results but it may be a more costly query. It’s generally good when the filter is highly selective.
If False then the filter will be applied after the vector query is run. This will perform well but the results may have fewer than the requested number of rows (or be empty) if the rows closest to the query do not match the filter. It’s generally good when the filter is not very selective.
use_scalar_index (bool, default True) – Lance will automatically use scalar indices to optimize a query. In some corner cases this can make query performance worse and this parameter can be used to disable scalar indices in these cases.
late_materialization (bool or List[str], default None) –
Allows custom control over late materialization. Late materialization fetches non-query columns using a take operation after the filter. This is useful when there are few results or columns are very large.
Early materialization can be better when there are many results or the columns are very narrow.
If True, then all columns are late materialized. If False, then all columns are early materialized. If a list of strings, then only the columns in the list are late materialized.
The default uses a heuristic that assumes filters will select about 0.1% of the rows. If your filter is more selective (e.g. find by id) you may want to set this to True. If your filter is not very selective (e.g. matches 20% of the rows) you may want to set this to False.
full_text_query (str or dict, optional) –
The query string to search for; results are ranked by BM25. For example, "hello world" would match documents containing "hello" or "world". Alternatively, a dictionary with the following keys:
- columns: list[str]
The columns to search, currently only supports a single column in the columns list.
- query: str
The query string to search for.
fast_search (bool, default False) – If True, then the search will only be performed on the indexed data, which yields faster search time.
Notes
For now, if BOTH filter and nearest are specified, then:
nearest is executed first.
The results are filtered afterwards.
For debugging ANN results, you can choose not to use the index even if one is present by specifying use_index=False. For example, the following will always return exact KNN results:

dataset.to_table(nearest={
    "column": "vector",
    "k": 10,
    "q": <query vector>,
    "use_index": False
})
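A minimal sketch of a plain filtered scan (the column names and filter are assumptions):

scanner = dataset.scanner(columns=["a", "b"], filter="a > 1", batch_size=1024)
for batch in scanner.to_batches():
    ...  # each item is a pyarrow.RecordBatch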
- property schema: Schema¶
The pyarrow Schema for this dataset
- session() _Session ¶
Return the dataset session, which holds the dataset’s state.
- property stats: LanceStats¶
Experimental API
- take(indices: List[int] | Array, columns: List[str] | Dict[str, str] | None = None, **kwargs) Table ¶
Select rows of data by index.
- Parameters:
indices (Array or array-like) – indices of rows to select in the dataset.
columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
**kwargs (dict, optional) – See the scanner() method for a full parameter description.
- Returns:
table
- Return type:
pyarrow.Table
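For illustration (the indices and column name are assumptions):

rows = dataset.take([0, 2, 5], columns=["b"])  # pyarrow.Table with three rows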
- take_blobs(row_ids: List[int] | Array, blob_column: str) List[BlobFile] ¶
Select blobs by row IDs.
Instead of loading large binary blob data into memory before processing it, this API allows you to open binary blob data as a regular Python file-like object. For more details, see lance.BlobFile.
- Parameters:
row_ids (List Array or array-like) – row IDs to select in the dataset.
blob_column (str) – The name of the blob column to select.
- Returns:
blob_files
- Return type:
List[BlobFile]
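A hedged sketch, assuming the dataset has a blob-encoded column named "video":

blobs = dataset.take_blobs([0, 1], blob_column="video")
header = blobs[0].read(16)  # BlobFile is file-like, so bytes can be read lazily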
- to_batches(columns: List[str] | Dict[str, str] | None = None, filter: Expression | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, with_row_address: bool = False, use_stats: bool = True, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None, **kwargs) Iterator[RecordBatch] ¶
Read the dataset as materialized record batches.
- Parameters:
**kwargs (dict, optional) – Arguments for Scanner.from_dataset.
- Returns:
record_batches
- Return type:
Iterator of RecordBatch
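A short sketch of streaming the dataset (the column name and filter are assumptions):

for batch in dataset.to_batches(columns=["a"], filter="a > 1", batch_size=4096):
    ...  # process each pyarrow.RecordBatch without materializing the whole table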
- to_table(columns: List[str] | Dict[str, str] | None = None, filter: Expression | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool = True, *, prefilter: bool = False, with_row_id: bool = False, with_row_address: bool = False, use_stats: bool = True, fast_search: bool = False, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | List[str] | None = None, use_scalar_index: bool | None = None) Table ¶
Read the data into memory as a
pyarrow.Table
- Parameters:
columns (list of str, or dict of str to str default None) – List of column names to be fetched. Or a dictionary of column names to SQL expressions. All columns are fetched if None or unspecified.
filter (pa.compute.Expression or str) –
Expression or str that is a valid SQL where clause. See Lance filter pushdown for valid SQL expressions.
limit (int, default None) – Fetch up to this many rows. All rows if None or unspecified.
offset (int, default None) – Fetch starting with this row. 0 if None or unspecified.
nearest (dict, default None) –
Get the rows corresponding to the K most similar vectors. Example:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "metric": "cosine", "nprobes": 1, "refine_factor": 1 }
batch_size (int, optional) – The number of rows to read at a time.
io_buffer_size (int, default None) – The size of the IO buffer. See ScannerBuilder.io_buffer_size for more information.
batch_readahead (int, optional) – The number of batches to read ahead.
fragment_readahead (int, optional) – The number of fragments to read ahead.
scan_in_order (bool, default True) – Whether to read the fragments and batches in order. If false, throughput may be higher, but batches will be returned out of order and memory use might increase.
prefilter (bool, default False) – Run filter before the vector search.
late_materialization (bool or List[str], default None) – Allows custom control over late materialization. See ScannerBuilder.late_materialization for more information.
use_scalar_index (bool, default True) – Allows custom control over scalar index usage. See ScannerBuilder.use_scalar_index for more information.
with_row_id (bool, default False) – Return row ID.
with_row_address (bool, default False) – Return row address
use_stats (bool, default True) – Use stats pushdown during filters.
full_text_query (str or dict, optional) –
The query string to search for; results are ranked by BM25. For example, "hello world" would match documents containing "hello" or "world". Alternatively, a dictionary with the following keys:
- columns: list[str]
The columns to search, currently only supports a single column in the columns list.
- query: str
The query string to search for.
Notes
If BOTH filter and nearest are specified, then:
nearest is executed first.
The results are filtered afterwards, unless prefilter is set to True.
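A minimal sketch combining a projection, a prefilter, and a vector query (the column names, filter, and query vector are assumptions):

import pyarrow as pa

tbl = dataset.to_table(
    columns=["id"],
    filter="id > 100",
    prefilter=True,
    nearest={"column": "vector", "q": pa.array([0.1, 0.2], type=pa.float32()), "k": 10},
)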
- update(updates: Dict[str, str], where: str | None = None) Dict[str, Any] ¶
Update column values for rows matching where.
- Parameters:
updates (dict of str to str) – A mapping of column names to a SQL expression.
where (str, optional) – A SQL predicate indicating which rows should be updated.
- Returns:
updates – A dictionary containing the number of rows updated.
- Return type:
dict
Examples
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> update_stats = dataset.update(dict(a = 'a + 2'), where="b != 'a'") >>> update_stats["num_updated_rows"] == 2 True >>> dataset.to_table().to_pandas() a b 0 1 a 1 4 b 2 5 c
- property uri: str¶
The location of the data
- validate()¶
Validate the dataset.
This checks the integrity of the dataset and will raise an exception if the dataset is corrupted.
- property version: int¶
Returns the currently checked out version of the dataset
- versions()¶
Return all versions in this dataset.
- class lance.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, fragment: _Fragment | None = None)¶
Bases:
Fragment
- count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Count rows matching the scanner filter.
- Parameters:
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Returns:
count
- Return type:
int
- static create(dataset_uri: str | Path, data: Table | RecordBatchReader, fragment_id: int | None = None, schema: Schema | None = None, max_rows_per_group: int = 1024, progress: FragmentWriteProgress | None = None, mode: str = 'append', *, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: Dict[str, str] | None = None) FragmentMetadata ¶
Create a
FragmentMetadata
from the given data. This can be used if the dataset is not yet created.
Warning
Internal API. This method is not intended to be used by end users.
- Parameters:
dataset_uri (str) – The URI of the dataset.
fragment_id (int) – The ID of the fragment.
data (pa.Table or pa.RecordBatchReader) – The data to be written to the fragment.
schema (pa.Schema, optional) – The schema of the data. If not specified, the schema will be inferred from the data.
max_rows_per_group (int, default 1024) – The maximum number of rows per group in the data file.
progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.
mode (str, default "append") – The write mode. If “append” is specified, the data will be checked against the existing dataset’s schema. Otherwise, pass “create” or “overwrite” to assign new field ids to the schema.
data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.
use_legacy_format (bool, default None) – Deprecated parameter. Use data_storage_version instead.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
See also
lance.dataset.LanceOperation.Overwrite
The operation used to create a new dataset or overwrite one using fragments created with this API. See the doc page for an example of using this API.
lance.dataset.LanceOperation.Append
The operation used to append fragments created with this API to an existing dataset. See the doc page for an example of using this API.
- Return type:
FragmentMetadata
- static create_from_file(filename: str | Path, dataset: LanceDataset, fragment_id: int) FragmentMetadata ¶
Create a fragment from the given datafile uri.
This can be used if the datafile has been lost from the dataset.
Warning
Internal API. This method is not intended to be used by end users.
- Parameters:
filename (str) – The filename of the datafile.
dataset (LanceDataset) – The dataset that the fragment belongs to.
fragment_id (int) – The ID of the fragment.
- data_files()¶
Return the data files of this fragment.
- delete(predicate: str) FragmentMetadata | None ¶
Delete rows from this Fragment.
This will add or update the deletion file of this fragment. It does not modify or delete the data files of this fragment. If no rows are left after the deletion, this method will return None.
Warning
Internal API. This method is not intended to be used by end users.
- Parameters:
predicate (str) – A SQL predicate that specifies the rows to delete.
- Returns:
A new fragment containing the new deletion file, or None if no rows left.
- Return type:
FragmentMetadata or None
Examples
>>> import lance >>> import pyarrow as pa >>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]}) >>> dataset = lance.write_dataset(tab, "dataset") >>> frag = dataset.get_fragment(0) >>> frag.delete("a > 1") FragmentMetadata(id=0, files=[DataFile(path='...', fields=[0, 1], ...), ...) >>> frag.delete("a > 0") is None True
See also
lance.dataset.LanceOperation.Delete
The operation used to commit these changes to a dataset. See the doc page for an example of using this API.
- deletion_file()¶
Return the deletion file, if any
- property fragment_id¶
- head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Load the first N rows of the fragment.
- Parameters:
num_rows (int) – The number of rows to load.
columns (list of str, default None) –
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Return type:
Table
- merge_columns(value_func: Dict[str, str] | BatchUDF | ReaderLike | Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None = None, batch_size: int | None = None, reader_schema: pa.Schema | None = None) Tuple[FragmentMetadata, LanceSchema] ¶
Add columns to this Fragment.
Warning
Internal API. This method is not intended to be used by end users.
The parameters and their interpretation are the same as in the
lance.dataset.LanceDataset.add_columns()
operation. The only difference is that, instead of modifying the dataset, a new fragment is created. The new schema of the fragment is returned as well. These can be used in a later operation to commit the changes to the dataset.
See also
lance.dataset.LanceOperation.Merge
The operation used to commit these changes to the dataset. See the doc page for an example of using this API.
- Returns:
A new fragment with the added column(s) and the final schema.
- Return type:
Tuple[FragmentMetadata, LanceSchema]
- property metadata: FragmentMetadata¶
Return the metadata of this fragment.
- Return type:
FragmentMetadata
- property num_deletions: int¶
Return the number of deleted rows in this fragment.
- property partition_expression: Schema¶
An Expression which evaluates to true for all data viewed by this Fragment.
- property physical_rows: int¶
Return the number of rows originally in this fragment.
To get the number of rows after deletions, use
count_rows()
instead.
- property physical_schema: Schema¶
Return the physical schema of this Fragment. This schema can be different from the dataset read schema.
- scanner(*, columns: List[str] | Dict[str, str] | None = None, batch_size: int | None = None, filter: str | pa.compute.Expression | None = None, limit: int | None = None, offset: int | None = None, with_row_id: bool = False, batch_readahead: int = 16) LanceScanner ¶
See Dataset::scanner for details
- property schema: Schema¶
Return the schema of this fragment.
- take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Select rows of data by index.
- Parameters:
indices (Array or array-like) – The indices of row to select in the dataset.
columns (list of str, default None) –
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Return type:
Table
- to_batches(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Read the fragment as materialized record batches.
- Parameters:
schema (Schema, optional) – Concrete schema to use for scanning.
columns (list of str, default None) –
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Returns:
record_batches
- Return type:
iterator of RecordBatch
- to_table(self, Schema schema=None, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, MemoryPool memory_pool=None)¶
Convert this Fragment into a Table.
Use this convenience utility with care. This will serially materialize the Scan result in memory before creating the Table.
- Parameters:
schema (Schema, optional) – Concrete schema to use for scanning.
columns (list of str, default None) –
The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.
The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is last in fragment), and __filename (the name of the source file or a description of the source fragment).
The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.
filter (Expression, default None) – Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them.
batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size.
batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.
fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.
use_threads (bool, default True) – If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.
- Returns:
table
- Return type:
Table
- class lance.LanceOperation¶
Bases:
object
- class Append(fragments: Iterable[FragmentMetadata])¶
Bases:
BaseOperation
Append new rows to the dataset.
- fragments¶
The fragments that contain the new rows.
- Type:
list[FragmentMetadata]
Warning
This is an advanced API for distributed operations. To append to a dataset on a single machine, use lance.write_dataset().
Examples
To append new rows to a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.
>>> import lance >>> import pyarrow as pa >>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> dataset = lance.write_dataset(tab1, "example") >>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> fragment = lance.fragment.LanceFragment.create("example", tab2) >>> operation = lance.LanceOperation.Append([fragment]) >>> dataset = lance.LanceDataset.commit("example", operation, ... read_version=dataset.version) >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d
- fragments: Iterable[FragmentMetadata]¶
- class BaseOperation¶
Bases:
ABC
Base class for operations that can be applied to a dataset.
See available operations under
LanceOperation
.
- class CreateIndex(uuid: str, name: str, fields: List[int], dataset_version: int, fragment_ids: Set[int])¶
Bases:
BaseOperation
Operation that creates an index on the dataset.
- dataset_version: int¶
- fields: List[int]¶
- fragment_ids: Set[int]¶
- name: str¶
- uuid: str¶
- class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)¶
Bases:
BaseOperation
Remove fragments or rows from the dataset.
- updated_fragments¶
The fragments that have been updated with new deletion vectors.
- Type:
list[FragmentMetadata]
- deleted_fragment_ids¶
The ids of the fragments that have been deleted entirely. These are the fragments where
LanceFragment.delete()
returned None.- Type:
list[int]
- predicate¶
The original SQL predicate used to select the rows to delete.
- Type:
str
Warning
This is an advanced API for distributed operations. To delete rows from a dataset on a single machine, use lance.LanceDataset.delete().
Examples
To delete rows from a dataset, call lance.fragment.LanceFragment.delete() on each of the fragments. If that returns a new fragment, add it to the updated_fragments list. If it returns None, that means the whole fragment was deleted, so add the fragment id to deleted_fragment_ids. Finally, pass the operation to the LanceDataset.commit() method to complete the deletion operation.
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> dataset = lance.write_dataset(table, "example") >>> table = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> dataset = lance.write_dataset(table, "example", mode="append") >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d >>> predicate = "a >= 2" >>> updated_fragments = [] >>> deleted_fragment_ids = [] >>> for fragment in dataset.get_fragments(): ... new_fragment = fragment.delete(predicate) ... if new_fragment is not None: ... updated_fragments.append(new_fragment) ... else: ... deleted_fragment_ids.append(fragment.fragment_id) >>> operation = lance.LanceOperation.Delete(updated_fragments, ... deleted_fragment_ids, ... predicate) >>> dataset = lance.LanceDataset.commit("example", operation, ... read_version=dataset.version) >>> dataset.to_table().to_pandas() a b 0 1 a
- deleted_fragment_ids: Iterable[int]¶
- predicate: str¶
- updated_fragments: Iterable[FragmentMetadata]¶
- class Merge(fragments: Iterable[FragmentMetadata], schema: LanceSchema | Schema)¶
Bases:
BaseOperation
Operation that adds columns. Unlike Overwrite, this should not change the structure of the fragments, allowing existing indices to be kept.
- fragments¶
The fragments that make up the new dataset.
- Type:
iterable of FragmentMetadata
- schema¶
The schema of the new dataset. Passing a LanceSchema is preferred, and passing a pyarrow.Schema is deprecated.
- Type:
LanceSchema or pyarrow.Schema
Warning
This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().
Examples
To add new columns to a dataset, first define a method that will create the new columns based on the existing columns. Then use lance.fragment.LanceFragment.merge_columns() to create the new fragments, as shown below.
>>> import lance >>> import pyarrow as pa >>> import pyarrow.compute as pc >>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d >>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch: ... doubled = pc.multiply(batch["a"], 2) ... return pa.record_batch([doubled], ["a_doubled"]) >>> fragments = [] >>> for fragment in dataset.get_fragments(): ... new_fragment, new_schema = fragment.merge_columns(double_a, ... columns=['a']) ... fragments.append(new_fragment) >>> operation = lance.LanceOperation.Merge(fragments, new_schema) >>> dataset = lance.LanceDataset.commit("example", operation, ... read_version=dataset.version) >>> dataset.to_table().to_pandas() a b a_doubled 0 1 a 2 1 2 b 4 2 3 c 6 3 4 d 8
- fragments: Iterable[FragmentMetadata]¶
- schema: LanceSchema | Schema¶
- class Overwrite(new_schema: LanceSchema | Schema, fragments: Iterable[FragmentMetadata])¶
Bases:
BaseOperation
Overwrite or create a new dataset.
- new_schema¶
The schema of the new dataset.
- Type:
pyarrow.Schema
- fragments¶
The fragments that make up the new dataset.
- Type:
list[FragmentMetadata]
Warning
This is an advanced API for distributed operations. To overwrite or create a new dataset on a single machine, use lance.write_dataset().
Examples
To create or overwrite a dataset, first use lance.fragment.LanceFragment.create() to create fragments. Then collect the fragment metadata into a list and pass it along with the schema to this class. Finally, pass the operation to the LanceDataset.commit() method to create the new dataset.
>>> import lance >>> import pyarrow as pa >>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> fragment1 = lance.fragment.LanceFragment.create("example", tab1) >>> fragment2 = lance.fragment.LanceFragment.create("example", tab2) >>> fragments = [fragment1, fragment2] >>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments) >>> dataset = lance.LanceDataset.commit("example", operation) >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d
- fragments: Iterable[FragmentMetadata]¶
- new_schema: LanceSchema | Schema¶
- class Restore(version: int)¶
Bases:
BaseOperation
Operation that restores a previous version of the dataset.
- version: int¶
- class Rewrite(groups: Iterable[RewriteGroup], rewritten_indices: Iterable[RewrittenIndex])¶
Bases:
BaseOperation
Operation that rewrites one or more files and indices into one or more files and indices.
- groups¶
Groups of files that have been rewritten.
- Type:
list[RewriteGroup]
- rewritten_indices¶
Indices that have been rewritten.
- Type:
list[RewrittenIndex]
Warning
This is an advanced API not intended for general use.
- groups: Iterable[RewriteGroup]¶
- rewritten_indices: Iterable[RewrittenIndex]¶
- class RewriteGroup(old_fragments: Iterable[FragmentMetadata], new_fragments: Iterable[FragmentMetadata])¶
Bases:
object
Collection of rewritten files
- new_fragments: Iterable[FragmentMetadata]¶
- old_fragments: Iterable[FragmentMetadata]¶
- class lance.LanceScanner(scanner: _Scanner, dataset: LanceDataset)¶
Bases:
Scanner
- count_rows()¶
Count rows matching the scanner filter.
- Returns:
count
- Return type:
int
- property dataset_schema: Schema¶
The schema with which batches will be read from fragments.
- explain_plan(verbose=False) str ¶
Return the execution plan for this scanner.
- Parameters:
verbose (bool, default False) – Use a verbose output format.
- Returns:
plan
- Return type:
str
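For illustration, inspecting the plan of a filtered scan (the filter is an assumption):

print(dataset.scanner(filter="a > 1").explain_plan(verbose=True))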
- static from_batches(*args, **kwargs)¶
Not implemented
- static from_dataset(*args, **kwargs)¶
Not implemented
- static from_fragment(*args, **kwargs)¶
Not implemented
- head(num_rows)¶
Load the first N rows of the dataset.
- Parameters:
num_rows (int) – The number of rows to load.
- Return type:
Table
- property projected_schema: Schema¶
The materialized schema of the data, accounting for projections.
This is the schema of any data returned from the scanner.
- scan_batches()¶
Consume a Scanner in record batches with corresponding fragments.
- Returns:
record_batches
- Return type:
iterator of TaggedRecordBatch
- take(indices)¶
Not implemented
- to_batches(self)¶
Consume a Scanner in record batches.
- Returns:
record_batches
- Return type:
iterator of RecordBatch
- to_reader(self)¶
Consume this scanner as a RecordBatchReader.
- Return type:
RecordBatchReader
- to_table() Table ¶
Read the data into memory and return a pyarrow Table.
- class lance.MergeInsertBuilder(dataset, on)¶
Bases:
_MergeInsertBuilder
- execute(data_obj: ReaderLike, *, schema: pa.Schema | None = None)¶
Executes the merge insert operation
This function updates the original dataset and returns a dictionary with information about merge statistics - i.e. the number of inserted, updated, and deleted rows.
- Parameters:
data_obj (ReaderLike) – The new data to use as the source table for the operation. This parameter can be any source of data (e.g. table / dataset) that write_dataset() accepts.
schema (Optional[pa.Schema]) – The schema of the data. This only needs to be supplied whenever the data source is some kind of generator.
- when_matched_update_all(condition: str | None = None) MergeInsertBuilder ¶
Configure the operation to update matched rows
After this method is called, when the merge insert operation executes, any rows that match both the source table and the target table will be updated. The rows from the target table will be removed and the rows from the source table will be added.
An optional condition may be specified. This should be an SQL filter and, if present, then only matched rows that also satisfy this filter will be updated. The SQL filter should use the prefix target. to refer to columns in the target table and the prefix source. to refer to columns in the source table. For example, source.last_update < target.last_update.
If a condition is specified and rows do not satisfy the condition then these rows will not be updated. Failure to satisfy the filter does not cause a “matched” row to become a “not matched” row.
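A hedged sketch of a conditional upsert (the key column "id", the last_update column, and new_table are assumptions):

dataset.merge_insert("id") \
    .when_matched_update_all(condition="source.last_update > target.last_update") \
    .when_not_matched_insert_all() \
    .execute(new_table)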
- when_not_matched_by_source_delete(expr: str | None = None) MergeInsertBuilder ¶
Configure the operation to delete source rows that do not match
After this method is called, when the merge insert operation executes, any rows that exist only in the target table will be deleted. An optional filter can be specified to limit the scope of the delete operation. If given (as an SQL filter) then only rows which match the filter will be deleted.
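A sketch of a full synchronization that makes the target mirror the source (the key column "id" and new_table are assumptions):

dataset.merge_insert("id") \
    .when_matched_update_all() \
    .when_not_matched_insert_all() \
    .when_not_matched_by_source_delete() \
    .execute(new_table)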
- when_not_matched_insert_all() MergeInsertBuilder ¶
Configure the operation to insert not matched rows
After this method is called, when the merge insert operation executes, any rows that exist only in the source table will be inserted into the target table.
- class lance.Transaction(read_version: 'int', operation: 'LanceOperation.BaseOperation', uuid: 'str' = <factory>, blobs_op: 'Optional[LanceOperation.BaseOperation]' = None)¶
Bases:
object
- blobs_op: BaseOperation | None = None¶
- operation: BaseOperation¶
- read_version: int¶
- uuid: str¶
- lance.batch_udf(output_schema=None, checkpoint_file=None)¶
Create a user defined function (UDF) that adds columns to a dataset.
This function is used to add columns to a dataset. It takes a function that takes a single argument, a RecordBatch, and returns a RecordBatch. The function is called once for each batch in the dataset. The function should not modify the input batch, but instead create a new batch with the new columns added.
- Parameters:
output_schema (Schema, optional) – The schema of the output RecordBatch. This is used to validate the output of the function. If not provided, the schema of the first output RecordBatch will be used.
checkpoint_file (str or Path, optional) – If specified, this file will be used as a cache for unsaved results of this UDF. If the process fails, and you call add_columns again with this same file, it will resume from the last saved state. This is useful for long-running processes that may fail and need to be resumed. This file may get very large. It will hold up to an entire data file's worth of results on disk, which can be multiple gigabytes of data.
- Return type:
AddColumnsUDF
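A minimal sketch of a checkpointed UDF (the column names and checkpoint path are assumptions):

import pyarrow as pa
import pyarrow.compute as pc
import lance

@lance.batch_udf(checkpoint_file="add_doubled.ckpt")
def add_doubled(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Return only the new column; it is appended to the existing schema
    return pa.record_batch([pc.multiply(batch["a"], 2)], ["a_doubled"])

dataset.add_columns(add_doubled)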
- lance.dataset(uri: str | Path, version: int | str | None = None, asof: ts_types | None = None, block_size: int | None = None, commit_lock: CommitLock | None = None, index_cache_size: int | None = None, storage_options: Dict[str, str] | None = None, default_scan_options: Dict[str, str] | None = None) LanceDataset ¶
Opens the Lance dataset from the address specified.
- Parameters:
uri (str) – Address to the Lance dataset. It can be a local file path /tmp/data.lance, or a cloud object store URI, i.e., s3://bucket/data.lance.
version (optional, int | str) – If specified, load a specific version of the Lance dataset. Else, loads the latest version. A version number (int) or a tag (str) can be provided.
asof (optional, datetime or str) – If specified, find the latest version created on or earlier than the given argument value. If a version is already specified, this arg is ignored.
block_size (optional, int) – Block size in bytes. Provide a hint for the size of the minimal I/O request.
commit_lock (optional, lance.commit.CommitLock) – If specified, use the provided commit handler to lock the table while committing a new version. Not necessary on object stores other than S3 or when there are no concurrent writers.
index_cache_size (optional, int) –
Index cache size. The index cache is an LRU cache with TTL. This number specifies the number of index pages, for example IVF partitions, to be cached in host memory. The default value is 256.
Roughly, for an IVF_PQ partition with n rows, the size of each index page equals the combination of the PQ codes (nd.array([n, pq], dtype=uint8)) and the row ids (nd.array([n], dtype=uint64)). Approximately, n = Total Rows / number of IVF partitions and pq = number of PQ sub-vectors.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
default_scan_options (optional, dict) –
Default scan options that are used when scanning the dataset. This accepts the same arguments described in lance.LanceDataset.scanner(). The arguments will be applied to any scan operation.
This can be useful to supply defaults for common parameters such as batch_size.
It can also be used to create a view of the dataset that includes meta fields such as _rowid or _rowaddr. If default_scan_options is provided then the schema returned by lance.LanceDataset.schema() will include these fields if the appropriate scan options are set.
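A brief sketch of opening a dataset with a few of these options (the path, version, and batch size are assumptions):

import lance

ds = lance.dataset(
    "/tmp/data.lance",
    version=3,                                 # a version number or a tag string
    index_cache_size=512,
    default_scan_options={"batch_size": 2048},
)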
- lance.json_to_schema(schema_json: Dict[str, Any]) Schema ¶
Converts a JSON payload to a PyArrow schema.
- Parameters:
schema_json (Dict[str, Any]) – The JSON payload to convert to a PyArrow Schema.
- lance.schema_to_json(schema: Schema) Dict[str, Any] ¶
Converts a PyArrow schema to a JSON-compatible payload.
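For illustration, a round trip between a PyArrow schema and its JSON payload:

import pyarrow as pa
import lance

schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.string())])
payload = lance.schema_to_json(schema)          # JSON-compatible dict
assert lance.json_to_schema(payload) == schema  # reconstructs the same schema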
- lance.set_logger(file_path='pylance.log', name='pylance', level=20, format_string=None, log_handler=None)¶
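A brief usage sketch based on the signature above (the DEBUG level is an assumption; the default level of 20 corresponds to logging.INFO):

import logging
import lance

lance.set_logger(file_path="pylance.log", level=logging.DEBUG)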
- lance.write_dataset(data_obj: ReaderLike, uri: str | Path | LanceDataset, schema: pa.Schema | None = None, mode: str = 'create', *, max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, commit_lock: CommitLock | None = None, progress: FragmentWriteProgress | None = None, storage_options: Dict[str, str] | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, enable_v2_manifest_paths: bool = False, enable_move_stable_row_ids: bool = False) LanceDataset ¶
Write a given data_obj to the given uri
- Parameters:
data_obj (Reader-like) – The data to be written. Acceptable types are: Pandas DataFrame, PyArrow Table, Dataset, Scanner, RecordBatchReader, or Hugging Face dataset.
uri (str, Path, or LanceDataset) – Where to write the dataset to (directory). If a LanceDataset is passed, the session will be reused.
schema (Schema, optional) – If specified and the input is a pandas DataFrame, use this schema instead of the default pandas to arrow table conversion.
mode (str) – create: create a new dataset (raises if uri already exists). overwrite: create a new snapshot version. append: create a new version that is the concatenation of the input with the latest version (raises if uri does not exist).
max_rows_per_file (int, default 1024 * 1024) – The max number of rows to write before starting a new file
max_rows_per_group (int, default 1024) – The max number of rows before starting a new group (in the same file)
max_bytes_per_file (int, default 90 * 1024 * 1024 * 1024) – The max number of bytes to write before starting a new file. This is a soft limit. This limit is checked after each group is written, which means larger groups may cause this to be overshot meaningfully. This defaults to 90 GB, since we have a hard limit of 100 GB per file on object stores.
commit_lock (CommitLock, optional) – A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.
progress (FragmentWriteProgress, optional) – Experimental API. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing.
storage_options (optional, dict) – Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc.
data_storage_version (optional, str, default None) – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default (None) will use the latest stable version. See the user guide for more details.
use_legacy_format (optional, bool, default None) – Deprecated method for setting the data storage version. Use the data_storage_version parameter instead.
enable_v2_manifest_paths (bool, optional) – If True, and this is a new dataset, uses the new V2 manifest paths. These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the LanceDataset.migrate_manifest_paths_v2() method. Default is False.
enable_move_stable_row_ids (bool, optional) – Experimental parameter: if set to true, the writer will use move-stable row ids. These row ids are stable after compaction operations, but not after updates. This makes compaction more efficient, since with stable row ids no secondary indices need to be updated to point to new row ids.
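A minimal sketch of creating and then appending to a dataset (the path and table contents are assumptions):

import lance
import pyarrow as pa

table = pa.table({"a": [1, 2, 3]})
ds = lance.write_dataset(table, "/tmp/example.lance", mode="create")
more = pa.table({"a": [4, 5]})
ds = lance.write_dataset(more, "/tmp/example.lance", mode="append")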