Gao Hang
question
Kevin S
func createPublisher<T>(_ flowAdapter: FlowAdapter<T>) -> AnyPublisher<T?, KotlinError> { return Deferred<Publishers.HandleEvents<PassthroughSubject<T?, KotlinError>>> { let subject = PassthroughSubject<T?, KotlinError>() let canceller = flowAdapter.subscribe( onEach: { item in subject.send(item) }, onComplete: { subject.send(completion: .finished) }, onThrow: { error in subject.send(completion: .failure(KotlinError(error))) } ) return subject.handleEvents(receiveCancel: { canceller.cancel() }) }.eraseToAnyPublisher() }
fun subscribe( onEach: (item: T) -> Unit, onComplete: () -> Unit, onThrow: (error: Throwable) -> Unit, ): Canceller = JobCanceller( flow.onEach { onEach(it) } .catch { onThrow(it) } .onCompletion { onComplete() } .launchIn(scope), )
A modern programming language that makes developers happier.