How Dynamic Refresh Works in the Apollo Configuration Center

I recently used Apollo in a project, so these are my notes on how Apollo's dynamic configuration refresh integrates with Spring.

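Start with the annotation that enables Apollo: @EnableApolloConfig.

It imports ApolloConfigRegistrar (the Apollo configuration registrar).

ApolloConfigRegistrar implements Spring's bean definition registrar interface (ImportBeanDefinitionRegistrar) and uses it to register the beans Apollo needs at startup.

Two of the registration lines are the ones that matter for configuration loading. Open SpringValueProcessor.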

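Its Javadoc describes it as a processor for placeholders in @Value annotations on fields and methods. It implements BeanFactoryPostProcessor and BeanFactoryAware, which means it can inspect and modify bean definitions and holds a reference to the bean factory.

The postProcessBeanFactory method in this class collects the configuration of beans defined in XML. I will skip it here.

The key part is the processing logic defined in the abstract parent class: before each bean is initialized, it walks the bean's fields and methods and passes them to processField and processMethod.

Here is the implementation in the subclass SpringValueProcessor: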


  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName)
      throws BeansException {
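    // proceed only if auto-updating of injected Spring properties is enabled
    if (configUtil.isAutoUpdateInjectedSpringPropertiesEnabled()) {
      // let the parent class process the bean's fields and methods
      super.postProcessBeforeInitialization(bean, beanName);
      // then process beans defined in XML
      processBeanPropertyValues(bean, beanName);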
    }
    return bean;
  }


  @Override
  protected void processField(Object bean, String beanName, Field field) {
    // register @Value on field
    // read the @Value annotation on the field
    Value value = field.getAnnotation(Value.class);
    if (value == null) {
      return;
    }
    // extract the placeholder keys from the @Value expression
    Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());

    if (keys.isEmpty()) {
      return;
    }

    for (String key : keys) {
      // wrap the key, the raw expression and the target bean and field in a SpringValue
      SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
      // register it in the SpringValue registry, which is backed by a map
      springValueRegistry.register(beanFactory, key, springValue);
      logger.debug("Monitoring {}", springValue);
    }
  }

  @Override
  protected void processMethod(Object bean, String beanName, Method method) {
    //register @Value on method
    // read the @Value annotation on the method
    Value value = method.getAnnotation(Value.class);
    if (value == null) {
      return;
    }
    //skip Configuration bean methods
    if (method.getAnnotation(Bean.class) != null) {
      return;
    }
    if (method.getParameterTypes().length != 1) {
      logger.error("Ignore @Value setter {}.{}, expecting 1 parameter, actual {} parameters",
          bean.getClass().getName(), method.getName(), method.getParameterTypes().length);
      return;
    }

    // extract the placeholder keys
    Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());

    if (keys.isEmpty()) {
      return;
    }

    for (String key : keys) {
      SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, method, false);
      springValueRegistry.register(beanFactory, key, springValue);
      logger.info("Monitoring {}", springValue);
    }
  }
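
To make the key extraction step concrete, here is a minimal sketch of pulling keys out of a @Value expression. It is not Apollo's PlaceholderHelper: the class name PlaceholderKeySketch and the regex are my own assumptions, and it only handles flat ${key} and ${key:default} forms, not nested placeholders or SpEL.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for placeholder key extraction.
// Handles only flat ${key} and ${key:default} placeholders.
class PlaceholderKeySketch {
  // group(1) is the key: everything after "${" up to the first ':' or '}'
  private static final Pattern PLACEHOLDER =
      Pattern.compile("\\$\\{([^:}]+)(?::[^}]*)?\\}");

  static Set<String> extractKeys(String expression) {
    Set<String> keys = new LinkedHashSet<>(); // keeps first-seen order, drops duplicates
    Matcher matcher = PLACEHOLDER.matcher(expression);
    while (matcher.find()) {
      keys.add(matcher.group(1).trim());
    }
    return keys;
  }

  public static void main(String[] args) {
    System.out.println(extractKeys("${timeout:100}"));                  // [timeout]
    System.out.println(extractKeys("http://${host}:${port:8080}/api")); // [host, port]
    System.out.println(extractKeys("plain text"));                      // []
  }
}
```

One expression can yield several keys, which is why processField and processMethod loop over the key set and register one SpringValue per key.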

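Now look at the internals of SpringValueRegistry. Its registry field is a thread-safe Map keyed by bean factory; each value is a Multimap from the placeholder key used in @Value to the SpringValue objects built for that key.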

public class SpringValueRegistry {
  private static final long CLEAN_INTERVAL_IN_SECONDS = 5;
  // Map<BeanFactory, Multimap<placeholder key from @Value, SpringValue wrapping the expression, bean reference, etc.>>
  private final Map<BeanFactory, Multimap<String, SpringValue>> registry = Maps.newConcurrentMap();
  // ensures the cleanup task is started only once
  private final AtomicBoolean initialized = new AtomicBoolean(false);
  private final Object LOCK = new Object();

  public void register(BeanFactory beanFactory, String key, SpringValue springValue) {
    if (!registry.containsKey(beanFactory)) {
      synchronized (LOCK) {
        if (!registry.containsKey(beanFactory)) {
          registry.put(beanFactory, LinkedListMultimap.<String, SpringValue>create());
        }
      }
    }

    registry.get(beanFactory).put(key, springValue);

    // lazy initialize
    if (initialized.compareAndSet(false, true)) {
      initialize();
    }
  }

  public Collection<SpringValue> get(BeanFactory beanFactory, String key) {
    Multimap<String, SpringValue> beanFactorySpringValues = registry.get(beanFactory);
    if (beanFactorySpringValues == null) {
      return null;
    }
    return beanFactorySpringValues.get(key);
  }

  // start a background task that cleans the registry at a fixed interval
  private void initialize() {
    Executors.newSingleThreadScheduledExecutor(ApolloThreadFactory.create("SpringValueRegistry", true)).scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            try {
              scanAndClean();
            } catch (Throwable ex) {
              ex.printStackTrace();
            }
          }
        }, CLEAN_INTERVAL_IN_SECONDS, CLEAN_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
  }

  // scan the registry and remove SpringValues whose target bean is no longer valid
  private void scanAndClean() {
    Iterator<Multimap<String, SpringValue>> iterator = registry.values().iterator();
    while (!Thread.currentThread().isInterrupted() && iterator.hasNext()) {
      Multimap<String, SpringValue> springValues = iterator.next();
      Iterator<Entry<String, SpringValue>> springValueIterator = springValues.entries().iterator();
      while (springValueIterator.hasNext()) {
        Entry<String, SpringValue> springValue = springValueIterator.next();
        if (!springValue.getValue().isTargetBeanValid()) {
          // clear unused spring values
          springValueIterator.remove();
        }
      }
    }
  }
}
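
The cleanup only makes sense because a SpringValue does not keep its bean alive (update reads it through beanRef.get()). The following is a rough sketch of that idea using only the JDK. WeakRegistrySketch and its methods are hypothetical names, it drops the per-BeanFactory level and the scheduler, and it clears a reference by hand to simulate garbage collection.

```java
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for SpringValueRegistry: key -> weak references to target beans.
class WeakRegistrySketch {
  private final Map<String, List<WeakReference<Object>>> registry = new ConcurrentHashMap<>();

  // Records the bean under the key without holding a strong reference to it.
  WeakReference<Object> register(String key, Object bean) {
    WeakReference<Object> ref = new WeakReference<>(bean);
    registry.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(ref);
    return ref;
  }

  int size(String key) {
    List<WeakReference<Object>> refs = registry.get(key);
    return refs == null ? 0 : refs.size();
  }

  // Drops entries whose target is gone, similar in spirit to the isTargetBeanValid() check.
  void scanAndClean() {
    for (List<WeakReference<Object>> refs : registry.values()) {
      refs.removeIf(ref -> ref.get() == null);
    }
  }

  public static void main(String[] args) {
    WeakRegistrySketch registry = new WeakRegistrySketch();
    Object live = new Object();
    registry.register("timeout", live);
    WeakReference<Object> stale = registry.register("timeout", new Object());
    stale.clear(); // simulate the bean having been garbage collected
    registry.scanAndClean();
    // the stale entry is removed, the entry for the live bean stays
    System.out.println(registry.size("timeout") + " entry left, live bean present: " + (live != null));
  }
}
```

In the real class the same scan runs every CLEAN_INTERVAL_IN_SECONDS on a daemon thread, so entries for discarded beans do not accumulate.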

With everything stored in the registry, where is it used? Following the usages of the get method leads to the onChange method of a listener:

public class AutoUpdateConfigChangeListener implements ConfigChangeListener{
  private static final Logger logger = LoggerFactory.getLogger(AutoUpdateConfigChangeListener.class);

  private final boolean typeConverterHasConvertIfNecessaryWithFieldParameter;
  private final Environment environment;
  private final ConfigurableBeanFactory beanFactory;
  private final TypeConverter typeConverter;
  private final PlaceholderHelper placeholderHelper;
  private final SpringValueRegistry springValueRegistry;
  private final Gson gson;

  public AutoUpdateConfigChangeListener(Environment environment, ConfigurableListableBeanFactory beanFactory){
    this.typeConverterHasConvertIfNecessaryWithFieldParameter = testTypeConverterHasConvertIfNecessaryWithFieldParameter();
    this.beanFactory = beanFactory;
    this.typeConverter = this.beanFactory.getTypeConverter();
    this.environment = environment;
    this.placeholderHelper = SpringInjector.getInstance(PlaceholderHelper.class);
    this.springValueRegistry = SpringInjector.getInstance(SpringValueRegistry.class);
    this.gson = new Gson();
  }

  @Override
  public void onChange(ConfigChangeEvent changeEvent) {
    // keys whose values changed in this event
    Set<String> keys = changeEvent.changedKeys();
    if (CollectionUtils.isEmpty(keys)) {
      return;
    }
    for (String key : keys) {
      // 1. check whether the changed key is relevant
      Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
      if (targetValues == null || targetValues.isEmpty()) {
        continue;
      }

      // 2. update the value
      for (SpringValue val : targetValues) {
        updateSpringValue(val);
      }
    }
  }

  // update the injected value
  private void updateSpringValue(SpringValue springValue) {
    try {
      // resolve the new property value
      Object value = resolvePropertyValue(springValue);
      // delegate to SpringValue#update
      springValue.update(value);

      logger.info("Auto update apollo changed value successfully, new value: {}, {}", value,
          springValue);
    } catch (Throwable ex) {
      logger.error("Auto update apollo changed value failed, {}", springValue.toString(), ex);
    }
  }

This ends in the update method of SpringValue, which uses reflection to change the value on the bean that the SpringValue references.

  public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {
    // choose between field injection and setter-method injection
    if (isField()) {
      injectField(newVal);
    } else {
      injectMethod(newVal);
    }
  }

  // set the field on the referenced bean
  private void injectField(Object newVal) throws IllegalAccessException {
    Object bean = beanRef.get();
    if (bean == null) {
      return;
    }
    boolean accessible = field.isAccessible();
    field.setAccessible(true);
    field.set(bean, newVal);
    field.setAccessible(accessible);
  }

  // invoke the @Value-annotated setter on the referenced bean with the new value
  private void injectMethod(Object newVal)
      throws InvocationTargetException, IllegalAccessException {
    Object bean = beanRef.get();
    if (bean == null) {
      return;
    }
    methodParameter.getMethod().invoke(bean, newVal);
  }
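
The two injection paths can be tried out in isolation. This is a small sketch under my own assumptions: DemoBean is a made-up bean, the helpers look members up by name (SpringValue already holds the Field and Method), and reflection failures are wrapped in an unchecked exception for brevity.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

class ReflectiveUpdateSketch {
  // Hypothetical bean standing in for a Spring bean with @Value-injected members.
  static class DemoBean {
    private int timeout = 100;
    private String name = "old";

    void setName(String name) { this.name = name; }
    int getTimeout() { return timeout; }
    String getName() { return name; }
  }

  // Field path: make the private field accessible, then overwrite its value.
  static void injectField(Object bean, String fieldName, Object newVal) {
    try {
      Field field = bean.getClass().getDeclaredField(fieldName);
      field.setAccessible(true);
      field.set(bean, newVal);
    } catch (ReflectiveOperationException ex) {
      throw new IllegalStateException(ex);
    }
  }

  // Method path: call the single-argument setter with the new value.
  static void injectMethod(Object bean, String methodName, Class<?> paramType, Object newVal) {
    try {
      Method method = bean.getClass().getDeclaredMethod(methodName, paramType);
      method.setAccessible(true);
      method.invoke(bean, newVal);
    } catch (ReflectiveOperationException ex) {
      throw new IllegalStateException(ex);
    }
  }

  public static void main(String[] args) {
    DemoBean bean = new DemoBean();
    injectField(bean, "timeout", 200);
    injectMethod(bean, "setName", String.class, "new");
    System.out.println(bean.getTimeout() + " " + bean.getName()); // prints "200 new"
  }
}
```

The method path explains why processMethod rejects methods that do not take exactly one parameter: the new value is the only argument passed to invoke.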

The listener notification can be traced back to the remote configuration class RemoteConfigRepository, whose constructor does the following:

public RemoteConfigRepository(String namespace) {
    ....
    // sync the configuration once up front
    this.trySync();
    // periodic refresh
    this.schedulePeriodicRefresh();
    // long-polling refresh
    this.scheduleLongPollingRefresh();
  }

All three end up executing the same sync method:

protected synchronized void sync() {
  Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

  try {
    // previously cached configuration
    ApolloConfig previous = m_configCache.get();
    // load the configuration from the remote server
    ApolloConfig current = loadApolloConfig();

    //reference equals means HTTP 304
    if (previous != current) {
      logger.info("[ARCH_APOLLO_CLIENT]Remote Config refreshed!");
      m_configCache.set(current);
      // notify listeners of the new configuration
      this.fireRepositoryChange(m_namespace, this.getConfig());
    }

    if (current != null) {
      Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
          current.getReleaseKey());
    }

    transaction.setStatus(Transaction.SUCCESS);
  } catch (Throwable ex) {
    transaction.setStatus(ex);
    throw ex;
  } finally {
    transaction.complete();
  }
}

fireRepositoryChange does the notifying, and this is what eventually triggers the listener event:

 protected void fireRepositoryChange(String namespace, Properties newProperties) {
    for (RepositoryChangeListener listener : m_listeners) {
      try {
        listener.onRepositoryChange(namespace, newProperties);
      } catch (Throwable ex) {
        Tracer.logError(ex);
        logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
      }
    }
  }
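
The interplay of sync and fireRepositoryChange comes down to "compare references, swap the cache, notify". A stripped-down sketch follows, with assumptions stated plainly: SyncSketch, ChangeListener and the Supplier-based loader are hypothetical stand-ins for RemoteConfigRepository, RepositoryChangeListener and loadApolloConfig(), and tracing is left out.

```java
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class SyncSketch {
  // Hypothetical counterpart of RepositoryChangeListener.
  interface ChangeListener {
    void onRepositoryChange(String namespace, Properties newProperties);
  }

  private final String namespace;
  private final Supplier<Properties> loader; // stand-in for loadApolloConfig()
  private final AtomicReference<Properties> cache = new AtomicReference<>();
  private final List<ChangeListener> listeners = new CopyOnWriteArrayList<>();

  SyncSketch(String namespace, Supplier<Properties> loader) {
    this.namespace = namespace;
    this.loader = loader;
  }

  void addListener(ChangeListener listener) {
    listeners.add(listener);
  }

  synchronized void sync() {
    Properties previous = cache.get();
    Properties current = loader.get();
    // The loader hands back the cached object when nothing changed,
    // so a reference comparison is enough to detect "not modified".
    if (previous != current) {
      cache.set(current);
      for (ChangeListener listener : listeners) {
        try {
          listener.onRepositoryChange(namespace, current);
        } catch (RuntimeException ex) {
          // one failing listener must not block the others
          System.err.println("listener failed: " + ex);
        }
      }
    }
  }

  public static void main(String[] args) {
    AtomicReference<Properties> remote = new AtomicReference<>(new Properties());
    SyncSketch repository = new SyncSketch("application", remote::get);
    repository.addListener((ns, props) -> System.out.println("changed: " + ns));
    repository.sync();              // first load: listener fires
    repository.sync();              // same object: nothing happens
    remote.set(new Properties());
    repository.sync();              // new object: listener fires again
  }
}
```

Running main prints "changed: application" twice, once for the first load and once after the remote object is replaced.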

The full trigger chain:

trySync
->sync
->loadApolloConfig
->fireRepositoryChange
->DefaultConfig#onRepositoryChange
->AbstractConfig#fireConfigChange
->AutoUpdateConfigChangeListener#onChange
->AutoUpdateConfigChangeListener#updateSpringValue
->SpringValue#update
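
One step the chain does not spell out is how the Properties passed to onRepositoryChange become the changedKeys() that onChange iterates over. Conceptually this is a diff between the old and new properties. The sketch below is my own illustration of such a diff, not Apollo's code, and ChangedKeysSketch is a hypothetical name.

```java
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

class ChangedKeysSketch {
  // Returns every key that was added, removed, or whose value differs.
  static Set<String> changedKeys(Properties previous, Properties current) {
    Set<String> allKeys = new TreeSet<>(previous.stringPropertyNames());
    allKeys.addAll(current.stringPropertyNames());

    Set<String> changed = new TreeSet<>(); // sorted for stable output
    for (String key : allKeys) {
      // getProperty returns null for a missing key, so added and removed keys differ too
      if (!Objects.equals(previous.getProperty(key), current.getProperty(key))) {
        changed.add(key);
      }
    }
    return changed;
  }

  public static void main(String[] args) {
    Properties before = new Properties();
    before.setProperty("a", "1");
    before.setProperty("b", "2");

    Properties after = new Properties();
    after.setProperty("a", "1");
    after.setProperty("b", "3");
    after.setProperty("c", "4");

    System.out.println(changedKeys(before, after)); // [b, c]
  }
}
```

Only keys in this set are looked up in SpringValueRegistry, so beans whose placeholders did not change are left untouched.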

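RemoteConfigRepository is initialized from the post-processing logic in PropertySourcesProcessor.

Having read the source, back to the question from the beginning. To summarize:

  • At startup, a BeanPostProcessor records beans in SpringValueRegistry (the registry of SpringValue objects).
  • Specifically, SpringValueProcessor inspects each bean's @Value fields and setter methods and wraps them as SpringValue objects stored in SpringValueRegistry.
  • At startup, PropertySourcesProcessor loads the configuration and sets up periodic refreshing; each refresh calls RemoteConfigRepository#trySync, which eventually triggers AutoUpdateConfigChangeListener#onChange.
  • The listener takes the changed keys from the event, looks up the matching SpringValue objects in SpringValueRegistry, and sets the new values through reflection.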