RxJava--Buffer,GroupBy 对比

Buffer

  • 设定收集n个元素为一组,以下方代码为例,三个为一组,则当组满三个元素时,返回一次List数据
  • 没组满三个元素时,如果调用onComplete,直接发送剩余元素,没调用onComplete,一直等待
   PublishSubject<String> subject = PublishSubject.create();
       Disposable disposable = subject
               .buffer(3)//获取三个为一组发送
               .subscribe(new Consumer<List<String>>() {
                   @Override
                   public void accept(List<String> s) throws Exception {
                       StringBuilder content = new StringBuilder();
                       for (String index : s) {
                           content.append(index).append(",");
                       }
                       LogUtils.e("rxJavaBuffer==" + content);
                   }
               });
       subject.onNext("1");
       subject.onNext("2");
       subject.onNext("3");
       subject.onNext("4");
       subject.onNext("5");
       subject.onNext("6");
       subject.onNext("7");
       subject.onNext("8");
       subject.onNext("9");
       subject.onNext("10");
       subject.onComplete();

复制代码

GroupBy

  • 很有意思的操作符。先将获取的元素分组(自己分配key),生成对应的GroupedObservable
  • GroupedObservable有点类似HashMap,包含key(自己分配的)和元素
  • 应该注意的是,当每组GroupedObservable首次订阅新的订阅者后,后续同组元素直接将数据发送给新的订阅者。看代码
  PublishSubject<String> subject = PublishSubject.create();
      Disposable disposable = subject
              .groupBy(new Function<String, String>() {
                  @Override
                  public String apply(String s) throws Exception {
                      // ?  第一步。 数据分类,分配不同的key
                      if (Integer.valueOf(s) < 4) {
                          return "one";
                      } else if (Integer.valueOf(s) < 7) {
                          return "two";
                      }
                      if (Integer.valueOf(s) < 10) {
                          return "three";
                      }
                      return "other";
                  }
              })
              .subscribe(new Consumer<GroupedObservable<String, String>>() {
                  @Override
                  public void accept(GroupedObservable<String, String> sub) throws Exception {
                       // ?  第二步。 根据不同的key,绑定新的订阅者。
                       //               如果改组已经订阅了新的订阅者,直接发送给新的订阅者
                      LogUtils.e(sub.getKey());
                      switch (sub.getKey()) {
                          case "one":
                              sub.subscribe(new Consumer<String>() {
                                  @Override
                                  public void accept(String s) throws Exception {
                                      LogUtils.e("GroupedObservable==one" + s);
                                  }
                              });
                              break;
                          case "two":
                              sub.subscribe(new Consumer<String>() {
                                  @Override
                                  public void accept(String s) throws Exception {
                                      LogUtils.e("GroupedObservable==two" + s);
                                  }
                              });
                              break;
                          case "three":
                              sub.subscribe(new Consumer<String>() {
                                  @Override
                                  public void accept(String s) throws Exception {
                                      LogUtils.e("GroupedObservable==three" + s);
                                  }
                              });
                              break;
                          default:
                              sub.subscribe(new Consumer<String>() {
                                  @Override
                                  public void accept(String s) throws Exception {
                                      LogUtils.e("GroupedObservable==other" + s);
                                  }
                              });
                              break;
                      }
                  }
              });
      subject.onNext("1");
      subject.onNext("2");
      subject.onNext("3");
      subject.onNext("4");
      subject.onNext("5");
      subject.onNext("6");
      subject.onNext("7");
      subject.onNext("8");
      subject.onNext("9");
      subject.onNext("10");
      subject.onComplete();
 
复制代码

原文地址:https://www.cnblogs.com/twodog/p/12134981.html