From d2de40b544b5b069cd525143ff5a9e0d922b2c66 Mon Sep 17 00:00:00 2001
From: liujie <1793218484@qq.com>
Date: 星期五, 08 八月 2025 15:18:41 +0800
Subject: [PATCH] 我的订单顺风车
---
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java | 147 +++++++++++++++++++++---------------------------
1 files changed, 64 insertions(+), 83 deletions(-)
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
index 014705c..0f10527 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -10,16 +10,16 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
-import org.springframework.stereotype.Component;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
+import org.springframework.web.client.RestTemplate;
-import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
-import java.util.concurrent.TimeUnit;
/**
@@ -28,20 +28,17 @@
* @createDate 2016年6月3日
* @version 1.0
*/
-@Component
public class NettyServerController {
public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
public static Hashtable<String,String> table;
-
- @Resource
- private RedisTemplate<String, Object> redisTemplate;
-
+ private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
+
+
+
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-
-
static{
@@ -61,7 +58,7 @@
* @param msg
* @author TaoNingBo
*/
- public void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
+ public synchronized void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
try {
// ByteBuf转String
ByteBuf byteBuf = (ByteBuf) msg;
@@ -99,105 +96,87 @@
String device = jsonCon.getString("device");
String version = jsonCon.getString("version");
if(StringUtil.isNotEmpty(userId1)){
- String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type);
- if(!StringUtils.hasLength(fluid_control)){
- redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
- }else{
- long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
- if(l >= 10000){
- redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
- }else{
- String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
- ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
- return;
- }
- }
- //判断用户或者司机长连接
+ //判断用户或者司机长连接
if(type==1){
- //存储通讯通道
- if(null != ctx && ctx.channel().isActive()){
- System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
- NettyChannelMap.update("USER" + userId1, ctx);
- String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
- ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
- }
//确保账号在单个设备上登录
if(StringUtil.isNotEmpty(token)){
- String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
+ String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据
if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
- ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1);
+ ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
JSONObject msg_ = new JSONObject();
msg_.put("code", 200);
msg_.put("msg", "SUCCESS");
msg_.put("method", "OFFLINE");
msg_.put("data", new Object());
- this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
- TimerTask timerTask = new TimerTask() {
+ boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
+ if(b){
+ NettyChannelMap.remove_(data_);
+ }
+ new Timer().schedule(new TimerTask() {
@Override
public void run() {
- NettyChannelMap.remove(context);
+ NettyChannelMap.remove_(data_);
}
- };
- Timer timer = new Timer();
- timer.schedule(timerTask, 3000);
- timer.cancel();
+ }, 5000);
}
- if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
- redisTemplate.opsForValue().set("USER_" + userId1, token);
- }
+ NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
+ NettyChannelMap.update("USER" + userId1, ctx);
+ redisUtil.setStrValue("USER_APP_" + userId1, token);
}
+
+ //存储通讯通道
+ if(null != ctx && ctx.channel().isActive()){
+ NettyChannelMap.update("USER" + userId1, ctx);
+ }
}else{
- //存储通讯通道
- if(null != ctx && ctx.channel().isActive()){
- System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
- NettyChannelMap.update("DRIVER" + userId1, ctx);
- String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
- ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
- }
//确保账号在单个设备上登录
- String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1);
- if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作
- String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
+ if(StringUtil.isNotEmpty(token)){//APP端登录的操作
+ String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据
if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据
- ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1);
- JSONObject msg_ = new JSONObject();
- msg_.put("code", 200);
- msg_.put("msg", "SUCCESS");
- msg_.put("method", "OFFLINE");
- msg_.put("data", new Object());
- this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
- TimerTask timerTask = new TimerTask() {
- @Override
- public void run() {
- NettyChannelMap.remove(context);
- }
- };
- Timer timer = new Timer();
- timer.schedule(timerTask, 3000);
- timer.cancel();
+ ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
+ if(null != data_){
+ JSONObject msg_ = new JSONObject();
+ msg_.put("code", 200);
+ msg_.put("msg", "SUCCESS");
+ msg_.put("method", "OFFLINE");
+ msg_.put("data", new Object());
+ boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
+ if(b){
+ NettyChannelMap.remove_(data_);
+ }
+ }
}
- if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
- redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
- }
+ NettyChannelMap.update("DRIVER" + userId1, ctx);
+ NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
+ redisUtil.setStrValue("DRIVER_" + userId1, token);
+ }
+ //存储通讯通道
+ if(null != ctx && ctx.channel().isActive()){
+ NettyChannelMap.update("DRIVER" + userId1, ctx);
}
}
}
+
+ if(null != ctx && ctx.channel().isActive()){
+ jsonMsg.put("method", Method.pong);
+ sendMsgToClient(ctx, jsonMsg.toJSONString());
+ }
}
//司机上传位置
if(method.equals(Method.location)){
Integer driverId = jsonCon.getInteger("driverId");
- String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId);
+ String fluid_control = redisUtil.getValue("location_" + driverId);
if(!StringUtils.hasLength(fluid_control)){
- redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
+ redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + "");
}else{
long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
if(l < 5000){
return;
}
- redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
+ redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + "");
}
-
+
Integer orderId = jsonCon.getInteger("orderId");
Integer orderType = jsonCon.getInteger("orderType");
Double lon = jsonCon.getDouble("lon");
@@ -228,8 +207,7 @@
System.err.println("调用driver-server存储位置数据出错了");
}
}
- System.out.println("id:" + driverId + "---lon" + lon + "---lat" + lat);
- redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 300, TimeUnit.SECONDS);//实时位置存入redis中
+ redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 30);//实时位置存入redis中
}else{
this.sendMsgToClient(ctx, "__error__" + msg.toString());
}
@@ -240,7 +218,7 @@
} catch (Exception e) {
if(isdebug) {
- this.sendMsgToClient(ctx, "__error__" + msg.toString());
+ NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
}
e.printStackTrace();
}
@@ -253,7 +231,7 @@
* @param msg
* @author TaoNingBo
*/
- public void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
+ public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) {
if (ctx != null && ctx.channel().isActive()) {
ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
ChannelFuture sync;
@@ -273,7 +251,9 @@
if(b){
NettyChannelMap.remove(ctx);
}
+ return true;
}
+ return sync.isSuccess();
} catch (Exception e) {
System.err.println("推送发生异常,记录:"+msg);
NettyChannelMap.remove(ctx);
@@ -285,6 +265,7 @@
System.err.println("推送失败,长连接不存在");
NettyChannelMap.remove(ctx);
}
+ return false;
}
// **链接断开 将推送消息记录
--
Gitblit v1.7.1