Pu Zhibing
2025-04-10 0f9ecda1919beb1e8fbbca0f669ac86badf70806
优化推流和拉流功能及关闭逻辑
6个文件已修改
262 ■■■■ 已修改文件
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/controller/CarController.java 72 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/controller/OrderController.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/ICarService.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/CarServiceImpl.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/util/JavaCVStreamUtil.java 103 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/util/TaskUtil.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/controller/CarController.java
@@ -1,5 +1,6 @@
package com.ruoyi.system.controller;
import cn.hutool.core.io.FileUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.utils.StringUtils;
@@ -24,6 +25,7 @@
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@@ -31,6 +33,8 @@
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
@@ -48,7 +52,14 @@
@RequestMapping("/car")
public class CarController {
    
    private String outputUrl = "F:\\nginx 1.7.11.3 Gryphon\\html\\";
    @Value("${live.output.path}")
    private String outputPath;
    @Value("${live.ip}")
    private String liveIp;
    @Value("${live.port}")
    private Integer livePort;
    @Resource
    private ICarService carService;
    @Resource
@@ -106,12 +117,25 @@
        if (null == car) {
            return R.fail("失败");
        }
        //手动加一次状态数据,避免定时任务结束任务线程
        redisTemplate.opsForValue().set("live:" + id, true, 1, TimeUnit.MINUTES);
        Enterprise enterprise = enterpriseService.getById(car.getEnterpriseId());
        R<UPRealvideoMsgStartupAckVo> msgStartupAckVoR = realVideoMsgClient.startupRealVideo(Integer.valueOf(enterprise.getCode()), car.getVehicleNumber());
        if (200 == msgStartupAckVoR.getCode()) {
            String path = outputPath + "hls\\" + car.getVehicleNumber() + "\\live.m3u8";
            String folderPath = outputPath + "hls\\" + car.getVehicleNumber();
            FileUtil.mkParentDirs(path);
            File file = new File(path);
            if (!file.exists()) {
                try {
                    file.createNewFile();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            UPRealvideoMsgStartupAckVo data = msgStartupAckVoR.getData();
            RealVideoResp resp = new RealVideoResp();
            String url = "rtmp://192.168.110.85:1935/flv-live/" + car.getVehicleNumber();
            //执行拉流和推流
            ExecutorService executorService = new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
@@ -119,10 +143,11 @@
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    JavaCVStreamUtil.push_flv(data.getUrl(), url, id);
                    JavaCVStreamUtil.push_hls(data.getUrl(), path, id, folderPath);
                }
            });
            resp.setUrl(url);
            resp.setServerIp(liveIp);
            resp.setServerPort(livePort);
            return R.ok(resp);
        }
        
