wrap higher-order effect - Polysemy

Welcome to the Functional Programming Zulip Chat Archive.

Torsten Schmits

I'm trying to call async in an interpretH:

data W m a where
  W :: m a -> W m ()

makeSem ''W

interpretW ::
  Member Async r =>
  InterpreterFor W r
interpretW =
  interpretH $ \case
    W m -> do
      mm <- raise . interpretW =<< runT m
      mm1 <- runT m
      async mm
      async mm1
      undefined

I'm struggling to get the right shape for the async call. In the first case I get an f a; in the second case it's a Sem that has one W too many in the Tactics part. Any pointers?

TheMatten

@Torsten Schmits What's the goal of W?

Torsten Schmits

@TheMatten my use case is just bundling a few effects to shorten the Members list

Torsten Schmits

@TheMatten thanks! that's a nice tool. In my case though, the function for which I want to bundle is not a Sem, but a Stream (Of a) (Sem r) (), and I can't get it to work there (I tried MFunctor.hoist (sendBundle @Async)).

Also, my impression is that calling other effects from an interpreter is a fairly integral mechanism, so shouldn't it be possible with higher-order effects? Do you know if there's a way? I tried dissecting the machinery with decomp and fiddling with the Weaving and Union stuff, but it probably takes a while to internalize this enough to be able to work with it competently.

Torsten Schmits

ok, I see now that I can use lift $ send $ injBundle $ Async.Async m inside of the function. A bit verbose, but it will do.
Still interested in how to do the thing from the first post though!

Torsten Schmits

also I want to use helpers like sequenceConcurrently, I'll try to find a way to bundle those

Torsten Schmits

oh whoops, I can just use lift . sendBundle :big_smile:
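A minimal sketch of the lift . sendBundle pattern inside a Stream, assuming the polysemy, streaming and transformers packages. The name tick and the use of State Int as the bundled effect are hypothetical and chosen only for illustration. The explicit type applications and AllowAmbiguousTypes are there because r0 appears only in the constraints.

```haskell
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}

import Control.Monad.Trans.Class (lift)
import Polysemy
import Polysemy.Bundle (Bundle, sendBundle)
import Polysemy.State (State, get, put)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical example: emit the current counter, then increment it.
-- Both State actions reach the interpreter through the bundle, so the
-- stream's base monad only needs the single Bundle constraint.
tick ::
  forall r0 r .
  Member (State Int) r0 =>
  Member (Bundle r0) r =>
  Stream (Of Int) (Sem r) ()
tick = do
  -- sendBundle turns a Sem (State Int : r) action into a Sem r action;
  -- lift then embeds that action into the Stream.
  n <- lift (sendBundle @(State Int) @r0 get)
  S.yield n
  lift (sendBundle @(State Int) @r0 (put (n + 1)))
```

Callers have to pick r0 explicitly, for example tick @'[State Int].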

Torsten Schmits

so I'm trying my best but I can't get the bundle to work with expressions like

sendBundle (sem `finally` sendBundle otherBundledEffectSem)

i.e. if there are higher order effects, again (Resource here).

Torsten Schmits

real code:

closeQueueBundle ::
  forall a r r0 .
  Member (StmQueue a) r0 =>
  Member (Bundle r0) r =>
  Sem r ()
closeQueueBundle =
  sendBundle StmQueue.closeQueue

finalizeBundle ::
  forall a r r0 o .
  Member (StmQueue a) r0 =>
  Member Resource r0 =>
  Member (Bundle r0) r =>
  Sem r o ->
  Sem r o
finalizeBundle s =
  sendBundle (s `finally` closeQueueBundle)

GHC rejects this with Expected type: Sem (Resource : r) o, Actual type: Sem r o on the finally expression.

Torsten Schmits

ok, that was easily solved with raise :slight_smile:
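The fix can be spelled out roughly as follows. This is a sketch, not the code from the thread: withCleanup is a hypothetical name, and it takes the cleanup action as a parameter so that it does not depend on StmQueue. sendBundle expects a Sem (Resource : r) action, so both arguments to finally are raised into that larger stack first.

```haskell
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}

import Polysemy
import Polysemy.Bundle (Bundle, sendBundle)
import Polysemy.Resource (Resource, finally)

-- Hypothetical helper: run an action, then a cleanup, with Resource
-- reached through the bundle r0 instead of being a member of r.
withCleanup ::
  forall r0 r a b .
  Member Resource r0 =>
  Member (Bundle r0) r =>
  Sem r a ->
  Sem r b ->
  Sem r a
withCleanup action cleanup =
  -- Inside sendBundle the stack is Resource : r, so finally type-checks
  -- once both Sem r actions are raised by one effect.
  sendBundle @Resource @r0 (raise action `finally` raise cleanup)
```

AllowAmbiguousTypes is enabled because r0 occurs only in the constraints, so callers select it with a type application such as withCleanup @'[Resource].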

Torsten Schmits

bundle ::
  Member e r0 =>
  Member (Bundle r0) r =>
  (Sem (e : r) a -> Sem (e : r) b) ->
  Sem r a ->
  Sem r b
bundle f s =
  sendBundle (f (raise s))

this does the trick: bundle async sem!

Torsten Schmits

final version:

bundle ::
  Member e r0 =>
  Member (Bundle r0) r =>
  (Sem (e : r) a -> Sem (e : r) b) ->
  Sem r a ->
  Sem r b
bundle f s =
  sendBundle (f (raise s))

bundle2 ::
  Member e r0 =>
  Member (Bundle r0) r =>
  (Sem (e : r) a -> Sem (e : r) b -> Sem (e : r) c) ->
  Sem r a ->
  Sem r b ->
  Sem r c
bundle2 f s1 s2 =
  sendBundle (f (raise s1) (raise s2))

sequenceBundled ::
  Traversable t =>
  Member Async r0 =>
  Member (Bundle r0) r =>
  t (Sem r ()) ->
  Sem r (t (Maybe ()))
sequenceBundled =
  sendBundle . sequenceConcurrently . fmap raise

raiseStream ::
  Stream (Of a) (Sem r) x ->
  Stream (Of a) (Sem (e : r)) x
raiseStream =
  MM.hoist raise

bundleStream ::
  Member e r0 =>
  Member (Bundle r0) r =>
  (Stream (Of a) (Sem (e : r)) o -> Sem (e : r) o2) ->
  Stream (Of a) (Sem r) o ->
  Sem r o2
bundleStream f =
  sendBundle . f . raiseStream

mergeStreams ::
  Members [Resource, Async, StmQueue a] r0 =>
  Member (Bundle r0) r =>
  Traversable t =>
  t (Stream (Of a) (Sem r) ()) ->
  Stream (Of a) (Sem r) ()
mergeStreams strs = do
  inThread <- lift $ bundle async mergedProducer
  Stream.untilLeft consume <* lift (sendBundle StmQueue.closeQueue *> sendBundle (await inThread))
  where
    mergedProducer =
      bundle2 finally (sequenceBundled (bundleStream producer <$> strs)) (sendBundle StmQueue.closeQueue)
    consume =
      maybeToRight () <$> sendBundle StmQueue.readQueue

pretty verbose. It would be really nice to hide this in an actual effect interpreter.

Torsten Schmits

FYI!
After having fortified my intuition about Polysemy, I took another stab at this and it turned out to be quite simple with a piece of code from the interpreter for Async:

data W m a where
  W :: m a -> W m ()

makeSem ''W

interpretW ::
  Member Async r =>
  InterpreterFor W r
interpretW =
  interpretH $ \case
    W m -> do
      ins <- getInspectorT
      handle <- raise . async . interpretW =<< runT m
      fmap void $ pureT (fmap (>>= inspect ins) handle)

Sandy Maguire

sweet mother of mary that's beginning to look like ghcjs code

Sandy Maguire

what have i unleashed upon the world

Torsten Schmits

@Sandy Maguire I'm open to suggestions :slight_smile:

Sandy Maguire

sorry, let me rephrase that

Sandy Maguire

"why is polysemy so complicated that that stuff is necessary?"

Torsten Schmits

well, after having worked with it intensely for a month, I would say that compared to other concepts I've used it is, on average, less complicated :slight_smile: the Tactics thing is pretty intense, sure.

Torsten Schmits

as far as I could tell, using Tactics directly isn't exactly popular (searching github for the functions yields few real results), so I'd assume that it needs more feedback to evolve into something more intuitive

Torsten Schmits

and almost all results just use a simple m a from the interpreter effect, not others, and no transformers.

Torsten Schmits

I have now reimplemented the stream merging effect I posted earlier, and I'm hitting another wall:

liftStreamEffect ::
  Functor f =>
  Members [StmQueue d, Resource, Async] r =>
  m a ->
  Sem (WithTactics (MergeStreams ThreadHandle d) f m r) a
liftStreamEffect s = do
  s1 <- runT s
  liftT (interpretMergeStreamsWith s1)

runMerge ::
  forall t d m f r a .
  Functor f =>
  Members [StmQueue d, Resource, Async] r =>
  Traversable t =>
  t (Stream (Of d) m a) ->
  Sem (WithTactics (MergeStreams ThreadHandle d) f m r) (f (ThreadHandle ()))
runMerge ps = do
  handle <- async (finally (sequenceConcurrently prod) StmQueue.closeQueue)
  pureT (ThreadHandle (void <$> handle))
  where
    prod =
      producer . Stream.hoistExposedPost liftStreamEffect <$> ps

interpretMergeStreamsWith ::
  forall d r k .
  Members [StmQueue d, Resource, Async] r =>
  InterpreterFor (MergeStreams ThreadHandle d) r

In liftStreamEffect, I need to return an a, while I have an f a, like in the W example. But I can't use the inspector trick, because it seems to swallow the stream's state (it produces the first element ad infinitum).
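One plausible reading of the symptom, as a toy model that uses only base and none of Polysemy's real types: if the value is extracted from the stateful result and the updated state is thrown away, every step starts again from the initial state. All names here (Step, inspectToy, nextElem, threaded, inspected) are hypothetical, and the pair (Int, a) only stands in for a result wrapped in the state functor f.

```haskell
-- Toy model only: these are not Polysemy's types.
-- A step reads the current state and returns the new state with a value,
-- standing in for an action whose result is wrapped in the state functor f.
type Step a = Int -> (Int, a)

-- Hypothetical stand-in for an inspector: keeps the value, drops the state.
inspectToy :: (Int, a) -> Maybe a
inspectToy = Just . snd

-- Emits the current counter and advances it.
nextElem :: Step Int
nextElem s = (s + 1, s)

-- Threading the state from one step to the next yields successive elements.
threaded :: Int -> [Int]
threaded n = go n 0
  where
    go :: Int -> Int -> [Int]
    go k s
      | k <= 0 = []
      | otherwise = let (s', a) = nextElem s in a : go (k - 1) s'

-- Inspecting each result and restarting every step from the initial
-- state repeats the first element.
inspected :: Int -> [Int]
inspected n = [a | _ <- [1 .. n], Just a <- [inspectToy (nextElem 0)]]
```

Here threaded 3 evaluates to [0,1,2], while inspected 3 evaluates to [0,0,0]. Whether this is exactly what happens inside the Tactics machinery is an assumption, not something the thread confirms.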

Torsten Schmits

would it be possible to implement a constructor for Tactics that absorbs an f back into the monad?

Torsten Schmits

finally!! My mistake was to put some of the logic in the interpreter. I just had to prepare the producer Sems in the program:

newtype ThreadHandle a =
  ThreadHandle (CC.Async (Maybe a))

producer ::
  Member (Tagged k (MergeStreams h d)) r =>
  Stream (Of d) (Sem r) o ->
  Sem r ()
producer =
  traverse_ next <=< Stream.next
  where
    next (a, str') =
      unlessM (tag MergeStreams.terminated) (tag (MergeStreams.write a) *> producer str')

mergeStreams ::
  Member (Tagged k (MergeStreams h d)) r =>
  Traversable t =>
  t (Stream (Of d) (Sem (MergeStreams h d : r)) ()) ->
  Stream (Of d) (Sem r) ()
mergeStreams inputs = do
  producerThread <- lift (tag (MergeStreams.runProducer (producer <$> inputs)))
  Stream.untilLeft consume <* lift (tag (MergeStreams.finalize producerThread))
  where
    consume =
      maybeToRight () <$> tag MergeStreams.consume

interpretMergeStreamsWith ::
  Members [StmQueue d, Resource, Async] r =>
  InterpreterFor (MergeStreams ThreadHandle d) r
interpretMergeStreamsWith =
  interpretH $ \case
    RunProducer ps -> do
      handle <- async (finally (sequenceConcurrently (recurse <$> ps)) StmQueue.closeQueue)
      pureT (ThreadHandle (void <$> handle))
      where
        recurse =
          raise . interpretMergeStreamsWith <=< runT
    Consume ->
      liftT StmQueue.readQueue
    Finalize (ThreadHandle producerThread) ->
      liftT (StmQueue.closeQueue *> await producerThread)
    Terminated ->
      liftT StmQueue.closedQueue
    Write d ->
      liftT (StmQueue.writeQueue d)

Kinda obvious in hindsight, but I've learned a lot :grinning_face_with_smiling_eyes: