-- |Implements bounded channels. These channels differ from normal 'Chan's in
-- that they are guaranteed to contain no more than a certain number of
-- elements. This is ideal when you may be writing to a channel faster than you
-- are able to read from it.
--
-- This module supports all the functions of "Control.Concurrent.Chan" except
-- 'unGetChan' and 'dupChan', which are not supported for bounded channels.
--
-- Extra consistency: This version enforces that if thread Alice writes
-- e1 followed by e2 then e1 will be returned by readChan before e2.
-- Conversely, if thread Bob reads e1 followed by e2 then it was true that
-- writeChan e1 preceded writeChan e2.
--
-- Previous versions did not enforce this consistency: if writeChan were
-- preempted between putMVars or killThread arrived between putMVars then it
-- can fail.  Similarly it might fail if readChan were stopped after putMVar
-- and before the second takeMVar.  An unlucky pattern of several such deaths
-- might actually break the invariants of the array in an unrecoverable way
-- causing all future reads and writes to block.
module Control.Concurrent.BoundedChan(
         BoundedChan
       , newBoundedChan
       , writeChan
       , tryWriteChan
       , readChan
       , tryReadChan
       , isEmptyChan
       , getChanContents
       , writeList2Chan
       )
  where

import Control.Concurrent.MVar (MVar, isEmptyMVar, newEmptyMVar, newMVar,
                                putMVar, tryPutMVar, takeMVar, tryTakeMVar)
import Control.Exception       (mask_, onException)
import Control.Monad           (replicateM)
import Data.Array              (Array, (!), listArray)
import System.IO.Unsafe        (unsafeInterleaveIO)

-- |'BoundedChan' is an abstract data type representing a bounded channel.
data BoundedChan a = BC
  { _size     :: Int
    -- ^ Capacity of the channel; positions wrap around modulo this value.
  , _contents :: Array Int (MVar a)
    -- ^ One 'MVar' slot per element, indexed from 0.
  , _writePos :: MVar Int
    -- ^ Index of the slot the next write goes to; holding it serialises writers.
  , _readPos  :: MVar Int
    -- ^ Index of the slot the next read comes from; holding it serialises readers.
  }

