飞道的博客

RxJava系列 (四) 变换型操作符

205人阅读  评论(0)

1. 变换型操作符,把上游发出来的事件拦截后,对事件实体参数做一个转换,然后再把事件发给下游。

类似WPF数据绑定的类型转换, XXXXXConvertor.

 

事件源头(上游)int --------------------------- 变换操作符---------------------》String事件处理(下游)

 

2. 可以 多次变换。

 

事件源头(上游) --------------------------- 变换操作符---------------变换操作符×n------------------------》事件处理(下游)

 

变换型操作符有以下几种:

1. map

上游发出整数1以后,map操作符拦截了事件值1,并且把整数1做一些处理后,转换成String.

然后在下游接到的就是转换后的 “to string 1”


  
  1. Observable.just( 1)
  2. .map( new Function<Integer, String>() {
  3. @Override
  4. public String apply(Integer integer) throws Exception {
  5. return "to string 1";
  6. }
  7. }).subscribe( new Consumer<String>() {
  8. @Override
  9. public void accept(String o) throws Exception {
  10. }
  11. });

2.flatMap

flatMap转换符在接收到上游发出来的值后,可以做一下转换。特殊的地方是flatMap转换成的是ObservableSource<String>,而ObservableSource本身是继承Observalbe的。 又由于下游接受的是String, 所以必须调用ObservableSource<String>再发射String出去。此时可以多次发送事件。


  
  1. Observable.just( 1)
  2. .flatMap( new Function<String, ObservableSource<String>>() {
  3. @Override
  4. public ObservableSource<String> apply(String s) throws Exception {
  5. return Observable.create( new ObservableOnSubscribe<String>() {
  6. @Override
  7. public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  8. emitter.onNext( "to string 1");
  9. emitter.onNext( "to string 2");
  10. }
  11. });
  12. }
  13. }).subscribe( new Consumer<String>() {
  14. @Override
  15. public void accept(String value) throws Exception {
  16. Log.d( "", "下游接收到的是:" + value);
  17. }
  18. });

flatMap是不排序的,对上游接收到的事件是个合并操作,没有固定顺序。也就是说上游emitter.onNext()发送的事件到flatMap这儿是没有固定顺序得,上游先发的事件,到apply方法里不一定先接收到。


  
  1. Observable.just( 1, 2, 3)
  2. .flatMap( new Function<Integer, ObservableSource<String>>() {
  3. @Override
  4. public ObservableSource<String> apply(Integer s) throws Exception {
  5. return Observable.just(s + "");
  6. }
  7. }).subscribe( new Consumer<String>() {
  8. @Override
  9. public void accept(String value) throws Exception {
  10. Log.d( "", "下游接收到的是:" + value);
  11. }
  12. });

此时输出可能是1,2,3也可能是2,3,1等等。

3.concatMap

concatMap和flatMap用法一样,区别的地方是concatMap排序的。


  
  1. Observable.just( 1, 2, 3)
  2. .concatMap( new Function<Integer, ObservableSource<String>>() {
  3. @Override
  4. public ObservableSource<String> apply(Integer s) throws Exception {
  5. return Observable.just(s + "");
  6. }
  7. }).subscribe( new Consumer<String>() {
  8. @Override
  9. public void accept(String value) throws Exception {
  10. Log.d( "", "下游接收到的是:" + value);
  11. }
  12. });

 

4. groupBy

把上游事件值进行归类, 归类的组名以及归类方式,由groupBy定义。

下游和flapMap一样有点复杂。订阅得到的是GroupedObservable<String, Integer>,继承于Observalbe,是一个小上游。 而这不是开发者真正想要的。真正想要的是GroupedObservable里面的东西:String, Integer。所以需要再次订阅GroupedObservable<String, Integer>,获取真正想要的事件值。


  
  1. // 事先有先价格, 需要对这些价格有一个分组,用于判断是"高端配置电脑"还是"中端配置电脑"。
  2. Observable.just( 6000, 7000, 8000, 9000, 10000)
  3. .groupBy( new Function<Integer, String>() {
  4. @Override
  5. public String apply(Integer integer) throws Exception {
  6. return integer > 8000? "高端配置电脑": "中端配置电脑"; //分组后组名:"高端配置电脑"或"中端配置电脑"
  7. }
  8. })
  9. //使用groupBy下游是有标准的。下游订阅得到的是GroupedObservable<String, Integer>,继承与Observalbe,是一个小上游。 而这不是开发者真正想要的。真正想要的是GroupedObservable里面的东西:String, Integer。所以需要再次订阅GroupedObservable<String, Integer>,获取真正想要的事件值
  10. .subscribe( new Consumer<GroupedObservable<String, Integer>>() {
  11. @Override
  12. public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
  13. Log.d( "", "accpet: " + stringIntegerGroupedObservable.getKey()); //这里key其实就是groupBy定义的组名,"高端配置电脑"或"中端配置电脑"
  14. //GroupedObservable继承的是Observable, 所以GroupedObservable是一个中上游。
  15. // 具体组成员是什么需要在GroupedObservable这个小上游里订阅后,再取出来。
  16. stringIntegerGroupedObservable.subscribe( new Consumer<Integer>() {
  17. @Override
  18. public void accept(Integer integer) throws Exception {
  19. Log.d( "", "accept: 类别:" + stringIntegerGroupedObservable.getKey() + "价格:" + integer);
  20. }
  21. });
  22. }
  23. });

 

5. buffer

上游发出的一系列事件对象,并不是马上到达下游。而是在达到buffer规定的size后才一起以队列的形式发送给下游。


  
  1. Observable.create( new ObservableOnSubscribe<Integer>() {
  2. @Override
  3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
  4. for ( int i = 0; i < 100; i++) {
  5. emitter.onNext(i);
  6. }
  7. emitter.onComplete(); //假如只有99个数字,输出结果是怎么的? 再者不调用onComplete()下游输出结果又会怎么样呢?
  8. }
  9. }).buffer( 20).subscribe( new Consumer<List<Integer>>() {
  10. @Override
  11. public void accept(List<Integer> integers) throws Exception {
  12. Log.d( "", "[");
  13. for (Integer integer : integers) {
  14. Log.d( "", integer.toString());
  15. }
  16. Log.d( "", "]\n");
  17. }
  18. });

以上是常见的5个转换型操作符。

可能比较难于理解的是flatMap的ObservableSource和groupBy的GroupedObservable。

试想,转换型操作符,可以把integer转换成String, 难道integer就不能转换成ObservableSource,或者GroupedObservable吗?既然接到的是ObservableSource,或者GroupedObservable, 那么是不是只要发射满足下游订阅条件的事件就可以了?

这么想是不是就明白了?有点递归感觉是不是?

 

 


转载:https://blog.csdn.net/zjuter/article/details/104617124
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场