SharedFlow 详解
在 Kotlin 协程中,SharedFlow 是一种非常强大且灵活的热流 (Hot Flow),它旨在解决 BroadcastChannel 的一些痛点,并提供比 StateFlow 更通用的多播能力。你可以把它看作是 StateFlow 和 Channel 的结合体,拥有它们各自的优点,并在此基础上提供了更精细的控制。
SharedFlow 核心特性详解
SharedFlow 就像一个广播电台,它的生产者不断地“播放”数据,而多个“听众”(收集器)可以同时“收听”这些数据。即使新的听众中途加入,他们也可以选择从某个历史点开始“收听”(如果配置了重放)。
1. 热流 (Hot Flow)
- 独立于收集器存在:与冷流 (
Flow) 不同,SharedFlow的生产者即使在没有活跃收集器的情况下也会继续运行并发出值。 - 立即发送:当一个收集器开始收集时,它会立即收到
SharedFlow根据其配置(如重放)所拥有的最新值或历史值。
2. 多播 (Multicast)
- 多个订阅者:
SharedFlow允许多个协程同时收集同一个流。每个订阅者都会收到相同的事件序列(或根据重放配置收到部分历史事件)。 - 事件广播:这使得
SharedFlow非常适合作为事件总线或在多个组件之间广播数据更新。
3. 可配置的重放 (Replay)
replay参数:这是SharedFlow最强大的特性之一。当你创建MutableSharedFlow时,可以指定replay参数,它表示SharedFlow会保留多少个最近发出的值。- 新订阅者的行为:当一个新的收集器开始收集时,它会首先收到这些被重放的历史值,然后才开始接收新的值。
- 默认
replay = 0:这意味着默认情况下,SharedFlow不会重放任何历史值。新订阅者只会收到它订阅之后发出的值。 - 用途:非常适用于“粘性事件”或当新加入的订阅者需要立即获取当前状态的场景(例如,网络连接状态,但又不需要像
StateFlow那样始终保持一个“值”)。
4. 可配置的缓冲区 (Buffer)
extraBufferCapacity参数:除了重放缓冲区之外,SharedFlow还有一个“额外缓冲区”。这个缓冲区用于存储那些已经发出但尚未被所有订阅者处理的值。- 背压处理:如果订阅者处理值的速度跟不上生产者的速度,这些值会暂时存储在额外缓冲区中。当缓冲区满时,
send()操作会根据onBufferOverflow策略进行处理。 onBufferOverflow策略:定义当缓冲区溢出时如何处理新发出的值:SUSPEND(默认):发送操作会挂起,直到缓冲区有空间。这提供了背压机制。DROP_OLDEST:丢弃缓冲区中最旧的值,为新值腾出空间。DROP_LATEST:丢弃新发出的值,不将其添加到缓冲区。
5. 单向性 (Unidirectionality)
MutableSharedFlow具有emit()方法用于发送值。SharedFlow接口则只提供collect()方法用于收集值。这强制了生产者和消费者之间的分离,提供了更好的封装。
6. 永不完成
- 与
Flow不同,SharedFlow和StateFlow一样,永远不会“完成”。它会持续运行并发出值,直到被垃圾回收。
7. 对比 StateFlow 和 Channel
- VS
StateFlow:StateFlow总是持有一个当前值,并且只重放最新的一个值 (replay = 1),且会自动去重。SharedFlow更通用,可以重放多个值 (replay >= 0),不强制持有“当前值”概念(除非replay = 1),且默认不去重(除非手动实现)。StateFlow是SharedFlow的一个特例:MutableStateFlow本质上就是MutableSharedFlow(replay = 1, onBufferOverflow = DROP_OLDEST),并增加了去重功能。
- VS
Channel:Channel通常是点对点的通信(一个元素只被一个消费者接收)。SharedFlow是多播的(一个元素可以被所有活跃消费者接收)。Channel可以关闭并表示完成;SharedFlow不会完成。
SharedFlow 典型使用场景
SharedFlow 的灵活性使其适用于多种场景,尤其是需要广播事件或共享不可变状态的场景:
一次性事件 (One-Shot Events):当需要从 ViewModel 向 UI 发送一次性事件时(例如,显示 Toast 消息、导航事件、弹出 Snackbar),
SharedFlow是一个理想选择,特别是当replay为 0 时。如果配置了replay = 1,它也可以模拟LiveData的粘性事件行为。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// ViewModel private val _events = MutableSharedFlow<MyEvent>() val events: SharedFlow<MyEvent> = _events.asSharedFlow() fun onUserClickedSomething() { viewModelScope.launch { _events.emit(MyEvent.ShowToast("Button clicked!")) } } // UI (Fragment/Activity) lifecycleScope.launch { repeatOnLifecycle(Lifecycle.State.STARTED) { viewModel.events.collect { event -> when (event) { is MyEvent.ShowToast -> Toast.makeText(context, event.message, Toast.LENGTH_SHORT).show() } } } }
共享不可变数据流:当多个组件需要监听同一个不可变数据流时,例如实时更新的配置信息、网络连接状态、用户登录状态等。
事件总线 (Event Bus):在应用程序的不同部分之间传递事件,实现松散耦合。
取代
BroadcastChannel:BroadcastChannel已经被弃用,官方推荐使用SharedFlow来实现其功能。
创建和使用 SharedFlow
创建 MutableSharedFlow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay
fun main() = runBlocking {
// 创建一个 MutableSharedFlow,不重放历史值,没有额外缓冲区,默认 SUSPEND 策略
val eventFlow = MutableSharedFlow<String>()
// 创建一个 MutableSharedFlow,重放最近的 2 个值,额外缓冲区容量为 5,当缓冲区满时丢弃最旧的值
val stickyEventFlow = MutableSharedFlow<String>(
replay = 2,
extraBufferCapacity = 5,
onBufferOverflow = kotlinx.coroutines.channels.BufferOverflow.DROP_OLDEST
)
// 作为 SharedFlow 公开,只读
val publicEventFlow: SharedFlow<String> = eventFlow.asSharedFlow()
// 生产者发送事件
val producerJob = launch {
println("Producer starts emitting...")
eventFlow.emit("Event 1")
delay(100)
eventFlow.emit("Event 2")
delay(100)
eventFlow.emit("Event 3") // 只有 Event 3 会被订阅者 1 收到 (因为它没有重放)
delay(100)
stickyEventFlow.emit("Sticky 1")
delay(10)
stickyEventFlow.emit("Sticky 2")
delay(10)
stickyEventFlow.emit("Sticky 3") // Sticky 2 和 Sticky 3 会被订阅者 2 收到 (replay=2)
}
// 订阅者 1 (立即订阅)
val collector1 = launch {
println("Collector 1 started...")
publicEventFlow.collect { event ->
println("Collector 1 received: $event")
}
}
delay(250) // 等待一些事件发出
// 订阅者 2 (延迟订阅,观察 replay 效果)
val collector2 = launch {
println("Collector 2 started...")
stickyEventFlow.collect { event ->
println("Collector 2 received (sticky): $event")
}
}
delay(200) // 等待更多事件和收集
producerJob.cancel()
collector1.cancel()
collector2.cancel()
}
SharedFlow 的关键参数和其影响
replay:replay = 0(默认): 最常用于一次性事件。新收集器只会收到订阅后发出的值。replay = 1: 类似于StateFlow,新收集器会收到最新发出的一个值。但与StateFlow不同,它不提供自动去重。replay > 1: 新收集器会收到最近replay数量的历史值。
extraBufferCapacity:- 定义了在所有收集器处理完之前可以缓冲多少个值。
- 如果
replay + extraBufferCapacity为 0,则emit操作是挂起的,直到有收集器准备好接收。
onBufferOverflow:SUSPEND(默认): 如果缓冲区满了,emit会挂起。这提供了背压,确保所有事件都被处理。DROP_OLDEST: 丢弃缓冲区中最老的值,为新值腾出空间。适合于那些不需要处理所有中间状态,只关心最新状态的场景(例如,UI 更新)。DROP_LATEST: 丢弃新发出的值。这可能会导致数据丢失,应谨慎使用。
总结
SharedFlow 是 Kotlin 协程中一个非常通用的热流实现,它结合了 StateFlow 的状态共享能力和 Channel 的事件传递能力,并通过 replay 和 onBufferOverflow 参数提供了高度的灵活性。
- 当你需要多播事件给多个订阅者时。
- 当你需要重放历史事件给新的订阅者时。
- 当你需要实现一次性事件(如 Toast、导航)且不希望它们像
StateFlow那样保持“当前值”时。 - 当你需要替换已废弃的
BroadcastChannel时。
SharedFlow 都是一个比 StateFlow 或 Channel 更优或更灵活的选择。理解并善用其配置参数是发挥其强大功能,构建健壮和响应式并发应用程序的关键。