Trident学习笔记(二)

aggregator

------------------

聚合动作;聚合操作可以是基于batch、stream、partition

[聚合方式-分区聚合]

partitionAggregate

  分区聚合;基于分区进行聚合运算;作用于分区而不是batch。

  mystream.partitionAggregate(new Fields("x"), new Count(), new Fields("count"));

[聚合方式-batch聚合]

aggregate

  批次聚合;先将同一batch的所有分区的tuple进行global再分区,将其汇集到一个分区中,再进行聚合运算。

  .aggregate(new Fields("a"), new Count(), new Fields("count"));   // 批次聚合

  聚合函数

    [ReducerAggregator]

      init();

      reduce();

    Aggregator

    CombinerAggregator

import org.apache.storm.trident.operation.ReducerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * 自定义sum聚合函数
 */
/**
 * Custom sum aggregator: folds every tuple into a running total by
 * adding the values of its first two integer fields.
 */
public class SumReducerAggregator implements ReducerAggregator<Integer> {

    private static final long serialVersionUID = 1L;

    /** Seed value of the running total for each batch. */
    @Override
    public Integer init() {
        return 0;
    }

    /** Combines the current total with one tuple's two field values. */
    @Override
    public Integer reduce(Integer curr, TridentTuple tuple) {
        int first = tuple.getInteger(0);
        int second = tuple.getInteger(1);
        return curr + first + second;
    }

}

分区聚合

import net.baiqu.shop.report.trident.demo01.PrintFunction;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Demo topology: per-partition sum of fields "a" and "b"
 * using SumReducerAggregator, printed via PrintFunction.
 */
public class TridentTopologyApp4 {

    public static void main(String[] args) {
        // Build the Trident topology.
        TridentTopology topology = new TridentTopology();

        // Fixed test spout: tuples with fields a/b, max batch size 4.
        FixedBatchSpout testSpout = new FixedBatchSpout(
                new Fields("a", "b"), 4,
                new Values(1, 2),
                new Values(2, 3),
                new Values(3, 4),
                new Values(4, 5));

        // Per-partition aggregation, then shuffle and print each result.
        topology.newStream("testSpout", testSpout)
                .partitionAggregate(new Fields("a", "b"), new SumReducerAggregator(), new Fields("sum"))
                .shuffle()
                .each(new Fields("sum"), new PrintFunction(), new Fields("result"));

        // Submit to an in-process local cluster for testing.
        new LocalCluster().submitTopology("TridentDemo4", new Config(), topology.build());
    }

}

[ReducerAggregator]

  init();

  reduce();

  // Storm Trident's ReducerAggregator contract (quoted for reference):
  // init() supplies the seed value, reduce() folds each tuple into it.
  public interface ReducerAggregator<T> extends Serializable {

    // Returns the initial accumulator value.
    T init();

    // Combines the current accumulator with one tuple; returns the new value.
    T reduce(T curr, TridentTuple tuple);

  }

[Aggregator]

  描述同ReducerAggregator.

  // Storm Trident's Aggregator contract (quoted for reference).
  // NOTE: the method is named aggregate() in the real API
  // (the original note misspelled it "aggreate").
  public interface Aggregator<T> extends Operation {

    // Called before aggregation starts; returns the state object
    // that is shared by the two methods below.

    T init(Object batchId, TridentCollector collector);

    // Called once per tuple of the batch; updates the state.

    void aggregate(T val, TridentTuple tuple, TridentCollector collector);

    // Called after all tuples are processed; emits one result tuple per batch.

    void complete(T val, TridentCollector collector);

  }

CombinerAggregator

import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.io.Serializable;

/**
 * 批次求和函数
 */
/**
 * Batch-level sum aggregator: accumulates the first two integer
 * fields of every tuple in a batch and emits the total once.
 */
public class SumAggregator extends BaseAggregator<SumAggregator.State> {

    private static final long serialVersionUID = 1L;

    /** Mutable per-batch accumulator. */
    static class State implements Serializable {
        private static final long serialVersionUID = 1L;
        int sum = 0;
    }

    /** Creates a fresh accumulator at the start of each batch. */
    @Override
    public State init(Object batchId, TridentCollector collector) {
        return new State();
    }

    /** Adds both field values of one tuple to the running sum. */
    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        int first = tuple.getInteger(0);
        int second = tuple.getInteger(1);
        state.sum += first + second;
    }

    /** Emits the batch total as a single one-field tuple. */
    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.sum));
    }

}

批次聚合

package net.baiqu.shop.report.trident.demo04;

import net.baiqu.shop.report.trident.demo01.PrintFunction;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Demo topology: batch-level sum of fields "a" and "b"
 * using SumAggregator, printed via PrintFunction.
 */
public class TridentTopologyApp4 {

