This message was deleted.
# coroutines
s
This message was deleted.
u
I did not find the time to really understand what you are doing, but your first code snippet already shows signs of bad flow design:
Copy code
viewModelScope.launch {
            lateinit var newTx: ITransaction
            cacheRepository.createChainTxAsFlow(SwapTransaction(match = subj))
                .map {
                    newTx = it
                    repository.swap(subj)
                }
                .onEach { preProcessResponse(it, newTx) }
                .flowOn(backgroundDispatcher)
                .collect { processResponse(it) }
        }
You’d usually want flow operators to be ‘pure’. I.e. have no side effects. In this case you should get rid of
onEach
. For
map
that means you should generally only map input data to output data and not set newTx or call repository.swap. Additionally calling back to the repository from the view model should be reserved for view-triggered actions.
To see how that can be done, please let me know what above code is supposed to do and/or what these external dependencies (repository.swap, preProcessResponse, SwapTransaction, createChainTxAsFlow) do.
d
Thanks for taking a look on this. Yes, your remarks are correct, but I do not see yet a better way to implement my use case in a "kotlin way". The bigger context is I am developing exchange platform which uses the ledger (ethereum). My first iteration shown I should not do all logic on the ledger, so my 2nd iteration scattered the business logic across the server and the ledger. Here is a link on branch where migration work is going now: https://github.com/Gelassen/swap/compare/main...feature/blockchain-integration-part-2
In case if you would prefer get explanation of what happens here in chat, please let me know and will do it.
u
yes, please
if you could just explain, what above sample is supposed to do
d
Ok, mobile client initially runs functions calls 'on-chain'
It doesn't receive feedback immediately. Now it takes near 15 seconds, but I think about possible future business constraints
I moved this work into background with help of WorkManager
Each worker then put the result into the local cache
After that each mined tx should be confirmed\left record on the server side.
This piece of code get data from the local cache and run POST request to the server
Because time of receiving response from 'on-chain' request is not clear, local cache is pulled by schedule
Actually no, mobile client listen changes in local cache and when it has been updated queue received new portion of data. Until that it sleeps and wake up by schedule to check if there is not portion of data waiting for POSTing to the server
Briefly that's all.
u
Sorry, I am having a hard time relating what you write to your initial code snippet
Anyway. Some more feedback… Not sure if it will be related in the end, but this needs clean up before diagnosis 🙂
• Move flowOn to
createChainTxAsFlow
as this is where the blocking stuff happens
• Why use a flow at all if you ever only emit one single item. (If you know RxJava,
Single
in general maps to kotlin suspend function)
• If you need flows at all why not move everything into collect or processResponse?
Copy code
collect { newTx ->
  val sawp = repository.swap()
  preprocessResponse(swap, chainTx)
  processResponse(swap)
}
Ask yourself, why you use flows at all, why do you create a flow in preProcessResponse (by calling cacheRepository.createChainTxAsFlow) and why do you then throw the flow away. If not collected, it will not do any thing. Types and methods and data structures are more than pure functionality. They talk to a reader. If I see a flow I expect it to be the start of a stream that is later processed. If I see a map operator I expect it to do one thing: take an input and produce an output. If I see collect, I expect it to be the only place to actually act upon the emissions of a flow.
Besides the general feedback, let me get back to your original question. You can, as you already mentioned in your comments, always factor out functionality of your operator lambdas into further methods. And you can compse your flow step by step:
Copy code
fun itemFlow(item) = flow<Pair<ITransaction, Service>> {
                // TODO big part of code 
}
Copy code
itemFlow().flatMapConcat { item ->
                    handleItem(item)
                }
                .onStart { state.update { state -> state.copy(isLoading = true) } }
                .flatMapConcat {
                    if (it is Response.Data) { personRepository.cacheAccount(it.data) }
                    flow { emit(it) }
                }
                .collect { it ->
                    Log.d(App.TAG, "[add offer] start collect data in viewmodel")
                    processServerResponse(it) { addOfferSpecialHandler(it) }
                }
Copy code
fun handleItem() {
                    // TODO consider to move each branch case into the separate class function
                    when(item.first.type) {
                        MintTransaction::class.java.simpleName -> {
                            /* token are created only for offer, demands registered just as a db record */
//                            val mintItem = (item.first as MintTransaction).tokenId
                            logger.d("[queue polling] process MintTransaction item")
                            item.second.chainService.tokenId = (item.first as MintTransaction).tokenId
                            // val chainService = item.second//ChainService(userWalletAddress = state.value.profile.userWalletAddress, tokenId = mintItem)
                            val newService = item.second//Service(title = proposal.get()!!, date = 0L, index = listOf(), chainService = chainService)
                            logger.d("[addOffer::server] ${item.first} \n and \n ${item.second}")
                            personRepository.addOffer(
                                contact = uiState.value.profile.contact,
                                secret = uiState.value.profile.secret,
                                newService = newService
                            )
                        }
                        else -> { throw UnsupportedOperationException("Did you forget to add support of ${item.first.type} class tx?") }
                    }
                }
d
You may noticed this commit marked as a "WIP" and a whole branch is under name "feature/blockchain-integration", it is ongoing work and not finished yet. My comments regarding your feedback: 1. flowOn operator might change during the flow execution, this a reason why I prefer to expose to the level where the flow is actually invoked and read 2. API is redesigned during ongoing work and new\changed requirements, some API calls duplicate each other either by I still considering a better choice or haven't migrated yet the rest of the codebase on the new API. In general I prefer Flows over suspend functions for their chaining power. 3. collect is usually (always?) called within main thread, resource consuming operations like write to the disk or communications over network is not allowed here - offered alternative solution would not work here 4. Regarding createChainTxAsFlow you are right, I do not remember reasons why it was wrote as is, but it wasn't tested yet 5. Thanks for 'micro-flow as a separate methods' suggestion, it was out of my consideration so far and I will try it to see would it fit my use cases. Thank you for investing time helping me with this. "WIP" commits and integration branches are not something which shared for reviews, but there no better way to explain by issue with more details.