First of all, downstream should not pass anything to upstream.
Second, I think there are no built in operator for the purpose you need. You can try piping the success result to another subject for caching.
(Side note: This is a Kotlin slack after all, so your question should be related to Kotlin)