1. 变换型操作符,把上游发出来的事件拦截后,对事件实体参数做一个转换,然后再把事件发给下游。
类似WPF数据绑定的类型转换, XXXXXConvertor.
事件源头(上游)int --------------------------- 变换操作符---------------------》String事件处理(下游)
2. 可以 多次变换。
事件源头(上游) --------------------------- 变换操作符---------------变换操作符×n------------------------》事件处理(下游)
变换型操作符有以下几种:
1. map
上游发出整数1以后,map操作符拦截了事件值1,并且把整数1做一些处理后,转换成String.
然后在下游接到的就是转换后的 “to string 1”
-
Observable.just(
1)
-
.map(
new Function<Integer, String>() {
-
@Override
-
public String apply(Integer integer) throws Exception {
-
return
"to string 1";
-
}
-
}).subscribe(
new Consumer<String>() {
-
@Override
-
public void accept(String o) throws Exception {
-
-
}
-
});
2.flatMap
flatMap转换符在接收到上游发出来的值后,可以做一下转换。特殊的地方是flatMap转换成的是ObservableSource<String>,而ObservableSource本身是继承Observalbe的。 又由于下游接受的是String, 所以必须调用ObservableSource<String>再发射String出去。此时可以多次发送事件。
-
Observable.just(
1)
-
.flatMap(
new Function<String, ObservableSource<String>>() {
-
@Override
-
public ObservableSource<String> apply(String s) throws Exception {
-
return Observable.create(
new ObservableOnSubscribe<String>() {
-
@Override
-
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
-
emitter.onNext(
"to string 1");
-
emitter.onNext(
"to string 2");
-
}
-
});
-
}
-
}).subscribe(
new Consumer<String>() {
-
@Override
-
public void accept(String value) throws Exception {
-
Log.d(
"",
"下游接收到的是:" + value);
-
}
-
});
flatMap是不排序的,对上游接收到的事件是个合并操作,没有固定顺序。也就是说上游emitter.onNext()发送的事件到flatMap这儿是没有固定顺序得,上游先发的事件,到apply方法里不一定先接收到。
-
Observable.just(
1,
2,
3)
-
.flatMap(
new Function<Integer, ObservableSource<String>>() {
-
@Override
-
public ObservableSource<String> apply(Integer s) throws Exception {
-
return Observable.just(s +
"");
-
}
-
}).subscribe(
new Consumer<String>() {
-
@Override
-
public void accept(String value) throws Exception {
-
Log.d(
"",
"下游接收到的是:" + value);
-
}
-
});
此时输出可能是1,2,3也可能是2,3,1等等。
3.concatMap
concatMap和flatMap用法一样,区别的地方是concatMap排序的。
-
Observable.just(
1,
2,
3)
-
.concatMap(
new Function<Integer, ObservableSource<String>>() {
-
@Override
-
public ObservableSource<String> apply(Integer s) throws Exception {
-
return Observable.just(s +
"");
-
}
-
}).subscribe(
new Consumer<String>() {
-
@Override
-
public void accept(String value) throws Exception {
-
Log.d(
"",
"下游接收到的是:" + value);
-
}
-
});
4. groupBy
把上游事件值进行归类, 归类的组名以及归类方式,由groupBy定义。
下游和flapMap一样有点复杂。订阅得到的是GroupedObservable<String, Integer>,继承于Observalbe,是一个小上游。 而这不是开发者真正想要的。真正想要的是GroupedObservable里面的东西:String, Integer。所以需要再次订阅GroupedObservable<String, Integer>,获取真正想要的事件值。
-
// 事先有先价格, 需要对这些价格有一个分组,用于判断是"高端配置电脑"还是"中端配置电脑"。
-
Observable.just(
6000,
7000,
8000,
9000,
10000)
-
.groupBy(
new Function<Integer, String>() {
-
@Override
-
public String apply(Integer integer) throws Exception {
-
return integer >
8000?
"高端配置电脑":
"中端配置电脑";
//分组后组名:"高端配置电脑"或"中端配置电脑"
-
}
-
})
-
-
//使用groupBy下游是有标准的。下游订阅得到的是GroupedObservable<String, Integer>,继承与Observalbe,是一个小上游。 而这不是开发者真正想要的。真正想要的是GroupedObservable里面的东西:String, Integer。所以需要再次订阅GroupedObservable<String, Integer>,获取真正想要的事件值
-
.subscribe(
new Consumer<GroupedObservable<String, Integer>>() {
-
@Override
-
public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
-
Log.d(
"",
"accpet: " + stringIntegerGroupedObservable.getKey());
//这里key其实就是groupBy定义的组名,"高端配置电脑"或"中端配置电脑"
-
-
//GroupedObservable继承的是Observable, 所以GroupedObservable是一个中上游。
-
// 具体组成员是什么需要在GroupedObservable这个小上游里订阅后,再取出来。
-
stringIntegerGroupedObservable.subscribe(
new Consumer<Integer>() {
-
@Override
-
public void accept(Integer integer) throws Exception {
-
Log.d(
"",
"accept: 类别:" + stringIntegerGroupedObservable.getKey() +
"价格:" + integer);
-
}
-
});
-
}
-
});
5. buffer
上游发出的一系列事件对象,并不是马上到达下游。而是在达到buffer规定的size后才一起以队列的形式发送给下游。
-
Observable.create(
new ObservableOnSubscribe<Integer>() {
-
@Override
-
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
-
for (
int i =
0; i <
100; i++) {
-
emitter.onNext(i);
-
}
-
-
emitter.onComplete();
//假如只有99个数字,输出结果是怎么的? 再者不调用onComplete()下游输出结果又会怎么样呢?
-
}
-
}).buffer(
20).subscribe(
new Consumer<List<Integer>>() {
-
@Override
-
public void accept(List<Integer> integers) throws Exception {
-
Log.d(
"",
"[");
-
for (Integer integer : integers) {
-
Log.d(
"", integer.toString());
-
}
-
Log.d(
"",
"]\n");
-
}
-
});
以上是常见的5个转换型操作符。
可能比较难于理解的是flatMap的ObservableSource和groupBy的GroupedObservable。
试想,转换型操作符,可以把integer转换成String, 难道integer就不能转换成ObservableSource,或者GroupedObservable吗?既然接到的是ObservableSource,或者GroupedObservable, 那么是不是只要发射满足下游订阅条件的事件就可以了?
这么想是不是就明白了?有点递归感觉是不是?
转载:https://blog.csdn.net/zjuter/article/details/104617124