xuhy
2 天以前 2f16c1a3f958371cc12408ece6a4f7931bbad904
websocket连接
4个文件已修改
2个文件已删除
9个文件已添加
2072 ■■■■■ 已修改文件
ruoyi-admin/pom.xml 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketAuthHandler.java 249 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketController.java 270 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketServer.java 88 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketServerConfig.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketTextFrameHandler.java 189 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketUserConnectionManager.java 281 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketUserService.java 286 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-applet/pom.xml 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-applet/src/main/java/com/ruoyi/RuoYiAppletApplication.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-applet/src/main/java/com/ruoyi/web/controller/api/TMissionController.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-applet/src/main/java/com/ruoyi/web/controller/api/WebSocketConfig.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-applet/src/main/java/com/ruoyi/web/controller/api/WebSocketServerSolo.java 173 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-applet/src/test/java/com/ruoyi/web/controller/webSocket/SimpleWebSocketClient.java 144 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-applet/src/test/java/com/ruoyi/web/controller/webSocket/WebSocketClientTest.java 312 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin/pom.xml
@@ -154,7 +154,10 @@
            <artifactId>dysmsapi20170525</artifactId>
            <version>2.0.10</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
    </dependencies>
    <build>
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketAuthHandler.java
New file
@@ -0,0 +1,249 @@
package com.ruoyi.web.webSocket;
import com.ruoyi.common.constant.Constants;
import com.ruoyi.common.core.domain.model.LoginUserApplet;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.system.model.TAppUser;
import com.ruoyi.system.service.TAppUserService;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
/**
 * WebSocket认证处理器
 * 处理WebSocket连接的用户认证逻辑
 */
@Component
@Slf4j
public class WebSocketAuthHandler {
    @Autowired
    private TAppUserService appUserService;
    @Autowired
    private RedisCache redisCache;
    // 令牌秘钥
    @Value("${token.secret}")
    private String secret;
    /**
     * 验证用户认证信息
     * @param appUserId 用户ID
     * @param token 认证令牌(可选)
     * @return 认证结果
     */
    public AuthResult authenticateUser(String appUserId, String token) {
        try {
            if (appUserId == null || appUserId.trim().isEmpty()) {
                return AuthResult.failure("用户ID不能为空");
            }
            // 查询用户信息
            TAppUser appUser = appUserService.getById(appUserId);
            if (appUser == null) {
                return AuthResult.failure("用户不存在");
            }
            // 检查用户状态
            if (appUser.getStatus() == null || appUser.getStatus() != 1) {
                return AuthResult.failure("用户已被禁用");
            }
            // 检查用户审核状态
            if (appUser.getState() == null || appUser.getState() != 1) {
                return AuthResult.failure("用户未通过审核");
            }
            // 这里可以添加token验证逻辑
            if (token != null && !token.trim().isEmpty()) {
                // 验证token的有效性
                if (!validateToken(appUserId, token)) {
                    return AuthResult.failure("认证令牌无效");
                }
            }
            return AuthResult.success(appUser);
        } catch (Exception e) {
            log.error("用户认证失败,用户ID: {}, 错误: {}", appUserId, e.getMessage(), e);
            return AuthResult.failure("认证过程中发生错误: " + e.getMessage());
        }
    }
    /**
     * 验证token
     * @param appUserId 用户ID
     * @param token 令牌
     * @return 是否有效
     */
    private boolean validateToken(String appUserId, String token) {
        try {
            if (!StringUtils.hasText(token)) {
                log.debug("用户 {} 未提供token,跳过token验证", appUserId);
                return true; // 如果没有提供token,允许通过(可以根据业务需求调整)
            }
            // 移除Bearer前缀
            if (token.startsWith(Constants.TOKEN_PREFIX)) {
                token = token.replace(Constants.TOKEN_PREFIX, "");
            }
            // 解析JWT token
            Claims claims = parseToken(token);
            if (claims == null) {
                log.warn("用户 {} 的token解析失败", appUserId);
                return false;
            }
            // 从token中获取用户标识
            String uuid = (String) claims.get(Constants.LOGIN_USER_APPLET_KEY);
            if (!StringUtils.hasText(uuid)) {
                log.warn("用户 {} 的token中未找到用户标识", appUserId);
                return false;
            }
            // 从Redis中获取用户登录信息
            String userKey = getTokenKey(uuid);
            LoginUserApplet loginUser = redisCache.getCacheObject(userKey);
            if (loginUser == null) {
                log.warn("用户 {} 的登录信息已过期或不存在", appUserId);
                return false;
            }
            // 验证用户ID是否匹配
            if (!appUserId.equals(loginUser.getUserId())) {
                log.warn("用户 {} 的token与提供的用户ID不匹配", appUserId);
                return false;
            }
            // 验证token是否过期
            long currentTime = System.currentTimeMillis();
            if (loginUser.getExpireTime() != null && loginUser.getExpireTime() < currentTime) {
                log.warn("用户 {} 的token已过期", appUserId);
                return false;
            }
            log.debug("用户 {} 的token验证成功", appUserId);
            return true;
        } catch (Exception e) {
            log.error("验证用户 {} 的token时发生错误: {}", appUserId, e.getMessage(), e);
            return false;
        }
    }
    /**
     * 解析JWT token
     * @param token JWT token
     * @return Claims对象
     */
    private Claims parseToken(String token) {
        try {
            return Jwts.parser()
                    .setSigningKey(secret)
                    .parseClaimsJws(token)
                    .getBody();
        } catch (Exception e) {
            log.error("解析token失败: {}", e.getMessage());
            return null;
        }
    }
    /**
     * 获取token在Redis中的key
     * @param uuid 用户唯一标识
     * @return Redis key
     */
    private String getTokenKey(String uuid) {
        return com.ruoyi.common.constant.CacheConstants.LOGIN_TOKEN_KEY + uuid;
    }
    /**
     * 认证结果类
     */
    public static class AuthResult {
        private final boolean success;
        private final String message;
        private final TAppUser appUser;
        private AuthResult(boolean success, String message, TAppUser appUser) {
            this.success = success;
            this.message = message;
            this.appUser = appUser;
        }
        public static AuthResult success(TAppUser appUser) {
            return new AuthResult(true, "认证成功", appUser);
        }
        public static AuthResult failure(String message) {
            return new AuthResult(false, message, null);
        }
        public boolean isSuccess() {
            return success;
        }
        public String getMessage() {
            return message;
        }
        public TAppUser getAppUser() {
            return appUser;
        }
    }
    /**
     * 从认证消息中提取用户信息
     * @param messageObj 消息对象
     * @return 用户认证信息
     */
    public UserAuthInfo extractAuthInfo(Map<String, Object> messageObj) {
        String appUserId = (String) messageObj.get("appUserId");
        String token = (String) messageObj.get("token");
        String deviceId = (String) messageObj.get("deviceId");
        String deviceType = (String) messageObj.get("deviceType");
        return new UserAuthInfo(appUserId, token, deviceId, deviceType);
    }
    /**
     * 用户认证信息类
     */
    public static class UserAuthInfo {
        private final String appUserId;
        private final String token;
        private final String deviceId;
        private final String deviceType;
        public UserAuthInfo(String appUserId, String token, String deviceId, String deviceType) {
            this.appUserId = appUserId;
            this.token = token;
            this.deviceId = deviceId;
            this.deviceType = deviceType;
        }
        public String getAppUserId() {
            return appUserId;
        }
        public String getToken() {
            return token;
        }
        public String getDeviceId() {
            return deviceId;
        }
        public String getDeviceType() {
            return deviceType;
        }
    }
}
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketController.java
New file
@@ -0,0 +1,270 @@
package com.ruoyi.web.webSocket;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.R;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * WebSocket控制器
 * 提供REST接口用于触发服务端主动发送消息
 * 支持基于AppUser的消息发送
 */
