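Getting cluster information

1. Open a socket service in the JVM on each machine

Creating the socket service and listening on it are handled by two separate threads.

A single thread creates the socket service, and a thread pool starts the listener thread.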

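	Thread thread = new Thread() {

		@Override
		public void run() {
			// One worker per available processor handles the incoming requests.
			int processNum = Runtime.getRuntime().availableProcessors();
			int nThreads = processNum;
			// Bounded queue of 100; DiscardPolicy silently drops tasks once it is full.
			bizExecutor = new ThreadPoolExecutor(nThreads, nThreads,
					0L, TimeUnit.MILLISECONDS,
					new LinkedBlockingQueue<Runnable>(100), new DiscardPolicy());

			long repeat = 0;
			// Keep trying until the server socket is bound.
			while (true) {
				ServerSocket serverSocket = null;
				try {
					serverSocket = new ServerSocket(port);
				} catch (Exception e) {
					RecordLog.info("holding CommandCenter failed, repeat: " + repeat, e);
					repeat++;
					try {
						TimeUnit.SECONDS.sleep(1); // back off so a busy port does not flood the log
					} catch (InterruptedException ie) {
						Thread.currentThread().interrupt();
						break;
					}
				}

				if (serverSocket != null) {
					RecordLog.info("[CommandCenter] begin listening at port " + serverSocket.getLocalPort());
					// Hand the bound socket to the listener thread.
					executorService.submit(new ServerThread(serverSocket));
					break;
				}
			}
			// No more listener tasks will be submitted; the one already submitted keeps running.
			executorService.shutdown();
		}

	};

	thread.start();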
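
The worker pool above pairs a bounded queue with DiscardPolicy, so once every worker is busy and the queue is full, further tasks are dropped without any error. A minimal sketch of that behavior, assuming a deliberately tiny pool (1 thread, queue of 1) so the effect is easy to see; the class name DiscardDemo and the sizes are made up for illustration and are not part of the original code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DiscardDemo {
    /** Submits `tasks` blocking tasks to a 1-thread pool with a queue of 1 and returns how many ran. */
    static int runTasks(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // hold the worker so the queue fills up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                executed.incrementAndGet();
            });
        }
        release.countDown(); // let the running and queued tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the third is discarded.
        System.out.println(runTasks(3)); // prints 2
    }
}
```

Note that when a task is handed over with submit() instead of execute(), a discarded task's Future never completes, so callers should not block on it.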

The socket listener thread

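			Socket socket = null;
			try {
				// Block until a client connects, then hand the connection to the worker pool.
				socket = this.serverSocket.accept();
				EventTask eventTask = new EventTask(socket);
				bizExecutor.submit(eventTask);
			} catch(Exception e) {
				RecordLog.info("server error!", e);
				try {
					TimeUnit.SECONDS.sleep(1); // avoid flooding the log if the loop keeps failing
				} catch (InterruptedException e1) {
					Thread.currentThread().interrupt(); // preserve the interrupt status
				}
			}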

When a request comes in, a thread from the pool is assigned to respond to it.
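
To see the accept-and-dispatch flow end to end, here is a self-contained sketch. It is only an approximation of the idea: AcceptLoopSketch, handle and roundTrip are hypothetical names, the handler just writes a fixed line, and the socket is bound to an ephemeral loopback port instead of the configured port used in the real code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AcceptLoopSketch {
    /** Starts a listener thread; every accepted connection is handled on the worker pool. */
    static Thread startListener(ServerSocket serverSocket, ExecutorService workers) {
        Thread listener = new Thread(() -> {
            while (!serverSocket.isClosed()) {
                try {
                    Socket socket = serverSocket.accept(); // blocks until a client connects
                    workers.execute(() -> handle(socket));
                } catch (IOException e) {
                    if (serverSocket.isClosed()) {
                        return; // normal shutdown
                    }
                    // otherwise retry; the original code sleeps 1s here to avoid log flooding
                }
            }
        }, "accept-loop");
        listener.setDaemon(true);
        listener.start();
        return listener;
    }

    /** Placeholder handler: writes one line and closes the connection. */
    static void handle(Socket socket) {
        try (Socket s = socket; OutputStream out = s.getOutputStream()) {
            out.write("hello\n".getBytes(StandardCharsets.US_ASCII));
        } catch (IOException e) {
            // the client went away; nothing more to do in this sketch
        }
    }

    /** Usage example: start a server on a free loopback port, connect once, return the reply line. */
    static String roundTrip() {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            startListener(server, workers);
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.US_ASCII))) {
                return in.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            workers.shutdown();
        }
    }
}
```

Keeping accept() on its own thread means a slow request handler never delays the acceptance of the next connection; only the worker pool's capacity limits concurrency.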

2. Register the commands that collect information

A global key identifies each command, which gives a mapping from key to command.
See CommandCenter#registerCommand.

Map<String, ICommandProcessor> controlMap = new HashMap<String, ICommandProcessor>();
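
The article does not show the body of CommandCenter#registerCommand or the ICommandProcessor interface, so the following is only a guess at the shape of such a registry. CommandProcessor, its handle method and dispatch are hypothetical, and a ConcurrentHashMap is swapped in for the HashMap above on the assumption that commands may be registered and looked up from different threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CommandRegistrySketch {
    /** Hypothetical stand-in for ICommandProcessor; the real interface is not shown in the article. */
    interface CommandProcessor {
        String handle(Map<String, String> parameters);
    }

    private static final Map<String, CommandProcessor> CONTROL_MAP = new ConcurrentHashMap<>();

    /** Registers a processor under a global key; the first registration for a key wins. */
    static void registerCommand(String key, CommandProcessor processor) {
        if (key == null || key.trim().isEmpty() || processor == null) {
            return;
        }
        CONTROL_MAP.putIfAbsent(key, processor);
    }

    /** Looks up the processor for the request target and runs it. */
    static String dispatch(String key, Map<String, String> parameters) {
        CommandProcessor processor = key == null ? null : CONTROL_MAP.get(key);
        if (processor == null) {
            return "unknown command: " + key;
        }
        return processor.handle(parameters);
    }
}
```

With this shape, the target parsed from the request line serves as the lookup key, for example registerCommand("version", params -> "1.0") followed by dispatch("version", params).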

3. The socket service supports HTTP GET

The socket input stream is read and parsed into a request.

	// line is the HTTP request line, e.g. "GET /target?key=value HTTP/1.1"
	Request request = new Request();
	if(StringUtils.isBlank(line)) {
		return request;
	}
	// The target sits between the first '/' and the '?' (or the last space when there is no query).
	int start = line.indexOf('/');
	int ask = line.indexOf('?') == -1 ? line.lastIndexOf(' ') :  line.indexOf('?');
	int space = line.lastIndexOf(' ');
	String target = line.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : line.length());
	request.setTarget(target);
	// No query string: nothing more to parse.
	if(ask == -1 || ask == space) {
		return request;
	}
	// The query string runs from just after '?' to the space before the HTTP version.
	String parameterStr = line.substring(ask != -1 ? ask + 1 : 0, space != -1 ? space : line.length());
	for(String parameter : parameterStr.split("&")) {
		if(StringUtils.isBlank(parameter)) {
			continue;
		}

		// Only well-formed key=value pairs are kept.
		String[] key_value = parameter.split("=");
		if(key_value.length != 2) {
			continue;
		}

		String value = StringUtils.trim(key_value[1]);
		try {
			value = URLDecoder.decode(value, "GBK");
		} catch (UnsupportedEncodingException e) {
			// charset not available: keep the raw, undecoded value
		}

		request.addParameter(StringUtils.trim(key_value[0]), value);
	}
	return request;
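
For readers who want to try the parsing on its own, here is a standalone sketch of the same idea. It is not the author's Request class: RequestLineSketch is a hypothetical name, it splits the request line on whitespace instead of using index arithmetic, it decodes with UTF-8 rather than GBK, and unlike the code above it keeps pairs with an empty value.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

class RequestLineSketch {
    final String target;
    final Map<String, String> parameters = new LinkedHashMap<>();

    RequestLineSketch(String target) {
        this.target = target;
    }

    /** Parses a request line such as "GET /target?a=1&b=2 HTTP/1.1". */
    static RequestLineSketch parse(String line) {
        if (line == null || line.trim().isEmpty()) {
            return new RequestLineSketch("");
        }
        String[] parts = line.trim().split("\\s+"); // METHOD, URI, VERSION
        String uri = parts.length >= 2 ? parts[1] : parts[0];
        int q = uri.indexOf('?');
        String path = q == -1 ? uri : uri.substring(0, q);
        RequestLineSketch request =
                new RequestLineSketch(path.startsWith("/") ? path.substring(1) : path);
        String query = q == -1 ? "" : uri.substring(q + 1);
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip empty pairs and pairs without a key
            }
            String key = pair.substring(0, eq).trim();
            String value = pair.substring(eq + 1).trim();
            try {
                value = URLDecoder.decode(value, "UTF-8");
            } catch (UnsupportedEncodingException | IllegalArgumentException e) {
                // keep the raw value if it cannot be decoded
            }
            request.parameters.put(key, value);
        }
        return request;
    }
}
```

For example, parse("GET /getInfo?name=a%20b&id=7 HTTP/1.1") yields the target "getInfo" with parameters name = "a b" and id = "7".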

HTTP response

		out = new PrintWriter(new OutputStreamWriter(
				outputStream, Charset.forName("GBK")));
		// Status line followed by an empty line, which ends the (empty) header section.
		out.print("HTTP/1.1 200 OK\r\n\r\n");
		out.flush();
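
The snippet above sends only the status line and the blank line that ends the header section. As a slightly fuller sketch of what a response could look like, the example below also writes a few headers and a body. The headers, the class name HttpResponseSketch and the render helper are additions for illustration and do not come from the original code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

class HttpResponseSketch {
    /** Writes a minimal "200 OK" response: status line, headers, blank line, body. */
    static void writeOk(OutputStream outputStream, String body, Charset charset) throws IOException {
        byte[] payload = body.getBytes(charset);
        String head = "HTTP/1.1 200 OK\r\n"
                + "Content-Type: text/plain; charset=" + charset.name() + "\r\n"
                + "Content-Length: " + payload.length + "\r\n"
                + "Connection: close\r\n"
                + "\r\n"; // the empty line separates headers from the body
        outputStream.write(head.getBytes(StandardCharsets.US_ASCII));
        outputStream.write(payload);
        outputStream.flush();
    }

    /** Usage example: renders the response into memory so it can be inspected. */
    static String render(String body) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            writeOk(buffer, body, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

To match the original snippet's encoding, pass Charset.forName("GBK") as the charset; Content-Length is computed from the encoded bytes, so it stays correct for multi-byte text.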

Original article: https://www.cnblogs.com/zhulongchao/p/5573382.html