@@ -135,13 +160,26 @@
    @ApiImplicitParams({
            @ApiImplicitParam(value = "车辆id", name = "id", required = true)
    })
    public R closeRealVideo(@PathVariable("id") Integer id){
        JavaCVStreamUtil.close(id);
    public R closeRealVideo(@PathVariable("id") Integer id) {
        Car car = carService.getById(id);
        if (null == car) {
            return R.fail("失败");
        }
        String folderPath = outputPath + "hls\\" + car.getVehicleNumber();
        JavaCVStreamUtil.close(id, folderPath);
        return R.ok();
    }
    
    
    @GetMapping("/playDetection/{id}")
    @ApiOperation(value = "播放检测", tags = {"车辆管理"})
    @ApiImplicitParams({
            @ApiImplicitParam(value = "车辆id", name = "id", required = true)
    })
    public R playDetection(@PathVariable("id") Integer id) {
        redisTemplate.opsForValue().set("live:" + id, true, 1, TimeUnit.MINUTES);
        return R.ok();
    }
    
    
    @GetMapping("/getPlaybackVideo")
@@ -151,13 +189,26 @@
        if (null == car) {
            return R.fail("失败");
        }
        //手动加一次状态数据,避免定时任务结束任务线程
        redisTemplate.opsForValue().set("live:" + req.getId(), true, 1, TimeUnit.MINUTES);
        Enterprise enterprise = enterpriseService.getById(car.getEnterpriseId());
        R<UPPlaybackMsgStartupAckVo> startupAckVoR = playbackMsgClient.playbackMsgStartup(Integer.valueOf(enterprise.getCode()), car.getVehicleNumber(),
                req.getStartTime(), req.getEndTime());
        if (200 == startupAckVoR.getCode()) {
            UPPlaybackMsgStartupAckVo data = startupAckVoR.getData();
            RealVideoResp resp = new RealVideoResp();
            String url = "rtmp://192.168.110.85:1935/flv-live/" + car.getVehicleNumber();
            String path = outputPath + "hls\\" + car.getVehicleNumber() + "\\live.m3u8";
            String folderPath = outputPath + "hls\\" + car.getVehicleNumber();
            FileUtil.mkParentDirs(path);
            File file = new File(path);
            if (!file.exists()) {
                try {
                    file.createNewFile();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            //执行拉流和推流
            ExecutorService executorService = new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
@@ -165,10 +216,11 @@
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    JavaCVStreamUtil.push_flv(data.getUrl(), url, req.getId());
                    JavaCVStreamUtil.push_hls(data.getUrl(), path, req.getId(), folderPath);
                }
            });
            resp.setUrl(url);
            resp.setServerIp(liveIp);
            resp.setServerPort(livePort);
            return R.ok(resp);
        }
        return R.fail(startupAckVoR.getMsg());
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/controller/OrderController.java
@@ -2,6 +2,7 @@
import cn.afterturn.easypoi.excel.ExcelExportUtil;
import cn.afterturn.easypoi.excel.entity.ExportParams;
import cn.hutool.core.io.FileUtil;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.web.page.PageInfo;
import com.ruoyi.dataInterchange.api.feignClient.PlaybackMsgClient;
@@ -18,10 +19,13 @@
import com.ruoyi.system.service.IDriverService;
import com.ruoyi.system.service.IEnterpriseService;
import com.ruoyi.system.service.IOrderService;
import com.ruoyi.system.util.JavaCVStreamUtil;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.poi.ss.usermodel.Workbook;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -30,12 +34,17 @@
import javax.annotation.Resource;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.net.URLEncoder;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * @author zhibing.pu
@@ -62,6 +71,18 @@
    
    @Resource
    private PlaybackMsgClient playbackMsgClient;
    @Resource
    private RedisTemplate redisTemplate;
    @Value("${live.output.path}")
    private String outputPath;
    @Value("${live.ip}")
    private String liveIp;
    @Value("${live.port}")
    private Integer livePort;
    
    
    @GetMapping("/getOrderList")
@@ -155,6 +176,8 @@
            return R.fail("失败");
        }
        Car car = carService.getById(order.getCarId());
        //手动加一次状态数据,避免定时任务结束任务线程
        redisTemplate.opsForValue().set("live:" + order.getCarId(), true, 1, TimeUnit.MINUTES);
        Enterprise enterprise = enterpriseService.getById(car.getEnterpriseId());
        LocalDateTime dateTime = LocalDateTime.parse(order.getOrderTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        long startTime = dateTime.toEpochSecond(ZoneOffset.ofHours(8));
@@ -164,8 +187,30 @@
        if (200 == startupAckVoR.getCode()) {
            UPPlaybackMsgStartupAckVo data = startupAckVoR.getData();
            RealVideoResp resp = new RealVideoResp();
            resp.setServerIp(data.getServerIP());
            resp.setServerPort(data.getServerPort());
            String path = outputPath + "hls\\" + car.getVehicleNumber() + "\\live.m3u8";
            String folderPath = outputPath + "hls\\" + car.getVehicleNumber();
            FileUtil.mkParentDirs(path);
            File file = new File(path);
            if (!file.exists()) {
                try {
                    file.createNewFile();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            //执行拉流和推流
            ExecutorService executorService = new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    JavaCVStreamUtil.push_hls(data.getUrl(), path, id, folderPath);
                }
            });
            resp.setServerIp(liveIp);
            resp.setServerPort(livePort);
            return R.ok(resp);
        }
        return R.fail(startupAckVoR.getMsg());
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/ICarService.java
@@ -31,4 +31,11 @@
     * 定时任务修改车辆状态
     */
    void taskUpdateCarStatus();
    /**
     * Checks video playback and clears the video streams that are not being played.
     */
    void taskPlayDetection();
}
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/CarServiceImpl.java
@@ -19,6 +19,8 @@
import com.ruoyi.system.service.ICarTypeService;
import com.ruoyi.system.service.IDriverService;
import com.ruoyi.system.service.IEnterpriseService;
import com.ruoyi.system.util.JavaCVStreamUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@@ -36,6 +38,9 @@
 */
