Soul: Full Request Flow Analysis (Part 4)

Today we continue with the second part left unfinished last time. When a request reaches the WebHandler, processing starts in the handle method.

SoulWebHandler

SoulWebHandler#handle

@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
  MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
  Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
  return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
    .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}

This is the chain-of-responsibility pattern: new DefaultSoulPluginChain(plugins) builds the plugin chain, and its execute method is then invoked.

DefaultSoulPluginChain#execute

private static class DefaultSoulPluginChain implements SoulPluginChain {

    private int index;

    private final List<SoulPlugin> plugins;

    /**
     * Instantiates a new Default soul plugin chain.
     *
     * @param plugins the plugins
     */
    DefaultSoulPluginChain(final List<SoulPlugin> plugins) {
        this.plugins = plugins;
    }

    /**
     * Delegate to the next {@code WebFilter} in the chain.
     *
     * @param exchange the current server exchange
     * @return {@code Mono<Void>} to indicate when request handling is complete
     */
    @Override
    public Mono<Void> execute(final ServerWebExchange exchange) {
        return Mono.defer(() -> {
            if (this.index < plugins.size()) {
                SoulPlugin plugin = plugins.get(this.index++);
                Boolean skip = plugin.skip(exchange);
                if (skip) {
                    return this.execute(exchange);
                }
                return plugin.execute(exchange, this);
            }
            return Mono.empty();
        });
    }
}
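
To make the cursor-based traversal easier to see, here is a minimal synchronous sketch of the same idea. It is not Soul's code: the Plugin and Chain types, the String request, and the plugin names are simplified stand-ins assumed for illustration, and Reactor is left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainSketch {

    /** Hypothetical, simplified stand-in for SoulPlugin (synchronous, no Reactor). */
    interface Plugin {
        boolean skip(String request);

        void execute(String request, Chain chain);
    }

    /** Walks the plugin list with an index cursor, like DefaultSoulPluginChain. */
    static class Chain {
        private int index;
        private final List<Plugin> plugins;

        Chain(final List<Plugin> plugins) {
            this.plugins = plugins;
        }

        void execute(final String request) {
            if (index < plugins.size()) {
                Plugin plugin = plugins.get(index++);
                if (plugin.skip(request)) {
                    // A skipped plugin is bypassed; the chain moves on by itself.
                    execute(request);
                    return;
                }
                // The plugin decides whether to call chain.execute() again.
                plugin.execute(request, this);
            }
        }
    }

    /** Builds a plugin that records its name and then continues the chain. */
    static Plugin plugin(final String name, final boolean skip, final List<String> trace) {
        return new Plugin() {
            @Override
            public boolean skip(final String request) {
                return skip;
            }

            @Override
            public void execute(final String request, final Chain chain) {
                trace.add(name);
                chain.execute(request);
            }
        };
    }

    /** Runs three illustrative plugins; the middle one is skipped. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<Plugin> plugins = Arrays.asList(
                plugin("global", false, trace),
                plugin("sign", true, trace),
                plugin("divide", false, trace));
        new Chain(plugins).execute("/http/order/findById");
        return trace;
    }

    public static void main(final String[] args) {
        System.out.println(run()); // prints [global, divide]
    }
}
```

Because the index lives in the chain instance, a chain can be walked only once. That matches the handle method above creating a new DefaultSoulPluginChain for every request.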

AbstractSoulPlugin#execute

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  String pluginName = named();
  final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
  if (pluginData != null && pluginData.getEnabled()) {
    final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
    if (CollectionUtils.isEmpty(selectors)) {
      return handleSelectorIsNull(pluginName, exchange, chain);
    }
    // find the matching selectorData
    final SelectorData selectorData = matchSelector(exchange, selectors);
    if (Objects.isNull(selectorData)) {
      return handleSelectorIsNull(pluginName, exchange, chain);
    }
    selectorLog(selectorData, pluginName);
    final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
    if (CollectionUtils.isEmpty(rules)) {
      return handleRuleIsNull(pluginName, exchange, chain);
    }
    RuleData rule;
    if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
      //get last
      rule = rules.get(rules.size() - 1);
    } else {
      rule = matchRule(exchange, rules);
    }
    if (Objects.isNull(rule)) {
      return handleRuleIsNull(pluginName, exchange, chain);
    }
    ruleLog(rule, pluginName);
    return doExecute(exchange, chain, selectorData, rule);
  }
  return chain.execute(exchange);
}

AbstractSoulPlugin#matchSelector

private SelectorData matchSelector(final ServerWebExchange exchange, final Collection<SelectorData> selectors) {
    return selectors.stream()
            .filter(selector -> selector.getEnabled() && filterSelector(selector, exchange))
            .findFirst().orElse(null);
}

The overall request flow is as follows:

[Figure: overall request flow, image-20210119011948090]

AbstractSoulPlugin#filterSelector

private Boolean filterSelector(final SelectorData selector, final ServerWebExchange exchange) {
    if (selector.getType() == SelectorTypeEnum.CUSTOM_FLOW.getCode()) {
        if (CollectionUtils.isEmpty(selector.getConditionList())) {
            return false;
        }
        return MatchStrategyUtils.match(selector.getMatchMode(), selector.getConditionList(), exchange);
    }
    return true;
}

MatchStrategyUtils#match

public static boolean match(final Integer strategy, final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
    String matchMode = MatchModeEnum.getMatchModeByCode(strategy);
    MatchStrategy matchStrategy = ExtensionLoader.getExtensionLoader(MatchStrategy.class).getJoin(matchMode);
    return matchStrategy.match(conditionDataList, exchange);
}

AndMatchStrategy#match

@Override
public Boolean match(final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
  return conditionDataList
    .stream()
    .allMatch(condition -> OperatorJudgeFactory.judge(condition, buildRealData(condition, exchange)));
}
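
The allMatch call has one edge case worth spelling out: on an empty stream it returns true. That is presumably why filterSelector rejects an empty condition list before delegating to the strategy. The sketch below models conditions as plain Predicate<String> values, which is an assumption for illustration. The or method is likewise only a guess at what an OR-style counterpart would look like, since it is not shown in the code above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

public class MatchStrategySketch {

    /** AND semantics: every condition must accept the real data. */
    static boolean and(final List<Predicate<String>> conditions, final String realData) {
        return conditions.stream().allMatch(c -> c.test(realData));
    }

    /** OR semantics (assumed counterpart): one accepting condition is enough. */
    static boolean or(final List<Predicate<String>> conditions, final String realData) {
        return conditions.stream().anyMatch(c -> c.test(realData));
    }

    /** Two illustrative conditions on the request path. */
    static List<Predicate<String>> sampleConditions() {
        Predicate<String> underHttp = path -> path.startsWith("/http");
        Predicate<String> isFind = path -> path.endsWith("findById");
        return Arrays.asList(underHttp, isFind);
    }

    public static void main(final String[] args) {
        List<Predicate<String>> conditions = sampleConditions();
        System.out.println(and(conditions, "/http/order/findById")); // true
        System.out.println(and(conditions, "/http/order/save"));     // false
        System.out.println(or(conditions, "/http/order/save"));      // true
        // allMatch on an empty list is vacuously true.
        System.out.println(and(Collections.emptyList(), "/anything")); // true
    }
}
```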

AbstractMatchStrategy#buildRealData

String buildRealData(final ConditionData condition, final ServerWebExchange exchange) {
  String realData = "";
  ParamTypeEnum paramTypeEnum = ParamTypeEnum.getParamTypeEnumByName(condition.getParamType());
  switch (paramTypeEnum) {
    case HEADER:
      final HttpHeaders headers = exchange.getRequest().getHeaders();
      final List<String> list = headers.get(condition.getParamName());
      if (CollectionUtils.isEmpty(list)) {
        return realData;
      }
      realData = Objects.requireNonNull(headers.get(condition.getParamName())).stream().findFirst().orElse("");
      break;
    case URI:
      realData = exchange.getRequest().getURI().getPath();
      break;
    case QUERY:
      final MultiValueMap<String, String> queryParams = exchange.getRequest().getQueryParams();
      realData = queryParams.getFirst(condition.getParamName());
      break;
    case HOST:
      realData = HostAddressUtils.acquireHost(exchange);
      break;
    case IP:
      realData = HostAddressUtils.acquireIp(exchange);
      break;
    case POST:
      final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
      realData = (String) ReflectUtils.getFieldValue(soulContext, condition.getParamName());
      break;
    default:
      break;
  }
  return realData;
}

OperatorJudgeFactory#judge

public static Boolean judge(final ConditionData conditionData, final String realData) {
  if (Objects.isNull(conditionData) || StringUtils.isBlank(realData)) {
    return false;
  }
  return OPERATOR_JUDGE_MAP.get(conditionData.getOperator()).judge(conditionData, realData);
}
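
The factory is essentially a map lookup behind a guard. A rough sketch of that shape follows. The operator keys "eq" and "contains" and the BiPredicate signature are hypothetical names chosen for illustration, not Soul's actual operators. Unlike the method above, this sketch also returns false for an unknown operator instead of failing on a null map entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

public class OperatorDispatchSketch {

    // Maps an operator name to a (configured value, real data) check.
    private static final Map<String, BiPredicate<String, String>> JUDGES = new HashMap<>();

    static {
        JUDGES.put("eq", (expected, real) -> expected.trim().equals(real));
        JUDGES.put("contains", (expected, real) -> real.contains(expected.trim()));
    }

    /** Rejects missing or blank input first, then dispatches by operator name. */
    static boolean judge(final String operator, final String expected, final String realData) {
        if (operator == null || expected == null
                || realData == null || realData.trim().isEmpty()) {
            return false;
        }
        BiPredicate<String, String> judge = JUDGES.get(operator);
        // An unknown operator is treated as "no match" in this sketch.
        return judge != null && judge.test(expected, realData);
    }

    public static void main(final String[] args) {
        System.out.println(judge("contains", " order ", "/http/order/findById")); // true
        System.out.println(judge("eq", "/http", "/http/order"));                  // false
        System.out.println(judge("eq", "/http", ""));                             // false
        System.out.println(judge("unknown", "a", "b"));                           // false
    }
}
```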

MatchOperatorJudge#judge

@Override
public Boolean judge(final ConditionData conditionData, final String realData) {
    if (Objects.equals(ParamTypeEnum.URI.getName(), conditionData.getParamType())) {
        return PathMatchUtils.match(conditionData.getParamValue().trim(), realData);
    }
    return realData.contains(conditionData.getParamValue().trim());
}

PathMatchUtils#match

public class PathMatchUtils {

  private static final AntPathMatcher MATCHER = new AntPathMatcher();

  /**
     * Match boolean.
     *
     * @param matchUrls the ignore urls
     * @param path      the path
     * @return the boolean
     */
  public static boolean match(final String matchUrls, final String path) {
    return Splitter.on(",").omitEmptyStrings().trimResults().splitToList(matchUrls).stream().anyMatch(url -> reg(url, path));
  }

  private static boolean reg(final String pattern, final String path) {
    return MATCHER.match(pattern, path);
  }

}
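
For a feel of what the comma-separated matching does without pulling in Spring, the sketch below mimics the split-then-anyMatch structure. The toRegex helper is a hypothetical, deliberately rough approximation of Ant-style wildcards (** for any characters, * and ? within one path segment). It is not AntPathMatcher and does not reproduce all of its rules.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

public class PathMatchSketch {

    /** Rough approximation of Ant-style wildcards; NOT Spring's AntPathMatcher. */
    static Pattern toRegex(final String antPattern) {
        StringBuilder regex = new StringBuilder();
        int i = 0;
        while (i < antPattern.length()) {
            char c = antPattern.charAt(i);
            if (c == '*' && i + 1 < antPattern.length() && antPattern.charAt(i + 1) == '*') {
                regex.append(".*");    // "**" may cross path separators
                i += 2;
                continue;
            }
            if (c == '*') {
                regex.append("[^/]*"); // "*" stays inside one segment
            } else if (c == '?') {
                regex.append("[^/]");  // "?" is exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
            i++;
        }
        return Pattern.compile(regex.toString());
    }

    /** Splits on commas, trims, drops empty entries, and succeeds on the first hit. */
    static boolean match(final String matchUrls, final String path) {
        return Arrays.stream(matchUrls.split(","))
                .map(String::trim)
                .filter(pattern -> !pattern.isEmpty())
                .anyMatch(pattern -> toRegex(pattern).matcher(path).matches());
    }

    public static void main(final String[] args) {
        String path = "/http/order/findById";
        System.out.println(match("/http/**", path));                 // true
        System.out.println(match("/http/*", path));                  // false
        System.out.println(match("/dubbo/**, /http/order/*", path)); // true
    }
}
```

The practical consequence for configuration is that one selector condition can carry several patterns separated by commas, and any one of them matching is enough.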

Here we can see that path matching follows the AntPathMatcher rules. That is probably where I have to say good night for today; the rest continues tomorrow!

Reactor notes

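Before looking at publishOn and subscribeOn, we need the concept of a Scheduler. In Reactor, a Scheduler is the abstraction that defines where and how scheduled tasks run. It can be thought of loosely as a thread pool, although it does more than that. Two of the Scheduler implementations are:

  • Schedulers.elastic(): creates worker threads on demand with no upper bound on their number, similar to Executors.newCachedThreadPool().
  • Schedulers.parallel(): a scheduler with a fixed number of threads, which by default equals the number of CPU cores.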
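
The thread-pool analogy can be checked with plain java.util.concurrent, without Reactor. The sketch below only inspects the JDK pools that the two schedulers are compared to. It assumes the standard JDK behaviour that both factory methods return a ThreadPoolExecutor, and it says nothing about how Reactor implements its schedulers internally.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class SchedulerAnalogy {

    /** Upper bound of a cached pool: effectively unbounded, like elastic(). */
    static int cachedPoolMax() {
        ExecutorService cached = Executors.newCachedThreadPool();
        try {
            // Cast relies on the standard JDK implementation (assumption).
            return ((ThreadPoolExecutor) cached).getMaximumPoolSize();
        } finally {
            cached.shutdown();
        }
    }

    /** Upper bound of a fixed pool sized to the CPU count, like parallel(). */
    static int fixedPoolMax() {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService fixed = Executors.newFixedThreadPool(cores);
        try {
            return ((ThreadPoolExecutor) fixed).getMaximumPoolSize();
        } finally {
            fixed.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println("cached pool max threads: " + cachedPoolMax());
        System.out.println("fixed pool max threads:  " + fixedPoolMax());
    }
}
```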

Reactor: subscribeOn

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository)  {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
               .subscribeOn(Schedulers.elastic());
}

Both repository.findAll() and Flux.fromIterable run on the thread pool defined by Schedulers.elastic().

Reactor: publishOn

Mono<Void> fluxToBlockingRepository(Flux<User> flux, 
                                    BlockingRepository<User> repository) {
    return flux
            .publishOn(Schedulers.elastic())
            .doOnNext(repository::save)
            .then();
}

After publishOn(Schedulers.elastic()), repository::save is executed on the thread pool defined by Schedulers.elastic().

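The difference between the two

publishOn changes the thread pool for the operators that come after it, whereas subscribeOn affects the whole pipeline starting from the source. The effect of publishOn therefore depends on where it is placed in the chain, while the effect of subscribeOn does not.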

Flux.just("tom").map(s -> {
            System.out.println("[map] Thread name: " + Thread.currentThread().getName());
            return s.concat("@mail.com");
        })
        .publishOn(Schedulers.newElastic("thread-publishOn"))
        .filter(s -> {
            System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
            return s.startsWith("t");
        })
        .subscribeOn(Schedulers.newElastic("thread-subscribeOn"))
        .subscribe(s -> {
            System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
            System.out.println(s);
        });

The output is as follows:

[map] Thread name: thread-subscribeOn-3
[filter] Thread name: thread-publishOn-4
[subscribe] Thread name: thread-publishOn-4
tom@mail.com

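With publishOn and subscribeOn, reactive code can isolate work onto separate thread pools, which to some extent keeps blocking calls from dragging down the rest of the reactive pipeline.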

Author: HoldDie
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit HoldDie when reposting!