@RestController
@RequestMapping("/open/websocket")
@Slf4j
public class WebSocketController extends BaseController {
    @Autowired
    private WebSocketUserService webSocketUserService;
    // ==================== 基于AppUser的WebSocket接口 ====================
    /**
     * 向指定用户发送消息
     * @param appUserId 用户ID
     * @param request 请求参数
     * @return 操作结果
     */
    @PostMapping("/user/send/{appUserId}")
    public R<String> sendMessageToUser(@PathVariable String appUserId, @RequestBody Map<String, String> request) {
        try {
            String message = request.get("message");
            if (message == null || message.trim().isEmpty()) {
                return R.fail("消息内容不能为空");
            }
            int deviceCount = webSocketUserService.sendMessageToUser(appUserId, message);
            if (deviceCount > 0) {
                return R.ok("消息发送成功,发送到 " + deviceCount + " 个设备");
            } else {
                return R.fail("用户不在线或发送失败");
            }
        } catch (Exception e) {
            log.error("向用户发送消息失败", e);
            return R.fail("发送消息失败: " + e.getMessage());
        }
    }
    /**
     * 向指定用户发送JSON格式消息
     * @param appUserId 用户ID
     * @param request 请求参数
     * @return 操作结果
     */
    @PostMapping("/user/send-json/{appUserId}")
    public R<String> sendJsonMessageToUser(@PathVariable String appUserId, @RequestBody Map<String, Object> request) {
        try {
            String messageType = (String) request.get("messageType");
            Object data = request.get("data");
            if (messageType == null || messageType.trim().isEmpty()) {
                return R.fail("消息类型不能为空");
            }
            int deviceCount = webSocketUserService.sendJsonMessageToUser(appUserId, messageType, data);
            if (deviceCount > 0) {
                return R.ok("JSON消息发送成功,发送到 " + deviceCount + " 个设备");
            } else {
                return R.fail("用户不在线或发送失败");
            }
        } catch (Exception e) {
            log.error("向用户发送JSON消息失败", e);
            return R.fail("发送消息失败: " + e.getMessage());
        }
    }
    /**
     * 向多个用户发送消息
     * @param request 请求参数
     * @return 操作结果
     */
    @PostMapping("/user/send-multiple")
    public R<String> sendMessageToUsers(@RequestBody Map<String, Object> request) {
        try {
            @SuppressWarnings("unchecked")
            List<String> appUserIds = (List<String>) request.get("appUserIds");
            String message = (String) request.get("message");
            if (appUserIds == null || appUserIds.isEmpty()) {
                return R.fail("用户ID列表不能为空");
            }
            if (message == null || message.trim().isEmpty()) {
                return R.fail("消息内容不能为空");
            }
            int successUserCount = webSocketUserService.sendMessageToUsers(appUserIds, message);
            return R.ok("消息发送完成,成功发送到 " + successUserCount + " 个用户");
        } catch (Exception e) {
            log.error("向多个用户发送消息失败", e);
            return R.fail("发送消息失败: " + e.getMessage());
        }
    }
    /**
     * 发送任务分配消息
     * @param request 请求参数
     * @return 操作结果
     */
    @PostMapping("/user/task-assignment")
    public R<String> sendTaskAssignment(@RequestBody Map<String, String> request) {
        try {
            String taskId = request.get("taskId");
            String taskName = request.get("taskName");
            String assignee = request.get("assignee");
            String assigneeUserId = request.get("assigneeUserId");
            if (taskId == null || taskName == null || assignee == null) {
                return R.fail("任务ID、任务名称和分配人不能为空");
            }
            int successCount = webSocketUserService.sendTaskAssignment(taskId, taskName, assignee, assigneeUserId);
            return R.ok("任务分配消息发送完成,成功发送到 " + successCount + " 个设备");
        } catch (Exception e) {
            log.error("发送任务分配消息失败", e);
            return R.fail("发送任务分配消息失败: " + e.getMessage());
        }
    }
    /**
     * 发送状态更新消息
     * @param request 请求参数
     * @return 操作结果
     */
    @PostMapping("/user/status-update")
    public R<String> sendStatusUpdate(@RequestBody Map<String, String> request) {
        try {
            String status = request.get("status");
            String details = request.get("details");
            String targetUserId = request.get("targetUserId");
            if (status == null || status.trim().isEmpty()) {
                return R.fail("状态不能为空");
            }
            int successCount = webSocketUserService.sendStatusUpdate(status, details, targetUserId);
            return R.ok("状态更新消息发送完成,成功发送到 " + successCount + " 个设备");
        } catch (Exception e) {
            log.error("发送状态更新消息失败", e);
            return R.fail("发送状态更新消息失败: " + e.getMessage());
        }
    }
    /**
     * 发送任务状态更新消息
     * @param request 请求参数
     * @return 操作结果
     */
    @PostMapping("/user/task-status-update")
    public R<String> sendTaskStatusUpdate(@RequestBody Map<String, String> request) {
        try {
            String taskId = request.get("taskId");
            String taskStatus = request.get("taskStatus");
            String message = request.get("message");
            String targetUserId = request.get("targetUserId");
            if (taskId == null || taskStatus == null) {
                return R.fail("任务ID和任务状态不能为空");
            }
            int successCount = webSocketUserService.sendTaskStatusUpdate(taskId, taskStatus, message, targetUserId);
            return R.ok("任务状态更新消息发送完成,成功发送到 " + successCount + " 个设备");
        } catch (Exception e) {
            log.error("发送任务状态更新消息失败", e);
            return R.fail("发送任务状态更新消息失败: " + e.getMessage());
        }
    }
    /**
     * 发送位置更新消息
     * @param request 请求参数
     * @return 操作结果
     */
    @PostMapping("/user/location-update")
    public R<String> sendLocationUpdate(@RequestBody Map<String, String> request) {
        try {
            String userId = request.get("userId");
            String longitude = request.get("longitude");
            String latitude = request.get("latitude");
            String address = request.get("address");
            if (userId == null || longitude == null || latitude == null) {
                return R.fail("用户ID、经度和纬度不能为空");
            }
            int successCount = webSocketUserService.sendLocationUpdate(userId, longitude, latitude, address);
            return R.ok("位置更新消息发送完成,成功发送到 " + successCount + " 个设备");
        } catch (Exception e) {
            log.error("发送位置更新消息失败", e);
            return R.fail("发送位置更新消息失败: " + e.getMessage());
        }
    }
    /**
     * 获取用户连接信息
     * @param appUserId 用户ID
     * @return 连接信息
     */
    @GetMapping("/user/info/{appUserId}")
    public R getUserConnectionInfo(@PathVariable String appUserId) {
        try {
            Map<String, Object> info = webSocketUserService.getUserConnectionInfo(appUserId);
            return R.ok(info, "获取用户连接信息成功");
        } catch (Exception e) {
            log.error("获取用户连接信息失败", e);
            return R.fail("获取用户连接信息失败: " + e.getMessage());
        }
    }
    /**
     * 获取所有连接统计信息
     * @return 统计信息
     */
    @GetMapping("/user/stats")
    public R getConnectionStats() {
        try {
            Map<String, Object> stats = webSocketUserService.getConnectionStats();
            return R.ok(stats, "获取连接统计信息成功");
        } catch (Exception e) {
            log.error("获取连接统计信息失败", e);
            return R.fail("获取连接统计信息失败: " + e.getMessage());
        }
    }
    /**
     * 检查用户是否在线
     * @param appUserId 用户ID
     * @return 检查结果
     */
    @GetMapping("/user/online/{appUserId}")
    public R checkUserOnline(@PathVariable String appUserId) {
        try {
            boolean isOnline = webSocketUserService.isUserOnline(appUserId);
            return R.ok(isOnline, "用户在线状态检查完成");
        } catch (Exception e) {
            log.error("检查用户在线状态失败", e);
            return R.fail("检查用户在线状态失败: " + e.getMessage());
        }
    }
    /**
     * 获取在线用户列表
     * @return 在线用户列表
     */
    @GetMapping("/user/online-list")
    public R getOnlineUserIds() {
        try {
            Set<String> onlineUserIds = webSocketUserService.getOnlineUserIds();
            return R.ok(onlineUserIds, "获取在线用户列表成功");
        } catch (Exception e) {
            log.error("获取在线用户列表失败", e);
            return R.fail("获取在线用户列表失败: " + e.getMessage());
        }
    }
}
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketServer.java
New file
@@ -0,0 +1,88 @@
package com.ruoyi.web.webSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class WebSocketServer {
    private final int port;
    @Autowired
    private WebSocketTextFrameHandler webSocketTextFrameHandler;
    public WebSocketServer() {
        this.port = 8888; // 默认端口
    }
    public WebSocketServer(int port) {
        this.port = port;
    }
    public void run() throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new LoggingHandler(LogLevel.INFO))  // 在BossGroup中增加一个日志处理器 日志级别为INFO
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 因为是基于Http协议的所以采用Http的编解码器
                            pipeline.addLast(new HttpServerCodec());
                            // 是以块的方式写, 添加ChunkedWriteHandler(分块写入处理程序)
                            pipeline.addLast(new ChunkedWriteHandler());
                            /*
                             * http 数据在传输过程中是分段的 http Object aggregator 就是可以将多个段聚合
                             * 这就是为什么 当浏览器发送大量数据时, 就会出现多次http请求
                             * 参数为 : 最大内容长度
                             */
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            /*
                             * 对应WebSocket 他的数据时以桢(frame) 形式传递
                             * 可以看到WebSocketFrame下面有6个子类
                             * 浏览器请求时: ws://localhost:7000/xxx 请求的url
                             * 核心功能是将http协议升级为ws协议 保持长链接
                             */
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                            // 自定义Handler, 处理业务逻辑
                            // 使用@Sharable注解的Handler,可以被多个连接共享
                            pipeline.addLast(webSocketTextFrameHandler);
                        }
                    });
            log.info("WebSocket服务器启动中,端口: {}", port);
            ChannelFuture sync = serverBootstrap.bind(port).sync();
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("WebSocket服务器启动失败", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
//    public static void main(String[] args) throws InterruptedException {
//        new WebSocketServer(7000).run();
//    }
}
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketServerConfig.java
New file
@@ -0,0 +1,32 @@
package com.ruoyi.web.webSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
 * WebSocket服务器配置类
 * 在Spring Boot应用启动时自动启动WebSocket服务器
 */
