[RxJS] Aggregating Streams With Reduce And Scan

What is the RxJS equivalent of Array reduce? What if I want to emit my reduced or aggregated value at each event? This brief tutorial covers the Observable operators reduce() and scan(), their differences, and their gotchas.

In ES5, the Array's reduce function works like this:

var ary = [0,1,2,3,4];

var res = ary.reduce(function(preVal, item){
  return preVal+item ;
}, 0);

console.log(res); //10

RxJS also has a reduce function, and it gives the same result:

var source = Rx.Observable.fromArray([0,1,2,3,4]);

source.reduce(function(preVal, item){
  return preVal+item ;
}, 0).subscribe(function(res){
  console.clear();
  console.log(res);
});

Let's do it async:

We have to wait 2.5 seconds before we get the result "10". This means that reduce() in RxJS does not emit anything until the source observable has completed.

var source = Rx.Observable.interval(500).take(5);

source.reduce(function(preVal, item){
  return preVal+item ;
}, 0).subscribe(function(res){
  console.clear();
  console.log(res);
});

So if we just write:

var source = Rx.Observable.interval(500);

and the observable never completes, reduce() will never give us a result.
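
To see why, here is a minimal sketch in plain JavaScript, without RxJS, of how a reduce-style operator might behave. The makeReducer helper and its next/complete methods are hypothetical names made up for illustration, not the RxJS implementation. The accumulator is updated on every value, but the result is only handed to the callback when complete() is called:

```javascript
// Hypothetical helper, not part of RxJS: models reduce() semantics.
function makeReducer(accumulator, seed, onResult) {
  var acc = seed;
  return {
    // Called for every value: update the running total, emit nothing.
    next: function (item) {
      acc = accumulator(acc, item);
    },
    // Called once when the source finishes: emit the final total.
    complete: function () {
      onResult(acc);
    }
  };
}

var results = [];
var reducer = makeReducer(function (preVal, item) {
  return preVal + item;
}, 0, function (res) {
  results.push(res);
});

[0, 1, 2, 3, 4].forEach(function (item) {
  reducer.next(item);
});
console.log(results); // [] (nothing emitted yet)

reducer.complete();
console.log(results); // [ 10 ]
```

If complete() is never called, as with an endless interval, the callback never fires.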

Use the scan() function instead of reduce() to get a result on every tick:

var source = Rx.Observable.interval(500).take(5);

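// Seed first, accumulator second: this is the argument order of older RxJS
// releases; newer releases take scan(accumulator, seed) instead.
source.scan(0, function(preVal, item){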
  return preVal+item ;
}).subscribe(function(res){
  console.log(res);
});

/*
0
1
3
6
10
*/

When you run this, each time a value ticks in you get the next reduced value. One nice difference with scan() is that, since it doesn't wait for completion, it also works on an observable that never finishes, such as the bare interval(500) above: it gives you a result on every tick.
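
The relationship between the two operators can also be sketched on plain arrays. This is only an analogue, assuming a hypothetical scanArray helper that is not part of RxJS or of the Array API: reduce returns the final total, while the scan-style helper returns every intermediate total.

```javascript
// Hypothetical helper, not part of RxJS: array analogue of scan().
function scanArray(items, accumulator, seed) {
  var acc = seed;
  // map() returns one output per input: the running total so far.
  return items.map(function (item) {
    acc = accumulator(acc, item);
    return acc;
  });
}

var add = function (preVal, item) {
  return preVal + item;
};

var running = scanArray([0, 1, 2, 3, 4], add, 0);
var total = [0, 1, 2, 3, 4].reduce(add, 0);

console.log(running); // [ 0, 1, 3, 6, 10 ]
console.log(total);   // 10
```

The last element of the scan-style output is the same value that reduce returns.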

Original post: https://www.cnblogs.com/Answer1215/p/4789801.html