From f10ef4933c6decda69308c1cf6e4d0449856ba1f Mon Sep 17 00:00:00 2001 From: zhibing.pu <393733352@qq.com> Date: 星期三, 21 八月 2024 18:07:33 +0800 Subject: [PATCH] 新增加websocket --- ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Global.java | 9 ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/feignClient/WebSocketClient.java | 24 + ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyChannelMap.java | 123 ++++++++ ruoyi-api/ruoyi-api-other/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports | 3 ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Method.java | 33 ++ ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyMsg.java | 94 ++++++ ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyServer.java | 77 +++++ ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/createSSLContext.java | 32 ++ ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/RuoYiOtherApplication.java | 2 ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyWebSocketController.java | 154 +++++++++++ ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/TGoodsController.java | 2 ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/ChildChannelHandler.java | 36 ++ ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/dto/WebSocketMsg.java | 19 + ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/WebSocketController.java | 31 ++ ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/factory/WebSocketFallbackFactory.java | 33 ++ ruoyi-service/ruoyi-other/pom.xml | 4 ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/WebSocketHandler.java | 160 +++++++++++ 17 files changed, 834 insertions(+), 2 deletions(-) diff --git a/ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/dto/WebSocketMsg.java b/ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/dto/WebSocketMsg.java new file mode 100644 index 0000000..3b1405a --- /dev/null +++ b/ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/dto/WebSocketMsg.java @@ -0,0 +1,19 @@ +package com.ruoyi.other.api.dto; + +import lombok.Data; + +/** + * @author zhibing.pu + * @Date 2024/8/21 17:43 + */ +@Data +public class WebSocketMsg { + /** + * 用户id + */ + private Long userId; + /** + * 消息内容 + */ + private String msg; +} diff --git a/ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/factory/WebSocketFallbackFactory.java b/ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/factory/WebSocketFallbackFactory.java new file mode 100644 index 0000000..a7e1ca0 --- /dev/null +++ b/ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/factory/WebSocketFallbackFactory.java @@ -0,0 +1,33 @@ +package com.ruoyi.other.api.factory; + +import com.ruoyi.common.core.domain.R; +import com.ruoyi.other.api.domain.TVip; +import com.ruoyi.other.api.dto.WebSocketMsg; +import com.ruoyi.other.api.feignClient.VipClient; +import com.ruoyi.other.api.feignClient.WebSocketClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.openfeign.FallbackFactory; +import org.springframework.stereotype.Component; + +/** + * 会员服务降级处理 + * + * @author ruoyi + */ +@Component +public class WebSocketFallbackFactory implements FallbackFactory<WebSocketClient> { + private static final Logger log = LoggerFactory.getLogger(WebSocketFallbackFactory.class); + + @Override + public WebSocketClient create(Throwable throwable) { + log.error("WebSocket调用失败:{}", throwable.getMessage()); + return new WebSocketClient() { + + @Override + public R send(WebSocketMsg webSocketMsg) { + return R.fail("发送WebSocket消息失败:" + throwable.getMessage()); + } + }; + } +} diff --git a/ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/feignClient/WebSocketClient.java b/ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/feignClient/WebSocketClient.java new file mode 100644 index 0000000..c598185 --- /dev/null +++ b/ruoyi-api/ruoyi-api-other/src/main/java/com/ruoyi/other/api/feignClient/WebSocketClient.java @@ -0,0 +1,24 @@ +package com.ruoyi.other.api.feignClient; + +import com.ruoyi.common.core.constant.ServiceNameConstants; +import com.ruoyi.common.core.domain.R; +import com.ruoyi.other.api.dto.WebSocketMsg; +import com.ruoyi.other.api.factory.WebSocketFallbackFactory; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; + +/** + * @author zhibing.pu + * @Date 2024/8/21 17:41 + */ +@FeignClient(contextId = "WebSocketClient", value = ServiceNameConstants.ORDER_SERVICE, fallbackFactory = WebSocketFallbackFactory.class) +public interface WebSocketClient { + + + /** + * 发送WebSocket消息 + * @return + */ + @PostMapping("/webSocket/send") + R send(WebSocketMsg webSocketMsg); +} diff --git a/ruoyi-api/ruoyi-api-other/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-api/ruoyi-api-other/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 9033994..df6e634 100644 --- a/ruoyi-api/ruoyi-api-other/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/ruoyi-api/ruoyi-api-other/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -6,4 +6,5 @@ com.ruoyi.other.api.factory.IntegralRuleFallbackFactory com.ruoyi.other.api.factory.GoodsFallbackFactory com.ruoyi.other.api.factory.CouponFallbackFactory -com.ruoyi.other.api.factory.InvoiceTypeFallbackFactory \ No newline at end of file +com.ruoyi.other.api.factory.InvoiceTypeFallbackFactory +com.ruoyi.other.api.factory.WebSocketFallbackFactory \ No newline at end of file diff --git a/ruoyi-service/ruoyi-other/pom.xml b/ruoyi-service/ruoyi-other/pom.xml index 2e9d6e4..67e4405 100644 --- a/ruoyi-service/ruoyi-other/pom.xml +++ b/ruoyi-service/ruoyi-other/pom.xml @@ -117,6 +117,10 @@ <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> </dependencies> <build> diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/RuoYiOtherApplication.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/RuoYiOtherApplication.java index e3090fc..310262a 100644 --- a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/RuoYiOtherApplication.java +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/RuoYiOtherApplication.java @@ -3,6 +3,7 @@ import com.ruoyi.common.security.annotation.EnableCustomConfig; import com.ruoyi.common.security.annotation.EnableRyFeignClients; import com.ruoyi.common.swagger.annotation.EnableCustomSwagger2; +import com.ruoyi.other.webSocket.NettyServer; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -23,6 +24,7 @@ public class RuoYiOtherApplication { public static void main(String[] args) { SpringApplication.run(RuoYiOtherApplication.class, args); + new NettyServer().bind(); System.out.println("(♥◠‿◠)ノ゙ 基础模块启动成功 ლ(´ڡ`ლ)゙ \n" + " .-------. ____ __ \n" + " | _ _ \\ \\ \\ / / \n" + diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/TGoodsController.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/TGoodsController.java index 122c167..357c25a 100644 --- a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/TGoodsController.java +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/TGoodsController.java @@ -12,7 +12,7 @@ import com.ruoyi.other.api.domain.TActivity; import com.ruoyi.other.api.domain.TGoods; import com.ruoyi.other.api.dto.AdvertisingDTO; -import com.ruoyi.other.api.dto.ExchangeDto; +import com.ruoyi.order.api.vo.ExchangeDto; import com.ruoyi.other.api.dto.GoodsDTO; import com.ruoyi.other.service.TActivityService; import com.ruoyi.other.service.TAdvertisingService; diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/WebSocketController.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/WebSocketController.java new file mode 100644 index 0000000..ab27190 --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/controller/WebSocketController.java @@ -0,0 +1,31 @@ +package com.ruoyi.other.controller; + +import com.ruoyi.common.core.domain.R; +import com.ruoyi.other.webSocket.NettyChannelMap; +import com.ruoyi.other.webSocket.NettyWebSocketController; +import io.netty.channel.ChannelHandlerContext; +import org.springframework.web.bind.annotation.*; + +/** + * @author zhibing.pu + * @Date 2024/8/21 17:31 + */ +@RestController +@RequestMapping("/webSocket") +public class WebSocketController { + + + /** + * 发送websocket消息 + * @param userId + * @param msg + * @return + */ + @ResponseBody + @PostMapping("/send") + public R send(@RequestParam("userId") Long userId, @RequestParam("msg") String msg){ + ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + userId); + return NettyWebSocketController.sendMsgToClient(channel, msg); + } + +} diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/ChildChannelHandler.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/ChildChannelHandler.java new file mode 100644 index 0000000..9b79f04 --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/ChildChannelHandler.java @@ -0,0 +1,36 @@ +package com.ruoyi.other.webSocket; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +public class ChildChannelHandler extends ChannelInitializer<SocketChannel> { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + String path = "C:\\cert\\tomcat\\scs1723443475715_fxcx.yunyu666.com_server.jks"; +// String path = "/usr/local/server/apache-tomcat-80/conf/cert/6064978_okyueche.com.pfx"; + SSLContext sslContext = createSSLContext.createSSLContext("JKS" + , path, "Pe7>4nS#st$dAnpp"); + //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 + SSLEngine engine = sslContext.createSSLEngine(); + engine.setUseClientMode(false); + socketChannel.pipeline().addLast("ssl", new SslHandler(engine)); + + // 设置30秒没有读到数据,则触发一个READER_IDLE事件。 + // pipeline.addLast(new IdleStateHandler(30, 0, 0)); + // HttpServerCodec:将请求和应答消息解码为HTTP消息 + socketChannel.pipeline().addLast("http-codec", new HttpServerCodec()); + // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息 + socketChannel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); + // ChunkedWriteHandler:向客户端发送HTML5文件 + socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); + // 在管道中添加我们自己的接收数据实现方法 + socketChannel.pipeline().addLast("handler", new WebSocketHandler()); + } +} diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Global.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Global.java new file mode 100644 index 0000000..2d43045 --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Global.java @@ -0,0 +1,9 @@ +package com.ruoyi.other.webSocket; + +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; + +public class Global { + public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); +} diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Method.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Method.java new file mode 100644 index 0000000..c5e712e --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/Method.java @@ -0,0 +1,33 @@ +package com.ruoyi.other.webSocket; + +/** + * 即时通讯【通讯类型类】 + * + * @author TaoNingBo + * @version 1.0 + * @createDate 2016年6月14日 + */ +public class Method { + + /** + * 心跳【推送】 + */ + public static final String ok = "OK"; + + /** + * 心跳【接收】 + */ + public final static String ping = "PING"; + + /** + * 心跳【响应】 + */ + public final static String pong = "PONG"; + + /** + * 司机上传位置 + */ + public static final String location = "LOCATION"; + + +} diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyChannelMap.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyChannelMap.java new file mode 100644 index 0000000..fdc9c24 --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyChannelMap.java @@ -0,0 +1,123 @@ +package com.ruoyi.other.webSocket; + +import io.netty.channel.ChannelHandlerContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class NettyChannelMap { + + public static Map<String, ChannelHandlerContext> ctxMap = new HashMap<>();//单点登录存储的通道 + protected static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>(); + + + private NettyChannelMap() { + // 放置外部实例化 + } + + /** + * Get data from source. + * + * @param key + * @return + */ + public static ChannelHandlerContext getData(String key) { + if (map == null) { + map = new HashMap<String, ChannelHandlerContext>(); + } + return map.get(key); + } + + + /** + * Save data from source. + * + * @param key + * @param val + */ + public static synchronized void saveData(String key, ChannelHandlerContext val) { + map.put(key, val); + } + + /** + * Determine whether the cache key contains the key. + * + * @param key + * @return true|false + * @author TaoNingBo + */ + public static synchronized boolean containsKey(String key) { + return map.containsKey(key); + } + + /** + * Determine whether the cache value contains the value. + * + * @param val + * @return + */ + public static synchronized boolean containsVal(ChannelHandlerContext val) { + return map.containsValue(val); + } + + /** + * Remove the data resources. + * + * @param value + */ + @SuppressWarnings("rawtypes") + public static synchronized void remove(ChannelHandlerContext value) { + Set<String> strings = map.keySet(); + for (String key : strings) { + ChannelHandlerContext channelHandlerContext = map.get(key); + String s = channelHandlerContext.channel().remoteAddress().toString(); + String s1 = value.channel().remoteAddress().toString(); + if (s.equals(s1)) { + channelHandlerContext.close();//关闭通道 + map.remove(key); + } + } + } + + + public static synchronized void remove_(ChannelHandlerContext value) { + Set<String> strings = ctxMap.keySet(); + for (String key : strings) { + ChannelHandlerContext channelHandlerContext = ctxMap.get(key); + String s = channelHandlerContext.channel().remoteAddress().toString(); + String s1 = value.channel().remoteAddress().toString(); + if (s.equals(s1)) { + channelHandlerContext.close();//关闭通道 + ctxMap.remove(key); + } + } + } + + + /** + * Remove the data resources. + * + * @param key + * @author TaoNingBo + */ + public static synchronized void remove(String key) { + map.remove(key); + } + + /** + * Update the data resources. + * + * @param key + * @param value + */ + public static synchronized void update(String key, ChannelHandlerContext value) { + map.put(key, value); + } + + + public static synchronized void update_(String key, ChannelHandlerContext value) { + ctxMap.put(key, value); + } +} diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyMsg.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyMsg.java new file mode 100644 index 0000000..5c7b5f2 --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyMsg.java @@ -0,0 +1,94 @@ +package com.ruoyi.other.webSocket; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class NettyMsg { + + /** + * 返回一个正确数据 + * + * @param method + * @param data + * @return + * @author TaoNingBo + */ + public static String setMsg(String method, Map<String, Object> data) { + StringBuffer json = new StringBuffer(); + json.append(getHeader(200, "SUCCESS", method)); + json.append(JSON.toJSONString(data)); + json.append("}"); + //return JSON.toJSONString(json); + return json.toString(); + } + + /** + * 返回一个正确数据 + * + * @param method + * @param data + * @return + */ + public static String setMsg(String method, List<Map<String, Object>> data) { + StringBuffer json = new StringBuffer(); + json.append(getHeader(200, "SUCCESS", method)); + List<JSONObject> jsonList = new ArrayList<JSONObject>(); + for (Map<String, Object> map : data) { + JSONObject dataJson = new JSONObject(map); + jsonList.add(dataJson); + } + json.append(jsonList); + json.append("}"); + +// return JSON.toJSONString(json); + return json.toString(); + } + + /** + * 返回一个错误数据 + * + * @param method + * @param data + * @return + * @author TaoNingBo + */ + public static String setErrMsg(String method, String data) { + StringBuffer json = new StringBuffer(); + json.append(getHeader(-1, "FAILURE", method)); + json.append("\"" + data + "\""); + json.append("}"); +// return JSON.toJSONString(json); + return json.toString(); + } + + /** + * 生成一个返回JSON的头 + * + * @param code + * @param msg + * @param method + * @return + * @author TaoNingBo + */ + private static String getHeader(int code, String msg, String method) { + StringBuffer header = new StringBuffer(); + header.append("{"); + header.append("\"code\":\"" + code); + header.append("\",\"msg\":\"" + msg); + header.append("\",\"method\":\"" + method); + header.append("\",\"data\":"); + return header.toString(); + } + + +} diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyServer.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyServer.java new file mode 100644 index 0000000..aaa748d --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyServer.java @@ -0,0 +1,77 @@ +package com.ruoyi.other.webSocket; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +import java.util.Timer; +import java.util.TimerTask; + + +/** + * 即时通讯服务启动类 + * + * @version 1.0 + * @date 2016年6月25日 + */ +public class NettyServer { + + /** + * 延迟启动设置 + * <p> + * NettyServer启动方法. + */ + public void bind() { + final Thread thread = new Thread(new NettyRunnable()); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + thread.start(); + } + }, 1000 * 2); + } + + /** + * 即时通讯服务启动 + * + * @version 1.0 + * @date 2016年6月24日 + */ + public class NettyRunnable implements Runnable { + + /** + * 获取即时通讯启动端口 + */ + @Override + public void run() { + System.out.println("===========================Netty端口启动========"); + // Boss线程:由这个线程池提供的线程是boss种类的,用于创建、连接、绑定socket, + // (有点像门卫)然后把这些socket传给worker线程池。 + // 在服务器端每个监听的socket都有一个boss线程来处理。在客户端,只有一个boss线程来处理所有的socket。 + EventLoopGroup bossGroup = new NioEventLoopGroup(); + // Worker线程:Worker线程执行所有的异步I/O,即处理操作 + EventLoopGroup workrGroup = new NioEventLoopGroup(); + try { + + // ServerBootstrap 启动NIO服务的辅助启动类,负责初始话netty服务器,并且开始监听端口的socket请求 + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workrGroup); + // 设置非阻塞,用它来建立新accept的连接,用于构造serversocketchannel的工厂类 + b.channel(NioServerSocketChannel.class); + // ChildChannelHandler 对出入的数据进行的业务操作,其继承ChannelInitializer + b.childHandler(new ChildChannelHandler()); + System.out.println("服务端开启等待客户端连接 ... ..."); + Channel ch = b.bind(9090).sync().channel(); + ch.closeFuture().sync(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + bossGroup.shutdownGracefully(); + workrGroup.shutdownGracefully(); + } + } + } +} diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyWebSocketController.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyWebSocketController.java new file mode 100644 index 0000000..b9c3d7f --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/NettyWebSocketController.java @@ -0,0 +1,154 @@ +package com.ruoyi.other.webSocket; + + +import com.alibaba.fastjson.JSONObject; +import com.ruoyi.common.core.domain.R; +import com.ruoyi.common.core.utils.StringUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; + +import java.util.Hashtable; + +public class NettyWebSocketController { + + public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>(); + + + public static Hashtable<String, String> table; + public static int i = 0; + + static { + if (table == null) { + table = new Hashtable<>(); + } + } + + /** + * 向客户端发送消息 + * + * @param ctx + * @param msg + * @author TaoNingBo + */ + public static R sendMsgToClient(ChannelHandlerContext ctx, String msg) { + if (ctx != null && ctx.channel().isActive()) { + ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); + ChannelFuture sync; + try { + sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync(); + if (!sync.isSuccess()) { + boolean b = true; + for (int i = 0; i < 10; i++) { + ctx.wait(3000); + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + if (sync.isSuccess()) { + b = false; + break; + } + System.err.println("小程序-》推送不成功,将继续推送" + msg); + } + if (b) { + NettyChannelMap.remove(ctx); + return R.fail("无效的消息通道"); + } + } + return R.ok(); + } catch (Exception e) { + NettyChannelMap.remove(ctx); + e.printStackTrace(); + return R.fail("发送消息失败:" + e.getMessage()); + } + } else { + NettyChannelMap.remove(ctx); + return R.fail("无效的消息通道"); + } + } + + // **链接断开 将推送消息记录 + public static void sendMsgToClient(String cacheType, Integer id, String msg) { + ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); + if (ctx != null) { + ChannelFuture sync; + try { + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + if (!sync.isSuccess()) { + for (int i = 0; i < 10; i++) { + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + ; + if (!sync.isSuccess()) { + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + ; + System.err.println("推送不成功,将继续推送" + msg); + if (i == 9) { + table.put(cacheType + id, msg); + ctx.close(); + System.err.println("推送发生异常,记录:" + msg); + } + } else { + break; + } + } + } + } catch (Exception e) { + table.put(cacheType + id, msg); + System.err.println("推送发生异常,记录:" + msg); + } + } else { + table.put(cacheType + id, msg); + System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg); + } + } + + /** + * 判断客户端要执行什么操作 + * + * @param ctx + * @param msg + * @author TaoNingBo + */ + public void JudgeOperation(ChannelHandlerContext ctx, String msg) { + try { + // 验证即时通讯命令是否正确有效 + if (StringUtils.isEmpty(msg)) { + return; + } + String msgStr = msg.toString(); + if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { + return; + } + + // 获取socket信息,保存相应的socket + JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); + int code = jsonMsg.getIntValue("code"); + String message = jsonMsg.getString("msg"); + String method = jsonMsg.getString("method"); + if (code != 200 || !message.equals("SUCCESS")) { + return; + } + JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); + + if (null != ctx && ctx.channel().isActive()) { + jsonMsg.put("method", Method.pong); + sendMsgToClient(ctx, jsonMsg.toJSONString()); + } + + + // ############################### 心跳 ############################ + // 心跳 + if (method.equals(Method.ping)) { + String userId1 = jsonCon.getString("userId"); + if (StringUtils.isNotEmpty(userId1)) { + //存储业务使用的通道 + if (null != ctx && ctx.channel().isActive()) { + NettyChannelMap.update("Applets" + userId1, ctx); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/WebSocketHandler.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/WebSocketHandler.java new file mode 100644 index 0000000..2581ff4 --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/WebSocketHandler.java @@ -0,0 +1,160 @@ +package com.ruoyi.other.webSocket; + +import com.alibaba.fastjson.JSONObject; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.*; +import io.netty.handler.codec.http.websocketx.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.CharsetUtil; + +import java.util.HashMap; + + +public class WebSocketHandler extends SimpleChannelInboundHandler<Object> { + private static final String WEB_SOCKET_URL = "wss://localhost:9090/websocket"; + //用于websocket握手的处理类 + private WebSocketServerHandshaker handshaker; + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + /** + * 心跳 + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state().equals(IdleState.READER_IDLE)) { + // + } else if (event.state().equals(IdleState.WRITER_IDLE)) { + // + } else if (event.state().equals(IdleState.ALL_IDLE)) { + String msg = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); + if (ctx != null && ctx.channel().isActive()) { + ctx.writeAndFlush(Unpooled.copiedBuffer((msg).getBytes())); + } + } + } +// super.userEventTriggered(ctx, evt); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } + + private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { + // Http解码失败,向服务器指定传输的协议为Upgrade:websocket + if (!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); + return; + } + // 握手相应处理,创建websocket握手的工厂类, + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false); + // 根据工厂类和HTTP请求创建握手类 + handshaker = wsFactory.newHandshaker(req); + if (handshaker == null) { + // 不支持websocket + WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); + } else { + // 通过它构造握手响应消息返回给客户端 + handshaker.handshake(ctx.channel(), req); + } + } + + private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) throws Exception { + if (req instanceof CloseWebSocketFrame) { + // 关闭websocket连接 + handshaker.close(ctx.channel(), (CloseWebSocketFrame) req.retain()); + return; + } + if (req instanceof PingWebSocketFrame) { + ctx.channel().write(new PongWebSocketFrame(req.content().retain())); + return; + } + if (!(req instanceof TextWebSocketFrame)) { + throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息"); + } + if (ctx == null || this.handshaker == null || ctx.isRemoved()) { + throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息"); + } + String requestmsg = ((TextWebSocketFrame) req).text(); + + + //给连接的客户端返回数据 + //返回心跳 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("code", 200); + jsonObject.put("method", Method.ok); + jsonObject.put("msg", "SUCCESS"); + jsonObject.put("data", new JSONObject()); + TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString()); +// ctx.channel().writeAndFlush(tws); + + new NettyWebSocketController().JudgeOperation(ctx, requestmsg);//小程序心跳处理 + + // 群发服务端心跳响应 + Global.group.writeAndFlush(new TextWebSocketFrame((tws).text())); + } + + private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { + // BAD_REQUEST(400) 客户端请求错误返回的应答消息 + if (res.getStatus().code() != 200) { + ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); + res.content().writeBytes(buf); + buf.release(); + } + //服务端向客户端发送数据 + ChannelFuture f = ctx.channel().writeAndFlush(res); + // 非法连接直接关闭连接 + if (res.getStatus().code() != 200) { + f.addListener(ChannelFutureListener.CLOSE); + } + } + + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Global.group.add(ctx.channel()); + System.err.println("客户端与服务器端开启"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Global.group.remove(ctx.channel()); + NettyChannelMap.remove(ctx); + System.err.println("客户端与服务器链接关闭"); + } + + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpRequest) { + // websocket连接请求 + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + // websocket业务处理 + handleWebSocketRequest(ctx, (WebSocketFrame) msg); + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpRequest) { + // websocket连接请求 + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + // websocket业务处理 + handleWebSocketRequest(ctx, (WebSocketFrame) msg); + } + } +} diff --git a/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/createSSLContext.java b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/createSSLContext.java new file mode 100644 index 0000000..21a7b72 --- /dev/null +++ b/ruoyi-service/ruoyi-other/src/main/java/com/ruoyi/other/webSocket/createSSLContext.java @@ -0,0 +1,32 @@ +package com.ruoyi.other.webSocket; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import java.io.FileInputStream; +import java.io.InputStream; +import java.security.KeyStore; + +public class createSSLContext { + + /** + * 获取SSLContext + * + * @param type + * @param path + * @param password + * @return + * @throws Exception + */ + public static SSLContext createSSLContext(String type, String path, String password) throws Exception { + KeyStore ks = KeyStore.getInstance(type); /// "JKS" + InputStream ksInputStream = new FileInputStream(path); /// 证书存放地址 + ks.load(ksInputStream, password.toCharArray()); + //KeyManagerFactory充当基于密钥内容源的密钥管理器的工厂。 + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());//getDefaultAlgorithm:获取默认的 KeyManagerFactory 算法名称。 + kmf.init(ks, password.toCharArray()); + //SSLContext的实例表示安全套接字协议的实现,它充当用于安全套接字工厂或 SSLEngine 的工厂。 + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(kmf.getKeyManagers(), null, null); + return sslContext; + } +} -- Gitblit v1.7.1