package com.ruoyi.errand.utils;
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.metadata.IPage;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
import com.ruoyi.common.exception.ServiceException;
|
import com.ruoyi.errand.domain.AppUser;
|
import com.ruoyi.errand.domain.CommunityCourier;
|
import com.ruoyi.errand.domain.Order;
|
import com.ruoyi.errand.mapper.AppUserMapper;
|
import com.ruoyi.errand.mapper.CommunityCourierMapper;
|
import com.ruoyi.errand.mapper.OrderMapper;
|
import com.ruoyi.errand.object.vo.app.CourierOrderListVO;
|
import io.jsonwebtoken.Claims;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
import org.springframework.security.core.context.SecurityContextHolder;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import javax.websocket.*;
|
import javax.websocket.server.PathParam;
|
import javax.websocket.server.ServerEndpoint;
|
import java.io.IOException;
|
import java.util.List;
|
import java.util.Objects;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.TimeUnit;
|
|
@ServerEndpoint("/ws/delivery")
|
@Component
|
@Slf4j
|
public class DeliveryWebSocket {
|
|
private static final ConcurrentHashMap<String, Session> deliveryPersonSessions = new ConcurrentHashMap<>();
|
|
private static RedisTemplate redisTemplate;
|
@Autowired
|
public void setRedisTemplate(RedisTemplate redisTemplate) {
|
DeliveryWebSocket.redisTemplate = redisTemplate;
|
}
|
|
private static AppUserMapper appUserMapper;
|
private static OrderMapper orderMapper;
|
private static CommunityCourierMapper communityCourierMapper;
|
|
@Autowired
|
public void setAppUserMapper(AppUserMapper appUserMapper) {
|
DeliveryWebSocket.appUserMapper = appUserMapper;
|
}
|
@Autowired
|
public void setOrderMapper(OrderMapper orderMapper) {
|
DeliveryWebSocket.orderMapper = orderMapper;
|
}
|
|
@Autowired
|
public void setCommunityCourierMapper(CommunityCourierMapper communityCourierMapper) {
|
DeliveryWebSocket.communityCourierMapper = communityCourierMapper;
|
}
|
|
@OnOpen
|
public void onOpen(Session session) {
|
try {
|
String token = getTokenFromSession(session);
|
String userId = validateToken(token);
|
|
if (userId == null) {
|
closeSessionWithReason(session, "Invalid token");
|
return;
|
}
|
|
deliveryPersonSessions.put(userId, session);
|
checkPendingNotifications(userId);
|
sendWaitOrderNum(userId);
|
} catch (Exception e) {
|
handleSessionError(session, "Connection error", e);
|
}
|
}
|
|
|
|
@OnClose
|
public void onClose(Session session) {
|
try {
|
String token = getTokenFromSession(session);
|
String userId = validateToken(token);
|
|
if (userId != null) {
|
deliveryPersonSessions.remove(userId);
|
}
|
} catch (Exception e) {
|
log.info("关闭失败:{}",e.getMessage());
|
}
|
}
|
|
@OnError
|
public void onError(Session session, Throwable error) {
|
handleSessionError(session, "WebSocket error", error);
|
}
|
|
// Helper Methods
|
|
private String getTokenFromSession(Session session) {
|
return session.getRequestParameterMap().get("Authorization").stream().findFirst().orElse(null);
|
}
|
|
private String validateToken(String token) {
|
if (token == null || !token.startsWith("app:")) {
|
return null;
|
}
|
try {
|
String realToken = token.substring(4);
|
Claims claims = JwtUtil.parseJWT(realToken);
|
return claims.get("userId").toString();
|
} catch (Exception e) {
|
return null;
|
}
|
}
|
|
private AppUser getAuthenticatedAppUser(String userId) {
|
return appUserMapper.selectById(userId);
|
// return (AppUser) SecurityContextHolder.getContext().getAuthentication().getPrincipal();
|
}
|
|
private void validateCourierStatus(Session session, AppUser appUser) throws IOException {
|
if (appUser.getCourierId() == null) {
|
sendMessageToSession(session, "您已不是跑腿员");
|
throw new IllegalStateException("您已不是跑腿员");
|
}
|
}
|
|
private CommunityCourier getCommunityCourier(AppUser appUser) {
|
return communityCourierMapper.selectOne(
|
new LambdaQueryWrapper<CommunityCourier>()
|
.eq(CommunityCourier::getCourierId, appUser.getCourierId())
|
);
|
}
|
|
private void validateCommunityBinding(Session session, CommunityCourier courier) throws IOException {
|
if (courier == null) {
|
sendMessageToSession(session, "您还未绑定需代办的小区");
|
throw new IllegalStateException("您还未绑定需代办的小区");
|
}
|
}
|
|
private long countAvailableOrders(CommunityCourier courier) {
|
//待确认
|
Long count1 = orderMapper.selectCount(
|
new LambdaQueryWrapper<Order>()
|
.eq(Order::getDelFlag, 0)
|
.eq(Order::getOrderStatus, 1)
|
.eq(Order::getPayStatus, 2)
|
.eq(Order::getCommunityId, courier.getCommunityId())
|
);
|
//进行中
|
Long count2 = orderMapper.selectCount(
|
new LambdaQueryWrapper<Order>()
|
.eq(Order::getDelFlag, 0)
|
.eq(Order::getOrderStatus, 2)
|
.eq(Order::getPayStatus, 2)
|
.eq(Order::getCourierId, courier.getCourierId())
|
);
|
return count1 + count2;
|
}
|
|
public void sendWaitOrderNum(String userId) {
|
log.info("进入socket发送订单数量,userId:{}", userId);
|
Session session = deliveryPersonSessions.get(userId);
|
AppUser appUser = getAuthenticatedAppUser(userId);
|
CommunityCourier courier = getCommunityCourier(appUser);
|
try {
|
validateCommunityBinding(session, courier);
|
} catch (IOException e) {
|
log.info("校验绑定跑腿员失败:{},跑腿员id:{}",e.getMessage(),courier.getCourierId());
|
}
|
|
long orderCount = countAvailableOrders(courier);
|
JSONObject message = new JSONObject();
|
message.put("orderNum", orderCount);
|
sendNotification(userId, message.toJSONString());
|
}
|
|
|
private void checkPendingNotifications(String userId) {
|
String key = "delivery:notification:" + userId;
|
List<Object> notifications = redisTemplate.opsForList().range(key, 0, -1);
|
|
if (notifications != null && !notifications.isEmpty()) {
|
notifications.forEach(notification ->
|
sendNotification(userId, notification.toString())
|
);
|
redisTemplate.delete(key);
|
}
|
}
|
|
public static void sendNotification(String userId, String message) {
|
Session session = deliveryPersonSessions.get(userId);
|
if (session != null && session.isOpen()) {
|
sendMessageToSession(session, message);
|
log.info("发送订单数量:{}", message);
|
} else {
|
String key = "delivery:notification:" + userId;
|
redisTemplate.opsForList().rightPush(key, message);
|
redisTemplate.expire(key, 1, TimeUnit.DAYS);
|
}
|
}
|
|
private static void sendMessageToSession(Session session, String message) {
|
try {
|
session.getBasicRemote().sendText(message);
|
} catch (IOException e) {
|
log.info("发送消息错误:{},消息内容:{}",e.getMessage(),message);
|
}
|
}
|
|
private void closeSessionWithReason(Session session, String reason) {
|
try {
|
session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, reason));
|
} catch (IOException e) {
|
log.info("关闭session错误:{}",e.getMessage());
|
}
|
}
|
|
private void handleSessionError(Session session, String message, Throwable error) {
|
try {
|
sendMessageToSession(session, message);
|
closeSessionWithReason(session, message);
|
} finally {
|
String token = getTokenFromSession(session);
|
String userId = validateToken(token);
|
if (userId != null) {
|
deliveryPersonSessions.remove(userId);
|
}
|
}
|
}
|
}
|