📦 1. 创建操作符(Creating
Observables)
just
-
发射固定的数据序列
1 2 3 4 5 6 import io.reactivex.rxjava3.core.Observablefun main () { Observable.just("A" , "B" , "C" ) .subscribe { println(it) } }
fromArray
/
fromIterable
1 2 3 4 fun main () { Observable.fromArray("A" , "B" , "C" ) .subscribe { println(it) } }
create
- 手动发射
1 2 3 4 5 6 7 8 9 10 11 import io.reactivex.rxjava3.core.ObservableEmitterfun main () { val observable = Observable.create<String> { emitter: ObservableEmitter<String> -> emitter.onNext("Hello" ) emitter.onNext("RxJava" ) emitter.onComplete() } observable.subscribe { println(it) } }
2. 转换操作符(Transforming Observables)
map
1 2 3 4 5 fun main () { Observable.just(1 , 2 , 3 ) .map { it * 10 } .subscribe { println(it) } }
flatMap
1 2 3 4 5 fun main () { Observable.just("A" , "B" ) .flatMap { letter -> Observable.just("$letter -1" , "$letter -2" ) } .subscribe { println(it) } }
concatMap
(保持顺序)
1 2 3 4 5 fun main () { Observable.just("A" , "B" ) .concatMap { letter -> Observable.just("$letter -1" , "$letter -2" ) } .subscribe { println(it) } }
🎛️ 3. 过滤操作符(Filtering)
filter
1 2 3 4 5 fun main () { Observable.just(1 , 2 , 3 , 4 ) .filter { it % 2 == 0 } .subscribe { println(it) } }
take
, skip
1 2 3 4 5 fun main () { Observable.just("A" , "B" , "C" , "D" ) .take(2 ) .subscribe { println(it) } }
distinct
,
distinctUntilChanged
1 2 3 4 5 fun main () { Observable.just(1 , 2 , 2 , 3 , 3 , 3 , 4 ) .distinctUntilChanged() .subscribe { println(it) } }
⏱️ 4. 时间相关操作符
interval
, timer
1 2 3 4 5 6 7 8 9 import java.util.concurrent.TimeUnitfun main () { Observable.interval(1 , TimeUnit.SECONDS) .take(3 ) .subscribe { println("Tick $it " ) } Thread.sleep(4000 ) }
📋 5. 组合操作符(Combining
Observables)
merge
, concat
1 2 3 4 5 6 7 fun main () { val o1 = Observable.just("A" , "B" ) val o2 = Observable.just("1" , "2" ) Observable.merge(o1, o2) .subscribe { println(it) } }
zip
1 2 3 4 5 6 7 fun main () { val o1 = Observable.just("A" , "B" ) val o2 = Observable.just("1" , "2" ) Observable.zip(o1, o2) { a, b -> "$a $b " } .subscribe { println(it) } }
🔁 6. 重试与重复
repeat
, retry
1 2 3 4 5 fun main () { Observable.just("Hello" ) .repeat(3 ) .subscribe { println(it) } }
🧵 7. 线程调度(Schedulers)
1 2 3 4 5 6 7 8 9 10 import io.reactivex.rxjava3.schedulers.Schedulersfun main () { Observable.just("A" , "B" , "C" ) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe { println("Thread: ${Thread.currentThread().name} , Value: $it " ) } Thread.sleep(1000 ) }
✅ 8. 观察者写法(Observer /
Consumer)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import io.reactivex.rxjava3.core.Observerimport io.reactivex.rxjava3.disposables.Disposablefun main () { val observable = Observable.just("A" , "B" , "C" ) val observer = object : Observer<String> { override fun onSubscribe (d: Disposable ) { println("Subscribed" ) } override fun onNext (t: String ) { println("Received: $t " ) } override fun onError (e: Throwable ) { println("Error: ${e.message} " ) } override fun onComplete () { println("Completed" ) } } observable.subscribe(observer) }
🧹 9. 错误处理
onErrorReturn
1 2 3 4 5 6 fun main () { Observable.just(1 , 2 , 0 , 3 ) .map { 10 / it } .onErrorReturn { -1 } .subscribe { println(it) } }
onErrorResumeNext
/**
 * Demonstrates switching to a fallback stream when the source fails.
 *
 * The source terminates immediately with an error, so only the fallback value 100 is printed.
 */
fun main() {
    Observable.error<Int>(Throwable("Oops"))
        // RxJava 3 renamed the overload that takes a fallback source to onErrorResumeWith;
        // onErrorResumeNext now only accepts a Function<Throwable, ObservableSource>.
        .onErrorResumeWith(Observable.just(100))
        .subscribe { println(it) }
}
🧪 10. Flowable(处理背压
Backpressure)
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable

