From 607a2413533f10be28ebbfe1c96a4669a3150ce4 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期三, 25 六月 2025 15:04:23 +0800 Subject: [PATCH] 提交推送服务 --- MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java | 14 +++- MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java | 40 ++++--------- MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java | 11 ++- MessagePushTravel/pom.xml | 8 ++ MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java | 19 ++---- MessagePushTravel/src/main/resources/application.yml | 35 ++--------- MessagePushTravel/src/main/java/com/sinata/push/util/URLUtil.java | 2 MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java | 4 + MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java | 14 +++- 9 files changed, 69 insertions(+), 78 deletions(-) diff --git a/MessagePushTravel/pom.xml b/MessagePushTravel/pom.xml index 7ccc743..22e9a9b 100644 --- a/MessagePushTravel/pom.xml +++ b/MessagePushTravel/pom.xml @@ -26,6 +26,14 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> + <dependency> + <groupId>io.lettuce</groupId> + <artifactId>lettuce-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-pool2</artifactId> + </dependency> <!-- netty --> <dependency> <groupId>io.netty</groupId> diff --git a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java index 178936d..c0decdf 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java @@ -3,10 +3,12 @@ import com.alibaba.fastjson.JSON; import com.sinata.push.util.ResultUtil; +import com.sinata.push.util.SpringUtil; import com.sinata.push.util.applets.NettyWebSocketController; import com.sinata.push.util.echo.NettyChannelMap; import com.sinata.push.util.echo.NettyServerController; import io.netty.channel.ChannelHandlerContext; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @@ -15,6 +17,12 @@ @RestController @RequestMapping("/netty") public class NettyController { + + @Autowired + private NettyServerController nettyServerController; + + @Autowired + private NettyWebSocketController nettyWebSocketController; /** @@ -28,12 +36,12 @@ if(type == 1){//用户端 ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序 if(null != channel){ - NettyWebSocketController.sendMsgToClient(channel, msg); + nettyWebSocketController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } channel = NettyChannelMap.getData("USER" + id); if(null != channel){ - NettyServerController.sendMsgToClient(channel, msg); + nettyServerController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } return JSON.toJSONString(ResultUtil.error("推送失败-----用户id=" + id)); @@ -43,7 +51,7 @@ if(type == 2){//司机端 ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id); if(null != channel){ - NettyServerController.sendMsgToClient(channel, msg); + nettyServerController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } return JSON.toJSONString(ResultUtil.error("推送失败-----司机id=" + id)); diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/URLUtil.java b/MessagePushTravel/src/main/java/com/sinata/push/util/URLUtil.java index e210b25..b7c2c2d 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/URLUtil.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/URLUtil.java @@ -8,5 +8,5 @@ /** * 业务网关接口地址 */ - String zuul = "http://172.21.35.45:80"; + String zuul = "http://172.21.35.45:8010"; } diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java index 5072004..5ab55f9 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java @@ -18,20 +18,13 @@ public class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { -// String path = "C:\\app\\cert\\tomcat\\www.gzjwzc.com.pfx"; -// SSLContext sslContext = createSSLContext.createSSLContext("PKCS12" -// , path, "79uc9bsd"); +// String path = "/root/server/app/cert/qytzt.cn.jks"; +// SSLContext sslContext = createSSLContext.createSSLContext("JKS" +// , path, "bo27xqbr"); // SSLEngine engine = sslContext.createSSLEngine(); - - - File path = new File("/root/server/app/cert/qytzt.cn.key"); - File path1 = new File("/root/server/app/cert/qytzt.cn.pem"); - SslContext sslContext = SslContextBuilder.forServer(path, path1, null).clientAuth(ClientAuth.NONE).build(); - - //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 - SSLEngine engine = sslContext.newEngine(socketChannel.alloc()); - engine.setUseClientMode(false); - socketChannel.pipeline().addLast("ssl", new SslHandler(engine)); +// //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 +// engine.setUseClientMode(false); +// socketChannel.pipeline().addLast("ssl", new SslHandler(engine)); // 设置30秒没有读到数据,则触发一个READER_IDLE事件。 // pipeline.addLast(new IdleStateHandler(30, 0, 0)); diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java index da05e85..dddbca5 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java @@ -13,19 +13,24 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.HashMap; import java.util.Hashtable; import java.util.Timer; import java.util.TimerTask; +@Component public class NettyWebSocketController { public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); - - private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class); + + @Resource + private RedisTemplate<String, Object> redisTemplate; public static Hashtable<String,String> table; static{ @@ -112,7 +117,7 @@ } } catch (Exception e) { if(isdebug) { - NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString()); + this.sendMsgToClient(ctx, "__error__" + msg.toString()); } e.printStackTrace(); } diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java index 5d3b425..e1086ab 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java @@ -1,6 +1,7 @@ package com.sinata.push.util.applets; import com.alibaba.fastjson.JSONObject; +import com.sinata.push.util.SpringUtil; import com.sinata.push.util.echo.Method; import com.sinata.push.util.echo.NettyChannelMap; import com.sinata.push.util.echo.NettyMsg; @@ -15,6 +16,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; +import org.springframework.data.redis.core.StringRedisTemplate; import java.util.HashMap; @@ -23,7 +25,9 @@ //用于websocket握手的处理类 private WebSocketServerHandshaker handshaker; - private static final String WEB_SOCKET_URL = "wss://localhost:8808/websocket"; + private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket"; + + private NettyWebSocketController nettyWebSocketController = SpringUtil.getObject(NettyWebSocketController.class); @@ -73,6 +77,10 @@ } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { + HttpHeaders headers = req.headers(); + headers.names().forEach(name -> { + System.out.println(name + ":" + headers.get(name)); + }); // Http解码失败,向服务器指定传输的协议为Upgrade:websocket if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); @@ -119,8 +127,8 @@ jsonObject.put("data", new JSONObject()); TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString()); // ctx.channel().writeAndFlush(tws); - - new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理 + + nettyWebSocketController.JudgeOperation(ctx,requestmsg);//小程序心跳处理 // 群发服务端心跳响应 Global.group.writeAndFlush(new TextWebSocketFrame((tws).text())); diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java index f83843a..595c397 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java @@ -1,5 +1,6 @@ package com.sinata.push.util.echo; +import com.sinata.push.util.SpringUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -7,13 +8,14 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; +import org.springframework.data.redis.core.RedisTemplate; import java.net.InetSocketAddress; import java.util.HashMap; public class DiscardServerHandler extends SimpleChannelInboundHandler<String> { - private NettyServerController nettyServerController = new NettyServerController(); + private NettyServerController nettyServerController = SpringUtil.getObject(NettyServerController.class); public static boolean isdebug = true; diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java index 4e003bd..1c6782b 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java @@ -10,16 +10,13 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; -import org.springframework.web.client.RestTemplate; +import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.TimeUnit; @@ -31,13 +28,16 @@ * @createDate 2016年6月3日 * @version 1.0 */ +@Component public class NettyServerController { public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); public static Hashtable<String,String> table; - - private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class); + + @Resource + private RedisTemplate<String, Object> redisTemplate; + private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @@ -99,7 +99,7 @@ String device = jsonCon.getString("device"); String version = jsonCon.getString("version"); if(StringUtil.isNotEmpty(userId1)){ - String fluid_control = redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type); + String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type); if(!StringUtils.hasLength(fluid_control)){ redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); }else{ @@ -148,20 +148,6 @@ ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } }else{ - Map<String, Object> params = new HashMap<>(); - params.put("driverId", userId1); - HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/driverOnline/addDriverOnline"); - post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType()); - post.form(params); - HttpResponse execute = post.execute(); - if(200 != execute.getStatus()){ - System.err.println("调用driver-server添加司机在线数据出错了"); - } - JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class); - if(jsonObject.getIntValue("code") != 200){ - System.err.println("调用driver-server添加司机在线数据出错了"); - } - //TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班) if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){ redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis())); @@ -276,16 +262,16 @@ System.out.println("id:" + driverId + "---lon" + lon + "---lat" + lat); redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 300, TimeUnit.SECONDS);//实时位置存入redis中 }else{ - NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); + this.sendMsgToClient(ctx, "__error__" + msg.toString()); } }else{ - NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); + this.sendMsgToClient(ctx, "__error__" + msg.toString()); } } } catch (Exception e) { if(isdebug) { - NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); + this.sendMsgToClient(ctx, "__error__" + msg.toString()); } e.printStackTrace(); } @@ -298,7 +284,7 @@ * @param msg * @author TaoNingBo */ - public static void sendMsgToClient(ChannelHandlerContext ctx, String msg) { + public void sendMsgToClient(ChannelHandlerContext ctx, String msg) { if (ctx != null && ctx.channel().isActive()) { ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); ChannelFuture sync; diff --git a/MessagePushTravel/src/main/resources/application.yml b/MessagePushTravel/src/main/resources/application.yml index 17d9797..05ef1b6 100644 --- a/MessagePushTravel/src/main/resources/application.yml +++ b/MessagePushTravel/src/main/resources/application.yml @@ -9,31 +9,12 @@ multipart: max-request-size: 100MB max-file-size: 100MB - data: - redis: - host: 172.21.35.151 - database: 0 - password: SC_cache@20#25 - time-out: 1000 - # 集群节点 - cluster: - nodes: 172.21.35.151:6512,172.21.35.152:6512,172.21.35.153:6512,172.21.35.151:6513,172.21.35.152:6513,172.21.35.153:6513 - # 重定向最大次数 - max-redirects: 3 - lettuce: - cluster: - refresh: - # 集群拓扑自适应刷新 - adaptive: true - # 集群拓扑刷新周期 毫秒 - period: 3000 - pool: - # 最大链接数量 - max-active: 100 - # 最大阻塞时间 负数没有限制 - max-wait: -1 - # 最大空闲链接 - max-idle: 10 - # 最小空闲链接 - min-idle: 0 + redis: + host: 172.21.35.151 + port: 6512 + password: SC_cache@20#25 + database: 0 + timeout: 1000 + cluster: + nodes: 172.21.35.151:6512,172.21.35.152:6512,172.21.35.153:6512,172.21.35.151:6513,172.21.35.152:6513,172.21.35.153:6513 -- Gitblit v1.7.1