Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时刻准备着,就像自来水一样,打开开关就有水了。
fun main() = runBlocking { logX("开始") val channel = Channel<Int> { } launch { (1..3).forEach{ channel.send(it) logX("发送数据: $it") } // 关闭channel, 节省资源 channel.close() } launch { for (i in channel){ logX("接收数据: $i") } } logX("结束") }
示例代码 使用Channel创建了一组int类型的数据流,通过send发送数据,并通过for循环取出channel中的数据,最后channel是一种协程资源,使用结束后应该及时调用close方法关闭,以免浪费不必要的资源。
public fun <E> Channel( capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null ): Channel<E> = when (capacity) { RENDEZVOUS -> {} CONFLATED -> {} UNLIMITED -> {} else -> {} }
可以看到Channel的构造函数包含了三个参数,分别是capacity、onBufferOverflow、onUndeliveredElement.
首先看capacity,这个参数代表了管道的容量,默认参数是RENDEZVOUS,取值是0,还有其他一些值:
接下来看onBufferOverflow, 顾名思义就是管道容量满了,怎么办?默认是挂起,也就是suspend,一共有三种分别是: SUSPNED、DROP_OLDEST以及DROP_LATEST
public enum class BufferOverflow { /** * Suspend on buffer overflow. */ SUSPEND, /** * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend. */ DROP_OLDEST, /** * Drop **the latest** value that is being added to the buffer right now on buffer overflow * (so that buffer contents stay the same), do not suspend. */ DROP_LATEST }
最后一个参数是onUndeliveredElement,从名字看像是没有投递成功的回调,也确实如此,当管道中某些数据没有成功接收时,这个就会被调用。
综合这个参数使用一下
fun main() = runBlocking { println("开始") val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) { println("onUndeliveredElement = $it") } launch { (1..3).forEach{ channel.send(it) println("发送数据: $it") } // 关闭channel, 节省资源 channel.close() } launch { for (i in channel){ println("接收数据: $i") } } println("结束") } 输出结果如下: 开始 结束 发送数据: 1 发送数据: 2 发送数据: 3 接收数据: 2 接收数据: 3
先看一个例子
val channel: ReceiveChannel<Int> = produce { (1..100).forEach{ send(it) println("发送: $it") } } while (!channel.isClosedForReceive){ val i = channel.receive(); println("接收: $i") } 输出报错信息: Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
可以看到使用isClosedForReceive判断是否关闭再使用receive方法接收数据,依然会报错,所以不推荐使用这种方式。
推荐使用上面for循环的方式取数据,还有kotlin推荐的consumeEach方式,看一下示例代码
val channel: ReceiveChannel<Int> = produce { (1..100).forEach{ send(it) println("发送: $it") } } channel.consumeEach { println("接收:$it") }
所以,当我们想要获取Channel当中的数据时,我们尽量使用 for 循环,或者是channel.consumeEach {},不要直接调用channel.receive()。
先看一下代码
println("开始") val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) { println("onUndeliveredElement = $it") } launch { (1..3).forEach{ channel.send(it) println("发送数据: $it") } } println("结束") } 输出: 开始 结束 发送数据: 1 发送数据: 2 发送数据: 3
可以看到上述代码中并没有 取channel中的数据,但是发送的代码正常执行了,这种“不管有没有接收方,发送方都会工作”的模式,就是我们将其认定为“热”的原因。
举个例子,就像去海底捞吃火锅一样,你不需要主动要求服务员加水,服务员看到你的杯子中水少了,会自动给你添加,你只管拿起水杯喝水就行了。
总的来说,不管接收方是否存在,Channel 的发送方一定会工作。
通过源码可以看到Channel只是一个接口,它的能力来源于SendChannel和ReceiveChannel,一个发送管道,一个接收管道,相当于做了一个组合。
这也是一种良好的设计思想,“对读取开放,对写入封闭”的开闭原则。