• 注册
  • 后端开发博客 后端开发博客 关注:0 内容:3653

    spring cloud gateway 一个请求到转发走过的路途-源码解析

  • 查看作者
  • 打赏作者
  • 当前位置: 职业司 > 后端开发 > 后端开发博客 > 正文
    • 后端开发博客
    • spring cloud gateway 一个请求到转发走过的路途-源码解析

      简介

      Spring Cloud Gateway是基于Spring Boot 2.x,Spring WebFlux构建的,是新一代的网关解决方案。目前在打算用gateway网关替换掉原有的zuul网关,利用gateway提供的特性来提升原有网关性能。所以借此机会分析了下网关源码。

      工作原理

      这里贴一张官网的图
      spring cloud gateway 一个请求到转发走过的路途-源码解析
      客户端向Gateway网关发出请求。如果网关处理映射请求与路由匹配,则将其发送到网关处理请求。请求经过网关多个过滤器链(这是涉及一个设计模式:责任链模式)。过滤器由虚线分隔的原因是,过滤器可以在发送请求之前和之后运行逻辑。所有“前置”过滤器逻辑均被执行。然后发出代理请求。发出代理请求后,将运行“后置”过滤器逻辑。

      源码解析

      首先从GatewayAutoConfiguration看起

      负载均衡

      GatewayAutoConfiguration注解中的GatewayLoadBalancerClientAutoConfiguration注解中有个LoadBalancerClientFilter是处理负载均衡的关键代码。

      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
      URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
      String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
      if (url == null
      || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
      return chain.filter(exchange);
      }
      // preserve the original url
      addOriginalRequestUrl(exchange, url);
      if (log.isTraceEnabled()) {
      log.trace("LoadBalancerClientFilter url before: " + url);
      }
      final ServiceInstance instance = choose(exchange);
      if (instance == null) {
      throw NotFoundException.create(properties.isUse404(),
      "Unable to find instance for " + url.getHost());
      }
      URI uri = exchange.getRequest().getURI();
      // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
      // if the loadbalancer doesn't provide one.
      String overrideScheme = instance.isSecure() ? "https" : "http";
      if (schemePrefix != null) {
      overrideScheme = url.getScheme();
      }
      URI requestUrl = loadBalancer.reconstructURI(
      new DelegatingServiceInstance(instance, overrideScheme), uri);
      if (log.isTraceEnabled()) {
      log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
      }
      exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
      return chain.filter(exchange);
      }
      复制代码

      我们在配置路由转发的时候在配置服务时会写lb://xxx,源码中看到了熟悉的lb。

      if (url == null
      || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
      return chain.filter(exchange);
      }
      复制代码

      url为null或不是lb则跳过此过滤器,否则进行负载均衡处理。
      负载关键代码是choose方法,返回ServiceInstance对象(serviceId,host,port等信息)作为负载均衡后的结果。

      protected ServiceInstance choose(ServerWebExchange exchange) {
      return loadBalancer.choose(
      ((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
      }
      复制代码

      choose方法内部通过serviceId,通过ribbon去nacos里找到服务名对应的实例,并负载均衡选出一台实例服务ip和端口返回。将lb://xxx那部分替换成具体ip+端口后接请求路径,放入到上下文中key为gatewayRequestUrl。后通过NettyRoutingFilter过滤器(可以看到这个过滤器的order顺序是Integer.MAX_VALUE,目的就是为了处在最后的位置发送请求)使用httpclient发送请求到下游服务。

      负载均衡流程图

      spring cloud gateway 一个请求到转发走过的路途-源码解析

      请求转发

      核心代码:DispatcherHandler.handle(ServcerWebExchange exchange),它是org.springframework.web.reactive包下的,所有的请求都会经过这里。webflux暂时没有研究,不过大体能看出关键代码和逻辑。

      if (this.handlerMappings == null) {
      return createNotFoundError();
      }
      return Flux.fromIterable(this.handlerMappings)
      .concatMap(mapping -> mapping.getHandler(exchange))
      .next()
      .switchIfEmpty(createNotFoundError())
      .flatMap(handler -> invokeHandler(exchange, handler))
      .flatMap(result -> handleResult(exchange, result));
      复制代码

      我们可以看到mapping -> mapping.getHandler(exchange) debug进去发现getHandlerInternal()方法里面。

      protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
      // don't handle requests on management port if set and different than server port
      if (this.managementPortType == DIFFERENT && this.managementPort != null
      && exchange.getRequest().getURI().getPort() == this.managementPort) {
      return Mono.empty();
      }
      exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
      return lookupRoute(exchange)
      // .log("route-predicate-handler-mapping", Level.FINER) //name this
      .flatMap((Function<Route, Mono<?>>) r -> {
      exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
      if (logger.isDebugEnabled()) {
      logger.debug(
      "Mapping [" + getExchangeDesc(exchange) + "] to " + r);
      }
      exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
      return Mono.just(webHandler);
      }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
      exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
      if (logger.isTraceEnabled()) {
      logger.trace("No RouteDefinition found for ["
      + getExchangeDesc(exchange) + "]");
      }
      })));
      }
      复制代码

      它的实现方法在RoutePredicateHandlerMapping类中。开头if判断不用看,核心方法是lookupRoute(exchange)

      protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
      return this.routeLocator.getRoutes()
      // individually filter routes so that filterWhen error delaying is not a
      // problem
      .concatMap(route -> Mono.just(route).filterWhen(r -> {
      // add the current route we are testing
      exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
      return r.getPredicate().apply(exchange);
      })
      // instead of immediately stopping main flux due to error, log and
      // swallow it
      .doOnError(e -> logger.error(
      "Error applying predicate for route: " + route.getId(),
      e))
      .onErrorResume(e -> Mono.empty()))
      // .defaultIfEmpty() put a static Route not found
      // or .switchIfEmpty()
      // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
      .next()
      // TODO: error handling
      .map(route -> {
      if (logger.isDebugEnabled()) {
      logger.debug("Route matched: " + route.getId());
      }
      validateRoute(route, exchange);
      return route;
      });
      /*
      * TODO: trace logging if (logger.isTraceEnabled()) {
      * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
      */
      }
      复制代码

      首先outeLocator.getRoutes()的实现方法RouteDefinitionRouteLocator.getRoutes

      Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
      .map(this::convertToRoute);
      if (!gatewayProperties.isFailOnRouteDefinitionError()) {
      // instead of letting error bubble up, continue
      routes = routes.onErrorContinue((error, obj) -> {
      if (logger.isWarnEnabled()) {
      logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId()
      + " will be ignored. Definition has invalid configs, "
      + error.getMessage());
      }
      });
      }
      return routes.map(route -> {
      if (logger.isDebugEnabled()) {
      logger.debug("RouteDefinition matched: " + route.getId());
      }
      return route;
      });
      复制代码

      routeDefinitionLocator.getRouteDefinitions()又有很多实现方法。。。分别都是从缓存中获取路由信息,从注册中心获取路由信息,从配置文件中获取路由信息等,总而言之就是获取到路由RouteDefinition对象,通过convertToRoute方法将RouteDefinition对象转换成Route对象(咱们网关配置的谓词和过滤器都放入了Route对象,构建Route对象的时候又涉及一个设计模式:建造者模式)。
      再往上看,获取到路由信息后Mono.just(route).filterWhen()大概就是我们请求过来的url对某个路由信息做匹配过滤。将我们在路由里配置的id放入上下文中,key为GATEWAY_PREDICATE_ROUTE_ATTR(id如不指定,则为UUID)
      spring cloud gateway 一个请求到转发走过的路途-源码解析
      我这里配置的路由有两个,从图中断点可以看出,第一个路由信息和当前访问的url不匹配,返回为false,第二个路由信息匹配上了,返回为true。
      这样我们再一路返回到lookupRoute方法,经过上面的一顿操作,又将route路由放入上下文中key为GATEWAY_ROUTE_ATTR。
      再一路往上返回,回到最初的handle
      spring cloud gateway 一个请求到转发走过的路途-源码解析
      经过对mapping和路由的一系列前置处理,我们是不是就应该执行真正的过滤等处理逻辑了,下面就是执行处理的关键代码。
      关键代码:invokeHandler(exchange, handler)

      private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
      if (this.handlerAdapters != null) {
      for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
      if (handlerAdapter.supports(handler)) {
      return handlerAdapter.handle(exchange, handler);
      }
      }
      }
      return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
      }
      复制代码

      我们进入handlerAdapter.handle(exchange, handler)方法,再经过多个实现
      SimpleHandlerAdapter -> FilteringWebHandler

      public Mono<Void> handle(ServerWebExchange exchange) {
      Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
      List<GatewayFilter> gatewayFilters = route.getFilters();
      List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
      combined.addAll(gatewayFilters);
      // TODO: needed or cached?
      AnnotationAwareOrderComparator.sort(combined);
      if (logger.isDebugEnabled()) {
      logger.debug("Sorted gatewayFilterFactories: " + combined);
      }
      return new DefaultGatewayFilterChain(combined).filter(exchange);
      }
      复制代码

      我们看到之前放入上下文的key为GATEWAY_ROUTE_ATTR的路由现在可以用到了!我们取出Route路由对象,记得之前RouteDefinition对象转Route的时候做了什么吗?是不是放入了过滤器?这里取出之前set进的过滤器集合,然后new DefaultGatewayFilterChain(combined).filter(exchange),执行过滤!(又是个设计模式:责任链模式,设计模式写法不固定,主要是思想哈,它这里的写法是通过index标记改执行哪一个过滤器,然后通过index游标移动来经过过滤器链条)
      还记得上面的负载均衡过滤器吗,他的order为int最大值,所以肯定最后要走到LoadBalancerClientFilter.filter,然后执行choose方法通过负载均衡算法选举出服务器实例,再通过httpClient调用下游服务。是不是又和前面负载均衡源码解析的步骤连起来了!

      ps

      里面有很多细节其实还没有写进去,比如路由信息会放入本地缓存中,路由信息获取也可自定义获取方式,比如从数据库中,从redis中等。大体上的流程就是这样了,有不对的地方欢迎指正!

      请登录之后再进行评论

      登录

      手机阅读天地(APP)

      • 微信公众号
      • 微信小程序
      • 安卓APP
      手机浏览,惊喜多多
      匿名树洞,说我想说!
      问答悬赏,VIP可见!
      密码可见,回复可见!
      即时聊天、群聊互动!
      宠物孵化,赠送礼物!
      动态像框,专属头衔!
      挑战/抽奖,金币送不停!
      赶紧体会下,不会让你失望!
    • 实时动态
    • 签到
    • 做任务
    • 发表内容
    • 偏好设置
    • 到底部
    • 帖子间隔 侧栏位置:
    • 还没有账号?点这里立即注册