@Component
@Slf4j
public class WebSocketServerConfig implements CommandLineRunner {
    @Autowired
    private WebSocketServer webSocketServer;
    @Override
    public void run(String... args) throws Exception {
        // 在新线程中启动WebSocket服务器,避免阻塞主线程
        new Thread(() -> {
            try {
                log.info("正在启动WebSocket服务器...");
                webSocketServer.run();
            } catch (InterruptedException e) {
                log.error("WebSocket服务器启动失败", e);
                Thread.currentThread().interrupt();
            }
        }, "WebSocket-Server-Thread").start();
    }
}
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketTextFrameHandler.java
New file
@@ -0,0 +1,189 @@
package com.ruoyi.web.webSocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
/**
 * TextWebSocketFrame 表示一个文本桢
 * 使用@Sharable注解使其可以被多个连接共享
 * 支持基于AppUser的连接管理
 */
@Component
@Slf4j
@ChannelHandler.Sharable
public class WebSocketTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Autowired
    private WebSocketUserConnectionManager userConnectionManager;
    @Autowired
    private WebSocketAuthHandler authHandler;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String message = msg.text();
        log.info("[服务器] : 收到消息 -> {}", message);
        try {
            // 尝试解析消息,检查是否包含用户认证信息
            JSONObject messageObj = JSON.parseObject(message);
            String messageType = messageObj.getString("type");
            if ("auth".equals(messageType)) {
                // 处理用户认证消息
                handleUserAuth(ctx, messageObj);
            } else if ("ping".equals(messageType)) {
                // 处理心跳消息
                handlePing(ctx, messageObj);
            } else {
                // 处理普通业务消息
                handleBusinessMessage(ctx, messageObj);
            }
        } catch (Exception e) {
            // 如果不是JSON格式,按普通文本处理
            log.debug("消息不是JSON格式,按普通文本处理: {}", message);
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间: " + LocalDateTime.now() + "->" + message));
        }
    }
    /**
     * 处理用户认证消息
     */
    private void handleUserAuth(ChannelHandlerContext ctx, JSONObject messageObj) {
        try {
            // 提取认证信息
            WebSocketAuthHandler.UserAuthInfo authInfo = authHandler.extractAuthInfo(messageObj);
            String appUserId = authInfo.getAppUserId();
            String token = authInfo.getToken();
            if (!StringUtils.hasText(appUserId)) {
                sendAuthFailureResponse(ctx, "用户ID不能为空");
                return;
            }
            // 进行用户认证
            WebSocketAuthHandler.AuthResult authResult = authHandler.authenticateUser(appUserId, token);
            if (authResult.isSuccess()) {
                // 认证成功,将连接与用户关联
                userConnectionManager.addUserConnection(appUserId, ctx.channel());
                // 发送认证成功响应
                JSONObject response = new JSONObject();
                response.put("type", "auth_success");
                response.put("appUserId", appUserId);
                response.put("nickName", authResult.getAppUser().getNickName());
                response.put("message", "认证成功");
                response.put("timestamp", LocalDateTime.now().toString());
                ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
                log.info("用户 {} ({}) 认证成功,ChannelId: {}",
                        appUserId, authResult.getAppUser().getNickName(), ctx.channel().id().asLongText());
            } else {
                // 认证失败
                sendAuthFailureResponse(ctx, authResult.getMessage());
                log.warn("用户 {} 认证失败: {}, ChannelId: {}",
                        appUserId, authResult.getMessage(), ctx.channel().id().asLongText());
            }
        } catch (Exception e) {
            log.error("处理用户认证时发生错误", e);
            sendAuthFailureResponse(ctx, "认证过程中发生错误: " + e.getMessage());
        }
    }
    /**
     * 发送认证失败响应
     */
    private void sendAuthFailureResponse(ChannelHandlerContext ctx, String message) {
        JSONObject response = new JSONObject();
        response.put("type", "auth_failed");
        response.put("message", message);
        response.put("timestamp", LocalDateTime.now().toString());
        ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
    }
    /**
     * 处理心跳消息
     */
    private void handlePing(ChannelHandlerContext ctx, JSONObject messageObj) {
        String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText());
        JSONObject response = new JSONObject();
        response.put("type", "pong");
        response.put("appUserId", appUserId);
        response.put("timestamp", LocalDateTime.now().toString());
        ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
    }
    /**
     * 处理业务消息
     */
    private void handleBusinessMessage(ChannelHandlerContext ctx, JSONObject messageObj) {
        String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText());
        if (appUserId != null) {
            log.info("收到用户 {} 的业务消息: {}", appUserId, messageObj.toJSONString());
            // 回复消息
            JSONObject response = new JSONObject();
            response.put("type", "message_response");
            response.put("appUserId", appUserId);
            response.put("originalMessage", messageObj);
            response.put("serverTime", LocalDateTime.now().toString());
            ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
        } else {
            log.warn("收到未认证连接的业务消息: {}", messageObj.toJSONString());
            // 要求先认证
            JSONObject response = new JSONObject();
            response.put("type", "auth_required");
            response.put("message", "请先进行用户认证");
            response.put("timestamp", LocalDateTime.now().toString());
            ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
        }
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // id 表示唯一的值 LongText是唯一的
        log.info("handlerAdded 被调用: {}", ctx.channel().id().asLongText());
        // shortText 可能会重复
        log.info("handlerAdded 被调用: {}", ctx.channel().id().asShortText());
        // 注意:用户连接管理在认证时添加,这里不添加
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // id 表示唯一的值 LongText是唯一的
        log.info("handlerRemoved 被调用: {}", ctx.channel().id().asLongText());
        // shortText 可能会重复
        log.info("handlerRemoved 被调用: {}", ctx.channel().id().asShortText());
        // 从用户连接管理器中移除连接
        if (userConnectionManager != null) {
            userConnectionManager.removeUserConnection(ctx.channel());
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("WebSocket连接异常: {}", cause.getMessage(), cause);
        ctx.channel().close();
    }
}
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketUserConnectionManager.java
New file
@@ -0,0 +1,281 @@
package com.ruoyi.web.webSocket;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * 基于AppUser的WebSocket连接管理器
 * 支持按用户ID管理连接,一个用户可以有多个设备连接
 */
