{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase   #-}
{-# LANGUAGE RankNTypes   #-}

-- |
-- Module      : Data.Conduino.Combinators
-- Copyright   : (c) Justin Le 2019
-- License     : BSD3
--
-- Maintainer  : justin@jle.im
-- Stability   : experimental
-- Portability : non-portable
--
-- A basic collection of base 'Pipe's that serve as a "prelude" for the
-- package.  This module is meant to be imported qualified.
--
-- > import qualified Data.Conduino.Combinators as C
--
module Data.Conduino.Combinators (
  -- * Sources
  -- ** Pure
  -- *** Infinite
    unfold
  , iterate
  , repeat
  -- *** Finite
  , unfoldMaybe
  , unfoldEither
  , iterateMaybe
  , iterateEither
  , sourceList
  , replicate
  -- ** Monadic
  -- *** Infinite
  , repeatM
  -- *** Finite
  , repeatMaybeM
  , repeatEitherM
  , replicateM
  , sourceHandleLines
  , stdinLines
  , sourceHandle
  , stdin
  -- * Pipes
  , map
  , mapM
  , iterM
  , scan
  , mapAccum
  , take
  , takeWhile
  , filter
  , concatMap
  , concat
  , pairs
  , consecutive
  -- * Sinks
  -- ** Pure
  , drop
  , dropWhile
  , foldr
  , foldl
  , foldMap
  , fold
  , sinkNull
  , sinkList
  , last
  -- ** Monadic
  , sinkHandle
  , stdout
  , stderr
  ) where

import           Control.Applicative
import           Control.Exception
import           Control.Monad hiding          (mapM, replicateM)
import           Control.Monad.IO.Class
import           Control.Monad.Trans.Class
import           Control.Monad.Trans.Maybe
import           Data.Conduino
import           Data.Either
import           Data.Foldable hiding          (foldr, foldl, fold, concat, concatMap, foldMap)
import           Data.Maybe
import           Data.Semigroup
import           Prelude hiding                (map, iterate, mapM, replicate, repeat, foldr, drop, foldl, last, take, concatMap, filter, concat, takeWhile, dropWhile, foldMap)
import           System.IO.Error
import qualified Data.ByteString               as BS
import qualified Data.ByteString.Lazy.Internal as BSL
import qualified Data.Sequence                 as Seq
import qualified System.IO                     as S

-- | A version of 'unfoldMaybe' that can choose the "result" value by
-- passing it in as 'Left'.
unfoldEither
    :: (s -> Either a (o, s))
    -> s
    -> Pipe i o u m a
unfoldEither :: (s -> Either a (o, s)) -> s -> Pipe i o u m a
unfoldEither f :: s -> Either a (o, s)
f = s -> Pipe i o u m a
forall i u (m :: * -> *). s -> Pipe i o u m a
go
  where
    go :: s -> Pipe i o u m a
go z :: s
z = case s -> Either a (o, s)
f s
z of
      Left  r :: a
r       -> a -> Pipe i o u m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
r
      Right (x :: o
x, z' :: s
z') -> o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield o
x Pipe i o u m () -> Pipe i o u m a -> Pipe i o u m a
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> s -> Pipe i o u m a
go s
z'

-- | A version of 'unfold' that can terminate and end by returning
-- 'Nothing'.
unfoldMaybe
    :: (s -> Maybe (o, s))
    -> s
    -> Pipe i o u m ()
unfoldMaybe :: (s -> Maybe (o, s)) -> s -> Pipe i o u m ()
unfoldMaybe f :: s -> Maybe (o, s)
f = (s -> Either () (o, s)) -> s -> Pipe i o u m ()
forall s a o i u (m :: * -> *).
(s -> Either a (o, s)) -> s -> Pipe i o u m a
unfoldEither (Either () (o, s)
-> ((o, s) -> Either () (o, s)) -> Maybe (o, s) -> Either () (o, s)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> Either () (o, s)
forall a b. a -> Either a b
Left ()) (o, s) -> Either () (o, s)
forall a b. b -> Either a b
Right (Maybe (o, s) -> Either () (o, s))
-> (s -> Maybe (o, s)) -> s -> Either () (o, s)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. s -> Maybe (o, s)
f)

-- | Repeatedly apply an "unfolding" function to a given initial state,
-- yielding the first item in the tuple as output and updating the state as
-- the second item in the tuple.  Goes on forever.  See 'unfoldMaybe' for
-- a version that stops.
unfold
    :: (s -> (o, s))
    -> s
    -> Pipe i o u m a
unfold :: (s -> (o, s)) -> s -> Pipe i o u m a
unfold f :: s -> (o, s)
f = s -> Pipe i o u m a
forall i u (m :: * -> *) b. s -> Pipe i o u m b
go
  where
    go :: s -> Pipe i o u m b
go z :: s
z = o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield o
x Pipe i o u m () -> Pipe i o u m b -> Pipe i o u m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> s -> Pipe i o u m b
go s
z'
      where
        (x :: o
x, z' :: s
z') = s -> (o, s)
f s
z

-- | A version of 'iterateMaybe' that can specify a result value by
-- providing it in the 'Left'.
iterateEither
    :: (o -> Either a o)
    -> o
    -> Pipe i o u m a
iterateEither :: (o -> Either a o) -> o -> Pipe i o u m a
iterateEither f :: o -> Either a o
f = (o -> Either a (o, o)) -> o -> Pipe i o u m a
forall s a o i u (m :: * -> *).
(s -> Either a (o, s)) -> s -> Pipe i o u m a
unfoldEither ((o -> (o, o)) -> Either a o -> Either a (o, o)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((o -> o -> (o, o)) -> o -> (o, o)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (,)) (Either a o -> Either a (o, o))
-> (o -> Either a o) -> o -> Either a (o, o)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. o -> Either a o
f)

-- | A version of 'iterate' that can choose to terminate and stop by
-- returning 'Nothing'.
iterateMaybe
    :: (o -> Maybe o)
    -> o
    -> Pipe i o u m ()
iterateMaybe :: (o -> Maybe o) -> o -> Pipe i o u m ()
iterateMaybe f :: o -> Maybe o
f = (o -> Maybe (o, o)) -> o -> Pipe i o u m ()
forall s o i u (m :: * -> *).
(s -> Maybe (o, s)) -> s -> Pipe i o u m ()
unfoldMaybe ((o -> (o, o)) -> Maybe o -> Maybe (o, o)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((o -> o -> (o, o)) -> o -> (o, o)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (,)) (Maybe o -> Maybe (o, o)) -> (o -> Maybe o) -> o -> Maybe (o, o)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. o -> Maybe o
f)

-- | Repeatedly apply a function to a given starting value and yield each
-- result forever.
--
-- >>> runPipePure $ iterate succ 0
--       .| take 5
--       .| sinkList
--
-- [1,2,3,4,5]
--
-- This doesn't yield the original starting value.  However, you can yield
-- it iterate after:
--
-- >>> runPipePure $ (yield 0 >> iterate succ 0)
--       .| take 5
--       .| sinkList
--
-- [0,1,2,3,4,5]
iterate
    :: (o -> o)
    -> o
    -> Pipe i o u m a
iterate :: (o -> o) -> o -> Pipe i o u m a
iterate f :: o -> o
f = (o -> (o, o)) -> o -> Pipe i o u m a
forall s o i u (m :: * -> *) a.
(s -> (o, s)) -> s -> Pipe i o u m a
unfold ((o -> o -> (o, o)) -> o -> (o, o)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (,) (o -> (o, o)) -> (o -> o) -> o -> (o, o)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. o -> o
f)

-- | Yield every item in a foldable container.
sourceList :: Foldable t => t a -> Pipe i a u m ()
sourceList :: t a -> Pipe i a u m ()
sourceList = (a -> Pipe i a u m ()) -> t a -> Pipe i a u m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ a -> Pipe i a u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield

-- | Repeatedly yield a given item forever.
repeat :: o -> Pipe i o u m a
repeat :: o -> Pipe i o u m a
repeat = Pipe i o u m () -> Pipe i o u m a
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Pipe i o u m () -> Pipe i o u m a)
-> (o -> Pipe i o u m ()) -> o -> Pipe i o u m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield

-- | Yield a given item a certain number of times.
replicate :: Int -> o -> Pipe i o u m ()
replicate :: Int -> o -> Pipe i o u m ()
replicate n :: Int
n = Int -> Pipe i o u m () -> Pipe i o u m ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n (Pipe i o u m () -> Pipe i o u m ())
-> (o -> Pipe i o u m ()) -> o -> Pipe i o u m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield

-- | Like 'repeatMaybeM', but allow specification of a final result type.
repeatEitherM
    :: Monad m
    => m (Either a o)
    -> Pipe i o u m a
repeatEitherM :: m (Either a o) -> Pipe i o u m a
repeatEitherM x :: m (Either a o)
x = Pipe i o u m a
forall i u. Pipe i o u m a
go
  where
    go :: Pipe i o u m a
go = m (Either a o) -> Pipe i o u m (Either a o)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m (Either a o)
x Pipe i o u m (Either a o)
-> (Either a o -> Pipe i o u m a) -> Pipe i o u m a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Left r :: a
r  -> a -> Pipe i o u m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
r
      Right y :: o
y -> o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield o
y Pipe i o u m () -> Pipe i o u m a -> Pipe i o u m a
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Pipe i o u m a
go

-- | Repeat a monadic action, yielding the item in the 'Just' every time.
-- As soon as it sees 'Nothing', stop producing forever.
--
-- Remember that each item will only be "executed" when something
-- downstream requests output.
repeatMaybeM
    :: Monad m
    => m (Maybe o)
    -> Pipe i o u m ()
repeatMaybeM :: m (Maybe o) -> Pipe i o u m ()
repeatMaybeM = m (Either () o) -> Pipe i o u m ()
forall (m :: * -> *) a o i u.
Monad m =>
m (Either a o) -> Pipe i o u m a
repeatEitherM (m (Either () o) -> Pipe i o u m ())
-> (m (Maybe o) -> m (Either () o))
-> m (Maybe o)
-> Pipe i o u m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe o -> Either () o) -> m (Maybe o) -> m (Either () o)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Either () o -> (o -> Either () o) -> Maybe o -> Either () o
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> Either () o
forall a b. a -> Either a b
Left ()) o -> Either () o
forall a b. b -> Either a b
Right)

