huliguo
2025-07-31 7b1772169b274e87fe441923f0dbf5e25ee30a72
pt-errand/src/main/java/com/ruoyi/errand/utils/DeliveryWebSocket.java
@@ -1,17 +1,28 @@
package com.ruoyi.errand.utils;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.ruoyi.common.exception.ServiceException;
import com.ruoyi.errand.domain.AppUser;
import com.ruoyi.errand.domain.CommunityCourier;
import com.ruoyi.errand.domain.Order;
import com.ruoyi.errand.mapper.AppUserMapper;
import com.ruoyi.errand.mapper.CommunityCourierMapper;
import com.ruoyi.errand.mapper.OrderMapper;
import com.ruoyi.errand.object.vo.app.CourierOrderListVO;
import io.jsonwebtoken.Claims;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
@@ -22,10 +33,10 @@
@ServerEndpoint("/ws/delivery")
@Component
public class    DeliveryWebSocket {
@Slf4j
public class DeliveryWebSocket {
    private static ConcurrentHashMap<String, Session> deliveryPersonSessions = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, Session> deliveryPersonSessions = new ConcurrentHashMap<>();
    private static  RedisTemplate redisTemplate;
    @Autowired
@@ -33,24 +44,68 @@
        DeliveryWebSocket.redisTemplate = redisTemplate;
    }
    private static AppUserMapper appUserMapper;
    private static OrderMapper orderMapper;
    private static CommunityCourierMapper communityCourierMapper;
    @Autowired
    public void setAppUserMapper(AppUserMapper appUserMapper) {
        DeliveryWebSocket.appUserMapper = appUserMapper;
    }
    @Autowired
    public void setOrderMapper(OrderMapper orderMapper) {
        DeliveryWebSocket.orderMapper = orderMapper;
    }
    @Autowired
    public void setCommunityCourierMapper(CommunityCourierMapper communityCourierMapper) {
        DeliveryWebSocket.communityCourierMapper = communityCourierMapper;
    }
    @OnOpen
    public void onOpen(Session session) {
        // 从查询参数获取token
        String token = session.getRequestParameterMap().get("token").stream().findFirst().orElse(null);
        // 验证token
        String userId=validateToken(token);
        if (null==userId) {
            try {
                session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "Invalid token"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return;
        }
        try {
            String token = getTokenFromSession(session);
            String userId = validateToken(token);
        deliveryPersonSessions.put(userId, session);
        checkPendingNotifications(userId);
            if (userId == null) {
                closeSessionWithReason(session, "Invalid token");
                return;
            }
            deliveryPersonSessions.put(userId, session);
            checkPendingNotifications(userId);
            sendWaitOrderNum(userId);
        } catch (Exception e) {
            handleSessionError(session, "Connection error", e);
        }
    }
    @OnClose
    public void onClose(Session session) {
        try {
            String token = getTokenFromSession(session);
            String userId = validateToken(token);
            if (userId != null) {
                deliveryPersonSessions.remove(userId);
            }
        } catch (Exception e) {
            log.info("关闭失败:{}",e.getMessage());
        }
    }
    @OnError
    public void onError(Session session, Throwable error) {
        handleSessionError(session, "WebSocket error", error);
    }
    // Helper Methods
    private String getTokenFromSession(Session session) {
        return session.getRequestParameterMap().get("Authorization").stream().findFirst().orElse(null);
    }
    private String validateToken(String token) {
@@ -58,58 +113,116 @@
            return null;
        }
        try {
            //解析token 获取userid,再查询到AppUser
            String realToken = token.substring(4);
            Claims claims = JwtUtil.parseJWT(realToken);
            String userId = claims.get("userId").toString();
            return userId;
            return claims.get("userId").toString();
        } catch (Exception e) {
            return null;
        }
    }
    @OnClose
    public void onClose(Session session) {
        String token = session.getRequestParameterMap().get("token").stream().findFirst().orElse(null);
        // 验证token
        String userId=validateToken(token);
        if (null==userId) {
            try {
                session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "Invalid token"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return;
        }
        deliveryPersonSessions.remove(userId);
    private AppUser getAuthenticatedAppUser(String userId) {
        return appUserMapper.selectById(userId);
//        return (AppUser) SecurityContextHolder.getContext().getAuthentication().getPrincipal();
    }
    private void validateCourierStatus(Session session, AppUser appUser) throws IOException {
        if (appUser.getCourierId() == null) {
            sendMessageToSession(session, "您已不是跑腿员");
            throw new IllegalStateException("您已不是跑腿员");
        }
    }
    private CommunityCourier getCommunityCourier(AppUser appUser) {
        return communityCourierMapper.selectOne(
                new LambdaQueryWrapper<CommunityCourier>()
                        .eq(CommunityCourier::getCourierId, appUser.getCourierId())
        );
    }
    private void validateCommunityBinding(Session session, CommunityCourier courier) throws IOException {
        if (courier == null) {
            sendMessageToSession(session, "您还未绑定需代办的小区");
            throw new IllegalStateException("您还未绑定需代办的小区");
        }
    }
    private long countAvailableOrders(CommunityCourier courier) {
        return orderMapper.selectCount(
                new LambdaQueryWrapper<Order>()
                        .eq(Order::getDelFlag, 0)
                        .eq(Order::getOrderStatus, 1)
                        .eq(Order::getPayStatus, 2)
                        .eq(Order::getCommunityId, courier.getCommunityId())
        );
    }
    public void sendWaitOrderNum(String userId) {
        Session session = deliveryPersonSessions.get(userId);
        AppUser appUser = getAuthenticatedAppUser(userId);
        CommunityCourier courier = getCommunityCourier(appUser);
        try {
            validateCommunityBinding(session, courier);
        } catch (IOException e) {
            log.info("校验绑定跑腿员失败:{},跑腿员id:{}",e.getMessage(),courier.getCourierId());
        }
        long orderCount = countAvailableOrders(courier);
        JSONObject message = new JSONObject();
        message.put("orderNum", orderCount);
        sendNotification(userId, message.toJSONString());
    }
    private void checkPendingNotifications(String userId) {
        String key = "delivery:notification:" + userId;
        List<Object> notifications = redisTemplate.opsForList().range(key, 0, -1);
        if (notifications != null && !notifications.isEmpty()) {
            for (Object notification : notifications) {
                sendNotification(userId, notification.toString());
            }
            notifications.forEach(notification ->
                    sendNotification(userId, notification.toString())
            );
            redisTemplate.delete(key);
        }
    }
    public  void sendNotification(String userId, String message) {
    public static void sendNotification(String userId, String message) {
        Session session = deliveryPersonSessions.get(userId);
        if (session != null && session.isOpen()) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
            sendMessageToSession(session, message);
        } else {
            // 用户不在线,存入Redis
            String key = "delivery:notification:" + userId;
            redisTemplate.opsForList().rightPush(key, message);
            // 设置过期时间,比如1天
            redisTemplate.expire(key, 1, TimeUnit.DAYS);
        }
    }
    private static void sendMessageToSession(Session session, String message) {
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.info("发送消息错误:{},消息内容:{}",e.getMessage(),message);
        }
    }
    private void closeSessionWithReason(Session session, String reason) {
        try {
            session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, reason));
        } catch (IOException e) {
            log.info("关闭session错误:{}",e.getMessage());
        }
    }
    private void handleSessionError(Session session, String message, Throwable error) {
        try {
            sendMessageToSession(session, message);
            closeSessionWithReason(session, message);
        } finally {
            String token = getTokenFromSession(session);
            String userId = validateToken(token);
            if (userId != null) {
                deliveryPersonSessions.remove(userId);
            }
        }
    }
}