jw
// Shares `upstream` through publish's selector lambda and merges it with a
// one-item fallback stream built from `specialValue`.
// NOTE(review): presumably RxJava-style operators, in which case the effect is
// "emit specialValue if upstream has produced nothing after 3 seconds, then keep
// relaying upstream" — confirm against the library actually in scope.
upstream.publish { shared ->
    just(specialValue)
        // Hold back the fallback for 3 SECONDS before subscribing to it.
        .delaySubscription(3, SECONDS)
        // Cut the fallback off using the shared upstream as the stop signal.
        .takeUntil(shared)
        // Combine the (possibly cancelled) fallback with the shared upstream itself.
        .mergeWith(shared)
}