-- Versions of modifyMVar and withMVar that do not 'restore' the previous mask state when running
-- 'io', with added modification strictness.  The lack of 'restore' may make these perform better
-- than the normal version.  Moving strictness here makes using them more pleasant.
{-# INLINE modifyMVar_mask #-}
-- Take the value out of @m@, run @io@ on it with asynchronous exceptions
-- masked, store the new state it returns and hand back the extra result.
--
-- The new state is forced to WHNF *inside* the region protected by
-- 'onException'.  If @io@ throws, or if evaluating the new state throws, the
-- old value is put back and the exception is re-raised, so the 'MVar' is
-- never left empty.  (Forcing the state only at the final 'putMVar', after the
-- handler's scope has ended, would lose the value and block every later user.)
modifyMVar_mask :: MVar a -> (a -> IO (a,b)) -> IO b
modifyMVar_mask m io =
  mask_ $ do
    a <- takeMVar m
    (a', b) <- (io a >>= forceState) `onException` putMVar m a
    -- a' is already evaluated here, so this putMVar cannot raise from it.
    putMVar m a'
    return b
  where
    -- Evaluate the new state while the exception handler is still active.
    forceState r@(s, _) = s `seq` return r

{-# INLINE modifyMVar_mask_ #-}
-- Take the value out of @m@, run @io@ on it with asynchronous exceptions
-- masked, and store the new state it returns.
--
-- The new state is forced to WHNF *inside* the region protected by
-- 'onException'.  If @io@ throws, or if evaluating the new state throws, the
-- old value is put back and the exception is re-raised, so the 'MVar' is
-- never left empty.
modifyMVar_mask_ :: MVar a -> (a -> IO a) -> IO ()
modifyMVar_mask_ m io =
  mask_ $ do
    a <- takeMVar m
    a' <- (io a >>= forceState) `onException` putMVar m a
    -- a' is already evaluated here, so this putMVar cannot raise from it.
    putMVar m a'
  where
    -- Evaluate the new state while the exception handler is still active.
    forceState s = s `seq` return s

{-# INLINE withMVar_mask #-}
-- Run an action on the value held in an 'MVar' while holding it, with
-- asynchronous exceptions masked for the whole operation.  The same value is
-- put back afterwards, both on normal completion and when the action throws.
withMVar_mask :: MVar a -> (a -> IO b) -> IO b
withMVar_mask var action =
  mask_ (takeMVar var >>= guarded)
  where
    -- Restore the held value on failure; on success restore it and keep the
    -- action's result.
    guarded held =
      (action held `onException` putMVar var held) <* putMVar var held

-- |@newBoundedChan n@ returns a channel that can contain no more than @n@
-- elements.
--
-- The capacity must be at least 1.  A non-positive @n@ is rejected here with
-- an 'IOError' instead of producing a channel with an empty slot array, on
-- which reads and writes could never succeed (positions are advanced modulo
-- the capacity and used to index the slot array).
newBoundedChan :: Int -> IO (BoundedChan a)
newBoundedChan x
  | x < 1 = ioError . userError $
      "newBoundedChan: capacity must be positive, got " ++ show x
  | otherwise = do
      -- One empty slot per element the channel may hold.
      entls <- replicateM x newEmptyMVar
      -- Both cursors start at slot 0.
      wpos  <- newMVar 0
      rpos  <- newMVar 0
      let entries = listArray (0, x - 1) entls
      return (BC x entries wpos rpos)

-- |Write an element to the channel, blocking while the channel is full.
-- Blocked writers wait in a fair FIFO queue.
writeChan :: BoundedChan a -> a -> IO ()
writeChan (BC capacity slots writeCursor _) item =
  modifyMVar_mask_ writeCursor fillSlot
  where
    -- The cursor only moves forward once the slot has accepted the item.
    fillSlot ix = do
      putMVar (slots ! ix) item
      return (succ ix `mod` capacity)

-- |Non-blocking variant of 'writeChan': when the channel is full the element
-- is not written and 'False' is returned; otherwise it is written and 'True'
-- is returned. Note that this routine can still block while waiting for write
-- access to the channel.
tryWriteChan :: BoundedChan a -> a -> IO Bool
tryWriteChan (BC capacity slots writeCursor _) item =
  modifyMVar_mask writeCursor attempt
  where
    -- The cursor stays put unless the slot actually accepted the item.
    attempt ix = do
      stored <- tryPutMVar (slots ! ix) item
      return (if stored then succ ix `mod` capacity else ix, stored)

-- |Read an element from the channel, blocking while the channel is empty.
-- Blocked readers wait in a fair FIFO queue.
readChan :: BoundedChan a -> IO a
readChan (BC capacity slots _ readCursor) =
  modifyMVar_mask readCursor drainSlot
  where
    -- The cursor only moves forward once the slot has yielded its element.
    drainSlot ix = (,) (succ ix `mod` capacity) <$> takeMVar (slots ! ix)

-- |Non-blocking variant of 'readChan': returns 'Nothing' immediately when the
-- channel is empty, and @'Just' a@ when an element @a@ was read. Note that
-- this routine can still block while waiting for read access to the channel.
tryReadChan :: BoundedChan a -> IO (Maybe a)
tryReadChan (BC capacity slots _ readCursor) =
  modifyMVar_mask readCursor attempt
  where
    -- The cursor stays put unless the slot actually yielded an element.
    attempt ix = do
      taken <- tryTakeMVar (slots ! ix)
      let next = maybe ix (const (succ ix `mod` capacity)) taken
      return (next, taken)

-- |Returns 'True' if the supplied channel is empty.
--
-- DANGER: this needs read access to the channel, so it may block on an empty
-- channel if there is already a blocked reader.
--
-- DEPRECATED
{-# DEPRECATED isEmptyChan "This isEmptyChan can block, no non-blocking substitute yet" #-}
isEmptyChan :: BoundedChan a -> IO Bool
isEmptyChan (BC _ slots _ readCursor) =
  -- The channel is empty exactly when the slot under the read cursor is.
  withMVar_mask readCursor (isEmptyMVar . (slots !))

-- |Return the contents of the supplied channel as a lazy list: each element
-- is read from the channel only when the corresponding list cell is demanded.
-- Competing readers might steal from this list.
getChanContents :: BoundedChan a -> IO [a]
getChanContents ch =
  -- The recursive call is itself deferred, so only one read happens per cell.
  unsafeInterleaveIO ((:) <$> readChan ch <*> getChanContents ch)

-- |Write every element of a list to the channel, in order. Whenever the
-- channel is full this blocks until there is room.  Competing writers may
-- interleave with this one.
writeList2Chan :: BoundedChan a -> [a] -> IO ()
writeList2Chan ch = mapM_ (writeChan ch)