zhibing.pu
2024-08-21 f10ef4933c6decda69308c1cf6e4d0449856ba1f
新增加websocket
4个文件已修改
13个文件已添加
836 ■■■■■ 已修改文件
ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/dto/WebSocketMsg.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/factory/WebSocketFallbackFactory.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/feignClient/WebSocketClient.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-api/ruoyi-api-other/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/pom.xml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/RuoYiOtherApplication.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/TGoodsController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/WebSocketController.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/ChildChannelHandler.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Global.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Method.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyChannelMap.java 123 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyMsg.java 94 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyServer.java 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyWebSocketController.java 154 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/WebSocketHandler.java 160 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/createSSLContext.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/dto/WebSocketMsg.java
New file
@@ -0,0 +1,19 @@
package com.ruoyi.other.api.dto;
import lombok.Data;
/**
 * Data holder carrying a user id and a message body; it is the argument type of
 * {@code WebSocketClient#send}. Accessors are generated by Lombok's {@code @Data}.
 *
 * @author zhibing.pu
 * @Date 2024/8/21 17:43
 */
@Data
public class WebSocketMsg {
    /**
     * User id.
     */
    private Long userId;
    /**
     * Message content.
     */
    private String msg;
}
ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/factory/WebSocketFallbackFactory.java
New file
@@ -0,0 +1,33 @@
package com.ruoyi.other.api.factory;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.other.api.domain.TVip;
import com.ruoyi.other.api.dto.WebSocketMsg;
import com.ruoyi.other.api.feignClient.VipClient;
import com.ruoyi.other.api.feignClient.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.openfeign.FallbackFactory;
import org.springframework.stereotype.Component;
/**
 * 会员服务降级处理
 *
 * @author ruoyi
 */
@Component
public class WebSocketFallbackFactory implements FallbackFactory<WebSocketClient> {
    private static final Logger log = LoggerFactory.getLogger(WebSocketFallbackFactory.class);
    @Override
    public WebSocketClient create(Throwable throwable) {
        log.error("WebSocket调用失败:{}", throwable.getMessage());
        return new WebSocketClient() {
            @Override
            public R send(WebSocketMsg webSocketMsg) {
                return R.fail("发送WebSocket消息失败:" + throwable.getMessage());
            }
        };
    }
}
ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/feignClient/WebSocketClient.java
New file
@@ -0,0 +1,24 @@
package com.ruoyi.other.api.feignClient;
import com.ruoyi.common.core.constant.ServiceNameConstants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.other.api.dto.WebSocketMsg;
import com.ruoyi.other.api.factory.WebSocketFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
/**
 * @author zhibing.pu
 * @Date 2024/8/21 17:41
 */
