Apache DataFusion Integration

Lance datasets can be queried with Apache DataFusion, an extensible query engine written in Rust that uses Apache Arrow as its in-memory format. This lets you run complex SQL queries directly against data stored in Lance.

The integration pushes column selections and basic filters down to Lance, which reduces the amount of data scanned when a query executes. It also streams data from Lance datasets, so aggregations can run over data that is larger than memory.
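To make the idea of pushdown and streaming concrete, here is a toy model in plain Python. It is only an illustration of the concept, not how Lance or DataFusion implement it: `ROWS`, `scan`, and `streaming_sum` are hypothetical names invented for this sketch. The scan drops unneeded columns and non-matching rows at the source, and the aggregation consumes one small batch at a time rather than loading everything first.

```python
# Hypothetical in-memory stand-in for a dataset: 10 rows, one wide column.
ROWS = [{"id": i, "amount": i * 10, "note": "x" * 100} for i in range(10)]


def scan(columns, predicate=None, batch_size=4):
    """Yield batches holding only the requested columns of matching rows."""
    for start in range(0, len(ROWS), batch_size):
        batch = []
        for row in ROWS[start:start + batch_size]:
            # Filter pushdown: rows are rejected before leaving the scan.
            if predicate is None or predicate(row):
                # Projection pushdown: only requested columns are kept.
                batch.append({c: row[c] for c in columns})
        if batch:
            yield batch


def streaming_sum(batches, column):
    """Aggregate batch by batch so only one batch is held at a time."""
    total = 0
    for batch in batches:
        total += sum(row[column] for row in batch)
    return total


# Roughly: SELECT SUM(amount) FROM dataset WHERE id >= 5
total = streaming_sum(scan(["amount"], lambda r: r["id"] >= 5), "amount")
print(total)
```

In this sketch the wide `note` column is never copied into a batch, and the sum never needs more than `batch_size` rows in memory, which is the same reasoning behind pushing projections and filters into the scan.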

Rust

Lance includes a DataFusion table provider, lance::datafusion::LanceTableProvider. You can register a Lance dataset as a table in DataFusion and run SQL against it:

Simple SQL

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use lance::datafusion::LanceTableProvider;

// `dataset` is an already-opened Lance dataset.
let ctx = SessionContext::new();

// Expose the dataset to DataFusion under the table name "dataset".
ctx.register_table(
    "dataset",
    Arc::new(LanceTableProvider::new(
        Arc::new(dataset.clone()),
        /* with_row_id */ false,
        /* with_row_addr */ false,
    )),
)?;

let df = ctx.sql("SELECT * FROM dataset LIMIT 10").await?;
let result = df.collect().await?;

Join 2 Tables

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use lance::datafusion::LanceTableProvider;

// `orders_dataset` and `customers_dataset` are already-opened Lance datasets.
let ctx = SessionContext::new();

ctx.register_table(
    "orders",
    Arc::new(LanceTableProvider::new(
        Arc::new(orders_dataset.clone()),
        /* with_row_id */ false,
        /* with_row_addr */ false,
    )),
)?;

ctx.register_table(
    "customers",
    Arc::new(LanceTableProvider::new(
        Arc::new(customers_dataset.clone()),
        /* with_row_id */ false,
        /* with_row_addr */ false,
    )),
)?;

// Join the two registered tables on customer_id.
let df = ctx.sql("
    SELECT o.order_id, o.amount, c.customer_name
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    LIMIT 10
").await?;

let result = df.collect().await?;

Python

In Python, the integration goes through the DataFusion FFI. pylance includes an FFI table provider, FFILanceTableProvider. For example, to query my_lance_dataset:

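from datafusion import SessionContext  # pip install datafusion
from lance import FFILanceTableProvider

# `my_lance_dataset` is an already-opened Lance dataset.
ctx = SessionContext()

table1 = FFILanceTableProvider(
    my_lance_dataset, with_row_id=True, with_row_addr=True
)
ctx.register_table_provider("table1", table1)

# Access the registered table through the DataFrame API ...
table_df = ctx.table("table1")
# ... or query it with SQL.
sql_df = ctx.sql("SELECT * FROM table1 LIMIT 10")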