RxJava Syntax Quick Reference

📦 1. Creating Observables

just - emit a fixed sequence of items

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just("A", "B", "C")
        .subscribe { println(it) }
}

fromArray / fromIterable

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.fromArray("A", "B", "C")
        .subscribe { println(it) }
}
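
The heading above also names fromIterable, which the example does not show. As a minimal sketch (the sample list and the helper name `lettersFromList` are illustrative assumptions), it emits the elements of any Iterable in iteration order:

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: emits each element of a Kotlin list and collects
// the emissions back into a list so the result is easy to inspect.
fun lettersFromList(): List<String> {
    val source = listOf("A", "B", "C") // sample data, assumed for illustration
    return Observable.fromIterable(source)
        .toList()      // Single<List<String>>
        .blockingGet() // block until the stream completes
}

fun main() {
    println(lettersFromList()) // [A, B, C]
}
```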

create - emit items manually

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.ObservableEmitter

fun main() {
    val observable = Observable.create<String> { emitter: ObservableEmitter<String> ->
        emitter.onNext("Hello")
        emitter.onNext("RxJava")
        emitter.onComplete()
    }

    observable.subscribe { println(it) }
}

🔄 2. Transforming Observables

map

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just(1, 2, 3)
        .map { it * 10 }
        .subscribe { println(it) }
}

flatMap

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just("A", "B")
        .flatMap { letter -> Observable.just("$letter-1", "$letter-2") }
        .subscribe { println(it) }
}

concatMap (preserves order)

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just("A", "B")
        .concatMap { letter -> Observable.just("$letter-1", "$letter-2") }
        .subscribe { println(it) }
}
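
With synchronous inner sources, flatMap and concatMap print the same thing, so the ordering difference is not visible in the two examples above. The sketch below makes it visible by delaying each inner stream so that later items finish first; the helper names and the delay values are assumptions chosen for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

// Item 1 is delayed 300 ms, item 2 200 ms, item 3 100 ms (illustrative values).
fun delayed(n: Int): Observable<Int> =
    Observable.just(n).delay((4 - n) * 100L, TimeUnit.MILLISECONDS)

// flatMap subscribes to all inner streams at once, so results arrive
// in completion order.
fun flatMapOrder(): List<Int> =
    Observable.just(1, 2, 3).flatMap { delayed(it) }.toList().blockingGet()

// concatMap subscribes to one inner stream at a time, so source order is kept.
fun concatMapOrder(): List<Int> =
    Observable.just(1, 2, 3).concatMap { delayed(it) }.toList().blockingGet()

fun main() {
    println("flatMap:   ${flatMapOrder()}")   // [3, 2, 1]
    println("concatMap: ${concatMapOrder()}") // [1, 2, 3]
}
```

The price of concatMap's ordering is that the inner streams run one after another, so the total time is the sum of the delays rather than the longest one.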

🎛️ 3. Filtering

filter

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just(1, 2, 3, 4)
        .filter { it % 2 == 0 }
        .subscribe { println(it) }
}

take, skip

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just("A", "B", "C", "D")
        .take(2)
        .subscribe { println(it) }
}
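
The example above covers only take. A minimal sketch of skip, and of the two combined, might look like this (the helper names are assumptions for illustration):

```kotlin
import io.reactivex.rxjava3.core.Observable

// skip(n) drops the first n items and passes the rest through.
fun skipFirstTwo(): List<String> =
    Observable.just("A", "B", "C", "D").skip(2).toList().blockingGet()

// skip and take together select a window from the middle of the stream.
fun middleTwo(): List<String> =
    Observable.just("A", "B", "C", "D").skip(1).take(2).toList().blockingGet()

fun main() {
    println(skipFirstTwo()) // [C, D]
    println(middleTwo())    // [B, C]
}
```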

distinct, distinctUntilChanged

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just(1, 2, 2, 3, 3, 3, 4)
        .distinctUntilChanged()
        .subscribe { println(it) }
}
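
The input above is already sorted, so distinct and distinctUntilChanged would give the same result. The sketch below uses an input where they differ; the sample values and helper names are assumptions for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable

// distinct remembers every value seen so far and drops all repeats.
fun withDistinct(): List<Int> =
    Observable.just(1, 2, 1, 2, 2, 3).distinct().toList().blockingGet()

// distinctUntilChanged only drops a value equal to its immediate predecessor.
fun withDistinctUntilChanged(): List<Int> =
    Observable.just(1, 2, 1, 2, 2, 3).distinctUntilChanged().toList().blockingGet()

fun main() {
    println(withDistinct())             // [1, 2, 3]
    println(withDistinctUntilChanged()) // [1, 2, 1, 2, 3]
}
```

Because distinct has to keep track of every value it has emitted, distinctUntilChanged is usually the safer choice for long-running or unbounded streams.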

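⏱️ 4. Time-Based Operators

interval, timer

import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
    Observable.interval(1, TimeUnit.SECONDS)
        .take(3)
        .subscribe { println("Tick $it") }

    // interval emits on a background thread; keep main alive long enough to see it
    Thread.sleep(4000)
}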

📋 5. Combining Observables

merge, concat

import io.reactivex.rxjava3.core.Observable

fun main() {
    val o1 = Observable.just("A", "B")
    val o2 = Observable.just("1", "2")

    Observable.merge(o1, o2)
        .subscribe { println(it) }
}

zip

import io.reactivex.rxjava3.core.Observable

fun main() {
    val o1 = Observable.just("A", "B")
    val o2 = Observable.just("1", "2")

    Observable.zip(o1, o2) { a, b -> "$a$b" }
        .subscribe { println(it) }
}

🔁 6. Retrying and Repeating

repeat, retry

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just("Hello")
        .repeat(3)
        .subscribe { println(it) }
}

🧵 7. Thread Scheduling (Schedulers)

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

fun main() {
    Observable.just("A", "B", "C")
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .subscribe { println("Thread: ${Thread.currentThread().name}, Value: $it") }

    // give the background threads time to run before main returns
    Thread.sleep(1000)
}

✅ 8. Writing Observers (Observer / Consumer)

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Observer
import io.reactivex.rxjava3.disposables.Disposable

fun main() {
    val observable = Observable.just("A", "B", "C")

    val observer = object : Observer<String> {
        override fun onSubscribe(d: Disposable) {
            println("Subscribed")
        }

        override fun onNext(t: String) {
            println("Received: $t")
        }

        override fun onError(e: Throwable) {
            println("Error: ${e.message}")
        }

        override fun onComplete() {
            println("Completed")
        }
    }

    observable.subscribe(observer)
}

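🧹 9. Error Handling

onErrorReturn

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just(1, 2, 0, 3)
        .map { 10 / it }
        .onErrorReturn { -1 }
        // prints 10, 5, -1: the stream ends at the first error, so 3 is never processed
        .subscribe { println(it) }
}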

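onErrorResumeNext

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.error<Int>(Throwable("Oops"))
        // In RxJava 3, onErrorResumeNext takes a function of the error;
        // onErrorResumeWith accepts the fallback source directly.
        .onErrorResumeNext { _: Throwable -> Observable.just(100) }
        .subscribe { println(it) }
}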

🧪 10. Flowable (Handling Backpressure)

import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable

fun main() {
    Flowable.create<String>({ emitter ->
        for (i in 1..5) {
            emitter.onNext("Item $i")
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)
        .subscribe { println(it) }
}

⛓️ 11. Disposable & Resource Cleanup

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable

fun main() {
    val disposables = CompositeDisposable()

    val d = Observable.just("A", "B")
        .subscribe { println(it) }

    disposables.add(d)
    disposables.clear() // dispose everything that was added
}

⛳ 12. Subject (Bridging Observer and Observable)

PublishSubject

import io.reactivex.rxjava3.subjects.PublishSubject

fun main() {
    val subject = PublishSubject.create<String>()

    subject.subscribe { println("Subscriber 1: $it") }

    subject.onNext("A")
    subject.onNext("B")

    subject.subscribe { println("Subscriber 2: $it") }

    subject.onNext("C")
}

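🧠 Comparison of Common Types

| Type | Characteristics |
| --- | --- |
| Observable | No backpressure handling; suited to small volumes of data |
| Flowable | Can handle large volumes of data; supports backpressure |
| Single | Emits exactly one value or an error |
| Maybe | Emits one value, no value, or an error |
| Completable | Only signals completion or error; emits no values |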
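
To make the table concrete, the sketch below derives a Single, a Maybe, and a Completable from an Observable using standard conversion operators. The helper names are assumptions for illustration, not part of RxJava.

```kotlin
import io.reactivex.rxjava3.core.Observable

// toList() and count() each return a Single: exactly one value or an error.
fun allAsList(): List<Int> = Observable.just(1, 2, 3).toList().blockingGet()
fun howMany(): Long = Observable.just(1, 2, 3).count().blockingGet()

// firstElement() returns a Maybe: a value, or plain completion when the
// source is empty (blockingGet() then returns null).
fun firstOfEmpty(): Int? = Observable.empty<Int>().firstElement().blockingGet()

// ignoreElements() returns a Completable: only completion or error is signalled.
fun completesWithoutValues(): Boolean {
    var completed = false
    Observable.just(1, 2, 3).ignoreElements().subscribe { completed = true }
    return completed // the source is synchronous, so this is already set
}

fun main() {
    println(allAsList())              // [1, 2, 3]
    println(howMany())                // 3
    println(firstOfEmpty())           // null
    println(completesWithoutValues()) // true
}
```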

The following part continues with intermediate-to-advanced operators. All examples are written in Kotlin and intended to run as-is:


🔄 8. flatMap(): map each item to another Observable

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just("A", "B")
        .flatMap { letter ->
            Observable.fromArray("$letter-1", "$letter-2")
        }
        .subscribe { println(it) }
}

Output:

A-1
A-2
B-1
B-2

🧷 9. concatMap(): like flatMap, but preserves order

import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
    Observable.just("A", "B")
        .concatMap { letter ->
            Observable.fromArray("$letter-1", "$letter-2").delay(100, TimeUnit.MILLISECONDS)
        }
        .blockingSubscribe { println(it) }
}

🔍 10. filter(): filter items

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just(1, 2, 3, 4, 5)
        .filter { it % 2 == 0 }
        .subscribe { println(it) } // prints 2, 4
}

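🧮 11. reduce(): fold the stream into one value (returns a Maybe without a seed, a Single with a seed)

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just(1, 2, 3, 4)
        .reduce { t1, t2 -> t1 + t2 } // no seed, so the result is a Maybe<Int>
        .subscribe { println(it) } // prints 10
}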

📦 12. scan(): like reduce, but emits every intermediate result

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just(1, 2, 3, 4)
        .scan { acc, next -> acc + next }
        .subscribe { println(it) }
}

Output:

1
3
6
10

⌛ 13. delay(): delay emissions

import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
    Observable.just("Hello")
        .delay(1, TimeUnit.SECONDS)
        .blockingSubscribe { println(it) }
}

🧪 14. take(): keep only the first N items

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.range(1, 10)
        .take(3)
        .subscribe { println(it) } // prints 1, 2, 3
}

🔁 15. repeat(): repeat the whole stream

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just("A")
        .repeat(3)
        .subscribe { println(it) } // prints A three times
}

💥 16. retry(): resubscribe on error

import io.reactivex.rxjava3.core.Observable

fun main() {
    var attempt = 0
    Observable.create<String> { emitter ->
        // the first two attempts fail, the third succeeds
        if (++attempt < 3) {
            emitter.onError(RuntimeException("fail"))
        } else {
            emitter.onNext("Success")
            emitter.onComplete()
        }
    }
        .retry(2)
        .subscribe({ println(it) }, { println("Error: ${it.message}") })
}

⏳ 17. timeout(): enforce a time limit

import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
    Observable.just("Hello")
        .delay(2, TimeUnit.SECONDS)
        .timeout(1, TimeUnit.SECONDS)
        // blockingSubscribe keeps main alive until the timeout fires on a
        // background thread; a plain subscribe would let the JVM exit first
        .blockingSubscribe(
            { println(it) },
            { println("Timeout error: ${it.message}") }
        )
}

The next part moves on to more advanced topics (Kotlin examples intended to run as-is): combining streams, error handling, and scheduling.


🔗 18. zip(): combine several Observables, pairing items one by one

import io.reactivex.rxjava3.core.Observable

fun main() {
    val numbers = Observable.just(1, 2, 3)
    val letters = Observable.just("A", "B", "C")

    Observable.zip(numbers, letters) { number, letter ->
        "$number-$letter"
    }.subscribe { println(it) }
}

Output:

1-A
2-B
3-C

🔄 19. combineLatest(): whenever any source emits, combine the latest value from each

import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
    val obs1 = Observable.interval(100, TimeUnit.MILLISECONDS).map { "A$it" }
    val obs2 = Observable.interval(150, TimeUnit.MILLISECONDS).map { "B$it" }

    Observable.combineLatest(obs1, obs2) { a, b -> "$a:$b" }
        .take(5)
        .blockingSubscribe { println(it) }
}

🔄 20. withLatestFrom(): when the main Observable emits, combine its item with the latest item from another Observable

import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
    val main = Observable.interval(200, TimeUnit.MILLISECONDS).map { "Main-$it" }
    val other = Observable.interval(100, TimeUnit.MILLISECONDS).map { "Other-$it" }

    main.withLatestFrom(other) { m, o -> "$m + $o" }
        .take(5)
        .blockingSubscribe { println(it) }
}

⚠️ 21. onErrorReturn(): return a default value when an error occurs

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.create<Int> {
        it.onError(Exception("Boom"))
    }.onErrorReturn {
        println("Caught error: ${it.message}")
        -1
    }.subscribe { println(it) }
}

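🛠 22. onErrorResumeNext(): switch to a fallback stream after an error

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.error<Int>(Exception("Oops"))
        // RxJava 3 expects a function here; onErrorResumeWith takes the
        // fallback source directly
        .onErrorResumeNext { _: Throwable -> Observable.just(100, 200) }
        .subscribe { println(it) }
}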

🧯 23. doOnError(): react to an error without intercepting it

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.create<Int> {
        it.onError(Exception("Boom"))
    }.doOnError {
        println("Logging error: ${it.message}")
    }.subscribe(
        { println("Next: $it") },
        { println("Final error: ${it.message}") }
    )
}

⚙️ 24. Schedulers

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

fun main() {
    Observable.create<String> {
        println("Emitting on thread: ${Thread.currentThread().name}")
        it.onNext("Hello")
        it.onComplete()
    }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .subscribe {
            println("Received on thread: ${Thread.currentThread().name} -> $it")
        }

    Thread.sleep(500) // give the background threads time to run
}

📥 25. Flowable and Backpressure

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.schedulers.Schedulers

fun main() {
    Flowable.range(1, 1_000_000)
        .onBackpressureDrop() // drop items the downstream has not requested
        .observeOn(Schedulers.computation(), false, 10) // buffer size of 10
        .subscribe({ println("Received: $it") },
            { println("Error: ${it.message}") })

    Thread.sleep(1000)
}

✅ 26. Using Single (emits one value or an error)

import io.reactivex.rxjava3.core.Single

fun main() {
    Single.just("Single Result")
        .subscribe { value -> println("Received: $value") }
}

✅ 27. Using Maybe (may emit one value, or nothing at all)

import io.reactivex.rxjava3.core.Maybe

fun main() {
    Maybe.just("Maybe Result")
        .subscribe(
            { println("Success: $it") },
            { println("Error: ${it.message}") },
            { println("Completed with no item") }
        )
}

✅ 28. Using Completable (signals only completion or error)

import io.reactivex.rxjava3.core.Completable

fun main() {
    Completable.fromRunnable {
        println("Doing something...")
    }.subscribe(
        { println("Completed!") },
        { println("Error: ${it.message}") }
    )
}

The next part covers more advanced material (runnable Kotlin examples): custom operators, cold and hot Observables, multicasting, and a worked example with Retrofit.


🧰 29. compose(): package a reusable operator chain

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.ObservableTransformer

// Reusable transformer that logs every item passing through.
// RxJava streams never carry null, so no null filter is needed.
fun <T : Any> applyLogging(): ObservableTransformer<T, T> {
    return ObservableTransformer { upstream ->
        upstream.doOnNext { println("Logging: $it") }
    }
}

fun main() {
    Observable.just("One", "Two", "Three")
        .compose(applyLogging())
        .subscribe { println("Received: $it") }
}

🔨 30. lift(): custom operator (not recommended for everyday use)

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.ObservableOperator
import io.reactivex.rxjava3.core.Observer
import io.reactivex.rxjava3.disposables.Disposable

fun main() {
    // lift() is an instance method that takes an ObservableOperator, which
    // wraps the downstream observer in a new observer for the upstream
    val customOperator = ObservableOperator<String, String> { observer ->
        object : Observer<String> {
            override fun onSubscribe(d: Disposable) {
                observer.onSubscribe(d)
            }

            override fun onNext(t: String) {
                observer.onNext("[$t]") // wrap each item in brackets
            }

            override fun onError(e: Throwable) {
                observer.onError(e)
            }

            override fun onComplete() {
                observer.onComplete()
            }
        }
    }

    Observable.just("A", "B")
        .lift(customOperator)
        .subscribe { println("Result: $it") }
}

🔥 31. Cold vs. Hot Observables

Cold Observable: the source runs again for every subscription

import io.reactivex.rxjava3.core.Observable

fun main() {
    val cold = Observable.create<Int> {
        println("Generating...")
        it.onNext((1..100).random())
        it.onComplete()
    }

    cold.subscribe { println("Observer 1: $it") }
    cold.subscribe { println("Observer 2: $it") }
}

Hot Observable: subscribers share one event stream, and earlier items are not replayed

import io.reactivex.rxjava3.subjects.PublishSubject

fun main() {
    val hot = PublishSubject.create<String>()

    hot.subscribe { println("Observer 1: $it") }

    hot.onNext("A")
    hot.onNext("B")

    hot.subscribe { println("Observer 2: $it") }

    hot.onNext("C")
}

Output:

Observer 1: A
Observer 1: B
Observer 1: C
Observer 2: C

📡 32. Multicasting operators: publish(), connect(), refCount()

Building a hot Observable with publish() and connect()

import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
    val source = Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(5)
        .publish()

    source.subscribe { println("Observer 1: $it") }
    source.connect() // start emitting

    Thread.sleep(1200)

    // joins late and misses the items emitted so far
    source.subscribe { println("Observer 2: $it") }

    Thread.sleep(2000)
}
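
The heading also lists refCount(), which the example does not use. As a hedged sketch: publish().refCount() connects when the first subscriber arrives and disconnects when the last one leaves, so no manual connect() call is needed. The helper below (its name and the use of Observable.never as a stand-in source are assumptions for illustration) counts how often the upstream is actually subscribed.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.atomic.AtomicInteger

// Returns how many times the upstream was subscribed when two observers attach.
fun upstreamSubscriptions(shared: Boolean): Int {
    val subscriptions = AtomicInteger()
    // never() stays open, so both observers are attached at the same time
    val cold = Observable.never<Int>()
        .doOnSubscribe { subscriptions.incrementAndGet() }

    val source = if (shared) cold.publish().refCount() else cold

    val d1 = source.subscribe()
    val d2 = source.subscribe()
    d1.dispose()
    d2.dispose() // with refCount, the last dispose also disconnects the upstream

    return subscriptions.get()
}

fun main() {
    println("cold:   ${upstreamSubscriptions(shared = false)}") // 2
    println("shared: ${upstreamSubscriptions(shared = true)}")  // 1
}
```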

🌐 33. Retrofit + RxJava Worked Example

Add the Gradle dependencies:

implementation("com.squareup.retrofit2:retrofit:2.9.0")
implementation("com.squareup.retrofit2:adapter-rxjava3:2.9.0")
implementation("io.reactivex.rxjava3:rxjava:3.1.6")
implementation("com.squareup.retrofit2:converter-gson:2.9.0")

Interface definition:

import io.reactivex.rxjava3.core.Single
import retrofit2.http.GET

data class Post(val userId: Int, val id: Int, val title: String)

interface ApiService {
    @GET("posts/1")
    fun getPost(): Single<Post>
}

Retrofit setup and call:

import retrofit2.Retrofit
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.adapter.rxjava3.RxJava3CallAdapterFactory

fun main() {
    val retrofit = Retrofit.Builder()
        .baseUrl("https://jsonplaceholder.typicode.com/")
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
        .build()

    val api = retrofit.create(ApiService::class.java)

    api.getPost()
        .subscribe(
            { println("Title: ${it.title}") },
            { println("Error: ${it.message}") }
        )

    Thread.sleep(1000) // crude wait for the asynchronous request to finish
}

The next part (runnable Kotlin examples) focuses on the Subject family, error handling, schedulers in detail, and testing tools.


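🎯 34. Comparing Subject Types

| Type | Behavior |
| --- | --- |
| PublishSubject | Delivers only items emitted after subscription |
| BehaviorSubject | Delivers the most recent item plus all subsequent items |
| ReplaySubject | Delivers all past items plus all subsequent items |
| AsyncSubject | Delivers only the last item (sent on onComplete) |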

🔊 PublishSubject example

import io.reactivex.rxjava3.subjects.PublishSubject

fun main() {
    val subject = PublishSubject.create<String>()

    subject.subscribe { println("Observer1: $it") }

    subject.onNext("A")
    subject.onNext("B")

    subject.subscribe { println("Observer2: $it") }

    subject.onNext("C")
}

Output:

Observer1: A
Observer1: B
Observer1: C
Observer2: C

🧭 BehaviorSubject example

import io.reactivex.rxjava3.subjects.BehaviorSubject

fun main() {
    val subject = BehaviorSubject.createDefault("Initial")

    subject.onNext("A") // replaces "Initial" as the latest value
    subject.subscribe { println("Observer1: $it") }

    subject.onNext("B")
}

Output:

Observer1: A
Observer1: B

🔁 ReplaySubject example

import io.reactivex.rxjava3.subjects.ReplaySubject

fun main() {
    val subject = ReplaySubject.create<String>()

    subject.onNext("A")
    subject.onNext("B")

    subject.subscribe { println("Observer: $it") }
}

Output:

Observer: A
Observer: B

🕘 AsyncSubject example

import io.reactivex.rxjava3.subjects.AsyncSubject

fun main() {
    val subject = AsyncSubject.create<String>()

    subject.onNext("A")
    subject.onNext("B")
    subject.subscribe { println("Observer: $it") }

    subject.onNext("C")
    subject.onComplete() // only now is the last value delivered
}

Output:

Observer: C

🚨 35. Error-Handling Operators

onErrorReturn: return a default value on error

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.just(1, 2, 0)
        .map { 10 / it }
        .onErrorReturn { -1 }
        .subscribe { println(it) }
}

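onErrorResumeNext: switch to another Observable on error

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.create<Int> {
        it.onNext(1)
        it.onError(RuntimeException("Oops"))
    }
        // RxJava 3 expects a function here; onErrorResumeWith takes the
        // fallback source directly
        .onErrorResumeNext { _: Throwable -> Observable.just(100, 200) }
        .subscribe { println(it) }
}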

retry(): resubscribe after a failure

import io.reactivex.rxjava3.core.Observable

fun main() {
    var count = 0

    Observable.create<Int> {
        count++
        if (count < 3) it.onError(Exception("Fail $count"))
        else it.onNext(100)
    }.retry(3)
        .subscribe({ println("Success: $it") }, { println("Error: ${it.message}") })
}

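🧵 36. Schedulers in Detail

| Scheduler | Description |
| --- | --- |
| Schedulers.io() | I/O-bound work such as network requests and file reads/writes |
| Schedulers.computation() | CPU-bound work such as event processing and algorithms |
| Schedulers.newThread() | Creates a new thread for each unit of work |
| Schedulers.single() | A single shared thread |
| AndroidSchedulers.mainThread() | The Android UI main thread (requires RxAndroid) |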

Scheduler example

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

fun main() {
    Observable.just("A", "B", "C")
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .subscribe {
            println("Received $it on thread: ${Thread.currentThread().name}")
        }

    Thread.sleep(500)
}

✅ 37. Unit Testing: TestObserver

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.observers.TestObserver

fun main() {
    val observable = Observable.just("One", "Two", "Three")

    val testObserver = TestObserver<String>()
    observable.subscribe(testObserver)

    testObserver.assertComplete()
    testObserver.assertValues("One", "Two", "Three")

    println("All assertions passed.")
}

✅ 38. Custom Operators (Extension Functions)

import io.reactivex.rxjava3.core.Observable

fun <T : Any> Observable<T>.logEach(): Observable<T> {
    return this.doOnNext { println("Logging: $it") }
}

fun main() {
    Observable.just(1, 2, 3)
        .logEach()
        .subscribe { println("Received: $it") }
}

The next part (Kotlin examples) covers:

  • Flowable and backpressure
  • ConnectableObservable
  • Time-control operators such as debounce, throttle, and buffer
  • Reactive data with RxJava + Room
  • An RxBinding debouncing example (for views such as EditText)

🚰 39. Flowable and Backpressure

When items are emitted much faster than they can be consumed, use Flowable instead of Observable and specify a backpressure strategy.

Example: fast emission with backpressure handling

import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.schedulers.Schedulers

fun main() {
    val flowable = Flowable.create<Int>({ emitter ->
        for (i in 1..1_000_000) {
            emitter.onNext(i)
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER) // other strategies: DROP, MISSING, LATEST, ERROR

    flowable
        .observeOn(Schedulers.io())
        .subscribe(
            { println("Received: $it") },
            { it.printStackTrace() }
        )

    Thread.sleep(2000)
}
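
To see how the strategies differ, the sketch below uses a test subscriber that requests only two items up front, so the outcome does not depend on timing. The helper names are assumptions for illustration; `test(2)` creates a TestSubscriber with an initial request of 2.

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.exceptions.MissingBackpressureException

// Hypothetical source: emits 1..5 synchronously, ignoring downstream demand.
fun emitFive(strategy: BackpressureStrategy): Flowable<Int> =
    Flowable.create<Int>({ emitter ->
        for (i in 1..5) emitter.onNext(i)
        emitter.onComplete()
    }, strategy)

// The subscriber requests 2 items; what happens to 3..5 depends on the strategy.
fun received(strategy: BackpressureStrategy): List<Int> =
    emitFive(strategy).test(2).values()

// ERROR signals MissingBackpressureException as soon as demand is exceeded.
fun errorStrategySignalsOverflow(): Boolean = try {
    emitFive(BackpressureStrategy.ERROR).test(2)
        .assertError(MissingBackpressureException::class.java)
    true
} catch (e: AssertionError) {
    false
}

fun main() {
    println("DROP:   ${received(BackpressureStrategy.DROP)}")   // [1, 2], 3..5 discarded
    println("BUFFER: ${received(BackpressureStrategy.BUFFER)}") // [1, 2], 3..5 held until requested
    println("ERROR signals overflow: ${errorStrategySignalsOverflow()}") // true
}
```

DROP and BUFFER deliver the same two items here; the difference is that BUFFER keeps 3..5 in memory for later requests, which is why an unbounded fast producer can exhaust memory under BUFFER.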

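🔁 40. ConnectableObservable: control when emission starts

Lets several subscribers share one data source and defers emission until connect() is called.

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.observables.ConnectableObservable
import java.util.concurrent.TimeUnit

fun main() {
    // interval is defined on Observable; publish() turns it into a ConnectableObservable
    val source: ConnectableObservable<Long> =
        Observable.interval(1, TimeUnit.SECONDS).publish()

    source.subscribe { println("Observer 1: $it") }

    Thread.sleep(2000)

    source.subscribe { println("Observer 2: $it") }

    source.connect() // start emitting
    Thread.sleep(5000)
}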

⏳ 41. Debounce (debounce) and Throttle (throttleFirst)

debounce: emit only after input has paused for the given time

import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
    Observable.create<String> { emitter ->
        emitter.onNext("H")
        Thread.sleep(100)
        emitter.onNext("He")
        Thread.sleep(400)
        emitter.onNext("Hel")
        Thread.sleep(600)
        emitter.onNext("Hell")
        Thread.sleep(1000)
        emitter.onComplete()
    }.debounce(500, TimeUnit.MILLISECONDS)
        .subscribe { println("Final Input: $it") }
}

throttleFirst: keep only the first item in each time window

import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
    Observable.interval(100, TimeUnit.MILLISECONDS)
        .throttleFirst(300, TimeUnit.MILLISECONDS)
        .take(10)
        .subscribe { println("Throttled: $it") }

    Thread.sleep(2000)
}

📦 42. buffer: collect items into batches

import io.reactivex.rxjava3.core.Observable

fun main() {
    Observable.range(1, 10)
        .buffer(3)
        .subscribe { println("Buffer: $it") }
}

Output:

Buffer: [1, 2, 3]
Buffer: [4, 5, 6]
Buffer: [7, 8, 9]
Buffer: [10]

🏠 43. RxJava + Room (example structure)

Room DAOs can return Flowable or Observable types.

@Dao
interface UserDao {
    @Query("SELECT * FROM user")
    fun getAllUsers(): Flowable<List<User>>
}

userDao.getAllUsers()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { users -> show(users) }

🎯 44. Guarding Against Rapid Repeated Clicks with RxBinding

// implementation "com.jakewharton.rxbinding4:rxbinding:4.0.0"

import com.jakewharton.rxbinding4.view.clicks
import java.util.concurrent.TimeUnit

button.clicks()
    .throttleFirst(1, TimeUnit.SECONDS) // accept at most one click per second
    .subscribe { showToast("Clicked!") }

✅ 45. Using interval and timer

// emit a value every second
Observable.interval(1, TimeUnit.SECONDS)
    .subscribe { println("Tick: $it") }

// emit once after a 3-second delay
Observable.timer(3, TimeUnit.SECONDS)
    .subscribe { println("Time's up!") }

🧰 46. dispose / Releasing Resources

val disposable = Observable.interval(1, TimeUnit.SECONDS)
    .subscribe { println("Tick: $it") }

Thread.sleep(3000)
disposable.dispose() // cancel the subscription

The next part covers advanced RxJava usage (Kotlin examples) across several practical topics:


🔀 47. Thread Switching in RxJava, End to End

Observable.just("Start")
    .subscribeOn(Schedulers.io()) // the upstream starts on an IO thread
    .doOnNext { println("doOnNext 1 on ${Thread.currentThread().name}") }
    .observeOn(Schedulers.computation())
    .doOnNext { println("doOnNext 2 on ${Thread.currentThread().name}") }
    .observeOn(AndroidSchedulers.mainThread()) // Android only
    .subscribe { println("Subscribe on ${Thread.currentThread().name}") }

On non-Android platforms, replace AndroidSchedulers.mainThread() with Schedulers.trampoline().
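
As a rough JVM-only sketch of the same chain, the helper below (its name is an assumption for illustration) records the thread name at each stage. With Schedulers.trampoline() in the last observeOn, no further thread hop takes place, so the final stage stays on the computation thread.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList

// Records which thread each stage of the chain runs on.
fun threadNames(): List<String> {
    val seen = CopyOnWriteArrayList<String>()
    Observable.just("Start")
        .subscribeOn(Schedulers.io())
        .doOnNext { seen.add(Thread.currentThread().name) } // IO thread
        .observeOn(Schedulers.computation())
        .doOnNext { seen.add(Thread.currentThread().name) } // computation thread
        .observeOn(Schedulers.trampoline())                 // stays on the current thread
        .doOnNext { seen.add(Thread.currentThread().name) } // still the computation thread
        .blockingSubscribe()                                // wait for completion
    return seen
}

fun main() {
    threadNames().forEachIndexed { index, name -> println("stage ${index + 1}: $name") }
}
```

The thread names carry RxJava's default prefixes (RxCachedThreadScheduler for io(), RxComputationThreadPool for computation()); the numeric suffix varies from run to run.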


🔁 48. RxJava → Kotlin Flow Cheat Sheet

| RxJava | Kotlin Flow |
| --- | --- |
| Observable.create | flow { emit(...) } |
| subscribeOn/observeOn | flowOn, withContext in collectors |
| map | map |
| flatMap | flatMapConcat, flatMapMerge |
| zip | zip |
| debounce | debounce |
| filter | filter |
| onErrorResumeNext | catch { ... emit(...) } |
| retry | retry() |
| Disposable.dispose() | Job.cancel() |

Example:

// RxJava
Observable.range(1, 5)
    .map { it * 2 }
    .subscribe { println(it) }

// Kotlin Flow (collect must be called from a coroutine)
flowOf(1, 2, 3, 4, 5)
    .map { it * 2 }
    .collect { println(it) }

🌐 49. Retrofit + RxJava in Practice

Add the dependencies

implementation "com.squareup.retrofit2:retrofit:2.9.0"
implementation "com.squareup.retrofit2:adapter-rxjava3:2.9.0"
implementation "com.squareup.retrofit2:converter-gson:2.9.0"

Define the API

interface ApiService {
    @GET("users")
    fun getUsers(): Observable<List<User>>
}

Using RxJava with Retrofit

val retrofit = Retrofit.Builder()
    .baseUrl("https://api.example.com/")
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
    .build()

val api = retrofit.create(ApiService::class.java)

api.getUsers()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ users -> println(users) }, { error -> error.printStackTrace() })

⚙️ 50. Custom Operators

Wrap complex logic in an extension function:

fun <T : Any> Observable<T>.debugTag(tag: String): Observable<T> {
    return this.doOnNext { println("[$tag] $it") }
}

Usage:

Observable.range(1, 3)
    .debugTag("MyLog")
    .subscribe()

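🔁 51. Error Recovery Chain: retryWhen + delay

Observable.create<String> { emitter ->
    println("Trying...")
    emitter.onError(RuntimeException("Network error"))
}.retryWhen { errors ->
    // pair each error with a retry number from 1 to 3
    errors.zipWith(Observable.range(1, 3)) { e, i ->
        println("Retry $i after error: $e")
        i
    }.flatMap {
        Observable.timer(1, TimeUnit.SECONDS) // wait 1 second before each retry
    }
}.subscribe(
    { println("Success: $it") },
    { println("Failed after retries: ${it.message}") },
    // Once the range is exhausted the handler completes, so the stream ends
    // with onComplete rather than onError.
    { println("Gave up: retries exhausted without a value") }
)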
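
When the retry handler's own stream completes, retryWhen completes the downstream instead of delivering the last error. If the final error should reach the subscriber, one option is to let the handler re-emit it once the retry budget is used up. The following is a sketch under that assumption; the helper name, the always-failing source, and the 10 ms delay are illustrative choices.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Returns (number of attempts, final error message or null).
fun retryThenFail(maxRetries: Int): Pair<Int, String?> {
    val attempts = AtomicInteger()
    var failure: String? = null

    Observable.create<String> { emitter ->
        attempts.incrementAndGet()
        emitter.onError(RuntimeException("Network error")) // always fails
    }.retryWhen { errors ->
        // one extra range element so the final error can be passed through
        errors.zipWith(Observable.range(1, maxRetries + 1)) { e, i -> e to i }
            .flatMap { (e, i) ->
                if (i > maxRetries) Observable.error<Long>(e)     // budget used up: propagate
                else Observable.timer(10, TimeUnit.MILLISECONDS)  // wait, then retry
            }
    }.blockingSubscribe({ }, { failure = it.message })

    return attempts.get() to failure
}

fun main() {
    val (attempts, failure) = retryThenFail(maxRetries = 3)
    println("attempts=$attempts, failure=$failure") // attempts=4, failure=Network error
}
```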

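✅ 52. Terminating a Stream: takeUntil, takeWhile

Observable.interval(500, TimeUnit.MILLISECONDS)
    // emits 0..5: the item that satisfies the predicate is still delivered
    .takeUntil { value: Long -> value >= 5 }
    .subscribe { println("takeUntil: $it") }

Observable.range(1, 10)
    // emits 1..4: stops before the first item that fails the predicate
    .takeWhile { it < 5 }
    .subscribe { println("takeWhile: $it") }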
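
The two operators differ at the boundary item. A small synchronous sketch (the helper names are assumptions for illustration) makes the difference easy to check:

```kotlin
import io.reactivex.rxjava3.core.Observable

// takeUntil(predicate) emits the item first, then stops if the predicate holds,
// so the matching item is included.
fun untilFive(): List<Int> =
    Observable.range(1, 10).takeUntil { v: Int -> v >= 5 }.toList().blockingGet()

// takeWhile(predicate) checks before emitting, so the first failing item
// is excluded.
fun whileBelowFive(): List<Int> =
    Observable.range(1, 10).takeWhile { it < 5 }.toList().blockingGet()

fun main() {
    println(untilFive())      // [1, 2, 3, 4, 5]
    println(whileBelowFive()) // [1, 2, 3, 4]
}
```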

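The next part covers further advanced usage (Kotlin examples): concurrent execution, combining streams, lifecycle binding, a login walkthrough, and tips for migrating from RxJava to Flow.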


🚀 53. concatMapEager: run concurrently, emit in order

Observable.just(1, 2, 3)
    .concatMapEager { id ->
        Observable.fromCallable {
            println("Start job $id on ${Thread.currentThread().name}")
            Thread.sleep(1000L - id * 200)
            "Result $id"
        }.subscribeOn(Schedulers.io())
    }
    .subscribe { println("Received: $it") }

✅ The jobs run concurrently, but the results are emitted in the original order.


🔗 54. Strategies for Combining Streams

merge: emit items as they arrive (interleaved, no ordering guarantee)

Observable.merge(
    Observable.interval(300, TimeUnit.MILLISECONDS).map { "A$it" },
    Observable.interval(500, TimeUnit.MILLISECONDS).map { "B$it" }
)
    .take(5)
    .subscribe { println(it) }

concat: emit in sequence (the next source starts after the previous one completes)

Observable.concat(
    Observable.just("First"),
    Observable.just("Second")
).subscribe { println(it) }

zip: emit pairs (once both sources have an item ready)

Observable.zip(
    Observable.just("A", "B", "C"),
    Observable.just(1, 2, 3)
) { a, b -> "$a$b" }
    .subscribe { println(it) } // prints A1, B2, C3

combineLatest: combine the latest values whenever any source emits

Observable.combineLatest(
    Observable.interval(100, TimeUnit.MILLISECONDS),
    Observable.interval(250, TimeUnit.MILLISECONDS)
) { a, b -> "Latest A=$a, B=$b" }
    .take(5)
    .subscribe { println(it) }

📱 55. Binding to the Android Lifecycle (RxLifecycle)

Dependency:

implementation "com.trello.rxlifecycle4:rxlifecycle-android-lifecycle-kotlin:4.0.2"

Usage:

// Structural sketch only: LifecycleProvider also declares bindUntilEvent()
// and bindToLifecycle(), whose implementations are omitted here.
class MainActivity : AppCompatActivity(), LifecycleProvider<Lifecycle.Event> {
    private val lifecycleSubject = BehaviorSubject.create<Lifecycle.Event>()

    override fun lifecycle(): Observable<Lifecycle.Event> = lifecycleSubject.hide()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        lifecycleSubject.onNext(Lifecycle.Event.ON_CREATE)

        Observable.interval(1, TimeUnit.SECONDS)
            .compose(bindUntilEvent(Lifecycle.Event.ON_DESTROY)) // stop at ON_DESTROY
            .subscribe { println("tick: $it") }
    }

    override fun onDestroy() {
        super.onDestroy()
        lifecycleSubject.onNext(Lifecycle.Event.ON_DESTROY)
    }
}

🧪 56. Login Request Walkthrough: Debounce + API Call + Error Recovery

val loginClicks = PublishSubject.create<Pair<String, String>>()

loginClicks
    .debounce(300, TimeUnit.MILLISECONDS) // ignore bursts of rapid clicks
    .distinctUntilChanged()               // skip consecutive identical credentials
    .switchMapSingle { (username, password) ->
        // a newer click cancels the request that is still in flight
        loginApi.login(username, password)
            .retryWhen { errors ->
                errors.take(3).delay(1, TimeUnit.SECONDS) // up to 3 retries, 1 second apart
            }
            .subscribeOn(Schedulers.io())
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ result ->
        println("Login success: $result")
    }, { error ->
        println("Login failed: ${error.message}")
    })

🔄 57. Practical Tips for Migrating from RxJava to Kotlin Flow

Original RxJava:

Observable.interval(1, TimeUnit.SECONDS)
    .map { it * 2 }
    .filter { it % 3 == 0 }
    .subscribe { println(it) }

Rewritten with Kotlin Flow:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits 0, 1, 2, ... once per second, like Observable.interval
fun flowExample(): Flow<Long> = flow {
    var i = 0L
    while (true) {
        emit(i++)
        delay(1000)
    }
}

fun main() = runBlocking {
    flowExample()
        .map { it * 2 }
        .filter { it % 3 == 0L }
        .collect { println(it) }
}

Migration tips:

| RxJava | Kotlin Flow |
| --- | --- |
| Observable/Flowable | Flow |
| subscribe() | collect() |
| Schedulers.io() | withContext(Dispatchers.IO) |
| retryWhen, onErrorResumeNext | retry, catch |
| debounce | debounce() |
| switchMap | flatMapLatest |
| Disposable.dispose() | Job.cancel() |
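
As a hedged illustration of the retry and catch row, the sketch below builds a flow that fails a configurable number of times before succeeding. The helper name and the failure counts are assumptions for illustration; the example needs the kotlinx-coroutines-core dependency.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical flow that throws on its first `failures` collections.
fun collectWithRecovery(failures: Int): List<Int> = runBlocking {
    var attempt = 0
    flow {
        attempt++
        if (attempt <= failures) throw IllegalStateException("fail $attempt")
        emit(attempt)
    }
        .retry(2)           // re-collect up to 2 times, similar to RxJava retry(2)
        .catch { emit(-1) } // fallback value, similar to onErrorReturn
        .toList()
}

fun main() {
    println(collectWithRecovery(failures = 2)) // [3]: the third attempt succeeds
    println(collectWithRecovery(failures = 5)) // [-1]: retries exhausted, fallback emitted
}
```

Both operators only see exceptions thrown upstream of where they are applied, so their position in the chain matters in the same way as in RxJava.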