@FeignClient(contextId = "WebSocketClient", value = ServiceNameConstants.ORDER_SERVICE, fallbackFactory = WebSocketFallbackFactory.class)
public interface WebSocketClient {
    /**
     * 发送WebSocket消息
     * @return
     */
    @PostMapping("/webSocket/send")
    R send(WebSocketMsg webSocketMsg);
}
ruoyi-api/ruoyi-api-other/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -6,4 +6,5 @@
com.ruoyi.other.api.factory.IntegralRuleFallbackFactory
com.ruoyi.other.api.factory.GoodsFallbackFactory
com.ruoyi.other.api.factory.CouponFallbackFactory
com.ruoyi.other.api.factory.InvoiceTypeFallbackFactory
com.ruoyi.other.api.factory.InvoiceTypeFallbackFactory
com.ruoyi.other.api.factory.WebSocketFallbackFactory
ruoyi-service/ruoyi-other/pom.xml
@@ -117,6 +117,10 @@
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
    </dependencies>
    <build>
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/RuoYiOtherApplication.java
@@ -3,6 +3,7 @@
import com.ruoyi.common.security.annotation.EnableCustomConfig;
import com.ruoyi.common.security.annotation.EnableRyFeignClients;
import com.ruoyi.common.swagger.annotation.EnableCustomSwagger2;
import com.ruoyi.other.webSocket.NettyServer;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -23,6 +24,7 @@
public class RuoYiOtherApplication {
    public static void main(String[] args) {
        SpringApplication.run(RuoYiOtherApplication.class, args);
        new NettyServer().bind();
        System.out.println("(♥◠‿◠)ノ゙  基础模块启动成功   ლ(´ڡ`ლ)゙  \n" +
                " .-------.       ____     __        \n" +
                " |  _ _   \\      \\   \\   /  /    \n" +
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/TGoodsController.java
@@ -12,7 +12,7 @@
import com.ruoyi.other.api.domain.TActivity;
import com.ruoyi.other.api.domain.TGoods;
import com.ruoyi.other.api.dto.AdvertisingDTO;
import com.ruoyi.other.api.dto.ExchangeDto;
import com.ruoyi.order.api.vo.ExchangeDto;
import com.ruoyi.other.api.dto.GoodsDTO;
import com.ruoyi.other.service.TActivityService;
import com.ruoyi.other.service.TAdvertisingService;
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/WebSocketController.java
New file
@@ -0,0 +1,31 @@
package com.ruoyi.other.controller;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.other.webSocket.NettyChannelMap;
import com.ruoyi.other.webSocket.NettyWebSocketController;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.web.bind.annotation.*;
/**
 * @author zhibing.pu
 * @Date 2024/8/21 17:31
 */
@RestController
@RequestMapping("/webSocket")
public class WebSocketController {
    /**
     * 发送websocket消息
     * @param userId
     * @param msg
     * @return
     */
    @ResponseBody
    @PostMapping("/send")
    public R send(@RequestParam("userId") Long userId, @RequestParam("msg") String msg){
        ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + userId);
        return NettyWebSocketController.sendMsgToClient(channel, msg);
    }
}
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/ChildChannelHandler.java
New file
@@ -0,0 +1,36 @@
package com.ruoyi.other.webSocket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        String path = "C:\\cert\\tomcat\\scs1723443475715_fxcx.yunyu666.com_server.jks";
//        String path = "/usr/local/server/apache-tomcat-80/conf/cert/6064978_okyueche.com.pfx";
        SSLContext sslContext = createSSLContext.createSSLContext("JKS"
                , path, "Pe7>4nS#st$dAnpp");
        //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信
        SSLEngine engine = sslContext.createSSLEngine();
        engine.setUseClientMode(false);
        socketChannel.pipeline().addLast("ssl", new SslHandler(engine));
        // 设置30秒没有读到数据,则触发一个READER_IDLE事件。
        // pipeline.addLast(new IdleStateHandler(30, 0, 0));
        // HttpServerCodec:将请求和应答消息解码为HTTP消息
        socketChannel.pipeline().addLast("http-codec", new HttpServerCodec());
        // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
        socketChannel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
        // ChunkedWriteHandler:向客户端发送HTML5文件
        socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
        // 在管道中添加我们自己的接收数据实现方法
        socketChannel.pipeline().addLast("handler", new WebSocketHandler());
    }
}
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Global.java
New file
@@ -0,0 +1,9 @@
package com.ruoyi.other.webSocket;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
public class Global {
    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Method.java
New file
@@ -0,0 +1,33 @@
package com.ruoyi.other.webSocket;
/**
 * Instant messaging: names of the message types carried in the {@code method} field.
 *
 * @author TaoNingBo
 * @version 1.0
 * @createDate 2016-06-14
 */
public class Method {

    /** Heartbeat (pushed by the server). */
    public static final String ok = "OK";

    /** Heartbeat (received from the client). */
    public static final String ping = "PING";

    /** Heartbeat (response). */
    public static final String pong = "PONG";

