| | |
| | | |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.sinata.push.util.RedisUtil; |
| | | import com.sinata.push.util.SinataUtil; |
| | | import com.sinata.push.util.SpringUtil; |
| | | import com.sinata.push.util.StringUtil; |
| | | import com.sinata.push.util.echo.Method; |
| | | import com.sinata.push.util.echo.NettyChannelMap; |
| | | import com.sinata.push.util.echo.NettyMsg; |
| | | import io.netty.buffer.ByteBuf; |
| | | import io.netty.buffer.Unpooled; |
| | | import io.netty.channel.ChannelFuture; |
| | | import io.netty.channel.ChannelHandlerContext; |
| | | import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.data.redis.core.StringRedisTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.util.HashMap; |
| | | import java.util.Hashtable; |
| | | import java.util.Timer; |
| | | import java.util.TimerTask; |
| | | |
| | | @Component |
| | | public class NettyWebSocketController { |
| | | |
| | | public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); |
| | | |
| | | @Resource |
| | | private RedisTemplate<String, Object> redisTemplate; |
| | | public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>(); |
| | | |
| | | public static Hashtable<String,String> table; |
| | | static{ |
| | | if(table == null){ |
| | | table = new Hashtable<>(); |
| | | } |
| | | } |
| | | |
| | | private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); |
| | | |
| | | public static Hashtable<String, String> table; |
| | | |
| | | static { |
| | | if (table == null) { |
| | | table = new Hashtable<>(); |
| | | } |
| | | } |
| | | |
| | | public static boolean isdebug = false; |
| | | public static int i = 0; |
| | | |
| | |
| | | * @param msg |
| | | * @author TaoNingBo |
| | | */ |
| | | public void JudgeOperation(ChannelHandlerContext ctx, String msg) { |
| | | public synchronized void JudgeOperation(ChannelHandlerContext ctx, String msg) { |
| | | try { |
| | | // 验证即时通讯命令是否正确有效 |
| | | if(SinataUtil.isEmpty(msg)) { |
| | | return; |
| | | } |
| | | String msgStr = msg.toString(); |
| | | if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { |
| | | return; |
| | | } |
| | | if(isdebug) { |
| | | // 验证即时通讯命令是否正确有效 |
| | | if (SinataUtil.isEmpty(msg)) { |
| | | return; |
| | | } |
| | | String msgStr = msg.toString(); |
| | | if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { |
| | | return; |
| | | } |
| | | if (isdebug) { |
| | | // System.out.println("<<<--receive-->>>111" + msg); |
| | | } |
| | | } |
| | | |
| | | // 获取socket信息,保存相应的socket |
| | | JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); |
| | | int code = jsonMsg.getIntValue("code"); |
| | | String message = jsonMsg.getString("msg"); |
| | | String method = jsonMsg.getString("method"); |
| | | if(code != 200 || !message.equals("SUCCESS")) { |
| | | return; |
| | | } |
| | | JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); |
| | | // ############################### 心跳 ############################ |
| | | // 心跳 |
| | | if(method.equals(Method.ping)){ |
| | | String token = jsonCon.getString("token"); |
| | | String userId1 = jsonCon.getString("userId"); |
| | | if(StringUtil.isNotEmpty(userId1)){ |
| | | //确保账号在单个设备上登录 |
| | | if(StringUtil.isNotEmpty(token)){ |
| | | String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 |
| | | if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 |
| | | ChannelHandlerContext context = NettyChannelMap.getData("Applets" + userId1); |
| | | JSONObject msg_ = new JSONObject(); |
| | | msg_.put("code", 200); |
| | | msg_.put("msg", "SUCCESS"); |
| | | msg_.put("method", "OFFLINE"); |
| | | msg_.put("data", new Object()); |
| | | this.sendMsgToClient(context, msg_.toJSONString()); |
| | | TimerTask timerTask = new TimerTask() { |
| | | @Override |
| | | public void run() { |
| | | NettyChannelMap.remove(context); |
| | | } |
| | | }; |
| | | Timer timer = new Timer(); |
| | | timer.schedule(timerTask, 3000); |
| | | timer.cancel(); |
| | | } |
| | | if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 |
| | | redisTemplate.opsForValue().set("USER_" + userId1, token); |
| | | } |
| | | } |
| | | // 获取socket信息,保存相应的socket |
| | | JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); |
| | | int code = jsonMsg.getIntValue("code"); |
| | | String message = jsonMsg.getString("msg"); |
| | | String method = jsonMsg.getString("method"); |
| | | if (code != 200 || !message.equals("SUCCESS")) { |
| | | return; |
| | | } |
| | | JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); |
| | | |
| | | //存储业务使用的通道 |
| | | if(null != ctx && ctx.channel().isActive()) { |
| | | NettyChannelMap.update("Applets" + userId1, ctx); |
| | | String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); |
| | | ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); |
| | | } |
| | | if (null != ctx && ctx.channel().isActive()) { |
| | | jsonMsg.put("method", Method.pong); |
| | | sendMsgToClient(ctx, jsonMsg.toJSONString()); |
| | | } |
| | | |
| | | |
| | | // ############################### 心跳 ############################ |
| | | // 心跳 |
| | | if (method.equals(Method.ping)) { |
| | | String token = jsonCon.getString("token"); |
| | | String userId1 = jsonCon.getString("userId"); |
| | | String businessType = jsonCon.getString("businessType");//1:打车,2=代驾 |
| | | String business = "2".equals(businessType) ? "daijia" : "dache"; |
| | | if (StringUtil.isNotEmpty(userId1)) { |
| | | //确保账号在单个设备上登录 |
| | | if (StringUtil.isNotEmpty(token)) { |
| | | String token_ = redisUtil.getValue(business + ":USER_Applets_" + userId1);//获取缓存中最新的数据 |
| | | if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据 |
| | | ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); |
| | | JSONObject msg_ = new JSONObject(); |
| | | msg_.put("code", 200); |
| | | msg_.put("msg", "SUCCESS"); |
| | | msg_.put("method", "OFFLINE"); |
| | | msg_.put("data", new Object()); |
| | | this.sendMsgToClient(data_, msg_.toJSONString()); |
| | | new Timer().schedule(new TimerTask() { |
| | | @Override |
| | | public void run() { |
| | | NettyChannelMap.remove_(data_); |
| | | } |
| | | }, 5000); |
| | | } |
| | | NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道 |
| | | NettyChannelMap.update(business + ":Applets" + userId1, ctx); |
| | | redisUtil.setStrValue(business + ":USER_Applets_" + userId1, token); |
| | | } |
| | | |
| | | //存储业务使用的通道 |
| | | if (null != ctx && ctx.channel().isActive()) { |
| | | NettyChannelMap.update(business + ":Applets" + userId1, ctx); |
| | | } |
| | | } |
| | | |
| | | |
| | | } |
| | | } catch (Exception e) { |
| | | if (isdebug) { |
| | | NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString()); |
| | | } |
| | | e.printStackTrace(); |
| | | } |
| | | } catch (Exception e) { |
| | | if(isdebug) { |
| | | this.sendMsgToClient(ctx, "__error__" + msg.toString()); |
| | | } |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 向客户端发送消息 |
| | |
| | | ChannelFuture sync; |
| | | try { |
| | | sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync(); |
| | | if(!sync.isSuccess()){ |
| | | if (!sync.isSuccess()) { |
| | | boolean b = true; |
| | | for (int i = 0; i < 10; i++) { |
| | | ctx.wait(3000); |
| | | sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); |
| | | if(sync.isSuccess()){ |
| | | if (sync.isSuccess()) { |
| | | b = false; |
| | | break; |
| | | } |
| | | System.err.println("小程序-》推送不成功,将继续推送"+msg); |
| | | System.err.println("小程序-》推送不成功,将继续推送" + msg); |
| | | } |
| | | if(b){ |
| | | if (b) { |
| | | NettyChannelMap.remove(ctx); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | System.err.println("小程序-》推送发生异常,记录:"+msg); |
| | | System.err.println("小程序-》推送发生异常,记录:" + msg); |
| | | NettyChannelMap.remove(ctx); |
| | | } |
| | | if(isdebug) { |
| | | System.err.println("小程序-》 <<<--send-->>>" + msg) ; |
| | | if (isdebug) { |
| | | System.err.println("小程序-》 <<<--send-->>>" + msg); |
| | | } |
| | | }else{ |
| | | } else { |
| | | System.err.println("小程序-》推送失败,长连接不存在"); |
| | | NettyChannelMap.remove(ctx); |
| | | } |
| | | } |
| | | |
| | | // **链接断开 将推送消息记录 |
| | | public static void sendMsgToClient(String cacheType, Integer id,String msg) { |
| | | public static void sendMsgToClient(String cacheType, Integer id, String msg) { |
| | | ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); |
| | | if (ctx != null) { |
| | | ChannelFuture sync; |
| | | try { |
| | | sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); |
| | | if(!sync.isSuccess()){ |
| | | if (!sync.isSuccess()) { |
| | | for (int i = 0; i < 10; i++) { |
| | | sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();; |
| | | if(!sync.isSuccess()){ |
| | | sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();; |
| | | System.err.println("推送不成功,将继续推送"+msg); |
| | | if(i == 9){ |
| | | table.put(cacheType+id, msg); |
| | | sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); |
| | | ; |
| | | if (!sync.isSuccess()) { |
| | | sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); |
| | | ; |
| | | System.err.println("推送不成功,将继续推送" + msg); |
| | | if (i == 9) { |
| | | table.put(cacheType + id, msg); |
| | | ctx.close(); |
| | | System.err.println("推送发生异常,记录:"+msg); |
| | | System.err.println("推送发生异常,记录:" + msg); |
| | | } |
| | | }else{ |
| | | } else { |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | table.put(cacheType+id, msg); |
| | | System.err.println("推送发生异常,记录:"+msg); |
| | | table.put(cacheType + id, msg); |
| | | System.err.println("推送发生异常,记录:" + msg); |
| | | } |
| | | if(isdebug) { |
| | | if (isdebug) { |
| | | System.err.println("<<<--send-->>>" + msg); |
| | | } |
| | | }else{ |
| | | table.put(cacheType+id, msg); |
| | | System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg); |
| | | } else { |
| | | table.put(cacheType + id, msg); |
| | | System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg); |
| | | } |
| | | } |
| | | } |