From 1c40baaf9ca0183945b9881d11ceed5aeebc8290 Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期四, 23 十月 2025 11:35:44 +0800
Subject: [PATCH] 修改bug
---
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java | 200 +++++++++++++++++++++++++++----------------------
1 files changed, 111 insertions(+), 89 deletions(-)
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
index c4ae7ad..cf893ba 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -5,21 +5,22 @@
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
+import com.sinata.push.config.QYTConfig;
import com.sinata.push.util.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
-import org.springframework.stereotype.Component;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
+import org.springframework.web.client.RestTemplate;
-import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
-import java.util.concurrent.TimeUnit;
/**
@@ -28,26 +29,30 @@
* @createDate 2016年6月3日
* @version 1.0
*/
-@Component
public class NettyServerController {
public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
public static Hashtable<String,String> table;
- @Resource
- private RedisTemplate<String, Object> redisTemplate;
+ public static QYTConfig qytConfig;
+
+ private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
+
+
-
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-
-
-
+
static{
if(table == null){
table = new Hashtable<>();
}
+ }
+
+
+ public static void setQytConfig(QYTConfig qytConfig) {
+ NettyServerController.qytConfig = qytConfig;
}
public static boolean isdebug = false;
@@ -61,7 +66,7 @@
* @param msg
* @author TaoNingBo
*/
- public void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
+ public synchronized void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
try {
// ByteBuf转String
ByteBuf byteBuf = (ByteBuf) msg;
@@ -77,9 +82,7 @@
if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
return;
}
- if(isdebug) {
-// System.out.println("<<<--receive-->>>" + msg);
- }
+ System.out.println("<<<--receive-->>>" + msg);
// 获取socket信息,保存相应的socket
JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
@@ -98,106 +101,93 @@
String userId1 = jsonCon.getString("userId");
String device = jsonCon.getString("device");
String version = jsonCon.getString("version");
+ String businessType = jsonCon.getString("businessType");//1:打车,2=代驾
+ String business = "2".equals(businessType) ? "daijia" : "dache";
if(StringUtil.isNotEmpty(userId1)){
- String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type);
- if(!StringUtils.hasLength(fluid_control)){
- redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
- }else{
- long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
- if(l >= 10000){
- redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
- }else{
- String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
- ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
- return;
- }
- }
- //判断用户或者司机长连接
+ //判断用户或者司机长连接
if(type==1){
- //存储通讯通道
- if(null != ctx && ctx.channel().isActive()){
- System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
- NettyChannelMap.update("USER" + userId1, ctx);
- String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
- ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
- }
//确保账号在单个设备上登录
if(StringUtil.isNotEmpty(token)){
- String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
+ String token_ = redisUtil.getValue(business + ":USER_APP_"+ userId1);//获取缓存中最新的数据
if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
- ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1);
+ ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
JSONObject msg_ = new JSONObject();
msg_.put("code", 200);
msg_.put("msg", "SUCCESS");
msg_.put("method", "OFFLINE");
msg_.put("data", new Object());
- this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
- TimerTask timerTask = new TimerTask() {
+ boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
+ if(b){
+ NettyChannelMap.remove_(data_);
+ }
+ new Timer().schedule(new TimerTask() {
@Override
public void run() {
- NettyChannelMap.remove(context);
+ NettyChannelMap.remove_(data_);
}
- };
- Timer timer = new Timer();
- timer.schedule(timerTask, 3000);
- timer.cancel();
+ }, 5000);
}
- if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
- redisTemplate.opsForValue().set("USER_" + userId1, token);
- }
+ NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
+ NettyChannelMap.update(business + ":USER" + userId1, ctx);
+ redisUtil.setStrValue(business + ":USER_APP_" + userId1, token);
}
+
+ //存储通讯通道
+ if(null != ctx && ctx.channel().isActive()){
+ NettyChannelMap.update(business + ":USER" + userId1, ctx);
+ }
}else{
- //存储通讯通道
- if(null != ctx && ctx.channel().isActive()){
- System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
- NettyChannelMap.update("DRIVER" + userId1, ctx);
- String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
- ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
- }
//确保账号在单个设备上登录
- String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1);
- if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作
- String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
+ if(StringUtil.isNotEmpty(token)){//APP端登录的操作
+ String token_ = redisUtil.getValue(business + ":DRIVER_" + userId1);//缓存中拿最新数据
if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据
- ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1);
- JSONObject msg_ = new JSONObject();
- msg_.put("code", 200);
- msg_.put("msg", "SUCCESS");
- msg_.put("method", "OFFLINE");
- msg_.put("data", new Object());
- this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
- TimerTask timerTask = new TimerTask() {
- @Override
- public void run() {
- NettyChannelMap.remove(context);
- }
- };
- Timer timer = new Timer();
- timer.schedule(timerTask, 3000);
- timer.cancel();
+ ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
+ if(null != data_){
+ JSONObject msg_ = new JSONObject();
+ msg_.put("code", 200);
+ msg_.put("msg", "SUCCESS");
+ msg_.put("method", "OFFLINE");
+ msg_.put("data", new Object());
+ boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
+ if(b){
+ NettyChannelMap.remove_(data_);
+ }
+ }
}
- if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
- redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
- }
+ NettyChannelMap.update(business + ":DRIVER" + userId1, ctx);
+ NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
+ redisUtil.setStrValue(business + ":DRIVER_" + userId1, token);
+ }
+ //存储通讯通道
+ if(null != ctx && ctx.channel().isActive()){
+ NettyChannelMap.update(business + ":DRIVER" + userId1, ctx);
}
}
}
+
+ if(null != ctx && ctx.channel().isActive()){
+ jsonMsg.put("method", Method.pong);
+ sendMsgToClient(ctx, jsonMsg.toJSONString());
+ }
}
//司机上传位置
if(method.equals(Method.location)){
Integer driverId = jsonCon.getInteger("driverId");
- String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId);
+ String businessType = jsonCon.getString("businessType");//1:打车,2=代驾
+ String business = "2".equals(businessType) || null==businessType ? "daijia" : "dache";
+
+ String fluid_control = redisUtil.getValue(business + ":location_" + driverId);
if(!StringUtils.hasLength(fluid_control)){
- redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
+ redisUtil.setStrValue(business + ":location_" + driverId, System.currentTimeMillis() + "");
}else{
long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
if(l < 5000){
return;
}
- redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
+ redisUtil.setStrValue(business + ":location_" + driverId, System.currentTimeMillis() + "");
}
-
+
Integer orderId = jsonCon.getInteger("orderId");
Integer orderType = jsonCon.getInteger("orderType");
Double lon = jsonCon.getDouble("lon");
@@ -207,16 +197,44 @@
System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + jsonCon.toJSONString());
if(SinataUtil.isNotEmpty(driverId)){
if(null != lon && 0 != lon && null != lat && 0 != lat){
- if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库
+ if("dache".equals(business)){
+ if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库
+ Map<String, Object> params = new HashMap<>();
+ params.put("orderType", String.valueOf(orderType));
+ params.put("orderId", String.valueOf(orderId));
+ params.put("driverId", String.valueOf(driverId));
+ params.put("lon", String.valueOf(lon));
+ params.put("lat", String.valueOf(lat));
+ params.put("directionAngle", String.valueOf(computeAzimuth));
+ params.put("altitude", String.valueOf(altitude));
+ HttpRequest post = HttpUtil.createPost(qytConfig.getChuxingUrl() + "/driver-server/base/savePosition");
+ post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
+ post.form(params);
+ HttpResponse execute = post.execute();
+ if(200 != execute.getStatus()){
+ System.err.println("调用driver-server存储位置数据出错了");
+ }
+ JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class);
+ if(jsonObject.getIntValue("code") != 200){
+ System.err.println("调用driver-server存储位置数据出错了");
+ }
+ }
+ }
+
+ if("daijia".equals(business)){
+ HttpHeaders headers = new HttpHeaders();
+ // 以表单的方式提交
+ headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
+ //将请求头部和参数合成一个请求
Map<String, Object> params = new HashMap<>();
- params.put("orderType", String.valueOf(orderType));
- params.put("orderId", String.valueOf(orderId));
+ params.put("orderType", null == orderType ? orderType : String.valueOf(orderType));
+ params.put("orderId", null == orderId ? orderId : String.valueOf(orderId));
params.put("driverId", String.valueOf(driverId));
params.put("lon", String.valueOf(lon));
params.put("lat", String.valueOf(lat));
params.put("directionAngle", String.valueOf(computeAzimuth));
params.put("altitude", String.valueOf(altitude));
- HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition");
+ HttpRequest post = HttpUtil.createPost(qytConfig.getDaijiaurl() + "/driver-server/base/driver/addDriverPosition");
post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
post.form(params);
HttpResponse execute = post.execute();
@@ -228,7 +246,8 @@
System.err.println("调用driver-server存储位置数据出错了");
}
}
- redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 30, TimeUnit.SECONDS);//实时位置存入redis中
+
+ redisUtil.setStrValue(business + ":DRIVER" + driverId, lon + "," + lat, 30);//实时位置存入redis中
}else{
this.sendMsgToClient(ctx, "__error__" + msg.toString());
}
@@ -239,7 +258,7 @@
} catch (Exception e) {
if(isdebug) {
- this.sendMsgToClient(ctx, "__error__" + msg.toString());
+ NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
}
e.printStackTrace();
}
@@ -252,7 +271,7 @@
* @param msg
* @author TaoNingBo
*/
- public void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
+ public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) {
if (ctx != null && ctx.channel().isActive()) {
ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
ChannelFuture sync;
@@ -272,7 +291,9 @@
if(b){
NettyChannelMap.remove(ctx);
}
+ return true;
}
+ return sync.isSuccess();
} catch (Exception e) {
System.err.println("推送发生异常,记录:"+msg);
NettyChannelMap.remove(ctx);
@@ -284,6 +305,7 @@
System.err.println("推送失败,长连接不存在");
NettyChannelMap.remove(ctx);
}
+ return false;
}
// **链接断开 将推送消息记录
--
Gitblit v1.7.1