    /** Driver uploads a location. */
    public static final String location = "LOCATION";
}
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyChannelMap.java
New file
@@ -0,0 +1,123 @@
package com.ruoyi.other.webSocket;
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class NettyChannelMap {
    public static Map<String, ChannelHandlerContext> ctxMap = new HashMap<>();//单点登录存储的通道
    protected static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();
    private NettyChannelMap() {
        // 放置外部实例化
    }
    /**
     * Get data from source.
     *
     * @param key
     * @return
     */
    public static ChannelHandlerContext getData(String key) {
        if (map == null) {
            map = new HashMap<String, ChannelHandlerContext>();
        }
        return map.get(key);
    }
    /**
     * Save data from source.
     *
     * @param key
     * @param val
     */
    public static synchronized void saveData(String key, ChannelHandlerContext val) {
        map.put(key, val);
    }
    /**
     * Determine whether the cache key contains the key.
     *
     * @param key
     * @return true|false
     * @author TaoNingBo
     */
    public static synchronized boolean containsKey(String key) {
        return map.containsKey(key);
    }
    /**
     * Determine whether the cache value contains the value.
     *
     * @param val
     * @return
     */
    public static synchronized boolean containsVal(ChannelHandlerContext val) {
        return map.containsValue(val);
    }
    /**
     * Remove the data resources.
     *
     * @param value
     */
    @SuppressWarnings("rawtypes")
    public static synchronized void remove(ChannelHandlerContext value) {
        Set<String> strings = map.keySet();
        for (String key : strings) {
            ChannelHandlerContext channelHandlerContext = map.get(key);
            String s = channelHandlerContext.channel().remoteAddress().toString();
            String s1 = value.channel().remoteAddress().toString();
            if (s.equals(s1)) {
                channelHandlerContext.close();//关闭通道
                map.remove(key);
            }
        }
    }
    public static synchronized void remove_(ChannelHandlerContext value) {
        Set<String> strings = ctxMap.keySet();
        for (String key : strings) {
            ChannelHandlerContext channelHandlerContext = ctxMap.get(key);
            String s = channelHandlerContext.channel().remoteAddress().toString();
            String s1 = value.channel().remoteAddress().toString();
            if (s.equals(s1)) {
                channelHandlerContext.close();//关闭通道
                ctxMap.remove(key);
            }
        }
    }
    /**
     * Remove the data resources.
     *
     * @param key
     * @author TaoNingBo
     */
    public static synchronized void remove(String key) {
        map.remove(key);
    }
    /**
     * Update the data resources.
     *
     * @param key
     * @param value
     */
    public static synchronized void update(String key, ChannelHandlerContext value) {
        map.put(key, value);
    }
    public static synchronized void update_(String key, ChannelHandlerContext value) {
        ctxMap.put(key, value);
    }
}
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyMsg.java
New file
@@ -0,0 +1,94 @@
package com.ruoyi.other.webSocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class NettyMsg {
    /**
     * 返回一个正确数据
     *
     * @param method
     * @param data
     * @return
     * @author TaoNingBo
     */
    public static String setMsg(String method, Map<String, Object> data) {
        StringBuffer json = new StringBuffer();
        json.append(getHeader(200, "SUCCESS", method));
        json.append(JSON.toJSONString(data));
        json.append("}");
        //return JSON.toJSONString(json);
        return json.toString();
    }
    /**
     * 返回一个正确数据
     *
     * @param method
     * @param data
     * @return
     */
    public static String setMsg(String method, List<Map<String, Object>> data) {
        StringBuffer json = new StringBuffer();
        json.append(getHeader(200, "SUCCESS", method));
        List<JSONObject> jsonList = new ArrayList<JSONObject>();
        for (Map<String, Object> map : data) {
            JSONObject dataJson = new JSONObject(map);
            jsonList.add(dataJson);
        }
        json.append(jsonList);
        json.append("}");
//        return JSON.toJSONString(json);
        return json.toString();
    }
    /**
     * 返回一个错误数据
     *
     * @param method
     * @param data
     * @return
     * @author TaoNingBo
     */
    public static String setErrMsg(String method, String data) {
        StringBuffer json = new StringBuffer();
        json.append(getHeader(-1, "FAILURE", method));
        json.append("\"" + data + "\"");
        json.append("}");
//        return JSON.toJSONString(json);
        return json.toString();
    }
    /**
     * 生成一个返回JSON的头
     *
     * @param code
     * @param msg
     * @param method
     * @return
     * @author TaoNingBo
     */
    private static String getHeader(int code, String msg, String method) {
        StringBuffer header = new StringBuffer();
        header.append("{");
        header.append("\"code\":\"" + code);
        header.append("\",\"msg\":\"" + msg);
        header.append("\",\"method\":\"" + method);
        header.append("\",\"data\":");
        return header.toString();
    }
}
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyServer.java
New file
@@ -0,0 +1,77 @@
package com.ruoyi.other.webSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.Timer;
import java.util.TimerTask;
/**
 * 即时通讯服务启动类
 *
 * @version 1.0
 * @date 2016年6月25日
 */
public class NettyServer {
    /**
     * 延迟启动设置
     * <p>
     * NettyServer启动方法.
     */
    public void bind() {
        final Thread thread = new Thread(new NettyRunnable());
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                thread.start();
            }
        }, 1000 * 2);
    }
    /**
     * 即时通讯服务启动
     *
     * @version 1.0
     * @date 2016年6月24日
     */
    public class NettyRunnable implements Runnable {
        /**
         * 获取即时通讯启动端口
         */
        @Override
        public void run() {
            System.out.println("===========================Netty端口启动========");
            // Boss线程:由这个线程池提供的线程是boss种类的,用于创建、连接、绑定socket,
            // (有点像门卫)然后把这些socket传给worker线程池。
            // 在服务器端每个监听的socket都有一个boss线程来处理。在客户端,只有一个boss线程来处理所有的socket。
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            // Worker线程:Worker线程执行所有的异步I/O,即处理操作
            EventLoopGroup workrGroup = new NioEventLoopGroup();
            try {
                // ServerBootstrap 启动NIO服务的辅助启动类,负责初始话netty服务器,并且开始监听端口的socket请求
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workrGroup);
                // 设置非阻塞,用它来建立新accept的连接,用于构造serversocketchannel的工厂类
                b.channel(NioServerSocketChannel.class);
                // ChildChannelHandler 对出入的数据进行的业务操作,其继承ChannelInitializer
                b.childHandler(new ChildChannelHandler());
                System.out.println("服务端开启等待客户端连接 ... ...");
                Channel ch = b.bind(9090).sync().channel();
                ch.closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workrGroup.shutdownGracefully();
            }
        }
    }
}
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyWebSocketController.java
New file
@@ -0,0 +1,154 @@
package com.ruoyi.other.webSocket;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.utils.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.Hashtable;
public class NettyWebSocketController {
    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>();
    public static Hashtable<String, String> table;
    public static int i = 0;
    static {
        if (table == null) {
            table = new Hashtable<>();
        }
    }
    /**
     * 向客户端发送消息
     *
     * @param ctx
     * @param msg
     * @author TaoNingBo
     */
    public static R sendMsgToClient(ChannelHandlerContext ctx, String msg) {
        if (ctx != null && ctx.channel().isActive()) {
            ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
            ChannelFuture sync;
            try {
                sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync();
                if (!sync.isSuccess()) {
                    boolean b = true;
                    for (int i = 0; i < 10; i++) {
                        ctx.wait(3000);
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                        if (sync.isSuccess()) {
                            b = false;
                            break;
                        }
                        System.err.println("小程序-》推送不成功,将继续推送" + msg);
                    }
                    if (b) {
                        NettyChannelMap.remove(ctx);
                        return R.fail("无效的消息通道");
                    }
                }
                return R.ok();
            } catch (Exception e) {
                NettyChannelMap.remove(ctx);
                e.printStackTrace();
                return R.fail("发送消息失败:" + e.getMessage());
            }
        } else {
            NettyChannelMap.remove(ctx);
            return R.fail("无效的消息通道");
        }
    }
    //    **链接断开 将推送消息记录
    public static void sendMsgToClient(String cacheType, Integer id, String msg) {
        ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
        if (ctx != null) {
            ChannelFuture sync;
            try {
                sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                if (!sync.isSuccess()) {
                    for (int i = 0; i < 10; i++) {
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                        ;
                        if (!sync.isSuccess()) {
                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                            ;
                            System.err.println("推送不成功,将继续推送" + msg);
                            if (i == 9) {
                                table.put(cacheType + id, msg);
                                ctx.close();
                                System.err.println("推送发生异常,记录:" + msg);
                            }
                        } else {
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                table.put(cacheType + id, msg);
                System.err.println("推送发生异常,记录:" + msg);
            }
        } else {
            table.put(cacheType + id, msg);
            System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg);
        }
    }
    /**
     * 判断客户端要执行什么操作
     *
     * @param ctx
     * @param msg
     * @author TaoNingBo
     */
    public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
        try {
            // 验证即时通讯命令是否正确有效
            if (StringUtils.isEmpty(msg)) {
                return;
            }
            String msgStr = msg.toString();
            if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
                return;
            }
            // 获取socket信息,保存相应的socket
            JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
            int code = jsonMsg.getIntValue("code");
            String message = jsonMsg.getString("msg");
            String method = jsonMsg.getString("method");
            if (code != 200 || !message.equals("SUCCESS")) {
                return;
            }
            JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
            if (null != ctx && ctx.channel().isActive()) {
                jsonMsg.put("method", Method.pong);
                sendMsgToClient(ctx, jsonMsg.toJSONString());
            }
            // ############################### 心跳  ############################
            // 心跳
            if (method.equals(Method.ping)) {
                String userId1 = jsonCon.getString("userId");
                if (StringUtils.isNotEmpty(userId1)) {
                    //存储业务使用的通道
                    if (null != ctx && ctx.channel().isActive()) {
                        NettyChannelMap.update("Applets" + userId1, ctx);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/WebSocketHandler.java
New file
@@ -0,0 +1,160 @@
package com.ruoyi.other.webSocket;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import java.util.HashMap;
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
    private static final String WEB_SOCKET_URL = "wss://localhost:9090/websocket";
    //用于websocket握手的处理类
    private WebSocketServerHandshaker handshaker;
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    /**
     * 心跳
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)) {
                //
            } else if (event.state().equals(IdleState.WRITER_IDLE)) {
                //
            } else if (event.state().equals(IdleState.ALL_IDLE)) {
                String msg = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                if (ctx != null && ctx.channel().isActive()) {
                    ctx.writeAndFlush(Unpooled.copiedBuffer((msg).getBytes()));
                }
            }
        }
//        super.userEventTriggered(ctx, evt);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // Http解码失败,向服务器指定传输的协议为Upgrade:websocket
        if (!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        // 握手相应处理,创建websocket握手的工厂类,
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
        // 根据工厂类和HTTP请求创建握手类
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            // 不支持websocket
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            // 通过它构造握手响应消息返回给客户端
            handshaker.handshake(ctx.channel(), req);
        }
    }
    private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) throws Exception {
        if (req instanceof CloseWebSocketFrame) {
            // 关闭websocket连接
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) req.retain());
            return;
        }
        if (req instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(req.content().retain()));
            return;
        }
        if (!(req instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
        }
        if (ctx == null || this.handshaker == null || ctx.isRemoved()) {
            throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
        }
        String requestmsg = ((TextWebSocketFrame) req).text();
        //给连接的客户端返回数据
        //返回心跳
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 200);
        jsonObject.put("method", Method.ok);
        jsonObject.put("msg", "SUCCESS");
        jsonObject.put("data", new JSONObject());
        TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());
//        ctx.channel().writeAndFlush(tws);
        new NettyWebSocketController().JudgeOperation(ctx, requestmsg);//小程序心跳处理
        // 群发服务端心跳响应
        Global.group.writeAndFlush(new TextWebSocketFrame((tws).text()));
    }
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // BAD_REQUEST(400) 客户端请求错误返回的应答消息
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        //服务端向客户端发送数据
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        // 非法连接直接关闭连接
        if (res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Global.group.add(ctx.channel());
        System.err.println("客户端与服务器端开启");
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Global.group.remove(ctx.channel());
        NettyChannelMap.remove(ctx);
        System.err.println("客户端与服务器链接关闭");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // websocket连接请求
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // websocket业务处理
            handleWebSocketRequest(ctx, (WebSocketFrame) msg);
        }
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // websocket连接请求
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // websocket业务处理
            handleWebSocketRequest(ctx, (WebSocketFrame) msg);
        }
    }
}
ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/createSSLContext.java
New file
@@ -0,0 +1,32 @@
package com.ruoyi.other.webSocket;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
/**
 * Factory for a server-side {@link SSLContext} backed by a keystore file.
 */
public class createSSLContext {

    /**
     * Creates a TLS {@link SSLContext} whose key material comes from the given keystore.
     *
     * @param type     keystore type, for example {@code "JKS"}
     * @param path     file system path of the keystore
     * @param password password of the keystore; also used as the key password
     * @return an initialised context using the default trust managers and random source
     * @throws Exception if the keystore cannot be opened or read, or the context cannot be
     *                   initialised
     */
    public static SSLContext createSSLContext(String type, String path, String password) throws Exception {
        KeyStore ks = KeyStore.getInstance(type);
        // The stream is closed as soon as the keystore is loaded; it used to stay open.
        try (InputStream ksInputStream = new FileInputStream(path)) {
            ks.load(ksInputStream, password.toCharArray());
        }
        // KeyManagerFactory supplies key managers built from the keystore content.
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());
        // The SSLContext acts as the factory for SSLEngine instances.
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(kmf.getKeyManagers(), null, null);
        return sslContext;
    }
}