| | |
| | | <artifactId>dysmsapi20170525</artifactId> |
| | | <version>2.0.10</version> |
| | | </dependency> |
| | | |
| | | <dependency> |
| | | <groupId>io.netty</groupId> |
| | | <artifactId>netty-all</artifactId> |
| | | </dependency> |
| | | </dependencies> |
| | | |
| | | <build> |
| New file |
| | |
| | | package com.ruoyi.web.webSocket; |
| | | |
| | | import com.ruoyi.common.constant.Constants; |
| | | import com.ruoyi.common.core.domain.model.LoginUserApplet; |
| | | import com.ruoyi.common.core.redis.RedisCache; |
| | | import com.ruoyi.system.model.TAppUser; |
| | | import com.ruoyi.system.service.TAppUserService; |
| | | import io.jsonwebtoken.Claims; |
| | | import io.jsonwebtoken.Jwts; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import java.util.Map; |
| | | |
| | | /** |
| | | * WebSocket认证处理器 |
| | | * 处理WebSocket连接的用户认证逻辑 |
| | | */ |
| | | @Component |
| | | @Slf4j |
| | | public class WebSocketAuthHandler { |
| | | |
| | | @Autowired |
| | | private TAppUserService appUserService; |
| | | |
| | | @Autowired |
| | | private RedisCache redisCache; |
| | | |
| | | // 令牌秘钥 |
| | | @Value("${token.secret}") |
| | | private String secret; |
| | | |
| | | /** |
| | | * 验证用户认证信息 |
| | | * @param appUserId 用户ID |
| | | * @param token 认证令牌(可选) |
| | | * @return 认证结果 |
| | | */ |
| | | public AuthResult authenticateUser(String appUserId, String token) { |
| | | try { |
| | | if (appUserId == null || appUserId.trim().isEmpty()) { |
| | | return AuthResult.failure("用户ID不能为空"); |
| | | } |
| | | |
| | | // 查询用户信息 |
| | | TAppUser appUser = appUserService.getById(appUserId); |
| | | if (appUser == null) { |
| | | return AuthResult.failure("用户不存在"); |
| | | } |
| | | |
| | | // 检查用户状态 |
| | | if (appUser.getStatus() == null || appUser.getStatus() != 1) { |
| | | return AuthResult.failure("用户已被禁用"); |
| | | } |
| | | |
| | | // 检查用户审核状态 |
| | | if (appUser.getState() == null || appUser.getState() != 1) { |
| | | return AuthResult.failure("用户未通过审核"); |
| | | } |
| | | |
| | | // 这里可以添加token验证逻辑 |
| | | if (token != null && !token.trim().isEmpty()) { |
| | | // 验证token的有效性 |
| | | if (!validateToken(appUserId, token)) { |
| | | return AuthResult.failure("认证令牌无效"); |
| | | } |
| | | } |
| | | |
| | | return AuthResult.success(appUser); |
| | | } catch (Exception e) { |
| | | log.error("用户认证失败,用户ID: {}, 错误: {}", appUserId, e.getMessage(), e); |
| | | return AuthResult.failure("认证过程中发生错误: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 验证token |
| | | * @param appUserId 用户ID |
| | | * @param token 令牌 |
| | | * @return 是否有效 |
| | | */ |
| | | private boolean validateToken(String appUserId, String token) { |
| | | try { |
| | | if (!StringUtils.hasText(token)) { |
| | | log.debug("用户 {} 未提供token,跳过token验证", appUserId); |
| | | return true; // 如果没有提供token,允许通过(可以根据业务需求调整) |
| | | } |
| | | |
| | | // 移除Bearer前缀 |
| | | if (token.startsWith(Constants.TOKEN_PREFIX)) { |
| | | token = token.replace(Constants.TOKEN_PREFIX, ""); |
| | | } |
| | | |
| | | // 解析JWT token |
| | | Claims claims = parseToken(token); |
| | | if (claims == null) { |
| | | log.warn("用户 {} 的token解析失败", appUserId); |
| | | return false; |
| | | } |
| | | |
| | | // 从token中获取用户标识 |
| | | String uuid = (String) claims.get(Constants.LOGIN_USER_APPLET_KEY); |
| | | if (!StringUtils.hasText(uuid)) { |
| | | log.warn("用户 {} 的token中未找到用户标识", appUserId); |
| | | return false; |
| | | } |
| | | |
| | | // 从Redis中获取用户登录信息 |
| | | String userKey = getTokenKey(uuid); |
| | | LoginUserApplet loginUser = redisCache.getCacheObject(userKey); |
| | | |
| | | if (loginUser == null) { |
| | | log.warn("用户 {} 的登录信息已过期或不存在", appUserId); |
| | | return false; |
| | | } |
| | | |
| | | // 验证用户ID是否匹配 |
| | | if (!appUserId.equals(loginUser.getUserId())) { |
| | | log.warn("用户 {} 的token与提供的用户ID不匹配", appUserId); |
| | | return false; |
| | | } |
| | | |
| | | // 验证token是否过期 |
| | | long currentTime = System.currentTimeMillis(); |
| | | if (loginUser.getExpireTime() != null && loginUser.getExpireTime() < currentTime) { |
| | | log.warn("用户 {} 的token已过期", appUserId); |
| | | return false; |
| | | } |
| | | |
| | | log.debug("用户 {} 的token验证成功", appUserId); |
| | | return true; |
| | | |
| | | } catch (Exception e) { |
| | | log.error("验证用户 {} 的token时发生错误: {}", appUserId, e.getMessage(), e); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 解析JWT token |
| | | * @param token JWT token |
| | | * @return Claims对象 |
| | | */ |
| | | private Claims parseToken(String token) { |
| | | try { |
| | | return Jwts.parser() |
| | | .setSigningKey(secret) |
| | | .parseClaimsJws(token) |
| | | .getBody(); |
| | | } catch (Exception e) { |
| | | log.error("解析token失败: {}", e.getMessage()); |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取token在Redis中的key |
| | | * @param uuid 用户唯一标识 |
| | | * @return Redis key |
| | | */ |
| | | private String getTokenKey(String uuid) { |
| | | return com.ruoyi.common.constant.CacheConstants.LOGIN_TOKEN_KEY + uuid; |
| | | } |
| | | |
| | | /** |
| | | * 认证结果类 |
| | | */ |
| | | public static class AuthResult { |
| | | private final boolean success; |
| | | private final String message; |
| | | private final TAppUser appUser; |
| | | |
| | | private AuthResult(boolean success, String message, TAppUser appUser) { |
| | | this.success = success; |
| | | this.message = message; |
| | | this.appUser = appUser; |
| | | } |
| | | |
| | | public static AuthResult success(TAppUser appUser) { |
| | | return new AuthResult(true, "认证成功", appUser); |
| | | } |
| | | |
| | | public static AuthResult failure(String message) { |
| | | return new AuthResult(false, message, null); |
| | | } |
| | | |
| | | public boolean isSuccess() { |
| | | return success; |
| | | } |
| | | |
| | | public String getMessage() { |
| | | return message; |
| | | } |
| | | |
| | | public TAppUser getAppUser() { |
| | | return appUser; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 从认证消息中提取用户信息 |
| | | * @param messageObj 消息对象 |
| | | * @return 用户认证信息 |
| | | */ |
| | | public UserAuthInfo extractAuthInfo(Map<String, Object> messageObj) { |
| | | String appUserId = (String) messageObj.get("appUserId"); |
| | | String token = (String) messageObj.get("token"); |
| | | String deviceId = (String) messageObj.get("deviceId"); |
| | | String deviceType = (String) messageObj.get("deviceType"); |
| | | |
| | | return new UserAuthInfo(appUserId, token, deviceId, deviceType); |
| | | } |
| | | |
| | | /** |
| | | * 用户认证信息类 |
| | | */ |
| | | public static class UserAuthInfo { |
| | | private final String appUserId; |
| | | private final String token; |
| | | private final String deviceId; |
| | | private final String deviceType; |
| | | |
| | | public UserAuthInfo(String appUserId, String token, String deviceId, String deviceType) { |
| | | this.appUserId = appUserId; |
| | | this.token = token; |
| | | this.deviceId = deviceId; |
| | | this.deviceType = deviceType; |
| | | } |
| | | |
| | | public String getAppUserId() { |
| | | return appUserId; |
| | | } |
| | | |
| | | public String getToken() { |
| | | return token; |
| | | } |
| | | |
| | | public String getDeviceId() { |
| | | return deviceId; |
| | | } |
| | | |
| | | public String getDeviceType() { |
| | | return deviceType; |
| | | } |
| | | } |
| | | } |
| New file |
| | |
| | | package com.ruoyi.web.webSocket; |
| | | |
| | | import com.ruoyi.common.core.controller.BaseController; |
| | | import com.ruoyi.common.core.domain.R; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.web.bind.annotation.*; |
| | | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | |
| | | /** |
| | | * WebSocket控制器 |
| | | * 提供REST接口用于触发服务端主动发送消息 |
| | | * 支持基于AppUser的消息发送 |
| | | */ |
| | | @RestController |
| | | @RequestMapping("/open/websocket") |
| | | @Slf4j |
| | | public class WebSocketController extends BaseController { |
| | | |
| | | |
| | | @Autowired |
| | | private WebSocketUserService webSocketUserService; |
| | | |
| | | // ==================== 基于AppUser的WebSocket接口 ==================== |
| | | |
| | | /** |
| | | * 向指定用户发送消息 |
| | | * @param appUserId 用户ID |
| | | * @param request 请求参数 |
| | | * @return 操作结果 |
| | | */ |
| | | @PostMapping("/user/send/{appUserId}") |
| | | public R<String> sendMessageToUser(@PathVariable String appUserId, @RequestBody Map<String, String> request) { |
| | | try { |
| | | String message = request.get("message"); |
| | | if (message == null || message.trim().isEmpty()) { |
| | | return R.fail("消息内容不能为空"); |
| | | } |
| | | |
| | | int deviceCount = webSocketUserService.sendMessageToUser(appUserId, message); |
| | | if (deviceCount > 0) { |
| | | return R.ok("消息发送成功,发送到 " + deviceCount + " 个设备"); |
| | | } else { |
| | | return R.fail("用户不在线或发送失败"); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("向用户发送消息失败", e); |
| | | return R.fail("发送消息失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 向指定用户发送JSON格式消息 |
| | | * @param appUserId 用户ID |
| | | * @param request 请求参数 |
| | | * @return 操作结果 |
| | | */ |
| | | @PostMapping("/user/send-json/{appUserId}") |
| | | public R<String> sendJsonMessageToUser(@PathVariable String appUserId, @RequestBody Map<String, Object> request) { |
| | | try { |
| | | String messageType = (String) request.get("messageType"); |
| | | Object data = request.get("data"); |
| | | |
| | | if (messageType == null || messageType.trim().isEmpty()) { |
| | | return R.fail("消息类型不能为空"); |
| | | } |
| | | |
| | | int deviceCount = webSocketUserService.sendJsonMessageToUser(appUserId, messageType, data); |
| | | if (deviceCount > 0) { |
| | | return R.ok("JSON消息发送成功,发送到 " + deviceCount + " 个设备"); |
| | | } else { |
| | | return R.fail("用户不在线或发送失败"); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("向用户发送JSON消息失败", e); |
| | | return R.fail("发送消息失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 向多个用户发送消息 |
| | | * @param request 请求参数 |
| | | * @return 操作结果 |
| | | */ |
| | | @PostMapping("/user/send-multiple") |
| | | public R<String> sendMessageToUsers(@RequestBody Map<String, Object> request) { |
| | | try { |
| | | @SuppressWarnings("unchecked") |
| | | List<String> appUserIds = (List<String>) request.get("appUserIds"); |
| | | String message = (String) request.get("message"); |
| | | |
| | | if (appUserIds == null || appUserIds.isEmpty()) { |
| | | return R.fail("用户ID列表不能为空"); |
| | | } |
| | | if (message == null || message.trim().isEmpty()) { |
| | | return R.fail("消息内容不能为空"); |
| | | } |
| | | |
| | | int successUserCount = webSocketUserService.sendMessageToUsers(appUserIds, message); |
| | | return R.ok("消息发送完成,成功发送到 " + successUserCount + " 个用户"); |
| | | } catch (Exception e) { |
| | | log.error("向多个用户发送消息失败", e); |
| | | return R.fail("发送消息失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送任务分配消息 |
| | | * @param request 请求参数 |
| | | * @return 操作结果 |
| | | */ |
| | | @PostMapping("/user/task-assignment") |
| | | public R<String> sendTaskAssignment(@RequestBody Map<String, String> request) { |
| | | try { |
| | | String taskId = request.get("taskId"); |
| | | String taskName = request.get("taskName"); |
| | | String assignee = request.get("assignee"); |
| | | String assigneeUserId = request.get("assigneeUserId"); |
| | | |
| | | if (taskId == null || taskName == null || assignee == null) { |
| | | return R.fail("任务ID、任务名称和分配人不能为空"); |
| | | } |
| | | |
| | | int successCount = webSocketUserService.sendTaskAssignment(taskId, taskName, assignee, assigneeUserId); |
| | | return R.ok("任务分配消息发送完成,成功发送到 " + successCount + " 个设备"); |
| | | } catch (Exception e) { |
| | | log.error("发送任务分配消息失败", e); |
| | | return R.fail("发送任务分配消息失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送状态更新消息 |
| | | * @param request 请求参数 |
| | | * @return 操作结果 |
| | | */ |
| | | @PostMapping("/user/status-update") |
| | | public R<String> sendStatusUpdate(@RequestBody Map<String, String> request) { |
| | | try { |
| | | String status = request.get("status"); |
| | | String details = request.get("details"); |
| | | String targetUserId = request.get("targetUserId"); |
| | | |
| | | if (status == null || status.trim().isEmpty()) { |
| | | return R.fail("状态不能为空"); |
| | | } |
| | | |
| | | int successCount = webSocketUserService.sendStatusUpdate(status, details, targetUserId); |
| | | return R.ok("状态更新消息发送完成,成功发送到 " + successCount + " 个设备"); |
| | | } catch (Exception e) { |
| | | log.error("发送状态更新消息失败", e); |
| | | return R.fail("发送状态更新消息失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送任务状态更新消息 |
| | | * @param request 请求参数 |
| | | * @return 操作结果 |
| | | */ |
| | | @PostMapping("/user/task-status-update") |
| | | public R<String> sendTaskStatusUpdate(@RequestBody Map<String, String> request) { |
| | | try { |
| | | String taskId = request.get("taskId"); |
| | | String taskStatus = request.get("taskStatus"); |
| | | String message = request.get("message"); |
| | | String targetUserId = request.get("targetUserId"); |
| | | |
| | | if (taskId == null || taskStatus == null) { |
| | | return R.fail("任务ID和任务状态不能为空"); |
| | | } |
| | | |
| | | int successCount = webSocketUserService.sendTaskStatusUpdate(taskId, taskStatus, message, targetUserId); |
| | | return R.ok("任务状态更新消息发送完成,成功发送到 " + successCount + " 个设备"); |
| | | } catch (Exception e) { |
| | | log.error("发送任务状态更新消息失败", e); |
| | | return R.fail("发送任务状态更新消息失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送位置更新消息 |
| | | * @param request 请求参数 |
| | | * @return 操作结果 |
| | | */ |
| | | @PostMapping("/user/location-update") |
| | | public R<String> sendLocationUpdate(@RequestBody Map<String, String> request) { |
| | | try { |
| | | String userId = request.get("userId"); |
| | | String longitude = request.get("longitude"); |
| | | String latitude = request.get("latitude"); |
| | | String address = request.get("address"); |
| | | |
| | | if (userId == null || longitude == null || latitude == null) { |
| | | return R.fail("用户ID、经度和纬度不能为空"); |
| | | } |
| | | |
| | | int successCount = webSocketUserService.sendLocationUpdate(userId, longitude, latitude, address); |
| | | return R.ok("位置更新消息发送完成,成功发送到 " + successCount + " 个设备"); |
| | | } catch (Exception e) { |
| | | log.error("发送位置更新消息失败", e); |
| | | return R.fail("发送位置更新消息失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取用户连接信息 |
| | | * @param appUserId 用户ID |
| | | * @return 连接信息 |
| | | */ |
| | | @GetMapping("/user/info/{appUserId}") |
| | | public R getUserConnectionInfo(@PathVariable String appUserId) { |
| | | try { |
| | | Map<String, Object> info = webSocketUserService.getUserConnectionInfo(appUserId); |
| | | return R.ok(info, "获取用户连接信息成功"); |
| | | } catch (Exception e) { |
| | | log.error("获取用户连接信息失败", e); |
| | | return R.fail("获取用户连接信息失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取所有连接统计信息 |
| | | * @return 统计信息 |
| | | */ |
| | | @GetMapping("/user/stats") |
| | | public R getConnectionStats() { |
| | | try { |
| | | Map<String, Object> stats = webSocketUserService.getConnectionStats(); |
| | | return R.ok(stats, "获取连接统计信息成功"); |
| | | } catch (Exception e) { |
| | | log.error("获取连接统计信息失败", e); |
| | | return R.fail("获取连接统计信息失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 检查用户是否在线 |
| | | * @param appUserId 用户ID |
| | | * @return 检查结果 |
| | | */ |
| | | @GetMapping("/user/online/{appUserId}") |
| | | public R checkUserOnline(@PathVariable String appUserId) { |
| | | try { |
| | | boolean isOnline = webSocketUserService.isUserOnline(appUserId); |
| | | return R.ok(isOnline, "用户在线状态检查完成"); |
| | | } catch (Exception e) { |
| | | log.error("检查用户在线状态失败", e); |
| | | return R.fail("检查用户在线状态失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取在线用户列表 |
| | | * @return 在线用户列表 |
| | | */ |
| | | @GetMapping("/user/online-list") |
| | | public R getOnlineUserIds() { |
| | | try { |
| | | Set<String> onlineUserIds = webSocketUserService.getOnlineUserIds(); |
| | | return R.ok(onlineUserIds, "获取在线用户列表成功"); |
| | | } catch (Exception e) { |
| | | log.error("获取在线用户列表失败", e); |
| | | return R.fail("获取在线用户列表失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | } |
| New file |
| | |
| | | package com.ruoyi.web.webSocket; |
| | | |
| | | import io.netty.bootstrap.ServerBootstrap; |
| | | import io.netty.channel.ChannelFuture; |
| | | import io.netty.channel.ChannelInitializer; |
| | | import io.netty.channel.ChannelOption; |
| | | import io.netty.channel.ChannelPipeline; |
| | | import io.netty.channel.nio.NioEventLoopGroup; |
| | | import io.netty.channel.socket.SocketChannel; |
| | | import io.netty.channel.socket.nio.NioServerSocketChannel; |
| | | import io.netty.handler.codec.http.HttpObjectAggregator; |
| | | import io.netty.handler.codec.http.HttpServerCodec; |
| | | import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; |
| | | import io.netty.handler.logging.LogLevel; |
| | | import io.netty.handler.logging.LoggingHandler; |
| | | import io.netty.handler.stream.ChunkedWriteHandler; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | @Component |
| | | @Slf4j |
| | | public class WebSocketServer { |
| | | private final int port; |
| | | |
| | | @Autowired |
| | | private WebSocketTextFrameHandler webSocketTextFrameHandler; |
| | | |
| | | public WebSocketServer() { |
| | | this.port = 8888; // 默认端口 |
| | | } |
| | | |
| | | public WebSocketServer(int port) { |
| | | this.port = port; |
| | | } |
| | | |
| | | public void run() throws InterruptedException { |
| | | NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); |
| | | NioEventLoopGroup workerGroup = new NioEventLoopGroup(); |
| | | try { |
| | | ServerBootstrap serverBootstrap = new ServerBootstrap(); |
| | | serverBootstrap.group(bossGroup, workerGroup) |
| | | .channel(NioServerSocketChannel.class) |
| | | .option(ChannelOption.SO_BACKLOG, 128) |
| | | .childOption(ChannelOption.SO_KEEPALIVE, true) |
| | | .handler(new LoggingHandler(LogLevel.INFO)) // 在BossGroup中增加一个日志处理器 日志级别为INFO |
| | | .childHandler(new ChannelInitializer<SocketChannel>() { |
| | | @Override |
| | | protected void initChannel(SocketChannel ch) throws Exception { |
| | | ChannelPipeline pipeline = ch.pipeline(); |
| | | // 因为是基于Http协议的所以采用Http的编解码器 |
| | | pipeline.addLast(new HttpServerCodec()); |
| | | // 是以块的方式写, 添加ChunkedWriteHandler(分块写入处理程序) |
| | | pipeline.addLast(new ChunkedWriteHandler()); |
| | | /* |
| | | * http 数据在传输过程中是分段的 http Object aggregator 就是可以将多个段聚合 |
| | | * 这就是为什么 当浏览器发送大量数据时, 就会出现多次http请求 |
| | | * 参数为 : 最大内容长度 |
| | | */ |
| | | pipeline.addLast(new HttpObjectAggregator(8192)); |
| | | /* |
| | | * 对应WebSocket 他的数据时以桢(frame) 形式传递 |
| | | * 可以看到WebSocketFrame下面有6个子类 |
| | | * 浏览器请求时: ws://localhost:7000/xxx 请求的url |
| | | * 核心功能是将http协议升级为ws协议 保持长链接 |
| | | */ |
| | | pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); |
| | | // 自定义Handler, 处理业务逻辑 |
| | | // 使用@Sharable注解的Handler,可以被多个连接共享 |
| | | pipeline.addLast(webSocketTextFrameHandler); |
| | | } |
| | | }); |
| | | log.info("WebSocket服务器启动中,端口: {}", port); |
| | | ChannelFuture sync = serverBootstrap.bind(port).sync(); |
| | | sync.channel().closeFuture().sync(); |
| | | } catch (InterruptedException e) { |
| | | log.error("WebSocket服务器启动失败", e); |
| | | } finally { |
| | | bossGroup.shutdownGracefully(); |
| | | workerGroup.shutdownGracefully(); |
| | | } |
| | | |
| | | } |
| | | |
| | | // public static void main(String[] args) throws InterruptedException { |
| | | // new WebSocketServer(7000).run(); |
| | | // } |
| | | } |
| New file |
| | |
| | | package com.ruoyi.web.webSocket; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.boot.CommandLineRunner; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | | * WebSocket服务器配置类 |
| | | * 在Spring Boot应用启动时自动启动WebSocket服务器 |
| | | */ |
| | | @Component |
| | | @Slf4j |
| | | public class WebSocketServerConfig implements CommandLineRunner { |
| | | |
| | | @Autowired |
| | | private WebSocketServer webSocketServer; |
| | | |
| | | @Override |
| | | public void run(String... args) throws Exception { |
| | | // 在新线程中启动WebSocket服务器,避免阻塞主线程 |
| | | new Thread(() -> { |
| | | try { |
| | | log.info("正在启动WebSocket服务器..."); |
| | | webSocketServer.run(); |
| | | } catch (InterruptedException e) { |
| | | log.error("WebSocket服务器启动失败", e); |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | }, "WebSocket-Server-Thread").start(); |
| | | } |
| | | } |
| New file |
| | |
| | | package com.ruoyi.web.webSocket; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import io.netty.channel.ChannelHandler; |
| | | import io.netty.channel.ChannelHandlerContext; |
| | | import io.netty.channel.SimpleChannelInboundHandler; |
| | | import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import java.time.LocalDateTime; |
| | | |
| | | /** |
| | | * TextWebSocketFrame 表示一个文本桢 |
| | | * 使用@Sharable注解使其可以被多个连接共享 |
| | | * 支持基于AppUser的连接管理 |
| | | */ |
| | | @Component |
| | | @Slf4j |
| | | @ChannelHandler.Sharable |
| | | public class WebSocketTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { |
| | | |
| | | |
| | | @Autowired |
| | | private WebSocketUserConnectionManager userConnectionManager; |
| | | |
| | | @Autowired |
| | | private WebSocketAuthHandler authHandler; |
| | | |
| | | @Override |
| | | protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { |
| | | String message = msg.text(); |
| | | log.info("[服务器] : 收到消息 -> {}", message); |
| | | |
| | | try { |
| | | // 尝试解析消息,检查是否包含用户认证信息 |
| | | JSONObject messageObj = JSON.parseObject(message); |
| | | String messageType = messageObj.getString("type"); |
| | | |
| | | if ("auth".equals(messageType)) { |
| | | // 处理用户认证消息 |
| | | handleUserAuth(ctx, messageObj); |
| | | } else if ("ping".equals(messageType)) { |
| | | // 处理心跳消息 |
| | | handlePing(ctx, messageObj); |
| | | } else { |
| | | // 处理普通业务消息 |
| | | handleBusinessMessage(ctx, messageObj); |
| | | } |
| | | } catch (Exception e) { |
| | | // 如果不是JSON格式,按普通文本处理 |
| | | log.debug("消息不是JSON格式,按普通文本处理: {}", message); |
| | | ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间: " + LocalDateTime.now() + "->" + message)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 处理用户认证消息 |
| | | */ |
| | | private void handleUserAuth(ChannelHandlerContext ctx, JSONObject messageObj) { |
| | | try { |
| | | // 提取认证信息 |
| | | WebSocketAuthHandler.UserAuthInfo authInfo = authHandler.extractAuthInfo(messageObj); |
| | | String appUserId = authInfo.getAppUserId(); |
| | | String token = authInfo.getToken(); |
| | | |
| | | if (!StringUtils.hasText(appUserId)) { |
| | | sendAuthFailureResponse(ctx, "用户ID不能为空"); |
| | | return; |
| | | } |
| | | |
| | | // 进行用户认证 |
| | | WebSocketAuthHandler.AuthResult authResult = authHandler.authenticateUser(appUserId, token); |
| | | |
| | | if (authResult.isSuccess()) { |
| | | // 认证成功,将连接与用户关联 |
| | | userConnectionManager.addUserConnection(appUserId, ctx.channel()); |
| | | |
| | | // 发送认证成功响应 |
| | | JSONObject response = new JSONObject(); |
| | | response.put("type", "auth_success"); |
| | | response.put("appUserId", appUserId); |
| | | response.put("nickName", authResult.getAppUser().getNickName()); |
| | | response.put("message", "认证成功"); |
| | | response.put("timestamp", LocalDateTime.now().toString()); |
| | | |
| | | ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString())); |
| | | log.info("用户 {} ({}) 认证成功,ChannelId: {}", |
| | | appUserId, authResult.getAppUser().getNickName(), ctx.channel().id().asLongText()); |
| | | } else { |
| | | // 认证失败 |
| | | sendAuthFailureResponse(ctx, authResult.getMessage()); |
| | | log.warn("用户 {} 认证失败: {}, ChannelId: {}", |
| | | appUserId, authResult.getMessage(), ctx.channel().id().asLongText()); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("处理用户认证时发生错误", e); |
| | | sendAuthFailureResponse(ctx, "认证过程中发生错误: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送认证失败响应 |
| | | */ |
| | | private void sendAuthFailureResponse(ChannelHandlerContext ctx, String message) { |
| | | JSONObject response = new JSONObject(); |
| | | response.put("type", "auth_failed"); |
| | | response.put("message", message); |
| | | response.put("timestamp", LocalDateTime.now().toString()); |
| | | |
| | | ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString())); |
| | | } |
| | | |
| | | /** |
| | | * 处理心跳消息 |
| | | */ |
| | | private void handlePing(ChannelHandlerContext ctx, JSONObject messageObj) { |
| | | String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText()); |
| | | |
| | | JSONObject response = new JSONObject(); |
| | | response.put("type", "pong"); |
| | | response.put("appUserId", appUserId); |
| | | response.put("timestamp", LocalDateTime.now().toString()); |
| | | |
| | | ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString())); |
| | | } |
| | | |
| | | /** |
| | | * 处理业务消息 |
| | | */ |
| | | private void handleBusinessMessage(ChannelHandlerContext ctx, JSONObject messageObj) { |
| | | String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText()); |
| | | |
| | | if (appUserId != null) { |
| | | log.info("收到用户 {} 的业务消息: {}", appUserId, messageObj.toJSONString()); |
| | | |
| | | // 回复消息 |
| | | JSONObject response = new JSONObject(); |
| | | response.put("type", "message_response"); |
| | | response.put("appUserId", appUserId); |
| | | response.put("originalMessage", messageObj); |
| | | response.put("serverTime", LocalDateTime.now().toString()); |
| | | |
| | | ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString())); |
| | | } else { |
| | | log.warn("收到未认证连接的业务消息: {}", messageObj.toJSONString()); |
| | | |
| | | // 要求先认证 |
| | | JSONObject response = new JSONObject(); |
| | | response.put("type", "auth_required"); |
| | | response.put("message", "请先进行用户认证"); |
| | | response.put("timestamp", LocalDateTime.now().toString()); |
| | | |
| | | ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString())); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void handlerAdded(ChannelHandlerContext ctx) throws Exception { |
| | | // id 表示唯一的值 LongText是唯一的 |
| | | log.info("handlerAdded 被调用: {}", ctx.channel().id().asLongText()); |
| | | // shortText 可能会重复 |
| | | log.info("handlerAdded 被调用: {}", ctx.channel().id().asShortText()); |
| | | |
| | | // 注意:用户连接管理在认证时添加,这里不添加 |
| | | } |
| | | |
| | | @Override |
| | | public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { |
| | | // id 表示唯一的值 LongText是唯一的 |
| | | log.info("handlerRemoved 被调用: {}", ctx.channel().id().asLongText()); |
| | | // shortText 可能会重复 |
| | | log.info("handlerRemoved 被调用: {}", ctx.channel().id().asShortText()); |
| | | |
| | | // 从用户连接管理器中移除连接 |
| | | if (userConnectionManager != null) { |
| | | userConnectionManager.removeUserConnection(ctx.channel()); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
| | | log.error("WebSocket连接异常: {}", cause.getMessage(), cause); |
| | | ctx.channel().close(); |
| | | } |
| | | } |
| New file |
| | |
| | | package com.ruoyi.web.webSocket; |
| | | |
| | | import io.netty.channel.Channel; |
| | | import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | |
| | | /** |
| | | * 基于AppUser的WebSocket连接管理器 |
| | | * 支持按用户ID管理连接,一个用户可以有多个设备连接 |
| | | */ |
| | | @Component |
| | | @Slf4j |
| | | public class WebSocketUserConnectionManager { |
| | | |
| | | /** |
| | | * 存储用户的所有连接 |
| | | * Key: 用户ID (appUserId) |
| | | * Value: 该用户的所有连接Channel列表 |
| | | */ |
| | | private static final ConcurrentHashMap<String, Set<Channel>> userConnections = new ConcurrentHashMap<>(); |
| | | |
| | | /** |
| | | * 存储连接与用户的映射关系 |
| | | * Key: ChannelId.asLongText() |
| | | * Value: 用户ID (appUserId) |
| | | */ |
| | | private static final ConcurrentHashMap<String, String> channelToUser = new ConcurrentHashMap<>(); |
| | | |
| | | /** |
| | | * 存储所有连接 |
| | | * Key: ChannelId.asLongText() |
| | | * Value: Channel |
| | | */ |
| | | private static final ConcurrentHashMap<String, Channel> allConnections = new ConcurrentHashMap<>(); |
| | | |
| | | /** |
| | | * 添加用户连接 |
| | | * @param appUserId 用户ID |
| | | * @param channel WebSocket连接通道 |
| | | */ |
| | | public void addUserConnection(String appUserId, Channel channel) { |
| | | String channelId = channel.id().asLongText(); |
| | | |
| | | // 添加到用户连接映射 |
| | | userConnections.computeIfAbsent(appUserId, k -> ConcurrentHashMap.newKeySet()) |
| | | .add(channel); |
| | | |
| | | // 添加到连接用户映射 |
| | | channelToUser.put(channelId, appUserId); |
| | | |
| | | // 添加到所有连接 |
| | | allConnections.put(channelId, channel); |
| | | |
| | | log.info("用户连接已添加,用户ID: {}, ChannelId: {}, 该用户连接数: {}", |
| | | appUserId, channelId, userConnections.get(appUserId).size()); |
| | | } |
| | | |
| | | /** |
| | | * 移除用户连接 |
| | | * @param channel WebSocket连接通道 |
| | | */ |
| | | public void removeUserConnection(Channel channel) { |
| | | String channelId = channel.id().asLongText(); |
| | | String appUserId = channelToUser.get(channelId); |
| | | |
| | | if (appUserId != null) { |
| | | // 从用户连接中移除 |
| | | Set<Channel> userChannels = userConnections.get(appUserId); |
| | | if (userChannels != null) { |
| | | userChannels.remove(channel); |
| | | // 如果用户没有连接了,删除用户映射 |
| | | if (userChannels.isEmpty()) { |
| | | userConnections.remove(appUserId); |
| | | } |
| | | } |
| | | |
| | | // 从连接用户映射中移除 |
| | | channelToUser.remove(channelId); |
| | | |
| | | log.info("用户连接已移除,用户ID: {}, ChannelId: {}, 该用户剩余连接数: {}", |
| | | appUserId, channelId, userChannels != null ? userChannels.size() : 0); |
| | | } |
| | | |
| | | // 从所有连接中移除 |
| | | allConnections.remove(channelId); |
| | | } |
| | | |
| | | /** |
| | | * 向指定用户的所有设备发送消息 |
| | | * @param appUserId 用户ID |
| | | * @param message 消息内容 |
| | | * @return 成功发送的设备数 |
| | | */ |
| | | public int sendMessageToUser(String appUserId, String message) { |
| | | Set<Channel> userChannels = userConnections.get(appUserId); |
| | | if (userChannels == null || userChannels.isEmpty()) { |
| | | log.warn("用户 {} 没有活跃连接", appUserId); |
| | | return 0; |
| | | } |
| | | |
| | | int successCount = 0; |
| | | for (Channel channel : userChannels) { |
| | | if (channel != null && channel.isActive()) { |
| | | try { |
| | | channel.writeAndFlush(new TextWebSocketFrame(message)); |
| | | successCount++; |
| | | } catch (Exception e) { |
| | | log.error("向用户 {} 发送消息失败,连接ID: {}, 错误: {}", |
| | | appUserId, channel.id().asLongText(), e.getMessage()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | log.info("向用户 {} 发送消息完成,成功发送到 {} 个设备,消息内容: {}", |
| | | appUserId, successCount, message); |
| | | return successCount; |
| | | } |
| | | |
| | | /** |
| | | * 向指定连接发送消息 |
| | | * @param channelId 连接ID |
| | | * @param message 消息内容 |
| | | * @return 是否发送成功 |
| | | */ |
| | | public boolean sendMessageToChannel(String channelId, String message) { |
| | | Channel channel = allConnections.get(channelId); |
| | | if (channel != null && channel.isActive()) { |
| | | try { |
| | | channel.writeAndFlush(new TextWebSocketFrame(message)); |
| | | log.info("消息已发送到连接: {}, 内容: {}", channelId, message); |
| | | return true; |
| | | } catch (Exception e) { |
| | | log.error("发送消息失败,连接ID: {}, 错误: {}", channelId, e.getMessage()); |
| | | return false; |
| | | } |
| | | } else { |
| | | log.warn("连接不存在或已关闭,ChannelId: {}", channelId); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 向所有连接广播消息 |
| | | * @param message 消息内容 |
| | | * @return 成功发送的连接数 |
| | | */ |
| | | public int broadcastMessage(String message) { |
| | | int successCount = 0; |
| | | for (Channel channel : allConnections.values()) { |
| | | if (channel != null && channel.isActive()) { |
| | | try { |
| | | channel.writeAndFlush(new TextWebSocketFrame(message)); |
| | | successCount++; |
| | | } catch (Exception e) { |
| | | log.error("广播消息失败,连接ID: {}, 错误: {}", |
| | | channel.id().asLongText(), e.getMessage()); |
| | | } |
| | | } |
| | | } |
| | | log.info("广播消息完成,成功发送到 {} 个连接,消息内容: {}", successCount, message); |
| | | return successCount; |
| | | } |
| | | |
| | | /** |
| | | * 向多个用户发送消息 |
| | | * @param appUserIds 用户ID列表 |
| | | * @param message 消息内容 |
| | | * @return 成功发送的用户数 |
| | | */ |
| | | public int sendMessageToUsers(List<String> appUserIds, String message) { |
| | | int successUserCount = 0; |
| | | for (String appUserId : appUserIds) { |
| | | int deviceCount = sendMessageToUser(appUserId, message); |
| | | if (deviceCount > 0) { |
| | | successUserCount++; |
| | | } |
| | | } |
| | | log.info("向 {} 个用户发送消息完成,成功发送到 {} 个用户", appUserIds.size(), successUserCount); |
| | | return successUserCount; |
| | | } |
| | | |
| | | /** |
| | | * 获取用户的所有连接 |
| | | * @param appUserId 用户ID |
| | | * @return 连接列表 |
| | | */ |
| | | public Set<Channel> getUserConnections(String appUserId) { |
| | | return userConnections.getOrDefault(appUserId, Collections.emptySet()); |
| | | } |
| | | |
| | | /** |
| | | * 获取用户连接数 |
| | | * @param appUserId 用户ID |
| | | * @return 连接数 |
| | | */ |
| | | public int getUserConnectionCount(String appUserId) { |
| | | Set<Channel> userChannels = userConnections.get(appUserId); |
| | | return userChannels != null ? userChannels.size() : 0; |
| | | } |
| | | |
| | | /** |
| | | * 获取总连接数 |
| | | * @return 总连接数 |
| | | */ |
| | | public int getTotalConnectionCount() { |
| | | return allConnections.size(); |
| | | } |
| | | |
| | | /** |
| | | * 获取在线用户数 |
| | | * @return 在线用户数 |
| | | */ |
| | | public int getOnlineUserCount() { |
| | | return userConnections.size(); |
| | | } |
| | | |
| | | /** |
| | | * 获取所有在线用户ID |
| | | * @return 用户ID集合 |
| | | */ |
| | | public Set<String> getOnlineUserIds() { |
| | | return userConnections.keySet(); |
| | | } |
| | | |
| | | /** |
| | | * 检查用户是否在线 |
| | | * @param appUserId 用户ID |
| | | * @return 是否在线 |
| | | */ |
| | | public boolean isUserOnline(String appUserId) { |
| | | Set<Channel> userChannels = userConnections.get(appUserId); |
| | | if (userChannels == null || userChannels.isEmpty()) { |
| | | return false; |
| | | } |
| | | // 检查是否有活跃连接 |
| | | return userChannels.stream().anyMatch(Channel::isActive); |
| | | } |
| | | |
| | | /** |
| | | * 检查连接是否存在 |
| | | * @param channelId 连接ID |
| | | * @return 是否存在 |
| | | */ |
| | | public boolean isConnectionExists(String channelId) { |
| | | Channel channel = allConnections.get(channelId); |
| | | return channel != null && channel.isActive(); |
| | | } |
| | | |
| | | /** |
| | | * 根据连接ID获取用户ID |
| | | * @param channelId 连接ID |
| | | * @return 用户ID |
| | | */ |
| | | public String getUserIdByChannelId(String channelId) { |
| | | return channelToUser.get(channelId); |
| | | } |
| | | |
| | | /** |
| | | * 获取连接统计信息 |
| | | * @return 统计信息 |
| | | */ |
| | | public Map<String, Object> getConnectionStats() { |
| | | Map<String, Object> stats = new HashMap<>(); |
| | | stats.put("totalConnections", getTotalConnectionCount()); |
| | | stats.put("onlineUsers", getOnlineUserCount()); |
| | | stats.put("onlineUserIds", getOnlineUserIds()); |
| | | |
| | | // 统计每个用户的连接数 |
| | | Map<String, Integer> userConnectionCounts = new HashMap<>(); |
| | | for (Map.Entry<String, Set<Channel>> entry : userConnections.entrySet()) { |
| | | userConnectionCounts.put(entry.getKey(), entry.getValue().size()); |
| | | } |
| | | stats.put("userConnectionCounts", userConnectionCounts); |
| | | |
| | | return stats; |
| | | } |
| | | } |
| New file |
| | |
| | | package com.ruoyi.web.webSocket; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import java.util.*; |
| | | |
| | | /** |
| | | * 基于AppUser的WebSocket服务类 |
| | | * 提供按用户发送消息的接口 |
| | | */ |
| | | @Service |
| | | @Slf4j |
| | | public class WebSocketUserService { |
| | | |
| | | @Autowired |
| | | private WebSocketUserConnectionManager userConnectionManager; |
| | | |
| | | /** |
| | | * 向指定用户的所有设备发送消息 |
| | | * @param appUserId 用户ID |
| | | * @param message 消息内容 |
| | | * @return 成功发送的设备数 |
| | | */ |
| | | public int sendMessageToUser(String appUserId, String message) { |
| | | log.info("准备向用户 {} 发送消息: {}", appUserId, message); |
| | | return userConnectionManager.sendMessageToUser(appUserId, message); |
| | | } |
| | | |
| | | /** |
| | | * 向指定用户发送JSON格式消息 |
| | | * @param appUserId 用户ID |
| | | * @param messageType 消息类型 |
| | | * @param data 消息数据 |
| | | * @return 成功发送的设备数 |
| | | */ |
| | | public int sendJsonMessageToUser(String appUserId, String messageType, Object data) { |
| | | String jsonMessage = buildJsonMessage(messageType, data, appUserId); |
| | | return sendMessageToUser(appUserId, jsonMessage); |
| | | } |
| | | |
| | | /** |
| | | * 向多个用户发送消息 |
| | | * @param appUserIds 用户ID列表 |
| | | * @param message 消息内容 |
| | | * @return 成功发送的用户数 |
| | | */ |
| | | public int sendMessageToUsers(List<String> appUserIds, String message) { |
| | | log.info("准备向 {} 个用户发送消息: {}", appUserIds.size(), message); |
| | | return userConnectionManager.sendMessageToUsers(appUserIds, message); |
| | | } |
| | | |
| | | /** |
| | | * 向多个用户发送JSON格式消息 |
| | | * @param appUserIds 用户ID列表 |
| | | * @param messageType 消息类型 |
| | | * @param data 消息数据 |
| | | * @return 成功发送的用户数 |
| | | */ |
| | | public int sendJsonMessageToUsers(List<String> appUserIds, String messageType, Object data) { |
| | | String jsonMessage = buildJsonMessage(messageType, data, null); |
| | | return sendMessageToUsers(appUserIds, jsonMessage); |
| | | } |
| | | |
| | | /** |
| | | * 向指定连接发送消息 |
| | | * @param channelId 连接ID |
| | | * @param message 消息内容 |
| | | * @return 是否发送成功 |
| | | */ |
| | | public boolean sendMessageToChannel(String channelId, String message) { |
| | | log.info("准备向连接 {} 发送消息: {}", channelId, message); |
| | | return userConnectionManager.sendMessageToChannel(channelId, message); |
| | | } |
| | | |
| | | /** |
| | | * 广播消息到所有连接 |
| | | * @param message 消息内容 |
| | | * @return 成功发送的连接数 |
| | | */ |
| | | public int broadcastMessage(String message) { |
| | | log.info("准备广播消息: {}", message); |
| | | return userConnectionManager.broadcastMessage(message); |
| | | } |
| | | |
| | | /** |
| | | * 广播JSON格式消息到所有连接 |
| | | * @param messageType 消息类型 |
| | | * @param data 消息数据 |
| | | * @return 成功发送的连接数 |
| | | */ |
| | | public int broadcastJsonMessage(String messageType, Object data) { |
| | | String jsonMessage = buildJsonMessage(messageType, data, null); |
| | | return broadcastMessage(jsonMessage); |
| | | } |
| | | |
| | | /** |
| | | * 发送系统通知消息 |
| | | * @param message 通知内容 |
| | | * @return 成功发送的连接数 |
| | | */ |
| | | public int sendSystemNotification(String message) { |
| | | Map<String, Object> data = new HashMap<>(); |
| | | data.put("message", message); |
| | | return sendJsonMessageToAllUsers("system_notification", data); |
| | | } |
| | | |
| | | /** |
| | | * 向所有在线用户发送系统通知 |
| | | * @param messageType 消息类型 |
| | | * @param data 消息数据 |
| | | * @return 成功发送的连接数 |
| | | */ |
| | | public int sendJsonMessageToAllUsers(String messageType, Object data) { |
| | | Set<String> onlineUserIds = userConnectionManager.getOnlineUserIds(); |
| | | if (onlineUserIds.isEmpty()) { |
| | | log.warn("没有在线用户,无法发送消息"); |
| | | return 0; |
| | | } |
| | | return sendJsonMessageToUsers(new ArrayList<>(onlineUserIds), messageType, data); |
| | | } |
| | | |
| | | /** |
| | | * 发送任务分配消息 |
| | | * @param taskId 任务ID |
| | | * @param taskName 任务名称 |
| | | * @param assignee 分配人 |
| | | * @param assigneeUserId 分配人用户ID |
| | | * @return 成功发送的连接数 |
| | | */ |
| | | public int sendTaskAssignment(String taskId, String taskName, String assignee, String assigneeUserId) { |
| | | Map<String, Object> taskData = new HashMap<>(); |
| | | taskData.put("taskId", taskId); |
| | | taskData.put("taskName", taskName); |
| | | taskData.put("assignee", assignee); |
| | | taskData.put("assigneeUserId", assigneeUserId); |
| | | taskData.put("timestamp", System.currentTimeMillis()); |
| | | |
| | | if (assigneeUserId != null) { |
| | | // 发送给指定用户 |
| | | return sendJsonMessageToUser(assigneeUserId, "task_assignment", taskData); |
| | | } else { |
| | | // 广播给所有用户 |
| | | return broadcastJsonMessage("task_assignment", taskData); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送状态更新消息 |
| | | * @param status 状态 |
| | | * @param details 详细信息 |
| | | * @param targetUserId 目标用户ID,为null时广播 |
| | | * @return 成功发送的连接数 |
| | | */ |
| | | public int sendStatusUpdate(String status, String details, String targetUserId) { |
| | | Map<String, Object> statusData = new HashMap<>(); |
| | | statusData.put("status", status); |
| | | statusData.put("details", details); |
| | | statusData.put("timestamp", System.currentTimeMillis()); |
| | | |
| | | if (targetUserId != null) { |
| | | // 发送给指定用户 |
| | | return sendJsonMessageToUser(targetUserId, "status_update", statusData); |
| | | } else { |
| | | // 广播给所有用户 |
| | | return broadcastJsonMessage("status_update", statusData); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送任务状态更新消息 |
| | | * @param taskId 任务ID |
| | | * @param taskStatus 任务状态 |
| | | * @param message 消息内容 |
| | | * @param targetUserId 目标用户ID,为null时广播 |
| | | * @return 成功发送的连接数 |
| | | */ |
| | | public int sendTaskStatusUpdate(String taskId, String taskStatus, String message, String targetUserId) { |
| | | Map<String, Object> taskStatusData = new HashMap<>(); |
| | | taskStatusData.put("taskId", taskId); |
| | | taskStatusData.put("taskStatus", taskStatus); |
| | | taskStatusData.put("message", message); |
| | | taskStatusData.put("timestamp", System.currentTimeMillis()); |
| | | |
| | | if (targetUserId != null) { |
| | | // 发送给指定用户 |
| | | return sendJsonMessageToUser(targetUserId, "task_status_update", taskStatusData); |
| | | } else { |
| | | // 广播给所有用户 |
| | | return broadcastJsonMessage("task_status_update", taskStatusData); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送位置更新消息 |
| | | * @param userId 用户ID |
| | | * @param longitude 经度 |
| | | * @param latitude 纬度 |
| | | * @param address 地址 |
| | | * @return 成功发送的连接数 |
| | | */ |
| | | public int sendLocationUpdate(String userId, String longitude, String latitude, String address) { |
| | | Map<String, Object> locationData = new HashMap<>(); |
| | | locationData.put("longitude", longitude); |
| | | locationData.put("latitude", latitude); |
| | | locationData.put("address", address != null ? address : ""); |
| | | locationData.put("timestamp", System.currentTimeMillis()); |
| | | |
| | | return sendJsonMessageToUser(userId, "location_update", locationData); |
| | | } |
| | | |
| | | /** |
| | | * 获取用户连接信息 |
| | | * @param appUserId 用户ID |
| | | * @return 连接信息 |
| | | */ |
| | | public Map<String, Object> getUserConnectionInfo(String appUserId) { |
| | | Map<String, Object> info = new HashMap<>(); |
| | | info.put("appUserId", appUserId); |
| | | info.put("isOnline", userConnectionManager.isUserOnline(appUserId)); |
| | | info.put("connectionCount", userConnectionManager.getUserConnectionCount(appUserId)); |
| | | return info; |
| | | } |
| | | |
| | | /** |
| | | * 获取所有连接统计信息 |
| | | * @return 统计信息 |
| | | */ |
| | | public Map<String, Object> getConnectionStats() { |
| | | return userConnectionManager.getConnectionStats(); |
| | | } |
| | | |
| | | /** |
| | | * 检查用户是否在线 |
| | | * @param appUserId 用户ID |
| | | * @return 是否在线 |
| | | */ |
| | | public boolean isUserOnline(String appUserId) { |
| | | return userConnectionManager.isUserOnline(appUserId); |
| | | } |
| | | |
| | | /** |
| | | * 获取在线用户列表 |
| | | * @return 在线用户ID列表 |
| | | */ |
| | | public Set<String> getOnlineUserIds() { |
| | | return userConnectionManager.getOnlineUserIds(); |
| | | } |
| | | |
| | | /** |
| | | * 获取总连接数 |
| | | * @return 总连接数 |
| | | */ |
| | | public int getTotalConnectionCount() { |
| | | return userConnectionManager.getTotalConnectionCount(); |
| | | } |
| | | |
| | | /** |
| | | * 获取在线用户数 |
| | | * @return 在线用户数 |
| | | */ |
| | | public int getOnlineUserCount() { |
| | | return userConnectionManager.getOnlineUserCount(); |
| | | } |
| | | |
| | | /** |
| | | * 构建JSON格式消息 |
| | | * @param messageType 消息类型 |
| | | * @param data 消息数据 |
| | | * @param targetUserId 目标用户ID |
| | | * @return JSON字符串 |
| | | */ |
| | | private String buildJsonMessage(String messageType, Object data, String targetUserId) { |
| | | Map<String, Object> message = new HashMap<>(); |
| | | message.put("type", messageType); |
| | | message.put("data", data); |
| | | message.put("timestamp", System.currentTimeMillis()); |
| | | |
| | | if (targetUserId != null) { |
| | | message.put("targetUserId", targetUserId); |
| | | } |
| | | |
| | | return com.alibaba.fastjson.JSON.toJSONString(message); |
| | | } |
| | | } |
| | |
| | | <version>2.0.10</version> |
| | | </dependency> |
| | | |
| | | <dependency> |
| | | <groupId>io.netty</groupId> |
| | | <artifactId>netty-all</artifactId> |
| | | </dependency> |
| | | <dependency> |
| | | <groupId>junit</groupId> |
| | | <artifactId>junit</artifactId> |
| | | <scope>test</scope> |
| | | </dependency> |
| | | <dependency> |
| | | <groupId>org.springframework.boot</groupId> |
| | | <artifactId>spring-boot-test</artifactId> |
| | | <scope>test</scope> |
| | | </dependency> |
| | | |
| | | </dependencies> |
| | | |
| | | <build> |
| | |
| | | import org.springframework.web.socket.config.annotation.EnableWebSocket; |
| | | |
| | | import java.net.InetAddress; |
| | | import java.net.UnknownHostException; |
| | | |
| | | /** |
| | | * 启动程序 |
| | |
| | | |
| | | ConfigurableApplicationContext application = SpringApplication.run(RuoYiAppletApplication.class, args); |
| | | try { |
| | | |
| | | Environment env = application.getEnvironment(); |
| | | log.info("\n----------------------------------------------------------\n\t" + |
| | | Environment env = application.getEnvironment(); |
| | | log.info("\n----------------------------------------------------------\n\t" + |
| | | "应用 '{}' 运行成功! 访问连接:\n\t" + |
| | | "Swagger文档: \t\thttp://{}:{}/doc.html\n" + |
| | | "----------------------------------------------------------", |
| | | env.getProperty("spring.application.name", "后台"), |
| | | env.getProperty("spring.application.name", "app"), |
| | | InetAddress.getLocalHost().getHostAddress(), |
| | | env.getProperty("server.port", "8081")); |
| | | env.getProperty("server.port", "9099")); |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | import com.ruoyi.system.dto.MissionIdDto; |
| | | import com.ruoyi.system.dto.TMissionUserDto; |
| | | import com.ruoyi.system.model.*; |
| | | import com.ruoyi.system.query.TEquipmentQuery; |
| | | import com.ruoyi.system.service.*; |
| | | import com.ruoyi.system.vo.MissionTotalVo; |
| | | import com.ruoyi.system.vo.TotalHistoryVo; |
| | |
| | | import org.springframework.web.bind.annotation.RestController; |
| | | |
| | | import java.time.Duration; |
| | | import java.time.LocalDate; |
| | | import java.time.LocalDateTime; |
| | | import java.time.Month; |
| | | import java.util.List; |
| | | |
| | | /** |
| | |
| | | @ApiOperation(value = "发送任务", response = TEquipment.class) |
| | | @PostMapping(value = "/open/t-mission/sendMission") |
| | | public R<?> sendMission() { |
| | | WebSocketServerSolo.sendInfo("{\"missionId\":1}", "1"); |
| | | // WebSocketServerSolo.sendInfo("{\"missionId\":1}", "1"); |
| | | return R.ok(); |
| | | } |
| | | |
| New file |
| | |
| | | package com.ruoyi.web.controller.webSocket; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | |
| | | import java.net.URI; |
| | | import java.util.concurrent.CountDownLatch; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | | * 简单的WebSocket客户端实现 |
| | | * 用于测试WebSocket服务端功能 |
| | | */ |
| | | @Slf4j |
| | | public class SimpleWebSocketClient implements WebSocketClientTest.WebSocketClient { |
| | | |
| | | private final String serverUrl; |
| | | private final WebSocketClientTest.WebSocketClient.MessageHandler messageHandler; |
| | | private WebSocketConnection connection; |
| | | private volatile boolean connected = false; |
| | | |
| | | public SimpleWebSocketClient(String serverUrl, WebSocketClientTest.WebSocketClient.MessageHandler messageHandler) { |
| | | this.serverUrl = serverUrl; |
| | | this.messageHandler = messageHandler; |
| | | } |
| | | |
| | | @Override |
| | | public void connect() throws Exception { |
| | | log.info("正在连接到WebSocket服务器: {}", serverUrl); |
| | | |
| | | // 这里使用模拟的WebSocket连接 |
| | | // 在实际项目中,您可以使用Java-WebSocket库或其他WebSocket客户端库 |
| | | connection = new WebSocketConnection(serverUrl, messageHandler); |
| | | connection.connect(); |
| | | |
| | | // 等待连接建立 |
| | | int retryCount = 0; |
| | | while (!connected && retryCount < 10) { |
| | | Thread.sleep(500); |
| | | retryCount++; |
| | | } |
| | | |
| | | if (!connected) { |
| | | throw new RuntimeException("连接WebSocket服务器超时"); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void send(String message) { |
| | | if (connection != null && connected) { |
| | | connection.send(message); |
| | | } else { |
| | | log.warn("WebSocket未连接,无法发送消息: {}", message); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void close() { |
| | | if (connection != null) { |
| | | connection.close(); |
| | | connected = false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 模拟的WebSocket连接类 |
| | | */ |
| | | private static class WebSocketConnection { |
| | | private final String serverUrl; |
| | | private final WebSocketClientTest.WebSocketClient.MessageHandler messageHandler; |
| | | private volatile boolean isConnected = false; |
| | | |
| | | public WebSocketConnection(String serverUrl, WebSocketClientTest.WebSocketClient.MessageHandler messageHandler) { |
| | | this.serverUrl = serverUrl; |
| | | this.messageHandler = messageHandler; |
| | | } |
| | | |
| | | public void connect() { |
| | | // 模拟连接过程 |
| | | new Thread(() -> { |
| | | try { |
| | | Thread.sleep(1000); // 模拟连接延迟 |
| | | isConnected = true; |
| | | log.info("WebSocket连接已建立"); |
| | | messageHandler.onOpen(); |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | messageHandler.onError(e); |
| | | } |
| | | }).start(); |
| | | } |
| | | |
| | | public void send(String message) { |
| | | if (isConnected) { |
| | | log.info("发送消息: {}", message); |
| | | // 模拟服务器响应 |
| | | simulateServerResponse(message); |
| | | } |
| | | } |
| | | |
| | | public void close() { |
| | | isConnected = false; |
| | | log.info("WebSocket连接已关闭"); |
| | | messageHandler.onClose(1000, "正常关闭"); |
| | | } |
| | | |
| | | /** |
| | | * 模拟服务器响应 |
| | | */ |
| | | private void simulateServerResponse(String message) { |
| | | new Thread(() -> { |
| | | try { |
| | | Thread.sleep(100); // 模拟网络延迟 |
| | | |
| | | // 解析消息类型并模拟响应 |
| | | if (message.contains("\"type\":\"auth\"")) { |
| | | if (message.contains("\"token\"")) { |
| | | // 有token的认证响应 |
| | | String response = "{\"type\":\"auth_success\",\"appUserId\":\"test-user-001\",\"nickName\":\"测试用户\",\"message\":\"认证成功\",\"timestamp\":\"" + |
| | | java.time.LocalDateTime.now() + "\"}"; |
| | | messageHandler.onMessage(response); |
| | | } else { |
| | | // 无token的认证响应 |
| | | String response = "{\"type\":\"auth_success\",\"appUserId\":\"test-user-001\",\"nickName\":\"测试用户\",\"message\":\"认证成功(无Token)\",\"timestamp\":\"" + |
| | | java.time.LocalDateTime.now() + "\"}"; |
| | | messageHandler.onMessage(response); |
| | | } |
| | | } else if (message.contains("\"type\":\"ping\"")) { |
| | | // 心跳响应 |
| | | String response = "{\"type\":\"pong\",\"appUserId\":\"test-user-001\",\"timestamp\":\"" + |
| | | java.time.LocalDateTime.now() + "\"}"; |
| | | messageHandler.onMessage(response); |
| | | } else if (message.contains("\"type\":\"business\"")) { |
| | | // 业务消息响应 |
| | | String response = "{\"type\":\"message_response\",\"appUserId\":\"test-user-001\",\"originalMessage\":" + |
| | | message + ",\"serverTime\":\"" + java.time.LocalDateTime.now() + "\"}"; |
| | | messageHandler.onMessage(response); |
| | | } |
| | | } catch (Exception e) { |
| | | messageHandler.onError(e); |
| | | } |
| | | }).start(); |
| | | } |
| | | } |
| | | } |
| New file |
| | |
| | | package com.ruoyi.web.controller.webSocket; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.junit.Test; |
| | | import org.springframework.boot.test.context.SpringBootTest; |
| | | |
| | | import java.util.concurrent.CountDownLatch; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | | * WebSocket客户端测试类 |
| | | * 演示如何连接WebSocket服务端并进行认证 |
| | | */ |
| | | @SpringBootTest |
| | | @Slf4j |
| | | public class WebSocketClientTest { |
| | | |
| | | private static final String WEBSOCKET_URL = "ws://127.0.0.1:8888/hello"; |
| | | private static final String TEST_USER_ID = "1964941649976532994"; |
| | | private static final String TEST_TOKEN = "eyJhbGciOiJIUzUxMiJ9.eyJsb2dpbl91c2VyX2FwcGxldF9rZXkiOiJhMzkyMTdmZS1hZmRiLTQ1YTAtYjkwMy01MGViNjc4ZGJlNzAifQ.SaLnyIsAQ9lnSivaXPaO71gReSmx-R_tjrS3auKJqbXKe2SiLr7MZiNetdjmwvE7HSzjxs1yTqVW9mXGcAuWHw"; |
| | | |
| | | /** |
| | | * 测试WebSocket连接和认证 |
| | | */ |
| | | @Test |
| | | public void testWebSocketConnection() throws Exception { |
| | | CountDownLatch latch = new CountDownLatch(1); |
| | | |
| | | // 创建WebSocket客户端 |
| | | SimpleWebSocketClient client = new SimpleWebSocketClient(WEBSOCKET_URL, new WebSocketClient.MessageHandler() { |
| | | @Override |
| | | public void onOpen() { |
| | | log.info("WebSocket连接已建立"); |
| | | // 发送认证消息 |
| | | sendAuthMessage(); |
| | | } |
| | | |
| | | @Override |
| | | public void onMessage(String message) { |
| | | log.info("收到服务器消息: {}", message); |
| | | try { |
| | | JSONObject messageObj = JSON.parseObject(message); |
| | | String messageType = messageObj.getString("type"); |
| | | |
| | | switch (messageType) { |
| | | case "auth_success": |
| | | log.info("认证成功: {}", messageObj.getString("message")); |
| | | // 认证成功后发送业务消息 |
| | | sendBusinessMessage(); |
| | | break; |
| | | case "auth_failed": |
| | | log.error("认证失败: {}", messageObj.getString("message")); |
| | | break; |
| | | case "pong": |
| | | log.info("收到心跳响应"); |
| | | break; |
| | | case "message_response": |
| | | log.info("收到业务消息响应: {}", messageObj.getString("originalMessage")); |
| | | break; |
| | | default: |
| | | log.info("收到其他消息: {}", message); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("解析消息失败: {}", e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onClose(int code, String reason) { |
| | | log.info("WebSocket连接已关闭: code={}, reason={}", code, reason); |
| | | latch.countDown(); |
| | | } |
| | | |
| | | @Override |
| | | public void onError(Exception e) { |
| | | log.error("WebSocket连接错误: {}", e.getMessage(), e); |
| | | latch.countDown(); |
| | | } |
| | | }); |
| | | |
| | | // 连接WebSocket |
| | | client.connect(); |
| | | |
| | | // 等待连接关闭 |
| | | latch.await(30, TimeUnit.SECONDS); |
| | | |
| | | // 关闭连接 |
| | | client.close(); |
| | | } |
| | | |
| | | /** |
| | | * 测试无Token认证 |
| | | */ |
| | | @Test |
| | | public void testWebSocketConnectionWithoutToken() throws Exception { |
| | | CountDownLatch latch = new CountDownLatch(1); |
| | | |
| | | SimpleWebSocketClient client = new SimpleWebSocketClient(WEBSOCKET_URL, new WebSocketClient.MessageHandler() { |
| | | @Override |
| | | public void onOpen() { |
| | | log.info("WebSocket连接已建立(无Token)"); |
| | | sendAuthMessageWithoutToken(); |
| | | } |
| | | |
| | | @Override |
| | | public void onMessage(String message) { |
| | | log.info("收到服务器消息: {}", message); |
| | | handleMessage(message); |
| | | } |
| | | |
| | | @Override |
| | | public void onClose(int code, String reason) { |
| | | log.info("WebSocket连接已关闭: code={}, reason={}", code, reason); |
| | | latch.countDown(); |
| | | } |
| | | |
| | | @Override |
| | | public void onError(Exception e) { |
| | | log.error("WebSocket连接错误: {}", e.getMessage(), e); |
| | | latch.countDown(); |
| | | } |
| | | }); |
| | | |
| | | client.connect(); |
| | | latch.await(10, TimeUnit.SECONDS); |
| | | client.close(); |
| | | } |
| | | |
| | | /** |
| | | * 测试心跳功能 |
| | | */ |
| | | @Test |
| | | public void testWebSocketHeartbeat() throws Exception { |
| | | CountDownLatch latch = new CountDownLatch(1); |
| | | |
| | | SimpleWebSocketClient client = new SimpleWebSocketClient(WEBSOCKET_URL, new WebSocketClient.MessageHandler() { |
| | | @Override |
| | | public void onOpen() { |
| | | log.info("WebSocket连接已建立"); |
| | | sendAuthMessage(); |
| | | } |
| | | |
| | | @Override |
| | | public void onMessage(String message) { |
| | | log.info("收到服务器消息: {}", message); |
| | | |
| | | try { |
| | | JSONObject messageObj = JSON.parseObject(message); |
| | | String messageType = messageObj.getString("type"); |
| | | |
| | | if ("auth_success".equals(messageType)) { |
| | | log.info("认证成功,开始发送心跳"); |
| | | // 认证成功后开始发送心跳 |
| | | startHeartbeat(); |
| | | } else if ("pong".equals(messageType)) { |
| | | log.info("收到心跳响应"); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("解析消息失败: {}", e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onClose(int code, String reason) { |
| | | log.info("WebSocket连接已关闭: code={}, reason={}", code, reason); |
| | | latch.countDown(); |
| | | } |
| | | |
| | | @Override |
| | | public void onError(Exception e) { |
| | | log.error("WebSocket连接错误: {}", e.getMessage(), e); |
| | | latch.countDown(); |
| | | } |
| | | }); |
| | | |
| | | client.connect(); |
| | | latch.await(30, TimeUnit.SECONDS); |
| | | client.close(); |
| | | } |
| | | |
| | | /** |
| | | * 发送认证消息 |
| | | */ |
| | | private void sendAuthMessage() { |
| | | JSONObject authMessage = new JSONObject(); |
| | | authMessage.put("type", "auth"); |
| | | authMessage.put("appUserId", TEST_USER_ID); |
| | | authMessage.put("token", TEST_TOKEN); |
| | | |
| | | |
| | | String message = authMessage.toJSONString(); |
| | | log.info("发送认证消息: {}", message); |
| | | // 这里需要实际的WebSocket客户端实现 |
| | | } |
| | | |
| | | public static void main(String[] args) { |
| | | // 发送认证消息: {"deviceType":"test","type":"auth","appUserId":"1964941649976532994","deviceId":"test-device-1761294213830","token":"eyJhbGciOiJIUzUxMiJ9.eyJsb2dpbl91c2VyX2FwcGxldF9rZXkiOiJhMzkyMTdmZS1hZmRiLTQ1YTAtYjkwMy01MGViNjc4ZGJlNzAifQ.SaLnyIsAQ9lnSivaXPaO71gReSmx-R_tjrS3auKJqbXKe2SiLr7MZiNetdjmwvE7HSzjxs1yTqVW9mXGcAuWHw"} |
| | | // 发送认证消息: {"type":"auth","appUserId":"1964941649976532994","token":"eyJhbGciOiJIUzUxMiJ9.eyJsb2dpbl91c2VyX2FwcGxldF9rZXkiOiJhMzkyMTdmZS1hZmRiLTQ1YTAtYjkwMy01MGViNjc4ZGJlNzAifQ.SaLnyIsAQ9lnSivaXPaO71gReSmx-R_tjrS3auKJqbXKe2SiLr7MZiNetdjmwvE7HSzjxs1yTqVW9mXGcAuWHw"} |
| | | |
| | | // 发送心跳消息: {"type":"ping","timestamp":1761294280878} |
| | | |
| | | // 发送业务消息: {"type":"business","content":"这是一条测试业务消息","timestamp":1761294280878} |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 发送无Token认证消息 |
| | | */ |
| | | private void sendAuthMessageWithoutToken() { |
| | | JSONObject authMessage = new JSONObject(); |
| | | authMessage.put("type", "auth"); |
| | | authMessage.put("appUserId", TEST_USER_ID); |
| | | |
| | | String message = authMessage.toJSONString(); |
| | | log.info("发送无Token认证消息: {}", message); |
| | | } |
| | | |
| | | /** |
| | | * 发送业务消息 |
| | | */ |
| | | private void sendBusinessMessage() { |
| | | JSONObject businessMessage = new JSONObject(); |
| | | businessMessage.put("type", "business"); |
| | | businessMessage.put("content", "这是一条测试业务消息"); |
| | | businessMessage.put("timestamp", System.currentTimeMillis()); |
| | | |
| | | String message = businessMessage.toJSONString(); |
| | | log.info("发送业务消息: {}", message); |
| | | } |
| | | |
| | | /** |
| | | * 发送心跳消息 |
| | | */ |
| | | private void sendPingMessage() { |
| | | JSONObject pingMessage = new JSONObject(); |
| | | pingMessage.put("type", "ping"); |
| | | pingMessage.put("timestamp", System.currentTimeMillis()); |
| | | |
| | | String message = pingMessage.toJSONString(); |
| | | log.info("发送心跳消息: {}", message); |
| | | } |
| | | |
| | | /** |
| | | * 开始心跳 |
| | | */ |
| | | private void startHeartbeat() { |
| | | // 每5秒发送一次心跳 |
| | | new Thread(() -> { |
| | | try { |
| | | for (int i = 0; i < 6; i++) { |
| | | Thread.sleep(5000); |
| | | sendPingMessage(); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | }).start(); |
| | | } |
| | | |
| | | /** |
| | | * 处理服务器消息 |
| | | */ |
| | | private void handleMessage(String message) { |
| | | try { |
| | | JSONObject messageObj = JSON.parseObject(message); |
| | | String messageType = messageObj.getString("type"); |
| | | |
| | | switch (messageType) { |
| | | case "auth_success": |
| | | log.info("认证成功: {}", messageObj.getString("message")); |
| | | break; |
| | | case "auth_failed": |
| | | log.error("认证失败: {}", messageObj.getString("message")); |
| | | break; |
| | | case "pong": |
| | | log.info("收到心跳响应"); |
| | | break; |
| | | case "message_response": |
| | | log.info("收到业务消息响应"); |
| | | break; |
| | | case "auth_required": |
| | | log.warn("需要重新认证: {}", messageObj.getString("message")); |
| | | break; |
| | | default: |
| | | log.info("收到其他消息: {}", message); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("解析消息失败: {}", e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * WebSocket客户端接口 |
| | | */ |
| | | public interface WebSocketClient { |
| | | |
| | | void connect() throws Exception; |
| | | |
| | | void send(String message); |
| | | |
| | | void close(); |
| | | |
| | | interface MessageHandler { |
| | | void onOpen(); |
| | | void onMessage(String message); |
| | | void onClose(int code, String reason); |
| | | void onError(Exception e); |
| | | } |
| | | } |
| | | } |