Pu Zhibing
3 天以前 607a2413533f10be28ebbfe1c96a4669a3150ce4
提交推送服务
9个文件已修改
147 ■■■■ 已修改文件
MessagePushTravel/pom.xml 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/URLUtil.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/resources/application.yml 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/pom.xml
@@ -26,6 +26,14 @@
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <!-- netty -->
        <dependency>
            <groupId>io.netty</groupId>
MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
@@ -3,10 +3,12 @@
import com.alibaba.fastjson.JSON;
import com.sinata.push.util.ResultUtil;
import com.sinata.push.util.SpringUtil;
import com.sinata.push.util.applets.NettyWebSocketController;
import com.sinata.push.util.echo.NettyChannelMap;
import com.sinata.push.util.echo.NettyServerController;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@@ -15,6 +17,12 @@
@RestController
@RequestMapping("/netty")
public class NettyController {
    @Autowired
    private NettyServerController nettyServerController;
    @Autowired
    private NettyWebSocketController nettyWebSocketController;
    /**
@@ -28,12 +36,12 @@
        if(type == 1){//用户端
            ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序
            if(null != channel){
                NettyWebSocketController.sendMsgToClient(channel, msg);
                nettyWebSocketController.sendMsgToClient(channel, msg);
                return JSON.toJSONString(ResultUtil.success());
            }
            channel = NettyChannelMap.getData("USER" + id);
            if(null != channel){
                NettyServerController.sendMsgToClient(channel, msg);
                nettyServerController.sendMsgToClient(channel, msg);
                return JSON.toJSONString(ResultUtil.success());
            }
            return JSON.toJSONString(ResultUtil.error("推送失败-----用户id=" + id));
@@ -43,7 +51,7 @@
        if(type == 2){//司机端
            ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id);
            if(null != channel){
                NettyServerController.sendMsgToClient(channel, msg);
                nettyServerController.sendMsgToClient(channel, msg);
                return JSON.toJSONString(ResultUtil.success());
            }
            return JSON.toJSONString(ResultUtil.error("推送失败-----司机id=" + id));
MessagePushTravel/src/main/java/com/sinata/push/util/URLUtil.java
@@ -8,5 +8,5 @@
    /**
     * 业务网关接口地址
     */
    String zuul = "http://172.21.35.45:80";
    String zuul = "http://172.21.35.45:8010";
}
MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java
@@ -18,20 +18,13 @@
public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
//        String path = "C:\\app\\cert\\tomcat\\www.gzjwzc.com.pfx";
//        SSLContext sslContext = createSSLContext.createSSLContext("PKCS12"
//                , path, "79uc9bsd");
//        String path = "/root/server/app/cert/qytzt.cn.jks";
//        SSLContext sslContext = createSSLContext.createSSLContext("JKS"
//                , path, "bo27xqbr");
//        SSLEngine engine = sslContext.createSSLEngine();
        File path = new File("/root/server/app/cert/qytzt.cn.key");
        File path1 = new File("/root/server/app/cert/qytzt.cn.pem");
        SslContext sslContext = SslContextBuilder.forServer(path, path1, null).clientAuth(ClientAuth.NONE).build();
        //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信
        SSLEngine engine = sslContext.newEngine(socketChannel.alloc());
        engine.setUseClientMode(false);
        socketChannel.pipeline().addLast("ssl", new SslHandler(engine));
//        //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信
//        engine.setUseClientMode(false);
//        socketChannel.pipeline().addLast("ssl", new SslHandler(engine));
        // 设置30秒没有读到数据,则触发一个READER_IDLE事件。
        // pipeline.addLast(new IdleStateHandler(30, 0, 0));
MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
@@ -13,19 +13,24 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Timer;
import java.util.TimerTask;
@Component
public class NettyWebSocketController {
    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
    private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class);
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    public static Hashtable<String,String> table;
    static{
@@ -112,7 +117,7 @@
        }
    } catch (Exception e) {
        if(isdebug) {
            NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString());
            this.sendMsgToClient(ctx, "__error__" + msg.toString());
        }
        e.printStackTrace();
    }
MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java
@@ -1,6 +1,7 @@
package com.sinata.push.util.applets;
import com.alibaba.fastjson.JSONObject;
import com.sinata.push.util.SpringUtil;
import com.sinata.push.util.echo.Method;
import com.sinata.push.util.echo.NettyChannelMap;
import com.sinata.push.util.echo.NettyMsg;
@@ -15,6 +16,7 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.HashMap;
@@ -23,7 +25,9 @@
    //用于websocket握手的处理类
    private WebSocketServerHandshaker handshaker;
    private static final String WEB_SOCKET_URL = "wss://localhost:8808/websocket";
    private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
    private NettyWebSocketController nettyWebSocketController = SpringUtil.getObject(NettyWebSocketController.class);