@Component
@Slf4j
public class WebSocketUserConnectionManager {
    /**
     * 存储用户的所有连接
     * Key: 用户ID (appUserId)
     * Value: 该用户的所有连接Channel列表
     */
    private static final ConcurrentHashMap<String, Set<Channel>> userConnections = new ConcurrentHashMap<>();
    /**
     * 存储连接与用户的映射关系
     * Key: ChannelId.asLongText()
     * Value: 用户ID (appUserId)
     */
    private static final ConcurrentHashMap<String, String> channelToUser = new ConcurrentHashMap<>();
    /**
     * 存储所有连接
     * Key: ChannelId.asLongText()
     * Value: Channel
     */
    private static final ConcurrentHashMap<String, Channel> allConnections = new ConcurrentHashMap<>();
    /**
     * 添加用户连接
     * @param appUserId 用户ID
     * @param channel WebSocket连接通道
     */
    public void addUserConnection(String appUserId, Channel channel) {
        String channelId = channel.id().asLongText();
        // 添加到用户连接映射
        userConnections.computeIfAbsent(appUserId, k -> ConcurrentHashMap.newKeySet())
                .add(channel);
        // 添加到连接用户映射
        channelToUser.put(channelId, appUserId);
        // 添加到所有连接
        allConnections.put(channelId, channel);
        log.info("用户连接已添加,用户ID: {}, ChannelId: {}, 该用户连接数: {}",
                appUserId, channelId, userConnections.get(appUserId).size());
    }
    /**
     * 移除用户连接
     * @param channel WebSocket连接通道
     */
    public void removeUserConnection(Channel channel) {
        String channelId = channel.id().asLongText();
        String appUserId = channelToUser.get(channelId);
        if (appUserId != null) {
            // 从用户连接中移除
            Set<Channel> userChannels = userConnections.get(appUserId);
            if (userChannels != null) {
                userChannels.remove(channel);
                // 如果用户没有连接了,删除用户映射
                if (userChannels.isEmpty()) {
                    userConnections.remove(appUserId);
                }
            }
            // 从连接用户映射中移除
            channelToUser.remove(channelId);
            log.info("用户连接已移除,用户ID: {}, ChannelId: {}, 该用户剩余连接数: {}",
                    appUserId, channelId, userChannels != null ? userChannels.size() : 0);
        }
        // 从所有连接中移除
        allConnections.remove(channelId);
    }
    /**
     * 向指定用户的所有设备发送消息
     * @param appUserId 用户ID
     * @param message 消息内容
     * @return 成功发送的设备数
     */
    public int sendMessageToUser(String appUserId, String message) {
        Set<Channel> userChannels = userConnections.get(appUserId);
        if (userChannels == null || userChannels.isEmpty()) {
            log.warn("用户 {} 没有活跃连接", appUserId);
            return 0;
        }
        int successCount = 0;
        for (Channel channel : userChannels) {
            if (channel != null && channel.isActive()) {
                try {
                    channel.writeAndFlush(new TextWebSocketFrame(message));
                    successCount++;
                } catch (Exception e) {
                    log.error("向用户 {} 发送消息失败,连接ID: {}, 错误: {}",
                            appUserId, channel.id().asLongText(), e.getMessage());
                }
            }
        }
        log.info("向用户 {} 发送消息完成,成功发送到 {} 个设备,消息内容: {}",
                appUserId, successCount, message);
        return successCount;
    }
    /**
     * 向指定连接发送消息
     * @param channelId 连接ID
     * @param message 消息内容
     * @return 是否发送成功
     */
    public boolean sendMessageToChannel(String channelId, String message) {
        Channel channel = allConnections.get(channelId);
        if (channel != null && channel.isActive()) {
            try {
                channel.writeAndFlush(new TextWebSocketFrame(message));
                log.info("消息已发送到连接: {}, 内容: {}", channelId, message);
                return true;
            } catch (Exception e) {
                log.error("发送消息失败,连接ID: {}, 错误: {}", channelId, e.getMessage());
                return false;
            }
        } else {
            log.warn("连接不存在或已关闭,ChannelId: {}", channelId);
            return false;
        }
    }
    /**
     * 向所有连接广播消息
     * @param message 消息内容
     * @return 成功发送的连接数
     */
    public int broadcastMessage(String message) {
        int successCount = 0;
        for (Channel channel : allConnections.values()) {
            if (channel != null && channel.isActive()) {
                try {
                    channel.writeAndFlush(new TextWebSocketFrame(message));
                    successCount++;
                } catch (Exception e) {
                    log.error("广播消息失败,连接ID: {}, 错误: {}",
                            channel.id().asLongText(), e.getMessage());
                }
            }
        }
        log.info("广播消息完成,成功发送到 {} 个连接,消息内容: {}", successCount, message);
        return successCount;
    }
    /**
     * 向多个用户发送消息
     * @param appUserIds 用户ID列表
     * @param message 消息内容
     * @return 成功发送的用户数
     */
    public int sendMessageToUsers(List<String> appUserIds, String message) {
        int successUserCount = 0;
        for (String appUserId : appUserIds) {
            int deviceCount = sendMessageToUser(appUserId, message);
            if (deviceCount > 0) {
                successUserCount++;
            }
        }
        log.info("向 {} 个用户发送消息完成,成功发送到 {} 个用户", appUserIds.size(), successUserCount);
        return successUserCount;
    }
    /**
     * 获取用户的所有连接
     * @param appUserId 用户ID
     * @return 连接列表
     */
    public Set<Channel> getUserConnections(String appUserId) {
        return userConnections.getOrDefault(appUserId, Collections.emptySet());
    }
    /**
     * 获取用户连接数
     * @param appUserId 用户ID
     * @return 连接数
     */
    public int getUserConnectionCount(String appUserId) {
        Set<Channel> userChannels = userConnections.get(appUserId);
        return userChannels != null ? userChannels.size() : 0;
    }
    /**
     * 获取总连接数
     * @return 总连接数
     */
    public int getTotalConnectionCount() {
        return allConnections.size();
    }
    /**
     * 获取在线用户数
     * @return 在线用户数
     */
    public int getOnlineUserCount() {
        return userConnections.size();
    }
    /**
     * 获取所有在线用户ID
     * @return 用户ID集合
     */
    public Set<String> getOnlineUserIds() {
        return userConnections.keySet();
    }
    /**
     * 检查用户是否在线
     * @param appUserId 用户ID
     * @return 是否在线
     */
    public boolean isUserOnline(String appUserId) {
        Set<Channel> userChannels = userConnections.get(appUserId);
        if (userChannels == null || userChannels.isEmpty()) {
            return false;
        }
        // 检查是否有活跃连接
        return userChannels.stream().anyMatch(Channel::isActive);
    }
    /**
     * 检查连接是否存在
     * @param channelId 连接ID
     * @return 是否存在
     */
    public boolean isConnectionExists(String channelId) {
        Channel channel = allConnections.get(channelId);
        return channel != null && channel.isActive();
    }
    /**
     * 根据连接ID获取用户ID
     * @param channelId 连接ID
     * @return 用户ID
     */
    public String getUserIdByChannelId(String channelId) {
        return channelToUser.get(channelId);
    }
    /**
     * 获取连接统计信息
     * @return 统计信息
     */
    public Map<String, Object> getConnectionStats() {
        Map<String, Object> stats = new HashMap<>();
        stats.put("totalConnections", getTotalConnectionCount());
        stats.put("onlineUsers", getOnlineUserCount());
        stats.put("onlineUserIds", getOnlineUserIds());
        // 统计每个用户的连接数
        Map<String, Integer> userConnectionCounts = new HashMap<>();
        for (Map.Entry<String, Set<Channel>> entry : userConnections.entrySet()) {
            userConnectionCounts.put(entry.getKey(), entry.getValue().size());
        }
        stats.put("userConnectionCounts", userConnectionCounts);
        return stats;
    }
}
ruoyi-admin/src/main/java/com/ruoyi/web/webSocket/WebSocketUserService.java
New file
@@ -0,0 +1,286 @@
package com.ruoyi.web.webSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
/**
 * 基于AppUser的WebSocket服务类
 * 提供按用户发送消息的接口
 */
