1.在每台机器的jvm中开放一个socket服务
socket服务的创建与socket侦听分别是两个线程
单线程创建socket服务,线程池启动监听线程
Thread thread = new Thread() {
@Override
public void run() {
int pocessNum = Runtime.getRuntime().availableProcessors();
int nThreads = pocessNum;
bizExecutor = new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(100), new DiscardPolicy());
long repeat = 0;
while(true) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
} catch(Exception e) {
RecordLog.info("holding CommandCenter failed,repeat:." + repeat, e);
}
if (serverSocket != null) {
RecordLog.info("[CommandCenter] begin listening at port " + serverSocket.getLocalPort());
executorService.submit(new ServerThread(serverSocket));
break;
}
}
executorService.shutdown();
}
};
thread.start();
socket监听线程
Socket socket = null;
try {
socket = this.serverSocket.accept();
EventTask eventTask = new EventTask(socket);
bizExecutor.submit(eventTask);
} catch(Exception e) {
RecordLog.info("server error!", e);
try {
TimeUnit.SECONDS.sleep(1); // 防止死循环刷日志
} catch (InterruptedException e1) {
}
}
当有一个请求进入时,安排线程响应。
2.注册采集信息的各种command
采用全局的标识,实现command和key的映射
CommandCenter#registerCommand
Map<String, ICommandProcessor> controlMap = new HashMap<String, ICommandProcessor>
3.socket服务支持http协议的get
解析socket输入流,解析成request
Request request = new Request();
if(StringUtils.isBlank(line)) {
return request;
}
int start = line.indexOf('/');
int ask = line.indexOf('?') == -1 ? line.lastIndexOf(' ') : line.indexOf('?');
int space = line.lastIndexOf(' ');
String target = line.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : line.length());
request.setTarget(target);
if(ask == -1 || ask == space) {
return request;
}
String parameterStr = line.substring(ask != -1 ? ask + 1 : 0, space != -1 ? space : line.length());
for(String parameter : parameterStr.split("&")) {
if(StringUtils.isBlank(parameter)) {
continue;
}
String[] key_value = parameter.split("=");
if(key_value.length != 2) {
continue;
}
String value = StringUtils.trim(key_value[1]);
try {
value = URLDecoder.decode(value, "GBK");
} catch (UnsupportedEncodingException e) {
}
request.addParameter(StringUtils.trim(key_value[0]), value);
}
return request;
http响应
out = new PrintWriter(new OutputStreamWriter(
outputStream, Charset.forName("GBK")));
out.print("HTTP/1.1 200 OK
");
out.flush();