Big Chungus
02/27/2021, 10:14 AMHannes
02/27/2021, 10:41 AMwhile (true){
// Pseudocode sketch: take the next element off the channel.
val element = myChannel.receive()
// "is not highest priority" is a placeholder condition, not valid Kotlin.
if (element is not highest priority){
// Put it at the end of the queue again, then loop to receive the next element.
// NOTE(review): presumably myChannel is buffered; if not, sending from the
// receiving coroutine may suspend — confirm against the channel's capacity.
myChannel.send(element)
continue;
}
// Handling of a highest-priority element is not shown in this snippet.
}
Hannes
02/27/2021, 10:42 AMBig Chungus
02/27/2021, 10:43 AMBig Chungus
02/27/2021, 10:45 AMHannes
02/27/2021, 10:46 AMBig Chungus
02/27/2021, 10:48 AMHannes
02/27/2021, 10:50 AMBig Chungus
02/27/2021, 10:51 AMBig Chungus
02/27/2021, 3:30 PM
/**
 * Walks the repository listing starting at the root path (`""`) and emits an [MCArtifact]
 * for every listed page that contains a `maven-metadata.xml` entry whose details can be fetched.
 *
 * Three groups of coroutines cooperate through a buffered page channel:
 * - a seeder that sends the root listing,
 * - a tracker that checks the channel once a minute and closes it after it has been
 *   found empty on five consecutive checks,
 * - workers (two per available processor) that consume pages, emit artifacts, and send
 *   the listings of sub-directories back into the same channel.
 *
 * The flow completes once the channel has been closed and every worker has finished.
 *
 * NOTE(review): `pageChannel.isEmpty` says nothing about listings that are still in flight;
 * a listing that returns after the channel was closed would fail on `send`. Presumably
 * `supervisedLaunch` tolerates that failure — confirm.
 */
override val artifacts: Flow<MCArtifact> = channelFlow {
    // Number of consecutive one-minute checks that must find the channel empty before it is closed.
    val maxIdleTicks = 5
    val pageChannel = Channel<List<MavenRepositoryClient.RepoItem>>(Channel.BUFFERED)

    // Seeder: the root listing is the first page of work.
    supervisedLaunch {
        client.listRepositoryPath("")?.let { pageChannel.send(it) }
    }

    // Tracker
    supervisedLaunch {
        var ticks = 0
        do {
            delay(1.minutes)
            if (pageChannel.isEmpty) {
                ticks++
                logger.info("Page channel empty, ${maxIdleTicks - ticks} ticks remaining until close")
            } else {
                // Any pending page resets the countdown.
                ticks = 0
            }
        } while (ticks < maxIdleTicks)
        // Closing ends the workers' `for (page in pageChannel)` loops once the channel is drained.
        pageChannel.close()
    }

    // Workers
    List(Runtime.getRuntime().availableProcessors() * 2) {
        supervisedLaunch {
            for (page in pageChannel) {
                val mavenMetadata = page.find { it.value == "maven-metadata.xml" }
                if (mavenMetadata != null) {
                    // A page with metadata is treated as an artifact; its sub-directories are not scanned.
                    client.getArtifactDetails(mavenMetadata.path)?.let { details ->
                        send(MCArtifact(details.group, details.name, details.latestVersion))
                    }
                } else {
                    // No metadata here: list each sub-directory and feed its page back into the channel.
                    page
                        .filter(MavenRepositoryClient.RepoItem::isDirectory)
                        .forEach { directory ->
                            logger.info("Scanning MC page ${directory.path}")
                            supervisedLaunch {
                                client.listRepositoryPath(directory.path)?.let { item -> pageChannel.send(item) }
                            }
                        }
                }
            }
        }
    }.joinAll()
}
The idea, basically, is that you have n worker coroutines that read from a buffered channel and are also able to send back to it. Then you have one tracker coroutine that checks the channel every x minutes and eventually closes it once it deems it properly empty.
Big Chungus
02/27/2021, 3:30 PM