@Service
public class CarServiceImpl extends ServiceImpl<CarMapper, Car> implements ICarService {
    @Value("${live.output.path}")
    private String outputPath;
    
    @Resource
    private UPExgMsgRegisterClient upExgMsgRegisterClient;
@@ -151,4 +156,21 @@
            }
        }
    }
    /**
     * 检测视频播放,清除没有播放的视频流
     */
    @Override
    public void taskPlayDetection() {
        List<Car> list = this.list();
        for (Car car : list) {
            if(!redisTemplate.hasKey("live:" + car.getId())){
                String folderPath = outputPath + "hls\\" + car.getVehicleNumber();
                JavaCVStreamUtil.close(car.getId(), folderPath);
            }
        }
    }
}
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/util/JavaCVStreamUtil.java
@@ -1,5 +1,7 @@
package com.ruoyi.system.util;
import cn.hutool.core.io.FileUtil;
import com.ruoyi.common.core.utils.StringUtils;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.*;
@@ -17,6 +19,8 @@
    
    private static Map<Integer, FFmpegFrameGrabber> grabberMap = new ConcurrentHashMap<>();
    
    private static Map<Integer, OpenCVFrameGrabber> grabberMap1 = new ConcurrentHashMap<>();
    private static Map<Integer, FFmpegFrameRecorder> frameRecorderMap = new ConcurrentHashMap<>();
    
    private static Map<Integer, Boolean> statusMap = new ConcurrentHashMap<>();
