Pu Zhibing
7 天以前 a1f5b4bcde020c206382c3a07c9b4ecd5784a9d8
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -5,21 +5,22 @@
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sinata.push.config.QYTConfig;
import com.sinata.push.util.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@@ -28,26 +29,30 @@
 * @createDate 2016年6月3日
 * @version 1.0
 */
@Component
public class NettyServerController {
   
   public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
   public static Hashtable<String,String> table;
   
   @Resource
   private RedisTemplate<String, Object> redisTemplate;
   public static QYTConfig qytConfig;
   private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
   
   private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
   static{
      if(table == null){
         table = new Hashtable<>();
      }
   }
   public static void setQytConfig(QYTConfig qytConfig) {
      NettyServerController.qytConfig = qytConfig;
   }
   
   public static boolean isdebug = false;
@@ -61,7 +66,7 @@
    * @param msg
    * @author TaoNingBo
    */
   public void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
   public synchronized void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
        try {
           // ByteBuf转String
           ByteBuf byteBuf = (ByteBuf) msg;
@@ -98,106 +103,93 @@
                String userId1 = jsonCon.getString("userId");
            String device = jsonCon.getString("device");
            String version = jsonCon.getString("version");
            String businessType = jsonCon.getString("businessType");//1:打车,2=代驾
            String business = "2".equals(businessType) ? "daijia" : "dache";
                if(StringUtil.isNotEmpty(userId1)){
               String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type);
               if(!StringUtils.hasLength(fluid_control)){
                  redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
               }else{
                  long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
                  if(l >= 10000){
                     redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
                  }else{
                     String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                     ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                     return;
                  }
               }
               //判断用户或者司机长连接
                    //判断用户或者司机长连接
                    if(type==1){
                       //存储通讯通道
                       if(null != ctx && ctx.channel().isActive()){
                          System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
                          NettyChannelMap.update("USER" + userId1, ctx);
                          String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                          ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                       }
                        //确保账号在单个设备上登录
                        if(StringUtil.isNotEmpty(token)){
                            String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
                            String token_ = redisUtil.getValue(business + ":USER_APP_"+ userId1);//获取缓存中最新的数据
                            if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
                        ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1);
                        ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
                        JSONObject msg_ = new JSONObject();
                                msg_.put("code", 200);
                                msg_.put("msg", "SUCCESS");
                                msg_.put("method", "OFFLINE");
                                msg_.put("data", new Object());
                        this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
                                TimerTask timerTask = new TimerTask() {
                        boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
                        if(b){
                           NettyChannelMap.remove_(data_);
                        }
                        new Timer().schedule(new TimerTask() {
                                    @Override
                                    public void run() {
                              NettyChannelMap.remove(context);
                                        NettyChannelMap.remove_(data_);
                                    }
                                };
                                Timer timer = new Timer();
                                timer.schedule(timerTask, 3000);
                                timer.cancel();
                                }, 5000);
                            }
                            if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
                                redisTemplate.opsForValue().set("USER_" + userId1, token);
                            }
                            NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
                            NettyChannelMap.update(business + ":USER" + userId1, ctx);
                     redisUtil.setStrValue(business + ":USER_APP_" + userId1, token);
                        }
                  //存储通讯通道
                  if(null != ctx && ctx.channel().isActive()){
                     NettyChannelMap.update(business + ":USER" + userId1, ctx);
                  }
                    }else{
                       //存储通讯通道
                       if(null != ctx && ctx.channel().isActive()){
                          System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
                          NettyChannelMap.update("DRIVER" + userId1, ctx);
                          String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                          ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                       }
                        //确保账号在单个设备上登录
                  String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1);
                  if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作
                            String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
                  if(StringUtil.isNotEmpty(token)){//APP端登录的操作
                            String token_ = redisUtil.getValue(business + ":DRIVER_" + userId1);//缓存中拿最新数据
                            if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据
                        ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1);
                        JSONObject msg_ = new JSONObject();
                                msg_.put("code", 200);
                                msg_.put("msg", "SUCCESS");
                                msg_.put("method", "OFFLINE");
                                msg_.put("data", new Object());
                        this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
                                TimerTask timerTask = new TimerTask() {
                                    @Override
                                    public void run() {
                              NettyChannelMap.remove(context);
                                    }
                                };
                                Timer timer = new Timer();
                                timer.schedule(timerTask, 3000);
                                timer.cancel();
                                ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
                                if(null != data_){
                           JSONObject msg_ = new JSONObject();
                           msg_.put("code", 200);
                           msg_.put("msg", "SUCCESS");
                           msg_.put("method", "OFFLINE");
                           msg_.put("data", new Object());
                           boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
                           if(b){
                              NettyChannelMap.remove_(data_);
                           }
                        }
                            }
                            if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
                                redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
                            }
                            NettyChannelMap.update(business + ":DRIVER" + userId1, ctx);
                            NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
                     redisUtil.setStrValue(business + ":DRIVER_" + userId1, token);
                        }
                  //存储通讯通道
                        if(null != ctx && ctx.channel().isActive()){
                            NettyChannelMap.update(business + ":DRIVER" + userId1, ctx);
                        }
                    }
                }
            if(null != ctx && ctx.channel().isActive()){
               jsonMsg.put("method", Method.pong);
               sendMsgToClient(ctx, jsonMsg.toJSONString());
            }
         }
         //司机上传位置
         if(method.equals(Method.location)){
            Integer driverId = jsonCon.getInteger("driverId");
            String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId);
            String businessType = jsonCon.getString("businessType");//1:打车,2=代驾
            String business = "2".equals(businessType) || null==businessType ? "daijia" : "dache";
            String fluid_control = redisUtil.getValue(business + ":location_" + driverId);
            if(!StringUtils.hasLength(fluid_control)){
               redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
               redisUtil.setStrValue(business + ":location_" + driverId, System.currentTimeMillis() + "");
            }else{
               long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
               if(l < 5000){
                  return;
               }
               redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
               redisUtil.setStrValue(business + ":location_" + driverId, System.currentTimeMillis() + "");
            }
            Integer orderId = jsonCon.getInteger("orderId");
            Integer orderType = jsonCon.getInteger("orderType");
            Double lon = jsonCon.getDouble("lon");
