Indexing Pipeline
Swiftide indexes your data using a parallel, asynchronous, streaming pipeline. Throughout a pipeline, Nodes
are transformed and ultimately persisted. Every step in a pipeline returns the same, owned pipeline.
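To illustrate what "returns the same, owned pipeline" means in practice, here is a minimal, synchronous sketch using only the standard library. The `Node`, `Pipeline`, `from_nodes`, and `demo_chaining` names below are hypothetical stand-ins written for this example, not Swiftide's actual types or signatures; the real pipeline is asynchronous and streaming.

```rust
// Hypothetical stand-in for a node; NOT Swiftide's actual type.
#[derive(Debug, Clone, PartialEq)]
pub struct Node {
    pub path: String,
    pub chunk: String,
}

// A boxed transformation step (assumed shape, for illustration only).
type Step = Box<dyn Fn(Node) -> Result<Node, String>>;

// Hypothetical, simplified pipeline that owns its nodes and steps.
pub struct Pipeline {
    nodes: Vec<Node>,
    steps: Vec<Step>,
}

impl Pipeline {
    pub fn from_nodes(nodes: Vec<Node>) -> Self {
        Pipeline { nodes, steps: Vec::new() }
    }

    // Takes `self` by value and hands the same pipeline back,
    // so every call moves the pipeline into the next step.
    pub fn then<F>(mut self, step: F) -> Self
    where
        F: Fn(Node) -> Result<Node, String> + 'static,
    {
        self.steps.push(Box::new(step));
        self
    }

    // Consumes the pipeline and applies every step to every node, in order.
    pub fn run(self) -> Result<Vec<Node>, String> {
        let Pipeline { nodes, steps } = self;
        nodes
            .into_iter()
            .map(|node| steps.iter().try_fold(node, |n, step| step(n)))
            .collect()
    }
}

pub fn demo_chaining() -> Result<Vec<Node>, String> {
    let pipeline = Pipeline::from_nodes(vec![Node {
        path: "a.rs".into(),
        chunk: "fn a() {}".into(),
    }]);

    // Each step consumes the pipeline, so the returned value is rebound.
    let pipeline = pipeline.then(|mut node| {
        node.chunk = node.chunk.to_uppercase();
        Ok(node)
    });
    let pipeline = pipeline.then(|mut node| {
        node.chunk.push_str(" // done");
        Ok(node)
    });

    pipeline.run()
}
```

Because each step takes the pipeline by value, the result has to be either chained directly or rebound to a variable, as `demo_chaining` does.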
An indexing pipeline step-by-step
-
The pipeline starts with a loader:
    let pipeline = indexing::Pipeline::from_loader(FileLoader::new("./"));
A loader implements the Loader trait, which yields Nodes to a stream.
-
Nodes can then be transformed with an existing transformer:
    pipeline.then(MetadataQACode::new(openai_client.clone()));
Any transformer has to implement the Transformer trait, which takes an owned Node and outputs a Result<Node>. Closures also implement this trait!
-
For example, a closure can be used directly as a transformer:
    pipeline.then(|mut node| {
        node.chunk = format!("{}\n{}", &node.chunk, "awesome!");
        Ok(node)
    });
-
Batch transformations are also supported:
    pipeline.then_in_batch(Embed::new(FastEmbed::try_default()?));
Batchable transformers implement the BatchableTransformer trait, which takes a vector of Nodes and outputs an IndexingStream.
-
Nodes can be filtered using a NodeCache at any stage, based on a cache key that the node cache defines. By default, Redis uses a prefix and a hash of the Node, based on its path and text.
    pipeline.filter_cached(Redis::try_from_url(
        redis_url,
        "swiftide-examples",
    )?);
Node caches implement the NodeCache trait, which defines a get and a set method, each taking a Node as input.
-
At any point in the pipeline, nodes can be chunked into smaller parts:
    pipeline.then_chunk(ChunkCode::try_for_language_and_chunk_size(
        "rust",
        10..2048,
    )?);
Chunkers implement the ChunkerTransformer trait, which takes a Node and returns an IndexingStream. By default, metadata is copied over to each resulting node.
-
Also, nodes can be persisted (multiple times!) to storage:
    pipeline.then_store_with(
        Qdrant::try_from_url(qdrant_url)?
            .batch_size(50)
            .vector_size(1536)
            .collection_name("swiftide-examples")
            .build()?,
    );
Storages implement the Storage trait, which defines the setup, store, batch_store and batch_size methods. They also provide ways to convert a Node into something that can be stored.
-
Finally, the pipeline can be run as follows:
    pipeline.run().await?;
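The trait shapes described in the steps above (transformers that closures satisfy, a node cache with get and set, and a chunker that turns one node into several) can be sketched with the standard library alone. This is a simplified, synchronous model under stated assumptions: `InMemoryCache`, `LineChunker`, `chunk_node`, `index`, and `demo_stages` are hypothetical names invented for this example, the traits return plain `Vec` and `Result` values rather than an IndexingStream, and batching and storage are left out. It is not Swiftide's actual API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for a node; NOT Swiftide's actual type.
#[derive(Debug, Clone, PartialEq)]
pub struct Node {
    pub path: String,
    pub chunk: String,
}

// Simplified transformer: takes an owned node, returns a Result.
pub trait Transformer {
    fn transform_node(&self, node: Node) -> Result<Node, String>;
}

// Blanket impl: any closure with a matching signature is a transformer.
impl<F> Transformer for F
where
    F: Fn(Node) -> Result<Node, String>,
{
    fn transform_node(&self, node: Node) -> Result<Node, String> {
        self(node)
    }
}

// Simplified node cache: `get` reports whether a node was already seen.
pub trait NodeCache {
    fn get(&self, node: &Node) -> bool;
    fn set(&mut self, node: &Node);
}

// Hypothetical in-memory cache keyed by a prefix plus a hash of path and text.
pub struct InMemoryCache {
    prefix: String,
    seen: HashSet<String>,
}

impl InMemoryCache {
    pub fn new(prefix: &str) -> Self {
        InMemoryCache { prefix: prefix.to_string(), seen: HashSet::new() }
    }

    fn key(&self, node: &Node) -> String {
        let mut hasher = DefaultHasher::new();
        node.path.hash(&mut hasher);
        node.chunk.hash(&mut hasher);
        format!("{}:{}", self.prefix, hasher.finish())
    }
}

impl NodeCache for InMemoryCache {
    fn get(&self, node: &Node) -> bool {
        self.seen.contains(&self.key(node))
    }

    fn set(&mut self, node: &Node) {
        let key = self.key(node);
        self.seen.insert(key);
    }
}

// Hypothetical chunker: one node per line, copying the path to each chunk.
pub struct LineChunker;

impl LineChunker {
    pub fn chunk_node(&self, node: Node) -> Vec<Node> {
        node.chunk
            .lines()
            .map(|line| Node { path: node.path.clone(), chunk: line.to_string() })
            .collect()
    }
}

// Filter cached nodes, chunk the rest, then transform every chunk.
pub fn index(
    nodes: Vec<Node>,
    cache: &mut impl NodeCache,
    transformer: &impl Transformer,
) -> Result<Vec<Node>, String> {
    let mut out = Vec::new();
    for node in nodes {
        if cache.get(&node) {
            continue; // already indexed, skip it
        }
        cache.set(&node);
        for chunk in LineChunker.chunk_node(node) {
            out.push(transformer.transform_node(chunk)?);
        }
    }
    Ok(out)
}

pub fn demo_stages() -> Result<Vec<Node>, String> {
    let node = Node { path: "lib.rs".into(), chunk: "fn a() {}\nfn b() {}".into() };
    let mut cache = InMemoryCache::new("swiftide-examples");
    let annotate = |mut node: Node| -> Result<Node, String> {
        node.chunk = format!("{}\n{}", &node.chunk, "awesome!");
        Ok(node)
    };
    // The duplicate node is dropped by the cache before chunking.
    index(vec![node.clone(), node], &mut cache, &annotate)
}
```

In this sketch the cache check runs before chunking, so the duplicate input never reaches the chunker or the transformer. Where the filter sits relative to the other steps is a design choice, and the text above notes that filtering can happen at any stage.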