-- | Repeat a monadic action a given number of times, yielding each result,
-- and then stop producing forever.
--
-- Remember that each item will only be "executed" when something
-- downstream requests output.
replicateM
    :: Monad m
    => Int
    -> m o
    -> Pipe i o u m ()
replicateM :: Int -> m o -> Pipe i o u m ()
replicateM n :: Int
n x :: m o
x = Int -> Pipe i o u m () -> Pipe i o u m ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n (Pipe i o u m () -> Pipe i o u m ())
-> Pipe i o u m () -> Pipe i o u m ()
forall a b. (a -> b) -> a -> b
$ m o -> Pipe i o u m o
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m o
x Pipe i o u m o -> (o -> Pipe i o u m ()) -> Pipe i o u m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield

-- | Source from each line received from 'stdin'.  This stops as soon as
-- end-of-file is reached, or an empty line is seen.
stdinLines :: MonadIO m => Pipe i String u m ()
stdinLines :: Pipe i String u m ()
stdinLines = Handle -> Pipe i String u m ()
forall (m :: * -> *) i u.
MonadIO m =>
Handle -> Pipe i String u m ()
sourceHandleLines Handle
S.stdin

-- | Source from stdin, yielding bytestrings as they are drawn.  If you
-- want to retrieve each line as a string, see 'stdinLines'.
stdin :: MonadIO m => Pipe i BS.ByteString u m ()
stdin :: Pipe i ByteString u m ()
stdin = Handle -> Pipe i ByteString u m ()
forall (m :: * -> *) i u.
MonadIO m =>
Handle -> Pipe i ByteString u m ()
sourceHandle Handle
S.stdin