    public static void main(String[] args) {
        // Build the Trident topology.
        TridentTopology topology = new TridentTopology();

        // Fixed test spout: tuples with fields a/b, max batch size 4.
        FixedBatchSpout testSpout = new FixedBatchSpout(
                new Fields("a", "b"), 4,
                new Values(1, 2),
                new Values(2, 3),
                new Values(3, 4),
                new Values(4, 5));

        // Batch aggregation, then shuffle and print each result.
        topology.newStream("testSpout", testSpout)
                .aggregate(new Fields("a", "b"), new SumAggregator(), new Fields("sum"))
                .shuffle()
                .each(new Fields("sum"), new PrintFunction(), new Fields("result"));

        // Submit to an in-process local cluster for testing.
        new LocalCluster().submitTopology("TridentDemo4", new Config(), topology.build());
    }

}

运行结果

PrintFunction: 24
PrintFunction: 0
PrintFunction: 0
......

平均值批次聚合函数

import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.io.Serializable;

/**
 * 批次求平均值函数
 */
/**
 * Batch-level average aggregator: averages over all individual field
 * values in the batch (two values per tuple) and emits one float.
 */
public class AvgAggregator extends BaseAggregator<AvgAggregator.State> {

    private static final long serialVersionUID = 1L;

    /** Mutable per-batch accumulator. */
    static class State implements Serializable {
        private static final long serialVersionUID = 1L;
        // Running sum of all field values seen so far.
        float sum = 0;
        // Number of field values seen (two per tuple, not the tuple count).
        int count = 0;
    }

    /** Creates a fresh accumulator at the start of each batch. */
    @Override
    public State init(Object batchId, TridentCollector collector) {
        return new State();
    }

    /** Folds one tuple (two integer fields) into the accumulator. */
    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        // Each tuple contributes two values, so the counter grows by 2.
        state.count += 2;
        state.sum += tuple.getInteger(0) + tuple.getInteger(1);
    }

    /** Emits the average of all accumulated values for the batch. */
    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.sum / state.count));
    }

}

批次聚合求平均值

import net.baiqu.shop.report.trident.demo01.PrintFunction;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Demo topology: batch-level average over fields "a" and "b"
 * using AvgAggregator, printed via PrintFunction.
 */
public class TridentTopologyApp4 {

    public static void main(String[] args) {
        // Build the Trident topology.
        TridentTopology topology = new TridentTopology();

        // Fixed test spout: tuples with fields a/b, max batch size 4.
        FixedBatchSpout testSpout = new FixedBatchSpout(
                new Fields("a", "b"), 4,
                new Values(1, 2),
                new Values(2, 3),
                new Values(3, 4),
                new Values(4, 5));

        // Batch-level average, then shuffle and print each result.
        topology.newStream("testSpout", testSpout)
                .aggregate(new Fields("a", "b"), new AvgAggregator(), new Fields("avg"))
                .shuffle()
                .each(new Fields("avg"), new PrintFunction(), new Fields("result"));

        // Submit to an in-process local cluster for testing.
        new LocalCluster().submitTopology("TridentDemo4", new Config(), topology.build());
    }

}

[CombinerAggregator]

  在每个partition运行分区聚合,然后再进行global再分区将同一个batch的所有tuple分到一个partition中,最后在这一个partition中进行聚合运算,并产生结果进行输出。

  该种方式的网络流量占用少于前两种方式。

  public interface CombinerAggregator<T> extents Serializable {

    // 在每个tuple上运行,并接受字段值

    T init(TridentTuple tuple);

    // 合成tuple的值,并输出一个值的tuple

    T combine(T val1, T vak2);

    // 如果分区不含有tuple,调用该方法.

    T zero();

  }

合成聚合函数

import clojure.lang.Numbers;
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * 合成聚合函数
 */
/**
 * Combiner-style sum: extracts the first field of each tuple as the
 * partial value and adds partial results together; zero() supplies
 * the identity for partitions that contain no tuples.
 */
public class SumCombinerAggregator implements CombinerAggregator<Number> {

    private static final long serialVersionUID = 1L;

    /** Extracts the value of the first field from one tuple. */
    @Override
    public Number init(TridentTuple tuple) {
        Object first = tuple.getValue(0);
        return (Number) first;
    }

    /** Adds two partial sums; Numbers.add handles numeric promotion. */
    @Override
    public Number combine(Number val1, Number val2) {
        return Numbers.add(val1, val2);
    }

    /** Identity value used when a partition has no tuples. */
    @Override
    public Number zero() {
        return 0;
    }

}

topology

import net.baiqu.shop.report.trident.demo01.PrintFunction;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentTopologyApp4 {

    public static void main(String[] args) {
        // 创建topology
        TridentTopology topology = new TridentTopology();

        // 创建spout
        FixedBatchSpout testSpout = new FixedBatchSpout(new Fields("a", "b"), 4,
                new Values(1, 2),
                new Values(2, 3),
                new Values(3, 4),
                new Values(4, 5));

        // 创建流
        Stream stream = topology.newStream("testSpout", testSpout);
        stream.aggregate(new Fields("a", "b"), new SumCombinerAggregator(), new Fields("sum"))
                .shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("result"));

        // 本地提交
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TridentDemo4", new Config(), topology.build());
    }

}

输出结果

PrintFunction: 10
PrintFunction: 0
PrintFunction: 0
......

 

 

 

原文地址:https://www.cnblogs.com/tangzhe/p/9609534.html