@Service
@Slf4j
public class WebSocketUserService {
    @Autowired
    private WebSocketUserConnectionManager userConnectionManager;
    /**
     * 向指定用户的所有设备发送消息
     * @param appUserId 用户ID
     * @param message 消息内容
     * @return 成功发送的设备数
     */
    public int sendMessageToUser(String appUserId, String message) {
        log.info("准备向用户 {} 发送消息: {}", appUserId, message);
        return userConnectionManager.sendMessageToUser(appUserId, message);
    }
    /**
     * 向指定用户发送JSON格式消息
     * @param appUserId 用户ID
     * @param messageType 消息类型
     * @param data 消息数据
     * @return 成功发送的设备数
     */
    public int sendJsonMessageToUser(String appUserId, String messageType, Object data) {
        String jsonMessage = buildJsonMessage(messageType, data, appUserId);
        return sendMessageToUser(appUserId, jsonMessage);
    }
    /**
     * 向多个用户发送消息
     * @param appUserIds 用户ID列表
     * @param message 消息内容
     * @return 成功发送的用户数
     */
    public int sendMessageToUsers(List<String> appUserIds, String message) {
        log.info("准备向 {} 个用户发送消息: {}", appUserIds.size(), message);
        return userConnectionManager.sendMessageToUsers(appUserIds, message);
    }
    /**
     * 向多个用户发送JSON格式消息
     * @param appUserIds 用户ID列表
     * @param messageType 消息类型
     * @param data 消息数据
     * @return 成功发送的用户数
     */
    public int sendJsonMessageToUsers(List<String> appUserIds, String messageType, Object data) {
        String jsonMessage = buildJsonMessage(messageType, data, null);
        return sendMessageToUsers(appUserIds, jsonMessage);
    }
    /**
     * 向指定连接发送消息
     * @param channelId 连接ID
     * @param message 消息内容
     * @return 是否发送成功
     */
    public boolean sendMessageToChannel(String channelId, String message) {
        log.info("准备向连接 {} 发送消息: {}", channelId, message);
        return userConnectionManager.sendMessageToChannel(channelId, message);
    }
    /**
     * 广播消息到所有连接
     * @param message 消息内容
     * @return 成功发送的连接数
     */
    public int broadcastMessage(String message) {
        log.info("准备广播消息: {}", message);
        return userConnectionManager.broadcastMessage(message);
    }
    /**
     * 广播JSON格式消息到所有连接
     * @param messageType 消息类型
     * @param data 消息数据
     * @return 成功发送的连接数
     */
    public int broadcastJsonMessage(String messageType, Object data) {
        String jsonMessage = buildJsonMessage(messageType, data, null);
        return broadcastMessage(jsonMessage);
    }
    /**
     * 发送系统通知消息
     * @param message 通知内容
     * @return 成功发送的连接数
     */
    public int sendSystemNotification(String message) {
        Map<String, Object> data = new HashMap<>();
        data.put("message", message);
        return sendJsonMessageToAllUsers("system_notification", data);
    }
    /**
     * 向所有在线用户发送系统通知
     * @param messageType 消息类型
     * @param data 消息数据
     * @return 成功发送的连接数
     */
    public int sendJsonMessageToAllUsers(String messageType, Object data) {
        Set<String> onlineUserIds = userConnectionManager.getOnlineUserIds();
        if (onlineUserIds.isEmpty()) {
            log.warn("没有在线用户,无法发送消息");
            return 0;
        }
        return sendJsonMessageToUsers(new ArrayList<>(onlineUserIds), messageType, data);
    }
    /**
     * 发送任务分配消息
     * @param taskId 任务ID
     * @param taskName 任务名称
     * @param assignee 分配人
     * @param assigneeUserId 分配人用户ID
     * @return 成功发送的连接数
     */
    public int sendTaskAssignment(String taskId, String taskName, String assignee, String assigneeUserId) {
        Map<String, Object> taskData = new HashMap<>();
        taskData.put("taskId", taskId);
        taskData.put("taskName", taskName);
        taskData.put("assignee", assignee);
        taskData.put("assigneeUserId", assigneeUserId);
        taskData.put("timestamp", System.currentTimeMillis());
        if (assigneeUserId != null) {
            // 发送给指定用户
            return sendJsonMessageToUser(assigneeUserId, "task_assignment", taskData);
        } else {
            // 广播给所有用户
            return broadcastJsonMessage("task_assignment", taskData);
        }
    }
    /**
     * 发送状态更新消息
     * @param status 状态
     * @param details 详细信息
     * @param targetUserId 目标用户ID,为null时广播
     * @return 成功发送的连接数
     */
    public int sendStatusUpdate(String status, String details, String targetUserId) {
        Map<String, Object> statusData = new HashMap<>();
        statusData.put("status", status);
        statusData.put("details", details);
        statusData.put("timestamp", System.currentTimeMillis());
        if (targetUserId != null) {
            // 发送给指定用户
            return sendJsonMessageToUser(targetUserId, "status_update", statusData);
        } else {
            // 广播给所有用户
            return broadcastJsonMessage("status_update", statusData);
        }
    }
    /**
     * 发送任务状态更新消息
     * @param taskId 任务ID
     * @param taskStatus 任务状态
     * @param message 消息内容
     * @param targetUserId 目标用户ID,为null时广播
     * @return 成功发送的连接数
     */
    public int sendTaskStatusUpdate(String taskId, String taskStatus, String message, String targetUserId) {
        Map<String, Object> taskStatusData = new HashMap<>();
        taskStatusData.put("taskId", taskId);
        taskStatusData.put("taskStatus", taskStatus);
        taskStatusData.put("message", message);
        taskStatusData.put("timestamp", System.currentTimeMillis());
        if (targetUserId != null) {
            // 发送给指定用户
            return sendJsonMessageToUser(targetUserId, "task_status_update", taskStatusData);
        } else {
            // 广播给所有用户
            return broadcastJsonMessage("task_status_update", taskStatusData);
        }
    }
    /**
     * 发送位置更新消息
     * @param userId 用户ID
     * @param longitude 经度
     * @param latitude 纬度
     * @param address 地址
     * @return 成功发送的连接数
     */
    public int sendLocationUpdate(String userId, String longitude, String latitude, String address) {
        Map<String, Object> locationData = new HashMap<>();
        locationData.put("longitude", longitude);
        locationData.put("latitude", latitude);
        locationData.put("address", address != null ? address : "");
        locationData.put("timestamp", System.currentTimeMillis());
        return sendJsonMessageToUser(userId, "location_update", locationData);
    }
    /**
     * 获取用户连接信息
     * @param appUserId 用户ID
     * @return 连接信息
     */
    public Map<String, Object> getUserConnectionInfo(String appUserId) {
        Map<String, Object> info = new HashMap<>();
        info.put("appUserId", appUserId);
        info.put("isOnline", userConnectionManager.isUserOnline(appUserId));
        info.put("connectionCount", userConnectionManager.getUserConnectionCount(appUserId));
        return info;
    }
    /**
     * 获取所有连接统计信息
     * @return 统计信息
     */
    public Map<String, Object> getConnectionStats() {
        return userConnectionManager.getConnectionStats();
    }
    /**
     * 检查用户是否在线
     * @param appUserId 用户ID
     * @return 是否在线
     */
    public boolean isUserOnline(String appUserId) {
        return userConnectionManager.isUserOnline(appUserId);
    }
    /**
     * 获取在线用户列表
     * @return 在线用户ID列表
     */
    public Set<String> getOnlineUserIds() {
        return userConnectionManager.getOnlineUserIds();
    }
    /**
     * 获取总连接数
     * @return 总连接数
     */
    public int getTotalConnectionCount() {
        return userConnectionManager.getTotalConnectionCount();
    }
    /**
     * 获取在线用户数
     * @return 在线用户数
     */
    public int getOnlineUserCount() {
        return userConnectionManager.getOnlineUserCount();
    }
    /**
     * 构建JSON格式消息
     * @param messageType 消息类型
     * @param data 消息数据
     * @param targetUserId 目标用户ID
     * @return JSON字符串
     */
    private String buildJsonMessage(String messageType, Object data, String targetUserId) {
        Map<String, Object> message = new HashMap<>();
        message.put("type", messageType);
        message.put("data", data);
        message.put("timestamp", System.currentTimeMillis());
        if (targetUserId != null) {
            message.put("targetUserId", targetUserId);
        }
        return com.alibaba.fastjson.JSON.toJSONString(message);
    }
}
ruoyi-applet/pom.xml
@@ -155,6 +155,21 @@
            <version>2.0.10</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
