package com.sinata.zuul.util.echo;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.sinata.zuul.util.*;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelHandlerContext;
|
import org.springframework.http.HttpEntity;
|
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.MediaType;
|
import org.springframework.util.LinkedMultiValueMap;
|
import org.springframework.util.MultiValueMap;
|
import org.springframework.web.client.RestTemplate;
|
|
import java.util.*;
|
|
|
/**
|
* Netty业务逻辑层
|
* @author sinata
|
* @createDate 2016年6月3日
|
* @version 1.0
|
*/
|
public class NettyServerController {
|
|
public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
|
|
public static Hashtable<String,String> table;
|
|
private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
|
|
private GDMapGeocodingUtil gdMapGeocodingUtil = SpringUtil.getObject(GDMapGeocodingUtil.class);
|
|
private RestTemplate internalRestTemplate = SpringUtil.getObject(RestTemplate.class);
|
|
|
|
|
static{
|
if(table == null){
|
table = new Hashtable<>();
|
}
|
}
|
|
public static boolean isdebug = false;
|
public static int i = 0;
|
|
|
/**
|
* 判断客户端要执行什么操作
|
*
|
* @param ctx
|
* @param msg
|
* @author TaoNingBo
|
*/
|
public void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
|
try {
|
// ByteBuf转String
|
ByteBuf byteBuf = (ByteBuf) msg;
|
|
byte[] req = new byte[byteBuf.readableBytes()];
|
byteBuf.readBytes(req);
|
msg = new String(req, "UTF-8");
|
// 验证即时通讯命令是否正确有效
|
if(SinataUtil.isEmpty(msg)) {
|
return;
|
}
|
String msgStr = msg.toString();
|
if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
|
return;
|
}
|
if(isdebug) {
|
// System.out.println("<<<--receive-->>>" + msg);
|
}
|
|
// 获取socket信息,保存相应的socket
|
JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
|
int code = jsonMsg.getInteger("code");
|
String message = jsonMsg.getString("msg");
|
String method = jsonMsg.getString("method");
|
if(code != 200 || !message.equals("SUCCESS")) {
|
return;
|
}
|
JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
|
|
if(null != ctx && ctx.channel().isActive()){
|
jsonMsg.put("method", Method.pong);
|
sendMsgToClient(ctx, jsonMsg.toJSONString());
|
}
|
|
//心跳
|
if(method.equals(Method.ping)) {
|
Integer type = jsonCon.getInteger("type");
|
String token = jsonCon.getString("token");
|
String userId1 = jsonCon.getString("userId");
|
String device = jsonCon.getString("device");
|
String version = jsonCon.getString("version");
|
if(StringUtil.isNotEmpty(userId1)){
|
|
//判断用户或者司机长连接
|
if(type==1){
|
//确保账号在单个设备上登录
|
if(StringUtil.isNotEmpty(token)){
|
String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据
|
if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
|
ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(0, 23));
|
|
JSONObject msg_ = new JSONObject();
|
msg_.put("code", 200);
|
msg_.put("msg", "SUCCESS");
|
msg_.put("method", "OFFLINE");
|
msg_.put("data", new Object());
|
this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
|
TimerTask timerTask = new TimerTask() {
|
@Override
|
public void run() {
|
NettyChannelMap.remove_(data_);
|
NettyChannelMap.remove(data_);
|
}
|
};
|
Timer timer = new Timer();
|
timer.schedule(timerTask, 3000);
|
timer.cancel();
|
}
|
|
NettyChannelMap.update_(token.substring(0, 23), ctx);
|
NettyChannelMap.update("USER" + userId1, ctx);
|
String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
|
ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
|
|
if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
|
redisUtil.setStrValue("USER_APP_" + userId1, token);
|
}
|
}
|
|
}else{
|
//添加司机在线
|
HttpHeaders headers = new HttpHeaders();
|
// 以表单的方式提交
|
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
|
//将请求头部和参数合成一个请求
|
MultiValueMap<String, Object> params = new LinkedMultiValueMap<>();
|
params.add("driverId", userId1);
|
params.add("device", (null != device && device.equals("carDevice")) ? "2" : "1");
|
params.add("version", version);
|
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(params, headers);
|
String w = internalRestTemplate.postForObject("http://driver-server/base/driverOnline/addDriverOnline",requestEntity , String.class);
|
JSONObject jsonObject = JSON.parseObject(w, JSONObject.class);
|
if(jsonObject.getIntValue("code") != 200){
|
System.err.println("调用driver-server添加司机在线数据出错了");
|
}
|
|
//TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班)
|
if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){
|
redisUtil.setStrValue("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis()));
|
|
String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据
|
if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){
|
ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(0, 23));
|
|
//如果是车载端登录,则将其它端都强迫下线
|
JSONObject msg_ = new JSONObject();
|
msg_.put("code", 200);
|
msg_.put("msg", "SUCCESS");
|
msg_.put("method", "OFFLINE");
|
msg_.put("data", new Object());
|
this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
|
TimerTask timerTask = new TimerTask() {
|
@Override
|
public void run() {
|
NettyChannelMap.remove_(data_);
|
NettyChannelMap.remove(data_);
|
}
|
};
|
Timer timer = new Timer();
|
timer.schedule(timerTask, 3000);
|
timer.cancel();
|
}
|
|
NettyChannelMap.update("DRIVER" + userId1, ctx);
|
NettyChannelMap.update_(token.substring(0, 23), ctx);
|
String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
|
ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
|
|
if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
|
redisUtil.setStrValue("DRIVER_" + userId1, token);
|
}
|
}
|
|
|
//确保账号在单个设备上登录
|
String value = redisUtil.getValue("DEVICE_" + userId1);
|
if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(device) && StringUtil.isEmpty(value)){//APP端登录的操作
|
String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据
|
if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据
|
ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(0, 23));
|
|
JSONObject msg_ = new JSONObject();
|
msg_.put("code", 200);
|
msg_.put("msg", "SUCCESS");
|
msg_.put("method", "OFFLINE");
|
msg_.put("data", new Object());
|
this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
|
TimerTask timerTask = new TimerTask() {
|
@Override
|
public void run() {
|
NettyChannelMap.remove_(data_);
|
NettyChannelMap.remove(data_);
|
}
|
};
|
Timer timer = new Timer();
|
timer.schedule(timerTask, 3000);
|
timer.cancel();
|
}
|
|
NettyChannelMap.update("DRIVER" + userId1, ctx);
|
NettyChannelMap.update_(token.substring(0, 23), ctx);
|
String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
|
ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
|
|
if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
|
redisUtil.setStrValue("DRIVER_" + userId1, token);
|
}
|
}
|
|
|
//存储通讯通道
|
if(null != ctx && ctx.channel().isActive()){
|
NettyChannelMap.update("DRIVER" + userId1, ctx);
|
String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
|
ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
|
}
|
}
|
}
|
}
|
//司机上传位置
|
if(method.equals(Method.location)){
|
Integer driverId = jsonCon.getInteger("driverId");
|
Integer orderId = jsonCon.getInteger("orderId");
|
Integer orderType = jsonCon.getInteger("orderType");
|
Double lon = jsonCon.getDouble("lon");
|
Double lat = jsonCon.getDouble("lat");
|
Double computeAzimuth = jsonCon.getDouble("computeAzimuth");
|
Double altitude = jsonCon.getDouble("altitude");
|
if(SinataUtil.isNotEmpty(driverId)){
|
if(null != lon && 0 != lon && null != lat && 0 != lat){
|
if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库
|
HttpHeaders headers = new HttpHeaders();
|
// 以表单的方式提交
|
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
|
//将请求头部和参数合成一个请求
|
MultiValueMap<String, Object> params = new LinkedMultiValueMap<>();
|
params.add("orderType", String.valueOf(orderType));
|
params.add("orderId", String.valueOf(orderId));
|
params.add("driverId", String.valueOf(driverId));
|
params.add("lon", String.valueOf(lon));
|
params.add("lat", String.valueOf(lat));
|
params.add("directionAngle", String.valueOf(computeAzimuth));
|
params.add("altitude", String.valueOf(altitude));
|
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(params, headers);
|
String s = internalRestTemplate.postForObject("http://driver-server/base/savePosition",requestEntity , String.class);
|
JSONObject jsonObject = JSON.parseObject(s, JSONObject.class);
|
if(jsonObject.getIntValue("code") != 200){
|
System.err.println("调用driver-server存储位置数据出错了");
|
}
|
}
|
redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 300);//实时位置存入redis中
|
}else{
|
NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
|
}
|
}else{
|
NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
|
}
|
}
|
|
} catch (Exception e) {
|
if(isdebug) {
|
NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
|
}
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 向客户端发送消息
|
*
|
* @param ctx
|
* @param msg
|
* @author TaoNingBo
|
*/
|
public static void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
|
if (ctx != null && ctx.channel().isActive()) {
|
ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
|
ChannelFuture sync;
|
try {
|
sync = ctx.writeAndFlush(buffer).sync();
|
if(!sync.isSuccess()){//如果推送失败则继续推送10次
|
boolean b = true;
|
for (int i = 0; i < 10; i++) {
|
ctx.wait(3000);
|
sync = ctx.writeAndFlush(buffer).sync();
|
if(sync.isSuccess()){
|
b = false;
|
break;
|
}
|
System.err.println("推送不成功,将继续推送"+msg);
|
}
|
if(b){
|
NettyChannelMap.remove(ctx);
|
}
|
}
|
} catch (Exception e) {
|
System.err.println("推送发生异常,记录:"+msg);
|
NettyChannelMap.remove(ctx);
|
}
|
if(isdebug) {
|
System.err.println("<<<--send-->>>" + msg) ;
|
}
|
}else{
|
System.err.println("推送失败,长连接不存在");
|
NettyChannelMap.remove(ctx);
|
}
|
}
|
|
// **链接断开 将推送消息记录
|
public static void sendMsgToClient(String cacheType, Integer id,String msg) {
|
ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
|
if (ctx != null) {
|
ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
|
ChannelFuture sync;
|
try {
|
sync = ctx.writeAndFlush(buffer).sync();
|
// System.out.println("推送状态"+sync.isSuccess());
|
if(!sync.isSuccess()){
|
for (int i = 0; i < 10; i++) {
|
sync = ctx.writeAndFlush(buffer).sync();
|
if(!sync.isSuccess()){
|
sync = ctx.writeAndFlush(buffer).sync();
|
System.err.println("推送不成功,将继续推送"+msg);
|
if(i == 9){
|
table.put(cacheType+id, msg);
|
|
ctx.close();
|
System.err.println("推送发生异常,记录:"+msg);
|
}
|
}else{
|
break;
|
}
|
}
|
}
|
} catch (Exception e) {
|
table.put(cacheType+id, msg);
|
System.err.println("推送发生异常,记录:"+msg);
|
}
|
if(isdebug) {
|
// System.out.println("<<<--send-->>>" + msg);
|
}
|
}else{
|
table.put(cacheType+id, msg);
|
System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg);
|
}
|
}
|
|
/**
|
* 记录推送不成功消息,并在心跳连接续推
|
* @param token
|
*/
|
public static void resendMsg(String token){
|
String msg = table.get(token);
|
ChannelHandlerContext ctx = NettyChannelMap.getData(token);
|
if(SinataUtil.isNotEmpty(msg) && ctx != null && ctx.channel().isActive()){
|
ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
|
ChannelFuture sync;
|
try {
|
sync = ctx.writeAndFlush(buffer).sync();
|
System.err.println("重发异常推送状态"+sync.isSuccess()+",位置:"+token+",消息内容:"+msg);
|
if(!sync.isSuccess()){
|
i++;
|
if(i == 10){
|
i =0;
|
ctx.close();
|
return;
|
}
|
System.err.println("重发异常推送不成功,将继续推送"+msg);
|
resendMsg(token);
|
}else{
|
i=0;
|
}
|
table.remove(token);
|
} catch (Exception e) {
|
resendMsg(token);
|
System.err.println("重发推送发生异常,记录:"+msg);
|
}
|
}
|
}
|
|
|
}
|