在Kotlin协程:协程的基础与使用中,通过使用协程中提供的flow方法可以创建一个Flow对象。这种方法得到的Flow对象实际上是一个异步冷数据流,代码如下:
private suspend fun test() { val flow = flow { emit(1) emit(2) emit(3) emit(4) } GlobalScope.launch { // 触发flow执行 flow.collect { Log.d("liduo", "test1: $it") } } GlobalScope.launch { // 再次触发flow执行 flow.collect { Log.d("liduo", "test2: $it") } } }
在上面的代码中,通过调用flow方法,构建了一个名为flow对象,并对flow对象异步执行了两次。每次都会打印出1、2、3、4,然后结束执行。无论谁在前谁在后,无论执行多少次,得到的结果都是相同的,这就是异步冷数据流的一个特点。
既然有冷数据流,那就一定有热数据流。在协程中提供了MutableSharedFlow方法来创建异步热数据流。相比于异步冷数据流,异步热数据流一般在类似广播订阅的场景中使用。
在异步热数据流中,核心接口的继承关系如下图所示:
SharedFlow接口继承自Flow接口,代码如下:
public interface SharedFlow<out T> : Flow<T> { // 用于保存最近的已经发送的数据 public val replayCache: List<T> }
MutableSharedFlow接口继承自SharedFlow接口与FlowCollector接口,并在此基础上定义了两个方法与一个常量,代码如下:
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> { // 该方法用于尝试发射一个数据, // 当返回true时表示发射成功,返回false时,表示缓存空间不足,需要挂起。 public fun tryEmit(value: T): Boolean // 该常量表示当前SharedFlow的订阅者的数量, // 该常量是一个状态流StateFlow,也是一个热流,当其中数值发生变化时会进行回调通知 public val subscriptionCount: StateFlow<Int> // 用于清空replayCache // 在调用该方法之前老的订阅者,可以继续收到replaycache中的缓存数据, // 在调用该方法之后的新的订阅者,只能收到emit方法发射的新数据 @ExperimentalCoroutinesApi public fun resetReplayCache() }
在协程中,可以通过调用MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:
public fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T> { ... }
其中构造方法中三个参数的含义如下:
当创建MutableSharedFlow类型的对象时,可以通过参数replay确定SharedFlow接口中定义的replayCache的最大容量,通过参数extraBufferCapacity设置一个不包括replay大小的缓存数量。replayCache本质上也是缓存的一部分,因此extraBufferCapacity与replay共同决定了缓存的大小。
对于处理数据慢的订阅者,可以通过从缓存中获取数据,以此来避免发射者的挂起。缓存的数量大小决定了数据处理快的订阅者与数据处理慢的订阅者之间的延迟程度。
当使用默认的构造方法创建MutableSharedFlow类型的对象时,它的缓存数量为0。当调用它的emit方法时会直接挂起,直到所有的订阅者都处理完当前emit方法发送的数据,才会恢复emit方法的挂起。如果MutableSharedFlow类型的对象没有订阅者,则调用emit方法会直接返回。
代码如下:
private suspend fun test() { // 创建一个热流 val flow = MutableSharedFlow<Int>(2, 3, BufferOverflow.SUSPEND) // 启动一个协程,发射数据:1 // 由于有缓存,因此会被添加到缓存中,不会挂起 GlobalScope.launch { flow.emit(1) } // 将MutableSharedFlow对象转换为SharedFlow对象 // SharedFlow对象不能调用emit方法,因此只能用于接收 val onlyReadFlow = flow.asSharedFlow() // 接收者1 // 启动一个新协程 GlobalScope.launch { // 订阅监听,当collect方法触发订阅时,会首先会调onSubscription方法 onlyReadFlow.onSubscription { Log.d("liduozuishuai", "test0: ") // 发射数据:3 // 向下游发射数据:3,其他接收者收不到 emit(3) }.onEach { // 处理接收的数据 Log.d("liduozuishuai", "test1: $it") }.collect() } // 接收者2 // 启动一个新的协程 GlobalScope.launch { // 触发并处理接收的数据 onlyReadFlow.collect { Log.d("liduozuishuai", "test2: $it") } } // 发送数据:2 GlobalScope.launch { flow.emit(2) } }
对于上面的代码,接收者1会依次打印出:3、1、2,接收者2会依次打印出1、2。