Streaming and Concurrency
The indexing pipeline is streaming, asynchronous, unordered and concurrent.
Concurrency
When transforming, chunking or storing, steps are buffered. Depending on the concurrency setting of the stream, up to that many futures are awaited concurrently, and results are passed on as they complete rather than in input order.
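The idea of bounded, unordered concurrency can be sketched with the standard library alone. The example below is only an analogy: it uses OS threads rather than the async futures the pipeline buffers, and process_unordered is a hypothetical helper, not part of the library.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Hypothetical helper: runs `step` over `items` with at most `concurrency`
/// workers and returns results in completion order, not input order.
fn process_unordered<T, R, F>(items: Vec<T>, concurrency: usize, step: F) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> R + Send + Sync + 'static,
{
    // Shared work queue: each worker pulls the next item when it is free.
    let queue = Arc::new(Mutex::new(items.into_iter()));
    let step = Arc::new(step);
    let (tx, rx) = mpsc::channel();

    let workers: Vec<_> = (0..concurrency.max(1))
        .map(|_| {
            let queue = Arc::clone(&queue);
            let step = Arc::clone(&step);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while taking the next item.
                let next = queue.lock().unwrap().next();
                match next {
                    Some(item) => {
                        if tx.send((*step)(item)).is_err() {
                            break;
                        }
                    }
                    None => break,
                }
            })
        })
        .collect();

    // Drop the original sender so the channel closes once all workers finish.
    drop(tx);
    let results: Vec<R> = rx.into_iter().collect();
    for worker in workers {
        worker.join().unwrap();
    }
    results
}
```

Because results arrive in completion order, callers that need a stable order have to sort or tag the items themselves.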
Default concurrency and overriding
The default concurrency for a pipeline is the number of available CPUs. It can be overridden by calling pipeline.with_concurrency(concurrency: usize) with the desired concurrency setting.
Transformers, chunkers and stores can also implement concurrency on their respective traits, allowing for fine-grained control per step.
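How a pipeline-wide default and a per-step override might interact can be sketched as follows. Everything here is a simplified stand-in and an assumption: the Step trait, the Embed and RateLimitedLlm steps, and concurrency_for are hypothetical, and only the with_concurrency name and the CPU-count default come from the text above.

```rust
use std::thread;

/// Hypothetical step trait: `None` means "use the pipeline setting".
trait Step {
    fn concurrency(&self) -> Option<usize> {
        None
    }
}

/// Hypothetical step that accepts the pipeline default.
struct Embed;
impl Step for Embed {}

/// Hypothetical step that caps itself, e.g. for a rate-limited API.
struct RateLimitedLlm;
impl Step for RateLimitedLlm {
    fn concurrency(&self) -> Option<usize> {
        Some(2)
    }
}

/// Simplified stand-in for a pipeline; not the library's actual type.
struct Pipeline {
    concurrency: usize,
}

impl Default for Pipeline {
    fn default() -> Self {
        // Default to the number of available CPUs, falling back to 1.
        let cpus = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        Pipeline { concurrency: cpus }
    }
}

impl Pipeline {
    /// Overrides the pipeline-wide concurrency.
    fn with_concurrency(mut self, concurrency: usize) -> Self {
        self.concurrency = concurrency;
        self
    }

    /// A step's own setting wins over the pipeline-wide one.
    fn concurrency_for(&self, step: &dyn Step) -> usize {
        step.concurrency().unwrap_or(self.concurrency)
    }
}
```

With a pipeline set to 8, this sketch would run Embed with a concurrency of 8 and RateLimitedLlm with 2.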
Throttling
If throughput is too high, for example because of rate limits or other constraints, there is also pipeline.throttle(duration: impl Into<Duration>), which limits the number of nodes passing through to one per given duration.
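The effect of "one item per duration" can be illustrated with a blocking iterator adaptor. This is a sketch under assumptions, not the pipeline's implementation: Throttled and the free function throttle are hypothetical, and the sketch sleeps the current thread where an async stream would yield instead.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical adaptor that lets at most one item through per `interval`.
struct Throttled<I> {
    inner: I,
    interval: Duration,
    last: Option<Instant>,
}

/// Wraps any iterator; accepts anything convertible into a `Duration`.
fn throttle<I: Iterator>(inner: I, interval: impl Into<Duration>) -> Throttled<I> {
    Throttled {
        inner,
        interval: interval.into(),
        last: None,
    }
}

impl<I: Iterator> Iterator for Throttled<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Wait out the remainder of the interval since the previous item.
        if let Some(last) = self.last {
            let elapsed = last.elapsed();
            if elapsed < self.interval {
                thread::sleep(self.interval - elapsed);
            }
        }
        let item = self.inner.next();
        self.last = Some(Instant::now());
        item
    }
}
```

The first item passes immediately; every later call waits until the interval since the previous item has elapsed.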
Indexing Stream
You might have seen the IndexingStream type mentioned a few times. It is the internal stream that is passed around, built on top of the Rust Stream and StreamExt. Wrapping it gives us more control and less boilerplate when dealing with streams.
When building batch transformers, storage or chunkers, you will need to return an IndexingStream. We’ve tried to make that as easy as possible, and there are multiple ways to do it.
Using Into
An IndexingStream can be created from a list of Nodes using Into.
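The conversion pattern can be sketched with simplified stand-in types. Node and IndexingStream below are assumptions made for illustration: the real IndexingStream wraps an async Stream, while this stand-in wraps a plain iterator and uses String as its error type. batch_uppercase is a hypothetical batch step.

```rust
/// Simplified stand-in for a node; not the library's actual type.
#[derive(Debug, Clone, PartialEq)]
struct Node {
    chunk: String,
}

/// Simplified stand-in: wraps a boxed iterator instead of an async Stream.
struct IndexingStream {
    inner: Box<dyn Iterator<Item = Result<Node, String>> + Send>,
}

impl From<Vec<Node>> for IndexingStream {
    fn from(nodes: Vec<Node>) -> Self {
        // Each node is wrapped in Ok, since the stream yields results.
        IndexingStream {
            inner: Box::new(nodes.into_iter().map(Ok::<Node, String>)),
        }
    }
}

impl Iterator for IndexingStream {
    type Item = Result<Node, String>;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}

/// Hypothetical batch step: transforms a batch, then returns a stream via Into.
fn batch_uppercase(nodes: Vec<Node>) -> IndexingStream {
    let transformed: Vec<Node> = nodes
        .into_iter()
        .map(|mut node| {
            node.chunk = node.chunk.to_uppercase();
            node
        })
        .collect();
    transformed.into()
}
```

The final transformed.into() is the only conversion code the batch step needs, which is the boilerplate saving the wrapper is meant to provide.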
There is also an implementation of Into for Rust streams.
Converting an iterator
You can also convert an Iterator into an IndexingStream directly. This is great, as the iterator itself will stream its results instead of having to collect them first.
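The benefit of converting an iterator without collecting it first is laziness: items are only produced when the consumer asks for them. The sketch below demonstrates that with a counter. Node, IndexingStream, the iter constructor and load_lines are simplified, hypothetical stand-ins (a plain iterator wrapper with String errors), not the library's actual types or signatures.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified stand-in for a node; not the library's actual type.
struct Node {
    chunk: String,
}

/// Simplified stand-in: wraps a boxed iterator instead of an async Stream.
struct IndexingStream {
    inner: Box<dyn Iterator<Item = Result<Node, String>> + Send>,
}

impl IndexingStream {
    /// Hypothetical constructor: wraps the iterator without collecting it.
    fn iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = Result<Node, String>>,
        I::IntoIter: Send + 'static,
    {
        IndexingStream {
            inner: Box::new(iter.into_iter()),
        }
    }
}

impl Iterator for IndexingStream {
    type Item = Result<Node, String>;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}

/// Hypothetical loader: turns lines into nodes lazily and counts
/// how many nodes have actually been produced so far.
fn load_lines(lines: Vec<String>, produced: Arc<AtomicUsize>) -> IndexingStream {
    IndexingStream::iter(lines.into_iter().map(move |line| {
        produced.fetch_add(1, Ordering::SeqCst);
        Ok::<Node, String>(Node { chunk: line })
    }))
}
```

In this sketch the counter stays at zero until the first item is pulled from the stream, so nothing is materialized ahead of the consumer.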