https://kotlinlang.org logo
Title
s

Sam Garfinkel

08/05/2020, 2:44 PM
Is it safe to read from an
InputStream
in a
flow { }
builder? Getting warnings from IntelliJ about inappropriate blocking calls.
s

streetsofboston

08/05/2020, 2:48 PM
If you have a blocking call inside your
flow { … }
builder, the collector may block as well, since its scope’s dispatcher will be used to run the flow’s builder code.
g

gildor

08/05/2020, 2:48 PM
It will block consumer until you apply explicit dispatcher on flow using flowOn operator
s

Sam Garfinkel

08/05/2020, 2:49 PM
So I want
flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
?
:yes: 2
I’m also using a STAX parser to read from an InputStream. I gather I should do the same with that to prevent it from blocking consumers?
I’m confused why this is an issue though. Flows are cold and unbuffered. Consumers execute the entire pipeline to get each value.
s

streetsofboston

08/05/2020, 2:51 PM
But blocking code in the flow (producer) can cause the consumer to block, since they would use the same dispatcher. With the
flowOn
you can prevent that
s

Sam Garfinkel

08/05/2020, 2:53 PM
In other words, the consumer’s pool might be able to do other work but instead is blocked by the flow operator? Does
flowOn
have a buffer to resolve this?
s

streetsofboston

08/05/2020, 2:53 PM
No buffer. I uses suspend callbacks.
Instead of being blocked, you collector will be suspended.
s

Sam Garfinkel

08/05/2020, 2:53 PM
Ah I see
Ok thanks. Hopefully that will clear the warning.
g

gildor

08/05/2020, 2:54 PM
One more way, is avoid calling blocking code directly from flow, instead you can wrap this blocking code to suspend function with dispatcher, so flow itself will not block, but it just a matter of style and particular case
s

streetsofboston

08/05/2020, 2:54 PM
That’s why Flow don’t have backpressure issues like Rx does…
s

Sam Garfinkel

08/05/2020, 2:55 PM
Does wrapping blocking calls in suspend properly prevent parent Coroutines from blocking?
Unless you are specifically forking with
launch
or
async
I didn’t think this was the case
g

gildor

08/05/2020, 2:56 PM
If suspend function itself is not blocking, yes
s

streetsofboston

08/05/2020, 2:56 PM
The warning/linting system may not be that smart. I can detect a
withContext(<http://Dispatchers.IO|Dispatchers.IO>)
and remove the warning, but I doubt it can figure out the
flowOn(<http://Dispatchers.IO|Dispatchers.IO>)
to remove the warnings….
g

gildor

08/05/2020, 2:56 PM
Don't use launch, use withContexy
s

Sam Garfinkel

08/05/2020, 2:56 PM
@streetsofboston Plus I believe
withContext
is illegal in a standard flow
g

gildor

08/05/2020, 2:57 PM
Right
s

streetsofboston

08/05/2020, 2:57 PM
You can’t emit from a different scope, but I think you can emit from a different context, correct?
Ah…. I guess you can’t do that 🙂
g

gildor

08/05/2020, 2:57 PM
withContext also create scope, so it's not allowed
s

Sam Garfinkel

08/05/2020, 2:58 PM
channelFlow
exists for this purpose but I think most uses of it are code smell
s

streetsofboston

08/05/2020, 2:58 PM
It perfectly fine to use if you want to emit data from plain non-suspending callbacks
g

gildor

08/05/2020, 2:58 PM
I think flowOn is just the easiest solution
1
☝️ 2
s

Sam Garfinkel

08/05/2020, 3:00 PM
This question has gone a bit off the rails but just to conclude, if you’re bridging callbacks
callbackFlow
is designed specifically for that purpose.
g

Gabriel Feo

08/05/2020, 3:06 PM
Yes, because callbackFlow checks whether you called awaitClose in the end
s

streetsofboston

08/05/2020, 3:09 PM
IIRC, that is the only difference between the channelFlow and callbackFlow, correct?
g

Gabriel Feo

08/05/2020, 3:12 PM
AFAIK, yes
g

gildor

08/05/2020, 3:22 PM
I don't think that usage callback/channel flow would be better in this case, it created for concurrent emit from different threads, you will also pay price for it, which doesn't make a lot of sense if just want emit from some blocking loop like reading from a stream