-- | Source from a given I/O handle, yielding each line drawn as a string.
-- To draw raw bytes, use 'sourceHandle'.
--
-- This stop as soon as end-of-file is reached, or an empty line is seen.
sourceHandleLines
    :: MonadIO m
    => S.Handle
    -> Pipe i String u m ()
sourceHandleLines :: Handle -> Pipe i String u m ()
sourceHandleLines h :: Handle
h = m (Maybe String) -> Pipe i String u m ()
forall (m :: * -> *) o i u.
Monad m =>
m (Maybe o) -> Pipe i o u m ()
repeatMaybeM (m (Maybe String) -> Pipe i String u m ())
-> m (Maybe String) -> Pipe i String u m ()
forall a b. (a -> b) -> a -> b
$ do
    Bool
d <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Handle -> IO Bool
S.hIsEOF Handle
h
    if Bool
d
      then Maybe String -> m (Maybe String)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe String
forall a. Maybe a
Nothing
      else IO (Maybe String) -> m (Maybe String)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe String) -> m (Maybe String))
-> ((() -> IO (Maybe String)) -> IO (Maybe String))
-> (() -> IO (Maybe String))
-> m (Maybe String)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (IOError -> Maybe ())
-> IO (Maybe String)
-> (() -> IO (Maybe String))
-> IO (Maybe String)
forall e b a.
Exception e =>
(e -> Maybe b) -> IO a -> (b -> IO a) -> IO a
catchJust
                (Bool -> Maybe ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (Bool -> Maybe ()) -> (IOError -> Bool) -> IOError -> Maybe ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IOError -> Bool
isEOFError)
                ((String -> Bool) -> Maybe String -> Maybe String
forall (m :: * -> *) a. MonadPlus m => (a -> Bool) -> m a -> m a
mfilter (Bool -> Bool
not (Bool -> Bool) -> (String -> Bool) -> String -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null) (Maybe String -> Maybe String)
-> (String -> Maybe String) -> String -> Maybe String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Maybe String
forall a. a -> Maybe a
Just (String -> Maybe String) -> IO String -> IO (Maybe String)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Handle -> IO String
S.hGetLine Handle
h)
                ((() -> IO (Maybe String)) -> m (Maybe String))
