
Splitting and merging streams

When indexing, you might want to apply different transformations to nodes depending on a condition.

Instead of building multiple streams or applying transformations conditionally, streams can also be split.

.split_by takes a predicate and returns a pair of streams. Nodes for which the predicate returns true go left (the first stream of the pair); all other nodes go right (the second stream).

For example:

    let (mut markdown, mut code) =
        Pipeline::from_loader(FileLoader::new(path).with_extensions(&extensions))
            .split_by(|node| {
                // Nodes that failed to load go left, as do files without an extension.
                let Ok(node) = node else { return true };
                node.path.extension().map_or(true, |ext| ext == "md")
            });

    // Right stream: chunk the code and generate Q&A metadata for it.
    code = code
        .then_chunk(ChunkCode::try_for_language_and_chunk_size(
            language,
            50..1024,
        )?)
        .then(MetadataQACode::new(openai.clone()));

    // Left stream: chunk the markdown and generate Q&A metadata for it.
    markdown = markdown
        .then_chunk(ChunkMarkdown::from_chunk_range(50..1024))
        .then(MetadataQAText::new(openai.clone()));

    // Merge both streams back into one, then embed and store the nodes.
    code.merge(markdown)
        .then_in_batch(50, Embed::new(openai.clone()))
        .then_store_with(qdrant.clone())
        .run()
        .await
}
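
To make the routing rule in the example easier to see, here is a minimal sketch that isolates the predicate using only the standard library. The function name goes_left is hypothetical and not part of the library; it only mirrors the map_or(true, ...) expression above, applied to a plain path instead of a node.

```rust
use std::path::Path;

/// Mirrors the predicate from the example above (hypothetical helper):
/// `true` routes a file to the left (markdown) stream,
/// `false` routes it to the right (code) stream.
fn goes_left(path: &Path) -> bool {
    // `extension()` returns `None` for files without an extension,
    // and `map_or(true, ..)` sends those left as well.
    path.extension().map_or(true, |ext| ext == "md")
}
```

Under this rule, README.md goes left and src/main.rs goes right. Files without an extension, such as Makefile or .gitignore, also go left, which may or may not be what you want for your own pipeline.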

Split streams are not lazy; they are implemented with buffered channels under the hood. This means there should not be any slow code between splitting the stream and running the resulting pipelines. Merging is optional: you can also run each stream independently. However, because the streams are not lazy, they then need to be run in parallel.
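
The sketch below illustrates why both sides of a buffered split have to be consumed at the same time. It is a stand-in built on std::sync::mpsc and threads, not the library's actual implementation: the function split_and_drain, the u32 items and the capacity parameter are all assumptions made for illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Routes each item into one of two bounded channels and drains both
/// channels concurrently. Illustrative stand-in for a split stream.
fn split_and_drain<F>(items: Vec<u32>, capacity: usize, predicate: F) -> (Vec<u32>, Vec<u32>)
where
    F: Fn(&u32) -> bool + Send + 'static,
{
    let (left_tx, left_rx) = sync_channel::<u32>(capacity);
    let (right_tx, right_rx) = sync_channel::<u32>(capacity);

    // Producer: `send` blocks once the target channel holds `capacity` items.
    let producer = thread::spawn(move || {
        for item in items {
            let target = if predicate(&item) { &left_tx } else { &right_tx };
            // An error only means the receiver was dropped; stop producing.
            if target.send(item).is_err() {
                break;
            }
        }
        // Both senders are dropped here, which ends the consumers' loops.
    });

    // Each side is drained on its own thread, so neither buffer stays full.
    let left = thread::spawn(move || left_rx.iter().collect::<Vec<_>>());
    let right = thread::spawn(move || right_rx.iter().collect::<Vec<_>>());

    producer.join().expect("producer panicked");
    (
        left.join().expect("left consumer panicked"),
        right.join().expect("right consumer panicked"),
    )
}
```

Calling split_and_drain((1..=100).collect(), 2, |n| n % 2 == 0) returns the even numbers on the left and the odd numbers on the right. If the left side were instead drained to completion before the right side was started, the producer would block as soon as the right buffer filled up, the left side would never see its channel close, and the program would hang.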