基于TCP/IP协议的socket通讯server

思路:

socket服务必须随项目启动而启动,所以需要在web.xml中配置监听器(自定义的ServletContextListener);服务端要保持长连接,需要用死循环不断接收客户端连接,所以必须另起线程,不能阻塞主线程运行

1.在项目的web.xml中配置listener

<listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
<listener>
    <listener-class>com.ra.car.utils.MyListener</listener-class>
  </listener>

2.因为是一个独立的线程,所以需要调用的注入类不能通过@Resource或@Autowired注入,需要通过应用上下文获取

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" 
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
		http://www.springframework.org/schema/mvc 
		http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
		http://www.springframework.org/schema/context 
		http://www.springframework.org/schema/context/spring-context-4.0.xsd 
		http://www.springframework.org/schema/aop 
		http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
		http://www.springframework.org/schema/tx 
		http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 
		http://www.springframework.org/schema/task 
		http://www.springframework.org/schema/task/spring-task-4.0.xsd">
		
	<!-- 扫描包加载Service实现类 -->
	<context:component-scan base-package="com.ra.*.service.impl"></context:component-scan>
	 <bean id="DataCallBackService" class="com.ra.truck.service.impl.DataCallBackServiceImpl"/>
	 <bean id="RdTrackInfoService" class="com.ra.truck.service.impl.RdTrackInfoServiceImpl"/>
	 <bean id="OutInterfaceService" class="com.ra.truck.service.impl.OutInterfaceImpl"/>
	 <bean id="RdPhotoInfoService" class="com.ra.truck.service.impl.RdPhotoInfoServiceImpl"/>
	<bean id="MessagePackegerService" class="com.ra.truck.service.impl.MessagePackegerServiceImpl"/>
	 <!--<bean id="redis" class="com.ra.redis.service.impl.JedisClientCluster"/>-->
</beans>

  

3.创建listener监听器类

package com.ra.car.utils;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ra.car.rabbitMQ.PBWRabbitMQCustomer;
import com.ra.car.rabbitMQ.RabbitMQCustomer;

/**
 * Servlet context listener that starts the application's background workers when the web
 * application starts and asks them to stop when it shuts down.
 *
 * <p>Three workers are started, each on its own thread so that container start-up is not
 * blocked: the TCP socket server ({@link Mythread}) and the {@code RabbitMQCustomer} and
 * {@code PBWRabbitMQCustomer} workers.
 */
public class MyListener implements ServletContextListener {

    protected static final Logger logge = LoggerFactory
            .getLogger(MyListener.class);

    /** Threads started by {@link #contextInitialized}; retained so shutdown can interrupt them. */
    private volatile Thread[] workers = new Thread[0];

    /**
     * Starts every background worker on a dedicated daemon thread.
     *
     * @param arg0 the context event supplied by the servlet container (unused)
     */
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        // The workers loop forever, so they must never run on the container's start-up thread.
        workers = new Thread[] {
            startWorker(new Mythread(), "tcp-socket-server"),
            startWorker(new RabbitMQCustomer(), "rabbitmq-customer"),
            startWorker(new PBWRabbitMQCustomer(), "pbw-rabbitmq-customer")
        };
    }

    /**
     * Logs the shutdown and interrupts every worker thread started by this listener.
     *
     * <p>Interruption is best effort: a worker blocked in a call that ignores interrupts
     * (for example {@code ServerSocket.accept()}) only notices it once that call returns.
     *
     * @param arg0 the context event supplied by the servlet container (unused)
     */
    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
        logge.info("进入ListenerUtil的contextDestroyed方法.........");
        Thread[] started = workers;
        for (int i = 0; i < started.length; i++) {
            started[i].interrupt();
        }
    }

    /**
     * Runs {@code task} on a new named daemon thread.
     *
     * <p>Daemon status keeps a worker that is stuck in blocking I/O from preventing JVM exit.
     *
     * @param task the worker to run
     * @param name thread name, so the worker is identifiable in thread dumps and logs
     * @return the started thread
     */
    private static Thread startWorker(Runnable task, String name) {
        Thread worker = new Thread(task, name);
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

}
package com.ra.car.utils;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accept loop of the TCP socket server.
 *
 * <p>Listens on {@link #PORT} and hands every accepted connection to a {@link JavaTCPServer},
 * which processes it on another thread. The loop ends when the thread is interrupted or the
 * server socket fails; the server socket is always closed on the way out.
 */
public class Mythread implements Runnable{

    protected static final Logger logge = LoggerFactory
            .getLogger(Mythread.class);

    /** TCP port the server listens on. */
    private static final int PORT = 8888;

    /** Short pause between two accepted connections, kept from the original implementation. */
    private static final long ACCEPT_PAUSE_MILLIS = 10L;

    /**
     * Opens the server socket and accepts clients until interrupted.
     *
     * <p>Note that {@code ServerSocket.accept()} does not react to interruption, so an
     * interrupt takes effect only after the next connection has been accepted.
     */
    @Override
    public void run() {
        logge.info("进入ListenerUtil的contextInitialized方法.........");
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(PORT);
            logge.info("socket通信服务端已启动,等待客户端连接.......");
            logge.info("我是111111111111111");
            while (!Thread.currentThread().isInterrupted()) {
                // Blocks until a client connects.
                Socket socket = serverSocket.accept();
                try {
                    new JavaTCPServer(socket).run();
                } catch (RuntimeException e) {
                    // One faulty connection must not take the whole accept loop down.
                    logge.error("处理客户端连接失败", e);
                    closeQuietly(socket);
                }
                try {
                    Thread.sleep(ACCEPT_PAUSE_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the server.
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e) {
            logge.error("通信服务器启动失败!", e);
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    logge.error("关闭ServerSocket出现异常", e);
                }
            }
        }
    }

    /**
     * Converts a Unix timestamp in seconds to a {@code yyyy-MM-dd HH:mm:ss} string in the
     * JVM's default time zone.
     *
     * @param s the timestamp in seconds, as a decimal string
     * @return the formatted date and time
     * @throws NumberFormatException if {@code s} is not a parsable long
     */
    public static String stampToDate(String s){
        long timestamp = Long.parseLong(s) * 1000L;
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timestamp));
    }

    /** Closes {@code socket}, logging instead of propagating a failure. */
    private static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            logge.error("关闭socket出现异常", e);
        }
    }

}
package com.ra.car.utils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatches one accepted client socket to a {@link MyThread2} handler running on a pooled
 * thread.
 */
