From 7b1772169b274e87fe441923f0dbf5e25ee30a72 Mon Sep 17 00:00:00 2001 From: huliguo <2023611923@qq.com> Date: 星期四, 31 七月 2025 14:05:56 +0800 Subject: [PATCH] 优化 --- pt-errand/src/main/java/com/ruoyi/errand/utils/DeliveryWebSocket.java | 213 ++++++++++++++++++++++++++++++++++++++++------------ 1 files changed, 163 insertions(+), 50 deletions(-) diff --git a/pt-errand/src/main/java/com/ruoyi/errand/utils/DeliveryWebSocket.java b/pt-errand/src/main/java/com/ruoyi/errand/utils/DeliveryWebSocket.java index afe69fb..dd21ec5 100644 --- a/pt-errand/src/main/java/com/ruoyi/errand/utils/DeliveryWebSocket.java +++ b/pt-errand/src/main/java/com/ruoyi/errand/utils/DeliveryWebSocket.java @@ -1,17 +1,28 @@ package com.ruoyi.errand.utils; +import com.alibaba.fastjson2.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.ruoyi.common.exception.ServiceException; +import com.ruoyi.errand.domain.AppUser; +import com.ruoyi.errand.domain.CommunityCourier; +import com.ruoyi.errand.domain.Order; +import com.ruoyi.errand.mapper.AppUserMapper; +import com.ruoyi.errand.mapper.CommunityCourierMapper; +import com.ruoyi.errand.mapper.OrderMapper; +import com.ruoyi.errand.object.vo.app.CourierOrderListVO; import io.jsonwebtoken.Claims; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import javax.websocket.CloseReason; -import javax.websocket.OnClose; -import javax.websocket.OnOpen; -import javax.websocket.Session; +import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; @@ -22,10 +33,10 @@ @ServerEndpoint("/ws/delivery") @Component -public class DeliveryWebSocket { +@Slf4j +public class DeliveryWebSocket { - private static ConcurrentHashMap<String, Session> deliveryPersonSessions = new ConcurrentHashMap<>(); - + private static final ConcurrentHashMap<String, Session> deliveryPersonSessions = new ConcurrentHashMap<>(); private static RedisTemplate redisTemplate; @Autowired @@ -33,24 +44,68 @@ DeliveryWebSocket.redisTemplate = redisTemplate; } + private static AppUserMapper appUserMapper; + private static OrderMapper orderMapper; + private static CommunityCourierMapper communityCourierMapper; + + @Autowired + public void setAppUserMapper(AppUserMapper appUserMapper) { + DeliveryWebSocket.appUserMapper = appUserMapper; + } + @Autowired + public void setOrderMapper(OrderMapper orderMapper) { + DeliveryWebSocket.orderMapper = orderMapper; + } + + @Autowired + public void setCommunityCourierMapper(CommunityCourierMapper communityCourierMapper) { + DeliveryWebSocket.communityCourierMapper = communityCourierMapper; + } @OnOpen public void onOpen(Session session) { - // 从查询参数获取token - String token = session.getRequestParameterMap().get("token").stream().findFirst().orElse(null); - // 验证token - String userId=validateToken(token); - if (null==userId) { - try { - session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "Invalid token")); - } catch (IOException e) { - e.printStackTrace(); - } - return; - } + try { + String token = getTokenFromSession(session); + String userId = validateToken(token); - deliveryPersonSessions.put(userId, session); - checkPendingNotifications(userId); + if (userId == null) { + closeSessionWithReason(session, "Invalid token"); + return; + } + + deliveryPersonSessions.put(userId, session); + checkPendingNotifications(userId); + sendWaitOrderNum(userId); + } catch (Exception e) { + handleSessionError(session, "Connection error", e); + } + } + + + + @OnClose + public void onClose(Session session) { + try { + String token = getTokenFromSession(session); + String userId = validateToken(token); + + if (userId != null) { + deliveryPersonSessions.remove(userId); + } + } catch (Exception e) { + log.info("关闭失败:{}",e.getMessage()); + } + } + + @OnError + public void onError(Session session, Throwable error) { + handleSessionError(session, "WebSocket error", error); + } + + // Helper Methods + + private String getTokenFromSession(Session session) { + return session.getRequestParameterMap().get("Authorization").stream().findFirst().orElse(null); } private String validateToken(String token) { @@ -58,58 +113,116 @@ return null; } try { - //解析token 获取userid,再查询到AppUser String realToken = token.substring(4); Claims claims = JwtUtil.parseJWT(realToken); - String userId = claims.get("userId").toString(); - return userId; + return claims.get("userId").toString(); } catch (Exception e) { return null; } } - - @OnClose - public void onClose(Session session) { - String token = session.getRequestParameterMap().get("token").stream().findFirst().orElse(null); - // 验证token - String userId=validateToken(token); - if (null==userId) { - try { - session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "Invalid token")); - } catch (IOException e) { - e.printStackTrace(); - } - return; - } - deliveryPersonSessions.remove(userId); + private AppUser getAuthenticatedAppUser(String userId) { + return appUserMapper.selectById(userId); +// return (AppUser) SecurityContextHolder.getContext().getAuthentication().getPrincipal(); } + + private void validateCourierStatus(Session session, AppUser appUser) throws IOException { + if (appUser.getCourierId() == null) { + sendMessageToSession(session, "您已不是跑腿员"); + throw new IllegalStateException("您已不是跑腿员"); + } + } + + private CommunityCourier getCommunityCourier(AppUser appUser) { + return communityCourierMapper.selectOne( + new LambdaQueryWrapper<CommunityCourier>() + .eq(CommunityCourier::getCourierId, appUser.getCourierId()) + ); + } + + private void validateCommunityBinding(Session session, CommunityCourier courier) throws IOException { + if (courier == null) { + sendMessageToSession(session, "您还未绑定需代办的小区"); + throw new IllegalStateException("您还未绑定需代办的小区"); + } + } + + private long countAvailableOrders(CommunityCourier courier) { + return orderMapper.selectCount( + new LambdaQueryWrapper<Order>() + .eq(Order::getDelFlag, 0) + .eq(Order::getOrderStatus, 1) + .eq(Order::getPayStatus, 2) + .eq(Order::getCommunityId, courier.getCommunityId()) + ); + } + + public void sendWaitOrderNum(String userId) { + Session session = deliveryPersonSessions.get(userId); + AppUser appUser = getAuthenticatedAppUser(userId); + CommunityCourier courier = getCommunityCourier(appUser); + try { + validateCommunityBinding(session, courier); + } catch (IOException e) { + log.info("校验绑定跑腿员失败:{},跑腿员id:{}",e.getMessage(),courier.getCourierId()); + } + + long orderCount = countAvailableOrders(courier); + JSONObject message = new JSONObject(); + message.put("orderNum", orderCount); + sendNotification(userId, message.toJSONString()); + } + private void checkPendingNotifications(String userId) { String key = "delivery:notification:" + userId; List<Object> notifications = redisTemplate.opsForList().range(key, 0, -1); + if (notifications != null && !notifications.isEmpty()) { - for (Object notification : notifications) { - sendNotification(userId, notification.toString()); - } + notifications.forEach(notification -> + sendNotification(userId, notification.toString()) + ); redisTemplate.delete(key); } } - public void sendNotification(String userId, String message) { + public static void sendNotification(String userId, String message) { Session session = deliveryPersonSessions.get(userId); if (session != null && session.isOpen()) { - try { - session.getBasicRemote().sendText(message); - } catch (IOException e) { - e.printStackTrace(); - } + sendMessageToSession(session, message); } else { - // 用户不在线,存入Redis String key = "delivery:notification:" + userId; redisTemplate.opsForList().rightPush(key, message); - // 设置过期时间,比如1天 redisTemplate.expire(key, 1, TimeUnit.DAYS); } } + + private static void sendMessageToSession(Session session, String message) { + try { + session.getBasicRemote().sendText(message); + } catch (IOException e) { + log.info("发送消息错误:{},消息内容:{}",e.getMessage(),message); + } + } + + private void closeSessionWithReason(Session session, String reason) { + try { + session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, reason)); + } catch (IOException e) { + log.info("关闭session错误:{}",e.getMessage()); + } + } + + private void handleSessionError(Session session, String message, Throwable error) { + try { + sendMessageToSession(session, message); + closeSessionWithReason(session, message); + } finally { + String token = getTokenFromSession(session); + String userId = validateToken(token); + if (userId != null) { + deliveryPersonSessions.remove(userId); + } + } + } } \ No newline at end of file -- Gitblit v1.7.1