ruoyi-applet/src/main/java/com/ruoyi/RuoYiAppletApplication.java
@@ -14,7 +14,6 @@
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
 * 启动程序
@@ -32,15 +31,14 @@
        ConfigurableApplicationContext application = SpringApplication.run(RuoYiAppletApplication.class, args);
        try {
        Environment env = application.getEnvironment();
        log.info("\n----------------------------------------------------------\n\t" +
            Environment env = application.getEnvironment();
            log.info("\n----------------------------------------------------------\n\t" +
                        "应用 '{}' 运行成功! 访问连接:\n\t" +
                        "Swagger文档: \t\thttp://{}:{}/doc.html\n" +
                        "----------------------------------------------------------",
                env.getProperty("spring.application.name", "后台"),
                env.getProperty("spring.application.name", "app"),
                InetAddress.getLocalHost().getHostAddress(),
                env.getProperty("server.port", "8081"));
                env.getProperty("server.port", "9099"));
        }catch (Exception e){
            e.printStackTrace();
        }
ruoyi-applet/src/main/java/com/ruoyi/web/controller/api/TMissionController.java
@@ -11,7 +11,6 @@
import com.ruoyi.system.dto.MissionIdDto;
import com.ruoyi.system.dto.TMissionUserDto;
import com.ruoyi.system.model.*;
import com.ruoyi.system.query.TEquipmentQuery;
import com.ruoyi.system.service.*;
import com.ruoyi.system.vo.MissionTotalVo;
import com.ruoyi.system.vo.TotalHistoryVo;
@@ -24,9 +23,7 @@
import org.springframework.web.bind.annotation.RestController;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.Month;
import java.util.List;
/**
@@ -246,7 +243,7 @@
    @ApiOperation(value = "发送任务", response = TEquipment.class)
    @PostMapping(value = "/open/t-mission/sendMission")
    public R<?> sendMission() {
        WebSocketServerSolo.sendInfo("{\"missionId\":1}", "1");
//        WebSocketServerSolo.sendInfo("{\"missionId\":1}", "1");
        return R.ok();
    }
ruoyi-applet/src/main/java/com/ruoyi/web/controller/api/WebSocketConfig.java
File was deleted
ruoyi-applet/src/main/java/com/ruoyi/web/controller/api/WebSocketServerSolo.java
File was deleted
ruoyi-applet/src/test/java/com/ruoyi/web/controller/webSocket/SimpleWebSocketClient.java
New file
@@ -0,0 +1,144 @@
package com.ruoyi.web.controller.webSocket;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * 简单的WebSocket客户端实现
 * 用于测试WebSocket服务端功能
 */
