Create my own effect, which uses Resource and Async - Polysemy

Welcome to the Functional Programming Zulip Chat Archive. You can join the chat here.

Martins

Hello guys, maybe you could help me with this.

I am trying to build my own effect, which needs to start two or more threads initially and clean up on completion.
I use bracket and async from Polysemy.Resource and Polysemy.Async for this.
The DummyHandler interpreter part DOES NOT USE neither bracket nor async.

Currently it looks similar to this:

import           Polysemy
import           Polysemy.Async
import           Polysemy.Resource

data DummyHandler m a where
    GetDummy :: DummyHandler m ()

-- First worker to be started asynchronously
workerOne :: Members '[Embed IO, Resource] r => Sem r ()
workerOne = undefined
{-
If I use 'bracket' from Resource in workerOne, I get exceptions like:
thread blocked indefinitely in an MVar operation
thread blocked indefinitely in an STM transaction
-}

-- Second worker to be started asynchronously
workerTwo :: Members '[Embed IO, Resource] r => Sem r ()
workerTwo = undefined

runDummyHandler :: Members '[Embed IO, Async, Resource] r
                => Sem (DummyHandler ': r) a
                -> Sem r a
runDummyHandler sem =
    bracket
        (do
            threadOne <- async workerOne
            threadTwo <- async workerTwo
            return (threadOne, threadTwo)
        )
        (\(threadOne, threadTwo) -> await threadOne >> await threadTwo)
        $ \_ ->
            interpret (\case
                GetDummy -> return ()
            ) sem

program :: Member DummyHandler r => Sem r ()
program = undefined

test :: IO ()
test = runM . resourceToIO . asyncToIO . runDummyHandler $ program

There are two problems with this:
1. runDummyHandler exposes the fact, that it needs Resource and Async. I would like to hide that from clients and somehow move resourceToIO . asyncToIO inside of runDummyHandler.
2. If I run another bracket inside of worker threads (workerOne - see above), I get STM related exceptions (I use STM there):

thread blocked indefinitely in an MVar operation
thread blocked indefinitely in an STM transaction

Could you guide me how should go about this? My feeling is I need to raise :: forall e r a. Sem r a -> Sem (e ': r) a Resource and Async and then consume them.
But how exactly?
And what do you think causes STM timeouts in my case?

desiredRunDummyHandler :: Members '[Embed IO] r
                       => Sem (DummyHandler ': r) a
                       -> Sem r a
desiredRunDummyHandler sem =
    resourceToIO . asyncToIO $ runDummyHandler sem
{-
 src/BfHaskell/StreamingAPI/Temp.hs:51:32: error:
    • Occurs check: cannot construct the infinite type:
        r ~ Async : Resource : r
      Expected type: Sem (Async : Resource : r) a
        Actual type: Sem r a
    • In the second argument of ‘($)’, namely ‘runDummyHandler sem’
      In the expression: resourceToIO . asyncToIO $ runDummyHandler sem
      In an equation for ‘desiredRunDummyHandler’:
          desiredRunDummyHandler sem
            = resourceToIO . asyncToIO $ runDummyHandler sem
    • Relevant bindings include
        sem :: Sem (DummyHandler : r) a
          (bound at src/BfHaskell/StreamingAPI/Temp.hs:50:24)
        desiredRunDummyHandler :: Sem (DummyHandler : r) a -> Sem r a
          (bound at src/BfHaskell/StreamingAPI/Temp.hs:50:1)
   |
51 |     resourceToIO . asyncToIO $ runDummyHandler sem
   |                                ^^^^^^^^^^^^^^^^^^^
-}

desiredTest :: IO ()
desiredTest = runM . desiredRunDummyHandler $ program
TheMatten

@Martins you did compile your code with -threaded, right?

Martins

@TheMatten Yes, I compiled with -threaded. If I remove inner bracket, three threads run and intercommunicate alright. It's not the biggest problem.
Actually, I am more interested in my first question above. That is, how to rewrite runDummyHandler (get rid of Async and Resource members and move them inside of runDummyHandler).

runDummyHandler :: Members '[Embed IO, Async, Resource] r
                => Sem (DummyHandler ': r) a
                -> Sem r a

into

runDummyHandler :: Members '[Embed IO] r
                       => Sem (DummyHandler ': r) a
                       -> Sem r a
TheMatten

@Martins

runDummyHandler :: Member (Embed IO) r
                => Sem (DummyHandler ': r) a
                -> Sem r a
runDummyHandler sem =
  resourceToIO $ asyncToIO $ bracket
    do threadOne <- async workerOne
       threadTwo <- async workerTwo
       pure (threadOne, threadTwo)
    do \(threadOne, threadTwo) ->
         await threadOne >> await threadTwo
    do \_ -> raise $ raise $
         interpret (\GetDummy -> pure ()) sem
TheMatten

You can raise interpreted action inside of last argument and then interpret added effects on top

Martins

@TheMatten Great, it compiles! Thank you very much!
I will try the same in real code now.