Library
Module
Module type
Parameter
Class
Class type
Pipelines: carry a bundle of inputs through a series of transformations whilst preserving the order of the original input.
Steps are the building blocks of pipeline. A step is essentially a function from a given type to another.
val sync : ('a -> 'b) -> ('a, 'b) step
sync f
is the function f
as a synchronous step. Data that is transformed by a synchronous function is never buffered. Instead it is transformed as it arrives.
async_s f
is the function f
as an asynchronous serial step. There is only ever at most one unresolved promise created by this function. If data arrives at this step and the function as already been called, the data is buffered until the promise created by the call resolves.
async_p f
is the function f
as an asynchronous parallel step. Multiple unresolved promise for this step can be unresolved at the same time. Data might still be buffered if the global limit on unresolved promises is reached.
Error management: these steps are helpers for managing errors through the result
type.
all_ok
is sync (fun x -> Ok x)
and it is meant to inject all of the available data into the result
type.
map_in_err f s
is a step with the same synchronicity as s
. On Ok
data it acts the same as s
, but Error
data is modified by f
before being handled normally by s
.
val map_out_err :
('erra -> 'errb) ->
('a, ('b, 'erra) result) step ->
('a, ('b, 'errb) result) step
map_out_err f s
is a step with the same synchronicity as s
. On Ok
data it acts the same as s
, but Error
data is modified by f
after being handled normally by s
.
with_err s
is a step with the same synchronicity as s
. It acts as s
on Ok
data and it is a no-op on Error
data.
recover f
is sync (function | Ok v -> v | Error e -> f e)
: it maps the Error
data onto the same type as the Ok
data and exits the result
type.
val init_key : ('a, 'a * 'a) step
Pipelines are essentially lists of steps.
Pipeline constructors are akin to list constructors, with less sugar.
val nil : ('x, 'x) pipe
The recommended use for building pipelines is: cons (sync f) @@ cons (async_p g) @@ cons (async_p h) @@ nil
Core functionality: run ?pool pipe input
runs all the elements of input
through the steps of pipeline
. All the while it maintains the following invariants:
pool
unresolved high-level promises at any one time. A high-level promise is one that corresponds to a call to one of the step functions. (Note that each such promise can create additional promises which are not limited by the pool
parameter of run
.) By default, no limits are imposed.x
is before y
in input
, then for any step s
, the high-level promise of s
for x
will be created before the high-level promise of s
for y
.Exception handling: run
does not attempt to handle any exception. It is up to the caller to ensure that expected exceptions are wrapped in a result
constructor or some such solution. The pipeline library provides some support for result
-handling.
Post-processing: useful to deal with pipeline built around error management or id-marking combinators.
val partition_by_error : ('o, 'err) result list -> 'o list * 'err list
partition_by_error
sorts the results into distinct sets: successful and error results. Note that the order within each set is preserved, but the interleaving across the sets is lost.