Hi all,
I was trying to do something that would allow me to partition a stream to parts, work with them as independent streams, and sensibly merge the results. Essentially a "diamond" style workflow, but extensible in how many branches the diamond has.
In the current iteration I arrived at a set of helpers that can be used to make a pipeline:
- toL puts items into InL
- dupLs is like copy, but the copied stream only contains the stuff from the left side of Sum
- joinRs takes a transformed stream and saves its results back into the bigger stream in InR
- fromR takes the stream with the transformations applied and drops the leftover "left" Sums, only leaving the results
Schematically, one makes a stream of (Sum input output) and copies the inputs into as many other output-producing streams as needed (each bracketed between joinRs . ... . dupLs), finally collecting all the results with fromR. Code example below. I'm adding a picture of how I imagine the process (corresponds to: ... $ fromR $ joinRs . S.map (+1) . dupLs $ toL $ ...):
,--- S.map (+1)-.
,---(InL)--> dupLs -------------- \ ---- - - - -----v
x -> toL --(InR)--> - - - --------------- \ --> joinRs --- fromR --> (x+1)
| '----^ |
| repeatable as needed ................ |
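For comparison, here is a minimal sketch of the existing S.copy idiom that dupLs is modeled on, adapted from the example in the Streaming.Prelude documentation. The name copyDemo is made up for illustration. Note that with plain S.copy the two branches' results are not merged into one stream; one ends up nested in the return value of the other.

```haskell
import Streaming
import qualified Streaming.Prelude as S
import Streaming.Prelude (Of (..))

-- Hypothetical name, for illustration only.
-- S.copy duplicates every element: one copy stays in the outer stream,
-- the other is emitted into the underlying (base) stream.
copyDemo :: IO (Of [[Int]] (Of [[Int]] ()))
copyDemo =
  (S.toList . mapped S.toList . chunksOf 5)      -- consumes the base copy
    $ (S.toList . mapped S.toList . chunksOf 3)  -- consumes the outer copy
    $ S.copy
    $ S.each [1 .. 10 :: Int]
```

Running copyDemo should give [[1,2,3,4,5],[6,7,8,9,10]] :> ([[1,2,3],[4,5,6],[7,8,9],[10]] :> ()), so the chunks of 3 are only reachable through the return value, which is what the Sum-based helpers try to avoid.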
So having this kinda working, I thought I'd better ask here:
- I didn't find a way to do this easily with existing functionality in streaming; in case this looks useful to anyone, should I PR? Otherwise, is this somehow easily composable from the existing stuff in streaming?
- It seems to me that the use of destroy might be too much in some cases, but I failed to find any smaller gun to do this properly; is there any way to express this e.g. with mapped?
- Can one somehow easily get rid of the fixed Of in the type of fromR? I guess we'd instead need something more generic, such as Comonad's extract?
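To make the last question concrete, here is a rough sketch of one possible direction, assuming it is acceptable to pass the extract-like function explicitly instead of requiring a Comonad instance. The names fromRWith, dropOf and example are made up for illustration and are not part of streaming; this is not meant as the definitive answer.

```haskell
import Streaming
import qualified Streaming.Prelude as S
import Streaming.Prelude (Of (..))

-- Hypothetical generalization of fromR: the caller supplies the function
-- that discards one left layer and returns the rest of the stream.
fromRWith ::
     (Monad m, Functor f, Functor g)
  => (f (Stream g m r) -> Stream g m r)
  -> Stream (Sum f g) m r
  -> Stream g m r
fromRWith dropLeft s =
  destroy
    s
    (\x ->
       case x of
         InL fs -> dropLeft fs -- skip the left layer, keep the continuation
         InR gs -> wrap gs)    -- keep the right layer as-is
    effect
    return

-- For Of, dropping a layer means forgetting the element.
dropOf :: Of x a -> a
dropOf (_ :> rest) = rest

-- A hand-built stream that alternates left and right layers.
example :: Stream (Sum (Of Int) (Of String)) IO ()
example = do
  yields (InL (1 :> ()))
  yields (InR ("a" :> ()))
  yields (InL (2 :> ()))
  yields (InR ("b" :> ()))
```

With these definitions, S.toList_ (fromRWith dropOf example) should return ["a","b"], and fromRWith dropOf should behave like the Of-specific fromR. This version still uses destroy, so it does not address the question about a smaller tool.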
Thanks for any comments!
Code example:
import Streaming
import qualified Streaming.Prelude as S

-- Put every item into the left side of the Sum.
toL :: (Functor f, Monad m) => Stream f m r -> Stream (Sum f x) m r
toL = maps InL

-- Like copy, but the copied stream only contains the left side of the Sum.
dupLs ::
     (Functor f, Functor g, Monad m)
  => Stream (Sum f g) m r
  -> Stream f (Stream (Sum f g) m) r
dupLs s =
  destroy
    s
    (\x ->
       case x of
         InL fss -> wrap (effect (yields (InL fss)) <$ fss)
         InR gss -> effect (yields (InR gss)))
    (effect . lift)
    return

-- Save the results of a transformed stream back into the bigger stream as InR.
joinRs ::
     (Functor f, Functor g, Monad m)
  => Stream g (Stream (Sum f g) m) r
  -> Stream (Sum f g) m r
joinRs s = destroy s (\gss -> wrap $ InR gss) join return

-- Drop the leftover left layers, leaving only the results.
fromR :: (Monad m, Functor g) => Stream (Sum (Of x) g) m r -> Stream g m r
fromR = S.effects . separate

test :: IO ()
test =
  S.print
    . fromR
    $ joinRs . mapped S.toList . chunksOf 3 . dupLs -- list chunks aren't great for streaming but it shows how the results interleave
    $ joinRs . mapped S.toList . chunksOf 5 . dupLs
    $ toL
    $ S.each [1 .. 20 :: Int]

test prints:
[1,2,3]
[1,2,3,4,5]
[4,5,6]
[7,8,9]
[6,7,8,9,10]
[10,11,12]
[11,12,13,14,15]
[13,14,15]
[16,17,18]
[16,17,18,19,20]
[19,20]
EDITS: fixed type of test to compile with monomorphism restriction, added the note about fromR