From cbca933c49053ccb6471fec7f4adae05936fe2ea Mon Sep 17 00:00:00 2001 From: puzhibing <393733352@qq.com> Date: 星期五, 02 六月 2023 18:46:16 +0800 Subject: [PATCH] 提交代码 --- zuul/src/main/java/com/sinata/zuul/controller/NettyController.java | 54 + zuul/src/main/java/com/sinata/zuul/util/applets/CacheType.java | 86 + zuul/src/main/java/com/sinata/zuul/util/applets/NettyWebSocketController.java | 202 +++ zuul/src/main/java/com/sinata/zuul/util/SinataUtil.java | 405 +++++++ zuul/src/main/java/com/sinata/zuul/util/applets/Global.java | 9 zuul/src/main/resources/redis.properties | 28 zuul/src/main/java/com/sinata/zuul/util/echo/Method.java | 26 zuul/src/main/java/com/sinata/zuul/util/echo/NettyServer.java | 95 + zuul/src/main/java/com/sinata/zuul/util/ResultUtil.java | 188 +++ zuul/pom.xml | 105 + zuul/src/main/java/com/sinata/zuul/util/applets/ClientPingMessage.java | 59 + zuul/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java | 327 ++++++ zuul/src/main/java/com/sinata/zuul/ZuulApplication.java | 120 ++ zuul/src/main/java/com/sinata/zuul/util/applets/NettyServer0.java | 77 + zuul/src/main/java/com/sinata/zuul/controller/RedisController.java | 83 + zuul/src/main/java/com/sinata/zuul/util/RedisUtil.java | 124 ++ zuul/src/main/resources/static/tXQaRbVjpJ.txt | 1 zuul/src/main/java/com/sinata/zuul/util/GDMapGeocodingUtil.java | 102 + zuul/src/main/java/com/sinata/zuul/util/NettyStartListener.java | 36 zuul/src/main/java/com/sinata/zuul/util/applets/ChildChannelHandler.java | 36 zuul/src/main/java/com/sinata/zuul/util/echo/ServerInit.java | 25 zuul/src/main/java/com/sinata/zuul/util/echo/NettyMsg.java | 165 +++ zuul/src/main/java/com/sinata/zuul/util/echo/DiscardServerHandler.java | 153 ++ zuul/src/main/java/com/sinata/zuul/util/CrossOriginFilter.java | 54 + zuul/src/main/java/com/sinata/zuul/util/StringUtil.java | 121 ++ zuul/src/main/java/com/sinata/zuul/util/echo/NettyChannelMap.java | 146 ++ zuul/src/main/java/com/sinata/zuul/util/applets/WebSocketHandler.java | 180 +++ zuul/src/main/java/com/sinata/zuul/util/SpringUtil.java | 34 zuul/src/main/java/com/sinata/zuul/util/applets/createSSLContext.java | 31 zuul/src/main/java/com/sinata/zuul/config/RedisConfig.java | 54 + zuul/src/main/resources/application.yml | 35 31 files changed, 3,161 insertions(+), 0 deletions(-) diff --git a/zuul/pom.xml b/zuul/pom.xml new file mode 100644 index 0000000..c2048fe --- /dev/null +++ b/zuul/pom.xml @@ -0,0 +1,105 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-parent</artifactId> + <version>2.0.4.RELEASE</version> + <relativePath/> <!-- lookup parent from repository --> + </parent> + <groupId>com.sinata</groupId> + <artifactId>zuul</artifactId> + <version>0.0.1-SNAPSHOT</version> + <name>zuul</name> + <description>Zuul project for Spring Boot</description> + + <packaging>jar</packaging> + + <properties> + <java.version>1.8</java.version> + <spring-cloud.version>Finchley.SR1</spring-cloud.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.springframework.cloud</groupId> + <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.cloud</groupId> + <artifactId>spring-cloud-starter-netflix-zuul</artifactId> + </dependency> + + <!--引入swagger--> + <dependency> + <groupId>com.spring4all</groupId> + <artifactId>swagger-spring-boot-starter</artifactId> + <version>1.7.0.RELEASE</version> + </dependency> + <!-- jedis --> + <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + <version>2.9.0</version> + </dependency> + + <!--<dependency>--> + <!--<groupId>org.springframework.boot</groupId>--> + <!--<artifactId>spring-boot-starter-tomcat</artifactId>--> + <!--<scope>provided</scope>--> + <!--</dependency>--> + <!--<dependency>--> + <!--<groupId>javax.servlet</groupId>--> + <!--<artifactId>javax.servlet-api</artifactId>--> + <!--<version>3.1.0</version>--> + <!--<scope>provided</scope>--> + <!--</dependency>--> + + + <!-- netty --> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.1.27.Final</version> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.47</version> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.junit.vintage</groupId> + <artifactId>junit-vintage-engine</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.springframework.cloud</groupId> + <artifactId>spring-cloud-dependencies</artifactId> + <version>${spring-cloud.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> + + <build> + <plugins> + <plugin> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> diff --git a/zuul/src/main/java/com/sinata/zuul/ZuulApplication.java b/zuul/src/main/java/com/sinata/zuul/ZuulApplication.java new file mode 100644 index 0000000..e5fa4b5 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/ZuulApplication.java @@ -0,0 +1,120 @@ +package com.sinata.zuul; + +import com.sinata.zuul.util.applets.NettyServer0; +import com.sinata.zuul.util.echo.NettyServer; +import com.spring4all.swagger.EnableSwagger2Doc; +import org.apache.http.client.HttpClient; +import org.apache.http.config.SocketConfig; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.cloud.client.loadbalancer.LoadBalanced; +import org.springframework.cloud.netflix.zuul.EnableZuulProxy; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.http.converter.StringHttpMessageConverter; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.cors.CorsConfiguration; +import org.springframework.web.cors.UrlBasedCorsConfigurationSource; +import org.springframework.web.filter.CorsFilter; +import springfox.documentation.swagger.web.SwaggerResource; +import springfox.documentation.swagger.web.SwaggerResourcesProvider; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + + +@EnableSwagger2Doc +@EnableZuulProxy//开启网关服务 +@EnableDiscoveryClient//开启eureka客户端的消费者 +@SpringBootApplication +public class ZuulApplication extends SpringBootServletInitializer { + + public static void main(String[] args) { + SpringApplication.run(ZuulApplication.class, args); +// NettyServer nettyServer = new NettyServer(); +// nettyServer.bind(); +// NettyServer0 nettyServer0 = new NettyServer0(); +// nettyServer0.bind(); + } + + + @Bean //SpringCloud内部服务质检使用服务名调用 + @LoadBalanced + public RestTemplate internalRestTemplate() { + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); + connectionManager.setDefaultSocketConfig(SocketConfig.custom().setTcpNoDelay(true).build()); + connectionManager.setDefaultMaxPerRoute(100);//最大并发连接 + connectionManager.setMaxTotal(200); // 总的最大连接数 + HttpClient httpClient = HttpClientBuilder.create().setConnectionManager(connectionManager).build(); + HttpComponentsClientHttpRequestFactory httpRequestFactory = new HttpComponentsClientHttpRequestFactory(httpClient); + httpRequestFactory.setConnectionRequestTimeout(30 * 1000); + httpRequestFactory.setConnectTimeout(30 * 3000); + httpRequestFactory.setReadTimeout(30 * 3000); + RestTemplate restTemplate = new RestTemplate(httpRequestFactory); + restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8)); + return restTemplate; + } + + + +// /** +// * 向Spring容器中定义RestTemplate对象 +// * @return +// */ +// @Bean //必须new 一个RestTemplate并放入spring容器当中,否则启动时报错 +// public RestTemplate restTemplate() { +// PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); +// connectionManager.setDefaultSocketConfig(SocketConfig.custom().setTcpNoDelay(true).build()); +// connectionManager.setDefaultMaxPerRoute(100);//最大并发连接 +// connectionManager.setMaxTotal(200); // 总的最大连接数 +// HttpClient httpClient = HttpClientBuilder.create().setConnectionManager(connectionManager).build(); +// HttpComponentsClientHttpRequestFactory httpRequestFactory = new HttpComponentsClientHttpRequestFactory(httpClient); +// httpRequestFactory.setConnectionRequestTimeout(30 * 1000); +// httpRequestFactory.setConnectTimeout(30 * 3000); +// httpRequestFactory.setReadTimeout(30 * 3000); +// RestTemplate restTemplate = new RestTemplate(httpRequestFactory); +// restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8)); +// return restTemplate; +// } + + + + /** + * 配置Swagger + */ + @Component + @Primary + class DocumentationConfig implements SwaggerResourcesProvider { + @Override + public List<SwaggerResource> get() { + List resource=new ArrayList<>(); + //name可以随便写,location前缀要与zuul配置的path一致。zuul开了token验证,要加上token,否则不用加?token=1 + resource.add(swaggerResource("user","/user-server/v2/api-docs","1.0")); + resource.add(swaggerResource("driver","/driver-server/v2/api-docs","1.0")); + return resource; + } + + //name可以随便写,location前缀要与zuul配置的path一致 + private SwaggerResource swaggerResource(String name, String location, String version){ + SwaggerResource swaggerResource=new SwaggerResource(); + swaggerResource.setName(name); + swaggerResource.setLocation(location); + swaggerResource.setSwaggerVersion(version); + return swaggerResource; + } + } + + + @Override + protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { + return builder.sources(ZuulApplication.class); + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/config/RedisConfig.java b/zuul/src/main/java/com/sinata/zuul/config/RedisConfig.java new file mode 100644 index 0000000..895ffce --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/config/RedisConfig.java @@ -0,0 +1,54 @@ +package com.sinata.zuul.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +@Configuration +@PropertySource("classpath:redis.properties") +public class RedisConfig { + @Value("${spring.redis.host}") + private String host; + + @Value("${spring.redis.port}") + private int port; + + @Value("${spring.redis.timeout}") + private int timeout; + + @Value("${spring.redis.jedis.pool.max-idle}") + private int maxIdle; + + @Value("${spring.redis.jedis.pool.min-idle}") + private int minIdle; + + @Value("${spring.redis.jedis.pool.max-wait}") + private long maxWaitMillis; + + @Value("${spring.redis.jedis.pool.max-active}") + private int maxTotal; + + @Value("${spring.redis.password}") + private String password; + + @Value("${spring.redis.block-when-exhausted}") + private boolean blockWhenExhausted; + + @Bean + public JedisPool redisPoolFactory() throws Exception{ + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setMaxIdle(maxIdle); + jedisPoolConfig.setMinIdle(minIdle); + jedisPoolConfig.setMaxTotal(maxTotal); + jedisPoolConfig.setMaxWaitMillis(maxWaitMillis); + // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true + jedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted); + // 是否启用pool的jmx管理功能, 默认true + jedisPoolConfig.setJmxEnabled(true); + JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password); + return jedisPool; + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/controller/NettyController.java b/zuul/src/main/java/com/sinata/zuul/controller/NettyController.java new file mode 100644 index 0000000..014a079 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/controller/NettyController.java @@ -0,0 +1,54 @@ +package com.sinata.zuul.controller; + + +import com.alibaba.fastjson.JSON; +import com.sinata.zuul.util.ResultUtil; +import com.sinata.zuul.util.applets.NettyWebSocketController; +import com.sinata.zuul.util.echo.NettyChannelMap; +import com.sinata.zuul.util.echo.NettyServerController; +import io.netty.channel.ChannelHandlerContext; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/netty") +public class NettyController { + + + /** + * 向客户端推送消息 + * @param id + * @param msg + */ + @ResponseBody + @PostMapping("/sendMsgToClient") + public String sendMsgToClient(String id, Integer type, String msg){ + if(type == 1){//用户端 + ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序 + if(null != channel){ + NettyWebSocketController.sendMsgToClient(channel, msg); + return JSON.toJSONString(ResultUtil.success()); + } + channel = NettyChannelMap.getData("USER" + id); + if(null != channel){ + NettyServerController.sendMsgToClient(channel, msg); + return JSON.toJSONString(ResultUtil.success()); + } + return JSON.toJSONString(ResultUtil.error("推送失败-----用户id=" + id)); + + } + + if(type == 2){//司机端 + ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id); + if(null != channel){ + NettyServerController.sendMsgToClient(channel, msg); + return JSON.toJSONString(ResultUtil.success()); + } + return JSON.toJSONString(ResultUtil.error("推送失败-----司机id=" + id)); + } + + return JSON.toJSONString(ResultUtil.error("推送失败")); + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/controller/RedisController.java b/zuul/src/main/java/com/sinata/zuul/controller/RedisController.java new file mode 100644 index 0000000..4866ef5 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/controller/RedisController.java @@ -0,0 +1,83 @@ +package com.sinata.zuul.controller; + +import com.alibaba.fastjson.JSON; +import com.sinata.zuul.util.RedisUtil; +import com.sinata.zuul.util.ResultUtil; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Arrays; +import java.util.List; + +@RestController +@RequestMapping("/redis") +public class RedisController { + + @Autowired + private RedisUtil redisUtil; + + + /** + * 从redis中获取数据 + * @param key + * @return + */ + @ResponseBody + @PostMapping("/getValue") + public String getValue(String key){ + String value = redisUtil.getValue(key); + return JSON.toJSONString(ResultUtil.success(value)); + } + + + /** + * 批量获取 + * @param keys + * @return + */ + @ResponseBody + @PostMapping("/getValues") + public String getValues(String keys){ + String[] split = keys.split(","); + List<String> list = Arrays.asList(split); + List<Object> values = redisUtil.getValues(list); + return JSON.toJSONString(ResultUtil.success(values)); + } + + + /** + * 存值 + * @param key + * @param value + * @param time + */ + @ResponseBody + @PostMapping("/setValue") + public String setValue(String key, String value, int time){ + redisUtil.setStrValue(key, value, time); + return JSON.toJSONString(ResultUtil.success()); + } + + + @ResponseBody + @PostMapping("/setValue_") + public String setValue_(String key, String value){ + redisUtil.setStrValue(key, value); + return JSON.toJSONString(ResultUtil.success()); + } + + /** + * 删除redis数据 + * @param key + * @return + */ + @ResponseBody + @PostMapping("/remove") + public String remove(String key){ + redisUtil.remove(key); + return JSON.toJSONString(ResultUtil.success()); + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/CrossOriginFilter.java b/zuul/src/main/java/com/sinata/zuul/util/CrossOriginFilter.java new file mode 100644 index 0000000..2e0e635 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/CrossOriginFilter.java @@ -0,0 +1,54 @@ +package com.sinata.zuul.util; + + +import org.springframework.boot.web.servlet.ServletComponentScan; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.servlet.*; +import javax.servlet.annotation.WebFilter; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; + +/** + * 统一配置跨域处理 + */ +@Order(-100) +@Component +@ServletComponentScan +@WebFilter(urlPatterns = "/*",filterName = "crossOriginFilter") +public class CrossOriginFilter implements Filter { + @Override + public void init(FilterConfig filterConfig) throws ServletException { + + } + + @Override + public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { + HttpServletResponse response = (HttpServletResponse) servletResponse; + HttpServletRequest request = (HttpServletRequest) servletRequest; + // 允许哪些Origin发起跨域请求 + // response.setHeader( "Access-Control-Allow-Origin", config.getInitParameter( "AccessControlAllowOrigin" ) ); + response.setHeader( "Access-Control-Allow-Origin", "*" ); + // 允许请求的方法 + response.setHeader( "Access-Control-Allow-Methods", "POST,GET,OPTIONS,DELETE,PUT" ); + //多少秒内,不需要再发送预检验请求,可以缓存该结果 + response.setHeader( "Access-Control-Max-Age", "3600" ); + // 表明它允许跨域请求包含xxx头 + response.setHeader( "Access-Control-Allow-Headers", "x-auth-token,Origin,Access-Token,X-Requested-With,Content-Type, Accept, Authorization" ); + //是否允许浏览器携带用户身份信息(cookie) + response.setHeader( "Access-Control-Allow-Credentials", "true" ); + //prefight请求 + if (request.getMethod().equals( "OPTIONS" )) { + response.setStatus( 204 ); + return; + } + filterChain.doFilter( servletRequest, response ); + } + + @Override + public void destroy() { + + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/GDMapGeocodingUtil.java b/zuul/src/main/java/com/sinata/zuul/util/GDMapGeocodingUtil.java new file mode 100644 index 0000000..99e0a5c --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/GDMapGeocodingUtil.java @@ -0,0 +1,102 @@ +package com.sinata.zuul.util; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 高德地图的地理编码工具类 + */ +@Component +public class GDMapGeocodingUtil { + + private String key = "79293423abef825bf17db8568fa93d98"; + + @Autowired + private RestTemplate restTemplate; + + + /** + * 将行政区域名称转化为坐标 + * @param province + * @param city + * @param county + * @param address + * @return + */ + public Map<String, Object> geocoding(String province, String city, String county, String address){ + Map<String, Object> map = new HashMap<>(); + if(StringUtil.isEmpty(province)){ + map.put("status", -1); + map.put("data", "省不能为空"); + return map; + } + if((StringUtil.isEmpty(city) && StringUtil.isNotEmpty(county)) || (StringUtil.isEmpty(city) && StringUtil.isNotEmpty(address))){ + map.put("status", -1); + map.put("data", "市不能为空"); + return map; + } + if((StringUtil.isEmpty(county) && StringUtil.isNotEmpty(address))){ + map.put("status", -1); + map.put("data", "县/区不能为空"); + return map; + } + + String url = "https://restapi.amap.com/v3/geocode/geo?key=" + key + "&output=JSON"; + url += "&address=" + province + (StringUtil.isNotEmpty(city) ? city : "") + (StringUtil.isNotEmpty(county) ? county : "") + (StringUtil.isNotEmpty(address) ? address : ""); + String forObject = restTemplate.getForObject(url, String.class); + JSONObject jsonObject = JSON.parseObject(forObject); + String status = jsonObject.getString("status"); + List<String> list = new ArrayList<>(); + if(status.equals("1")){ + JSONArray geocodes = jsonObject.getJSONArray("geocodes"); + for(int i = 0; i < geocodes.size(); i++){ + String location = geocodes.getJSONObject(i).getString("location"); + list.add(location); + } + } + map.put("status", 0); + map.put("data", list); + return map; + } + + + /** + * 根据经纬度获取行政区域信息 + * @param lon + * @param lan + * @return + * @throws Exception + */ + public Map<String, String> geocode(String lon, String lan) throws Exception{ + String url = "https://restapi.amap.com/v3/geocode/regeo?key=" + key + "&location=" + lon + "," + lan; + String forObject = restTemplate.getForObject(url, String.class); + JSONObject jsonObject = JSON.parseObject(forObject); + Map<String, String> map = new HashMap<>(); + if(jsonObject.getString("status").equals("1")){ + JSONObject regeocode = jsonObject.getJSONObject("regeocode"); + JSONObject addressComponent = regeocode.getJSONObject("addressComponent"); + String address = regeocode.getString("formatted_address"); + map.put("address", address); + String code = addressComponent.getString("adcode"); + String province = addressComponent.getString("province"); + String city = addressComponent.getString("city"); + String district = addressComponent.getString("district"); + map.put("province", province); + map.put("provinceCode", code.substring(0, 2) + "0000"); + map.put("city", city); + map.put("cityCode", code.substring(0, 4) + "00"); + map.put("district", district); + map.put("districtCode", code); + } + return map; + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/NettyStartListener.java b/zuul/src/main/java/com/sinata/zuul/util/NettyStartListener.java new file mode 100644 index 0000000..2f19e93 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/NettyStartListener.java @@ -0,0 +1,36 @@ +package com.sinata.zuul.util; + +import com.sinata.zuul.util.applets.NettyServer0; +import com.sinata.zuul.util.echo.NettyServer; + +import javax.servlet.ServletContextEvent; +import javax.servlet.ServletContextListener; +import javax.servlet.annotation.WebListener; + +/** + * 启动netty监听器 + */ +@WebListener +public class NettyStartListener implements ServletContextListener { + + + @Override + public void contextInitialized(ServletContextEvent sce) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + NettyServer nettyServer = new NettyServer(); + nettyServer.bind(); + + NettyServer0 nettyServer0 = new NettyServer0(); + nettyServer0.bind(); + } + }); + thread.start(); + } + + @Override + public void contextDestroyed(ServletContextEvent sce) { + + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/RedisUtil.java b/zuul/src/main/java/com/sinata/zuul/util/RedisUtil.java new file mode 100644 index 0000000..3abe256 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/RedisUtil.java @@ -0,0 +1,124 @@ +package com.sinata.zuul.util; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.Pipeline; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + + +/** + * Redis工具类 + */ +@Component +public class RedisUtil { + + @Autowired + private JedisPool jedisPool; + + + /** + * 向redis中存储字符串没有过期时间 + * @param key + * @param value + */ + public void setStrValue(String key, String value){ + if(StringUtil.isNotEmpty(key)){ + Jedis resource = jedisPool.getResource(); + String set = resource.set(key, value); + closeJedis(resource); + } + } + + + /** + * 以分钟为单位设置存储值(设置过期时间) + * @param key + * @param value + * @param time 秒 + */ + public void setStrValue(String key, String value, int time){ + if(StringUtil.isNotEmpty(key)){ + Jedis resource = jedisPool.getResource(); + String setex = resource.setex(key, time, value); + closeJedis(resource); + } + } + + + /** + * 从redis中获取值 + * @param key + * @return + */ + public String getValue(String key){ + if(StringUtil.isNotEmpty(key)){ + Jedis resource = jedisPool.getResource(); + String data = resource.get(key); + closeJedis(resource); + return data; + } + return null; + } + + + /** + * 批量获取 + * @param kes + * @return + */ + public List<Object> getValues(List<String> kes){ + if(null != kes){ + Jedis resource = jedisPool.getResource(); + Pipeline pipelined = resource.pipelined(); + for(String key : kes){ + pipelined.get(key); + } + List<Object> list = pipelined.syncAndReturnAll(); + + closeJedis(resource); + pipelined.clear(); + try { + pipelined.close(); + } catch (IOException e) { + e.printStackTrace(); + } + List<Object> data = new ArrayList<>(); + for(Object o : list){ + if(null != o){ + data.add(o); + } + } + return data; + } + return null; + } + + + /** + * 删除key + * @param key + */ + public void remove(String key){ + if(StringUtil.isNotEmpty(key)){ + Jedis resource = jedisPool.getResource(); + Long del = resource.del(key); + closeJedis(resource); + } + } + + + /** + * 删除资源 + * @param jedis + */ + public void closeJedis(Jedis jedis){ + if(null != jedis){ + jedis.close(); + } + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/ResultUtil.java b/zuul/src/main/java/com/sinata/zuul/util/ResultUtil.java new file mode 100644 index 0000000..5a4f428 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/ResultUtil.java @@ -0,0 +1,188 @@ +package com.sinata.zuul.util; + +import com.alibaba.fastjson.JSONObject; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +/** + * 定义统一返回对象 + */ +@ApiModel(value = "统一返回结果集") +public class ResultUtil<T> { + + public static final Integer SUCCESS = 200; + + public static final Integer PARAM_ERROR = 300; + + public static final Integer RUNTIME_ERROR = 400; + + public static final Integer ERROR = 500; + + public static final Integer TOKEN_ERROR = 600; + + public static final Integer SIGN_ERROR = 700; + + public static final String Token = "TOKEN_INVALID"; + + public static final String SIGN = "SIGN_INVALID"; + + @ApiModelProperty(name = "code", value = "业务状态码 200:成功,300:参数错误,400:运行异常,500:其他异常, 600:token无效,需重新登录,700:签名无效") + private Integer code;//备用状态码 + + @ApiModelProperty(name = "msg", value = "返回结果说明") + private String msg;//返回说明 + + @ApiModelProperty(name = "data", value = "返回结果值") + private T data;//返回数据 + + + + public String getMsg() { + return msg; + } + + public T getData() { + return data; + } + + + public Integer getCode() { + return code; + } + + private ResultUtil(Integer code, String msg) { + this.code = code; + this.msg = msg; + } + + private ResultUtil(Integer code, String msg, T data) { + this.code = code; + this.msg = msg; + this.data = data; + } + + + public static <T> ResultUtil<T> getResult(Integer code, String msg){ + return new ResultUtil<>(code, msg); + } + + public static <T> ResultUtil<T> getResult(Integer code, String msg, T data){ + return new ResultUtil<>(code, msg, data); + } + + /** + * 错误信息 + * @return + */ + public static ResultUtil error(String mag){ + return ResultUtil.getResult(ResultUtil.ERROR, mag, new JSONObject()); + } + + /** + * 错误信息 + * @return + */ + public static <T> ResultUtil <T> error(String mag, T obj){ + return ResultUtil.getResult(ResultUtil.ERROR, mag, obj); + } + + /** + * token失效 + * @return + */ + public static ResultUtil tokenErr(){ + return ResultUtil.getResult(ResultUtil.TOKEN_ERROR, ResultUtil.Token, new JSONObject()); + } + + /** + * token失效 + * @return + */ + public static ResultUtil tokenErr(String msg){ + return ResultUtil.getResult(ResultUtil.TOKEN_ERROR, msg, new JSONObject()); + } + + /** + * 参数异常 + * @return + */ + public static ResultUtil paranErr(){ + return ResultUtil.getResult(ResultUtil.PARAM_ERROR, "PARAM_ERROR", new JSONObject()); + } + + /** + * 参数异常 + * @return + */ + public static <T> ResultUtil<T> paranErr(T data){ + return ResultUtil.getResult(ResultUtil.PARAM_ERROR, "SYSTEM_RUN_ERROR", data); + } + + /** + * 运行异常 + * @return + */ + public static ResultUtil runErr(){ + return ResultUtil.getResult(ResultUtil.RUNTIME_ERROR, "SYSTEM_RUN_ERROR", new JSONObject()); + } + + + /** + * 运行异常 + * @return + */ + public static <T>ResultUtil<T> runErr(T data){ + return ResultUtil.getResult(ResultUtil.RUNTIME_ERROR, "SYSTEM_RUN_ERROR", data); + } + + /** + * 运行异常 + * @return + */ + public static <T>ResultUtil<T> runErr(T data, String msg){ + return ResultUtil.getResult(ResultUtil.RUNTIME_ERROR, msg, data); + } + + + /** + * 返回成功 + * @param + * @return + */ + public static ResultUtil success(){ + return ResultUtil.getResult(ResultUtil.SUCCESS, "SUCCESS", new JSONObject()); + } + + + /** + * 返回成功 + * @param data + * @param <T> + * @return + */ + public static <T> ResultUtil<T> success(T data){ + return ResultUtil.getResult(ResultUtil.SUCCESS, "SUCCESS", data); + } + + /** + * 返回成功 + * @param msg + * @param data + * @param <T> + * @return + */ + public static <T> ResultUtil<T> success(String msg, T data){ + return ResultUtil.getResult(ResultUtil.SUCCESS, msg, data); + } + + + /** + * 签名无效 + * @param <T> + * @return + */ + public static <T> ResultUtil<T> sign(){ + return ResultUtil.getResult(ResultUtil.SIGN_ERROR, SIGN); + } + +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/SinataUtil.java b/zuul/src/main/java/com/sinata/zuul/util/SinataUtil.java new file mode 100644 index 0000000..287d4ad --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/SinataUtil.java @@ -0,0 +1,405 @@ +package com.sinata.zuul.util; + +import java.io.UnsupportedEncodingException; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * 基本数据处理工具类 + */ +public class SinataUtil { + /** + * List集合分页<br/> + * 创建人:Mryang<br/> + * 时间:2016年7月28日-下午2:58:14 <br/> + * @param <T> + * @param pageNo + * @param pageSize + * @param list + * @throws Exception List<UserOrderList> <br/> + */ + public static <T> List<T> listpage(int pageNo, int pageSize, List<T> list) throws Exception { + List<T> result = new ArrayList<T>(); + if (list != null && list.size() > 0) { + int allCount = list.size(); + if(pageNo > 1 && allCount < pageSize) { + return new ArrayList<>(); + } + int pageCount = (allCount + pageSize - 1) / pageSize; + if (pageNo >= pageCount) { + pageNo = pageCount; + } + int start = (pageNo - 1) * pageSize; + int end = pageNo * pageSize; + if (end >= allCount) { + end = allCount; + } + for (int i = start; i < end; i++) { + result.add(list.get(i)); + } + } + return (result != null && result.size() > 0) ? result : new ArrayList<T>(); + } + + /** + * Double类型取整 + * @param num + * @return + */ + public static String doubleTrans(double num) { + return String.valueOf((long) num); + } + + /** + * Double类型保留1位小数 + * + * @param num + * @return + */ + public static String doubleRetainOne(double num) { + DecimalFormat dfs = new DecimalFormat("0.0"); + return dfs.format(num); + } + + /** + * Double类型保留2位小数 + * + * @param num + * @return + */ + public static String doubleRetainTwo(double num) { + DecimalFormat dfs = new DecimalFormat("0.00"); + String.format("%.2f", num); + return dfs.format(num); + } + + /** + * Double类型保留1位小数(四舍五入) + * + * @param num + * @return + */ + public static String doubleForwardOne(double num) { + return String.format("%.1f", num); + } + + /** + * Double类型保留2位小数(四舍五入) + * + * @param num + * @return + */ + public static String doubleForwardTwo(double num) { + return String.format("%.2f", num); + } + + /** + * 字符串转换成Ascii + * + * @param value + * @return + */ + public static String stringToAscii(String value) { + StringBuffer sbu = new StringBuffer(); + char[] chars = value.toCharArray(); + for (int i = 0; i < chars.length; i++) { + if (i != chars.length - 1) { + sbu.append((int) chars[i]); + } else { + sbu.append((int) chars[i]); + } + } + return sbu.toString(); + } + + /** + * 小数转换为百分比 + * + * @param decimal + * @return + * @author TaoNingBo + */ + public static String decTurnPercent(double decimal) { + NumberFormat num = NumberFormat.getPercentInstance(); + num.setMaximumIntegerDigits(3); + num.setMaximumFractionDigits(2); + return num.format(decimal); + } + + /** + * Ascii转换成字符串 + * + * @param value + * @return + */ + public static String asciiToString(String value) { + String[] chars = value.split(","); + StringBuffer sbu = new StringBuffer(); + for (int i = 0; i < chars.length; i++) { + sbu.append((char) Integer.parseInt(chars[i])); + } + return sbu.toString(); + } + + /** + * 字符串转换unicode + * + * @param string + * @return + * @author TaoNingBo + */ + public static String string2Unicode(String string) { + StringBuffer unicode = new StringBuffer(); + for (int i = 0; i < string.length(); i++) { + // 取出每一个字符 + char c = string.charAt(i); + // 转换为unicode + unicode.append("\\u" + Integer.toHexString(c)); + } + return unicode.toString(); + } + + /** + * unicode 转字符串 + * + * @param unicode + * @return + * @author TaoNingBo + */ + public static String unicode2String(String unicode) { + StringBuffer string = new StringBuffer(); + String[] hex = unicode.split("\\\\u"); + for (int i = 1; i < hex.length; i++) { + // 转换出每一个代码点 + int data = Integer.parseInt(hex[i], 16); + // 追加成string + string.append((char) data); + } + return string.toString(); + } + + /** + * 字符串编码转换的实现方法 + * + * @param str + * 待转换编码的字符串 + * @param newCharset + * 目标编码 + * @return + * @throws UnsupportedEncodingException + */ + public static String changeCharset(String str, String newCharset) throws UnsupportedEncodingException { + if (str != null) { + // 用默认字符编码解码字符串。 + byte[] bs = str.getBytes(); + // 用新的字符编码生成字符串 + return new String(bs, newCharset); + } + return null; + } + + /** + * 注: \n 回车( ) \t 水平制表符( ) \s 空格(\u0008) \r 换行( ) + * + * @param str + * @return + */ + public static String replaceBlank(String str) { + String dest = ""; + if (str != null) { + Pattern p = Pattern.compile("\\s*|\t|\r|\n"); + Matcher m = p.matcher(str); + dest = m.replaceAll(""); + } + return dest; + } + + /** + * 判断该字符串不能为空 + * + * @param str + * @return + * @author TaoNingBo + */ + public static boolean isNotEmpty(Object str) { + return !isEmpty(str); + } + + + public static boolean isNotEmptyUndefined(Object str) { + return !isEmpty(str) && !str.toString().equals("undefined"); + } + + /** + * 字符串编码转换的实现方法 + * + * @param str + * 待转换编码的字符串 + * @param oldCharset + * 原编码 + * @param newCharset + * 目标编码 + * @return + * @throws UnsupportedEncodingException + */ + public static String changeCharset(String str, String oldCharset, String newCharset) throws UnsupportedEncodingException { + if (str != null) { + // 用旧的字符编码解码字符串。解码可能会出现异常。 + byte[] bs = str.getBytes(oldCharset); + // 用新的字符编码生成字符串 + return new String(bs, newCharset); + } + return null; + } + + /** + * 给手机号码加分割符 + * + * @param phone + * @return + * @author TaoNingBo + */ + public static String splitPhone(String phone) { + if (isNotEmpty(phone)) { + String strone = phone.substring(0, 3); + String strtwo = phone.substring(strone.length(), 7); + String strthree = phone.substring(strtwo.length() + strone.length(), phone.length()); + return strone + "-" + strtwo + "-" + strthree; + } + return ""; + } + + /** + * 非空判断 + * + * @param str + * @return + * @author TaoNingBo + */ + public static boolean isEmpty(Object str) { + return str == null || str.toString().length() == 0 || str.equals("") || str.toString().matches("\\s*"); + } + + /** + * 把米转换成公里 + * + * @param km + * @return + * @author TaoNingBo + */ + public static Double kmTransKilo(Integer m) { + return Math.round(m / 100d) / 10d; + } + + /** + * 将List<{@link Object}>转换成List<{@link T}> + * + * @param list + * 将要转换的对象 + * @param clazs + * 需要转换的泛型对象 + * @return + * @author TaoNingBo + */ + @SuppressWarnings("unchecked") + public static <T> List<T> fromToObject(List<?> list, Class<T> clazs) { + List<T> t = new ArrayList<T>(); + for (Object object : list) { + t.add((T) object); + } + return t; + } + + /** + * 生成 uuid, 即用来标识一笔单,也用做 nonce_str + * @return + */ + public static String generateUUID() { + return UUID.randomUUID().toString().replaceAll("-", "").substring(0, 32); + } + + /** + * 将List<{@link Object}>转换成List<{@link Map<String, Object>}> + * + * @param list + * @return + * @author TaoNingBo + */ + @SuppressWarnings("unchecked") + public static List<Map<String, Object>> fromToObject_M(List<?> list) { + List<Map<String, Object>> t = new ArrayList<Map<String, Object>>(); + for (Object object : list) { + t.add((Map<String, Object>) object); + } + return t; + } + + +/** + * 将对象中的null转换为空 + * @param obj + * @return + * @throws IllegalAccessException + */ + +public static Object checkObjFieldIsNull(Object obj) throws IllegalAccessException { + if(obj!=null){ + for(java.lang.reflect.Field f : obj.getClass().getDeclaredFields()){ + f.setAccessible(true); + if(f.get(obj) == null){ + if(f.getType()==String.class){ + f.set(obj, ""); + } + if(f.getType()==Double.class){ + f.set(obj, 0d); + } + if(f.getType()==Integer.class){ + f.set(obj, 0); + } + if(f.getType()==Date.class){ + f.set(obj, new Date()); + } + } + } + return obj; + } + return obj; +} +/** + * 获取六位标识码 + * @param length + * @return + */ +public static String createRandomCharData() +{ + StringBuilder sb=new StringBuilder(); + Random rand=new Random();//随机用以下三个随机生成器 + Random randdata=new Random(); + int data=0; + for(int i=0;i<6;i++) + { + int index=rand.nextInt(3); + //目的是随机选择生成数字,大小写字母 + switch(index) + { + case 0: + data=randdata.nextInt(10);//仅仅会生成0~9 + sb.append(data); + break; + case 1: + data=randdata.nextInt(26)+65;//保证只会产生65~90之间的整数 + sb.append((char)data); + break; + case 2: + data=randdata.nextInt(26)+97;//保证只会产生97~122之间的整数 + sb.append((char)data); + break; + } + } + String result=sb.toString().toLowerCase(); + return result; +} +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/SpringUtil.java b/zuul/src/main/java/com/sinata/zuul/util/SpringUtil.java new file mode 100644 index 0000000..838138a --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/SpringUtil.java @@ -0,0 +1,34 @@ +package com.sinata.zuul.util; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +@Component +public class SpringUtil implements ApplicationContextAware { + @Autowired + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContextParam) throws BeansException { + applicationContext = applicationContextParam; + } + public static Object getObject(String id) { + Object object = null; + object = applicationContext.getBean(id); + return object; + } + public static <T> T getObject(Class<T> tClass) { + return applicationContext.getBean(tClass); + } + + public static Object getBean(String tClass) { + return applicationContext.getBean(tClass); + } + + public <T> T getBean(Class<T> tClass) { + return applicationContext.getBean(tClass); + } +} \ No newline at end of file diff --git a/zuul/src/main/java/com/sinata/zuul/util/StringUtil.java b/zuul/src/main/java/com/sinata/zuul/util/StringUtil.java new file mode 100644 index 0000000..3f6641f --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/StringUtil.java @@ -0,0 +1,121 @@ +package com.sinata.zuul.util; + +import org.apache.commons.lang.StringUtils; + +public class StringUtil { + + /** + * 判断字符串中是否包含表情 + * @param source + * @return + */ + public static boolean containsEmoji(String source) { + int len = 0; + if(null != source){ + len = source.length(); + } + boolean isEmoji = false; + for (int i = 0; i < len; i++) { + char hs = source.charAt(i); + if (0xd800 <= hs && hs <= 0xdbff) { + if (source.length() > 1) { + char ls = source.charAt(i + 1); + int uc = ((hs - 0xd800) * 0x400) + (ls - 0xdc00) + 0x10000; + if (0x1d000 <= uc && uc <= 0x1f77f) { + return true; + } + } + } else { + // non surrogate + if (0x2100 <= hs && hs <= 0x27ff && hs != 0x263b) { + return true; + } else if (0x2B05 <= hs && hs <= 0x2b07) { + return true; + } else if (0x2934 <= hs && hs <= 0x2935) { + return true; + } else if (0x3297 <= hs && hs <= 0x3299) { + return true; + } else if (hs == 0xa9 || hs == 0xae || hs == 0x303d + || hs == 0x3030 || hs == 0x2b55 || hs == 0x2b1c + || hs == 0x2b1b || hs == 0x2b50 || hs == 0x231a) { + return true; + } + if (!isEmoji && source.length() > 1 && i < source.length() - 1) { + char ls = source.charAt(i + 1); + if (ls == 0x20e3) { + return true; + } + } + } + } + return isEmoji; + } + + /** + * 判断某个字符是不是表情 + * @param codePoint + * @return + */ + private static boolean isEmojiCharacter(char codePoint) { + return (codePoint == 0x0) || (codePoint == 0x9) || (codePoint == 0xA) + || (codePoint == 0xD) + || ((codePoint >= 0x20) && (codePoint <= 0xD7FF)) + || ((codePoint >= 0xE000) && (codePoint <= 0xFFFD)) + || ((codePoint >= 0x10000) && (codePoint <= 0x10FFFF)); + } + + + /** + * 过滤掉字符串中的表情 + * @param source + * @return + */ + public static String filterEmoji(String source) { + if (StringUtils.isBlank(source)) { + return source; + } + StringBuilder buf = null; + int len = source.length(); + for (int i = 0; i < len; i++) { + char codePoint = source.charAt(i); + if (isEmojiCharacter(codePoint)) { + if (buf == null) { + buf = new StringBuilder(source.length()); + } + buf.append(codePoint); + } + } + if (buf == null) { + return source; + } else { + if (buf.length() == len) { + buf = null; + return source; + } else { + return buf.toString(); + } + } + } + + + /** + * 判断字符串为空 + * @param value + * @return + */ + public static boolean isEmpty(String value){ + if(null == value || value.trim().equals("")){ + return true; + } + return false; + } + + /** + * 判断字符串非空 + * @param value + * @return + */ + public static boolean isNotEmpty(String value){ + return !isEmpty(value); + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/applets/CacheType.java b/zuul/src/main/java/com/sinata/zuul/util/applets/CacheType.java new file mode 100644 index 0000000..65621d6 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/applets/CacheType.java @@ -0,0 +1,86 @@ +package com.sinata.zuul.util.applets; + +/** + * 缓存消息类型 + * + * @author TaoNingBo + * @data 2016年9月21日 + * @version 1.0 + */ +public class CacheType { + + /** + * 用户通讯通道标识 + */ + public static final String userCtx = "USER_CTX_"; + + /** + * 司机通讯通道标识 + */ + public static final String driverCtx = "DRIVER_CTX_"; + + /** + * 出行-快车/专车/代驾-订单消息队列-自营车辆 + */ + public static final String travelOrder1 = "TRAVEL_ORDER1_"; + + /** + * 出行-快车/专车/代驾-订单消息队列-外来车辆 + */ + public static final String travelOrder2 = "TRAVEL_ORDER2_"; + + /** + * 快车-司机位置-自营车辆 + */ + public static final String location1 = "_LOCATION1"; + + /** + * 快车-司机位置-外来车辆 + */ + public static final String location2 = "_LOCATION2"; + + /** + * 快车-司机抢单成功后的消息 + */ + public static final String travelOrder = "TRAVEL_ORDER_"; + + /** + * 开始服务时间 + */ + public static final String startServerT = "START_SERVER_TIME_"; + + /** + * 司机上一次的位置(经纬度) + */ + public static final String location = "LAST_DRIVER_LOCATION_"; + + /** + * 司机上一次总共行驶距离 + */ + public static final String triverd = "LAST_TRIVER_Distance_"; + + /** + * 记录订单数据 司机 异常处理 + */ + public static final String dreco = "DRIVER_DATA_RECO"; + + /** + * 记录订单数据 用户 异常处理 + */ + public static final String ureco = "USER_DATA_RECO"; + + /** + * 记录司机跟用户的关系ID + */ + public final static String udID = "USER_DRIVER_ID_"; + + /** + * 记录下单乘客的出发点位置坐标 + */ + public final static String userLoc = "USER_LOCATION_"; + + /** + * 记录计费规则【用订单号获取】 + */ + public final static String rules = "RULES_ORDER_"; +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/applets/ChildChannelHandler.java b/zuul/src/main/java/com/sinata/zuul/util/applets/ChildChannelHandler.java new file mode 100644 index 0000000..eb3386b --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/applets/ChildChannelHandler.java @@ -0,0 +1,36 @@ +package com.sinata.zuul.util.applets; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +public class ChildChannelHandler extends ChannelInitializer<SocketChannel> { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { +// String path = "C:\\Program Files\\Apache Software Foundation\\Tomcat 8.5\\cert\\SHA256withRSA_lzhyc.cn.pfx"; + String path = "/usr/local/server/app/cert/tomcat/scs1685585603235_fanghuatongxing.cn_server.jks"; + SSLContext sslContext = createSSLContext.createSSLContext("JKS" + , path, "Fb0@iWq90$wJNix3"); + //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 + SSLEngine engine = sslContext.createSSLEngine(); + engine.setUseClientMode(false); + socketChannel.pipeline().addLast("ssl", new SslHandler(engine)); + + // 设置30秒没有读到数据,则触发一个READER_IDLE事件。 + // pipeline.addLast(new IdleStateHandler(30, 0, 0)); + // HttpServerCodec:将请求和应答消息解码为HTTP消息 + socketChannel.pipeline().addLast("http-codec", new HttpServerCodec()); + // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息 + socketChannel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); + // ChunkedWriteHandler:向客户端发送HTML5文件 + socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); + // 在管道中添加我们自己的接收数据实现方法 + socketChannel.pipeline().addLast("handler", new WebSocketHandler()); + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/applets/ClientPingMessage.java b/zuul/src/main/java/com/sinata/zuul/util/applets/ClientPingMessage.java new file mode 100644 index 0000000..93a8497 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/applets/ClientPingMessage.java @@ -0,0 +1,59 @@ +package com.sinata.zuul.util.applets; + +import java.io.Serializable; + +/** + * 客户端心跳消息处理 + * + * @author TaoNingBo + * @data 2016年7月26日 + * @version 1.0 + */ +public class ClientPingMessage implements Serializable { + + private static final long serialVersionUID = -4953410803742767757L; + + /** + * 客户端标识ID + */ + private Integer id; + + /** + * 角色【0:用户,1:司机】 + */ + private Integer role; + + /** + * 客户端单点登录标识TOKEN + */ + private String token; + + public ClientPingMessage() { + + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public Integer getRole() { + return role; + } + + public void setRole(Integer role) { + this.role = role != null ? role : 0; + } + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/applets/Global.java b/zuul/src/main/java/com/sinata/zuul/util/applets/Global.java new file mode 100644 index 0000000..1f3fa95 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/applets/Global.java @@ -0,0 +1,9 @@ +package com.sinata.zuul.util.applets; + +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; + +public class Global { + public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/applets/NettyServer0.java b/zuul/src/main/java/com/sinata/zuul/util/applets/NettyServer0.java new file mode 100644 index 0000000..aafe1b3 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/applets/NettyServer0.java @@ -0,0 +1,77 @@ +package com.sinata.zuul.util.applets; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +import java.util.Timer; +import java.util.TimerTask; + + +/** + * 即时通讯服务启动类 + * + * @date 2016年6月25日 + * @version 1.0 + */ +public class NettyServer0 { + + /** + * 延迟启动设置 + * + * NettyServer启动方法. + */ + public void bind() { + final Thread thread = new Thread(new NettyRunnable()); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + thread.start(); + } + }, 1000 * 2); + } + + /** + * 即时通讯服务启动 + * + * @date 2016年6月24日 + * @version 1.0 + */ + public class NettyRunnable implements Runnable { + + /** + * 获取即时通讯启动端口 + */ + @Override + public void run() { + System.out.println("===========================Netty端口启动========"); + // Boss线程:由这个线程池提供的线程是boss种类的,用于创建、连接、绑定socket, + // (有点像门卫)然后把这些socket传给worker线程池。 + // 在服务器端每个监听的socket都有一个boss线程来处理。在客户端,只有一个boss线程来处理所有的socket。 + EventLoopGroup bossGroup = new NioEventLoopGroup(); + // Worker线程:Worker线程执行所有的异步I/O,即处理操作 + EventLoopGroup workrGroup = new NioEventLoopGroup(); + try { + + // ServerBootstrap 启动NIO服务的辅助启动类,负责初始话netty服务器,并且开始监听端口的socket请求 + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workrGroup); + // 设置非阻塞,用它来建立新accept的连接,用于构造serversocketchannel的工厂类 + b.channel(NioServerSocketChannel.class); + // ChildChannelHandler 对出入的数据进行的业务操作,其继承ChannelInitializer + b.childHandler(new ChildChannelHandler()); + System.out.println("服务端开启等待客户端连接 ... ..."); + Channel ch = b.bind(9090).sync().channel(); + ch.closeFuture().sync(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + bossGroup.shutdownGracefully(); + workrGroup.shutdownGracefully(); + } + } + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/applets/NettyWebSocketController.java b/zuul/src/main/java/com/sinata/zuul/util/applets/NettyWebSocketController.java new file mode 100644 index 0000000..fbff56f --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/applets/NettyWebSocketController.java @@ -0,0 +1,202 @@ +package com.sinata.zuul.util.applets; + + +import com.alibaba.fastjson.JSONObject; +import com.sinata.zuul.util.RedisUtil; +import com.sinata.zuul.util.SinataUtil; +import com.sinata.zuul.util.SpringUtil; +import com.sinata.zuul.util.StringUtil; +import com.sinata.zuul.util.echo.Method; +import com.sinata.zuul.util.echo.NettyChannelMap; +import com.sinata.zuul.util.echo.NettyMsg; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; + +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Timer; +import java.util.TimerTask; + +public class NettyWebSocketController { + + public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>(); + + private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); + + public static Hashtable<String, String> table; + + static { + if (table == null) { + table = new Hashtable<>(); + } + } + + public static boolean isdebug = false; + public static int i = 0; + + + /** + * 判断客户端要执行什么操作 + * + * @param ctx + * @param msg + * @author TaoNingBo + */ + public void JudgeOperation(ChannelHandlerContext ctx, String msg) { + try { + // 验证即时通讯命令是否正确有效 + if (SinataUtil.isEmpty(msg)) { + return; + } + String msgStr = msg.toString(); + if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { + return; + } + if (isdebug) { +// System.out.println("<<<--receive-->>>111" + msg); + } + + // 获取socket信息,保存相应的socket + JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); + int code = jsonMsg.getIntValue("code"); + String message = jsonMsg.getString("msg"); + String method = jsonMsg.getString("method"); + if (code != 200 || !message.equals("SUCCESS")) { + return; + } + JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); + + if (null != ctx && ctx.channel().isActive()) { + jsonMsg.put("method", Method.pong); + sendMsgToClient(ctx, jsonMsg.toJSONString()); + } + + + // ############################### 心跳 ############################ + // 心跳 + if (method.equals(Method.ping)) { + String token = jsonCon.getString("token"); + String userId1 = jsonCon.getString("userId"); + if (StringUtil.isNotEmpty(userId1)) { + //确保账号在单个设备上登录 + if (StringUtil.isNotEmpty(token)) { + String token_ = redisUtil.getValue("USER_Applets_" + userId1);//获取缓存中最新的数据 + if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据 + ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); + JSONObject msg_ = new JSONObject(); + msg_.put("code", 200); + msg_.put("msg", "SUCCESS"); + msg_.put("method", "OFFLINE"); + msg_.put("data", new Object()); + this.sendMsgToClient(data_, msg_.toJSONString()); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + NettyChannelMap.remove_(data_); + } + }, 5000); + } + NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道 + NettyChannelMap.update("Applets" + userId1, ctx); + redisUtil.setStrValue("USER_Applets_" + userId1, token); + } + + //存储业务使用的通道 + if (null != ctx && ctx.channel().isActive()) { + NettyChannelMap.update("Applets" + userId1, ctx); + } + } + + + } + } catch (Exception e) { + if (isdebug) { + NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString()); + } + e.printStackTrace(); + } + } + + /** + * 向客户端发送消息 + * + * @param ctx + * @param msg + * @author TaoNingBo + */ + public static void sendMsgToClient(ChannelHandlerContext ctx, String msg) { +// System.out.println(ctx.channel().isActive()); + if (ctx != null && ctx.channel().isActive()) { + ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); + ChannelFuture sync; + try { + sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync(); + if (!sync.isSuccess()) { + boolean b = true; + for (int i = 0; i < 10; i++) { + ctx.wait(3000); + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + if (sync.isSuccess()) { + b = false; + break; + } + System.err.println("小程序-》推送不成功,将继续推送" + msg); + } + if (b) { + NettyChannelMap.remove(ctx); + } + } + } catch (Exception e) { + System.err.println("小程序-》推送发生异常,记录:" + msg); + NettyChannelMap.remove(ctx); + } + if (isdebug) { + System.err.println("小程序-》 <<<--send-->>>" + msg); + } + } else { + System.err.println("小程序-》推送失败,长连接不存在"); + NettyChannelMap.remove(ctx); + } + } + + // **链接断开 将推送消息记录 + public static void sendMsgToClient(String cacheType, Integer id, String msg) { + ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); + if (ctx != null) { + ChannelFuture sync; + try { + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + if (!sync.isSuccess()) { + for (int i = 0; i < 10; i++) { + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + ; + if (!sync.isSuccess()) { + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + ; + System.err.println("推送不成功,将继续推送" + msg); + if (i == 9) { + table.put(cacheType + id, msg); + ctx.close(); + System.err.println("推送发生异常,记录:" + msg); + } + } else { + break; + } + } + } + } catch (Exception e) { + table.put(cacheType + id, msg); + System.err.println("推送发生异常,记录:" + msg); + } + if (isdebug) { + System.err.println("<<<--send-->>>" + msg); + } + } else { + table.put(cacheType + id, msg); + System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg); + } + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/applets/WebSocketHandler.java b/zuul/src/main/java/com/sinata/zuul/util/applets/WebSocketHandler.java new file mode 100644 index 0000000..bbb3ba2 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/applets/WebSocketHandler.java @@ -0,0 +1,180 @@ +package com.sinata.zuul.util.applets; + +import com.alibaba.fastjson.JSONObject; +import com.sinata.zuul.util.echo.Method; +import com.sinata.zuul.util.echo.NettyChannelMap; +import com.sinata.zuul.util.echo.NettyMsg; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.*; +import io.netty.handler.codec.http.websocketx.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.CharsetUtil; + +import java.util.HashMap; + + +public class WebSocketHandler extends SimpleChannelInboundHandler<Object> { + //用于websocket握手的处理类 + private WebSocketServerHandshaker handshaker; + + private static final String WEB_SOCKET_URL = "wss://localhost:9090/websocket"; + + + +// @Override +// protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { +// if (msg instanceof FullHttpRequest) { +// // websocket连接请求 +// handleHttpRequest(ctx, (FullHttpRequest)msg); +// } else if (msg instanceof WebSocketFrame) { +// // websocket业务处理 +// handleWebSocketRequest(ctx, (WebSocketFrame)msg); +// } +// } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + /** 心跳 */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state().equals(IdleState.READER_IDLE)) + { + // + } + else if (event.state().equals(IdleState.WRITER_IDLE)) + { + // + } + else if (event.state().equals(IdleState.ALL_IDLE)) + { + String msg = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); + if(ctx != null && ctx.channel().isActive()) { + ctx.writeAndFlush(Unpooled.copiedBuffer((msg).getBytes())); + } + } + } +// super.userEventTriggered(ctx, evt); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } + + private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { + // Http解码失败,向服务器指定传输的协议为Upgrade:websocket + if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){ + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); + return; + } + // 握手相应处理,创建websocket握手的工厂类, + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false); + // 根据工厂类和HTTP请求创建握手类 + handshaker = wsFactory.newHandshaker(req); + if (handshaker == null) { + // 不支持websocket + WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); + } else { + // 通过它构造握手响应消息返回给客户端 + handshaker.handshake(ctx.channel(), req); + } + } + + private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) throws Exception { + if (req instanceof CloseWebSocketFrame) { + // 关闭websocket连接 + handshaker.close(ctx.channel(), (CloseWebSocketFrame)req.retain()); + return; + } + if (req instanceof PingWebSocketFrame) { + ctx.channel().write(new PongWebSocketFrame(req.content().retain())); + return; + } + if (!(req instanceof TextWebSocketFrame)) { + throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息"); + } + if (ctx == null || this.handshaker == null || ctx.isRemoved()) { + throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息"); + } + String requestmsg = ((TextWebSocketFrame) req).text(); + + + //给连接的客户端返回数据 + //返回心跳 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("code", 200); + jsonObject.put("method", Method.ok); + jsonObject.put("msg", "SUCCESS"); + jsonObject.put("data", new JSONObject()); + TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString()); +// ctx.channel().writeAndFlush(tws); + + new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理 + + // 群发服务端心跳响应 + Global.group.writeAndFlush(new TextWebSocketFrame((tws).text())); + } + + private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { + // BAD_REQUEST(400) 客户端请求错误返回的应答消息 + if(res.getStatus().code() != 200){ + ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); + res.content().writeBytes(buf); + buf.release(); + } + //服务端向客户端发送数据 + ChannelFuture f = ctx.channel().writeAndFlush(res); + // 非法连接直接关闭连接 + if(res.getStatus().code() != 200){ + f.addListener(ChannelFutureListener.CLOSE); + } + } + + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Global.group.add(ctx.channel()); + System.err.println("客户端与服务器端开启"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Global.group.remove(ctx.channel()); + NettyChannelMap.remove(ctx); + System.err.println("客户端与服务器链接关闭"); + } + + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpRequest) { + // websocket连接请求 + handleHttpRequest(ctx, (FullHttpRequest)msg); + } else if (msg instanceof WebSocketFrame) { + // websocket业务处理 + handleWebSocketRequest(ctx, (WebSocketFrame)msg); + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpRequest) { + // websocket连接请求 + handleHttpRequest(ctx, (FullHttpRequest)msg); + } else if (msg instanceof WebSocketFrame) { + // websocket业务处理 + handleWebSocketRequest(ctx, (WebSocketFrame)msg); + } + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/applets/createSSLContext.java b/zuul/src/main/java/com/sinata/zuul/util/applets/createSSLContext.java new file mode 100644 index 0000000..8487513 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/applets/createSSLContext.java @@ -0,0 +1,31 @@ +package com.sinata.zuul.util.applets; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import java.io.FileInputStream; +import java.io.InputStream; +import java.security.KeyStore; + +public class createSSLContext { + + /** + * 获取SSLContext + * @param type + * @param path + * @param password + * @return + * @throws Exception + */ + public static SSLContext createSSLContext(String type , String path , String password) throws Exception { + KeyStore ks = KeyStore.getInstance(type); /// "JKS" + InputStream ksInputStream = new FileInputStream(path); /// 证书存放地址 + ks.load(ksInputStream, password.toCharArray()); + //KeyManagerFactory充当基于密钥内容源的密钥管理器的工厂。 + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());//getDefaultAlgorithm:获取默认的 KeyManagerFactory 算法名称。 + kmf.init(ks, password.toCharArray()); + //SSLContext的实例表示安全套接字协议的实现,它充当用于安全套接字工厂或 SSLEngine 的工厂。 + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(kmf.getKeyManagers(), null, null); + return sslContext; + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/echo/DiscardServerHandler.java b/zuul/src/main/java/com/sinata/zuul/util/echo/DiscardServerHandler.java new file mode 100644 index 0000000..6edaf44 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/echo/DiscardServerHandler.java @@ -0,0 +1,153 @@ +package com.sinata.zuul.util.echo; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.ReferenceCountUtil; + +import java.net.InetSocketAddress; +import java.util.HashMap; + +public class DiscardServerHandler extends SimpleChannelInboundHandler<String> { + + private NettyServerController nettyServerController = new NettyServerController(); + + public static boolean isdebug = true; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + if(isdebug) { +// System.err.println(insocket.getAddress() + ": 收到客户端数据......."); + } + try { + // 调用service + nettyServerController.JudgeOperation(ctx, msg); + } catch (Exception e) { + e.printStackTrace(); + } finally { + ReferenceCountUtil.release(msg); + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + if(isdebug) { +// System.err.println(insocket.getAddress() + ": 收到客户端数据......."); + } + try { + // 调用service + nettyServerController.JudgeOperation(ctx, msg); + } catch (Exception e) { + e.printStackTrace(); + } finally { + ReferenceCountUtil.release(msg); + } + } + +// @Override +// protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception { +// +// } + +// @Override +// protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { +// +// } + +// @Override +// protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception { +// +// } + + /** 在连接被建立并且准备进行通信时被调用 */ + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + if(isdebug) { +// System.err.println(insocket.getAddress() + ": Connect successful......"); + } + } + + /** 心跳 */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state().equals(IdleState.READER_IDLE)) + { + // + } + else if (event.state().equals(IdleState.WRITER_IDLE)) + { + // + } + else if (event.state().equals(IdleState.ALL_IDLE)) + { + String msg = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); + if(ctx != null && ctx.channel().isActive()) { + ctx.writeAndFlush(Unpooled.copiedBuffer((msg).getBytes())); +// System.err.println(msg); + } + } + } + //super.userEventTriggered(ctx, evt); + } + + /** 连接处于不活跃时调用(连接关闭) **/ + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if(isdebug) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); +// System.err.println(insocket.getAddress() + ": Disconnect connection......"); + } + NettyChannelMap.remove(ctx); +// System.err.println("清除通道" + ctx); +// super.channelInactive(ctx); + } + + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + /** 处理方法是当出现Throwable对象才会被调用 **/ + public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { + ctx.close(); + } + + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + if(isdebug) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); +// System.err.println("close......." + insocket.getAddress()); + } + ctx.close(promise); + } + + public void read(ChannelHandlerContext ctx) throws Exception { + ctx.read(); + } + + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + ctx.write(msg, promise); + } + +// @Override +// protected void channelRead(ChannelHandlerContext ctx, String msg) throws Exception { +// // TODO Auto-generated method stub +// InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); +// if(isdebug) { +// System.out.println(insocket.getAddress() + ": 收到客户端数据......."); +// } +// try { +// // 调用service +// nettyServerController.JudgeOperation(ctx, msg); +// } catch (Exception e) { +// e.printStackTrace(); +// } finally { +// ReferenceCountUtil.release(msg); +// } +// } + +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/echo/Method.java b/zuul/src/main/java/com/sinata/zuul/util/echo/Method.java new file mode 100644 index 0000000..aa3f100 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/echo/Method.java @@ -0,0 +1,26 @@ +package com.sinata.zuul.util.echo; + +/** + * 即时通讯【通讯类型类】 + * + * @author TaoNingBo + * @createDate 2016年6月14日 + * @version 1.0 + */ +public class Method { + + /** 心跳【推送】 */ + public static final String ok = "OK"; + + /** 心跳【接收】 */ + public final static String ping = "PING"; + + /** 心跳【响应】 */ + public final static String pong = "PONG"; + + /** 司机上传位置 */ + public static final String location = "LOCATION"; + + + +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/echo/NettyChannelMap.java b/zuul/src/main/java/com/sinata/zuul/util/echo/NettyChannelMap.java new file mode 100644 index 0000000..33cf1f8 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/echo/NettyChannelMap.java @@ -0,0 +1,146 @@ +package com.sinata.zuul.util.echo; + +import io.netty.channel.ChannelHandlerContext; + +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class NettyChannelMap { + + protected static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>(); + + public static Map<String, ChannelHandlerContext> ctxMap = new HashMap<>();//单点登录存储的通道 + + + private NettyChannelMap() { + // 放置外部实例化 + } + + /** + * Get data from source. + * + * @param key + * @return + */ + public static ChannelHandlerContext getData(String key) { + if(map==null){ + map = new HashMap<String, ChannelHandlerContext>(); + } + return map.get(key); + } + + + public static ChannelHandlerContext getData_(String key) { + if(ctxMap==null){ + ctxMap = new HashMap<String, ChannelHandlerContext>(); + } + return ctxMap.get(key); + } + + + /** + * Save data from source. + * + * @param key + * @param val + */ + public static synchronized void saveData(String key, ChannelHandlerContext val) { + map.put(key, val); + } + + /** + * Determine whether the cache key contains the key. + * + * @param key + * @return true|false + * @author TaoNingBo + */ + public static synchronized boolean containsKey(String key) { + return map.containsKey(key); + } + + /** + * Determine whether the cache value contains the value. + * + * @param val + * @return + */ + public static synchronized boolean containsVal(ChannelHandlerContext val) { + return map.containsValue(val); + } + + /** + * Remove the data resources. + * + * @param value + */ + @SuppressWarnings("rawtypes") + public static synchronized void remove(ChannelHandlerContext value) { + if(null == value){ + return; + } + Set<String> strings = map.keySet(); + for(String key : strings){ + ChannelHandlerContext channelHandlerContext = map.get(key); + String s = channelHandlerContext.channel().remoteAddress().toString(); + String s1 = value.channel().remoteAddress().toString(); + if(s.equals(s1)){ + channelHandlerContext.close();//关闭通道 + map.remove(key); + } + } + } + + + public static synchronized void remove_(ChannelHandlerContext value) { + Set<String> strings = ctxMap.keySet(); + for(String key : strings){ + ChannelHandlerContext channelHandlerContext = ctxMap.get(key); + String s = channelHandlerContext.channel().remoteAddress().toString(); + SocketAddress socketAddress = value.channel().remoteAddress(); + if(null == socketAddress){ + String s1 = socketAddress.toString(); + if(s.equals(s1)){ + channelHandlerContext.close();//关闭通道 + ctxMap.remove(key); + } + } + + } + } + + + public static synchronized void remove_(String key) { + ctxMap.remove(key); + } + + + /** + * Remove the data resources. + * + * @param key + * @author TaoNingBo + */ + public static synchronized void remove(String key) { + map.remove(key); + } + + /** + * Update the data resources. + * + * @param key + * @param value + */ + public static synchronized void update(String key, ChannelHandlerContext value) { + map.put(key, value); + } + + + + public static synchronized void update_(String key, ChannelHandlerContext value) { + ctxMap.put(key, value); + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/echo/NettyMsg.java b/zuul/src/main/java/com/sinata/zuul/util/echo/NettyMsg.java new file mode 100644 index 0000000..2f49fc0 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/echo/NettyMsg.java @@ -0,0 +1,165 @@ +package com.sinata.zuul.util.echo; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class NettyMsg { + public static void main(String[] args) { + Map<String,Object> map = new HashMap<String, Object>(); + //用户信息 + map.put("imgUrl","1"); + map.put("nickName","1"); + map.put("licensePlate","1"); + map.put("phone",1); + map.put("driverId",1); + map.put("carColor","1"); + map.put("modelName","1"); + map.put("brandName","1"); + map.put("driverOrderNums",1); + map.put("score",1); + + + map.put("id",1); + map.put("orderNum", "123456"); + map.put("startAddress", "测试"); + map.put("endAddress", "测试1"); + map.put("departureTime", 1533608196000L); + map.put("type",1); + map.put("mileage",1); + map.put("mileageMoney",10); + map.put("duration",10); + map.put("durationMoney",10); + map.put("nightMoney",1); + map.put("serverMoney",1); + map.put("nightMileage",10); + map.put("longMileage",10); + map.put("longDurationMoney",10); + map.put("orderMoney",10); + map.put("payMoney",10); + map.put("couponsMoney",10); + System.out.println(setMsg(Method.ping, new HashMap<String, Object>())); + } + + /** + * 返回一个正确数据 + * + * @param method + * @param data + * @return + * @author TaoNingBo + */ + public static String setMsg(String method, Map<String, Object> data) { + StringBuffer json = new StringBuffer(); + json.append(getHeader(200, "SUCCESS", method)); + json.append(JSON.toJSONString(data)); + json.append("}"); + //return JSON.toJSONString(json); + return json.toString(); + } + + /** + * 返回一个正确数据 + * + * @param method + * @param data + * @return + */ + public static String setMsg(String method, List<Map<String, Object>> data) { + StringBuffer json = new StringBuffer(); + json.append(getHeader(200, "SUCCESS", method)); + List<JSONObject> jsonList = new ArrayList<JSONObject>(); + for(Map<String, Object> map : data) { + JSONObject dataJson = new JSONObject(map); + jsonList.add(dataJson); + } + json.append(jsonList); + json.append("}"); + +// return JSON.toJSONString(json); + return json.toString(); + } + + /** + * 返回一个错误数据 + * + * @param method + * @param data + * @return + * @author TaoNingBo + */ + public static String setErrMsg(String method, String data) { + StringBuffer json = new StringBuffer(); + json.append(getHeader(-1, "FAILURE", method)); + json.append("\"" + data + "\""); + json.append("}"); +// return JSON.toJSONString(json); + return json.toString(); + } + + /** + * 生成一个返回JSON的头 + * + * @param code + * @param msg + * @param method + * @return + * @author TaoNingBo + */ + private static String getHeader(int code, String msg, String method) { + StringBuffer header = new StringBuffer(); + header.append("{"); + header.append("\"code\":\"" + code); + header.append("\",\"msg\":\"" + msg); + header.append("\",\"method\":\"" + method); + header.append("\",\"data\":"); + return header.toString(); + } + + /** + * 发送消息给客户端 + * + * @param cacheType + * @param id + * @param method + * @param data + * @author TaoNingBo + */ + public static void sendMsg(String cacheType, Integer id, String method, Map<String, Object> data) { + //NettyServerController.sendMsgToClient(NettyChannelMap.getData(cacheType + id), setMsg(method, data)); + NettyServerController.sendMsgToClient(cacheType,id, setMsg(method, data)); + } + + + public static void resendMsg(String token){ + String msg = NettyServerController.table.get(token); + ChannelHandlerContext ctx = NettyChannelMap.getData(token); + if(null != msg && !"".equals(msg) && ctx != null && ctx.channel().isActive()){ + ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); + ChannelFuture sync; + try { + sync = ctx.writeAndFlush(buffer).sync(); + System.err.println("重发异常推送状态"+sync.isSuccess()+",位置:"+token+",消息内容:"+msg); + if(!sync.isSuccess()){ + resendMsg(token); + System.err.println("重发异常推送不成功,将继续推送"+msg); + } + NettyServerController.table.remove(token); + } catch (Exception e) { + resendMsg(token); + System.err.println("重发推送发生异常,记录:"+msg); + } + } + } + + +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/echo/NettyServer.java b/zuul/src/main/java/com/sinata/zuul/util/echo/NettyServer.java new file mode 100644 index 0000000..82cce36 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/echo/NettyServer.java @@ -0,0 +1,95 @@ +package com.sinata.zuul.util.echo; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.timeout.IdleStateHandler; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +/** + * 即时通讯服务启动类 + * + * @date 2016年6月25日 + * @version 1.0 + */ +public class NettyServer { + + + /** + * NettyServer启动方法. + */ + public void bind() { + final Thread thread = new Thread(new NettyRunnable()); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + thread.start(); + } + }, 10000); + } + + /** + * 即时通讯服务启动 + * + * @date 2016年6月24日 + * @version 1.0 + */ + public class NettyRunnable implements Runnable { + + /** + * 获取即时通讯启动端口 + */ + private Integer nettyPort = 8888; + @Override + public void run() { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup); + bootstrap.channel(NioServerSocketChannel.class); + bootstrap.option(ChannelOption.SO_BACKLOG, 1024); + // 通过TCP_NODELAY禁用NAGLE,使消息立即发出去,不用等待到一定的数据量才发出去 + bootstrap.option(ChannelOption.TCP_NODELAY, true); + // 保持长连接状态 + bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); + bootstrap.childHandler(new ServerInit() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + pipeline.addLast("ping", new IdleStateHandler(120, 60, 5, TimeUnit.SECONDS)); + pipeline.addLast("decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + pipeline.addLast("encoder", new LengthFieldPrepender(4)); + //pipeline.addLast(new LineBasedFrameDecoder(1048576 * 10)); + //pipeline.addLast(new StringDecoder(Charset.forName("UTF-8"))); + //pipeline.addLast(new StringEncoder(Charset.forName("UTF-8"))); + pipeline.addLast(new DiscardServerHandler()); + } + }); + // 服务器绑定端口监听 + ChannelFuture f = bootstrap.bind(nettyPort).sync(); + if(f.isSuccess()) { + System.out.println("******************************Netty启动成功******************************"); + } + // 监听服务器关闭监听 + f.channel().closeFuture().sync(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + } +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java b/zuul/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java new file mode 100644 index 0000000..b173e90 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java @@ -0,0 +1,327 @@ +package com.sinata.zuul.util.echo; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.sinata.zuul.util.*; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.util.StringUtils; +import org.springframework.web.client.RestTemplate; + +import java.util.*; + + +/** + * Netty业务逻辑层 + * @author sinata + * @createDate 2016年6月3日 + * @version 1.0 + */ +public class NettyServerController { + + public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); + + public static Hashtable<String,String> table; + + private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); + + private GDMapGeocodingUtil gdMapGeocodingUtil = SpringUtil.getObject(GDMapGeocodingUtil.class); + + private RestTemplate internalRestTemplate = SpringUtil.getObject(RestTemplate.class); + + + + + static{ + if(table == null){ + table = new Hashtable<>(); + } + } + + public static boolean isdebug = false; + public static int i = 0; + + + /** + * 判断客户端要执行什么操作 + * + * @param ctx + * @param msg + * @author TaoNingBo + */ + public void JudgeOperation(ChannelHandlerContext ctx, Object msg) { + try { + // ByteBuf转String + ByteBuf byteBuf = (ByteBuf) msg; + + byte[] req = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(req); + msg = new String(req, "UTF-8"); + // 验证即时通讯命令是否正确有效 + if(SinataUtil.isEmpty(msg)) { + return; + } + String msgStr = msg.toString(); + if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { + return; + } + if(isdebug) { +// System.out.println("<<<--receive-->>>" + msg); + } + + // 获取socket信息,保存相应的socket + JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); + int code = jsonMsg.getInteger("code"); + String message = jsonMsg.getString("msg"); + String method = jsonMsg.getString("method"); + if(code != 200 || !message.equals("SUCCESS")) { + return; + } + JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); + + //心跳 + if(method.equals(Method.ping)) { + Integer type = jsonCon.getInteger("type"); + String token = jsonCon.getString("token"); + String userId1 = jsonCon.getString("userId"); + String device = jsonCon.getString("device"); + String version = jsonCon.getString("version"); + if(StringUtil.isNotEmpty(userId1)){ + + //判断用户或者司机长连接 + if(type==1){ + //确保账号在单个设备上登录 + if(StringUtil.isNotEmpty(token)){ + String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据 + if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 + ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); + JSONObject msg_ = new JSONObject(); + msg_.put("code", 200); + msg_.put("msg", "SUCCESS"); + msg_.put("method", "OFFLINE"); + msg_.put("data", new Object()); + boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息 + if(b){ + NettyChannelMap.remove_(data_); + } +// new Timer().schedule(new TimerTask() { +// @Override +// public void run() { +// NettyChannelMap.remove_(data_); +// } +// }, 5000); + } + NettyChannelMap.update_(token.substring(token.length() - 16), ctx); + NettyChannelMap.update("USER" + userId1, ctx); + redisUtil.setStrValue("USER_APP_" + userId1, token); + } + + //存储通讯通道 + if(null != ctx && ctx.channel().isActive()){ + NettyChannelMap.update("USER" + userId1, ctx); + } + }else{ + //确保账号在单个设备上登录 + if(StringUtil.isNotEmpty(token)){//APP端登录的操作 +// String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据 +// if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据 +// ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); +// if(null != data_){ +// JSONObject msg_ = new JSONObject(); +// msg_.put("code", 200); +// msg_.put("msg", "SUCCESS"); +// msg_.put("method", "OFFLINE"); +// msg_.put("data", new Object()); +// boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息 +// if(b){ +// NettyChannelMap.remove_(data_); +// } +// } +// } + NettyChannelMap.update("DRIVER" + userId1, ctx); + NettyChannelMap.update_(token.substring(token.length() - 16), ctx); + redisUtil.setStrValue("DRIVER_" + userId1, token); + } + //存储通讯通道 + if(null != ctx && ctx.channel().isActive()){ + NettyChannelMap.update("DRIVER" + userId1, ctx); + } + } + } + + if(null != ctx && ctx.channel().isActive()){ + jsonMsg.put("method", Method.pong); + sendMsgToClient(ctx, jsonMsg.toJSONString()); + } + } + //司机上传位置 + if(method.equals(Method.location)){ + Integer driverId = jsonCon.getInteger("driverId"); + Integer orderId = jsonCon.getInteger("orderId"); + Integer orderType = jsonCon.getInteger("orderType"); + Double lon = jsonCon.getDouble("lon"); + Double lat = jsonCon.getDouble("lat"); + Double computeAzimuth = jsonCon.getDouble("computeAzimuth"); + Double altitude = jsonCon.getDouble("altitude"); + if(SinataUtil.isNotEmpty(driverId)){ + if(null != lon && 0 != lon && null != lat && 0 != lat){ + HttpHeaders headers = new HttpHeaders(); + // 以表单的方式提交 + headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); + //将请求头部和参数合成一个请求 + MultiValueMap<String, Object> params = new LinkedMultiValueMap<>(); + params.add("orderType", null == orderType ? orderType : String.valueOf(orderType)); + params.add("orderId", null == orderId ? orderId : String.valueOf(orderId)); + params.add("driverId", String.valueOf(driverId)); + params.add("lon", String.valueOf(lon)); + params.add("lat", String.valueOf(lat)); + params.add("directionAngle", String.valueOf(computeAzimuth)); + params.add("altitude", String.valueOf(altitude)); + HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(params, headers); + String s = internalRestTemplate.postForObject("http://driver-server/base/driver/addDriverPosition",requestEntity , String.class); + JSONObject jsonObject = JSON.parseObject(s, JSONObject.class); + if(jsonObject.getIntValue("code") != 200){ + System.err.println("调用driver-server存储位置数据出错了"); + } + }else{ + NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); + } + }else{ + NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); + } + } + + } catch (Exception e) { + if(isdebug) { + NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); + } + e.printStackTrace(); + } + } + + /** + * 向客户端发送消息 + * + * @param ctx + * @param msg + * @author TaoNingBo + */ + public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) { + if (ctx != null && ctx.channel().isActive()) { + ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); + ChannelFuture sync; + try { + sync = ctx.writeAndFlush(buffer).sync(); + if(!sync.isSuccess()){//如果推送失败则继续推送10次 + boolean b = true; + for (int i = 0; i < 10; i++) { + ctx.wait(3000); + sync = ctx.writeAndFlush(buffer).sync(); + if(sync.isSuccess()){ + b = false; + break; + } + System.err.println("推送不成功,将继续推送"+msg); + } + if(b){ + NettyChannelMap.remove(ctx); + } + return true; + } + return sync.isSuccess(); + } catch (Exception e) { + System.err.println("推送发生异常,记录:"+msg); + NettyChannelMap.remove(ctx); + } + if(isdebug) { + System.err.println("<<<--send-->>>" + msg) ; + } + }else{ + System.err.println("推送失败,长连接不存在"); + NettyChannelMap.remove(ctx); + } + return false; + } + +// **链接断开 将推送消息记录 + public static void sendMsgToClient(String cacheType, Integer id,String msg) { + ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); + if (ctx != null) { + ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); + ChannelFuture sync; + try { + sync = ctx.writeAndFlush(buffer).sync(); +// System.out.println("推送状态"+sync.isSuccess()); + if(!sync.isSuccess()){ + for (int i = 0; i < 10; i++) { + sync = ctx.writeAndFlush(buffer).sync(); + if(!sync.isSuccess()){ + sync = ctx.writeAndFlush(buffer).sync(); + System.err.println("推送不成功,将继续推送"+msg); + if(i == 9){ + table.put(cacheType+id, msg); + + ctx.close(); + System.err.println("推送发生异常,记录:"+msg); + } + }else{ + break; + } + } + } + } catch (Exception e) { + table.put(cacheType+id, msg); + System.err.println("推送发生异常,记录:"+msg); + } + if(isdebug) { +// System.out.println("<<<--send-->>>" + msg); + } + }else{ + table.put(cacheType+id, msg); + System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg); + } + } + + /** + * 记录推送不成功消息,并在心跳连接续推 + * @param token + */ + public static void resendMsg(String token){ + String msg = table.get(token); + ChannelHandlerContext ctx = NettyChannelMap.getData(token); + if(SinataUtil.isNotEmpty(msg) && ctx != null && ctx.channel().isActive()){ + ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); + ChannelFuture sync; + try { + sync = ctx.writeAndFlush(buffer).sync(); + System.err.println("重发异常推送状态"+sync.isSuccess()+",位置:"+token+",消息内容:"+msg); + if(!sync.isSuccess()){ + i++; + if(i == 10){ + i =0; + ctx.close(); + return; + } + System.err.println("重发异常推送不成功,将继续推送"+msg); + resendMsg(token); + }else{ + i=0; + } + table.remove(token); + } catch (Exception e) { + resendMsg(token); + System.err.println("重发推送发生异常,记录:"+msg); + } + } + } + + +} diff --git a/zuul/src/main/java/com/sinata/zuul/util/echo/ServerInit.java b/zuul/src/main/java/com/sinata/zuul/util/echo/ServerInit.java new file mode 100644 index 0000000..3a50298 --- /dev/null +++ b/zuul/src/main/java/com/sinata/zuul/util/echo/ServerInit.java @@ -0,0 +1,25 @@ +package com.sinata.zuul.util.echo; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.Charset; + +public class ServerInit extends ChannelInitializer<SocketChannel> { + + private DiscardServerHandler discardServerHandler = new DiscardServerHandler(); + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new StringDecoder(Charset.forName("utf-8"))); + pipeline.addLast(new StringEncoder(Charset.forName("utf-8"))); + // 心跳监测机制 +// pipeline.addLast(new IdleStateHandler(5,7,10, TimeUnit.SECONDS)); + pipeline.addLast(discardServerHandler); + } + +} diff --git a/zuul/src/main/resources/application.yml b/zuul/src/main/resources/application.yml new file mode 100644 index 0000000..b14faa5 --- /dev/null +++ b/zuul/src/main/resources/application.yml @@ -0,0 +1,35 @@ +server: + port: 81 + +spring: + application: + name: zuul-gateway #服务名称 + +eureka: + client: + service-url: #注册中心地址 + defaultZone: http://sinata:sinata@127.0.0.1:8000/eureka #启用身份验证的方式连接 + register-with-eureka: true #在注册中心进行注册 + fetch-registry: true #从Eureka中获取注册信息。 + +zuul: +# prefix: /api #添加URL前缀 + sensitive-headers: #将默认过滤掉的敏感数据清除,不进行过滤("Cookie", "Set-Cookie", "Authorization") + routes: + user-server: #接口调用路由服务,名字任意取。(用户端服务) + path: /user/** #配置请求URL的请求规则 + url: http://127.0.0.1:8006 #真正的微服务地址,path匹配的请求都转发到这里 + serviceid: user-server #指定Eureka注册中心的服务id + driver-server: #路由司机相关请求 + path: /driver/** #配置请求URL的请求规则 + url: http://127.0.0.1:8007 #真正的微服务地址,path匹配的请求都转发到这里 + serviceid: driver-server #指定Eureka注册中心的服务id + # 配置zuul超时时间 + host: + connect-timeout-millis: 150000 + socket-timeout-millis: 15000 + +# 配置ribbon超时时间 +ribbon: + ReadTimeout: 10000 + ConnectTimeout: 10000 \ No newline at end of file diff --git a/zuul/src/main/resources/redis.properties b/zuul/src/main/resources/redis.properties new file mode 100644 index 0000000..eae38eb --- /dev/null +++ b/zuul/src/main/resources/redis.properties @@ -0,0 +1,28 @@ +#redis���ÿ�ʼ +# Redis���ݿ�������Ĭ��Ϊ0�� +spring.redis.database=0 +# Redis��������ַ +spring.redis.host=127.0.0.1 +# Redis���������Ӷ˿� +spring.redis.port=16379 +# Redis�������������루Ĭ��Ϊ�գ� +spring.redis.password=wuOT@8NLdZ*V09Jt + +## Redis��������ַ +#spring.redis.host=127.0.0.1 +## Redis���������Ӷ˿� +#spring.redis.port=6379 +## Redis�������������루Ĭ��Ϊ�գ� +#spring.redis.password=123456 +# ���ӳ������������ʹ�ø�ֵ��ʾû�����ƣ� +spring.redis.jedis.pool.max-active=1024 +# ���ӳ���������ȴ�ʱ�䣨ʹ�ø�ֵ��ʾû�����ƣ� +spring.redis.jedis.pool.max-wait=10000 +# ���ӳ��е����������� +spring.redis.jedis.pool.max-idle=200 +# ���ӳ��е���С�������� +spring.redis.jedis.pool.min-idle=50 +# ���ӳ�ʱʱ�䣨���룩 +spring.redis.timeout=10000 +#redis���ý��� +spring.redis.block-when-exhausted=true \ No newline at end of file diff --git a/zuul/src/main/resources/static/tXQaRbVjpJ.txt b/zuul/src/main/resources/static/tXQaRbVjpJ.txt new file mode 100644 index 0000000..9059363 --- /dev/null +++ b/zuul/src/main/resources/static/tXQaRbVjpJ.txt @@ -0,0 +1 @@ +0ef42640d32181b65822b974b89492d4 \ No newline at end of file -- Gitblit v1.7.1