/**
 * Builds a Flowable by hand and prints the five items it emits.
 *
 * BackpressureStrategy.BUFFER is passed to Flowable.create as the overflow strategy.
 */
fun main() {
    Flowable.create<String>({ emitter ->
        for (i in 1..5) {
            emitter.onNext("Item $i")
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)
        .subscribe { println(it) }
}
⛓️ 11. Disposable & 清理资源
1 2 3 4 5 6 7 8 9 10 11 import io.reactivex.rxjava3.disposables.CompositeDisposablefun main () { val disposables = CompositeDisposable() val d = Observable.just("A" , "B" ) .subscribe { println(it) } disposables.add(d) disposables.clear() }
⛳ 12. Subject(桥接
Observer 与 Observable)
PublishSubject
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import io.reactivex.rxjava3.subjects.PublishSubjectfun main () { val subject = PublishSubject.create<String>() subject.subscribe { println("Subscriber 1: $it " ) } subject.onNext("A" ) subject.onNext("B" ) subject.subscribe { println("Subscriber 2: $it " ) } subject.onNext("C" ) }
🧠 常用类型对比
Observable:无背压处理,适用于少量数据
Flowable:可处理大量数据,支持背压
Single:发射一个值或错误
Maybe:发射一个值、无值或错误
Completable:只关心完成或出错,不发射任何值
继续为你整理 RxJava 全部语法 ,使用
Kotlin
编写、可直接运行的代码示例。这部分涵盖的是中级到高级语法:
🔄 8.
flatMap()
:将每个 item 映射为另一个 Observable
import io.reactivex.rxjava3.core.Observable

/**
 * Maps every letter to its own inner Observable and flattens the results.
 *
 * The inner sources here are synchronous, so the items arrive in source order.
 */
fun main() {
    Observable.just("A", "B")
        .flatMap { letter ->
            Observable.fromArray("$letter-1", "$letter-2")
        }
        .subscribe { println(it) }
}
输出:
A-1
A-2
B-1
B-2
🧷 9.
concatMap()
:与 flatMap 相似,但保持顺序
1 2 3 4 5 6 7 8 9 10 import io.reactivex.rxjava3.core.Observableimport java.util.concurrent.TimeUnitfun main () { Observable.just("A" , "B" ) .concatMap { letter -> Observable.fromArray("$letter -1" , "$letter -2" ).delay(100 , TimeUnit.MILLISECONDS) } .blockingSubscribe { println(it) } }
🔍 10. filter()
:过滤数据
1 2 3 4 5 6 7 import io.reactivex.rxjava3.core.Observablefun main () { Observable.just(1 , 2 , 3 , 4 , 5 ) .filter { it % 2 == 0 } .subscribe { println(it) } }
🧮 11.
reduce()
:归约操作。不带初始值的 reduce 返回 Maybe(源为空时不发射任何值);带初始值的重载 reduce(seed, reducer) 才返回 Single
1 2 3 4 5 6 7 import io.reactivex.rxjava3.core.Observablefun main () { Observable.just(1 , 2 , 3 , 4 ) .reduce { t1, t2 -> t1 + t2 } .subscribe { println(it) } }
📦 12.
scan()
:类似 reduce,但会发出每一步的结果
1 2 3 4 5 6 7 import io.reactivex.rxjava3.core.Observablefun main () { Observable.just(1 , 2 , 3 , 4 ) .scan { acc, next -> acc + next } .subscribe { println(it) } }
输出:
1
3
6
10
⌛ 13. delay()
:延迟发射
1 2 3 4 5 6 7 8 import io.reactivex.rxjava3.core.Observableimport java.util.concurrent.TimeUnitfun main () { Observable.just("Hello" ) .delay(1 , TimeUnit.SECONDS) .blockingSubscribe { println(it) } }
🧪 14. take()
:只取前 N
个数据
1 2 3 4 5 6 7 import io.reactivex.rxjava3.core.Observablefun main () { Observable.range(1 , 10 ) .take(3 ) .subscribe { println(it) } }
🔁 15. repeat()
:重复整个流
1 2 3 4 5 6 7 import io.reactivex.rxjava3.core.Observablefun main () { Observable.just("A" ) .repeat(3 ) .subscribe { println(it) } }
💥 16. retry()
:出错时重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import io.reactivex.rxjava3.core.Observablefun main () { var attempt = 0 Observable.create<String> { emitter -> if (++attempt < 3 ) { emitter.onError(RuntimeException("fail" )) } else { emitter.onNext("Success" ) emitter.onComplete() } } .retry(2 ) .subscribe({ println(it) }, { println("Error: ${it.message} " ) }) }
⏳ 17. timeout()
:超时控制
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

/**
 * Shows timeout(): the item is delayed by 2 s but the stream only waits 1 s,
 * so the error callback is invoked instead of the value callback.
 */
fun main() {
    Observable.just("Hello")
        .delay(2, TimeUnit.SECONDS)
        .timeout(1, TimeUnit.SECONDS)
        // delay/timeout run on RxJava's computation scheduler, whose threads are daemons.
        // A plain subscribe() would let main() return before the timeout fires and nothing
        // would be printed, so block the calling thread until the stream terminates.
        .blockingSubscribe(
            { println(it) },
            { println("Timeout error: ${it.message}") }
        )
}
继续为你整理 RxJava 全部语法 (Kotlin
示例,可直接运行),现在进入组合、错误处理与调度等高级主题:
🔗 18.
zip()
:组合多个 Observable,逐个配对
import io.reactivex.rxjava3.core.Observable

/**
 * Pairs the n-th number with the n-th letter and prints each combined string.
 */
fun main() {
    val numbers = Observable.just(1, 2, 3)
    val letters = Observable.just("A", "B", "C")

    Observable.zip(numbers, letters) { number, letter ->
        "$number-$letter"
    }.subscribe { println(it) }
}
输出:
1-A
2-B
3-C
🔄 19.
combineLatest()
:任意一个 Observable
发射数据就组合最新值
1 2 3 4 5 6 7 8 9 10 11 import io.reactivex.rxjava3.core.Observableimport java.util.concurrent.TimeUnitfun main () { val obs1 = Observable.interval(100 , TimeUnit.MILLISECONDS).map { "A$it " } val obs2 = Observable.interval(150 , TimeUnit.MILLISECONDS).map { "B$it " } Observable.combineLatest(obs1, obs2) { a, b -> "$a :$b " } .take(5 ) .blockingSubscribe { println(it) } }
🔄
20. withLatestFrom()
:当主 Observable
发射时,结合最近另一个 Observable 的数据
1 2 3 4 5 6 7 8 9 10 11 import io.reactivex.rxjava3.core.Observableimport java.util.concurrent.TimeUnitfun main () { val main = Observable.interval(200 , TimeUnit.MILLISECONDS).map { "Main-$it " } val other = Observable.interval(100 , TimeUnit.MILLISECONDS).map { "Other-$it " } main.withLatestFrom(other) { m, o -> "$m + $o " } .take(5 ) .blockingSubscribe { println(it) } }
⚠️ 21.
onErrorReturn()
:发生错误时返回一个默认值
1 2 3 4 5 6 7 8 9 10 import io.reactivex.rxjava3.core.Observablefun main () { Observable.create<Int > { it.onError(Exception("Boom" )) }.onErrorReturn { println("Caught error: ${it.message} " ) -1 }.subscribe { println(it) } }
🛠 22.
onErrorResumeNext()
:出错后切换到备用流
import io.reactivex.rxjava3.core.Observable

/**
 * Switches to a backup stream after the source fails.
 *
 * The source errors immediately, so only the backup values 100 and 200 are printed.
 */
fun main() {
    Observable.error<Int>(Exception("Oops"))
        // In RxJava 3 a fallback *source* is passed to onErrorResumeWith;
        // onErrorResumeNext only accepts a Function<Throwable, ObservableSource>.
        .onErrorResumeWith(Observable.just(100, 200))
        .subscribe { println(it) }
}
🧯 23.
doOnError()
:处理但不拦截错误
1 2 3 4 5 6 7 8 9 10 11 12 import io.reactivex.rxjava3.core.Observablefun main () { Observable.create<Int > { it.onError(Exception("Boom" )) }.doOnError { println("Logging error: ${it.message} " ) }.subscribe( { println("Next: $it " ) }, { println("Final error: ${it.message} " ) } ) }
⚙️ 24. 调度器(Schedulers)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import io.reactivex.rxjava3.core.Observableimport io.reactivex.rxjava3.schedulers.Schedulersfun main () { Observable.create<String> { println("Emitting on thread: ${Thread.currentThread().name} " ) it.onNext("Hello" ) it.onComplete() } .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe { println("Received on thread: ${Thread.currentThread().name} -> $it " ) } Thread.sleep(500 ) }
📥 25. Flowable
与背压处理(Backpressure)
1 2 3 4 5 6 7 8 9 10 11 12 import io.reactivex.rxjava3.core.Flowableimport io.reactivex.rxjava3.schedulers.Schedulersfun main () { Flowable.range(1 , 1_000_000 ) .onBackpressureDrop() .observeOn(Schedulers.computation(), false , 10 ) .subscribe({ println("Received: $it " ) }, { println("Error: ${it.message} " ) }) Thread.sleep(1000 ) }
✅ 26. Single
用法(发射一个值或错误)
1 2 3 4 5 6 import io.reactivex.rxjava3.core.Singlefun main () { Single.just("Single Result" ) .subscribe { value -> println("Received: $value " ) } }
✅ 27.
Maybe
用法(可能发射一个值,也可能什么都不发)
1 2 3 4 5 6 7 8 9 10 import io.reactivex.rxjava3.core.Maybefun main () { Maybe.just("Maybe Result" ) .subscribe( { println("Success: $it " ) }, { println("Error: ${it.message} " ) }, { println("Completed with no item" ) } ) }
✅ 28.
Completable
用法(只发射完成或错误)
1 2 3 4 5 6 7 8 9 10 import io.reactivex.rxjava3.core.Completablefun main () { Completable.fromRunnable { println("Doing something..." ) }.subscribe( { println("Completed!" ) }, { println("Error: ${it.message} " ) } ) }
好的,继续补充 RxJava 全部语法(Kotlin
可运行示例) ,进入进阶部分:自定义操作符、冷/热
Observable、多播、以及与 Retrofit 配合使用的实战示例。
🧰 29.
compose()
:封装通用操作链
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import io.reactivex.rxjava3.core.Observableimport io.reactivex.rxjava3.core.ObservableTransformerfun <T> applyCommonSchedulers () : ObservableTransformer<T, T> { return ObservableTransformer { upstream -> upstream.doOnNext { println("Logging: $it " ) } .filter { it != null } } } fun main () { Observable.just("One" , "Two" , "Three" ) .compose(applyCommonSchedulers()) .subscribe { println("Received: $it " ) } }
🔨 30.
lift()
:自定义操作符(不推荐常用)
import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.Disposable

/**
 * Demonstrates lift() with a hand-written operator that wraps every item in brackets.
 */
fun main() {
    // lift() is an instance method that takes an ObservableOperator; there is no static
    // Observable.lift factory. The operator receives the downstream observer and returns
    // the observer that will be subscribed to the upstream.
    val customOperator = ObservableOperator<String, String> { downstream ->
        object : Observer<String> {
            override fun onSubscribe(d: Disposable) {
                downstream.onSubscribe(d)
            }

            override fun onNext(t: String) {
                downstream.onNext("[$t]")
            }

            override fun onError(e: Throwable) {
                downstream.onError(e)
            }

            override fun onComplete() {
                downstream.onComplete()
            }
        }
    }

    Observable.just("A", "B")
        .lift(customOperator)
        .subscribe { println("Result: $it") }
}
🔥 31. 冷 vs 热 Observable
冷(Cold)Observable:每次订阅都会重新执行
/**
 * Shows that a cold Observable re-runs its producer for every subscriber:
 * "Generating..." is printed twice and each observer gets its own random number.
 */
fun main() {
    val cold = Observable.create<Int> { emitter ->
        println("Generating...")
        emitter.onNext((1..100).random())
        emitter.onComplete()
    }

    cold.subscribe { println("Observer 1: $it") }
    cold.subscribe { println("Observer 2: $it") }
}
热(Hot)Observable:共享事件流,不重复发射
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import io.reactivex.rxjava3.subjects.PublishSubjectfun main () { val hot = PublishSubject.create<String>() hot.subscribe { println("Observer 1: $it " ) } hot.onNext("A" ) hot.onNext("B" ) hot.subscribe { println("Observer 2: $it " ) } hot.onNext("C" ) }
输出:
Observer 1: A
Observer 1: B
Observer 1: C
Observer 2: C
📡 32.
多播操作符:publish()
, connect()
,
refCount()
使用
publish().connect()
构建热 Observable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 fun main () { val source = Observable.interval(500 , java.util.concurrent.TimeUnit.MILLISECONDS) .take(5 ) .publish() source.subscribe { println("Observer 1: $it " ) } source.connect() Thread.sleep(1200 ) source.subscribe { println("Observer 2: $it " ) } Thread.sleep(2000 ) }
🌐 33. Retrofit + RxJava 实战示例
Gradle 添加依赖:
1 2 3 4 implementation("com.squareup.retrofit2:retrofit:2.9.0" ) implementation("com.squareup.retrofit2:adapter-rxjava3:2.9.0" ) implementation("io.reactivex.rxjava3:rxjava:3.1.6" ) implementation("com.squareup.retrofit2:converter-gson:2.9.0" )
接口定义:
1 2 3 4 5 6 7 8 9 import io.reactivex.rxjava3.core.Singleimport retrofit2.http.GETdata class Post (val userId: Int , val id: Int , val title: String)interface ApiService { @GET("posts/1" ) fun getPost () : Single<Post> }
Retrofit 配置与调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import retrofit2.Retrofitimport retrofit2.converter.gson.GsonConverterFactoryimport retrofit2.adapter.rxjava3.RxJava3CallAdapterFactoryfun main () { val retrofit = Retrofit.Builder() .baseUrl("https://jsonplaceholder.typicode.com/" ) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) .build() val api = retrofit.create(ApiService::class .java) api.getPost() .subscribe( { println("Title: ${it.title} " ) }, { println("Error: ${it.message} " ) } ) Thread.sleep(1000 ) }
继续为你整理 RxJava 全部语法(Kotlin
可运行示例) ,本节重点:Subject
系列、错误处理、调度器详解、测试工具等。
🎯 34. Subject
类型比较
PublishSubject:仅发送订阅之后的数据
BehaviorSubject:发送最新的一个数据 + 后续数据
ReplaySubject:发送所有历史数据 + 后续数据
AsyncSubject:只发送最后一个数据(在 onComplete 时发送)
🔊 PublishSubject
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import io.reactivex.rxjava3.subjects.PublishSubjectfun main () { val subject = PublishSubject.create<String>() subject.subscribe { println("Observer1: $it " ) } subject.onNext("A" ) subject.onNext("B" ) subject.subscribe { println("Observer2: $it " ) } subject.onNext("C" ) }
输出:
Observer1: A
Observer1: B
Observer1: C
Observer2: C
🧭 BehaviorSubject
示例
1 2 3 4 5 6 7 8 9 10 import io.reactivex.rxjava3.subjects.BehaviorSubjectfun main () { val subject = BehaviorSubject.createDefault("Initial" ) subject.onNext("A" ) subject.subscribe { println("Observer1: $it " ) } subject.onNext("B" ) }
输出:
Observer1: A
Observer1: B
🔁 ReplaySubject
示例
1 2 3 4 5 6 7 8 9 10 import io.reactivex.rxjava3.subjects.ReplaySubjectfun main () { val subject = ReplaySubject.create<String>() subject.onNext("A" ) subject.onNext("B" ) subject.subscribe { println("Observer: $it " ) } }
输出:
Observer: A
Observer: B
🕘 AsyncSubject
示例
1 2 3 4 5 6 7 8 9 10 11 12 import io.reactivex.rxjava3.subjects.AsyncSubjectfun main () { val subject = AsyncSubject.create<String>() subject.onNext("A" ) subject.onNext("B" ) subject.subscribe { println("Observer: $it " ) } subject.onNext("C" ) subject.onComplete() }
输出:
Observer: C
🚨 35. 错误处理相关操作符
onErrorReturn
:错误时返回默认值
1 2 3 4 5 6 7 8 import io.reactivex.rxjava3.core.Observablefun main () { Observable.just(1 , 2 , 0 ) .map { 10 / it } .onErrorReturn { -1 } .subscribe { println(it) } }
onErrorResumeNext
:错误时切换
Observable
/**
 * Emits 1, then fails; the stream continues with the fallback values 100 and 200.
 */
fun main() {
    Observable.create<Int> { emitter ->
        emitter.onNext(1)
        emitter.onError(RuntimeException("Oops"))
    }
        // RxJava 3: a fallback source goes to onErrorResumeWith; onErrorResumeNext
        // only accepts a Function<Throwable, ObservableSource>.
        .onErrorResumeWith(Observable.just(100, 200))
        .subscribe { println(it) }
}
retry()
:失败后重试
1 2 3 4 5 6 7 8 9 10 fun main () { var count = 0 Observable.create<Int > { count++ if (count < 3 ) it.onError(Exception("Fail $count " )) else it.onNext(100 ) }.retry(3 ) .subscribe({ println("Success: $it " ) }, { println("Error: ${it.message} " ) }) }
🧵 36. 调度器详解(Schedulers)
Schedulers.io():IO 密集型,如网络请求、文件读写
Schedulers.computation():CPU 密集型操作,如事件计算、算法
Schedulers.newThread():每次创建一个新线程
Schedulers.single():单线程
AndroidSchedulers.mainThread():Android UI 主线程(需引入 RxAndroid)
调度器示例
1 2 3 4 5 6 7 8 9 10 11 12 import io.reactivex.rxjava3.schedulers.Schedulersfun main () { Observable.just("A" , "B" , "C" ) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe { println("Received $it on thread: ${Thread.currentThread().name} " ) } Thread.sleep(500 ) }
✅ 37.
单元测试:TestObserver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import io.reactivex.rxjava3.core.Observableimport io.reactivex.rxjava3.observers.TestObserverfun main () { val observable = Observable.just("One" , "Two" , "Three" ) val testObserver = TestObserver<String>() observable.subscribe(testObserver) testObserver.assertComplete() testObserver.assertValues("One" , "Two" , "Three" ) println("All assertions passed." ) }
✅ 38. 自定义操作符(扩展函数)
1 2 3 4 5 6 7 8 9 fun <T> Observable<T> .logEach () : Observable<T> { return this .doOnNext { println("Logging: $it " ) } } fun main () { Observable.just(1 , 2 , 3 ) .logEach() .subscribe { println("Received: $it " ) } }
继续为你补充 RxJava 全部语法(Kotlin
示例) ,本节将涵盖:
Flowable
与背压
ConnectableObservable
Debounce
/Throttle
/Buffer
等时间控制操作符
RxJava + Room
数据响应
RxBinding
防抖示例(如 EditText)
🚰 39. Flowable
与背压(Backpressure)
当发射数据速度远快于消费速度时,使用 Flowable 替代 Observable,并指定背压策略。
示例:快速发射 + 背压处理
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.schedulers.Schedulers

/**
 * Emits one million integers as fast as possible and consumes them on the IO scheduler.
 *
 * BackpressureStrategy.BUFFER is passed to Flowable.create as the overflow strategy.
 */
fun main() {
    val flowable = Flowable.create<Int>({ emitter ->
        for (i in 1..1_000_000) {
            emitter.onNext(i)
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)

    flowable
        .observeOn(Schedulers.io())
        .subscribe(
            { println("Received: $it") },
            { it.printStackTrace() }
        )

    // The consumer runs on a background thread; keep main alive while it drains.
    Thread.sleep(2000)
}
🔁 40.
ConnectableObservable
:控制发射时机
使多个订阅者共享同一个数据源,并延迟开始发送数据。
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.observables.ConnectableObservable
import java.util.concurrent.TimeUnit

/**
 * Shows that a ConnectableObservable stays idle until connect() is called:
 * both observers subscribe first and then receive the same ticks.
 */
fun main() {
    // interval() is a static factory declared on Observable; publish() is what
    // turns the result into a ConnectableObservable.
    val source: ConnectableObservable<Long> =
        Observable.interval(1, TimeUnit.SECONDS).publish()

    source.subscribe { println("Observer 1: $it") }

    Thread.sleep(2000)

    source.subscribe { println("Observer 2: $it") }

    // Nothing is emitted until this call, so neither observer misses a tick.
    source.connect()

    Thread.sleep(5000)
}
⏳ 41.
防抖(debounce
)、节流(throttleFirst
)
debounce
:
停止输入一段时间后才发射
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import io.reactivex.rxjava3.core.Observableimport java.util.concurrent.TimeUnitfun main () { Observable.create<String> { emitter -> emitter.onNext("H" ) Thread.sleep(100 ) emitter.onNext("He" ) Thread.sleep(400 ) emitter.onNext("Hel" ) Thread.sleep(600 ) emitter.onNext("Hell" ) Thread.sleep(1000 ) emitter.onComplete() }.debounce(500 , TimeUnit.MILLISECONDS) .subscribe { println("Final Input: $it " ) } }
throttleFirst
:
一段时间内只取第一个
1 2 3 4 5 6 7 8 fun main () { Observable.interval(100 , TimeUnit.MILLISECONDS) .throttleFirst(300 , TimeUnit.MILLISECONDS) .take(10 ) .subscribe { println("Throttled: $it " ) } Thread.sleep(2000 ) }
📦 42. buffer
:收集成批次
1 2 3 4 5 fun main () { Observable.range(1 , 10 ) .buffer(3 ) .subscribe { println("Buffer: $it " ) } }
输出:
Buffer: [1, 2, 3]
Buffer: [4, 5, 6]
Buffer: [7, 8, 9]
Buffer: [10]
🏠 43. RxJava + Room(示例结构)
Room DAO 支持返回 Flowable
或 Observable
类型。
1 2 3 4 5 @Dao interface UserDao { @Query("SELECT * FROM user" ) fun getAllUsers () : Flowable<List<User>> }
1 2 3 4 userDao.getAllUsers() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { users -> show(users) }
🎯 44. RxBinding
防抖点击
1 2 3 4 5 6 7 8 import com.jakewharton.rxbinding4.view.clicksimport java.util.concurrent.TimeUnitbutton.clicks() .throttleFirst(1 , TimeUnit.SECONDS) .subscribe { showToast("Clicked!" ) }
✅ 45.
interval
、timer
使用
1 2 3 4 5 6 7 Observable.interval(1 , TimeUnit.SECONDS) .subscribe { println("Tick: $it " ) } Observable.timer(3 , TimeUnit.SECONDS) .subscribe { println("Time's up!" ) }
🧰 46. dispose
/ 资源释放
1 2 3 4 5 val disposable = Observable.interval(1 , TimeUnit.SECONDS) .subscribe { println("Tick: $it " ) } Thread.sleep(3000 ) disposable.dispose()
我们继续补充 RxJava 高级用法(Kotlin
示例) ,涵盖以下几个实用主题:
🔀 47. RxJava 线程切换全流程
1 2 3 4 5 6 7 Observable.just("Start" ) .subscribeOn(Schedulers.io()) .doOnNext { println("doOnNext 1 on ${Thread.currentThread().name} " ) } .observeOn(Schedulers.computation()) .doOnNext { println("doOnNext 2 on ${Thread.currentThread().name} " ) } .observeOn(AndroidSchedulers.mainThread()) .subscribe { println("Subscribe on ${Thread.currentThread().name} " ) }
如果不是 Android 平台,可将 AndroidSchedulers.mainThread() 替换为 Schedulers.trampoline()。
🔁 48. RxJava → Kotlin Flow
对照表
Observable.create → flow { emit(...) }
subscribeOn / observeOn → flowOn, withContext in collectors
map → map
flatMap → flatMapConcat, flatMapMerge
zip → zip
debounce → debounce
filter → filter
onErrorResumeNext → catch { ... emit(...) }
retry → retry()
Disposable.dispose() → Job.cancel()
示例:
1 2 3 4 5 6 7 8 9 Observable.range(1 , 5 ) .map { it * 2 } .subscribe { println(it) } flowOf(1 , 2 , 3 , 4 , 5 ) .map { it * 2 } .collect { println(it) }
🌐 49. Retrofit + RxJava 实战
添加依赖
1 2 3 implementation "com.squareup.retrofit2:retrofit:2.9.0" implementation "com.squareup.retrofit2:adapter-rxjava3:2.9.0" implementation "com.squareup.retrofit2:converter-gson:2.9.0"
定义 API
1 2 3 4 interface ApiService { @GET("users" ) fun getUsers () : Observable<List<User>> }
使用 RxJava + Retrofit
1 2 3 4 5 6 7 8 9 10 11 12 val retrofit = Retrofit.Builder() .baseUrl("https://api.example.com/" ) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) .build() val api = retrofit.create(ApiService::class .java)api.getUsers() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ users -> println(users) }, { error -> error.printStackTrace() })
⚙️ 50. 自定义操作符
通过扩展函数封装复杂逻辑:
1 2 3 fun <T> Observable<T> .debugTag (tag: String ) : Observable<T> { return this .doOnNext { println("[$tag ] $it " ) } }
使用:
1 2 3 Observable.range(1 , 3 ) .debugTag("MyLog" ) .subscribe()
🔁 51.
异常恢复链:retryWhen + delay
// Number of re-subscriptions allowed before the failure is passed downstream.
val maxRetries = 3

