| | |
| | | import io.netty.buffer.Unpooled; |
| | | import io.netty.channel.ChannelFuture; |
| | | import io.netty.channel.ChannelHandlerContext; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.http.HttpEntity; |
| | | import org.springframework.http.HttpHeaders; |
| | | import org.springframework.http.MediaType; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.LinkedMultiValueMap; |
| | | import org.springframework.util.MultiValueMap; |
| | | import org.springframework.util.StringUtils; |
| | | import org.springframework.web.client.RestTemplate; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.text.SimpleDateFormat; |
| | | import java.util.*; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | |
| | | /** |
| | |
| | | * @createDate 2016-06-03 |
| | | * @version 1.0 |
| | | */ |
| | | @Component |
| | | public class NettyServerController { |
| | | /* Static table keyed by String whose values are tables mapping a channel context to a String; it is not referenced in the visible part of this class. */ |
| | | public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); |
| | | /* Static String-to-String table, declared here without an initial value. */ |
| | | public static Hashtable<String,String> table; |
| | | /* Redis template injected via @Resource; the visible code uses it for flow-control timestamps, login tokens and driver locations. */ |
| | | @Resource |
| | | private RedisTemplate<String, Object> redisTemplate; |
| | | |
| | | /* Redis helper obtained through SpringUtil.getObject when the instance is created, rather than by injection. */ |
| | | private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); |
| | | |
| | | |
| | | /* Timestamp format used in the channel-storage diagnostics. NOTE(review): SimpleDateFormat is not thread-safe; confirm this instance is never used concurrently. */ |
| | | private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); |
| | | |
| | | |
| | | |
| | | |
| | | static{ |
| | |
| | | * @param msg |
| | | * @author TaoNingBo |
| | | */ |
| | | /* Handles one inbound client message: applies per-user flow control, stores the user or driver channel, enforces single-device login and processes driver location uploads. NOTE(review): this listing shows two signatures and several duplicated statements back to back; confirm which variant is current. */ public void JudgeOperation(ChannelHandlerContext ctx, Object msg) { |
| | | public synchronized void JudgeOperation(ChannelHandlerContext ctx, Object msg) { |
| | | try { |
| | | // Convert the ByteBuf to a String |
| | | ByteBuf byteBuf = (ByteBuf) msg; |
| | |
| | | String device = jsonCon.getString("device"); |
| | | String version = jsonCon.getString("version"); |
| | | if(StringUtil.isNotEmpty(userId1)){ |
| | | String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type); /* timestamp string stored by the previous accepted request for this user and type */ |
| | | if(!StringUtils.hasLength(fluid_control)){ |
| | | redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); |
| | | }else{ |
| | | long l = System.currentTimeMillis() - Long.valueOf(fluid_control); |
| | | if(l >= 10000){ /* at least 10 s since the stored timestamp: accept the request and refresh the timestamp */ |
| | | redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); |
| | | }else{ /* too soon: acknowledge with Method.ok and stop processing */ |
| | | String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); |
| | | ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // Decide whether this is a user or a driver long-lived connection |
| | | // Decide whether this is a user or a driver long-lived connection |
| | | if(type==1){ |
| | | // Store the communication channel |
| | | if(null != ctx && ctx.channel().isActive()){ |
| | | System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1); |
| | | NettyChannelMap.update("USER" + userId1, ctx); |
| | | String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); |
| | | ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); |
| | | } |
| | | // Ensure the account is logged in on a single device only |
| | | if(StringUtil.isNotEmpty(token)){ |
| | | String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);// fetch the latest value from the cache |
| | | String token_ = redisUtil.getValue("USER_APP_"+ userId1);// fetch the latest value from the cache |
| | | if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){// logged in on a different device: notify the other device |
| | | ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1); |
| | | ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); /* channel looked up by the last 16 characters of the cached token */ |
| | | JSONObject msg_ = new JSONObject(); |
| | | msg_.put("code", 200); |
| | | msg_.put("msg", "SUCCESS"); |
| | | msg_.put("method", "OFFLINE"); |
| | | msg_.put("data", new Object()); |
| | | this.sendMsgToClient(context, msg_.toJSONString());// send the message to the current channel |
| | | TimerTask timerTask = new TimerTask() { |
| | | boolean b = this.sendMsgToClient(data_, msg_.toJSONString());// send the message to the current channel |
| | | if(b){ |
| | | NettyChannelMap.remove_(data_); |
| | | } |
| | | new Timer().schedule(new TimerTask() { |
| | | @Override |
| | | public void run() { |
| | | NettyChannelMap.remove(context); |
| | | NettyChannelMap.remove_(data_); |
| | | } |
| | | }; |
| | | Timer timer = new Timer(); |
| | | timer.schedule(timerTask, 3000); |
| | | timer.cancel(); /* NOTE(review): Timer.cancel() discards scheduled tasks, so calling it right after schedule() means the task above never runs */ |
| | | }, 5000); |
| | | } |
| | | if(StringUtil.isEmpty(token_)){// covers the case where storing the token at login failed |
| | | redisTemplate.opsForValue().set("USER_" + userId1, token); |
| | | } |
| | | NettyChannelMap.update_(token.substring(token.length() - 16), ctx); |
| | | NettyChannelMap.update("USER" + userId1, ctx); |
| | | redisUtil.setStrValue("USER_APP_" + userId1, token); |
| | | } |
| | | |
| | | // Store the communication channel |
| | | if(null != ctx && ctx.channel().isActive()){ |
| | | NettyChannelMap.update("USER" + userId1, ctx); |
| | | } |
| | | }else{ |
| | | // Store the communication channel |
| | | if(null != ctx && ctx.channel().isActive()){ |
| | | System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1); |
| | | NettyChannelMap.update("DRIVER" + userId1, ctx); |
| | | String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); |
| | | ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); |
| | | } |
| | | // Ensure the account is logged in on a single device only |
| | | String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1); |
| | | if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){// login from the app client |
| | | String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);// take the latest value from the cache |
| | | if(StringUtil.isNotEmpty(token)){// login from the app client |
| | | String token_ = redisUtil.getValue("DRIVER_" + userId1);// take the latest value from the cache |
| | | if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){// logged in on a different device: notify the current device |
| | | ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1); |
| | | JSONObject msg_ = new JSONObject(); |
| | | msg_.put("code", 200); |
| | | msg_.put("msg", "SUCCESS"); |
| | | msg_.put("method", "OFFLINE"); |
| | | msg_.put("data", new Object()); |
| | | this.sendMsgToClient(context, msg_.toJSONString());// send the message to the current channel |
| | | TimerTask timerTask = new TimerTask() { |
| | | @Override |
| | | public void run() { |
| | | NettyChannelMap.remove(context); |
| | | } |
| | | }; |
| | | Timer timer = new Timer(); |
| | | timer.schedule(timerTask, 3000); |
| | | timer.cancel(); /* NOTE(review): Timer.cancel() discards scheduled tasks, so calling it right after schedule() means the task above never runs */ |
| | | ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); /* channel looked up by the last 16 characters of the cached token */ |
| | | if(null != data_){ |
| | | JSONObject msg_ = new JSONObject(); |
| | | msg_.put("code", 200); |
| | | msg_.put("msg", "SUCCESS"); |
| | | msg_.put("method", "OFFLINE"); |
| | | msg_.put("data", new Object()); |
| | | boolean b = this.sendMsgToClient(data_, msg_.toJSONString());// send the message to the current channel |
| | | if(b){ |
| | | NettyChannelMap.remove_(data_); |
| | | } |
| | | } |
| | | } |
| | | if(StringUtil.isEmpty(token_)){// covers the case where storing the token at login failed |
| | | redisTemplate.opsForValue().set("DRIVER_" + userId1, token); |
| | | } |
| | | NettyChannelMap.update("DRIVER" + userId1, ctx); |
| | | NettyChannelMap.update_(token.substring(token.length() - 16), ctx); |
| | | redisUtil.setStrValue("DRIVER_" + userId1, token); |
| | | } |
| | | // Store the communication channel |
| | | if(null != ctx && ctx.channel().isActive()){ |
| | | NettyChannelMap.update("DRIVER" + userId1, ctx); |
| | | } |
| | | } |
| | | } |
| | | |
| | | if(null != ctx && ctx.channel().isActive()){ |
| | | jsonMsg.put("method", Method.pong); |
| | | sendMsgToClient(ctx, jsonMsg.toJSONString()); |
| | | } |
| | | } |
| | | // Driver uploads its location |
| | | if(method.equals(Method.location)){ |
| | | Integer driverId = jsonCon.getInteger("driverId"); |
| | | String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId); |
| | | String fluid_control = redisUtil.getValue("location_" + driverId); |
| | | if(!StringUtils.hasLength(fluid_control)){ |
| | | redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); |
| | | redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + ""); |
| | | }else{ |
| | | long l = System.currentTimeMillis() - Long.valueOf(fluid_control); |
| | | if(l < 5000){ /* ignore a location upload arriving less than 5 s after the last stored timestamp */ |
| | | return; |
| | | } |
| | | redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); |
| | | redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + ""); |
| | | } |
| | | |
| | | |
| | | Integer orderId = jsonCon.getInteger("orderId"); |
| | | Integer orderType = jsonCon.getInteger("orderType"); |
| | | Double lon = jsonCon.getDouble("lon"); |
| | |
| | | System.err.println("调用driver-server存储位置数据出错了"); |
| | | } |
| | | } |
| | | redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 30, TimeUnit.SECONDS);// store the real-time location in Redis (expires after 30 seconds) |
| | | redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 30);// store the real-time location in Redis |
| | | }else{ |
| | | this.sendMsgToClient(ctx, "__error__" + msg.toString()); |
| | | } |
| | |
| | | |
| | | } catch (Exception e) { |
| | | if(isdebug) { /* echo the failing message back to the client only when the debug flag is set */ |
| | | this.sendMsgToClient(ctx, "__error__" + msg.toString()); |
| | | NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); |
| | | } |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | * @param msg |
| | | * @author TaoNingBo |
| | | */ |
| | | /* Writes msg to the client behind ctx when its channel is active; the boolean variant reports whether the push succeeded. NOTE(review): two signatures appear back to back in this listing; confirm which one is current. */ public void sendMsgToClient(ChannelHandlerContext ctx, String msg) { |
| | | public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) { |
| | | if (ctx != null && ctx.channel().isActive()) { /* only attempt a write when a context exists and its channel is active */ |
| | | ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); /* NOTE(review): getBytes() without a charset uses the platform default encoding */ |
| | | ChannelFuture sync; |
| | |
| | | if(b){ |
| | | NettyChannelMap.remove(ctx); |
| | | } |
| | | return true; |
| | | } |
| | | return sync.isSuccess(); /* result of the write future */ |
| | | } catch (Exception e) { |
| | | System.err.println("推送发生异常,记录:"+msg); /* prints the message whose push raised an exception */ |
| | | NettyChannelMap.remove(ctx); /* drop the channel mapping after the failed push */ |
| | |
| | | System.err.println("推送失败,长连接不存在"); |
| | | NettyChannelMap.remove(ctx); |
| | | } |
| | | return false; /* reached when the message was not delivered */ |
| | | } |
| | | |
| | | // ** Connection dropped: record the push message