@@ -207,16 +199,44 @@
            System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + jsonCon.toJSONString());
            if(SinataUtil.isNotEmpty(driverId)){
               if(null !=  lon && 0 != lon && null !=  lat && 0 != lat){
                  if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库
                  if("dache".equals(business)){
                     if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库
                        Map<String, Object> params = new HashMap<>();
                        params.put("orderType", String.valueOf(orderType));
                        params.put("orderId", String.valueOf(orderId));
                        params.put("driverId", String.valueOf(driverId));
                        params.put("lon", String.valueOf(lon));
                        params.put("lat", String.valueOf(lat));
                        params.put("directionAngle", String.valueOf(computeAzimuth));
                        params.put("altitude", String.valueOf(altitude));
                        HttpRequest post = HttpUtil.createPost(qytConfig.getChuxingUrl() + "/driver-server/base/savePosition");
                        post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
                        post.form(params);
                        HttpResponse execute = post.execute();
                        if(200 != execute.getStatus()){
                           System.err.println("调用driver-server存储位置数据出错了");
                        }
                        JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class);
                        if(jsonObject.getIntValue("code") != 200){
                           System.err.println("调用driver-server存储位置数据出错了");
                        }
                     }
                  }
                  if("daijia".equals(business)){
                     HttpHeaders headers = new HttpHeaders();
                     // 以表单的方式提交
                     headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
                     //将请求头部和参数合成一个请求
                     Map<String, Object> params = new HashMap<>();
                     params.put("orderType", String.valueOf(orderType));
                     params.put("orderId", String.valueOf(orderId));
                     params.put("orderType", null == orderType ? orderType : String.valueOf(orderType));
                     params.put("orderId", null == orderId ? orderId : String.valueOf(orderId));
                     params.put("driverId", String.valueOf(driverId));
                     params.put("lon", String.valueOf(lon));
                     params.put("lat", String.valueOf(lat));
                     params.put("directionAngle", String.valueOf(computeAzimuth));
                     params.put("altitude", String.valueOf(altitude));
                     HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition");
                     HttpRequest post = HttpUtil.createPost(qytConfig.getDaijiaurl() + "/driver-server/base/driver/addDriverPosition");
                     post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
                     post.form(params);
                     HttpResponse execute = post.execute();
@@ -228,8 +248,8 @@
                        System.err.println("调用driver-server存储位置数据出错了");
                     }
                  }
                  System.out.println("id:" + driverId + "---lon" + lon + "---lat" + lat);
                        redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 300, TimeUnit.SECONDS);//实时位置存入redis中
                  redisUtil.setStrValue(business + ":DRIVER" + driverId, lon + "," + lat, 30);//实时位置存入redis中
               }else{
                  this.sendMsgToClient(ctx, "__error__" + msg.toString());
               }
@@ -240,7 +260,7 @@
         
      } catch (Exception e) {
         if(isdebug) {
            this.sendMsgToClient(ctx, "__error__" + msg.toString());
            NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
         }
         e.printStackTrace();
      }
@@ -253,7 +273,7 @@
    * @param msg
    * @author TaoNingBo
    */
   public void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
   public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) {
      if (ctx != null && ctx.channel().isActive()) {
         ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
         ChannelFuture sync;
@@ -273,7 +293,9 @@
               if(b){
                  NettyChannelMap.remove(ctx);
               }
               return true;
            }
            return sync.isSuccess();
         } catch (Exception e) {
            System.err.println("推送发生异常,记录:"+msg);
            NettyChannelMap.remove(ctx);
@@ -285,6 +307,7 @@
         System.err.println("推送失败,长连接不存在");
         NettyChannelMap.remove(ctx);
      }
      return false;
   }
//   **链接断开 将推送消息记录