第一个项目总结

1项目架构

基本框架5个服务器:

2游戏服线程

main-server and logic server配置

main-server

ExecutorService bossExecutor   = Executors.newCachedThreadPool();
        ExecutorService workerExecutor = Executors.newCachedThreadPool();

        ChannelFactory channelFactory     = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
        ServerBootstrap bootstrap         = new ServerBootstrap(channelFactory);

        ExecutionHandler executionHandler = new ExecutionHandler(
                new OrderedMemoryAwareThreadPoolExecutor(32, 0, 0, 15, TimeUnit.MINUTES, new ThreadFactory() {
                    private AtomicInteger count = new AtomicInteger(0);
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r, "server-biz-pool-" + count.getAndAdd(1));
                        return t;
                    }
                }));//定义名字

        bootstrap.setPipelineFactory(new GameServerPipelineFactory(listenercon, executionHandler));//GameServerPipelineFactory 加载 规定的Handler.并且会将ExecutionHandler 加在lengthFieldDecoder之后

        // 禁用Nagle 算法,适用于交互性强的客户端
        bootstrap.setOption("child.tcpNoDelay", true);
        // 设置keepalive
        bootstrap.setOption("child.keepAlive", listenerConfiguration.isKeepalive());
        bootstrap.bind(new InetSocketAddress(listenerConfiguration.getIp(), listenerConfiguration.getPort()));

logic-server 两种 type connection[对于mainserve来说],每种 挂了8个.共有16个连接

push[负责消息推送]

logic[负责业务处理]

pipelineFactory配置了相应的filter and Handler:

项目的Handler有这几个:

lengthFieldDecoder
ExecutionHandler[执行 ChannelHandler 链的整个过程是同步的,如果业务逻辑的耗时较长,会将导致Work线程长时间被占用得不到释放,从而影响了整个服务器的并发处理能力。

所 以,为了提高并发数,一般通过ExecutionHandler线程池来异步处理ChannelHandler链(worker线程在经过 ExecutionHandler后就结束了,它会被ChannelFactoryworker线程池所回收)。]

jsonDecoder
requestHandler[客户端传过来的action是string,回filter,like is login,通过规定的bean name和method 利用invoke执行,request num使用atomInteger record.]

lengthFieldPrepender
jsonEncoder

对于ExecutionHandler需要的线程池模型,Netty提供了两种可选:

1) MemoryAwareThreadPoolExecutor 通过对线程池内存的使用控制,可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻塞),并可控制单个Channel待处理任务的上限,防止内存溢出错误;

2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子类。除了MemoryAwareThreadPoolExecutor 的功能之外,它还可以保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处理模式下可能出现的错误的事件顺序,但它并不保证同一 Channel中的事件都在一个线程中执行(通常也没必要)。

例如:

Thread X: --- Channel A (Event A1) --.   .-- Channel B (Event B2) --- Channel B (Event B3) --->

                                       /

                                       X

                                      / 

Thread Y: --- Channel B (Event B1) --'   '-- Channel A (Event A2) --- Channel A (Event A3) --->

上图表达的意思有几个:

1)对整个线程池而言,处理同一个Channel的事件,必须是按照顺序来处理的。例如,必须先处理完Channel A (Event A1) ,再处理Channel A (Event A2)Channel A (Event A3)

2)同一个Channel的多个事件,会分布到线程池的多个线程中去处理。

3)不同Channel的事件可以同时处理(分担到多个线程),互不影响。  

OrderedMemoryAwareThreadPoolExecutor 的这种事件处理有序性是有意义的,因为通常情况下,请求发送端希望服务器能够按照顺序处理自己的请求,特别是需要多次握手的应用层协议。例如:XMPP协议。

(ExecutionHandler参考:http://www.cnblogs.com/TeaMax/archive/2013/04/03/2997874.html)

//项目中使用的
ExecutionHandler exeHandler = new ExecutionHandler(new MemoryAwareThreadPoolExecutor(8, 0, 0));

3项目消息队列

推送队列

原理:将需要的推送信息,按照优先级存进redis相应的队列中,然后按照优先级取出来.

  //异步消息处理
  //主要线程
  private void commandProcess() {

    try {

      while (true) {
        final Command command = commandDao.get(Priority.HIGH, Priority.NORMAL, Priority.LOWER, Priority.MESSAGE);//阻塞

        if (command != null) {

          final CommandHandler commandHandler = CommandHandlerFactory.getInstance().getHandler(command.getCommand());

          Runnable task = new Runnable() {

            @Override
            public void run() {
              commandHandler.handle(command);//将处理好的消息通过上面的push connection传输
            }
          };

          GameTaskProcessor.ins().general(task);//线程池exec

        } else {
          break;
        }
      }

    } catch (Exception e) {
      LOG.error(e.getMessage(), e);

      try {
        Thread.sleep(1000 * 5);
      } catch (InterruptedException ex) {
        LOG.error(ex.getMessage(), ex);
      }

    }

  }


//缓存Handler.也可以使用spring getbeanbyname代替.
public class CommandHandlerFactory {

    private static final Logger LOG = LoggerFactory.getLogger(CommandHandlerFactory.class);

    private Map<Integer, CommandHandler> commandHandlerMap = new HashMap<Integer, CommandHandler>();

    private static CommandHandlerFactory instance = null;

    private CommandHandlerFactory() {

    }

    public static CommandHandlerFactory getInstance() {

        if (instance == null) {
            instance = new CommandHandlerFactory();
        }
        return instance;
    }

    public void register(int command, CommandHandler handler) {//spring 实例化相应的Handler时,init method会调用
        LOG.info("command handler register.command[" + command + "]");
        commandHandlerMap.put(command, handler);
    }

    public CommandHandler getHandler(int command) {

        int key = command / 10000;

        if (commandHandlerMap.containsKey(key)) {
            return commandHandlerMap.get(key);
        } else {
            LOG.warn("命令处理器不存在.command[" + command + "], key[" + key + "]");
        }

        return null;
    }
}

//put/get command
public class CommandDaoRedisImpl implements CommandDao {

    private static final int TIME_OUT = 2;

    private String getKey(int priority) {
        return RedisKey.getCommandKey(priority);
    }

    @Override
    public boolean add(Command command) {
        String key = getKey(command.getPriority());

        JedisUtils.pushMsg(key, Json.toJson(command));

        return true;
    }

    @Override
    public Command get(Integer... proritys) {

        String[] keys = new String[proritys.length];
        for (int i = 0; i < proritys.length; i++) {
            String key = getKey(proritys[i]);
            keys[i] = key;
        }

        String json = JedisUtils.blockPopMsg(TIME_OUT, keys);//阻塞方法
        if (json != null) {
            return Json.toObject(json, Command.class);
        }
        return null;
    }
}

最后利用push connection 随机一个connection,交给netty处理.

logic队列

利用netty channel link+java reflection.

给笨笨的自己提个醒>_<~
原文地址:https://www.cnblogs.com/ephuizi/p/4348461.html