public class JavaTCPServer {
    protected static final Logger logger=LoggerFactory.getLogger(JavaTCPServer.class);

    /**
     * Pool shared by all connections. Creating a pool per connection, as was done before,
     * leaves one never-shut-down executor behind for every client.
     */
    private static final ExecutorService CONNECTION_POOL = Executors.newCachedThreadPool();

    private final Socket socket;

    /**
     * @param socket the accepted client connection to serve
     */
    public JavaTCPServer(Socket socket) {
        this.socket = socket;
    }

    /**
     * Creates the handler for the connection and submits it to the shared pool.
     *
     * <p>Returns immediately; the connection is processed asynchronously. If the handler
     * cannot be created, the failure is logged and the socket is closed.
     */
    public void run() {
        MyThread2 handler;
        try {
            handler = new MyThread2(socket);
        } catch (IOException e) {
            abandonConnection(e);
            return;
        } catch (RuntimeException e) {
            abandonConnection(e);
            return;
        }
        CONNECTION_POOL.execute(handler);
    }

    /** Logs why the connection cannot be served and releases the socket. */
    private void abandonConnection(Exception cause) {
        logger.error("创建连接处理线程失败", cause);
        try {
            socket.close();
        } catch (IOException e) {
            logger.error("关闭socket出现异常", e);
        }
    }

}
package com.ra.car.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ra.truck.model.RdDeviceCallBackDataDomain;
import com.ra.truck.service.DataCallBackService;
import com.ra.truck.service.RdPhotoInfoService;
import com.ra.truck.service.RdTrackInfoService;
import com.ra.truck.service.outInterface.OutInterfaceService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.ContextLoader;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.*;

public class MyThread2 implements Runnable {

    protected static final Logger logger = LoggerFactory
            .getLogger(MyThread2.class);

    private Socket socket;
    private InputStream inputStream;
    private OutputStream outputStream;
    private PrintWriter printWriter;
    
    private int totalCount;  //总数量

    private int adasCount; // 传输的ADAS信号数量
    private int gpsCount; // 传输的GPS信号数量
    private DataCallBackService dataCallBackService;//数据回传private SimpleDateFormat df;

    public MyThread2(Socket socket) throws IOException {
        this.socket = socket;
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        printWriter = new PrintWriter(outputStream);
    
        dataCallBackService=(DataCallBackService)
                 ContextLoader.getCurrentWebApplicationContext().getBean("DataCallBackService");
        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }

