method reader =
try
while true do
(* Wait for the writer to emit the block we fancy, or for shutdown. *)
Mutex.lock wait_m ;
if state <> `Tired && read = write then
Condition.wait wait_c wait_m ;
Mutex.unlock wait_m ;
(* Exit or... *)
if state = `Tired then raise Exit ;
(* ...read a block. *)
self#push_block buffer.(read mod nb_blocks);
Mutex.lock wait_m ;
read <- (read + 1) mod (2*nb_blocks) ;
Mutex.unlock wait_m ;
Condition.signal wait_c
done
with
| Exit ->
self#close
| e ->
(* We crashed. Let's attempt to leave things in a decent state.
* Note that the exception should only come from #pull_lock,
* which is performed outside of critical section, so there's
* not need to unlock. *)
self#close ;
(* It is possible that the reader is waiting for us,
* hence blocking the streaming thread, and consequently
* the possibility of going to sleep peacefully.
* Let's resume it, even though he'll get an arbitrary block. *)
Mutex.lock wait_m ;
read <- (read + 1) mod (2*nb_blocks) ;
state <- `Crashed ;
Mutex.unlock wait_m ;
Condition.signal wait_c ;
raise e