Soul-全流程分析(六)

数据同步

DataChangedListener

public interface DataChangedListener {

    /**
     * invoke this method when AppAuth was received.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {
    }

    /**
     * invoke this method when Plugin was received.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {
    }

    /**
     * invoke this method when Selector was received.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {
    }

    /**
     * On meta data changed.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {

    }

    /**
     * invoke this method when Rule was received.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {
    }

}

数据同步时间监听

AbstractDataChangedListener

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

  /**
     * The constant CACHE.
     */
  protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();

  /**
     * The Plugin service.
     */
  @Resource
  private PluginService pluginService;

  @Override
  public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    if (CollectionUtils.isEmpty(changed)) {
      return;
    }
    this.updatePluginCache();
    this.afterPluginChanged(changed, eventType);
  }

  /**
     * After plugin changed.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
  protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
  }

  @Override
  public final void afterPropertiesSet() {
    updateAppAuthCache();
    updatePluginCache();
    updateRuleCache();
    updateSelectorCache();
    updateMetaDataCache();
    afterInitialize();
  }

  protected abstract void afterInitialize();

  protected void updatePluginCache() {
    this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll());
  }

  protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
    String json = GsonUtils.getInstance().toJson(data);
    ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
    ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
    log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
  }

}

我们先使用 Plugin 的配置说明问题,当系统启动时,因为我们继承了 InitializingBean 接口,所以我们可以做一些初始化的操作,重写 afterPropertiesSet 方法,我们把对应的 AppAuth、Plugin、Rule、Selector、MetaData 数据,都通过查询数据库,把数据加载到 CACHE 缓存中,使用模板模式预留 afterPluginChangedafterInitialize 两个方法给子类实现;

ConfigController

public class ConfigController {
  @PostMapping(value = "/listener")
  public void listener(final HttpServletRequest request, final HttpServletResponse response) {
    longPollingListener.doLongPolling(request, response);
  }
}

HttpLongPollingDataChangedListener

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

  @Override
  protected void afterInitialize() {
    long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
    // Periodically check the data for changes and update the cache
    scheduler.scheduleWithFixedDelay(() -> {
      log.info("http sync strategy refresh config start.");
      try {
        this.refreshLocalCache();
        log.info("http sync strategy refresh config success.");
      } catch (Exception e) {
        log.error("http sync strategy refresh config error!", e);
      }
    }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
    log.info("http sync strategy refresh interval: {}ms", syncInterval);
  }

  private void refreshLocalCache() {
    this.updateAppAuthCache();
    // 执行父类方法
    this.updatePluginCache();
    this.updateRuleCache();
    this.updateSelectorCache();
    this.updateMetaDataCache();
  }


  public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {

    // compare group md5
    List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
    String clientIp = getRemoteIp(request);

    // response immediately.
    if (CollectionUtils.isNotEmpty(changedGroup)) {
      this.generateResponse(response, changedGroup);
      log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
      return;
    }

    // listen for configuration changed.
    final AsyncContext asyncContext = request.startAsync();

    // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
    asyncContext.setTimeout(0L);

    // block client's thread.
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
  }

  class LongPollingClient implements Runnable {

    /**
         * The Async context.
         */
    private final AsyncContext asyncContext;

    /**
         * The Ip.
         */
    private final String ip;

    /**
         * The Timeout time.
         */
    private final long timeoutTime;

    /**
         * The Async timeout future.
         */
    private Future<?> asyncTimeoutFuture;

    /**
         * Instantiates a new Long polling client.
         *
         * @param ac          the ac
         * @param ip          the ip
         * @param timeoutTime the timeout time
         */
    LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {
      this.asyncContext = ac;
      this.ip = ip;
      this.timeoutTime = timeoutTime;
    }

    @Override
    public void run() {
      this.asyncTimeoutFuture = scheduler.schedule(() -> {
        clients.remove(LongPollingClient.this);
        List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
        sendResponse(changedGroups);
      }, timeoutTime, TimeUnit.MILLISECONDS);
      clients.add(this);
    }

    /**
         * Send response.
         *
         * @param changedGroups the changed groups
         */
    void sendResponse(final List<ConfigGroupEnum> changedGroups) {
      // cancel scheduler
      if (null != asyncTimeoutFuture) {
        asyncTimeoutFuture.cancel(false);
      }
      generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
      asyncContext.complete();
    }
  }
}

对于方法 doLongPolling 如果 changedGroup 不为空直接返回,如果为空构造一个异步的后台定时任务,默认六十秒执行一次;

private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
  List<ConfigGroupEnum> changedGroup = new ArrayList<>(4);
  for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
    // md5,lastModifyTime
    String[] params = StringUtils.split(request.getParameter(group.name()), ',');
    if (params == null || params.length != 2) {
      throw new SoulException("group param invalid:" + request.getParameter(group.name()));
    }
    String clientMd5 = params[0];
    long clientModifyTime = NumberUtils.toLong(params[1]);
    ConfigDataCache serverCache = CACHE.get(group.name());
    // do check.
    if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
      changedGroup.add(group);
    }
  }
  return changedGroup;
}


private boolean checkCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, final long clientModifyTime) {

  // is the same, doesn't need to be updated
  if (StringUtils.equals(clientMd5, serverCache.getMd5())) {
    return false;
  }

  // if the md5 value is different, it is necessary to compare lastModifyTime.
  long lastModifyTime = serverCache.getLastModifyTime();
  if (lastModifyTime >= clientModifyTime) {
    // the client's config is out of date.
    return true;
  }

  // the lastModifyTime before client, then the local cache needs to be updated.
  // Considering the concurrency problem, admin must lock,
  // otherwise it may cause the request from soul-web to update the cache concurrently, causing excessive db pressure
  boolean locked = false;
  try {
    locked = LOCK.tryLock(5, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return true;
  }
  if (locked) {
    try {
      ConfigDataCache latest = CACHE.get(serverCache.getGroup());
      if (latest != serverCache) {
        // the cache of admin was updated. if the md5 value is the same, there's no need to update.
        return !StringUtils.equals(clientMd5, latest.getMd5());
      }
      // load cache from db.
      this.refreshLocalCache();
      latest = CACHE.get(serverCache.getGroup());
      return !StringUtils.equals(clientMd5, latest.getMd5());
    } finally {
      LOCK.unlock();
    }
  }

  // not locked, the client need to be updated.
  return true;

}

进行比较数据的一致性问题

@Override
protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
  scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
}

class DataChangeTask implements Runnable {

              /**
         * The Group where the data has changed.
         */
  private final ConfigGroupEnum groupKey;

              /**
         * The Change time.
         */
  private final long changeTime = System.currentTimeMillis();

              /**
         * Instantiates a new Data change task.
         *
         * @param groupKey the group key
         */
  DataChangeTask(final ConfigGroupEnum groupKey) {
    this.groupKey = groupKey;
  }

  @Override
  public void run() {
    for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
      LongPollingClient client = iter.next();
      iter.remove();
      client.sendResponse(Collections.singletonList(groupKey));
      log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
    }
  }
}

文章作者: HoldDie
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 HoldDie !
评论
  目录