Akka Actor 示例

功能说明

实现由 Master 从外部接收任务,并创建多个 Worker,根据 Worker 的消费能力,主动从 Master 拉取任务。

代码示例

/**
 * Copyright (C), Allen ZHANG 1960
 */
package com.allen.coding.concurrent.example.akka;

/**
 * 携带信息的对象
 * @author Allen.ZHANG
 * @version Mailer.java 2017年7月28日 下午2:00:52
 */
public class Mailer {

    private String message;

    public Mailer(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

}
/**
 * Copyright (C), Allen ZHANG 1960
 */
package com.allen.coding.concurrent.example.akka;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;

/**
 * @author Allen.ZHANG
 * @version ActorWorker.java 2017年7月28日 下午1:55:24
 */
public class ActorWorker extends AbstractActor {

    /**
     * 谁创建该Worker的谁就是他爸
     */
    private ActorRef master = getContext().getParent();

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(Mailer.class, task -> {
            //收到任务执行
            System.out.println(task.getMessage());
            //主动去拉任务
            master.tell(new Mailer(null), getSelf());
        }).build();
    }

    /**
     * 线程开始时会执行,主动去拉任务
     * */
    @Override
    public void preStart() throws Exception {
        master.tell(new Mailer(null), getSelf());
    }

    /**
     * 线程结束时会执行
     * */
    @Override
    public void postStop() {
        System.out.println(getSelf() + " exited.");
    }

}
/**
 * Copyright (C), Allen ZHANG 1960
 */
package com.allen.coding.concurrent.example.akka;

import akka.actor.AbstractActor;
import akka.actor.Props;

/**
 * @author Allen.ZHANG
 * @version ActorMaster.java 2017年7月28日 下午1:50:43
 */
public class ActorMaster extends AbstractActor {

    private static int workers  = 10;
    private static int messages = 0;

    @Override
    public Receive createReceive() {
        //收到拉取任务消息,开始循环拉任务并异步发送出去
        return receiveBuilder().match(Mailer.class, task -> {
            if (messages < 10000) {
                getSender().tell(new Mailer(String.valueOf(messages)), getSelf());
                messages++;
            } else {
                //只会收到worker发来的拉任务消息
                //当没有任务的时候,对运行中的workers减1
                //当workers减为0的时候,自己可以退出了
                workers--;
                if (workers == 0) {
                    getContext().stop(getSelf());
                }
            }
        }).build();
    }

    /**
     * 线程开始时会执行
     * */
    @Override
    public void preStart() {
        /**
         * 创建Worker
         * */
        for (int i = 0; i < workers; i++) {
            getContext().actorOf(Props.create(ActorWorker.class));
        }
    }

    /**
     * 线程结束时会执行
     * */
    @Override
    public void postStop() {
        System.out.println(getSelf() + " exited.");
    }

}
/**
 * Copyright (C), Allen ZHANG 1960
 */
package com.allen.coding.concurrent.example.akka;

import akka.Main;

/**
 * 启动器
 * @author Allen.ZHANG
 * @version ActorStarter.java 2017年7月28日 下午2:22:50
 */
public class ActorStarter {

    public static void main(String[] args) {

        /**
         * 启动方式一:Worker以守护线程启动,Master退出的Worker线程不会退出
         * */
        /*ActorSystem system = ActorSystem.create("myapp");
        system.actorOf(Props.create(Master.class), "master");*/

        /**
         * 启动方式二:这种情况下,Master退出后,Worker执行完也会退出
         * */
        Main.main(new String[] { ActorMaster.class.getName() });

    }

}
...
9823
9822
Actor[akka://Main/user/app/$j#1261486908] exited.
Actor[akka://Main/user/app/$a#1740598742] exited.
Actor[akka://Main/user/app/$g#1521980561] exited.
Actor[akka://Main/user/app/$e#239898740] exited.
Actor[akka://Main/user/app/$f#-716187742] exited.
Actor[akka://Main/user/app/$h#305048470] exited.
Actor[akka://Main/user/app/$b#-983302080] exited.
Actor[akka://Main/user/app/$i#-1143925282] exited.
Actor[akka://Main/user/app/$d#-2047955123] exited.
Actor[akka://Main/user/app/$c#-1074785494] exited.
Actor[akka://Main/user/app#981625686] exited.
[INFO] [08/06/2017 19:40:19.489] [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app-terminator] application supervisor has terminated, shutting down
我所有的精力都在思考,怎么往前看,从来不回头。
原文地址:https://www.cnblogs.com/allen100/p/7295737.html