@@ -73,6 +77,10 @@
    }
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        HttpHeaders headers = req.headers();
        headers.names().forEach(name -> {
            System.out.println(name + ":" + headers.get(name));
        });
        // Http解码失败,向服务器指定传输的协议为Upgrade:websocket
        if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
@@ -119,8 +127,8 @@
        jsonObject.put("data", new JSONObject());
        TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());
//        ctx.channel().writeAndFlush(tws);
        new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理
        nettyWebSocketController.JudgeOperation(ctx,requestmsg);//小程序心跳处理
        // 群发服务端心跳响应
        Global.group.writeAndFlush(new TextWebSocketFrame((tws).text()));
MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java
@@ -1,5 +1,6 @@
package com.sinata.push.util.echo;
import com.sinata.push.util.SpringUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@@ -7,13 +8,14 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.springframework.data.redis.core.RedisTemplate;
import java.net.InetSocketAddress;
import java.util.HashMap;
public class DiscardServerHandler extends SimpleChannelInboundHandler<String>  {
    private NettyServerController nettyServerController = new NettyServerController();
    private NettyServerController nettyServerController = SpringUtil.getObject(NettyServerController.class);
    
    public static boolean isdebug = true;
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -10,16 +10,13 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -31,13 +28,16 @@
 * @createDate 2016年6月3日
 * @version 1.0
 */
@Component
public class NettyServerController {
    
    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
    public static Hashtable<String,String> table;
    private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class);
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@@ -99,7 +99,7 @@
                String device = jsonCon.getString("device");
                String version = jsonCon.getString("version");
                if(StringUtil.isNotEmpty(userId1)){
                    String fluid_control = redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type);
                    String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type);
                    if(!StringUtils.hasLength(fluid_control)){
                        redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
                    }else{
@@ -148,20 +148,6 @@
                            ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                        }
                    }else{
                        Map<String, Object> params = new HashMap<>();
                        params.put("driverId", userId1);
                        HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/driverOnline/addDriverOnline");
                        post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType());
                        post.form(params);
                        HttpResponse execute = post.execute();
                        if(200 != execute.getStatus()){
                            System.err.println("调用driver-server添加司机在线数据出错了");
                        }
                        JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class);
                        if(jsonObject.getIntValue("code") != 200){
                            System.err.println("调用driver-server添加司机在线数据出错了");
                        }
                        //TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班)
                        if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){
                            redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis()));
@@ -276,16 +262,16 @@
                        System.out.println("id:" + driverId + "---lon" + lon + "---lat" + lat);
                        redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 300, TimeUnit.SECONDS);//实时位置存入redis中
                    }else{
                        NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
                        this.sendMsgToClient(ctx, "__error__" + msg.toString());
                    }
                }else{
                    NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
                    this.sendMsgToClient(ctx, "__error__" + msg.toString());
                }
            }
            
        } catch (Exception e) {
            if(isdebug) {
                NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
                this.sendMsgToClient(ctx, "__error__" + msg.toString());
            }
            e.printStackTrace();
        }
@@ -298,7 +284,7 @@
     * @param msg
     * @author TaoNingBo
     */
    public static void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
    public void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
        if (ctx != null && ctx.channel().isActive()) {
            ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
            ChannelFuture sync;
MessagePushTravel/src/main/resources/application.yml
@@ -9,31 +9,12 @@
    multipart:
      max-request-size: 100MB
      max-file-size: 100MB
  data:
    redis:
      host: 172.21.35.151
      database: 0
      password: SC_cache@20#25
      time-out: 1000
      # 集群节点
      cluster:
        nodes: 172.21.35.151:6512,172.21.35.152:6512,172.21.35.153:6512,172.21.35.151:6513,172.21.35.152:6513,172.21.35.153:6513
        # 重定向最大次数
        max-redirects: 3
      lettuce:
        cluster:
          refresh:
            # 集群拓扑自适应刷新
            adaptive: true
            # 集群拓扑刷新周期 毫秒
            period: 3000
        pool:
          # 最大链接数量
          max-active: 100
          # 最大阻塞时间 负数没有限制
          max-wait: -1
          # 最大空闲链接
          max-idle: 10
          # 最小空闲链接
          min-idle: 0
  redis:
    host: 172.21.35.151
    port: 6512
    password: SC_cache@20#25
    database: 0
    timeout: 1000
    cluster:
      nodes: 172.21.35.151:6512,172.21.35.152:6512,172.21.35.153:6512,172.21.35.151:6513,172.21.35.152:6513,172.21.35.153:6513