From c5c72a3cdb244371783494a68aa2af8707680258 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Thu, 16 Oct 2025 16:52:26 +0100 Subject: [PATCH 01/25] add benchmarks --- .../jmh/kotlin/benchmarks/ChannelBenchmark.kt | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt new file mode 100644 index 0000000000..a40e759f28 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -0,0 +1,104 @@ +package benchmarks + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +@Warmup(iterations = 7, time = 1) +@Measurement(iterations = 10, time = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +@Fork(1) +open class ChannelBenchmark { + // max coroutines launched per benchmark + // to allow for true parallelism + val cores = 4 + + // 4 KB, 40 KB, 400 KB, 4 MB, 40 MB, 400 MB + @Param("1000", "10000", "100000", "1000000", "10000000", "100000000") + var count: Int = 0 + + // 1. Preallocate. + // 2. Different values to avoid helping the cache. + val list = ArrayList(100000000).apply { + repeat(100000000) { add(it) } + } + + @Benchmark + fun sendUnlimited() = runBlocking { + runSend(count, Channel.UNLIMITED) + } + + @Benchmark + fun sendConflated() = runBlocking { + runSend(count, Channel.CONFLATED) + } + + @Benchmark + fun sendReceiveUnlimited() = runBlocking(Dispatchers.Default) { + runSendReceive(count, Channel.UNLIMITED) + } + + @Benchmark + fun sendReceiveConflated() = runBlocking(Dispatchers.Default) { + runSendReceive(count, Channel.CONFLATED) + } + + @Benchmark + fun sendReceiveRendezvous() = runBlocking(Dispatchers.Default) { + // NB: Rendezvous is partly benchmarking the scheduler, not the channel alone. + // So don't trust the Rendezvous results too much. + runSendReceive(count, Channel.RENDEZVOUS) + } + + @Benchmark + fun oneSenderManyReceivers() = runBlocking { + runSendReceive(count, Channel.UNLIMITED, 1, cores) + } + + @Benchmark + fun manySendersOneReceiver() = runBlocking { + runSendReceive(count, Channel.UNLIMITED, cores, 1) + } + + @Benchmark + fun manySendersManyReceivers() = runBlocking { + runSendReceive(count, Channel.UNLIMITED, cores / 2, cores / 2) + } + + private suspend fun send(count: Int, channel: Channel) = coroutineScope { + list.take(count).forEach { channel.send(it) } + } + + private suspend fun runSend(count: Int, capacity: Int) { + Channel(capacity).also { + send(count, it) + } + } + + // NB: not all parameter combinations make sense in general. + // E.g., for the rendezvous channel, senders should be equal to receivers. + // If they are non-equal, it's a special case of performance under contention. + private suspend inline fun runSendReceive(count: Int, capacity: Int, senders: Int = 1, receivers: Int = 1) = + withContext(Dispatchers.Default) { + require(senders > 0 && receivers > 0) + + val channel = Channel(capacity) + repeat(receivers) { + launch { + channel.consumeEach { } + } + } + + coroutineScope { + repeat(senders) { + launch { + send(count / senders, channel) + } + } + } + channel.close() + } +} From 37be0330ef2ef38f166582e777bc89ef8096c709 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Tue, 28 Oct 2025 17:53:36 +0000 Subject: [PATCH 02/25] avoid allocating (avoid list.take(count)) --- benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index a40e759f28..41aade3831 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -69,7 +69,9 @@ open class ChannelBenchmark { } private suspend fun send(count: Int, channel: Channel) = coroutineScope { - list.take(count).forEach { channel.send(it) } + for (i in 1..count) { + channel.send(list[i]) + } } private suspend fun runSend(count: Int, capacity: Int) { From 038399294c7b7bb948bfe3d67cb5006b77d935e8 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Tue, 28 Oct 2025 18:13:41 +0000 Subject: [PATCH 03/25] rename send to sendManyItems to avoid false hinting at Channel.send --- benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 41aade3831..52f6c78863 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -68,7 +68,7 @@ open class ChannelBenchmark { runSendReceive(count, Channel.UNLIMITED, cores / 2, cores / 2) } - private suspend fun send(count: Int, channel: Channel) = coroutineScope { + private suspend fun sendManyItems(count: Int, channel: Channel) = coroutineScope { for (i in 1..count) { channel.send(list[i]) } @@ -76,7 +76,7 @@ open class ChannelBenchmark { private suspend fun runSend(count: Int, capacity: Int) { Channel(capacity).also { - send(count, it) + sendManyItems(count, it) } } @@ -97,7 +97,7 @@ open class ChannelBenchmark { coroutineScope { repeat(senders) { launch { - send(count / senders, channel) + sendManyItems(count / senders, channel) } } } From 33f9e722c9d9b028cb90b7010c89f6dad118bfe0 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Wed, 29 Oct 2025 10:15:40 +0000 Subject: [PATCH 04/25] make senders+receivers sum up to cores == 4 --- .../src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 52f6c78863..a00b706cef 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -55,12 +55,12 @@ open class ChannelBenchmark { @Benchmark fun oneSenderManyReceivers() = runBlocking { - runSendReceive(count, Channel.UNLIMITED, 1, cores) + runSendReceive(count, Channel.UNLIMITED, 1, cores - 1) } @Benchmark fun manySendersOneReceiver() = runBlocking { - runSendReceive(count, Channel.UNLIMITED, cores, 1) + runSendReceive(count, Channel.UNLIMITED, cores - 1, 1) } @Benchmark @@ -83,10 +83,10 @@ open class ChannelBenchmark { // NB: not all parameter combinations make sense in general. // E.g., for the rendezvous channel, senders should be equal to receivers. // If they are non-equal, it's a special case of performance under contention. - private suspend inline fun runSendReceive(count: Int, capacity: Int, senders: Int = 1, receivers: Int = 1) = + private suspend inline fun runSendReceive(count: Int, capacity: Int, senders: Int = 1, receivers: Int = 1) { + require(senders > 0 && receivers > 0) + require(senders + receivers <= cores) withContext(Dispatchers.Default) { - require(senders > 0 && receivers > 0) - val channel = Channel(capacity) repeat(receivers) { launch { @@ -103,4 +103,5 @@ open class ChannelBenchmark { } channel.close() } + } } From bf9af79f5fa96c614d28ae204026e7c3c578b1ca Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Tue, 4 Nov 2025 11:51:24 +0000 Subject: [PATCH 05/25] add prefill --- .../jmh/kotlin/benchmarks/ChannelBenchmark.kt | 69 ++++++++++++++----- 1 file changed, 53 insertions(+), 16 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index a00b706cef..8a68b0ab0a 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -22,10 +22,34 @@ open class ChannelBenchmark { // 1. Preallocate. // 2. Different values to avoid helping the cache. - val list = ArrayList(100000000).apply { - repeat(100000000) { add(it) } + val maxCount = 100000000 + val list = ArrayList(maxCount).apply { + repeat(maxCount) { add(it) } } + @State(Scope.Benchmark) + open class UnlimitedChannelWrapper { + // 0, 4 MB, 40 MB, 400 MB + @Param("0", "1000000", "10000000", "100000000") + private var prefill = 0 + + lateinit var channel: Channel + + val maxCount = 100000000 + val list = ArrayList(maxCount).apply { + repeat(maxCount) { add(it) } + } + + @Setup(Level.Invocation) + fun createPrefilledChannel() { + channel = Channel(Channel.UNLIMITED) + repeat(prefill) { + channel.trySend(list[it]) + } + } + } + + @Benchmark fun sendUnlimited() = runBlocking { runSend(count, Channel.UNLIMITED) @@ -37,35 +61,35 @@ open class ChannelBenchmark { } @Benchmark - fun sendReceiveUnlimited() = runBlocking(Dispatchers.Default) { - runSendReceive(count, Channel.UNLIMITED) + fun sendReceiveUnlimited(wrapper: UnlimitedChannelWrapper) = runBlocking(Dispatchers.Default) { + runSendReceive(wrapper.channel, count) } @Benchmark fun sendReceiveConflated() = runBlocking(Dispatchers.Default) { - runSendReceive(count, Channel.CONFLATED) + runSendReceive(Channel(Channel.CONFLATED), count) } @Benchmark fun sendReceiveRendezvous() = runBlocking(Dispatchers.Default) { // NB: Rendezvous is partly benchmarking the scheduler, not the channel alone. // So don't trust the Rendezvous results too much. - runSendReceive(count, Channel.RENDEZVOUS) + runSendReceive(Channel(Channel.RENDEZVOUS), count) } @Benchmark - fun oneSenderManyReceivers() = runBlocking { - runSendReceive(count, Channel.UNLIMITED, 1, cores - 1) + fun oneSenderManyReceivers(wrapper: UnlimitedChannelWrapper) = runBlocking { + runSendReceive(wrapper.channel, count, 1, cores - 1) } @Benchmark - fun manySendersOneReceiver() = runBlocking { - runSendReceive(count, Channel.UNLIMITED, cores - 1, 1) + fun manySendersOneReceiver(wrapper: UnlimitedChannelWrapper) = runBlocking { + runSendReceive(wrapper.channel, count, cores - 1, 1) } @Benchmark - fun manySendersManyReceivers() = runBlocking { - runSendReceive(count, Channel.UNLIMITED, cores / 2, cores / 2) + fun manySendersManyReceivers(wrapper: UnlimitedChannelWrapper) = runBlocking { + runSendReceive(wrapper.channel, count, cores / 2, cores / 2) } private suspend fun sendManyItems(count: Int, channel: Channel) = coroutineScope { @@ -83,21 +107,34 @@ open class ChannelBenchmark { // NB: not all parameter combinations make sense in general. // E.g., for the rendezvous channel, senders should be equal to receivers. // If they are non-equal, it's a special case of performance under contention. - private suspend inline fun runSendReceive(count: Int, capacity: Int, senders: Int = 1, receivers: Int = 1) { + private suspend inline fun runSendReceive(channel: Channel, count: Int, senders: Int = 1, receivers: Int = 1) { require(senders > 0 && receivers > 0) + // Can be used with more than num cores but needs thinking it through, + // e.g., what would it measure? require(senders + receivers <= cores) + // if the channel is prefilled, do not consume the prefilled items + val consumeAll = channel.isEmpty + // send almost `count` items, up to `senders - 1` items will not be sent (negligible) + val countPerSender = count / senders + // for prefilled channel only: up to `receivers - 1` items of the sent items will not be received (negligible) + val countPerReceiverAtLeast = countPerSender * senders / receivers withContext(Dispatchers.Default) { - val channel = Channel(capacity) repeat(receivers) { launch { - channel.consumeEach { } + if (consumeAll) { + channel.consumeEach { } + } else { + repeat(countPerReceiverAtLeast) { + channel.receive() + } + } } } coroutineScope { repeat(senders) { launch { - sendManyItems(count / senders, channel) + sendManyItems(countPerSender, channel) } } } From c406fbd6d65e9bdd7854ec4f51825b7267664bd2 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Tue, 4 Nov 2025 11:52:26 +0000 Subject: [PATCH 06/25] remove Dispatchers.Default, it's it the runSendReceive already --- benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 8a68b0ab0a..96d72aeba5 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -61,7 +61,7 @@ open class ChannelBenchmark { } @Benchmark - fun sendReceiveUnlimited(wrapper: UnlimitedChannelWrapper) = runBlocking(Dispatchers.Default) { + fun sendReceiveUnlimited(wrapper: UnlimitedChannelWrapper) = runBlocking { runSendReceive(wrapper.channel, count) } From 84255616ae869006b56832072d8c6dab046bbd8f Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Tue, 4 Nov 2025 11:55:42 +0000 Subject: [PATCH 07/25] sendManyItems simplify and comment --- benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 96d72aeba5..b474fa0333 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -92,9 +92,10 @@ open class ChannelBenchmark { runSendReceive(wrapper.channel, count, cores / 2, cores / 2) } - private suspend fun sendManyItems(count: Int, channel: Channel) = coroutineScope { - for (i in 1..count) { - channel.send(list[i]) + private suspend fun sendManyItems(count: Int, channel: Channel) { + repeat(count) { + // NB: it is `send`, not `trySend`, on purpose, since we are testing the `send` performance here. + channel.send(list[it]) } } From be3f273ee2712770eda6174bd7f5821b4cdb6595 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Tue, 4 Nov 2025 12:01:17 +0000 Subject: [PATCH 08/25] replace consume with receive --- .../src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index b474fa0333..d40425c97c 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -105,6 +105,12 @@ open class ChannelBenchmark { } } + suspend fun Channel.forEach(action: (E) -> Unit) { + for (element in this) { + action(element) + } + } + // NB: not all parameter combinations make sense in general. // E.g., for the rendezvous channel, senders should be equal to receivers. // If they are non-equal, it's a special case of performance under contention. @@ -113,8 +119,8 @@ open class ChannelBenchmark { // Can be used with more than num cores but needs thinking it through, // e.g., what would it measure? require(senders + receivers <= cores) - // if the channel is prefilled, do not consume the prefilled items - val consumeAll = channel.isEmpty + // if the channel is prefilled, do not receive the prefilled items + val receiveAll = channel.isEmpty // send almost `count` items, up to `senders - 1` items will not be sent (negligible) val countPerSender = count / senders // for prefilled channel only: up to `receivers - 1` items of the sent items will not be received (negligible) @@ -122,8 +128,8 @@ open class ChannelBenchmark { withContext(Dispatchers.Default) { repeat(receivers) { launch { - if (consumeAll) { - channel.consumeEach { } + if (receiveAll) { + channel.forEach { } } else { repeat(countPerReceiverAtLeast) { channel.receive() From b5c7bf7bdf4b9225237528b15aa8e3efd2b6a973 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Tue, 4 Nov 2025 13:42:21 +0000 Subject: [PATCH 09/25] change units from ms to ns --- benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index d40425c97c..a0cccdd47e 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -8,7 +8,7 @@ import java.util.concurrent.* @Warmup(iterations = 7, time = 1) @Measurement(iterations = 10, time = 1) @BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.MILLISECONDS) +@OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Benchmark) @Fork(1) open class ChannelBenchmark { From f82a49333222df716467224561bff8e159694ad6 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Thu, 6 Nov 2025 11:21:15 +0000 Subject: [PATCH 10/25] fix --- .../jmh/kotlin/benchmarks/ChannelBenchmark.kt | 46 ++++++++----------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index a0cccdd47e..6415745f3e 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -14,7 +14,7 @@ import java.util.concurrent.* open class ChannelBenchmark { // max coroutines launched per benchmark // to allow for true parallelism - val cores = 4 + val cores = Runtime.getRuntime().availableProcessors() // 4 KB, 40 KB, 400 KB, 4 MB, 40 MB, 400 MB @Param("1000", "10000", "100000", "1000000", "10000000", "100000000") @@ -22,10 +22,7 @@ open class ChannelBenchmark { // 1. Preallocate. // 2. Different values to avoid helping the cache. - val maxCount = 100000000 - val list = ArrayList(maxCount).apply { - repeat(maxCount) { add(it) } - } + val list = List(100000000) { it } @State(Scope.Benchmark) open class UnlimitedChannelWrapper { @@ -35,10 +32,7 @@ open class ChannelBenchmark { lateinit var channel: Channel - val maxCount = 100000000 - val list = ArrayList(maxCount).apply { - repeat(maxCount) { add(it) } - } + val list = List(100000000) { it } @Setup(Level.Invocation) fun createPrefilledChannel() { @@ -92,19 +86,13 @@ open class ChannelBenchmark { runSendReceive(wrapper.channel, count, cores / 2, cores / 2) } - private suspend fun sendManyItems(count: Int, channel: Channel) { + private suspend fun runSend(count: Int, capacity: Int) { + val channel = Channel(capacity) repeat(count) { - // NB: it is `send`, not `trySend`, on purpose, since we are testing the `send` performance here. channel.send(list[it]) } } - private suspend fun runSend(count: Int, capacity: Int) { - Channel(capacity).also { - sendManyItems(count, it) - } - } - suspend fun Channel.forEach(action: (E) -> Unit) { for (element in this) { action(element) @@ -115,25 +103,27 @@ open class ChannelBenchmark { // E.g., for the rendezvous channel, senders should be equal to receivers. // If they are non-equal, it's a special case of performance under contention. private suspend inline fun runSendReceive(channel: Channel, count: Int, senders: Int = 1, receivers: Int = 1) { - require(senders > 0 && receivers > 0) - // Can be used with more than num cores but needs thinking it through, - // e.g., what would it measure? - require(senders + receivers <= cores) - // if the channel is prefilled, do not receive the prefilled items + //require (senders > 0 && receivers > 0) + //require (senders + receivers <= cores) // Can be used with more than num cores, but what would it measure? + // if the channel is prefilled, only receive the items that were sent by this function val receiveAll = channel.isEmpty // send almost `count` items, up to `senders - 1` items will not be sent (negligible) val countPerSender = count / senders - // for prefilled channel only: up to `receivers - 1` items of the sent items will not be received (negligible) + // for prefilled channel only: up to `receivers - 1` items of the sent items will not be received + // (on top of the prefilled items which we do not aim to receive at all) (negligible) val countPerReceiverAtLeast = countPerSender * senders / receivers withContext(Dispatchers.Default) { repeat(receivers) { launch { if (receiveAll) { - channel.forEach { } + channel.forEach { + // possibly receive into the blackhole + } } else { repeat(countPerReceiverAtLeast) { - channel.receive() - } + // possibly receive into the blackhole + channel.receive() + } } } } @@ -141,7 +131,9 @@ open class ChannelBenchmark { coroutineScope { repeat(senders) { launch { - sendManyItems(countPerSender, channel) + repeat(countPerSender) { + channel.send(list[it]) + } } } } From 939c1d58cce6a62c3798f7262d012540fd02a108 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Thu, 6 Nov 2025 11:23:45 +0000 Subject: [PATCH 11/25] attempt to align comment on top of @Param --- benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 6415745f3e..244e7680ab 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -17,7 +17,7 @@ open class ChannelBenchmark { val cores = Runtime.getRuntime().availableProcessors() // 4 KB, 40 KB, 400 KB, 4 MB, 40 MB, 400 MB - @Param("1000", "10000", "100000", "1000000", "10000000", "100000000") + @Param(value = ["1000", "10000", "100000", "1000000", "10000000", "100000000"]) var count: Int = 0 // 1. Preallocate. From 4647c520026c78a05c56be757c855c7fff8e6dcc Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Thu, 6 Nov 2025 12:33:01 +0000 Subject: [PATCH 12/25] align another entry --- benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 244e7680ab..16e3b46164 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -27,7 +27,7 @@ open class ChannelBenchmark { @State(Scope.Benchmark) open class UnlimitedChannelWrapper { // 0, 4 MB, 40 MB, 400 MB - @Param("0", "1000000", "10000000", "100000000") + @Param(value = ["0", "1000000", "10000000", "100000000"]) private var prefill = 0 lateinit var channel: Channel From 96bd0aef729217cbbf9de2b2f49e6e3948603e18 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Thu, 6 Nov 2025 16:46:25 +0000 Subject: [PATCH 13/25] fix wording on prefilled and received counts --- .../src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 16e3b46164..26947ce2f4 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -105,13 +105,15 @@ open class ChannelBenchmark { private suspend inline fun runSendReceive(channel: Channel, count: Int, senders: Int = 1, receivers: Int = 1) { //require (senders > 0 && receivers > 0) //require (senders + receivers <= cores) // Can be used with more than num cores, but what would it measure? - // if the channel is prefilled, only receive the items that were sent by this function + // If the channel is prefilled with N items, it should have (at least) N items by the end of the benchmark. + // We roughly send as many items as we receive, within this function. val receiveAll = channel.isEmpty // send almost `count` items, up to `senders - 1` items will not be sent (negligible) val countPerSender = count / senders - // for prefilled channel only: up to `receivers - 1` items of the sent items will not be received - // (on top of the prefilled items which we do not aim to receive at all) (negligible) - val countPerReceiverAtLeast = countPerSender * senders / receivers + val countSent = countPerSender * senders + // for prefilled channel only: up to `receivers - 1` number of items will not be received + // (on top of the number of prefilled items, which we do not aim to receive at all) (negligible) + val countPerReceiverAtLeast = countSent / receivers withContext(Dispatchers.Default) { repeat(receivers) { launch { From 6c7a07ee46cab43e33e15ab3dc38e2fa012e1d1a Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Wed, 3 Dec 2025 18:09:08 +0000 Subject: [PATCH 14/25] add nano benches (many invocations) --- .../ChannelNanoBenchmarkConflated.kt | 26 +++++++++++++++ .../ChannelNanoBenchmarkUnlimited.kt | 33 +++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt create mode 100644 benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt new file mode 100644 index 0000000000..329026b68d --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt @@ -0,0 +1,26 @@ +package benchmarks + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 5, time = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +@Fork(1) +open class ChannelNanoBenchmarkConflated { + var channel: Channel = Channel(Channel.CONFLATED) + + @Benchmark + fun send() = runBlocking { + channel.send(42) + } + + @Benchmark + fun trySend() { + channel.trySend(42) + } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt new file mode 100644 index 0000000000..34f5dea869 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt @@ -0,0 +1,33 @@ +package benchmarks + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 5, time = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +@Fork(1) +open class ChannelNanoBenchmarkUnlimited { + @Param(value = ["0", "100000", "1000000", "10000000", "100000000"]) // 0, 400 KB, 4, 40, 400 MB + private var prefill = 0 + + lateinit var channel: Channel + + @Setup(Level.Trial) + fun prefill() { + channel = Channel(Channel.UNLIMITED) + repeat(prefill) { + channel.trySend(it) + } + } + + @Benchmark + fun sendReceive() = runBlocking { + channel.send(42) + return@runBlocking channel.receive() + } +} From bcbc6ef52c9ccc37cdaf66a82ac5733df38d41a6 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Wed, 3 Dec 2025 18:38:43 +0000 Subject: [PATCH 15/25] fix wording --- .../src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt index 34f5dea869..64a183c850 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt @@ -18,7 +18,7 @@ open class ChannelNanoBenchmarkUnlimited { lateinit var channel: Channel @Setup(Level.Trial) - fun prefill() { + fun createPrefilledChannel() { channel = Channel(Channel.UNLIMITED) repeat(prefill) { channel.trySend(it) From ab03044c10a49fae035e2c1c6de16d9a1fadcebe Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Wed, 3 Dec 2025 18:42:47 +0000 Subject: [PATCH 16/25] cleanup --- benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 26947ce2f4..3a70c5852f 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -16,8 +16,7 @@ open class ChannelBenchmark { // to allow for true parallelism val cores = Runtime.getRuntime().availableProcessors() - // 4 KB, 40 KB, 400 KB, 4 MB, 40 MB, 400 MB - @Param(value = ["1000", "10000", "100000", "1000000", "10000000", "100000000"]) + @Param(value = ["1000", "10000", "100000", "1000000", "10000000", "100000000"]) // 4, 40, 400, KB, 4, 40, 400 MB var count: Int = 0 // 1. Preallocate. @@ -26,8 +25,7 @@ open class ChannelBenchmark { @State(Scope.Benchmark) open class UnlimitedChannelWrapper { - // 0, 4 MB, 40 MB, 400 MB - @Param(value = ["0", "1000000", "10000000", "100000000"]) + @Param(value = ["0", "100000", "1000000", "10000000", "100000000"]) // 0, 400 KB, 4, 40, 400 MB private var prefill = 0 lateinit var channel: Channel @@ -49,6 +47,8 @@ open class ChannelBenchmark { runSend(count, Channel.UNLIMITED) } + // Similar to ChannelNanoBenchmarkConflated + // but ~4x faster due to hotpath (and it's ok) @Benchmark fun sendConflated() = runBlocking { runSend(count, Channel.CONFLATED) From 965d1abdd85ffd5f8d0f022acbcf51091a83a3d5 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Wed, 3 Dec 2025 18:46:08 +0000 Subject: [PATCH 17/25] add blackhole --- .../jmh/kotlin/benchmarks/ChannelBenchmark.kt | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 3a70c5852f..64d3cf1981 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -3,6 +3,7 @@ package benchmarks import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* +import org.openjdk.jmh.infra.Blackhole import java.util.concurrent.* @Warmup(iterations = 7, time = 1) @@ -55,35 +56,35 @@ open class ChannelBenchmark { } @Benchmark - fun sendReceiveUnlimited(wrapper: UnlimitedChannelWrapper) = runBlocking { - runSendReceive(wrapper.channel, count) + fun sendReceiveUnlimited(bh: Blackhole, wrapper: UnlimitedChannelWrapper) = runBlocking { + runSendReceive(bh, wrapper.channel, count) } @Benchmark - fun sendReceiveConflated() = runBlocking(Dispatchers.Default) { - runSendReceive(Channel(Channel.CONFLATED), count) + fun sendReceiveConflated(bh: Blackhole) = runBlocking(Dispatchers.Default) { + runSendReceive(bh, Channel(Channel.CONFLATED), count) } @Benchmark - fun sendReceiveRendezvous() = runBlocking(Dispatchers.Default) { + fun sendReceiveRendezvous(bh: Blackhole) = runBlocking(Dispatchers.Default) { // NB: Rendezvous is partly benchmarking the scheduler, not the channel alone. // So don't trust the Rendezvous results too much. - runSendReceive(Channel(Channel.RENDEZVOUS), count) + runSendReceive(bh, Channel(Channel.RENDEZVOUS), count) } @Benchmark - fun oneSenderManyReceivers(wrapper: UnlimitedChannelWrapper) = runBlocking { - runSendReceive(wrapper.channel, count, 1, cores - 1) + fun oneSenderManyReceivers(bh: Blackhole, wrapper: UnlimitedChannelWrapper) = runBlocking { + runSendReceive(bh, wrapper.channel, count, 1, cores - 1) } @Benchmark - fun manySendersOneReceiver(wrapper: UnlimitedChannelWrapper) = runBlocking { - runSendReceive(wrapper.channel, count, cores - 1, 1) + fun manySendersOneReceiver(bh: Blackhole, wrapper: UnlimitedChannelWrapper) = runBlocking { + runSendReceive(bh, wrapper.channel, count, cores - 1, 1) } @Benchmark - fun manySendersManyReceivers(wrapper: UnlimitedChannelWrapper) = runBlocking { - runSendReceive(wrapper.channel, count, cores / 2, cores / 2) + fun manySendersManyReceivers(bh: Blackhole, wrapper: UnlimitedChannelWrapper) = runBlocking { + runSendReceive(bh, wrapper.channel, count, cores / 2, cores / 2) } private suspend fun runSend(count: Int, capacity: Int) { @@ -102,7 +103,7 @@ open class ChannelBenchmark { // NB: not all parameter combinations make sense in general. // E.g., for the rendezvous channel, senders should be equal to receivers. // If they are non-equal, it's a special case of performance under contention. - private suspend inline fun runSendReceive(channel: Channel, count: Int, senders: Int = 1, receivers: Int = 1) { + private suspend inline fun runSendReceive(bh: Blackhole, channel: Channel, count: Int, senders: Int = 1, receivers: Int = 1) { //require (senders > 0 && receivers > 0) //require (senders + receivers <= cores) // Can be used with more than num cores, but what would it measure? // If the channel is prefilled with N items, it should have (at least) N items by the end of the benchmark. @@ -119,12 +120,11 @@ open class ChannelBenchmark { launch { if (receiveAll) { channel.forEach { - // possibly receive into the blackhole + bh.consume(it) } } else { repeat(countPerReceiverAtLeast) { - // possibly receive into the blackhole - channel.receive() + bh.consume(channel.receive()) } } } From dadde28cab13b5df9a095613c7587ad794d0ba29 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Thu, 4 Dec 2025 14:10:43 +0000 Subject: [PATCH 18/25] add warnings to nano benchmarks --- benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt | 2 +- .../kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt | 2 +- .../kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt | 8 +++++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 64d3cf1981..269306e7af 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -49,7 +49,7 @@ open class ChannelBenchmark { } // Similar to ChannelNanoBenchmarkConflated - // but ~4x faster due to hotpath (and it's ok) + // but ~4x faster due to lesser runBlocking switching and possibly hotpath (and it's ok) @Benchmark fun sendConflated() = runBlocking { runSend(count, Channel.CONFLATED) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt index 329026b68d..22e394ddd6 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt @@ -15,7 +15,7 @@ open class ChannelNanoBenchmarkConflated { var channel: Channel = Channel(Channel.CONFLATED) @Benchmark - fun send() = runBlocking { + fun sendRUNBLOCKINGOVERHEAD() = runBlocking { channel.send(42) } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt index 64a183c850..96765929d5 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt @@ -26,8 +26,14 @@ open class ChannelNanoBenchmarkUnlimited { } @Benchmark - fun sendReceive() = runBlocking { + fun sendReceiveRUNBLOCKINGOVERHEAD(): Int = runBlocking { channel.send(42) return@runBlocking channel.receive() } + + @Benchmark + fun trySendTryReceive(): Int { + channel.trySend(42) + return channel.tryReceive().getOrThrow() + } } From d5884dd70c05df6daae4a7ed577f079d0c546ccc Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Fri, 5 Dec 2025 16:21:44 +0000 Subject: [PATCH 19/25] housekeeping --- .../jmh/kotlin/benchmarks/ChannelBenchmark.kt | 26 +++++-- .../ChannelNanoBenchmarkConflated.kt | 14 +++- .../ChannelNanoBenchmarkUnlimited.kt | 78 +++++++++++++++---- 3 files changed, 96 insertions(+), 22 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index 269306e7af..fceb4e6e39 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -6,6 +6,22 @@ import org.openjdk.jmh.annotations.* import org.openjdk.jmh.infra.Blackhole import java.util.concurrent.* +/** + * This is NOT a comprehensive channel benchmark suit. + * It's a collection of rather arbitrary low-effort benchmarks. + * + * This collection does NOT measure: + * - isolated `send` or `receive` performance + * - typical channel usage patterns (they are unknown and need to be researched) + * + * These benchmarks are affected by: + * - Initializing and maintaining structured concurrency overhead (`runBlocking`) + * - Thread scheduling + * - Internal channel structure, such as allocations + * - GC + * - Not necessarily hot-path performance + * - Not necessarily typical usage patterns + */ @Warmup(iterations = 7, time = 1) @Measurement(iterations = 10, time = 1) @BenchmarkMode(Mode.AverageTime) @@ -17,21 +33,21 @@ open class ChannelBenchmark { // to allow for true parallelism val cores = Runtime.getRuntime().availableProcessors() - @Param(value = ["1000", "10000", "100000", "1000000", "10000000", "100000000"]) // 4, 40, 400, KB, 4, 40, 400 MB + @Param(value = ["1000", "10000", "100000", "1000000", "10000000"]) // 4, 40, 400 KB, 4, 40 MB var count: Int = 0 // 1. Preallocate. // 2. Different values to avoid helping the cache. - val list = List(100000000) { it } + val list = List(100_000_000) { it } @State(Scope.Benchmark) open class UnlimitedChannelWrapper { - @Param(value = ["0", "100000", "1000000", "10000000", "100000000"]) // 0, 400 KB, 4, 40, 400 MB + @Param(value = ["0", "100000", "1000000", "10000000"]) // 0, 400 KB, 4, 40 MB private var prefill = 0 lateinit var channel: Channel - val list = List(100000000) { it } + val list = List(100_000_000) { it } @Setup(Level.Invocation) fun createPrefilledChannel() { @@ -48,8 +64,6 @@ open class ChannelBenchmark { runSend(count, Channel.UNLIMITED) } - // Similar to ChannelNanoBenchmarkConflated - // but ~4x faster due to lesser runBlocking switching and possibly hotpath (and it's ok) @Benchmark fun sendConflated() = runBlocking { runSend(count, Channel.CONFLATED) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt index 22e394ddd6..4d82482e7a 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt @@ -15,7 +15,7 @@ open class ChannelNanoBenchmarkConflated { var channel: Channel = Channel(Channel.CONFLATED) @Benchmark - fun sendRUNBLOCKINGOVERHEAD() = runBlocking { + fun send() = runBlocking { channel.send(42) } @@ -23,4 +23,16 @@ open class ChannelNanoBenchmarkConflated { fun trySend() { channel.trySend(42) } + + @Benchmark + fun sendReceive(): Int = runBlocking { + channel.send(42) + return@runBlocking channel.receive() + } + + @Benchmark + fun trySendTryReceive(): Int { + channel.trySend(42) + return channel.tryReceive().getOrThrow() + } } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt index 96765929d5..16132e3ebf 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt @@ -9,31 +9,79 @@ import java.util.concurrent.* @Measurement(iterations = 5, time = 1) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Benchmark) @Fork(1) open class ChannelNanoBenchmarkUnlimited { - @Param(value = ["0", "100000", "1000000", "10000000", "100000000"]) // 0, 400 KB, 4, 40, 400 MB - private var prefill = 0 + @State(Scope.Benchmark) + open class PrefilledChannelState { + private val list = List(10_000_000) { it } - lateinit var channel: Channel + @Param(value = ["0", "100000", "1000000", "10000000"]) // 0, 400 KB, 4, 40 MB + private var prefill = 0 - @Setup(Level.Trial) - fun createPrefilledChannel() { - channel = Channel(Channel.UNLIMITED) - repeat(prefill) { - channel.trySend(it) + lateinit var channel: Channel + + @Setup(Level.Trial) + fun createPrefilledChannel() { + channel = Channel(Channel.UNLIMITED) + repeat(prefill) { + channel.trySend(list[it]) + } + } + } + + @Benchmark + fun sendReceive(s: PrefilledChannelState): Int = runBlocking { + s.channel.send(42) + return@runBlocking s.channel.receive() + } + + @Benchmark + fun trySendTryReceive(s: PrefilledChannelState): Int { + s.channel.trySend(42) + return s.channel.tryReceive().getOrThrow() + } + + @State(Scope.Benchmark) + open class EmptyChannelState { + lateinit var channel: Channel + + @Setup(Level.Iteration) + fun createEmptyChannel() { + channel = Channel(Channel.UNLIMITED) + } + } + + @Benchmark + fun send(s: EmptyChannelState) = runBlocking { + s.channel.send(42) + } + + @Benchmark + fun trySend(s: EmptyChannelState) { + s.channel.trySend(42) + } + + @State(Scope.Benchmark) + open class BigChannelState { + private val list = List(100_000_000) { it } + lateinit var channel: Channel + + @Setup(Level.Iteration) + fun createPrefilledChannel() { + channel = Channel(Channel.UNLIMITED) + for (it in list) { + channel.trySend(it) + } } } @Benchmark - fun sendReceiveRUNBLOCKINGOVERHEAD(): Int = runBlocking { - channel.send(42) - return@runBlocking channel.receive() + fun receive(s: BigChannelState): Int = runBlocking { + return@runBlocking s.channel.receive() } @Benchmark - fun trySendTryReceive(): Int { - channel.trySend(42) - return channel.tryReceive().getOrThrow() + fun tryReceive(s: BigChannelState): Int { + return s.channel.tryReceive().getOrThrow() } } From d462eac0f4de86d9d29fa8c8a5314719f245ee50 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Fri, 5 Dec 2025 16:24:37 +0000 Subject: [PATCH 20/25] rename wrapper into state --- .../jmh/kotlin/benchmarks/ChannelBenchmark.kt | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt index fceb4e6e39..e18003c121 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt @@ -41,7 +41,7 @@ open class ChannelBenchmark { val list = List(100_000_000) { it } @State(Scope.Benchmark) - open class UnlimitedChannelWrapper { + open class PrefilledChannelState { @Param(value = ["0", "100000", "1000000", "10000000"]) // 0, 400 KB, 4, 40 MB private var prefill = 0 @@ -70,8 +70,8 @@ open class ChannelBenchmark { } @Benchmark - fun sendReceiveUnlimited(bh: Blackhole, wrapper: UnlimitedChannelWrapper) = runBlocking { - runSendReceive(bh, wrapper.channel, count) + fun sendReceiveUnlimited(bh: Blackhole, s: PrefilledChannelState) = runBlocking { + runSendReceive(bh, s.channel, count) } @Benchmark @@ -87,18 +87,18 @@ open class ChannelBenchmark { } @Benchmark - fun oneSenderManyReceivers(bh: Blackhole, wrapper: UnlimitedChannelWrapper) = runBlocking { - runSendReceive(bh, wrapper.channel, count, 1, cores - 1) + fun oneSenderManyReceivers(bh: Blackhole, s: PrefilledChannelState) = runBlocking { + runSendReceive(bh, s.channel, count, 1, cores - 1) } @Benchmark - fun manySendersOneReceiver(bh: Blackhole, wrapper: UnlimitedChannelWrapper) = runBlocking { - runSendReceive(bh, wrapper.channel, count, cores - 1, 1) + fun manySendersOneReceiver(bh: Blackhole, s: PrefilledChannelState) = runBlocking { + runSendReceive(bh, s.channel, count, cores - 1, 1) } @Benchmark - fun manySendersManyReceivers(bh: Blackhole, wrapper: UnlimitedChannelWrapper) = runBlocking { - runSendReceive(bh, wrapper.channel, count, cores / 2, cores / 2) + fun manySendersManyReceivers(bh: Blackhole, s: PrefilledChannelState) = runBlocking { + runSendReceive(bh, s.channel, count, cores / 2, cores / 2) } private suspend fun runSend(count: Int, capacity: Int) { From 52bdf6d5edde6be69ce2fc29c19029c8e5a3a867 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Fri, 5 Dec 2025 16:24:47 +0000 Subject: [PATCH 21/25] add warning to kdocs --- .../src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt | 3 +++ .../src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt | 3 +++ 2 files changed, 6 insertions(+) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt index 4d82482e7a..42451edbc9 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt @@ -5,6 +5,9 @@ import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* import java.util.concurrent.* +/** + * Benchmarks with `runBlocking` are significantly skewed by `runBlocking` overhead. + */ @Warmup(iterations = 5, time = 1) @Measurement(iterations = 5, time = 1) @BenchmarkMode(Mode.AverageTime) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt index 16132e3ebf..74547099cf 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt @@ -5,6 +5,9 @@ import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* import java.util.concurrent.* +/** + * Benchmarks with `runBlocking` are significantly skewed by `runBlocking` overhead. + */ @Warmup(iterations = 5, time = 1) @Measurement(iterations = 5, time = 1) @BenchmarkMode(Mode.AverageTime) From 218c945e0d8658452262fd02a3bcada86825e455 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Thu, 11 Dec 2025 14:40:00 +0000 Subject: [PATCH 22/25] adjust warmup and measurement iterations, prefill values --- .../kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt | 4 ++-- .../kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt index 42451edbc9..be43e8e6cc 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt @@ -8,8 +8,8 @@ import java.util.concurrent.* /** * Benchmarks with `runBlocking` are significantly skewed by `runBlocking` overhead. */ -@Warmup(iterations = 5, time = 1) -@Measurement(iterations = 5, time = 1) +@Warmup(iterations = 30, time = 1) +@Measurement(iterations = 30, time = 1) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Benchmark) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt index 74547099cf..b07eeed46c 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt @@ -8,17 +8,17 @@ import java.util.concurrent.* /** * Benchmarks with `runBlocking` are significantly skewed by `runBlocking` overhead. */ -@Warmup(iterations = 5, time = 1) -@Measurement(iterations = 5, time = 1) +@Warmup(iterations = 30, time = 1) +@Measurement(iterations = 30, time = 1) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) @Fork(1) open class ChannelNanoBenchmarkUnlimited { @State(Scope.Benchmark) open class PrefilledChannelState { - private val list = List(10_000_000) { it } + private val list = List(1_000_000) { it } - @Param(value = ["0", "100000", "1000000", "10000000"]) // 0, 400 KB, 4, 40 MB + @Param(value = ["0", "10000", "100000", "1000000"]) // 0, 40, 400 KB, 4 MB private var prefill = 0 lateinit var channel: Channel From c4d6be5b74fa08af6834c500d77d8908a72aa305 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Thu, 11 Dec 2025 15:21:49 +0000 Subject: [PATCH 23/25] rm ChannelBenchmark.kt --- .../jmh/kotlin/benchmarks/ChannelBenchmark.kt | 159 ------------------ 1 file changed, 159 deletions(-) delete mode 100644 benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt deleted file mode 100644 index e18003c121..0000000000 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelBenchmark.kt +++ /dev/null @@ -1,159 +0,0 @@ -package benchmarks - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import org.openjdk.jmh.annotations.* -import org.openjdk.jmh.infra.Blackhole -import java.util.concurrent.* - -/** - * This is NOT a comprehensive channel benchmark suit. - * It's a collection of rather arbitrary low-effort benchmarks. - * - * This collection does NOT measure: - * - isolated `send` or `receive` performance - * - typical channel usage patterns (they are unknown and need to be researched) - * - * These benchmarks are affected by: - * - Initializing and maintaining structured concurrency overhead (`runBlocking`) - * - Thread scheduling - * - Internal channel structure, such as allocations - * - GC - * - Not necessarily hot-path performance - * - Not necessarily typical usage patterns - */ -@Warmup(iterations = 7, time = 1) -@Measurement(iterations = 10, time = 1) -@BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Benchmark) -@Fork(1) -open class ChannelBenchmark { - // max coroutines launched per benchmark - // to allow for true parallelism - val cores = Runtime.getRuntime().availableProcessors() - - @Param(value = ["1000", "10000", "100000", "1000000", "10000000"]) // 4, 40, 400 KB, 4, 40 MB - var count: Int = 0 - - // 1. Preallocate. - // 2. Different values to avoid helping the cache. - val list = List(100_000_000) { it } - - @State(Scope.Benchmark) - open class PrefilledChannelState { - @Param(value = ["0", "100000", "1000000", "10000000"]) // 0, 400 KB, 4, 40 MB - private var prefill = 0 - - lateinit var channel: Channel - - val list = List(100_000_000) { it } - - @Setup(Level.Invocation) - fun createPrefilledChannel() { - channel = Channel(Channel.UNLIMITED) - repeat(prefill) { - channel.trySend(list[it]) - } - } - } - - - @Benchmark - fun sendUnlimited() = runBlocking { - runSend(count, Channel.UNLIMITED) - } - - @Benchmark - fun sendConflated() = runBlocking { - runSend(count, Channel.CONFLATED) - } - - @Benchmark - fun sendReceiveUnlimited(bh: Blackhole, s: PrefilledChannelState) = runBlocking { - runSendReceive(bh, s.channel, count) - } - - @Benchmark - fun sendReceiveConflated(bh: Blackhole) = runBlocking(Dispatchers.Default) { - runSendReceive(bh, Channel(Channel.CONFLATED), count) - } - - @Benchmark - fun sendReceiveRendezvous(bh: Blackhole) = runBlocking(Dispatchers.Default) { - // NB: Rendezvous is partly benchmarking the scheduler, not the channel alone. - // So don't trust the Rendezvous results too much. - runSendReceive(bh, Channel(Channel.RENDEZVOUS), count) - } - - @Benchmark - fun oneSenderManyReceivers(bh: Blackhole, s: PrefilledChannelState) = runBlocking { - runSendReceive(bh, s.channel, count, 1, cores - 1) - } - - @Benchmark - fun manySendersOneReceiver(bh: Blackhole, s: PrefilledChannelState) = runBlocking { - runSendReceive(bh, s.channel, count, cores - 1, 1) - } - - @Benchmark - fun manySendersManyReceivers(bh: Blackhole, s: PrefilledChannelState) = runBlocking { - runSendReceive(bh, s.channel, count, cores / 2, cores / 2) - } - - private suspend fun runSend(count: Int, capacity: Int) { - val channel = Channel(capacity) - repeat(count) { - channel.send(list[it]) - } - } - - suspend fun Channel.forEach(action: (E) -> Unit) { - for (element in this) { - action(element) - } - } - - // NB: not all parameter combinations make sense in general. - // E.g., for the rendezvous channel, senders should be equal to receivers. - // If they are non-equal, it's a special case of performance under contention. - private suspend inline fun runSendReceive(bh: Blackhole, channel: Channel, count: Int, senders: Int = 1, receivers: Int = 1) { - //require (senders > 0 && receivers > 0) - //require (senders + receivers <= cores) // Can be used with more than num cores, but what would it measure? - // If the channel is prefilled with N items, it should have (at least) N items by the end of the benchmark. - // We roughly send as many items as we receive, within this function. - val receiveAll = channel.isEmpty - // send almost `count` items, up to `senders - 1` items will not be sent (negligible) - val countPerSender = count / senders - val countSent = countPerSender * senders - // for prefilled channel only: up to `receivers - 1` number of items will not be received - // (on top of the number of prefilled items, which we do not aim to receive at all) (negligible) - val countPerReceiverAtLeast = countSent / receivers - withContext(Dispatchers.Default) { - repeat(receivers) { - launch { - if (receiveAll) { - channel.forEach { - bh.consume(it) - } - } else { - repeat(countPerReceiverAtLeast) { - bh.consume(channel.receive()) - } - } - } - } - - coroutineScope { - repeat(senders) { - launch { - repeat(countPerSender) { - channel.send(list[it]) - } - } - } - } - channel.close() - } - } -} From ddf86bc459a892b555b914ed9c2e2701f594ae0f Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Thu, 11 Dec 2025 15:25:53 +0000 Subject: [PATCH 24/25] rm runBlocking and GC-noisy benches from ChannelNanoBenchmark* --- .../ChannelNanoBenchmarkConflated.kt | 15 ------ .../ChannelNanoBenchmarkUnlimited.kt | 54 ------------------- 2 files changed, 69 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt index be43e8e6cc..ab2e25801f 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkConflated.kt @@ -1,13 +1,9 @@ package benchmarks -import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* import java.util.concurrent.* -/** - * Benchmarks with `runBlocking` are significantly skewed by `runBlocking` overhead. - */ @Warmup(iterations = 30, time = 1) @Measurement(iterations = 30, time = 1) @BenchmarkMode(Mode.AverageTime) @@ -17,22 +13,11 @@ import java.util.concurrent.* open class ChannelNanoBenchmarkConflated { var channel: Channel = Channel(Channel.CONFLATED) - @Benchmark - fun send() = runBlocking { - channel.send(42) - } - @Benchmark fun trySend() { channel.trySend(42) } - @Benchmark - fun sendReceive(): Int = runBlocking { - channel.send(42) - return@runBlocking channel.receive() - } - @Benchmark fun trySendTryReceive(): Int { channel.trySend(42) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt index b07eeed46c..eb771b623c 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelNanoBenchmarkUnlimited.kt @@ -1,13 +1,9 @@ package benchmarks -import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* import java.util.concurrent.* -/** - * Benchmarks with `runBlocking` are significantly skewed by `runBlocking` overhead. - */ @Warmup(iterations = 30, time = 1) @Measurement(iterations = 30, time = 1) @BenchmarkMode(Mode.AverageTime) @@ -32,59 +28,9 @@ open class ChannelNanoBenchmarkUnlimited { } } - @Benchmark - fun sendReceive(s: PrefilledChannelState): Int = runBlocking { - s.channel.send(42) - return@runBlocking s.channel.receive() - } - @Benchmark fun trySendTryReceive(s: PrefilledChannelState): Int { s.channel.trySend(42) return s.channel.tryReceive().getOrThrow() } - - @State(Scope.Benchmark) - open class EmptyChannelState { - lateinit var channel: Channel - - @Setup(Level.Iteration) - fun createEmptyChannel() { - channel = Channel(Channel.UNLIMITED) - } - } - - @Benchmark - fun send(s: EmptyChannelState) = runBlocking { - s.channel.send(42) - } - - @Benchmark - fun trySend(s: EmptyChannelState) { - s.channel.trySend(42) - } - - @State(Scope.Benchmark) - open class BigChannelState { - private val list = List(100_000_000) { it } - lateinit var channel: Channel - - @Setup(Level.Iteration) - fun createPrefilledChannel() { - channel = Channel(Channel.UNLIMITED) - for (it in list) { - channel.trySend(it) - } - } - } - - @Benchmark - fun receive(s: BigChannelState): Int = runBlocking { - return@runBlocking s.channel.receive() - } - - @Benchmark - fun tryReceive(s: BigChannelState): Int { - return s.channel.tryReceive().getOrThrow() - } } From 34a449d7e44492b8938efe224157c6d42d1d39b1 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Wed, 17 Dec 2025 17:08:08 +0000 Subject: [PATCH 25/25] add example benchmarks to check benchmark stability in general --- .../benchmarks/examples/CounterBenchmark.kt | 19 ++++++++ .../benchmarks/examples/ListBenchmark.kt | 48 +++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 benchmarks/src/jmh/kotlin/benchmarks/examples/CounterBenchmark.kt create mode 100644 benchmarks/src/jmh/kotlin/benchmarks/examples/ListBenchmark.kt diff --git a/benchmarks/src/jmh/kotlin/benchmarks/examples/CounterBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/examples/CounterBenchmark.kt new file mode 100644 index 0000000000..cf6db08764 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/examples/CounterBenchmark.kt @@ -0,0 +1,19 @@ +package benchmarks.examples + +import org.openjdk.jmh.annotations.* +import java.util.concurrent.TimeUnit + +@Warmup(iterations = 10, time = 1) +@Measurement(iterations = 10, time = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +@Fork(1) +open class CounterBenchmark { + var counter: ULong = 0u + + @Benchmark + fun inc() { + counter++ + } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/examples/ListBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/examples/ListBenchmark.kt new file mode 100644 index 0000000000..657174385e --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/examples/ListBenchmark.kt @@ -0,0 +1,48 @@ +package benchmarks.examples + +import org.openjdk.jmh.annotations.* +import java.util.concurrent.TimeUnit + +@Warmup(iterations = 10, time = 1) +@Measurement(iterations = 10, time = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(1) +open class ListBenchmark { + @State(Scope.Benchmark) + open class PrefilledList { + @Param("0", "100000", "1000000", "10000000") + var prefill = 0 + + val lst: MutableList = mutableListOf(42) + + @Setup(Level.Iteration) + fun setup() { + repeat(prefill) { lst.add(it) } + } + } + + @Benchmark + fun add(s: PrefilledList) { + s.lst.add(42) + } + + @State(Scope.Benchmark) + open class PrefilledReusableList { + @Param("0", "100000", "1000000", "10000000") + var prefill = 0 + + val lst: MutableList = mutableListOf(42) + + @Setup(Level.Trial) + fun setup() { + repeat(prefill) { lst.add(it) } + } + } + + @Benchmark + fun addRemoveLast(s: PrefilledReusableList) { + s.lst.add(42) + s.lst.removeLast() + } +}