Observable.create<String> { emitter ->
    println("Trying...")
    emitter.onError(RuntimeException("Network error"))
}.retryWhen { errors ->
    // Pair each failure with an attempt counter that runs one past the retry budget.
    // Zipping with range(1, maxRetries) would make the handler *complete* once the
    // budget is used up, which ends the stream with onComplete and never reaches
    // the error callback below.
    errors.zipWith(Observable.range(1, maxRetries + 1)) { e, attempt -> e to attempt }
        .flatMap { (e, attempt) ->
            if (attempt > maxRetries) {
                // Budget exhausted: surface the last failure to the subscriber.
                Observable.error<Long>(e)
            } else {
                println("Retry $attempt after error: $e")
                // Wait one second, then emit to trigger the re-subscription.
                Observable.timer(1, TimeUnit.SECONDS)
            }
        }
}.subscribe(
    { println("Success: $it") },
    { println("Failed after retries: ${it.message}") }
)
✅ 52.
流程终止处理:takeUntil
, takeWhile
1 2 3 Observable.interval(500 , TimeUnit.MILLISECONDS) .takeUntil { it >= 5 } .subscribe { println("takeUntil: $it " ) }
1 2 3 Observable.range(1 , 10 ) .takeWhile { it < 5 } .subscribe { println("takeWhile: $it " ) }
继续为你补充 RxJava 高级用法(Kotlin
示例) ,包括并发执行、流合并、生命周期绑定、登录实战、RxJava →
Flow 迁移技巧。
🚀 53.
concatMapEager
:并发执行、顺序输出
1 2 3 4 5 6 7 8 9 Observable.just(1 , 2 , 3 ) .concatMapEager { id -> Observable.fromCallable { println("Start job $id on ${Thread.currentThread().name} " ) Thread.sleep(1000L - id * 200 ) "Result $id " }.subscribeOn(Schedulers.io()) } .subscribe { println("Received: $it " ) }
✅ 并发执行任务,但结果按原始顺序发射。
🔗 54. 多流合并策略
merge
:同时发射(无序)
1 2 3 4 5 6 Observable.merge( Observable.interval(300 , TimeUnit.MILLISECONDS).map { "A$it " }, Observable.interval(500 , TimeUnit.MILLISECONDS).map { "B$it " } ) .take(5 ) .subscribe { println(it) }
concat
:按顺序发射(一个结束后下一个)
1 2 3 4 Observable.concat( Observable.just("First" ), Observable.just("Second" ) ).subscribe { println(it) }
zip
:配对发射(两个都准备好)
1 2 3 4 5 Observable.zip( Observable.just("A" , "B" , "C" ), Observable.just(1 , 2 , 3 ) ) { a, b -> "$a $b " } .subscribe { println(it) }
combineLatest
:任一源更新即组合最新值
1 2 3 4 5 6 Observable.combineLatest( Observable.interval(100 , TimeUnit.MILLISECONDS), Observable.interval(250 , TimeUnit.MILLISECONDS) ) { a, b -> "Latest A=$a , B=$b " } .take(5 ) .subscribe { println(it) }
📱 55. 与 Android
Lifecycle 管理绑定(RxLifecycle)
依赖:
1 implementation "com.trello.rxlifecycle4:rxlifecycle-android-lifecycle-kotlin:4.0.2"
使用:
/**
 * Activity that acts as its own RxLifecycle provider.
 *
 * Lifecycle events are pushed into [lifecycleSubject]; streams composed with
 * [bindUntilEvent] are terminated when the given event is pushed.
 *
 * NOTE(review): RxLifecycle and RxLifecycleAndroidLifecycle come from the rxlifecycle4
 * artifacts; confirm the exact imports against the library version in use.
 */
