用RxJava优雅的执行并发网络请求
Android或者ios原生开发,并发网络请求是个难排解的痛。现在来看,这个问题已经不是问题了,写个文章记录一下,毕竟当年曾经为了这个问题操碎了心。说这个事首先要明白下面这个问题:
痛在哪?——主线程之殇
Android的消息机制恨死人,众所周知,android UI操作只能在主线程进行,而且主线程里不推荐执行作UI操作以外的一切操作,当年Android早期在主线程里请求网络是不会报错的,顶多把界面卡黑屏。现在这么搞会乍样大家都知道的,编译时会有警告,运行时会crash。如何优雅地请求网络数据,然后切换到主线程去更新UI,无数志士仁人为了解决这个问题操碎了心,早期官方推荐用AsnycTask,后来出现了经典的httpAsync,后来又有volley,现在又有Retrofit。现在的小程序员们不用在乎这些问题,只要拿来用用就行,大公司也都有自己成熟的网络库。而且现在hybird、react native很流行,不会涉及到主线程的问题。但是如果把14年以前的项目源码翻出来,运气好还能看到handler满天飞的情况,真是恶心死人。
问题——为什么要并发请求
页面内容太多,难免会需要执行多个请求来展示数据的情况,比如首页,模块多,变化快;再比如审批详情这种页面,需要展示审批本身数据字段,还要展示关联的一些操作流程等等。至于为什么要并发请求,其实不一定并发,只是必须要有一个种手段来管理页面上的网络请求(或是其他一些费时的非主线程操作),为什么要这样呢?因为页面上要弹个进度条对话框呀,网络请求之前显示“加载中,请稍候”,请求结束,把对话框关了,如果只是一个请求,回调里操作一下即可;如果两个请求呢?有些同学咬咬牙,回调里再请求一下网络,然后再回调嵌套一层忍忍就过去了;如果要两个以上呢?谁设计的接口?前端不去找后端拼命才怪。就这么个看似简单的问题,曾经困扰了多少人呀,我估计到现在还困扰着很多人。
如何解决
这个问题,我当年曾经写过一个责任链的框架,专门用来解决多个网络请求的问题,其实就是顺次执行所有的网络请求。之所以不并发执行,是因为考虑到有些接口请求数据可能要依赖其他接口,控制起来太复杂,于是一股脑全部顺次执行了。
后来一个NB的神器出现了,就是RxJava,唉呀我写文章有点啰嗦,说了半天才说到重点了,,Rx是啥不解释了。RxJava可以很好的与Retrofit结合,毕竟是同一家公司的作品嘛。
解决并发请求的思路就是把网络请求的Observable用Observable.merge合并在一起,正常的Retrofit请求是这样的:1
2
3
4
5
6
7
8
9
10
11ConsultApi consultApi = RetrofitUtils.createApi(ConsultApi.class);
Observable observable = consultApi.getConsultDetail(header(), consultId);
observable.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
public void accept(Object o) throws Exception {
// 业务处理
}
});
我需要在请求网络前做一些其他操作,另外再弹一个ProgressDialog,于是就是下面这样的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40Observable.just(RemoteSettingUtils.getRemoteSettingUrl())
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
public String apply( String s) throws Exception {
showProgressDialog();
return s;
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
public String apply( String s) throws Exception {
String baseUrl = loadBaseUrlFromCache();
if (StringUtils.isBlank(baseUrl)) {
baseUrl = loadBaseUrl();
}
return baseUrl;
}
})
.observeOn(Schedulers.io())
.flatMap(apiObservableFunction)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer, new Consumer<Throwable>() {
public void accept(Throwable throwable) throws Exception {
LogUtils.e("数据请求出错", throwable);
UiUtils.showToast(context, R.string.network_error);
allComplete.callback(null);
progressDialog.cancel();
}
}, new Action() {
public void run() throws Exception {
allComplete.callback(null);
progressDialog.cancel();
}
});
这里用到了flatMap,因为项目需要,在请求网络前会检查网络配配置,获取取正式环境的URL,然后重新初始化Retrofit,这里使用flatMap就非常合适了,可以在运行过程中构建observable,例如:1
2
3
4
5
6
7
8
9new Function<String, ObservableSource<HttpResponse>> {
@Override
public ObservableSource<HttpResponse> apply(String baseUrl) throws Exception {
ConsultApi consultApi = RetrofitUtils.createApi(ConsultApi.class);
ParamMap param = newParam().addParam("size", 10);
Reply reply = SysUtils.getLast(dataList);
return consultApi.getReplyList(header(), param);
}
}
使用flatMap的情况下,实现并发请求网络就非常简单了,当然不能简单的Observable.merge,因为不同的请求数据解析是不一样的。此时的flatMap是可以这么写:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22new Function<String, ObservableSource<HttpResponse>> {
public ObservableSource<HttpResponse> apply(String baseUrl) throws Exception {
Observable observable1 = consultApi.getConsultDetail()
.observeOn(AndroidSchedulers.mainThread())
.map(new ResponseFunction() {
protected void consume(HttpResponse response) {
// UI working
}
});
Observable observable2 = consultApi.getReplyList()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(new ResponseFunction<List>() {
protected void consume(HttpResponse<List> httpResponse) {
// UI working
}
});
return Observable.merge(observable1, observable2);
}
这样就可以完美实现并发网络请求了,上面的代码中第二个Observable多了一行subscribeOn(Schedulers.newThread())方法,如果不指定,此时默认还是在上文中的Schedulers.io()里执行,而Schedulers.io()是顺序执行的,如果要真正实现并发,必须要用Schedulers.newThread()。
这里可以简单封装一下,flatMap就变成这样了:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18new RequestFunction<Object>() {
protected ObservableSource<HttpResponse<Object>> createRequestObservable() {
Observable observable1 = addResponseFunction(consultApi.getConsultDetail(), new ResponseFunction<ConsultDetail>() {
protected void consume(HttpResponse response) {
// UI working
}
});
Observable observable2 = addResponseFunction(consultApi.getReplyList(), new ResponseFunction<List>() {
protected void consume(HttpResponse<List> httpResponse) {
// UI working
}
}, true);
return Observable.merge(observable1, observable2);
}
}
这样就清晰得多了,有多少并发处理都不怕了。
总结
并发网络请求其实并不是个强需求,可以通过合理的接口设计或交互设计来避免,实在绕不开嘛可以使用本文的方法。Square真是家很了不起的公司,我看到RxJava后再想想当年写的责任链框架,真是有点剑走偏峰了,我都不好意思说当年还写过那么个东西了。做技术必须要保持开放,切不可坐在井里看天,瞎造轮子。