@@ -27,14 +31,18 @@
     * 视频拉流和推流
     */
    public static void push_flv(String inputUrl, String outputUrl, Integer deviceNumber){
//        OpenCVFrameGrabber grabber = null;
        FFmpegFrameGrabber grabber = null;
        OpenCVFrameGrabber grabber = null;
//        FFmpegFrameGrabber grabber = null;
        FFmpegFrameRecorder recorder = null;
        try {
            //关闭上一个没有正确关闭的流
            FFmpegFrameGrabber fFmpegFrameGrabber = grabberMap.get(deviceNumber);
            if(null != fFmpegFrameGrabber){
                fFmpegFrameGrabber.close();
            }
            OpenCVFrameGrabber openCVFrameGrabber = grabberMap1.get(deviceNumber);
            if(null != openCVFrameGrabber){
                openCVFrameGrabber.close();;
            }
            FFmpegFrameRecorder fFmpegFrameRecorder = frameRecorderMap.get(deviceNumber);
            if(null != fFmpegFrameRecorder){
@@ -46,15 +54,16 @@
            FFmpegLogCallback.set();
            
            //视频抓帧
            grabber = new FFmpegFrameGrabber(inputUrl);
            grabber.setOption("rtsp_transport", "tcp");
            // 正确设置超时时间
            grabber.setOption("timeout", "120000");
            grabber.start();
            grabberMap.put(deviceNumber, grabber);
//            grabber = new OpenCVFrameGrabber(0);
//            grabber = new FFmpegFrameGrabber(inputUrl);
//            grabber.setOption("rtsp_transport", "tcp");
//            // 正确设置超时时间
//            grabber.setOption("timeout", "120000");
//            grabber.start();
//            grabberMap.put(deviceNumber, grabber);
            grabber = new OpenCVFrameGrabber(0);
            grabber.start();
            grabberMap1.put(deviceNumber, grabber);
            
            //录制视频,推送到流媒体服务器(nginx)
            recorder = new FFmpegFrameRecorder(outputUrl, grabber.getImageWidth(), grabber.getImageHeight());
@@ -69,33 +78,31 @@
            recorder.setAudioOption("crf", "23");
            
            //设置音频编码为AAC
//            if (grabber.getAudioChannels() > 0) {
            if (grabber.getAudioChannels() > 0) {
                recorder.setAudioChannels(grabber.getAudioChannels());
                recorder.setAudioBitrate(grabber.getAudioBitrate());
                recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC);
//            }
            }
            recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
            //将解码后的帧记录到输出文件中
            //recorder.start通常用于处理已经解码成图像的视频数据
            recorder.start();
            frameRecorderMap.put(deviceNumber, recorder);
            //设置状态为开始
            statusMap.put(deviceNumber, true);
            Frame frame;
            while ((frame = grabber.grab()) != null) {
                recorder.record(frame);
                //判断状态为停止,则结束此线程任务
                if(!statusMap.get(deviceNumber)){
                    break;
                }
            }
        } catch (FrameGrabber.Exception | FrameRecorder.Exception e) {
            e.printStackTrace();
        }finally {
            try {
                statusMap.put(deviceNumber, false);
                if(null != grabber){
                    grabber.stop();
                }
                if(null != recorder){
                    recorder.stop();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
            close(deviceNumber, null);
        }
    }
    
@@ -103,7 +110,7 @@
    /**
     * 视频拉流和推流
     */
    public static void push_hls(String inputUrl, String outputUrl, Integer deviceNumber){
    public static void push_hls(String inputUrl, String outputUrl, Integer deviceNumber, String folderPath){
        OpenCVFrameGrabber grabber = null;
//        FFmpegFrameGrabber grabber = null;
        FFmpegFrameRecorder recorder = null;
@@ -113,6 +120,10 @@
            FFmpegFrameGrabber fFmpegFrameGrabber = grabberMap.get(deviceNumber);
            if(null != fFmpegFrameGrabber){
                fFmpegFrameGrabber.close();
            }
            OpenCVFrameGrabber openCVFrameGrabber = grabberMap1.get(deviceNumber);
            if(null != openCVFrameGrabber){
                openCVFrameGrabber.close();;
            }
            FFmpegFrameRecorder fFmpegFrameRecorder = frameRecorderMap.get(deviceNumber);
            if(null != fFmpegFrameRecorder){
@@ -133,6 +144,7 @@
            
            grabber = new OpenCVFrameGrabber(0);
            grabber.start();
            grabberMap1.put(deviceNumber, grabber);
            
            //录制视频,推送到流媒体服务器(nginx)
            recorder = new FFmpegFrameRecorder(outputUrl, grabber.getImageWidth(), grabber.getImageHeight());
@@ -189,17 +201,7 @@
        } catch (FrameGrabber.Exception | FrameRecorder.Exception e) {
            e.printStackTrace();
        }finally {
            try {
                statusMap.put(deviceNumber, false);
                if(null != grabber){
                    grabber.stop();
                }
                if(null != recorder){
                    recorder.stop();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
            close(deviceNumber, folderPath);
        }
    }
    
@@ -208,35 +210,38 @@
     * 关闭推流和拉流进程
     * @param deviceNumber
     */
    public static void close(Integer deviceNumber){
    public static void close(Integer deviceNumber, String folderPath){
        //设置状态为停止
        Boolean status = statusMap.get(deviceNumber);
        if(null == status || !status){
            return;
        }
        statusMap.put(deviceNumber, false);
        FFmpegFrameGrabber fFmpegFrameGrabber = null;
        OpenCVFrameGrabber openCVFrameGrabber = null;
        FFmpegFrameRecorder fFmpegFrameRecorder = null;
        try {
            fFmpegFrameGrabber = grabberMap.get(deviceNumber);
            if(null != fFmpegFrameGrabber){
                fFmpegFrameGrabber.flush();
                fFmpegFrameGrabber.close();
            }
            openCVFrameGrabber = grabberMap1.get(deviceNumber);
            if(null != openCVFrameGrabber){
                openCVFrameGrabber.flush();
                openCVFrameGrabber.close();
            }
            fFmpegFrameRecorder = frameRecorderMap.get(deviceNumber);
            if(null != fFmpegFrameRecorder){
                fFmpegFrameRecorder.flush();
                fFmpegFrameRecorder.close();
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                fFmpegFrameGrabber = grabberMap.get(deviceNumber);
                if(null != fFmpegFrameGrabber){
                    fFmpegFrameGrabber.close();
                }
                fFmpegFrameRecorder = frameRecorderMap.get(deviceNumber);
                if(null != fFmpegFrameRecorder){
                    fFmpegFrameRecorder.close();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        //开始清除视频文件
        if(StringUtils.isNotEmpty(folderPath)){
            FileUtil.del(folderPath);
        }
    }
}
ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/util/TaskUtil.java
@@ -82,4 +82,13 @@
        carService.taskUpdateCarStatus();
    }
    
    /**
     * 视频播放检测
     */
    @Scheduled(fixedRate = 1000 * 300)
    public void taskPlayDetection() {
        carService.taskPlayDetection();
    }
}