发布时间: 阅读量

Springboot集成socket监听消息

想想就搞笑么,kafka那边环境刚弄好,今儿负责人说,不用了,局方要求使用socket监听消息!心里一万匹脱缰的野马!不想多说啥,团队性质就是这样,咱们拿人钱财替人消好灾就行了。

关于socket脑海中就用过两次,一次是实在学java的时候,老师讲的聊天室!后来学习点餐系统有讲到过,服务端有新的订单生产会发送消息到客户端做弹窗提示(前端,前后端的一个简单的通信)
接触socket就仅于此了!

这里想说一下chatgpt,这个人工智能不得不佩服,你只需把你的需求告诉他,能给解决90%以上的问题!如下图
WechatIMG621.jpg
步骤就是说出你的需求,等待回答,将示例代码复制,运行测试,就这么简单,当然idea也集成有gpt插件,但是那个会按流量收费,其实效果都是一样的!真是特别感谢给无私奉献的大佬!

贴个代码吧,

@Component
@Slf4j

public class SocketServer {

    private ServerSocket serverSocket;
    private ExecutorService executorService;
    @Value("${socket.port}")
    private int port;
    @Value("${socket.threadPool.size}")
    private int threadPoolSize;

    @Resource
    private ITInvalidAlarmService invalidAlarmService;

    @PostConstruct
    public void init() {
        new Thread(() -> {
            try  {
                serverSocket = new ServerSocket(port);
                log.info("【Socket服务器已启动】");
                // 创建固定线程数量的线程池
                executorService = Executors.newFixedThreadPool(threadPoolSize);
                // 开始监听客户端连接
                startListening();
            } catch (IOException e) { 
                log.error("【socket服务启动异常】异常消息:{}",e.getMessage());
                e.printStackTrace();
            }
        }).start();

    }

    protected void startListening() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 接受客户端连接
                Socket clientSocket = serverSocket.accept();
                log.info("【socket客户端已连接】连接IP:{}",clientSocket.getInetAddress().getHostAddress());
                // 提交任务到线程池处理客户端连接
                executorService.submit(() -> handleClientConnection(clientSocket));
            } catch (IOException e) {
                log.error("【socket监听异常】异常消息:{}",e.getMessage());
                e.printStackTrace();
            }
        }
    }

    protected void handleClientConnection(Socket clientSocket) {

        log.info("【socket监听到消息并开始处理】空闲线程:编号:{}.名字:{}开始处理
        请求", Thread.currentThread().getId(), Thread.currentThread().getName());
        try (
                BufferedReader reader = new BufferedReader(new 
                InputStreamReader(clientSocket.getInputStream()));
                PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true)
        ) {
            // 读取数据
            StringBuilder sb = new StringBuilder();
            String inputLine;
            while ((inputLine = reader.readLine()) != null) {
                sb.append(inputLine);
            }
            String jsonStr = sb.toString();
            System.out.println("接收到客户端消息:" + jsonStr);
         
        } catch (IOException e) {
            log.error("【socket读取消息异常】异常信息:{}",e.getMessage());
            e.printStackTrace();

        } finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }



    @PreDestroy
    public void shutdown() {
        try {
            // 停止监听并关闭Socket服务器和线程池
            serverSocket.close();
            executorService.shutdown();
            log.info("【Socket服务器已关闭");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

发送socket消息demo,发送的时候需要指定服务端的端口和ip

public class SocketClient {

    public void sendMessage(String host, int port, String message) {
        try {
            Socket socket = new Socket(host, port);
            OutputStream outputStream = socket.getOutputStream();

            // 发送消息
            outputStream.write(message.getBytes());
            outputStream.flush();

            // 关闭连接
            outputStream.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
  
}

开发过程中遇到个奇葩问题,就是启动springboot项目的时候初始化完socket以后控制台日志就停了,问了gpt才知道是主线程阻塞造成的!举了个列子就是你和相亲对象下班去约会呢,路上刚好领导打电话,代码出bug了,你要是回去改,约会就黄了,这时候你想着小王还在加班呢!让他去看看,不要影响你约会!

所以把初始化socket的事儿交给其他线程去处理,不要影响主线程

 new Thread(() -> {
            try  {
                serverSocket = new ServerSocket(port);
                log.info("【Socket服务器已启动】");
                // 创建固定线程数量的线程池
                executorService = Executors.newFixedThreadPool(threadPoolSize);
                // 开始监听客户端连接
                startListening();
            } catch (IOException e) {
                System.out.println("Failed to start the socket server");
               
                log.error("【socket服务启动异常】异常消息:{}",e.getMessage());
                e.printStackTrace();
            }
        }).start();

简单的使用就这些了,告警数据每天200w条,这个监听接口不知道能不能撑的住!