MessagePushTravel/pom.xml
@@ -26,6 +26,14 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <!-- netty --> <dependency> <groupId>io.netty</groupId> MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
@@ -3,10 +3,12 @@ import com.alibaba.fastjson.JSON; import com.sinata.push.util.ResultUtil; import com.sinata.push.util.SpringUtil; import com.sinata.push.util.applets.NettyWebSocketController; import com.sinata.push.util.echo.NettyChannelMap; import com.sinata.push.util.echo.NettyServerController; import io.netty.channel.ChannelHandlerContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @@ -15,6 +17,12 @@ @RestController @RequestMapping("/netty") public class NettyController { @Autowired private NettyServerController nettyServerController; @Autowired private NettyWebSocketController nettyWebSocketController; /** @@ -28,12 +36,12 @@ if(type == 1){//用户端 ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序 if(null != channel){ NettyWebSocketController.sendMsgToClient(channel, msg); nettyWebSocketController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } channel = NettyChannelMap.getData("USER" + id); if(null != channel){ NettyServerController.sendMsgToClient(channel, msg); nettyServerController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } return JSON.toJSONString(ResultUtil.error("推送失败-----用户id=" + id)); @@ -43,7 +51,7 @@ if(type == 2){//司机端 ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id); if(null != channel){ NettyServerController.sendMsgToClient(channel, msg); nettyServerController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } return JSON.toJSONString(ResultUtil.error("推送失败-----司机id=" + id)); MessagePushTravel/src/main/java/com/sinata/push/util/URLUtil.java
@@ -8,5 +8,5 @@ /** * 业务网关接口地址 */ String zuul = "http://172.21.35.45:80"; String zuul = "http://172.21.35.45:8010"; } MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java
@@ -18,20 +18,13 @@ public class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // String path = "C:\\app\\cert\\tomcat\\www.gzjwzc.com.pfx"; // SSLContext sslContext = createSSLContext.createSSLContext("PKCS12" // , path, "79uc9bsd"); // String path = "/root/server/app/cert/qytzt.cn.jks"; // SSLContext sslContext = createSSLContext.createSSLContext("JKS" // , path, "bo27xqbr"); // SSLEngine engine = sslContext.createSSLEngine(); File path = new File("/root/server/app/cert/qytzt.cn.key"); File path1 = new File("/root/server/app/cert/qytzt.cn.pem"); SslContext sslContext = SslContextBuilder.forServer(path, path1, null).clientAuth(ClientAuth.NONE).build(); //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 SSLEngine engine = sslContext.newEngine(socketChannel.alloc()); engine.setUseClientMode(false); socketChannel.pipeline().addLast("ssl", new SslHandler(engine)); // //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 // engine.setUseClientMode(false); // socketChannel.pipeline().addLast("ssl", new SslHandler(engine)); // 设置30秒没有读到数据,则触发一个READER_IDLE事件。 // pipeline.addLast(new IdleStateHandler(30, 0, 0)); MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
@@ -13,19 +13,24 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.HashMap; import java.util.Hashtable; import java.util.Timer; import java.util.TimerTask; @Component public class NettyWebSocketController { public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class); @Resource private RedisTemplate<String, Object> redisTemplate; public static Hashtable<String,String> table; static{ @@ -112,7 +117,7 @@ } } catch (Exception e) { if(isdebug) { NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString()); this.sendMsgToClient(ctx, "__error__" + msg.toString()); } e.printStackTrace(); } MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java
@@ -1,6 +1,7 @@ package com.sinata.push.util.applets; import com.alibaba.fastjson.JSONObject; import com.sinata.push.util.SpringUtil; import com.sinata.push.util.echo.Method; import com.sinata.push.util.echo.NettyChannelMap; import com.sinata.push.util.echo.NettyMsg; @@ -15,6 +16,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.HashMap; @@ -23,7 +25,9 @@ //用于websocket握手的处理类 private WebSocketServerHandshaker handshaker; private static final String WEB_SOCKET_URL = "wss://localhost:8808/websocket"; private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket"; private NettyWebSocketController nettyWebSocketController = SpringUtil.getObject(NettyWebSocketController.class); @@ -73,6 +77,10 @@ } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { HttpHeaders headers = req.headers(); headers.names().forEach(name -> { System.out.println(name + ":" + headers.get(name)); }); // Http解码失败,向服务器指定传输的协议为Upgrade:websocket if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); @@ -119,8 +127,8 @@ jsonObject.put("data", new JSONObject()); TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString()); // ctx.channel().writeAndFlush(tws); new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理 nettyWebSocketController.JudgeOperation(ctx,requestmsg);//小程序心跳处理 // 群发服务端心跳响应 Global.group.writeAndFlush(new TextWebSocketFrame((tws).text())); MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java
@@ -1,5 +1,6 @@ package com.sinata.push.util.echo; import com.sinata.push.util.SpringUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -7,13 +8,14 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import org.springframework.data.redis.core.RedisTemplate; import java.net.InetSocketAddress; import java.util.HashMap; public class DiscardServerHandler extends SimpleChannelInboundHandler<String> { private NettyServerController nettyServerController = new NettyServerController(); private NettyServerController nettyServerController = SpringUtil.getObject(NettyServerController.class); public static boolean isdebug = true; MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -10,16 +10,13 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.TimeUnit; @@ -31,13 +28,16 @@ * @createDate 2016年6月3日 * @version 1.0 */ @Component public class NettyServerController { public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); public static Hashtable<String,String> table; private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class); @Resource private RedisTemplate<String, Object> redisTemplate; private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @@ -99,7 +99,7 @@ String device = jsonCon.getString("device"); String version = jsonCon.getString("version"); if(StringUtil.isNotEmpty(userId1)){ String fluid_control = redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type); String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type); if(!StringUtils.hasLength(fluid_control)){ redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); }else{ @@ -148,20 +148,6 @@ ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } }else{ Map<String, Object> params = new HashMap<>(); params.put("driverId", userId1); HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/driverOnline/addDriverOnline"); post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType()); post.form(params); HttpResponse execute = post.execute(); if(200 != execute.getStatus()){ System.err.println("调用driver-server添加司机在线数据出错了"); } JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class); if(jsonObject.getIntValue("code") != 200){ System.err.println("调用driver-server添加司机在线数据出错了"); } //TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班) if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){ redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis())); @@ -276,16 +262,16 @@ System.out.println("id:" + driverId + "---lon" + lon + "---lat" + lat); redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 300, TimeUnit.SECONDS);//实时位置存入redis中 }else{ NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); this.sendMsgToClient(ctx, "__error__" + msg.toString()); } }else{ NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); this.sendMsgToClient(ctx, "__error__" + msg.toString()); } } } catch (Exception e) { if(isdebug) { NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); this.sendMsgToClient(ctx, "__error__" + msg.toString()); } e.printStackTrace(); } @@ -298,7 +284,7 @@ * @param msg * @author TaoNingBo */ public static void sendMsgToClient(ChannelHandlerContext ctx, String msg) { public void sendMsgToClient(ChannelHandlerContext ctx, String msg) { if (ctx != null && ctx.channel().isActive()) { ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); ChannelFuture sync; MessagePushTravel/src/main/resources/application.yml
@@ -9,31 +9,12 @@ multipart: max-request-size: 100MB max-file-size: 100MB data: redis: host: 172.21.35.151 database: 0 password: SC_cache@20#25 time-out: 1000 # 集群节点 cluster: nodes: 172.21.35.151:6512,172.21.35.152:6512,172.21.35.153:6512,172.21.35.151:6513,172.21.35.152:6513,172.21.35.153:6513 # 重定向最大次数 max-redirects: 3 lettuce: cluster: refresh: # 集群拓扑自适应刷新 adaptive: true # 集群拓扑刷新周期 毫秒 period: 3000 pool: # 最大链接数量 max-active: 100 # 最大阻塞时间 负数没有限制 max-wait: -1 # 最大空闲链接 max-idle: 10 # 最小空闲链接 min-idle: 0 redis: host: 172.21.35.151 port: 6512 password: SC_cache@20#25 database: 0 timeout: 1000 cluster: nodes: 172.21.35.151:6512,172.21.35.152:6512,172.21.35.153:6512,172.21.35.151:6513,172.21.35.152:6513,172.21.35.153:6513