let compose_results ~on_error ta tb =
let name =
sprintf "(compose_results <%s> <%s>)"
Option.(value ~default:"" (name ta))
Option.(value ~default:"" (name tb)) in
make_general ~name ()
~feed:(fun i -> feed ta i)
~stop:(fun () -> stop ta)
~next:(fun () ->
let call_tb_next () =
begin match next tb with
| `output (Ok o) -> `output (Ok o)
| `output (Error o) -> `output (Error (on_error (`right o)))
| `not_ready -> `not_ready
| `end_of_stream -> `end_of_stream
end
in
match next ta with
| `output (Ok o) -> feed tb o; call_tb_next ()
| `output (Error o) -> `output (Error (on_error (`left o)))
| `not_ready -> call_tb_next ()
| `end_of_stream -> stop tb; call_tb_next ())