package cn.mb.cloud.gateway.util.echo; import cn.mb.cloud.gateway.util.RedisUtil; import cn.mb.cloud.gateway.util.SinataUtil; import cn.mb.cloud.gateway.util.SpringUtil; import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.client.RestTemplate; import java.util.HashMap; import java.util.Hashtable; import java.util.Timer; import java.util.TimerTask; /** * Netty业务逻辑层 * @author sinata * @createDate 2016年6月3日 * @version 1.0 */ public class NettyServerController { public static Hashtable> map = new Hashtable>(); public static Hashtable table; private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); private RestTemplate internalRestTemplate = SpringUtil.getObject(RestTemplate.class); static{ if(table == null){ table = new Hashtable<>(); } } public static boolean isdebug = false; public static int i = 0; /** * 判断客户端要执行什么操作 * * @param ctx * @param msg * @author TaoNingBo */ public void JudgeOperation(ChannelHandlerContext ctx, Object msg) { try { // ByteBuf转String ByteBuf byteBuf = (ByteBuf) msg; byte[] req = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(req); msg = new String(req, "UTF-8"); // 验证即时通讯命令是否正确有效 if(SinataUtil.isEmpty(msg)) { return; } String msgStr = msg.toString(); if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { return; } if(isdebug) { // System.out.println("<<<--receive-->>>" + msg); } // 获取socket信息,保存相应的socket JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); int code = jsonMsg.getInteger("code"); String message = jsonMsg.getString("msg"); String method = jsonMsg.getString("method"); if(code != 200 || !message.equals("SUCCESS")) { return; } JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); if(null != ctx && ctx.channel().isActive()){ jsonMsg.put("method", Method.pong); sendMsgToClient(ctx, jsonMsg.toJSONString()); } //心跳 if(method.equals(Method.ping)) { Integer type = jsonCon.getInteger("type"); String token = jsonCon.getString("token"); String userId1 = jsonCon.getString("userId"); String device = jsonCon.getString("device"); String version = jsonCon.getString("version"); if(StringUtil.isNotEmpty(userId1)){ //判断用户或者司机长连接 if(type==1){ //确保账号在单个设备上登录 if(StringUtil.isNotEmpty(token)){ String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息 TimerTask timerTask = new TimerTask() { @Override public void run() { NettyChannelMap.remove_(ctx); NettyChannelMap.remove(ctx); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); }else{ NettyChannelMap.update_(token.substring(0, 23), ctx); NettyChannelMap.update("USER" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 redisUtil.setStrValue("USER_APP_" + userId1, token); } } }else{ //添加司机在线 HttpHeaders headers = new HttpHeaders(); // 以表单的方式提交 headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); //将请求头部和参数合成一个请求 MultiValueMap params = new LinkedMultiValueMap<>(); params.add("driverId", userId1); params.add("device", (null != device && device.equals("carDevice")) ? "2" : "1"); params.add("version", version); HttpEntity> requestEntity = new HttpEntity<>(params, headers); String w = internalRestTemplate.postForObject("http://driver-server/base/driverOnline/addDriverOnline",requestEntity , String.class); JSONObject jsonObject = JSON.parseObject(w, JSONObject.class); if(jsonObject.getIntValue("code") != 200){ System.err.println("调用driver-server添加司机在线数据出错了"); } if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){ redisUtil.setStrValue("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis())); String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据 if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){ //如果是车载端登录,则将其它端都强迫下线 JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息 TimerTask timerTask = new TimerTask() { @Override public void run() { NettyChannelMap.remove_(ctx); NettyChannelMap.remove(ctx); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); }else{ // System.err.println("开始存储司机通道" + userId1); NettyChannelMap.update("DRIVER" + userId1, ctx); NettyChannelMap.update_(token.substring(0, 23), ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 redisUtil.setStrValue("DRIVER_" + userId1, token); } } //确保账号在单个设备上登录 String value = redisUtil.getValue("DEVICE_" + userId1); if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(device) && StringUtil.isEmpty(value)){//APP端登录的操作 String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据 JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息 TimerTask timerTask = new TimerTask() { @Override public void run() { NettyChannelMap.remove_(ctx); NettyChannelMap.remove(ctx); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); }else{ // System.err.println("开始存储司机通道" + userId1); NettyChannelMap.update("DRIVER" + userId1, ctx); NettyChannelMap.update_(token.substring(0, 23), ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 redisUtil.setStrValue("DRIVER_" + userId1, token); } } //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ // System.err.println("开始存储司机通道" + userId1); NettyChannelMap.update("DRIVER" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } } } } //司机上传位置 if(method.equals(Method.location)){ Integer driverId = jsonCon.getInteger("driverId"); Integer orderId = jsonCon.getInteger("orderId"); Integer orderType = jsonCon.getInteger("orderType"); Double lon = jsonCon.getDouble("lon"); Double lat = jsonCon.getDouble("lat"); Double computeAzimuth = jsonCon.getDouble("computeAzimuth"); Double altitude = jsonCon.getDouble("altitude"); if(SinataUtil.isNotEmpty(driverId)){ if(null != lon && 0 != lon && null != lat && 0 != lat){ if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库 HttpHeaders headers = new HttpHeaders(); // 以表单的方式提交 headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); //将请求头部和参数合成一个请求 MultiValueMap params = new LinkedMultiValueMap<>(); params.add("orderType", String.valueOf(orderType)); params.add("orderId", String.valueOf(orderId)); params.add("driverId", String.valueOf(driverId)); params.add("lon", String.valueOf(lon)); params.add("lat", String.valueOf(lat)); params.add("directionAngle", String.valueOf(computeAzimuth)); params.add("altitude", String.valueOf(altitude)); HttpEntity> requestEntity = new HttpEntity<>(params, headers); String s = internalRestTemplate.postForObject("http://driver-server/base/savePosition",requestEntity , String.class); JSONObject jsonObject = JSON.parseObject(s, JSONObject.class); if(jsonObject.getIntValue("code") != 200){ System.err.println("调用driver-server存储位置数据出错了"); } } redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 300);//实时位置存入redis中 }else{ NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); } }else{ NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); } } } catch (Exception e) { if(isdebug) { NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); } e.printStackTrace(); } } /** * 向客户端发送消息 * * @param ctx * @param msg * @author TaoNingBo */ public static void sendMsgToClient(ChannelHandlerContext ctx, String msg) { if (ctx != null && ctx.channel().isActive()) { ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); ChannelFuture sync; try { sync = ctx.writeAndFlush(buffer).sync(); if(!sync.isSuccess()){//如果推送失败则继续推送10次 boolean b = true; for (int i = 0; i < 10; i++) { ctx.wait(3000); sync = ctx.writeAndFlush(buffer).sync(); if(sync.isSuccess()){ b = false; break; } System.err.println("推送不成功,将继续推送"+msg); } if(b){ NettyChannelMap.remove(ctx); } } } catch (Exception e) { System.err.println("推送发生异常,记录:"+msg); NettyChannelMap.remove(ctx); } if(isdebug) { System.err.println("<<<--send-->>>" + msg) ; } }else{ System.err.println("推送失败,长连接不存在"); NettyChannelMap.remove(ctx); } } // **链接断开 将推送消息记录 public static void sendMsgToClient(String cacheType, Integer id,String msg) { ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); if (ctx != null) { ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); ChannelFuture sync; try { sync = ctx.writeAndFlush(buffer).sync(); // System.out.println("推送状态"+sync.isSuccess()); if(!sync.isSuccess()){ for (int i = 0; i < 10; i++) { sync = ctx.writeAndFlush(buffer).sync(); if(!sync.isSuccess()){ sync = ctx.writeAndFlush(buffer).sync(); System.err.println("推送不成功,将继续推送"+msg); if(i == 9){ table.put(cacheType+id, msg); ctx.close(); System.err.println("推送发生异常,记录:"+msg); } }else{ break; } } } } catch (Exception e) { table.put(cacheType+id, msg); System.err.println("推送发生异常,记录:"+msg); } if(isdebug) { // System.out.println("<<<--send-->>>" + msg); } }else{ table.put(cacheType+id, msg); System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg); } } /** * 记录推送不成功消息,并在心跳连接续推 * @param token */ public static void resendMsg(String token){ String msg = table.get(token); ChannelHandlerContext ctx = NettyChannelMap.getData(token); if(SinataUtil.isNotEmpty(msg) && ctx != null && ctx.channel().isActive()){ ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); ChannelFuture sync; try { sync = ctx.writeAndFlush(buffer).sync(); System.err.println("重发异常推送状态"+sync.isSuccess()+",位置:"+token+",消息内容:"+msg); if(!sync.isSuccess()){ i++; if(i == 10){ i =0; ctx.close(); return; } System.err.println("重发异常推送不成功,将继续推送"+msg); resendMsg(token); }else{ i=0; } table.remove(token); } catch (Exception e) { resendMsg(token); System.err.println("重发推送发生异常,记录:"+msg); } } } }