{-# Language BlockArguments, ScopedTypeVariables #-}
{-|
Module      : Hookup.Concurrent
Description : Concurrently run actions until one succeeds or all fail
Copyright   : (c) Eric Mertens, 2020
License     : ISC
Maintainer  : emertens@gmail.com

-}
module Hookup.Concurrent (concurrentAttempts) where

import Control.Concurrent (forkIO, throwTo)
import Control.Concurrent.Async (Async, AsyncCancelled(..), async, asyncThreadId, cancel, waitCatch, waitCatchSTM)
import Control.Concurrent.STM (STM, atomically, check, orElse, readTVar, registerDelay, retry)
import Control.Exception (SomeException, finally, mask_, onException)
import Control.Monad (join)
import Data.Foldable (for_)

concurrentAttempts ::
  Int {- ^ microsecond delay between attempts -} ->
  (a -> IO ()) {- ^ release unneeded success -} ->
  [IO a] {- ^ ordered list of attempts -} ->
  IO (Either [SomeException] a)
concurrentAttempts :: forall a.
Int -> (a -> IO ()) -> [IO a] -> IO (Either [SomeException] a)
concurrentAttempts Int
delay a -> IO ()
release [IO a]
actions =
  let st :: St a
st = St { threads :: [Async a]
threads = [],
                errors :: [SomeException]
errors = [],
                work :: [IO a]
work = [IO a]
actions,
                delay :: Int
delay = Int
delay,
                clean :: a -> IO ()
clean = a -> IO ()
release,
                readySTM :: STM ()
readySTM = STM ()
forall a. STM a
retry }
  in IO (Either [SomeException] a) -> IO (Either [SomeException] a)
forall a. IO a -> IO a
mask_ (St a -> IO (Either [SomeException] a)
forall a. St a -> Answer a
loop St a
st)

data St a = St
  { forall a. St a -> [Async a]
threads :: [Async a]
  , forall a. St a -> [SomeException]
errors  :: [SomeException]
  , forall a. St a -> [IO a]
work    :: [IO a]
  , forall a. St a -> Int
delay   :: !Int
  , forall a. St a -> a -> IO ()
clean   :: a -> IO ()
  , forall a. St a -> STM ()
readySTM :: STM ()
  }

type Answer a = IO (Either [SomeException] a)

-- | Main event loop for concurrent attempt system
loop :: forall a. St a -> Answer a
loop :: forall a. St a -> Answer a
loop St a
st = if [Async a] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (St a -> [Async a]
forall a. St a -> [Async a]
threads St a
st) then St a -> Answer a
forall a. St a -> Answer a
nothingRunning St a
st else St a -> Answer a
forall a. St a -> Answer a
waitForEvent St a
st

-- | No threads are active, either start a new thread or return the complete error list
nothingRunning :: St a -> Answer a
nothingRunning :: forall a. St a -> Answer a
nothingRunning St a
st =
  case St a -> [IO a]
forall a. St a -> [IO a]
work St a
st of
    [] -> Either [SomeException] a -> Answer a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([SomeException] -> Either [SomeException] a
forall a b. a -> Either a b
Left (St a -> [SomeException]
forall a. St a -> [SomeException]
errors St a
st))
    IO a
x:[IO a]
xs -> IO a -> St a -> Answer a
forall a. IO a -> St a -> Answer a
start IO a
x St a
st{work :: [IO a]
work = [IO a]
xs}

-- | Start a new thread for the given attempt
start :: IO a -> St a -> Answer a
start :: forall a. IO a -> St a -> Answer a
start IO a
x St a
st =
  do Async a
thread <- IO a -> IO (Async a)
forall a. IO a -> IO (Async a)
async IO a
x
     STM ()
ready <- if [IO a] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (St a -> [IO a]
forall a. St a -> [IO a]
work St a
st) then STM () -> IO (STM ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure STM ()
forall a. STM a
retry else Int -> IO (STM ())
startTimer (St a -> Int
forall a. St a -> Int
delay St a
st)
     St a -> Answer a
forall a. St a -> Answer a
loop St a
st { threads :: [Async a]
threads = Async a
thread Async a -> [Async a] -> [Async a]
forall a. a -> [a] -> [a]
: St a -> [Async a]
forall a. St a -> [Async a]
threads St a
st, readySTM :: STM ()
readySTM = STM ()
ready }

-- Nothing to do but wait for a thread to finish or the timer to fire
waitForEvent :: St a -> Answer a
waitForEvent :: forall a. St a -> Answer a
waitForEvent St a
st = IO (IO (Either [SomeException] a)) -> IO (Either [SomeException] a)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM (IO (Either [SomeException] a))
-> IO (IO (Either [SomeException] a))
forall a. STM a -> IO a
atomically (St a
-> [Async a] -> [Async a] -> STM (IO (Either [SomeException] a))
forall a. St a -> [Async a] -> [Async a] -> STM (Answer a)
finish St a
st [] (St a -> [Async a]
forall a. St a -> [Async a]
threads St a
st))
                  IO (IO (Either [SomeException] a))
-> IO () -> IO (IO (Either [SomeException] a))
forall a b. IO a -> IO b -> IO a
`onException` (a -> IO ()) -> [Async a] -> IO ()
forall a. (a -> IO ()) -> [Async a] -> IO ()
cleanup (St a -> a -> IO ()
forall a. St a -> a -> IO ()
clean St a
st) (St a -> [Async a]
forall a. St a -> [Async a]
threads St a
st))

-- Search for an event out of the active threads and timer
finish :: St a -> [Async a] -> [Async a] -> STM (Answer a)
finish :: forall a. St a -> [Async a] -> [Async a] -> STM (Answer a)
finish St a
st [Async a]
threads' [] = St a -> STM (Answer a)
forall a. St a -> STM (Answer a)
fresh St a
st
finish St a
st [Async a]
threads' (Async a
t:[Async a]
ts) = St a -> [Async a] -> Async a -> STM (Answer a)
forall a. St a -> [Async a] -> Async a -> STM (Answer a)
finish1 St a
st ([Async a]
threads' [Async a] -> [Async a] -> [Async a]
forall a. [a] -> [a] -> [a]
++ [Async a]
ts) Async a
t
                   STM (Answer a) -> STM (Answer a) -> STM (Answer a)
