// Snippet: fun <T> ObservableEmitter<T>.setCancellable(...)
// Tag: reaktive
/**
 * Wraps [cancellable] in a [CancellableDisposable] and hands that disposable
 * to this emitter via `setDisposable`.
 *
 * @param cancellable callback held by the created disposable.
 */
fun <T> ObservableEmitter<T>.setCancellable(cancellable: () -> Unit) {
  val disposable = CancellableDisposable(cancellable)
  setDisposable(disposable)
}

/**
 * [Disposable] that holds a cancellation callback and invokes it on [dispose].
 *
 * The callback lives in [cancellableRef]; a `null` value there means the
 * callback has already been taken by a previous [dispose] call.
 *
 * @param cancellable callback to invoke when this disposable is disposed.
 */
private class CancellableDisposable(cancellable: () -> Unit) : Disposable {
  private val cancellableRef = AtomicReference<(() -> Unit)?>(cancellable)

  /** `true` once the callback has been cleared by [dispose], `false` while it is still pending. */
  override val isDisposed: Boolean
    get() = cancellableRef.value == null

  /**
   * Clears the stored callback and invokes it. Does nothing if the callback
   * has already been cleared.
   */
  override fun dispose() {
    // getAndSet() would be the right way for doing this
    // but kotlin's AtomicReference doesn't offer it yet.
    // NOTE(review): the read and the write below are two separate steps, so
    // concurrent dispose() calls may both observe the callback and invoke it.
    val pending = cancellableRef.value ?: return
    cancellableRef.value = null
    pending.invoke()
  }
}