Reading the RxJava 3 Source Code
RxJava is a library built around the observer pattern.
Let's start with the simplest possible usage:
Single.just("1").subscribe(object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable?) {}
    override fun onSuccess(t: String?) {}
    override fun onError(e: Throwable?) {}
})
<p>First, look at just:
Single.java
public static <@NonNull T> Single<T> just(T item) {
Objects.requireNonNull(item, "item is null");
<strong>return RxJavaPlugins.onAssembly(new SingleJust<>(item));</strong>
}</p>
<p>@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
.....
<strong>subscribeActual(observer);</strong>
.....
}</p>
<p>SingleJust.java
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
<strong>observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);</strong>
}</p>
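The three snippets above boil down to a very short call chain: just creates the source, subscribe delegates to subscribeActual, and subscribeActual calls the observer directly. The following is a minimal sketch of that chain in plain Java, not RxJava's actual code. MiniObserver, MiniSingle, MiniJust and MiniJustDemo are hypothetical names, and the Disposable passed to onSubscribe is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for SingleObserver (no onSubscribe/Disposable).
interface MiniObserver<T> {
    void onSuccess(T value);
    void onError(Throwable e);
}

// Hypothetical stand-in for Single: subscribe() only delegates to subscribeActual().
abstract class MiniSingle<T> {
    static <T> MiniSingle<T> just(T item) {
        if (item == null) {
            throw new NullPointerException("item is null");
        }
        return new MiniJust<>(item);
    }

    final void subscribe(MiniObserver<? super T> observer) {
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(MiniObserver<? super T> observer);
}

// Stand-in for SingleJust: the value is already known, so it is delivered synchronously.
final class MiniJust<T> extends MiniSingle<T> {
    private final T value;

    MiniJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(MiniObserver<? super T> observer) {
        observer.onSuccess(value);
    }
}

final class MiniJustDemo {
    // Subscribes to just("1") and records everything the observer receives.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniSingle.just("1").subscribe(new MiniObserver<String>() {
            @Override public void onSuccess(String value) { received.add(value); }
            @Override public void onError(Throwable e) { received.add("error"); }
        });
        return received;
    }
}
```

Calling MiniJustDemo.run() fills the list during the subscribe call itself, which mirrors how SingleJust emits on the subscribing thread without any scheduling.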
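<p>
This is RxJava's simplest mechanism: the observable hands its value to the observer at the moment of subscription.
Now let's try something slightly more complex, the map operator: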
Single.just("1").map(object : Function<String, Int> {
    override fun apply(t: String): Int { return t.toInt() } // parse the string; a String cannot be cast to Int
}).subscribe(object : SingleObserver<Int> { // this calls SingleMap.subscribe, which delegates to SingleMap.subscribeActual
    override fun onSubscribe(d: Disposable?) {}
    override fun onSuccess(t: Int?) {}
    override fun onError(e: Throwable?) {}
})
map wraps the current Single in a new one:
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper)); //this = SingleJust
}</p>
<p>SingleMap.java</p>
<p>public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source; // source = SingleJust
this.mapper = mapper; // mapper = the Function<String,Int> converter
}</p>
<p>@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper)); // calls SingleJust.subscribe
}</p>
<p>MapSingleObserver (nested class in SingleMap.java)
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d); //t = SingleObserver
}</p>
<p>@Override
public void onSuccess(T value) {
R v;
try {
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}</p>
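<p>    t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
    t.onError(e);
}</p>
<p>So every additional map call repeats the same pattern: the new operator wraps the previous source, and on subscription it wraps the downstream observer before subscribing to that source.</p>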
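To make the wrapping pattern concrete, here is a minimal sketch in plain Java that chains two map steps. It is a toy model rather than RxJava's implementation: ToySink, ToySource, ToyJust, ToyMap and MapChainDemo are hypothetical names, java.util.function.Function stands in for RxJava's own Function, and the null check and Exceptions.throwIfFatal call from the real MapSingleObserver are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for SingleObserver.
interface ToySink<T> {
    void onSuccess(T value);
    void onError(Throwable e);
}

// Hypothetical stand-in for SingleSource.
interface ToySource<T> {
    void subscribe(ToySink<? super T> sink);
}

// Stand-in for SingleJust.
final class ToyJust<T> implements ToySource<T> {
    private final T value;

    ToyJust(T value) {
        this.value = value;
    }

    @Override
    public void subscribe(ToySink<? super T> sink) {
        sink.onSuccess(value);
    }
}

// Stand-in for SingleMap: remembers the upstream source and the mapper.
final class ToyMap<T, R> implements ToySource<R> {
    private final ToySource<T> upstream;
    private final Function<? super T, ? extends R> mapper;

    ToyMap(ToySource<T> upstream, Function<? super T, ? extends R> mapper) {
        this.upstream = upstream;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(ToySink<? super R> downstream) {
        // Wrap the downstream sink (the role of MapSingleObserver), then subscribe upstream.
        upstream.subscribe(new ToySink<T>() {
            @Override
            public void onSuccess(T value) {
                R mapped;
                try {
                    mapped = mapper.apply(value);
                } catch (Throwable e) {
                    downstream.onError(e); // a failing mapper becomes an error signal
                    return;
                }
                downstream.onSuccess(mapped);
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }
        });
    }
}

final class MapChainDemo {
    // Builds just(input) -> map(parse) -> map(double) and records the final signal.
    static List<String> run(String input) {
        List<String> events = new ArrayList<>();
        ToySource<String> just = new ToyJust<>(input);
        ToySource<Integer> parsed = new ToyMap<>(just, s -> Integer.parseInt(s));
        ToySource<Integer> doubled = new ToyMap<>(parsed, n -> n * 2);
        doubled.subscribe(new ToySink<Integer>() {
            @Override public void onSuccess(Integer value) { events.add("success:" + value); }
            @Override public void onError(Throwable e) { events.add("error:" + e.getClass().getSimpleName()); }
        });
        return events;
    }
}
```

Subscribing to the outermost ToyMap walks the chain upstream one wrapper at a time, and the value then flows back down through each wrapped sink in the opposite order.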
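<p>3. How RxJava cancels a subscription
Single.just is not a useful example here: it hands the observer an already-disposed Disposable, so disposing it changes nothing. Let's look at another type.
Here is how Flowable handles cancellation: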
Flowable.just("1", "2", "3").map(object : Function<String, Int> {
    override fun apply(t: String): Int { return t.toInt() } // parse the string; a String cannot be cast to Int
}).subscribe(object : FlowableSubscriber<Int> {
    override fun onSubscribe(s: Subscription?) { s?.request(Long.MAX_VALUE) } // request items, otherwise nothing is emitted
    override fun onNext(t: Int?) {}
    override fun onError(t: Throwable?) {}
    override fun onComplete() {}
})</p>
<p>FlowableFromArray.java
@Override
public void subscribeActual(Subscriber<? super T> s) {
if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new ArrayConditionalSubscription<>(
(ConditionalSubscriber<? super T>)s, array));
} else {
<strong>s.onSubscribe(new ArraySubscription<>(s, array));</strong>
}
}
ArraySubscription (nested class in FlowableFromArray.java)
ArraySubscription(Subscriber<? super T> actual, T[] array) {
super(array);
<strong>this.downstream = actual;</strong>
}</p>
<p>@Override
void fastPath() {
.....
<strong>a.onNext(t);</strong>
....
if (cancelled) {
return;
}
a.onComplete();
}</p>
<p>@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
if (BackpressureHelper.add(this, n) == 0L) {
if (n == Long.MAX_VALUE) {
<strong>fastPath();</strong>
} else {
<strong>slowPath(n);</strong>
}
}
}
}</p>
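The request and fastPath code above shows two things: nothing is emitted until request is called, and the emission loop checks a cancelled flag as it goes. Below is a minimal sketch of that behaviour in plain Java. It is a toy model under simplifying assumptions: ToySubscription, ToySubscriber, ToyArraySubscription and CancelDemo are hypothetical names, only the unbounded fast path is modelled, and the request accounting done by BackpressureHelper is reduced to a single started flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.reactivestreams.Subscription.
interface ToySubscription {
    void request(long n);
    void cancel();
}

// Hypothetical stand-in for Subscriber (onError omitted for brevity).
interface ToySubscriber<T> {
    void onSubscribe(ToySubscription s);
    void onNext(T item);
    void onComplete();
}

// Stand-in for ArraySubscription; any positive request drains the whole array once.
final class ToyArraySubscription<T> implements ToySubscription {
    private final ToySubscriber<? super T> downstream;
    private final T[] array;
    private volatile boolean cancelled;
    private boolean started;

    ToyArraySubscription(ToySubscriber<? super T> downstream, T[] array) {
        this.downstream = downstream;
        this.array = array;
    }

    @Override
    public void request(long n) {
        if (n <= 0 || started) {
            return;
        }
        started = true;
        for (T item : array) {
            if (cancelled) {
                return; // stop before emitting the next item
            }
            downstream.onNext(item);
        }
        if (cancelled) {
            return; // a cancelled subscriber does not get onComplete either
        }
        downstream.onComplete();
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}

final class CancelDemo {
    // callRequest: whether onSubscribe requests items at all.
    // cancelAfter: cancel after this many items (0 means never cancel).
    static List<String> run(boolean callRequest, int cancelAfter) {
        List<String> events = new ArrayList<>();
        String[] items = {"1", "2", "3"};
        ToySubscriber<String> subscriber = new ToySubscriber<String>() {
            private ToySubscription upstream;
            private int seen;

            @Override public void onSubscribe(ToySubscription s) {
                upstream = s; // keep the reference so it can be cancelled later
                if (callRequest) {
                    s.request(Long.MAX_VALUE);
                }
            }

            @Override public void onNext(String item) {
                events.add(item);
                if (++seen == cancelAfter) {
                    upstream.cancel();
                }
            }

            @Override public void onComplete() { events.add("complete"); }
        };
        subscriber.onSubscribe(new ToyArraySubscription<>(subscriber, items));
        return events;
    }
}
```

In this sketch CancelDemo.run(true, 2) records only the first two items, while CancelDemo.run(false, 0) records nothing because request is never called.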
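<p>
From this we can see that a subscription can be cancelled while it is still emitting: keep a reference to the Disposable or Subscription passed to onSubscribe and call dispose() or cancel() on it, and the emission loop stops at its next cancelled check.
One more point: a Flowable emits nothing until request is called, which is normally done inside onSubscribe(s: Subscription?) {}.
Next, let's look at how to switch threads.</p>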