class MainActivity : AppCompatActivity(), LifecycleProvider<Lifecycle.Event> {

    private val lifecycleSubject = BehaviorSubject.create<Lifecycle.Event>()

    override fun lifecycle(): Observable<Lifecycle.Event> = lifecycleSubject.hide()

    // LifecycleProvider declares three abstract members; all of them must be implemented
    // or the class does not compile.
    override fun <T> bindUntilEvent(event: Lifecycle.Event): LifecycleTransformer<T> =
        RxLifecycle.bindUntilEvent(lifecycleSubject, event)

    override fun <T> bindToLifecycle(): LifecycleTransformer<T> =
        RxLifecycleAndroidLifecycle.bindLifecycle(lifecycleSubject)

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        lifecycleSubject.onNext(Lifecycle.Event.ON_CREATE)

        Observable.interval(1, TimeUnit.SECONDS)
            .compose(bindUntilEvent(Lifecycle.Event.ON_DESTROY))
            .subscribe { println("tick: $it") }
    }

    override fun onDestroy() {
        // Push the event before delegating so bound streams are stopped first.
        lifecycleSubject.onNext(Lifecycle.Event.ON_DESTROY)
        super.onDestroy()
    }
}
🧪 56. 登录请求实战:防抖 + 接口
+ 错误恢复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 val loginClicks = PublishSubject.create<Pair<String, String>>()loginClicks .debounce(300 , TimeUnit.MILLISECONDS) .distinctUntilChanged() .switchMapSingle { (username, password) -> loginApi.login(username, password) .retryWhen { errors -> errors.take(3 ).delay(1 , TimeUnit.SECONDS) } .subscribeOn(Schedulers.io()) } .observeOn(AndroidSchedulers.mainThread()) .subscribe({ result -> println("Login success: $result " ) }, { error -> println("Login failed: ${error.message} " ) })
🔄 57. RxJava → Kotlin Flow
实际迁移技巧
原始 RxJava:
1 2 3 4 Observable.interval(1 , TimeUnit.SECONDS) .map { it * 2 } .filter { it % 3 == 0 } .subscribe { println(it) }
Kotlin Flow 改写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 fun flowExample () : Flow<Long > = flow { var i = 0L while (true ) { emit(i++) delay(1000 ) } } runBlocking { flowExample() .map { it * 2 } .filter { it % 3 == 0L } .collect { println(it) } }
迁移小贴士:
Observable / Flowable → Flow
subscribe() → collect()
Schedulers.io() → flowOn(Dispatchers.IO)(在挂起函数内部切换线程时使用 withContext(Dispatchers.IO))
retryWhen, onErrorResumeNext → retry, catch
debounce → debounce()
switchMap → flatMapLatest
Disposable.dispose() → Job.cancel()