@Slf4j
public class SimpleWebSocketClient implements WebSocketClientTest.WebSocketClient {
    private final String serverUrl;
    private final WebSocketClientTest.WebSocketClient.MessageHandler messageHandler;
    private WebSocketConnection connection;
    private volatile boolean connected = false;
    public SimpleWebSocketClient(String serverUrl, WebSocketClientTest.WebSocketClient.MessageHandler messageHandler) {
        this.serverUrl = serverUrl;
        this.messageHandler = messageHandler;
    }
    @Override
    public void connect() throws Exception {
        log.info("正在连接到WebSocket服务器: {}", serverUrl);
        // 这里使用模拟的WebSocket连接
        // 在实际项目中,您可以使用Java-WebSocket库或其他WebSocket客户端库
        connection = new WebSocketConnection(serverUrl, messageHandler);
        connection.connect();
        // 等待连接建立
        int retryCount = 0;
        while (!connected && retryCount < 10) {
            Thread.sleep(500);
            retryCount++;
        }
        if (!connected) {
            throw new RuntimeException("连接WebSocket服务器超时");
        }
    }
    @Override
    public void send(String message) {
        if (connection != null && connected) {
            connection.send(message);
        } else {
            log.warn("WebSocket未连接,无法发送消息: {}", message);
        }
    }
    @Override
    public void close() {
        if (connection != null) {
            connection.close();
            connected = false;
        }
    }
    /**
     * 模拟的WebSocket连接类
     */
    private static class WebSocketConnection {
        private final String serverUrl;
        private final WebSocketClientTest.WebSocketClient.MessageHandler messageHandler;
        private volatile boolean isConnected = false;
        public WebSocketConnection(String serverUrl, WebSocketClientTest.WebSocketClient.MessageHandler messageHandler) {
            this.serverUrl = serverUrl;
            this.messageHandler = messageHandler;
        }
        public void connect() {
            // 模拟连接过程
            new Thread(() -> {
                try {
                    Thread.sleep(1000); // 模拟连接延迟
                    isConnected = true;
                    log.info("WebSocket连接已建立");
                    messageHandler.onOpen();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    messageHandler.onError(e);
                }
            }).start();
        }
        public void send(String message) {
            if (isConnected) {
                log.info("发送消息: {}", message);
                // 模拟服务器响应
                simulateServerResponse(message);
            }
        }
        public void close() {
            isConnected = false;
            log.info("WebSocket连接已关闭");
            messageHandler.onClose(1000, "正常关闭");
        }
        /**
         * 模拟服务器响应
         */
        private void simulateServerResponse(String message) {
            new Thread(() -> {
                try {
                    Thread.sleep(100); // 模拟网络延迟
                    // 解析消息类型并模拟响应
                    if (message.contains("\"type\":\"auth\"")) {
                        if (message.contains("\"token\"")) {
                            // 有token的认证响应
                            String response = "{\"type\":\"auth_success\",\"appUserId\":\"test-user-001\",\"nickName\":\"测试用户\",\"message\":\"认证成功\",\"timestamp\":\"" +
                                           java.time.LocalDateTime.now() + "\"}";
                            messageHandler.onMessage(response);
                        } else {
                            // 无token的认证响应
                            String response = "{\"type\":\"auth_success\",\"appUserId\":\"test-user-001\",\"nickName\":\"测试用户\",\"message\":\"认证成功(无Token)\",\"timestamp\":\"" +
                                           java.time.LocalDateTime.now() + "\"}";
                            messageHandler.onMessage(response);
                        }
                    } else if (message.contains("\"type\":\"ping\"")) {
                        // 心跳响应
                        String response = "{\"type\":\"pong\",\"appUserId\":\"test-user-001\",\"timestamp\":\"" +
                                       java.time.LocalDateTime.now() + "\"}";
                        messageHandler.onMessage(response);
                    } else if (message.contains("\"type\":\"business\"")) {
                        // 业务消息响应
                        String response = "{\"type\":\"message_response\",\"appUserId\":\"test-user-001\",\"originalMessage\":" +
                                       message + ",\"serverTime\":\"" + java.time.LocalDateTime.now() + "\"}";
                        messageHandler.onMessage(response);
                    }
                } catch (Exception e) {
                    messageHandler.onError(e);
                }
            }).start();
        }
    }
}
ruoyi-applet/src/test/java/com/ruoyi/web/controller/webSocket/WebSocketClientTest.java
New file
@@ -0,0 +1,312 @@
package com.ruoyi.web.controller.webSocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * WebSocket客户端测试类
 * 演示如何连接WebSocket服务端并进行认证
 */
