| | |
| | | package com.ruoyi.errand.utils; |
| | | |
| | | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | | import com.baomidou.mybatisplus.core.metadata.IPage; |
| | | import com.baomidou.mybatisplus.extension.plugins.pagination.Page; |
| | | import com.ruoyi.common.exception.ServiceException; |
| | | import com.ruoyi.errand.domain.AppUser; |
| | | import com.ruoyi.errand.domain.CommunityCourier; |
| | | import com.ruoyi.errand.domain.Order; |
| | | import com.ruoyi.errand.mapper.AppUserMapper; |
| | | import com.ruoyi.errand.mapper.CommunityCourierMapper; |
| | | import com.ruoyi.errand.mapper.OrderMapper; |
| | | import com.ruoyi.errand.object.vo.app.CourierOrderListVO; |
| | | import io.jsonwebtoken.Claims; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | |
| | | import org.springframework.security.core.context.SecurityContextHolder; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.Resource; |
| | | import javax.websocket.CloseReason; |
| | | import javax.websocket.OnClose; |
| | | import javax.websocket.OnOpen; |
| | | import javax.websocket.Session; |
| | | import javax.websocket.*; |
| | | import javax.websocket.server.PathParam; |
| | | import javax.websocket.server.ServerEndpoint; |
| | | import java.io.IOException; |
| | |
| | | |
| | | @ServerEndpoint("/ws/delivery") |
| | | @Component |
| | | public class DeliveryWebSocket { |
| | | @Slf4j |
| | | public class DeliveryWebSocket { |
| | | |
| | | private static ConcurrentHashMap<String, Session> deliveryPersonSessions = new ConcurrentHashMap<>(); |
| | | |
| | | private static final ConcurrentHashMap<String, Session> deliveryPersonSessions = new ConcurrentHashMap<>(); |
| | | |
| | | private static RedisTemplate redisTemplate; |
| | | @Autowired |
| | |
| | | DeliveryWebSocket.redisTemplate = redisTemplate; |
| | | } |
| | | |
| | | private static AppUserMapper appUserMapper; |
| | | private static OrderMapper orderMapper; |
| | | private static CommunityCourierMapper communityCourierMapper; |
| | | |
| | | @Autowired |
| | | public void setAppUserMapper(AppUserMapper appUserMapper) { |
| | | DeliveryWebSocket.appUserMapper = appUserMapper; |
| | | } |
| | | @Autowired |
| | | public void setOrderMapper(OrderMapper orderMapper) { |
| | | DeliveryWebSocket.orderMapper = orderMapper; |
| | | } |
| | | |
| | | @Autowired |
| | | public void setCommunityCourierMapper(CommunityCourierMapper communityCourierMapper) { |
| | | DeliveryWebSocket.communityCourierMapper = communityCourierMapper; |
| | | } |
| | | |
| | | @OnOpen |
| | | public void onOpen(Session session) { |
| | | // 从查询参数获取token |
| | | String token = session.getRequestParameterMap().get("token").stream().findFirst().orElse(null); |
| | | // 验证token |
| | | String userId=validateToken(token); |
| | | if (null==userId) { |
| | | try { |
| | | session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "Invalid token")); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | return; |
| | | } |
| | | try { |
| | | String token = getTokenFromSession(session); |
| | | String userId = validateToken(token); |
| | | |
| | | deliveryPersonSessions.put(userId, session); |
| | | checkPendingNotifications(userId); |
| | | if (userId == null) { |
| | | closeSessionWithReason(session, "Invalid token"); |
| | | return; |
| | | } |
| | | |
| | | deliveryPersonSessions.put(userId, session); |
| | | checkPendingNotifications(userId); |
| | | sendWaitOrderNum(userId); |
| | | } catch (Exception e) { |
| | | handleSessionError(session, "Connection error", e); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | @OnClose |
| | | public void onClose(Session session) { |
| | | try { |
| | | String token = getTokenFromSession(session); |
| | | String userId = validateToken(token); |
| | | |
| | | if (userId != null) { |
| | | deliveryPersonSessions.remove(userId); |
| | | } |
| | | } catch (Exception e) { |
| | | log.info("关闭失败:{}",e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | @OnError |
| | | public void onError(Session session, Throwable error) { |
| | | handleSessionError(session, "WebSocket error", error); |
| | | } |
| | | |
| | | // Helper Methods |
| | | |
| | | private String getTokenFromSession(Session session) { |
| | | return session.getRequestParameterMap().get("Authorization").stream().findFirst().orElse(null); |
| | | } |
| | | |
| | | private String validateToken(String token) { |
| | |
| | | return null; |
| | | } |
| | | try { |
| | | //解析token 获取userid,再查询到AppUser |
| | | String realToken = token.substring(4); |
| | | Claims claims = JwtUtil.parseJWT(realToken); |
| | | String userId = claims.get("userId").toString(); |
| | | return userId; |
| | | return claims.get("userId").toString(); |
| | | } catch (Exception e) { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | |
| | | @OnClose |
| | | public void onClose(Session session) { |
| | | String token = session.getRequestParameterMap().get("token").stream().findFirst().orElse(null); |
| | | // 验证token |
| | | String userId=validateToken(token); |
| | | if (null==userId) { |
| | | try { |
| | | session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "Invalid token")); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | return; |
| | | } |
| | | deliveryPersonSessions.remove(userId); |
| | | private AppUser getAuthenticatedAppUser(String userId) { |
| | | return appUserMapper.selectById(userId); |
| | | // return (AppUser) SecurityContextHolder.getContext().getAuthentication().getPrincipal(); |
| | | } |
| | | |
| | | private void validateCourierStatus(Session session, AppUser appUser) throws IOException { |
| | | if (appUser.getCourierId() == null) { |
| | | sendMessageToSession(session, "您已不是跑腿员"); |
| | | throw new IllegalStateException("您已不是跑腿员"); |
| | | } |
| | | } |
| | | |
| | | private CommunityCourier getCommunityCourier(AppUser appUser) { |
| | | return communityCourierMapper.selectOne( |
| | | new LambdaQueryWrapper<CommunityCourier>() |
| | | .eq(CommunityCourier::getCourierId, appUser.getCourierId()) |
| | | ); |
| | | } |
| | | |
| | | private void validateCommunityBinding(Session session, CommunityCourier courier) throws IOException { |
| | | if (courier == null) { |
| | | sendMessageToSession(session, "您还未绑定需代办的小区"); |
| | | throw new IllegalStateException("您还未绑定需代办的小区"); |
| | | } |
| | | } |
| | | |
| | | private long countAvailableOrders(CommunityCourier courier) { |
| | | return orderMapper.selectCount( |
| | | new LambdaQueryWrapper<Order>() |
| | | .eq(Order::getDelFlag, 0) |
| | | .eq(Order::getOrderStatus, 1) |
| | | .eq(Order::getPayStatus, 2) |
| | | .eq(Order::getCommunityId, courier.getCommunityId()) |
| | | ); |
| | | } |
| | | |
| | | public void sendWaitOrderNum(String userId) { |
| | | Session session = deliveryPersonSessions.get(userId); |
| | | AppUser appUser = getAuthenticatedAppUser(userId); |
| | | CommunityCourier courier = getCommunityCourier(appUser); |
| | | try { |
| | | validateCommunityBinding(session, courier); |
| | | } catch (IOException e) { |
| | | log.info("校验绑定跑腿员失败:{},跑腿员id:{}",e.getMessage(),courier.getCourierId()); |
| | | } |
| | | |
| | | long orderCount = countAvailableOrders(courier); |
| | | JSONObject message = new JSONObject(); |
| | | message.put("orderNum", orderCount); |
| | | sendNotification(userId, message.toJSONString()); |
| | | } |
| | | |
| | | |
| | | private void checkPendingNotifications(String userId) { |
| | | String key = "delivery:notification:" + userId; |
| | | List<Object> notifications = redisTemplate.opsForList().range(key, 0, -1); |
| | | |
| | | if (notifications != null && !notifications.isEmpty()) { |
| | | for (Object notification : notifications) { |
| | | sendNotification(userId, notification.toString()); |
| | | } |
| | | notifications.forEach(notification -> |
| | | sendNotification(userId, notification.toString()) |
| | | ); |
| | | redisTemplate.delete(key); |
| | | } |
| | | } |
| | | |
| | | public void sendNotification(String userId, String message) { |
| | | public static void sendNotification(String userId, String message) { |
| | | Session session = deliveryPersonSessions.get(userId); |
| | | if (session != null && session.isOpen()) { |
| | | try { |
| | | session.getBasicRemote().sendText(message); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | sendMessageToSession(session, message); |
| | | } else { |
| | | // 用户不在线,存入Redis |
| | | String key = "delivery:notification:" + userId; |
| | | redisTemplate.opsForList().rightPush(key, message); |
| | | // 设置过期时间,比如1天 |
| | | redisTemplate.expire(key, 1, TimeUnit.DAYS); |
| | | } |
| | | } |
| | | |
| | | private static void sendMessageToSession(Session session, String message) { |
| | | try { |
| | | session.getBasicRemote().sendText(message); |
| | | } catch (IOException e) { |
| | | log.info("发送消息错误:{},消息内容:{}",e.getMessage(),message); |
| | | } |
| | | } |
| | | |
| | | private void closeSessionWithReason(Session session, String reason) { |
| | | try { |
| | | session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, reason)); |
| | | } catch (IOException e) { |
| | | log.info("关闭session错误:{}",e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | private void handleSessionError(Session session, String message, Throwable error) { |
| | | try { |
| | | sendMessageToSession(session, message); |
| | | closeSessionWithReason(session, message); |
| | | } finally { |
| | | String token = getTokenFromSession(session); |
| | | String userId = validateToken(token); |
| | | if (userId != null) { |
| | | deliveryPersonSessions.remove(userId); |
| | | } |
| | | } |
| | | } |
| | | } |