Soul: Full Request Flow Analysis (Part 5)

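Picking up where the previous post left off: we saw how the request path is matched to its Selector. Once the selector and rule are matched, the corresponding Plugin's doExecute method is executed.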

Divide Plugin Analysis

AbstractSoulPlugin#execute

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    String pluginName = named();
    final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
    if (pluginData != null && pluginData.getEnabled()) {
        final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
        if (CollectionUtils.isEmpty(selectors)) {
            return handleSelectorIsNull(pluginName, exchange, chain);
        }
        final SelectorData selectorData = matchSelector(exchange, selectors);
        if (Objects.isNull(selectorData)) {
            return handleSelectorIsNull(pluginName, exchange, chain);
        }
        selectorLog(selectorData, pluginName);
        final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
        if (CollectionUtils.isEmpty(rules)) {
            return handleRuleIsNull(pluginName, exchange, chain);
        }
        RuleData rule;
        if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
            //get last
            rule = rules.get(rules.size() - 1);
        } else {
            rule = matchRule(exchange, rules);
        }
        if (Objects.isNull(rule)) {
            return handleRuleIsNull(pluginName, exchange, chain);
        }
        ruleLog(rule, pluginName);
        // Execute the matching plugin's doExecute method
        return doExecute(exchange, chain, selectorData, rule);
    }
    return chain.execute(exchange);
}
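
The control flow above is a template method: the base class handles the enabled check and the selector and rule matching, and leaves only the last step to the concrete plugin. A minimal sketch of that shape follows. It assumes heavily simplified, hypothetical types (a selector is just a path prefix, results are plain strings, nothing is reactive), so it shows the pattern rather than Soul's real API.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for AbstractSoulPlugin (not Soul's real API).
abstract class SketchPlugin {
    private final boolean enabled;
    private final List<String> selectorPrefixes; // each "selector" is just a path prefix here

    SketchPlugin(boolean enabled, List<String> selectorPrefixes) {
        this.enabled = enabled;
        this.selectorPrefixes = selectorPrefixes;
    }

    // Template method: shared matching logic first, then delegate to doExecute.
    final String execute(String path) {
        if (!enabled) {
            return "skipped"; // plugin disabled: the request just moves on
        }
        Optional<String> selector = selectorPrefixes.stream()
                .filter(path::startsWith)
                .findFirst();
        if (!selector.isPresent()) {
            return "no-selector"; // plays the role of handleSelectorIsNull
        }
        return doExecute(path, selector.get());
    }

    // Subclasses implement only the plugin-specific behaviour.
    protected abstract String doExecute(String path, String selector);
}

class SketchDividePlugin extends SketchPlugin {
    SketchDividePlugin(boolean enabled, List<String> selectorPrefixes) {
        super(enabled, selectorPrefixes);
    }

    @Override
    protected String doExecute(String path, String selector) {
        return "divide:" + selector;
    }
}
```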

DividePlugin#doExecute

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    assert soulContext != null;
    final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
    final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
    if (CollectionUtils.isEmpty(upstreamList)) {
        log.error("divide upstream configuration error: {}", rule.toString());
        Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
    if (Objects.isNull(divideUpstream)) {
        log.error("divide has no upstream");
        Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    // set the http url
    String domain = buildDomain(divideUpstream);
    String realURL = buildRealURL(domain, soulContext, exchange);
    exchange.getAttributes().put(Constants.HTTP_URL, realURL);
    // set the http timeout
    exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
    exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
    return chain.execute(exchange);
}

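Key points:

  • Fetch the list of upstream addresses
  • Pick one upstream according to the load-balancing strategy
  • Build the realURL
  • Set the Constants.HTTP_URL, Constants.HTTP_TIME_OUT, and Constants.HTTP_RETRY attributes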
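
These steps can be sketched in a few lines of plain Java. The internals of LoadBalanceUtils are not shown in this post, so the sketch below simply assumes an IP-hash strategy (the client ip is what doExecute passes in). SketchUpstream, selectByIpHash, and buildRealUrl are hypothetical names made up for illustration.

```java
import java.util.List;

// Hypothetical upstream holder; Soul's DivideUpstream carries more fields.
class SketchUpstream {
    final String protocol;    // e.g. "http://"
    final String hostAndPort; // e.g. "127.0.0.1:8080"

    SketchUpstream(String protocol, String hostAndPort) {
        this.protocol = protocol;
        this.hostAndPort = hostAndPort;
    }
}

class SketchDivide {
    // Assumed strategy: hash the client IP so the same IP always lands on the same upstream.
    static SketchUpstream selectByIpHash(List<SketchUpstream> upstreams, String ip) {
        if (upstreams == null || upstreams.isEmpty()) {
            return null; // the caller reports CANNOT_FIND_URL in this case
        }
        int index = Math.floorMod(ip.hashCode(), upstreams.size());
        return upstreams.get(index);
    }

    // Join domain, path, and optional query string into the URL to call.
    static String buildRealUrl(SketchUpstream upstream, String path, String query) {
        String url = upstream.protocol + upstream.hostAndPort + path;
        return (query == null || query.isEmpty()) ? url : url + "?" + query;
    }
}
```

The resulting URL is what would be stored under Constants.HTTP_URL for the next plugin to pick up.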

WebClientPlugin#execute

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    assert soulContext != null;
    String urlPath = exchange.getAttribute(Constants.HTTP_URL);
    if (StringUtils.isEmpty(urlPath)) {
        Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
    int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
    log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
    HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
    // Build the outbound request; the actual remote call starts here
    WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
    return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}


Looking at the plugin list, we can see that the WebClient plugin comes after the Divide plugin; it sends the remote request and receives the response.

WebClientPlugin#handleRequestBody

private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                     final ServerWebExchange exchange,
                                     final long timeout,
                                     final int retryTimes,
                                     final SoulPluginChain chain) {
    return requestBodySpec.headers(httpHeaders -> {
        httpHeaders.addAll(exchange.getRequest().getHeaders());
        httpHeaders.remove(HttpHeaders.HOST);
    })
            .contentType(buildMediaType(exchange))
            .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
            .exchange()
            .doOnError(e -> log.error(e.getMessage()))
            .timeout(Duration.ofMillis(timeout))
            .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
                .retryMax(retryTimes)
                .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
            // Key step: doNext stores the response in the exchange attributes
            .flatMap(e -> doNext(e, exchange, chain));

}
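
The retryWhen call retries only on ConnectTimeoutException, at most retryTimes times, with an exponential backoff configured with 200 ms as the first delay, 20 s as the cap, and 2 as the factor. To get a feel for the delays such a configuration implies, here is a rough sketch that assumes a plain capped exponential schedule with no jitter. The exact behaviour of Backoff.exponential may differ in detail, and SketchBackoff is a hypothetical helper, not part of Soul or Reactor.

```java
import java.time.Duration;

// Hypothetical helper: capped exponential backoff without jitter.
class SketchBackoff {
    // attempt 0 yields the first delay; each later attempt multiplies by factor, up to max.
    static Duration delayFor(int attempt, Duration first, Duration max, int factor) {
        long capMillis = max.toMillis();
        long millis = Math.min(first.toMillis(), capMillis);
        for (int i = 0; i < attempt; i++) {
            // Cap at every step so the value never grows past max.
            millis = Math.min(millis * factor, capMillis);
        }
        return Duration.ofMillis(millis);
    }
}
```

Under these assumptions the delays double from 200 ms and stop growing once they reach the 20 s cap.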

WebClientPlugin#doNext

private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) {
    if (res.statusCode().is2xxSuccessful()) {
        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
    } else {
        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
    }
    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
    return chain.execute(exchange);
}

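This shows that in asynchronous, reactive programming a plugin does not return its result directly; it passes values along by calling exchange.getAttributes().put(xxx, yyy).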
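
To make the idea concrete, here is a small sketch of plugins that return nothing and communicate only through a shared attribute map. It assumes a synchronous, hypothetical SketchStep interface and made-up attribute keys; Soul's real plugins are reactive and use ServerWebExchange.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mini-plugin: no return value, only side effects on the shared attributes.
interface SketchStep {
    void execute(Map<String, Object> attributes);
}

class SketchChain {
    // Run every step in order against the same attribute map.
    static Map<String, Object> run(List<SketchStep> steps) {
        Map<String, Object> attributes = new HashMap<>();
        for (SketchStep step : steps) {
            step.execute(attributes);
        }
        return attributes;
    }

    static Map<String, Object> demo() {
        // Plays the role of DividePlugin: publishes the URL to call.
        SketchStep divide = attrs -> attrs.put("httpUrl", "http://127.0.0.1:8080/order/findById");
        // Plays the role of WebClientPlugin: reads the URL, publishes a (fake) response.
        SketchStep webClient = attrs -> attrs.put("clientResponse", "response from " + attrs.get("httpUrl"));
        return run(Arrays.asList(divide, webClient));
    }
}
```

Each step depends only on the keys it reads, which is what lets plugins be added or reordered without changing method signatures.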

WebClientResponsePlugin#execute

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    return chain.execute(exchange).then(Mono.defer(() -> {
        ServerHttpResponse response = exchange.getResponse();
        ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
        if (Objects.isNull(clientResponse)
                || response.getStatusCode() == HttpStatus.BAD_GATEWAY
                || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
            Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
            Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        response.setStatusCode(clientResponse.statusCode());
        response.getCookies().putAll(clientResponse.cookies());
        response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
        return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
    }));
}

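By the time WebClientResponsePlugin runs, it simply reads the stored value with getAttribute, copies the status code, cookies, headers, and body into the response, and the whole request flow is complete.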
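
The chain.execute(exchange).then(Mono.defer(...)) shape means "let the rest of the chain finish, then read the attributes and write the response". A rough analogue is sketched below with CompletableFuture standing in for Mono. That substitution is an assumption made to keep the example dependency-free, and SketchResponseStep and its attribute key are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical analogue of WebClientResponsePlugin, with CompletableFuture in place of Mono.
class SketchResponseStep {
    // Stand-in for chain.execute(exchange): nothing is left to run in this sketch.
    static CompletableFuture<Void> restOfChain() {
        return CompletableFuture.completedFuture(null);
    }

    // The callback runs only after the rest of the chain has completed,
    // so it reads the attribute as late as possible.
    static CompletableFuture<String> execute(Map<String, Object> attributes) {
        return restOfChain().thenApply(ignored -> {
            Object clientResponse = attributes.get("clientResponse");
            if (clientResponse == null) {
                return "SERVICE_RESULT_ERROR"; // mirrors the null check in the real plugin
            }
            return "body:" + clientResponse;
        });
    }
}
```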


Author: HoldDie
Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit HoldDie when reposting.