101captain
2021-09-09 a04fbfc66c3054c2714bad1796a676306fd2393a
延迟报警处理
2个文件已修改
2个文件已添加
152 ■■■■■ 已修改文件
springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/config/RabbitmqConfig.java 100 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/entity/ComPropertyEquipment.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/message/AlarmMessage.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/netty/NettyServerHandler.java 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/config/RabbitmqConfig.java
New file
@@ -0,0 +1,100 @@
package com.panzhihua.service_property.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
    public static final String DELAYED_QUEUE="delayed.queue";
    public static final String DELAYED_ROUTING_KEY="delayed.key";
    public static final String DELAYED_EXCHANGE="delayed.exchange";
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE,true,false,false,null);
    }
    @Bean
    public Exchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
    }
    @Bean
    public Binding delayedBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
    }
    public Queue directQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("directQueue",true);
    }
    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange directExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("directExchange",true,false);
    }
    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRouting");
    }
    public Queue pushQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("pushQueue",true);
    }
    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange pushExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("PUSH_Exchange",true,false);
    }
    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingPush() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("PUSH_ROUTING");
    }
    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/entity/ComPropertyEquipment.java
@@ -47,7 +47,7 @@
     * 社区id
     */
    @ApiModelProperty(value = "社区id")
    private Integer communityId;
    private Long communityId;
    /**
     * 经度
springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/message/AlarmMessage.java
New file
@@ -0,0 +1,4 @@
package com.panzhihua.service_property.message;
public class AlarmMessage {
}
springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/netty/NettyServerHandler.java
@@ -1,17 +1,27 @@
package com.panzhihua.service_property.netty;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.panzhihua.common.utlis.DateUtils;
import com.panzhihua.common.utlis.StringUtils;
import com.panzhihua.service_property.dao.ComPropertyAlarmDao;
import com.panzhihua.service_property.dao.ComPropertyAlarmSettingDao;
import com.panzhihua.service_property.dao.ComPropertyEquipmentDao;
import com.panzhihua.service_property.entity.ComPropertyAlarm;
import com.panzhihua.service_property.entity.ComPropertyAlarmSetting;
import com.panzhihua.service_property.entity.ComPropertyEquipment;
import com.panzhihua.service_property.util.MyTools;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Date;
@Slf4j
@@ -19,6 +29,14 @@
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Resource
    private ComPropertyAlarmDao comPropertyAlarmDao;
    @Resource
    private ComPropertyAlarmSettingDao comPropertyAlarmSettingDao;
    @Resource
    private ComPropertyEquipmentDao comPropertyEquipmentDao;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RabbitTemplate rabbitTemplate;
    private static NettyServerHandler nettyServerHandler;
    /**
@@ -67,5 +85,33 @@
    public void init() {
        nettyServerHandler=this;
        nettyServerHandler.comPropertyAlarmDao=this.comPropertyAlarmDao;
        nettyServerHandler.stringRedisTemplate=this.stringRedisTemplate;
        nettyServerHandler.rabbitTemplate=this.rabbitTemplate;
    }
    private void delayAlarm(String serial){
        int duration=0;
        if(StringUtils.isNotEmpty(serial)){
            if (nettyServerHandler.stringRedisTemplate.hasKey(serial)){
                ComPropertyEquipment comPropertyEquipment= JSONObject.parseObject(nettyServerHandler.stringRedisTemplate.boundValueOps(serial).get(),ComPropertyEquipment.class);
                if(nettyServerHandler.stringRedisTemplate.hasKey(comPropertyEquipment.getCommunityId().toString())){
                    duration=Integer.parseInt(nettyServerHandler.stringRedisTemplate.boundValueOps(comPropertyEquipment.getCommunityId().toString()).get());
                }else{
                    ComPropertyAlarmSetting comPropertyAlarmSetting=nettyServerHandler.comPropertyAlarmSettingDao.getByCommunityId(comPropertyEquipment.getCommunityId());
                    duration=comPropertyAlarmSetting.getTriggerTime();
                    nettyServerHandler.stringRedisTemplate.boundValueOps(comPropertyEquipment.getCommunityId().toString()).set(comPropertyAlarmSetting.getTriggerTime().toString());
                }
                nettyServerHandler.stringRedisTemplate.boundValueOps(serial).set(JSONObject.toJSONString(comPropertyEquipment), Duration.ofHours(duration));
            }
            else {
                ComPropertyEquipment comPropertyEquipment=nettyServerHandler.comPropertyEquipmentDao.selectOne(new QueryWrapper<ComPropertyEquipment>().eq("serial_no",serial));
                nettyServerHandler.stringRedisTemplate.boundValueOps(serial).set(JSONObject.toJSONString(comPropertyEquipment));
            }
            int finalDuration = duration;
            nettyServerHandler.rabbitTemplate.convertAndSend("delayed.queue","delayed.key", message -> {
                message.getMessageProperties().setDelay(finalDuration);
                return message;
            });
        }
    }
}