Lance ❤️ Ray

Ray effortlessly scale up ML workload to large distributed compute environment.

Basic Operations

Lance format is one of the official Ray data sources:

  • Lance Data Source ray.data.read_lance()

  • Lance Data Sink ray.data.Dataste.write_lance()

import ray
import pandas as pd

ray.init()

data = [
    {"id": 1, "name": "alice"},
    {"id": 2, "name": "bob"},
    {"id": 3, "name": "charlie"}
]
ray.data.from_items(data).write_lance("./alice_bob_and_charlie.lance")

# It can be read via lance directly
df = (
    lance.
    dataset("./alice_bob_and_charlie.lance")
    .to_table()
    .to_pandas()
    .sort_values(by=["id"])
    .reset_index(drop=True)
)
assert df.equals(pd.DataFrame(data)), "{} != {}".format(
    df, pd.DataFrame(data)
)

# Or via Ray.data.read_lance
ray_df = (
    ray.data.read_lance("./alice_bob_and_charlie.lance")
    .to_pandas()
    .sort_values(by=["id"])
    .reset_index(drop=True)
)
assert df.equals(ray_df)

Advanced Operations

Parallel Column Merging

Demonstration of parallel column generation using Lance’s native operations:

import pyarrow as pa
from pathlib import Path
import lance

# Define schema
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("height", pa.int64()),
    pa.field("weight", pa.int64()),
])

# Generate initial dataset
ds = (
    ray.data.range(10)  # Create 0-9 IDs
    .map(lambda x: {
        "id": x["id"],
        "height": x["id"] + 5,  # height = id + 5
        "weight": x["id"] * 2   # weight = id * 2
    })
    .write_lance(str(output_path), schema=schema)
)

# Define label generation logic
def generate_labels(batch: pa.RecordBatch) -> pa.RecordBatch:
    heights = batch.column("height").to_pylist()
    size_labels = ["tall" if h > 8 else "medium" if h > 6 else "short" for h in heights]
    return pa.RecordBatch.from_arrays([
        pa.array(size_labels)
    ], names=["size_labels"])

# Add new columns in parallel
lance_ds = lance.dataset(output_path)
add_columns(
    lance_ds,
    generate_labels,
    source_columns=["height"],  # Input columns needed
)

# Display final results
final_df = lance_ds.to_table().to_pandas()
print("\\nEnhanced dataset with size labels:\\n")
print(final_df.sort_values("id").to_string(index=False))