forall a. STM a -> STM a -> STM a
`orElse` St a -> [Async a] -> [Async a] -> STM (Answer a)
forall a. St a -> [Async a] -> [Async a] -> STM (Answer a)
finish St a
st (Async a
tAsync a -> [Async a] -> [Async a]
forall a. a -> [a] -> [a]
:[Async a]
threads') [Async a]
ts

-- Handle a thread completion event
finish1 :: St a -> [Async a] -> Async a -> STM (Answer a)
finish1 :: forall a. St a -> [Async a] -> Async a -> STM (Answer a)
finish1 St a
st [Async a]
threads' Async a
t =
      do Either SomeException a
res <- Async a -> STM (Either SomeException a)
forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async a
t
         Answer a -> STM (Answer a)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure case Either SomeException a
res of
           Right a
s -> a -> Either [SomeException] a
forall a b. b -> Either a b
Right a
s Either [SomeException] a -> IO () -> Answer a
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ (a -> IO ()) -> [Async a] -> IO ()
forall a. (a -> IO ()) -> [Async a] -> IO ()
cleanup (St a -> a -> IO ()
forall a. St a -> a -> IO ()
clean St a
st) [Async a]
threads'
           Left SomeException
e -> St a -> Answer a
forall a. St a -> Answer a
loop St a
st { errors :: [SomeException]
errors = SomeException
e SomeException -> [SomeException] -> [SomeException]
forall a. a -> [a] -> [a]
: St a -> [SomeException]
forall a. St a -> [SomeException]
errors St a
st, threads :: [Async a]
threads = [Async a]
threads'}

-- Handle a new thread timer event
fresh :: St a -> STM (Answer a)
fresh :: forall a. St a -> STM (Answer a)
fresh St a
st =
  case St a -> [IO a]
forall a. St a -> [IO a]
work St a
st of
    [] -> STM (Answer a)
forall a. STM a
retry
    IO a
x:[IO a]
xs -> IO a -> St a -> Answer a
forall a. IO a -> St a -> Answer a
start IO a
x St a
st{work :: [IO a]
work = [IO a]
xs} Answer a -> STM () -> STM (Answer a)
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ St a -> STM ()
forall a. St a -> STM ()
readySTM St a
st

-- | Create an STM action that only succeeds after at least 'n' microseconds have passed.
startTimer :: Int -> IO (STM ())
startTimer :: Int -> IO (STM ())
startTimer Int
n =
  do TVar Bool
v <- Int -> IO (TVar Bool)
registerDelay Int
n
     STM () -> IO (STM ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool -> STM ()
check (Bool -> STM ()) -> STM Bool -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
v)

-- non-blocking cancelation of the remaining threads
cleanup :: (a -> IO ()) -> [Async a] -> IO ()
cleanup :: forall a. (a -> IO ()) -> [Async a] -> IO ()
cleanup a -> IO ()
release [Async a]
xs =
  () () -> IO ThreadId -> IO ()
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ IO () -> IO ThreadId
forkIO do [Async a] -> (Async a -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [Async a]
xs \Async a
x -> ThreadId -> AsyncCancelled -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo (Async a -> ThreadId
forall a. Async a -> ThreadId
asyncThreadId Async a
x) AsyncCancelled
AsyncCancelled
                  [Async a] -> (Async a -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [Async a]
xs \Async a
x -> do Either SomeException a
res <- Async a -> IO (Either SomeException a)
forall a. Async a -> IO (Either SomeException a)
waitCatch Async a
x
                                   Either SomeException a -> (a -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Either SomeException a
res a -> IO ()
release