
Streaming and Concurrency

The ingestion pipeline is streaming, asynchronous, unordered and concurrent.

Concurrency

Transforming, chunking and storing steps are awaited in a buffered fashion. Depending on the concurrency setting of the stream, this means that multiple futures are awaited concurrently, up to that limit.
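The bounded, unordered behaviour described above can be illustrated with a minimal sketch. It is not the pipeline's implementation: it uses plain threads from the standard library instead of async streams, and the name `process_unordered` is hypothetical. It only shows the idea of keeping at most `concurrency` items in flight and emitting results as they finish.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: runs `work` over `items` with at most `concurrency`
/// items in flight and returns the results in completion order, which may
/// differ from the input order.
fn process_unordered<T, R, F>(items: Vec<T>, concurrency: usize, work: F) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> R + Send + Sync + 'static,
{
    // Shared queue of pending items; each worker pulls the next one when free.
    let queue = Arc::new(Mutex::new(items.into_iter()));
    let work = Arc::new(work);
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = (0..concurrency.max(1))
        .map(|_| {
            let queue = Arc::clone(&queue);
            let work = Arc::clone(&work);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while taking the next item.
                let next = queue.lock().unwrap().next();
                match next {
                    Some(item) => tx.send((*work)(item)).unwrap(),
                    None => break,
                }
            })
        })
        .collect();

    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);

    let results: Vec<R> = rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    results
}
```

Because results arrive in completion order, callers that need the original order have to carry an index or key with each item.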

Default concurrency and overriding

The default concurrency for a pipeline is the number of available CPUs. It can be overridden by calling pipeline.with_concurrency(concurrency: usize) with the desired concurrency setting.

Transformers, chunkers and stores can also implement concurrency on their respective traits, allowing for fine-grained control per step.
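As a rough sketch of how such a default and its overrides could fit together, the following uses only the standard library. `PipelineSettings` and `effective_concurrency` are hypothetical names for illustration, not part of the library, and the rule that a per-step value takes precedence over the pipeline value is an assumption.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Hypothetical settings holder, for illustration only.
struct PipelineSettings {
    concurrency: usize,
}

impl Default for PipelineSettings {
    fn default() -> Self {
        // Use the number of available CPUs; fall back to 1 if it cannot be determined.
        let cpus = thread::available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(1);
        Self { concurrency: cpus }
    }
}

impl PipelineSettings {
    /// Overrides the pipeline-wide concurrency.
    fn with_concurrency(mut self, concurrency: usize) -> Self {
        self.concurrency = concurrency;
        self
    }

    /// Assumption: a step-level setting, when present, wins over the pipeline default.
    fn effective_concurrency(&self, step_concurrency: Option<usize>) -> usize {
        step_concurrency.unwrap_or(self.concurrency)
    }
}
```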

Throttling

If throughput is too high, for example because of rate limits or other constraints, there is also pipeline.throttle(duration: impl Into<Duration>), which will limit the number of nodes passing through to one per the given duration.
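The "one item per duration" behaviour can be sketched with a blocking iterator adapter. This is an illustration only, using the standard library: the real pipeline is asynchronous, and `Throttled` and `throttle` here are hypothetical names, not the library's implementation.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical adapter that lets at most one item through per `interval`.
struct Throttled<I> {
    inner: I,
    interval: Duration,
    last_emit: Option<Instant>,
}

impl<I: Iterator> Iterator for Throttled<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Fetch first, so the end of the input is not delayed by a sleep.
        let item = self.inner.next()?;

        // Wait until a full interval has passed since the previous item.
        if let Some(last) = self.last_emit {
            let elapsed = last.elapsed();
            if elapsed < self.interval {
                thread::sleep(self.interval - elapsed);
            }
        }

        self.last_emit = Some(Instant::now());
        Some(item)
    }
}

/// Wraps any iterator; accepts anything convertible into a `Duration`.
fn throttle<I: Iterator>(inner: I, interval: impl Into<Duration>) -> Throttled<I> {
    Throttled {
        inner,
        interval: interval.into(),
        last_emit: None,
    }
}
```

The first item passes immediately; each later item waits until the interval since the previous one has elapsed.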

Ingestion Stream

You might have seen the IngestionStream type mentioned a few times. It is the internal stream that is passed around, built on top of the Rust Stream and StreamExt. Wrapping it gives us more control and less boilerplate when dealing with streams.

When building batch transformers, storage or chunkers, you will need to return an IngestionStream. We’ve tried to make that as easy as possible, and there are multiple ways to do it.

Using Into

From a list of IngestionNodes using Into:

let nodes: Vec<Result<IngestionNode>> = vec![Ok(IngestionNode::default())];
let stream: IngestionStream = nodes.into();

There is also an implementation of Into for Rust streams.

Converting an iterator

You can also convert an Iterator into an IngestionStream directly. This is useful because the iterator streams its results lazily, so they do not have to be collected first.

let nodes: Vec<IngestionNode> = vec![IngestionNode::default()];
// Each node is modified and wrapped in Ok as the stream is consumed.
let stream: IngestionStream = IngestionStream::iter(nodes.into_iter().map(|mut node| {
    node.metadata.insert("foo".to_string(), "bar".to_string());
    Ok(node)
}));
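The benefit of passing an iterator without collecting it can be seen with plain standard-library iterators. The sketch below is independent of IngestionStream, and `count_evaluated` is a hypothetical helper: it counts how many times the mapping closure runs when only a few items are consumed.

```rust
use std::cell::Cell;

/// Hypothetical helper: maps over a large range lazily, consumes only `take`
/// items, and returns how many times the mapping closure actually ran.
fn count_evaluated(take: usize) -> usize {
    let evaluated = Cell::new(0usize);

    // Nothing runs here yet; `map` only describes the work.
    let mapped = (0..1000).map(|n| {
        evaluated.set(evaluated.get() + 1);
        n * 2
    });

    // Only the consumed items are evaluated.
    let _first: Vec<i32> = mapped.take(take).collect();

    evaluated.get()
}
```

Calling `count_evaluated(3)` runs the closure three times, not a thousand, which is the same property that lets a stream built from an iterator start emitting before the whole input has been processed.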