@SpringBootTest
@Slf4j
public class WebSocketClientTest {
    // Endpoint URL handed to SimpleWebSocketClient by every test in this class.
    private static final String WEBSOCKET_URL = "ws://127.0.0.1:8888/hello";
    // Value written to the "appUserId" field of the auth messages built in this class.
    private static final String TEST_USER_ID = "1964941649976532994";
    // Value written to the "token" field of the auth message.
    // NOTE(review): this looks like a real JWT committed to source control — confirm it is a
    // throwaway credential, or load it from test configuration instead.
    private static final String TEST_TOKEN = "eyJhbGciOiJIUzUxMiJ9.eyJsb2dpbl91c2VyX2FwcGxldF9rZXkiOiJhMzkyMTdmZS1hZmRiLTQ1YTAtYjkwMy01MGViNjc4ZGJlNzAifQ.SaLnyIsAQ9lnSivaXPaO71gReSmx-R_tjrS3auKJqbXKe2SiLr7MZiNetdjmwvE7HSzjxs1yTqVW9mXGcAuWHw";
    /**
     * 测试WebSocket连接和认证
     */
    @Test
    public void testWebSocketConnection() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        // 创建WebSocket客户端
        SimpleWebSocketClient client = new SimpleWebSocketClient(WEBSOCKET_URL, new WebSocketClient.MessageHandler() {
            @Override
            public void onOpen() {
                log.info("WebSocket连接已建立");
                // 发送认证消息
                sendAuthMessage();
            }
            @Override
            public void onMessage(String message) {
                log.info("收到服务器消息: {}", message);
                try {
                    JSONObject messageObj = JSON.parseObject(message);
                    String messageType = messageObj.getString("type");
                    switch (messageType) {
                        case "auth_success":
                            log.info("认证成功: {}", messageObj.getString("message"));
                            // 认证成功后发送业务消息
                            sendBusinessMessage();
                            break;
                        case "auth_failed":
                            log.error("认证失败: {}", messageObj.getString("message"));
                            break;
                        case "pong":
                            log.info("收到心跳响应");
                            break;
                        case "message_response":
                            log.info("收到业务消息响应: {}", messageObj.getString("originalMessage"));
                            break;
                        default:
                            log.info("收到其他消息: {}", message);
                    }
                } catch (Exception e) {
                    log.error("解析消息失败: {}", e.getMessage());
                }
            }
            @Override
            public void onClose(int code, String reason) {
                log.info("WebSocket连接已关闭: code={}, reason={}", code, reason);
                latch.countDown();
            }
            @Override
            public void onError(Exception e) {
                log.error("WebSocket连接错误: {}", e.getMessage(), e);
                latch.countDown();
            }
        });
        // 连接WebSocket
        client.connect();
        // 等待连接关闭
        latch.await(30, TimeUnit.SECONDS);
        // 关闭连接
        client.close();
    }
    /**
     * 测试无Token认证
     */
    @Test
    public void testWebSocketConnectionWithoutToken() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        SimpleWebSocketClient client = new SimpleWebSocketClient(WEBSOCKET_URL, new WebSocketClient.MessageHandler() {
            @Override
            public void onOpen() {
                log.info("WebSocket连接已建立(无Token)");
                sendAuthMessageWithoutToken();
            }
            @Override
            public void onMessage(String message) {
                log.info("收到服务器消息: {}", message);
                handleMessage(message);
            }
            @Override
            public void onClose(int code, String reason) {
                log.info("WebSocket连接已关闭: code={}, reason={}", code, reason);
                latch.countDown();
            }
            @Override
            public void onError(Exception e) {
                log.error("WebSocket连接错误: {}", e.getMessage(), e);
                latch.countDown();
            }
        });
        client.connect();
        latch.await(10, TimeUnit.SECONDS);
        client.close();
    }
    /**
     * 测试心跳功能
     */
    @Test
    public void testWebSocketHeartbeat() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        SimpleWebSocketClient client = new SimpleWebSocketClient(WEBSOCKET_URL, new WebSocketClient.MessageHandler() {
            @Override
            public void onOpen() {
                log.info("WebSocket连接已建立");
                sendAuthMessage();
            }
            @Override
            public void onMessage(String message) {
                log.info("收到服务器消息: {}", message);
                try {
                    JSONObject messageObj = JSON.parseObject(message);
                    String messageType = messageObj.getString("type");
                    if ("auth_success".equals(messageType)) {
                        log.info("认证成功,开始发送心跳");
                        // 认证成功后开始发送心跳
                        startHeartbeat();
                    } else if ("pong".equals(messageType)) {
                        log.info("收到心跳响应");
                    }
                } catch (Exception e) {
                    log.error("解析消息失败: {}", e.getMessage());
                }
            }
            @Override
            public void onClose(int code, String reason) {
                log.info("WebSocket连接已关闭: code={}, reason={}", code, reason);
                latch.countDown();
            }
            @Override
            public void onError(Exception e) {
                log.error("WebSocket连接错误: {}", e.getMessage(), e);
                latch.countDown();
            }
        });
        client.connect();
        latch.await(30, TimeUnit.SECONDS);
        client.close();
    }
    /**
     * 发送认证消息
     */
    private void sendAuthMessage() {
        JSONObject authMessage = new JSONObject();
        authMessage.put("type", "auth");
        authMessage.put("appUserId", TEST_USER_ID);
        authMessage.put("token", TEST_TOKEN);
        String message = authMessage.toJSONString();
        log.info("发送认证消息: {}", message);
        // 这里需要实际的WebSocket客户端实现
    }
    /**
     * Empty entry point: the body contains no statements, only sample payloads kept as comments.
     * The samples appear to be log lines produced by the send helpers in this class.
     * NOTE(review): the samples embed the same token as TEST_TOKEN — confirm it is not a live credential.
     */
    public static void main(String[] args) {
        // Auth message sent: {"deviceType":"test","type":"auth","appUserId":"1964941649976532994","deviceId":"test-device-1761294213830","token":"eyJhbGciOiJIUzUxMiJ9.eyJsb2dpbl91c2VyX2FwcGxldF9rZXkiOiJhMzkyMTdmZS1hZmRiLTQ1YTAtYjkwMy01MGViNjc4ZGJlNzAifQ.SaLnyIsAQ9lnSivaXPaO71gReSmx-R_tjrS3auKJqbXKe2SiLr7MZiNetdjmwvE7HSzjxs1yTqVW9mXGcAuWHw"}
        // Auth message sent: {"type":"auth","appUserId":"1964941649976532994","token":"eyJhbGciOiJIUzUxMiJ9.eyJsb2dpbl91c2VyX2FwcGxldF9rZXkiOiJhMzkyMTdmZS1hZmRiLTQ1YTAtYjkwMy01MGViNjc4ZGJlNzAifQ.SaLnyIsAQ9lnSivaXPaO71gReSmx-R_tjrS3auKJqbXKe2SiLr7MZiNetdjmwvE7HSzjxs1yTqVW9mXGcAuWHw"}
        // Heartbeat message sent: {"type":"ping","timestamp":1761294280878}
        // Business message sent: {"type":"business","content":"这是一条测试业务消息","timestamp":1761294280878}
    }
    /**
     * 发送无Token认证消息
     */
    private void sendAuthMessageWithoutToken() {
        JSONObject authMessage = new JSONObject();
        authMessage.put("type", "auth");
        authMessage.put("appUserId", TEST_USER_ID);
        String message = authMessage.toJSONString();
        log.info("发送无Token认证消息: {}", message);
    }
    /**
     * 发送业务消息
     */
    private void sendBusinessMessage() {
        JSONObject businessMessage = new JSONObject();
        businessMessage.put("type", "business");
        businessMessage.put("content", "这是一条测试业务消息");
        businessMessage.put("timestamp", System.currentTimeMillis());
        String message = businessMessage.toJSONString();
        log.info("发送业务消息: {}", message);
    }
    /**
     * 发送心跳消息
     */
    private void sendPingMessage() {
        JSONObject pingMessage = new JSONObject();
        pingMessage.put("type", "ping");
        pingMessage.put("timestamp", System.currentTimeMillis());
        String message = pingMessage.toJSONString();
        log.info("发送心跳消息: {}", message);
    }
    /**
     * 开始心跳
     */
    private void startHeartbeat() {
        // 每5秒发送一次心跳
        new Thread(() -> {
            try {
                for (int i = 0; i < 6; i++) {
                    Thread.sleep(5000);
                    sendPingMessage();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
    /**
     * 处理服务器消息
     */
    private void handleMessage(String message) {
        try {
            JSONObject messageObj = JSON.parseObject(message);
            String messageType = messageObj.getString("type");
            switch (messageType) {
                case "auth_success":
                    log.info("认证成功: {}", messageObj.getString("message"));
                    break;
                case "auth_failed":
                    log.error("认证失败: {}", messageObj.getString("message"));
                    break;
                case "pong":
                    log.info("收到心跳响应");
                    break;
                case "message_response":
                    log.info("收到业务消息响应");
                    break;
                case "auth_required":
                    log.warn("需要重新认证: {}", messageObj.getString("message"));
                    break;
                default:
                    log.info("收到其他消息: {}", message);
            }
        } catch (Exception e) {
            log.error("解析消息失败: {}", e.getMessage());
        }
    }
    /**
     * WebSocket client interface.
     *
     * <p>Declares the connect / send / close operations and the nested {@link MessageHandler}
     * callback type that the tests in this class implement anonymously.
     */
    public interface WebSocketClient {
        /**
         * Opens the connection.
         *
         * @throws Exception declared so implementations may propagate any connection failure
         */
        void connect() throws Exception;
        /**
         * Sends a text message.
         *
         * @param message the message text to send
         */
        void send(String message);
        /** Closes the connection. */
        void close();
        /** Callbacks through which a client reports connection events to its owner. */
        interface MessageHandler {
            /** Invoked when the connection has been established. */
            void onOpen();
            /**
             * Invoked for each message received from the server.
             *
             * @param message the received message text
             */
            void onMessage(String message);
            /**
             * Invoked when the connection has been closed.
             *
             * @param code   close code reported by the client implementation
             * @param reason close reason reported by the client implementation
             */
            void onClose(int code, String reason);
            /**
             * Invoked when the client reports an error.
             *
             * @param e the error that occurred
             */
            void onError(Exception e);
        }
    }
}