    @Override
    public void run() {
        // 根据输入输出流和客户端连接

        // 得到一个输入流,接收客户端传递的信息
        // InputStreamReader inputStreamReader = new InputStreamReader(
        // inputStream);// 提高效率,将自己字节流转为字符流
        // bufferedReader = new BufferedReader(inputStreamReader);// 加入缓冲区
        Date timestart = new Date();
        Date timeend = null;
        long minuine = 0;
        int count = 0;
        while (true) {
            try {
                if (inputStream.available() > 0 == false) {
                    timeend = new Date();
                    minuine = timeend.getTime() - timestart.getTime();
                    if (minuine != 0 && (minuine / 1000) > 60) {
                        break;
                    }
                    continue;
                } else {
                    timestart = new Date();
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        logger.error("*****线程休眠出现异常*****", e);
                    }
                    count = inputStream.available();
                    byte[] b = new byte[count];
                    int readCount = 0; // 已经成功读取的字节的个数
                    while (readCount < count) {
                        readCount += inputStream.read(b, readCount, count
                                - readCount);
                    }
                    logger.info("**********当前服务器正在被连接**********");
                    logger.info("正在连接的客户端IP为:"
                            + socket.getInetAddress().getHostAddress());
                    
                    logger.info("当前时间为:" + df.format(new Date()));
                    String data = new String(b, "utf-8");
                    logger.info("传输过来的info:" + data);
                    String id = jsonStringToObject(data);
                    Map<Object, Object> map = new HashMap<Object, Object>();
                    //心跳发送不带id的json数据
                    if (StringUtils.isNotBlank(id)) {
                        map.put("id", id);
                    }
                    map.put("resultCode", "1");
                    map.put("result", "success");
                    printWriter.print(JSON.toJSONString(map) + "
");
                    printWriter.flush();
                }
            } catch (Exception e) {
                logger.error("数据传输出现异常", e);
                try {
                    outputStream = socket.getOutputStream();
                } catch (IOException e1) {
                    logger.error("获取outputStream出现异常");
                }
                // 获取一个输出流,向服务端发送信息
                // printWriter = new PrintWriter(outputStream);// 将输出流包装成打印流
                Map<Object, Object> map = new HashMap<Object, Object>();
                map.put("resultCode", "0");
                map.put("result", "fail");
                printWriter.print(JSON.toJSONString(map) + "
");
                printWriter.flush();
            }
        }
        try {
            printWriter.close();
            outputStream.close();
            inputStream.close();
            logger.info("30s没有发送数据,服务端主动关闭连接");
            logger.info("被断开的客户端IP为:"
                    + socket.getInetAddress().getHostAddress());
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            logger.info("被断开的时间为:" + df.format(new Date()));
            socket.close();
        } catch (IOException e) {
            logger.error("关闭socket出现异常", e);

        }

        /*
         * while ((temp = bufferedReader.readLine()) != null) { info += temp;
         * logger.info(bufferedReader.readLine());
         * logger.info("已接收到客户端连接!!!!!!"); logger.info("服务端接收到客户端信息:" +
         * info + ",当前客户端ip为:" + socket.getInetAddress().getHostAddress());
         * logger.info("服务端接收到客户端信息:" + info + ",当前客户端ip为:" +
         * socket.getInetAddress().getHostAddress()); }
         */

        /*
         * logger.info("*****测试Redis*****"); JedisClient
         * jedisClient=(JedisClient)
         * ContextLoader.getCurrentWebApplicationContext().getBean("redis");
         * jedisClient.set("testLanHao", "123456789"); String
         * str=jedisClient.get("testLanHao");
         * logger.info("从Redis中取得数据为:"+str);
         * logger.info("*****测试Redis*****");
         */

        // ApplicationContext applicationContext=new
        // ClassPathXmlApplicationContext("classpath*:applicationContext-*.xml");
        // RiskManageService
        // riskManageService=applicationContext.getBean(RiskManageService.class);
        // socket单独线程,需要重新加载上下文,扫描的类在applicationContext-service.xml配置
        /*
         * RiskManageService riskManageService=(RiskManageService)
         * ContextLoader.getCurrentWebApplicationContext().getBean("risk");
         * RdRiskEventInfo rdRiskEventInfo=new RdRiskEventInfo();
         * rdRiskEventInfo.setId("10"); try { List<RdPhotoInfo>
         * list=riskManageService.findPhotoInfoByEventId(rdRiskEventInfo);
         * logger.info(list); } catch (ServiceException e) {
         * e.printStackTrace(); }
         */
        // outputStream = socket.getOutputStream();// 获取一个输出流,向服务端发送信息
        // printWriter = new PrintWriter(outputStream);// 将输出流包装成打印流

    }

    private String jsonStringToObject(String data) {
        //数据解析方法return xx;
    }
    public static Date stampToDate(String s){
        
        Long timestamp = Long.parseLong(s)*1000;  
          Date date = new Date(timestamp);

        return date;
    }

原文地址:https://www.cnblogs.com/lazyInsects/p/8000125.html