Soul-全流程分析(七)

ZK 数据配置

启动配置

public class ZookeeperDataInit implements CommandLineRunner {

    private final ZkClient zkClient;

    private final SyncDataService syncDataService;

    /**
     * Instantiates a new Zookeeper data init.
     *
     * @param zkClient        the zk client
     * @param syncDataService the sync data service
     */
    public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        this.zkClient = zkClient;
        this.syncDataService = syncDataService;
    }

    @Override
    public void run(final String... args) {
        String pluginPath = ZkPathConstants.PLUGIN_PARENT;
        String authPath = ZkPathConstants.APP_AUTH_PARENT;
        String metaDataPath = ZkPathConstants.META_DATA;
        if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
            syncDataService.syncAll(DataEventTypeEnum.REFRESH);
        }
    }
}

如果不存在,就全部刷新;

SyncDataServiceImpl

@Service("syncDataService")
public class SyncDataServiceImpl implements SyncDataService {
  @Override
  public boolean syncAll(final DataEventTypeEnum type) {
    appAuthService.syncData();
    List<PluginData> pluginDataList = pluginService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
    List<SelectorData> selectorDataList = selectorService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
    List<RuleData> ruleDataList = ruleService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
    metaDataService.syncData();
    return true;
  }

}

发送异步事件,进行异步同步;

DataChangedEventDispatcher

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }

}

做消息监听,以及分发;

ZookeeperDataChangedListener

public class ZookeeperDataChangedListener implements DataChangedListener {

  private final ZkClient zkClient;

  public ZookeeperDataChangedListener(final ZkClient zkClient) {
    this.zkClient = zkClient;
  }

  @Override
  public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
    for (AppAuthData data : changed) {
      final String appAuthPath = ZkPathConstants.buildAppAuthPath(data.getAppKey());
      // delete
      if (eventType == DataEventTypeEnum.DELETE) {
        deleteZkPath(appAuthPath);
        continue;
      }
      // create or update
      upsertZkNode(appAuthPath, data);
    }
  }

  private void upsertZkNode(final String path, final Object data) {
    if (!zkClient.exists(path)) {
      zkClient.createPersistent(path, true);
    }
    zkClient.writeData(path, data);
  }
}

可以明白,如果对应的 Znode 不存在就去创建,之后再往这个 node 下面写数据,至此 Zookeeper 的数据异步更新结束;


文章作者: HoldDie
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 HoldDie !
评论
  目录