#coroutines

Big Chungus

02/27/2021, 10:14 AM
What would I use to get a channel or flow that can send to itself? Basically async tail recursion: if condition x is satisfied, you send the item back to the channel; otherwise you process the current item.

Hannes

02/27/2021, 10:41 AM
So you mean something like this pseudo code:
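while (true) {
    val element = myChannel.receive()
    // isHighestPriority is a placeholder predicate, not a real API
    if (!isHighestPriority(element)) {
        // Not the highest priority: put it at the end of the queue again
        myChannel.send(element)
        continue
    }
    // Otherwise process the element here
}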
This does not feel very efficient, nor am I sure how to formulate a “not highest priority” condition, as this really depends on the situation and changes at runtime.

Big Chungus

02/27/2021, 10:43 AM
Almost, but I'd basically want to have x amount of coroutines reading from the channel until there are no more messages.
Think of the problem as walking through file directories asynchronously and printing out all files until you've printed all of them. So the channel item would be a File. If it's a file, you just print it; otherwise you list all its children and send them back to the channel.
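For the directory example specifically, one way to sketch it without a channel that feeds itself is to let structured concurrency track the outstanding work: launch one child coroutine per subdirectory inside a channelFlow, which only completes once its block and all coroutines launched in it have finished. This is only a sketch under assumptions: walkFiles, visit and listFileNames are made-up names, and it does not cap the work at x coroutines (the dispatcher bounds the threads instead).

```kotlin
import java.io.File
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: visits one entry, emitting files and fanning out over directories.
private suspend fun ProducerScope<File>.visit(entry: File) {
    if (entry.isFile) {
        send(entry)
        return
    }
    // listFiles() returns null if the entry is not a readable directory
    entry.listFiles()?.forEach { child ->
        if (child.isFile) {
            send(child)
        } else {
            // One child coroutine per subdirectory; it is a child of the channelFlow scope
            launch { visit(child) }
        }
    }
}

// Hypothetical helper: a flow of every regular file under root, in no particular order.
fun walkFiles(root: File): Flow<File> = channelFlow {
    visit(root)
    // No explicit close: channelFlow finishes once this block and all launched children finish
}.flowOn(Dispatchers.IO)

// Blocking convenience wrapper, only here to make the sketch easy to try out.
fun listFileNames(root: File): List<String> =
    runBlocking { walkFiles(root).toList().map { it.name }.sorted() }
```

Collecting walkFiles(File(".")) and printing each item would then cover the "print all files" case, and the flow ends by itself once the last directory has been listed.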

Hannes

02/27/2021, 10:46 AM
I got your idea. Not sure, though, how this could work in practice, as I also want to process “low priority” elements if there are no “high priority” elements at the moment.

Big Chungus

02/27/2021, 10:48 AM
But there's no priority, just branching on the isFile condition.

Hannes

02/27/2021, 10:50 AM
My problem is that I want to “print all files” at some point, but I want to change the order in which each file gets printed based on some “priority” property.

Big Chungus

02/27/2021, 10:51 AM
Ah no, you just print file when you come to it and forget about it.
In the end I came up with something like this:
override val artifacts: Flow<MCArtifact> = channelFlow {
    val pageChannel = Channel<List<MavenRepositoryClient.RepoItem>>(Channel.BUFFERED)
    supervisedLaunch {
      client.listRepositoryPath("")?.let { pageChannel.send(it) }
    }
    
    // Tracker
    supervisedLaunch {
      var ticks = 0
      do {
        delay(1.minutes)
        if (pageChannel.isEmpty) {
          ticks++
          logger.info("Page channel empty, ${5 - ticks} ticks remaining until close")
        } else {
          ticks = 0
        }
      } while (ticks < 5)
      pageChannel.close()
    }
    
    // Workers
    List(Runtime.getRuntime().availableProcessors() * 2) {
      supervisedLaunch {
        for (page in pageChannel) {
          val mavenMetadata = page.find { it.value == "maven-metadata.xml" }
          if (mavenMetadata != null) {
            client.getArtifactDetails(mavenMetadata.path)?.let {
              send(MCArtifact(it.group, it.name, it.latestVersion))
            }
          } else {
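            page
              .filter(MavenRepositoryClient.RepoItem::isDirectory)
              .forEach {
                logger.info("Scanning MC page ${it.path}")
                // List and send from a separate coroutine so this worker is not suspended by the request or a full buffer
                supervisedLaunch { client.listRepositoryPath(it.path)?.let { item -> pageChannel.send(item) } }
              }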
          }
        }
      }
    }.joinAll()
  }
The idea is basically that you have n worker coroutines that read from a buffered channel and are able to send back to it. Then you have one tracker coroutine that checks the channel every x minutes and eventually closes it once it deems it properly empty.
Happy for anyone to suggest a better way
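One possible alternative to the timer-based tracker, as a rough sketch rather than a drop-in replacement: count the items that are queued or still being processed, and close the channel when that count reaches zero. The processRecursively function and its expand parameter are hypothetical names, not part of kotlinx.coroutines, and the sketch assumes an unlimited channel so a worker never suspends while sending back to the queue it reads from.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Hypothetical generic helper.
// expand handles one item and returns the follow-up items it produces (empty for leaves).
suspend fun <T> processRecursively(
    seed: T,
    workers: Int,
    expand: suspend (T) -> List<T>,
) = coroutineScope {
    // Unlimited capacity: with a bounded buffer, all workers could block on send at once
    val queue = Channel<T>(Channel.UNLIMITED)
    // Number of items that are queued or currently being processed
    val pending = AtomicInteger(1)
    queue.send(seed)

    repeat(workers) {
        launch {
            for (item in queue) {
                val children = expand(item)
                // Count the children before enqueueing them so the counter cannot reach zero early
                pending.addAndGet(children.size)
                children.forEach { queue.send(it) }
                // This item is done; whichever worker finishes the last one closes the queue
                if (pending.decrementAndGet() == 0) queue.close()
            }
        }
    }
}
```

With this shape there is no waiting period at the end: the workers' for loops finish as soon as the last item has been handled, and coroutineScope returns once every worker is done. For the Maven case, expand would list a page and return the directory pages to scan next.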