springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/config/RabbitmqConfig.java
New file @@ -0,0 +1,100 @@ package com.panzhihua.service_property.config; import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitmqConfig { public static final String DELAYED_QUEUE="delayed.queue"; public static final String DELAYED_ROUTING_KEY="delayed.key"; public static final String DELAYED_EXCHANGE="delayed.exchange"; @Bean public Queue delayedQueue(){ return new Queue(DELAYED_QUEUE,true,false,false,null); } @Bean public Exchange delayedExchange(){ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", ExchangeTypes.DIRECT); return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments); } @Bean public Binding delayedBinding(){ return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs(); } public Queue directQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("directQueue",true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange directExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("directExchange",true,false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRouting"); } public Queue pushQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("pushQueue",true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange pushExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("PUSH_Exchange",true,false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingPush() { return BindingBuilder.bind(directQueue()).to(directExchange()).with("PUSH_ROUTING"); } @Bean DirectExchange lonelyDirectExchange() { return new DirectExchange("lonelyDirectExchange"); } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } } springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/entity/ComPropertyEquipment.java
@@ -47,7 +47,7 @@ * 社区id */ @ApiModelProperty(value = "社区id") private Integer communityId; private Long communityId; /** * 经度 springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/message/AlarmMessage.java
New file @@ -0,0 +1,4 @@ package com.panzhihua.service_property.message; public class AlarmMessage { } springcloud_k8s_panzhihuazhihuishequ/service_property/src/main/java/com/panzhihua/service_property/netty/NettyServerHandler.java
@@ -1,17 +1,27 @@ package com.panzhihua.service_property.netty; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.panzhihua.common.utlis.DateUtils; import com.panzhihua.common.utlis.StringUtils; import com.panzhihua.service_property.dao.ComPropertyAlarmDao; import com.panzhihua.service_property.dao.ComPropertyAlarmSettingDao; import com.panzhihua.service_property.dao.ComPropertyEquipmentDao; import com.panzhihua.service_property.entity.ComPropertyAlarm; import com.panzhihua.service_property.entity.ComPropertyAlarmSetting; import com.panzhihua.service_property.entity.ComPropertyEquipment; import com.panzhihua.service_property.util.MyTools; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.time.Duration; import java.util.Date; @Slf4j @@ -19,6 +29,14 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Resource private ComPropertyAlarmDao comPropertyAlarmDao; @Resource private ComPropertyAlarmSettingDao comPropertyAlarmSettingDao; @Resource private ComPropertyEquipmentDao comPropertyEquipmentDao; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RabbitTemplate rabbitTemplate; private static NettyServerHandler nettyServerHandler; /** @@ -67,5 +85,33 @@ public void init() { nettyServerHandler=this; nettyServerHandler.comPropertyAlarmDao=this.comPropertyAlarmDao; nettyServerHandler.stringRedisTemplate=this.stringRedisTemplate; nettyServerHandler.rabbitTemplate=this.rabbitTemplate; } private void delayAlarm(String serial){ int duration=0; if(StringUtils.isNotEmpty(serial)){ if (nettyServerHandler.stringRedisTemplate.hasKey(serial)){ ComPropertyEquipment comPropertyEquipment= JSONObject.parseObject(nettyServerHandler.stringRedisTemplate.boundValueOps(serial).get(),ComPropertyEquipment.class); if(nettyServerHandler.stringRedisTemplate.hasKey(comPropertyEquipment.getCommunityId().toString())){ duration=Integer.parseInt(nettyServerHandler.stringRedisTemplate.boundValueOps(comPropertyEquipment.getCommunityId().toString()).get()); }else{ ComPropertyAlarmSetting comPropertyAlarmSetting=nettyServerHandler.comPropertyAlarmSettingDao.getByCommunityId(comPropertyEquipment.getCommunityId()); duration=comPropertyAlarmSetting.getTriggerTime(); nettyServerHandler.stringRedisTemplate.boundValueOps(comPropertyEquipment.getCommunityId().toString()).set(comPropertyAlarmSetting.getTriggerTime().toString()); } nettyServerHandler.stringRedisTemplate.boundValueOps(serial).set(JSONObject.toJSONString(comPropertyEquipment), Duration.ofHours(duration)); } else { ComPropertyEquipment comPropertyEquipment=nettyServerHandler.comPropertyEquipmentDao.selectOne(new QueryWrapper<ComPropertyEquipment>().eq("serial_no",serial)); nettyServerHandler.stringRedisTemplate.boundValueOps(serial).set(JSONObject.toJSONString(comPropertyEquipment)); } int finalDuration = duration; nettyServerHandler.rabbitTemplate.convertAndSend("delayed.queue","delayed.key", message -> { message.getMessageProperties().setDelay(finalDuration); return message; }); } } }