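In addition, the section "1. Create the gateway service" of the blog post "SpringCloud Gateway 动态路由【篇2终极版】基于 MySQL + 二级缓存实现" (Spring Cloud Gateway dynamic routing, part 2, final version: an implementation based on MySQL plus a two-level cache) may solve your problem. You can read the content below carefully or jump to the original post: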
First, create a MySQL table based on the AppRoute entity class from the previous section:
CREATE TABLE `app_route` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `routeId` varchar(255) NOT NULL,
  `order` int(11) DEFAULT NULL,
  `uri` varchar(255) NOT NULL,
  `predicates` text,
  `filters` text,
  `updateTime` datetime NOT NULL,
  `delete` tinyint(1) NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`,`routeId`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
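For orientation, the columns above could map onto an entity along the following lines. This is only a sketch: the real AppRoute class comes from the previous section and is not shown here, so the class name AppRouteSketch and the Java field types are assumptions inferred from the column types, and the parseToRoute() method used later is left out. The field names simply repeat the column names.

```java
import java.util.Date;

/** Hypothetical mirror of the app_route columns; the real AppRoute may differ. */
class AppRouteSketch {
    private Integer id;                      // id, AUTO_INCREMENT
    private String routeId;                  // routeId, business key of the route
    private Integer order;                   // order, nullable
    private String uri;                      // uri, target address of the route
    private String predicates;               // predicates, stored as text
    private String filters;                  // filters, stored as text
    private Date updateTime;                 // updateTime
    private Boolean delete = Boolean.FALSE;  // delete, soft-delete flag (default 0)

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getRouteId() { return routeId; }
    public void setRouteId(String routeId) { this.routeId = routeId; }
    public Integer getOrder() { return order; }
    public void setOrder(Integer order) { this.order = order; }
    public String getUri() { return uri; }
    public void setUri(String uri) { this.uri = uri; }
    public String getPredicates() { return predicates; }
    public void setPredicates(String predicates) { this.predicates = predicates; }
    public String getFilters() { return filters; }
    public void setFilters(String filters) { this.filters = filters; }
    public Date getUpdateTime() { return updateTime; }
    public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; }
    public Boolean getDelete() { return delete; }
    public void setDelete(Boolean delete) { this.delete = delete; }
}
```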
As in the previous section, create a parent project, then create a gateway service under it and add the Redis and MyBatis dependencies:
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>
Next, let's adapt some of the gateway service configuration from the previous section:
application.yml:
server:
  port: 8502
spring:
  application:
    name: gateway-demo222
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useSSL=false
    username: root
    password: root
  redis:
    host: localhost
    password:
    port: 6379
    database: 10
management:
  endpoints:
    web:
      exposure:
        include: health,info,gateway
mybatis:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    map-underscore-to-camel-case: true
# Custom parameters
gateway:
  dynamicRoute:
    dataId: 'yq_routes'
    group: 'YQ_GATEWAY'
Create the DAO for AppRoute and the corresponding MyBatis SQL statements. AppRouteDAO looks like this:
@Mapper
@Component
public interface AppRouteDAO {

    @Select("select * from app_route")
    List<AppRoute> findAll();

    @Select("select * from app_route where routeId = #{routeId} AND `delete` = 0 LIMIT 1")
    AppRoute findByRouteId(String routeId);

    @Select("select * from app_route where id = #{id} AND `delete` = 0")
    AppRoute findById(Integer id);

    boolean update(AppRoute route);

    boolean insert(AppRoute route);

    boolean delete(AppRoute route);
}
PS: the remaining statements are in /resources/mapper/AppRouteDAO.xml.
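import java.util.List;
import java.util.stream.Collectors;

// JSONObject and BooleanUtils are assumed to come from fastjson and commons-lang3.
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.BooleanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// Project classes (AppRoute, AppRouteService, ...) are imported from your own packages.

@Service
public class RouteHandler implements ApplicationEventPublisherAware, CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(RouteHandler.class);

    private ApplicationEventPublisher publisher;

    @Autowired
    private AppRouteService appRouteService;
    @Autowired
    private CacheRouteDefinitionRepository cacheRouteDefinitionRepository;
    @Autowired
    private RouteDefinitionCacheService routeDefinitionCacheService;

    @Override
    public void run(String... args) throws Exception {
        log.info("Initializing routes for the first time...");
        this.loadRouteConfig();
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    public void loadRouteConfig() {
        log.info("Loading route configuration...");
        Flux<RouteDefinition> definitionFlux = cacheRouteDefinitionRepository.getRouteDefinitions();
        // toStream() blocks, so it runs on its own thread instead of a Reactor event-loop thread.
        new Thread(() -> {
            List<String> existRouteIds = definitionFlux.toStream().map(RouteDefinition::getId).collect(Collectors.toList());
            // Alternatively, use the line below; new Thread() is then no longer needed:
            // List<String> existRouteIds = routeDefinitionCacheService.getRouteDefinitions().stream().map(RouteDefinition::getId).collect(Collectors.toList());
            List<AppRoute> appRouteList = appRouteService.findAll();
            if (appRouteList != null && !appRouteList.isEmpty()) {
                appRouteList.forEach(a -> {
                    if (BooleanUtils.isTrue(a.getDelete())) {
                        // A deleted row is never saved; it is evicted only if it is currently cached.
                        if (existRouteIds.contains(a.getRouteId())) {
                            deleteRoute(a.getRouteId());
                        }
                    } else {
                        RouteDefinition routeDefinition = a.parseToRoute();
                        if (routeDefinition != null) {
                            log.info("Saving route: " + JSONObject.toJSONString(routeDefinition));
                            cacheRouteDefinitionRepository.save(Mono.just(routeDefinition)).subscribe();
                        }
                    }
                });
            }
            // Tell Gateway to rebuild its routes from the repository.
            this.publisher.publishEvent(new RefreshRoutesEvent(this));
        }).start();
    }

    public void deleteRoute(String routeId) {
        log.info("Deleting route: " + routeId);
        cacheRouteDefinitionRepository.delete(Mono.just(routeId)).subscribe();
        this.publisher.publishEvent(new RefreshRoutesEvent(this));
    }
}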
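The rule that decides what happens to each database row can be looked at separately from Spring and Reactor. The following is a minimal plain-Java sketch of that rule as I read it; Row, Plan and plan() are hypothetical names introduced only for illustration. A row flagged as deleted is evicted if it is currently cached and skipped otherwise, while every live row is saved again so that edits overwrite the cached definition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java sketch of the sync rule; all names here are hypothetical. */
class RouteSyncSketch {

    /** Minimal stand-in for an app_route row. */
    static final class Row {
        final String routeId;
        final boolean deleted;

        Row(String routeId, boolean deleted) {
            this.routeId = routeId;
            this.deleted = deleted;
        }
    }

    /** Result of the comparison: which route ids to evict and which to save. */
    static final class Plan {
        final List<String> toDelete = new ArrayList<>();
        final List<String> toSave = new ArrayList<>();
    }

    static Plan plan(List<Row> rows, Set<String> cachedIds) {
        Plan plan = new Plan();
        for (Row row : rows) {
            if (row.deleted) {
                // Evict only what is actually cached; other deleted rows are skipped.
                if (cachedIds.contains(row.routeId)) {
                    plan.toDelete.add(row.routeId);
                }
            } else {
                // Live rows are always saved again so that edits overwrite the cache.
                plan.toSave.add(row.routeId);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("a", false), new Row("b", true), new Row("c", true));
        Set<String> cached = new HashSet<>(Arrays.asList("a", "b"));
        Plan p = plan(rows, cached);
        // prints: delete=[b] save=[a]
        System.out.println("delete=" + p.toDelete + " save=" + p.toSave);
    }
}
```

Route "c" is deleted in the database but was never cached, so it appears in neither list.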
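Note that loadRouteConfig() reads the existing routes through cacheRouteDefinitionRepository.getRouteDefinitions(). If the returned Flux is consumed with toStream() directly on the calling thread, then calling this method after route information has been updated through the API (that is, from a request thread) throws the following exception: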
java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-3
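I have not found a detailed write-up of this problem yet. My current understanding is that Reactor refuses to run a blocking operation such as toStream() on one of its non-blocking threads (here reactor-http-nio-3), so consuming WebFlux's asynchronous Flux synchronously from such a thread raises this blocking exception.
Solutions
- Start a new thread to read the data from the Flux, then carry out the operation there
- Stop using cacheRouteDefinitionRepository.getRouteDefinitions() here and call routeDefinitionCacheService.getRouteDefinitions() directly instead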
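The exception and the first workaround can be imitated in plain Java, which may make the mechanism easier to see. This is a simulation under stated assumptions, not Reactor code: as far as I understand, Reactor marks its event-loop threads with a marker interface and checks for it before blocking. NonBlockingMarker, EventLoopThread, blockingRead() and attempt() below are hypothetical names that only mimic that idea.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Plain-Java simulation of a "no blocking on this thread" guard; NOT Reactor code. */
class BlockingGuardSketch {

    /** Hypothetical marker that plays the role of a non-blocking thread tag. */
    interface NonBlockingMarker { }

    /** A thread type that declares "never block me", like an event-loop thread. */
    static final class EventLoopThread extends Thread implements NonBlockingMarker {
        EventLoopThread(Runnable task, String name) {
            super(task, name);
        }
    }

    /** Stand-in for a blocking read such as toStream(): refuses to run on a marked thread. */
    static List<String> blockingRead() {
        Thread current = Thread.currentThread();
        if (current instanceof NonBlockingMarker) {
            throw new IllegalStateException(
                    "blocking read is not supported in thread " + current.getName());
        }
        return Arrays.asList("route-1", "route-2");
    }

    /** Runs blockingRead() on an event-loop-like thread or on a plain worker thread. */
    static String attempt(boolean onEventLoop) {
        AtomicReference<String> outcome = new AtomicReference<>();
        Runnable task = () -> {
            try {
                outcome.set("ok " + blockingRead());
            } catch (IllegalStateException e) {
                outcome.set("rejected: " + e.getMessage());
            }
        };
        Thread t = onEventLoop
                ? new EventLoopThread(task, "fake-event-loop-1")
                : new Thread(task, "worker-1");
        t.start();
        try {
            t.join(); // wait for the task so the outcome is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        // prints: rejected: blocking read is not supported in thread fake-event-loop-1
        System.out.println(attempt(true));
        // prints: ok [route-1, route-2]
        System.out.println(attempt(false));
    }
}
```

The same read succeeds once it is moved to an ordinary thread, which is what the new Thread() workaround relies on. The second workaround avoids the guard altogether because it never goes through a blocking call on the Flux.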
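RouteDefinitionCacheService: this interface defines local (in-memory) storage and Redis storage for RouteDefinition objects, which avoids having to query the database every time route information is read.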
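import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// JSONObject and StringUtils are assumed to come from fastjson and commons-lang3.
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class RouteDefinitionCacheServiceImpl implements RouteDefinitionCacheService {

    /**
     * Local (in-memory) cache
     */
    private static final ConcurrentHashMap<String, RouteDefinition> definitionMap = new ConcurrentHashMap<>();

    /**
     * Redis key of the hash that holds the cached routes
     */
    public static final String SPACE = GatewayConfig.NACOS_DATA_ID + ":" + GatewayConfig.NACOS_GROUP_ID;

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public List<RouteDefinition> getRouteDefinitions() {
        List<RouteDefinition> list = new ArrayList<>();
        if (definitionMap.size() > 0) {
            // Serve from memory whenever the local cache has been filled.
            return new ArrayList<>(definitionMap.values());
        } else {
            // Otherwise load everything from Redis and warm the local cache.
            redisTemplate.opsForHash().values(SPACE)
                    .stream().forEach(r -> {
                        RouteDefinition route = JSONObject.parseObject(r.toString(), RouteDefinition.class);
                        list.add(route);
                        definitionMap.put(route.getId(), route);
                    });
            return list;
        }
    }

    @Override
    public boolean saveAll(List<RouteDefinition> definitions) {
        if (definitions != null && definitions.size() > 0) {
            definitions.forEach(this::save);
            return true;
        }
        return false;
    }

    @Override
    public boolean has(String routeId) {
        // Check memory first, then fall back to Redis.
        return definitionMap.containsKey(routeId)
                || Boolean.TRUE.equals(redisTemplate.opsForHash().hasKey(SPACE, routeId));
    }

    @Override
    public boolean delete(String routeId) {
        if (has(routeId)) {
            definitionMap.remove(routeId);
            redisTemplate.opsForHash().delete(SPACE, routeId);
            return true;
        }
        return false;
    }

    @Override
    public boolean save(RouteDefinition r) {
        if (r != null && StringUtils.isNotBlank(r.getId())) {
            // Write to both levels so they stay in step.
            definitionMap.put(r.getId(), r);
            redisTemplate.opsForHash().put(SPACE, r.getId(), JSONObject.toJSONString(r));
            return true;
        }
        return false;
    }
}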
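The read and write paths of this two-level cache can be tried out without Redis. Below is a minimal plain-Java sketch of the same pattern, under the assumption that an in-memory RemoteStore (a hypothetical class that also counts its reads) stands in for the Redis hash, and that routes are reduced to id and JSON string pairs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-Java sketch of a local map in front of a remote store; names are hypothetical. */
class TwoLevelCacheSketch {

    /** Stand-in for the Redis hash; counts reads so the fallback is visible. */
    static final class RemoteStore {
        final Map<String, String> hash = new HashMap<>();
        int reads = 0;

        Map<String, String> entries() {
            reads++;
            return new HashMap<>(hash);
        }
    }

    private final Map<String, String> local = new ConcurrentHashMap<>();
    private final RemoteStore remote;

    TwoLevelCacheSketch(RemoteStore remote) {
        this.remote = remote;
    }

    /** Write path: update both levels. */
    void save(String id, String json) {
        local.put(id, json);
        remote.hash.put(id, json);
    }

    /** Read path: memory first, otherwise load from the remote store and warm memory. */
    List<String> getAll() {
        if (local.isEmpty()) {
            local.putAll(remote.entries());
        }
        return new ArrayList<>(local.values());
    }

    public static void main(String[] args) {
        RemoteStore store = new RemoteStore();
        store.hash.put("r1", "{\"id\":\"r1\"}"); // as if written before this instance started
        TwoLevelCacheSketch cache = new TwoLevelCacheSketch(store);
        cache.getAll(); // first call falls back to the remote store
        cache.getAll(); // second call is served from memory
        // prints: remote reads = 1
        System.out.println("remote reads = " + store.reads);
    }
}
```

One consequence of this design, which also applies to getRouteDefinitions() above, is that once the local map is non-empty it is no longer refreshed from the remote store, so entries written to Redis by another process are not picked up by later reads.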
CacheRouteDefinitionRepository builds on RouteDefinitionCacheService and implements RouteDefinitionRepository. This class directly defines how Gateway reads its route information.
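import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionRepository;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class CacheRouteDefinitionRepository implements RouteDefinitionRepository {

    @Autowired
    private RouteDefinitionCacheService cacheService;

    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        List<RouteDefinition> list = cacheService.getRouteDefinitions();
        return Flux.fromIterable(list);
    }

    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return route.flatMap(r -> {
            cacheService.save(r);
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return routeId.flatMap(id -> {
            if (cacheService.has(id)) {
                cacheService.delete(id);
                return Mono.empty();
            }
            // Report the resolved id, not the Mono wrapper, in the error message.
            return Mono.defer(() -> Mono.error(new NotFoundException("Route configuration not found: " + id)));
        });
    }
}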
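We have now defined the app_route table in MySQL and designed the two-level cache (local memory + Redis) that stores RouteDefinition objects in the program. The remaining question is: how do we tie app_route in MySQL to the cache?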