-> (() -> IO (Maybe String)) -> m (Maybe String)
forall a b. (a -> b) -> a -> b
$ \_ -> Maybe String -> IO (Maybe String)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe String
forall a. Maybe a
Nothing

-- | Source from a given I/O handle, yielding bytestrings as they are
-- pulled.  If you want to retrieve each line as a string, see
-- 'sourceHandleLines'.
sourceHandle
    :: MonadIO m
    => S.Handle
    -> Pipe i BS.ByteString u m ()
sourceHandle :: Handle -> Pipe i ByteString u m ()
sourceHandle h :: Handle
h = m (Maybe ByteString) -> Pipe i ByteString u m ()
forall (m :: * -> *) o i u.
Monad m =>
m (Maybe o) -> Pipe i o u m ()
repeatMaybeM
               (m (Maybe ByteString) -> Pipe i ByteString u m ())
-> (IO ByteString -> m (Maybe ByteString))
-> IO ByteString
-> Pipe i ByteString u m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString -> Maybe ByteString)
-> m ByteString -> m (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((ByteString -> Bool) -> Maybe ByteString -> Maybe ByteString
forall (m :: * -> *) a. MonadPlus m => (a -> Bool) -> m a -> m a
mfilter (Bool -> Bool
not (Bool -> Bool) -> (ByteString -> Bool) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
BS.null) (Maybe ByteString -> Maybe ByteString)
-> (ByteString -> Maybe ByteString)
-> ByteString
-> Maybe ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just)
               (m ByteString -> m (Maybe ByteString))
-> (IO ByteString -> m ByteString)
-> IO ByteString
-> m (Maybe ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO ByteString -> m ByteString
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
               (IO ByteString -> Pipe i ByteString u m ())
-> IO ByteString -> Pipe i ByteString u m ()
forall a b. (a -> b) -> a -> b
$ Handle -> Int -> IO ByteString
BS.hGetSome Handle
h Int
BSL.defaultChunkSize

-- | Sink into a given I/O handle, writing each input to the handle.
sinkHandle
    :: MonadIO m
    => S.Handle
    -> Pipe BS.ByteString o u m ()
sinkHandle :: Handle -> Pipe ByteString o u m ()
sinkHandle h :: Handle
h = (ByteString -> m ()) -> Pipe ByteString () u m u
forall (m :: * -> *) i o u. Monad m => (i -> m o) -> Pipe i o u m u
mapM (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (ByteString -> IO ()) -> ByteString -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Handle -> ByteString -> IO ()
BS.hPut Handle
h)
            Pipe ByteString () u m u
-> Pipe () o u m () -> Pipe ByteString o u m ()
forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
.| Pipe () o u m ()
forall i o u (m :: * -> *). Pipe i o u m ()
sinkNull

-- | A sink into stdout.
stdout :: MonadIO m => Pipe BS.ByteString o u m ()
stdout :: Pipe ByteString o u m ()
stdout = Handle -> Pipe ByteString o u m ()
forall (m :: * -> *) o u.
MonadIO m =>
Handle -> Pipe ByteString o u m ()
sinkHandle Handle
S.stdout

-- | A sink into stderr.
stderr :: MonadIO m => Pipe BS.ByteString o u m ()
stderr :: Pipe ByteString o u m ()
stderr = Handle -> Pipe ByteString o u m ()
forall (m :: * -> *) o u.
MonadIO m =>
Handle -> Pipe ByteString o u m ()
sinkHandle Handle
S.stderr

-- | Repeat a monadic action forever, yielding each output.
--
-- Remember that each item will only be "executed" when something
-- downstream requests output.
repeatM
    :: Monad m
    => m o
    -> Pipe i o u m a
repeatM :: m o -> Pipe i o u m a
repeatM x :: m o
x = Pipe i o u m a
forall i u b. Pipe i o u m b
go
  where
    go :: Pipe i o u m b
go = (o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield (o -> Pipe i o u m ()) -> Pipe i o u m o -> Pipe i o u m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< m o -> Pipe i o u m o
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m o
x) Pipe i o u m () -> Pipe i o u m b -> Pipe i o u m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Pipe i o u m b
go

-- | Process every incoming item with a pure function, and yield its
-- output.
map :: (i -> o) -> Pipe i o u m u
map :: (i -> o) -> Pipe i o u m u
map f :: i -> o
f = (i -> Pipe i o u m ()) -> Pipe i o u m u
forall i o u (m :: * -> *) a.
(i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever (o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield (o -> Pipe i o u m ()) -> (i -> o) -> i -> Pipe i o u m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. i -> o
f)

-- | Map a monadic function to process every input, and yield its output.
mapM :: Monad m => (i -> m o) -> Pipe i o u m u
mapM :: (i -> m o) -> Pipe i o u m u
mapM f :: i -> m o
f = (i -> Pipe i o u m ()) -> Pipe i o u m u
forall i o u (m :: * -> *) a.
(i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever ((o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield (o -> Pipe i o u m ()) -> Pipe i o u m o -> Pipe i o u m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<) (Pipe i o u m o -> Pipe i o u m ())
-> (i -> Pipe i o u m o) -> i -> Pipe i o u m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m o -> Pipe i o u m o
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m o -> Pipe i o u m o) -> (i -> m o) -> i -> Pipe i o u m o
forall b c a. (b -> c) -> (a -> b) -> a -> c
. i -> m o
f)

-- | Execute a monadic function to process every input, passing through the
-- original value back downstream.
--
-- @since 0.2.1.0
iterM :: Monad m => (i -> m ()) -> Pipe i i u m u
iterM :: (i -> m ()) -> Pipe i i u m u
iterM f :: i -> m ()
f = (i -> m i) -> Pipe i i u m u
forall (m :: * -> *) i o u. Monad m => (i -> m o) -> Pipe i o u m u
mapM (\x :: i
x -> i
x i -> m () -> m i
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ i -> m ()
f i
x)

-- | Map a pure "stateful" function over each incoming item.  Give
-- a function to update the state and return an output and an initial
-- state.
mapAccum
    :: (i -> s -> (s, o))       -- ^ update state and output
    -> s                        -- ^ initial state
    -> Pipe i o u m u
mapAccum :: (i -> s -> (s, o)) -> s -> Pipe i o u m u
mapAccum f :: i -> s -> (s, o)
f = s -> Pipe i o u m u
forall u (m :: * -> *). s -> Pipe i o u m u
go
  where
    go :: s -> Pipe i o u m u
go !s
x = (i -> Pipe i o u m u) -> Pipe i o u m u
forall i o u (m :: * -> *). (i -> Pipe i o u m u) -> Pipe i o u m u
awaitWith ((i -> Pipe i o u m u) -> Pipe i o u m u)
-> (i -> Pipe i o u m u) -> Pipe i o u m u
forall a b. (a -> b) -> a -> b
$ \y :: i
y ->
        let (!s
x', !o
z) = i -> s -> (s, o)
f i
y s
x
        in  o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield o
z Pipe i o u m () -> Pipe i o u m u -> Pipe i o u m u
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> s -> Pipe i o u m u
go s
x'

-- | Like 'foldl', but yields every accumulator value downstream.
--
-- >>> runPipePure $ sourceList [1..10]
--       .| scan (+) 0
--       .| sinkList
-- [1,3,6,10,15,21,28,36,45,55]
-- @
scan
    :: (o -> i -> o)
    -> o
    -> Pipe i o u m u
scan :: (o -> i -> o) -> o -> Pipe i o u m u
scan f :: o -> i -> o
f = o -> Pipe i o u m u
forall u (m :: * -> *). o -> Pipe i o u m u
go
  where
    go :: o -> Pipe i o u m u
go !o
x = (i -> Pipe i o u m u) -> Pipe i o u m u
forall i o u (m :: * -> *). (i -> Pipe i o u m u) -> Pipe i o u m u
awaitWith ((i -> Pipe i o u m u) -> Pipe i o u m u)
-> (i -> Pipe i o u m u) -> Pipe i o u m u
forall a b. (a -> b) -> a -> b
$ \y :: i
y ->
      let x' :: o
x' = o -> i -> o
f o
x i
y
      in  o -> Pipe i o u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield o
x' Pipe i o u m () -> Pipe i o u m u -> Pipe i o u m u
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> o -> Pipe i o u m u
go o
x'

-- | Yield consecutive pairs of values.
--
-- >>> runPipePure $ sourceList [1..5]
--       .| pairs
--       .| sinkList
-- [(1,2),(2,3),(3,4),(4,5)]
pairs :: Pipe i (i, i) u m u
pairs :: Pipe i (i, i) u m u
pairs = (i -> Pipe i (i, i) u m u) -> Pipe i (i, i) u m u
forall i o u (m :: * -> *). (i -> Pipe i o u m u) -> Pipe i o u m u
awaitWith i -> Pipe i (i, i) u m u
forall t u (m :: * -> *). t -> Pipe t (t, t) u m u
go
  where
    go :: t -> Pipe t (t, t) u m u
go x :: t
x = (t -> Pipe t (t, t) u m u) -> Pipe t (t, t) u m u
forall i o u (m :: * -> *). (i -> Pipe i o u m u) -> Pipe i o u m u
awaitWith ((t -> Pipe t (t, t) u m u) -> Pipe t (t, t) u m u)
-> (t -> Pipe t (t, t) u m u) -> Pipe t (t, t) u m u
forall a b. (a -> b) -> a -> b
$ \y :: t
y -> do
      (t, t) -> Pipe t (t, t) u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield (t
x, t
y)
      t -> Pipe t (t, t) u m u
go t
y

-- | Yield consecutive runs of at most @n@ of values, starting with an
-- empty sequence.
--
-- To get only "full" sequences, pipe with 'filter'.
--
-- >>> runPipePure $ sourceList [1..6]
--       .| consecutive 3
--       .| map toList
--       .| sinkList
-- [[],[1],[1,2],[1,2,3],[2,3,4],[3,4,5],[4,5,6]]
--
-- >>> runPipePure $ sourceList [1..6]
--       .| consecutive 3
--       .| filter ((== 3) . Seq.length)
--       .| map toList
--       .| sinkList
-- [[1,2,3],[2,3,4],[3,4,5],[4,5,6]]
consecutive :: Int -> Pipe i (Seq.Seq i) u m u
consecutive :: Int -> Pipe i (Seq i) u m u
consecutive n :: Int
n = Seq i -> Pipe i (Seq i) u m u
forall i b (m :: * -> *). Seq i -> Pipe i (Seq i) b m b
go Seq i
forall a. Seq a
Seq.empty
  where
    go :: Seq i -> Pipe i (Seq i) b m b
go xs :: Seq i
xs = do
      Seq i -> Pipe i (Seq i) b m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield Seq i
xs
      (i -> Pipe i (Seq i) b m b) -> Pipe i (Seq i) b m b
forall i o u (m :: * -> *). (i -> Pipe i o u m u) -> Pipe i o u m u
awaitWith ((i -> Pipe i (Seq i) b m b) -> Pipe i (Seq i) b m b)
-> (i -> Pipe i (Seq i) b m b) -> Pipe i (Seq i) b m b
forall a b. (a -> b) -> a -> b
$ \y :: i
y -> Seq i -> Pipe i (Seq i) b m b
go (Seq i -> Pipe i (Seq i) b m b)
-> (Seq i -> Seq i) -> Seq i -> Pipe i (Seq i) b m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Seq i -> Seq i
forall a. Int -> Seq a -> Seq a
Seq.drop (Seq i -> Int
forall a. Seq a -> Int
Seq.length Seq i
xs Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ 1) (Seq i -> Pipe i (Seq i) b m b) -> Seq i -> Pipe i (Seq i) b m b
forall a b. (a -> b) -> a -> b
$ (Seq i
xs Seq i -> i -> Seq i
forall a. Seq a -> a -> Seq a
Seq.:|> i
y)


-- | Let a given number of items pass through the stream uninhibited, and
-- then stop producing forever.
--
-- This is most useful if you sequence a second conduit after it.
--
-- >>> runPipePure $ sourceList [1..8]
--       .| (do take 3 .| map (*2)         -- double the first 3 items
--              map negate                 -- negate the rest
--          )
--       .| sinkList
-- [2,4,6,-4,-5,-6,-7,-8]
take :: Int -> Pipe i i u m ()
take :: Int -> Pipe i i u m ()
take n :: Int
n = Pipe i i u m (Maybe ()) -> Pipe i i u m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Pipe i i u m (Maybe ()) -> Pipe i i u m ())
-> (MaybeT (Pipe i i u m) () -> Pipe i i u m (Maybe ()))
-> MaybeT (Pipe i i u m) ()
-> Pipe i i u m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MaybeT (Pipe i i u m) () -> Pipe i i u m (Maybe ())
forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT (MaybeT (Pipe i i u m) () -> Pipe i i u m (Maybe ()))
-> (MaybeT (Pipe i i u m) () -> MaybeT (Pipe i i u m) ())
-> MaybeT (Pipe i i u m) ()
-> Pipe i i u m (Maybe ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> MaybeT (Pipe i i u m) () -> MaybeT (Pipe i i u m) ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n (MaybeT (Pipe i i u m) () -> Pipe i i u m ())
-> MaybeT (Pipe i i u m) () -> Pipe i i u m ()
forall a b. (a -> b) -> a -> b
$
    Pipe i i u m () -> MaybeT (Pipe i i u m) ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Pipe i i u m () -> MaybeT (Pipe i i u m) ())
-> (i -> Pipe i i u m ()) -> i -> MaybeT (Pipe i i u m) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. i -> Pipe i i u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield (i -> MaybeT (Pipe i i u m) ())
-> MaybeT (Pipe i i u m) i -> MaybeT (Pipe i i u m) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Pipe i i u m (Maybe i) -> MaybeT (Pipe i i u m) i
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT Pipe i i u m (Maybe i)
forall i o u (m :: * -> *). Pipe i o u m (Maybe i)
await

-- | Let elements pass until an element is received that does not satisfy
-- the predicate, then stop producing forever.
--
-- Like 'take', is most useful if you sequence a second conduit after it.
takeWhile :: (i -> Bool) -> Pipe i i u m ()
takeWhile :: (i -> Bool) -> Pipe i i u m ()
takeWhile p :: i -> Bool
p = Pipe i i u m ()
forall u (m :: * -> *). Pipe i i u m ()
go
  where
    go :: Pipe i i u m ()
go = Pipe i i u m (Maybe i)
forall i o u (m :: * -> *). Pipe i o u m (Maybe i)
await Pipe i i u m (Maybe i)
-> (Maybe i -> Pipe i i u m ()) -> Pipe i i u m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Nothing -> () -> Pipe i i u m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      Just x :: i
x
        | i -> Bool
p i
x       -> i -> Pipe i i u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield i
x Pipe i i u m () -> Pipe i i u m () -> Pipe i i u m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Pipe i i u m ()
go
        | Bool
otherwise -> () -> Pipe i i u m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-- | Only allow values satisfying a predicate to pass.
filter
    :: (i -> Bool)
    -> Pipe i i u m u
filter :: (i -> Bool) -> Pipe i i u m u
filter p :: i -> Bool
p = (i -> Pipe i i u m ()) -> Pipe i i u m u
forall i o u (m :: * -> *) a.
(i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever ((i -> Pipe i i u m ()) -> Pipe i i u m u)
-> (i -> Pipe i i u m ()) -> Pipe i i u m u
forall a b. (a -> b) -> a -> b
$ \x :: i
x -> Bool -> Pipe i i u m () -> Pipe i i u m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (i -> Bool
p i
x) (Pipe i i u m () -> Pipe i i u m ())
-> Pipe i i u m () -> Pipe i i u m ()
forall a b. (a -> b) -> a -> b
$ i -> Pipe i i u m ()
forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield i
x

-- | Map a function returning a container onto every incoming item, and
-- yield all of the outputs from that function.
concatMap
    :: Foldable t
    => (i -> t o)
    -> Pipe i o u m u
concatMap :: (i -> t o) -> Pipe i o u m u
concatMap f :: i -> t o
f = (i -> Pipe i o u m ()) -> Pipe i o u m u
forall i o u (m :: * -> *) a.
(i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever (t o -> Pipe i o u m ()
forall (t :: * -> *) a i u (m :: * -> *).
Foldable t =>
t a -> Pipe i a u m ()
sourceList (t o -> Pipe i o u m ()) -> (i -> t o) -> i -> Pipe i o u m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. i -> t o
f)

-- | Take an input of containers and output each of their elements
-- successively.
concat :: Foldable t => Pipe (t i) i u m u
concat :: Pipe (t i) i u m u
concat = (t i -> Pipe (t i) i u m ()) -> Pipe (t i) i u m u
forall i o u (m :: * -> *) a.
(i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever t i -> Pipe (t i) i u m ()
forall (t :: * -> *) a i u (m :: * -> *).
Foldable t =>
t a -> Pipe i a u m ()
sourceList

-- | Right-fold every input into an accumulated value.
--
-- Essentially this builds up a giant continuation that will be run all at
-- once on the final result.
foldr :: (a -> b -> b) -> b -> Pipe a o u m b
foldr :: (a -> b -> b) -> b -> Pipe a o u m b
foldr f :: a -> b -> b
f z :: b
z = Pipe a o u m b
forall o u (m :: * -> *). Pipe a o u m b
go
  where
    go :: Pipe a o u m b
go = Pipe a o u m (Maybe a)
forall i o u (m :: * -> *). Pipe i o u m (Maybe i)
await Pipe a o u m (Maybe a)
-> (Maybe a -> Pipe a o u m b) -> Pipe a o u m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Nothing -> b -> Pipe a o u m b
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
z
      Just x :: a
x  -> a -> b -> b
f a
x (b -> b) -> Pipe a o u m b -> Pipe a o u m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Pipe a o u m b
go

-- | Left-fold every input into an accumulated value.
--
-- Essentially this maintains a state and modifies that state with every
-- input, using the given accumulating function.
foldl :: (b -> a -> b) -> b -> Pipe a o u m b
foldl :: (b -> a -> b) -> b -> Pipe a o u m b
foldl f :: b -> a -> b
f = b -> Pipe a o u m b
forall o u (m :: * -> *). b -> Pipe a o u m b
go
  where
    go :: b -> Pipe a o u m b
go !b
z = Pipe a o u m (Maybe a)
forall i o u (m :: * -> *). Pipe i o u m (Maybe i)
await Pipe a o u m (Maybe a)
-> (Maybe a -> Pipe a o u m b) -> Pipe a o u m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Nothing -> b -> Pipe a o u m b
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
z
      Just !a
x -> b -> Pipe a o u m b
go (b -> a -> b
f b
z a
x)

-- | Fold every incoming item monoidally, and return the result once
-- finished.
fold :: Monoid a => Pipe a o u m a
fold :: Pipe a o u m a
fold = (a -> a -> a) -> a -> Pipe a o u m a
forall b a o u (m :: * -> *). (b -> a -> b) -> b -> Pipe a o u m b
foldl a -> a -> a
forall a. Semigroup a => a -> a -> a
(<>) a
forall a. Monoid a => a
mempty

-- | Fold every incoming item according to a monoidal projection, and
-- return the result once finished.
--
-- This can be used to implement many useful consumers, like ones that find
-- the sum or the maximum item:
--
-- @
-- sum :: Num i => Pipe i o u m i
-- sum = getSum <$> foldMap Sum
--
-- maximum :: Ord i => Pipe i o u m (Maybe i)
-- maximum = fmap getMax <$> foldMap (Just . Max)
-- @
foldMap :: Monoid a => (i -> a) -> Pipe i o u m a
foldMap :: (i -> a) -> Pipe i o u m a
foldMap f :: i -> a
f = (a -> i -> a) -> a -> Pipe i o u m a
forall b a o u (m :: * -> *). (b -> a -> b) -> b -> Pipe a o u m b
foldl (\x :: a
x y :: i
y -> a
x a -> a -> a
forall a. Semigroup a => a -> a -> a
<> i -> a
f i
y) a
forall a. Monoid a => a
mempty

-- | Sink every incoming item into a list.
--
-- Note that this keeps the entire list in memory until it is all
-- eventually read.
sinkList :: Pipe i o u m [i]
sinkList :: Pipe i o u m [i]
sinkList = (i -> [i] -> [i]) -> [i] -> Pipe i o u m [i]
forall a b o u (m :: * -> *). (a -> b -> b) -> b -> Pipe a o u m b
foldr (:) []

-- | Ignore a certain amount of items from the input stream, and then stop
-- producing forever.
--
-- This is most useful if you sequence a second consumer after it:
--
-- >>> runPipePure $ sourceList [1..8]
--       .| (drop 3 >> 'sinkList')
-- [4,5,6,7,8]
drop :: Int -> Pipe i o u m ()
drop :: Int -> Pipe i o u m ()
drop n :: Int
n = Int -> Pipe i o u m (Maybe i) -> Pipe i o u m ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n Pipe i o u m (Maybe i)
forall i o u (m :: * -> *). Pipe i o u m (Maybe i)
await

-- | Ignore items from an input stream as long as they match a predicate.
-- Afterwards, stop producing forever.
--
-- Like for 'drop', is most useful of you sequence a second consumer after
-- it.
dropWhile
    :: (i -> Bool)
    -> Pipe i o u m ()
dropWhile :: (i -> Bool) -> Pipe i o u m ()
dropWhile p :: i -> Bool
p = Pipe i o u m ()
forall o u (m :: * -> *). Pipe i o u m ()
go
  where
    go :: Pipe i o u m ()
go = Pipe i o u m (Maybe i)
forall i o u (m :: * -> *). Pipe i o u m (Maybe i)
await Pipe i o u m (Maybe i)
-> (Maybe i -> Pipe i o u m ()) -> Pipe i o u m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Nothing -> () -> Pipe i o u m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      Just x :: i
x
        | i -> Bool
p i
x       -> Pipe i o u m ()
go
        | Bool
otherwise -> () -> Pipe i o u m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-- | Consume an entire input stream and ignore all of its outputs.
sinkNull :: Pipe i o u m ()
sinkNull :: Pipe i o u m ()
sinkNull = Pipe i o u m (Maybe i)
forall i o u (m :: * -> *). Pipe i o u m (Maybe i)
await Pipe i o u m (Maybe i)
-> (Maybe i -> Pipe i o u m ()) -> Pipe i o u m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Nothing -> () -> Pipe i o u m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Just _  -> Pipe i o u m ()
forall i o u (m :: * -> *). Pipe i o u m ()
sinkNull

-- | Get the last item emitted by a stream.
--
-- To get the first item ("head"), use 'await' or 'awaitSurely'.
last :: Pipe i o u m (Maybe i)
last :: Pipe i o u m (Maybe i)
last = (Last i -> i) -> Maybe (Last i) -> Maybe i
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Last i -> i
forall a. Last a -> a
getLast (Maybe (Last i) -> Maybe i)
-> Pipe i o u m (Maybe (Last i)) -> Pipe i o u m (Maybe i)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (i -> Maybe (Last i)) -> Pipe i o u m (Maybe (Last i))
forall a i o u (m :: * -> *).
Monoid a =>
(i -> a) -> Pipe i o u m a
foldMap (Last i -> Maybe (Last i)
forall a. a -> Maybe a
Just (Last i -> Maybe (Last i)) -> (i -> Last i) -> i -> Maybe (Last i)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. i -> Last i
forall a. a -> Last a
Last)