springboot socketio

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sys</groupId>
    <artifactId>springboot-socketio</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-socketio</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.corundumstudio.socketio/netty-socketio -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

2 启动加载 socket

import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
 
 
@Component
@Order(value=1)
public class MyCommandLineRunner implements CommandLineRunner {
    private final SocketIOServer server;
 
 
    @Autowired
    public MyCommandLineRunner(SocketIOServer server) {
        this.server = server;
    }
 
 
    @Override
    public void run(String... args) throws Exception {
        server.start();
        System.out.println("socket.io启动成功!");
    }
}

3 建立连接

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.ArrayList;
import java.util.Date;
import java.util.UUID;
 
 
@Component
public class MessageEventHandler {
    public static SocketIOServer socketIoServer;
    public static ArrayList<UUID> listClient = new ArrayList<>();
    //static final int limitSeconds = 60;
 
    @Autowired
    public MessageEventHandler(SocketIOServer server) {
        MessageEventHandler.socketIoServer = server;
    }
 
    @OnConnect
    public void onConnect(SocketIOClient client) {
        listClient.add(client.getSessionId());
        System.err.println(listClient.size());
        System.out.println("客户端:" + client.getSessionId() + "已连接");
    }
 
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        System.out.println("客户端:" + client.getSessionId() + "断开连接");
        listClient.remove(client.getSessionId());
    }
 
 
    @OnEvent(value = "messageevent") //value是监听事件的名称
    public void onEvent(SocketIOClient client, AckRequest request, Object data) {
        //System.out.println("发来消息:" + data.toString());
        //socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data"+new Date().getTime());
    }
 
 
    public static void sendBuyLogEvent() {   //这里就是向客户端推消息了
        long dateTime = new Date().getTime();
 
        for (UUID clientId : listClient) {
            if (socketIoServer.getClient(clientId) == null) continue;
            socketIoServer.getClient(clientId).sendEvent("messageevent", dateTime, 1);
        }
    }
    
}

4 springboot 启动类SpringbootSocketioApplication  配置socket bean

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import com.sys.demo.kafka.KafkaSender;


@SpringBootApplication
public class SpringbootSocketioApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =SpringApplication.run(SpringbootSocketioApplication.class, args);
        //SpringApplication.run(SpringBootKakfaApplication.class, args);
        /*KafkaSender sender = context.getBean(KafkaSender.class);

        for (int i = 0; i < 3; i++) {
            //调用消息发送类中的消息发送方法
            sender.send();

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        */
        
    }
    @Bean
    public SocketIOServer socketIOServer() {
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        
        String os = System.getProperty("os.name");
        if(os.toLowerCase().startsWith("win")){   //在本地window环境测试时用localhost
            System.out.println("this is  windows");
            config.setHostname("localhost");
        } else {
            config.setHostname("123.123.111.222");   //部署到你的远程服务器正式发布环境时用服务器公网ip
        }
        config.setPort(9092);
 
        /*config.setAuthorizationListener(new AuthorizationListener() {//类似过滤器
            @Override
            public boolean isAuthorized(HandshakeData data) {
                //http://localhost:8081?username=test&password=test
                //例如果使用上面的链接进行connect,可以使用如下代码获取用户密码信息,本文不做身份验证
                // String username = data.getSingleUrlParam("username");
                // String password = data.getSingleUrlParam("password");
                return true;
            }
        });*/
 
 
        final SocketIOServer server = new SocketIOServer(config);
        return server;
    }
 
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
    
}

5 监听kafka 发送消息给客户端

package com.sys.demo.kafka;

import java.util.Optional;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.sys.demo.MessageEventHandler;

@Component
public class KafkaReceiver {
    @Autowired
    private MessageEventHandler messageEventHandler;
    @KafkaListener(topics = {"mycall_out"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.err.println("to client message:"+message);
            for (UUID clientId : messageEventHandler.listClient) {
                if (messageEventHandler.socketIoServer.getClient(clientId) == null) 
                    continue;
                //发送消息
                messageEventHandler.socketIoServer.getClient(clientId).sendEvent("messageevent", message);
            }
        }

    }
}
原文地址:https://www.cnblogs.com/syscn/p/10282417.html