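An Analysis of Dynamic Refresh in the Apollo Configuration Center
I recently used Apollo in a project, so this post records how Apollo's dynamic configuration refresh is integrated with Spring.
Let's start with the annotation that enables Apollo, @EnableApolloConfig.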

The annotation imports ApolloConfigRegistrar (the Apollo config registrar).

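ApolloConfigRegistrar implements Spring's bean definition registrar interface (ImportBeanDefinitionRegistrar), and this is where it registers the beans needed at startup.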

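The two lines that matter for configuration loading are these. Open SpringValueProcessor first.
Its comment says it is a processor that parses the placeholders in @Value-annotated fields and methods. It implements BeanFactoryPostProcessor and BeanFactoryAware, which means it can inspect and modify bean definitions.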

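The postProcessBeanFactory method in this class collects the configuration of beans defined in XML, so we skip it here.
The key point is the processing logic defined in the abstract parent class (ApolloProcessor): before each bean is initialized, it calls processField and processMethod for the bean's fields and methods.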

Now look at how the subclass SpringValueProcessor implements them:
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
    throws BeansException {
  // only process beans when auto-update of injected Spring properties is enabled
  if (configUtil.isAutoUpdateInjectedSpringPropertiesEnabled()) {
    // delegate to the parent class, which walks the bean's fields and methods
    super.postProcessBeforeInitialization(bean, beanName);
    // handle beans defined in XML
    processBeanPropertyValues(bean, beanName);
  }
  return bean;
}

@Override
protected void processField(Object bean, String beanName, Field field) {
  // register @Value on field
  // read the @Value annotation on the field
  Value value = field.getAnnotation(Value.class);
  if (value == null) {
    return;
  }
  // extract the placeholder keys from the @Value expression
  Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());
  if (keys.isEmpty()) {
    return;
  }
  for (String key : keys) {
    // build a SpringValue that records the key, the expression, the bean and the field
    SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
    // register it in the SpringValue registry, which is backed by a map
    springValueRegistry.register(beanFactory, key, springValue);
    logger.debug("Monitoring {}", springValue);
  }
}

@Override
protected void processMethod(Object bean, String beanName, Method method) {
  // register @Value on method
  // read the @Value annotation on the method
  Value value = method.getAnnotation(Value.class);
  if (value == null) {
    return;
  }
  // skip Configuration bean methods
  if (method.getAnnotation(Bean.class) != null) {
    return;
  }
  if (method.getParameterTypes().length != 1) {
    logger.error("Ignore @Value setter {}.{}, expecting 1 parameter, actual {} parameters",
        bean.getClass().getName(), method.getName(), method.getParameterTypes().length);
    return;
  }
  // extract the placeholder keys
  Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());
  if (keys.isEmpty()) {
    return;
  }
  for (String key : keys) {
    SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, method, false);
    springValueRegistry.register(beanFactory, key, springValue);
    logger.info("Monitoring {}", springValue);
  }
}
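To make the key-extraction step concrete, here is a minimal sketch that uses only the JDK. It is not Apollo's PlaceholderHelper: the class name PlaceholderKeysSketch and the regular expression are assumptions made for illustration, and the sketch deliberately ignores nested placeholders and SpEL expressions.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for the placeholder key extraction step.
class PlaceholderKeysSketch {
  // Matches ${key} or ${key:default}. Nested placeholders are NOT handled.
  private static final Pattern PLACEHOLDER =
      Pattern.compile("\\$\\{([^:}$]+)(?::[^}]*)?\\}");

  static Set<String> extractKeys(String expression) {
    Set<String> keys = new LinkedHashSet<>();
    Matcher matcher = PLACEHOLDER.matcher(expression);
    while (matcher.find()) {
      // group(1) is the part before the optional ":default"
      keys.add(matcher.group(1).trim());
    }
    return keys;
  }

  public static void main(String[] args) {
    System.out.println(extractKeys("${timeout:100}"));       // [timeout]
    System.out.println(extractKeys("${host}:${port:8080}")); // [host, port]
    System.out.println(extractKeys("plain text"));           // []
  }
}
```

One SpringValue is registered per extracted key, which is why an expression with two placeholders ends up in the registry twice.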
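Now look at the internals of SpringValueRegistry. The registry field is a thread-safe Map: each BeanFactory maps to a Multimap from placeholder key (taken from @Value) to the SpringValue objects built for that key.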
public class SpringValueRegistry {
  private static final long CLEAN_INTERVAL_IN_SECONDS = 5;
  // Map<BeanFactory, Multimap<placeholder key from @Value, SpringValue wrapping the field/method and the bean reference>>
  private final Map<BeanFactory, Multimap<String, SpringValue>> registry = Maps.newConcurrentMap();
  // makes sure the cleanup task is started only once, even under concurrent registration
  private final AtomicBoolean initialized = new AtomicBoolean(false);
  private final Object LOCK = new Object();

  public void register(BeanFactory beanFactory, String key, SpringValue springValue) {
    if (!registry.containsKey(beanFactory)) {
      synchronized (LOCK) {
        if (!registry.containsKey(beanFactory)) {
          registry.put(beanFactory, LinkedListMultimap.<String, SpringValue>create());
        }
      }
    }
    registry.get(beanFactory).put(key, springValue);
    // lazy initialize
    if (initialized.compareAndSet(false, true)) {
      initialize();
    }
  }

  public Collection<SpringValue> get(BeanFactory beanFactory, String key) {
    Multimap<String, SpringValue> beanFactorySpringValues = registry.get(beanFactory);
    if (beanFactorySpringValues == null) {
      return null;
    }
    return beanFactorySpringValues.get(key);
  }

  // starts a background task that periodically cleans up stale entries
  private void initialize() {
    Executors.newSingleThreadScheduledExecutor(ApolloThreadFactory.create("SpringValueRegistry", true)).scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            try {
              scanAndClean();
            } catch (Throwable ex) {
              ex.printStackTrace();
            }
          }
        }, CLEAN_INTERVAL_IN_SECONDS, CLEAN_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
  }

  // scans the registry and removes SpringValues whose target bean is no longer valid
  private void scanAndClean() {
    Iterator<Multimap<String, SpringValue>> iterator = registry.values().iterator();
    while (!Thread.currentThread().isInterrupted() && iterator.hasNext()) {
      Multimap<String, SpringValue> springValues = iterator.next();
      Iterator<Entry<String, SpringValue>> springValueIterator = springValues.entries().iterator();
      while (springValueIterator.hasNext()) {
        Entry<String, SpringValue> springValue = springValueIterator.next();
        if (!springValue.getValue().isTargetBeanValid()) {
          // clear unused spring values
          springValueIterator.remove();
        }
      }
    }
  }
}
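The registry pattern above (a key-to-targets map plus a periodic sweep of dead targets) can be sketched with the JDK alone. Everything here is an assumption for illustration: WeakRegistrySketch is a hypothetical class, it holds beans through WeakReference on the guess that isTargetBeanValid checks whether the referenced bean is still alive, and the demo calls WeakReference.clear() to simulate garbage collection deterministically.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical JDK-only analogue of a key -> targets registry with cleanup.
class WeakRegistrySketch {
  private final Map<String, List<WeakReference<Object>>> registry = new ConcurrentHashMap<>();

  // Registers a bean under a key; returns the reference so the demo can clear it.
  WeakReference<Object> register(String key, Object bean) {
    WeakReference<Object> ref = new WeakReference<>(bean);
    registry.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(ref);
    return ref;
  }

  // Returns the beans registered under the key that are still alive.
  List<Object> get(String key) {
    List<Object> live = new ArrayList<>();
    for (WeakReference<Object> ref : registry.getOrDefault(key, Collections.emptyList())) {
      Object bean = ref.get();
      if (bean != null) {
        live.add(bean);
      }
    }
    return live;
  }

  // Removes entries whose bean is gone; returns how many were removed.
  int scanAndClean() {
    int removed = 0;
    for (List<WeakReference<Object>> refs : registry.values()) {
      int before = refs.size();
      refs.removeIf(ref -> ref.get() == null);
      removed += before - refs.size();
    }
    return removed;
  }

  static boolean demo() {
    WeakRegistrySketch registry = new WeakRegistrySketch();
    Object alive = new Object();
    registry.register("timeout", alive);
    WeakReference<Object> stale = registry.register("timeout", new Object());
    stale.clear(); // simulate the second bean having been garbage collected
    int removed = registry.scanAndClean();
    return removed == 1 && registry.get("timeout").contains(alive);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // true
  }
}
```

Holding targets weakly means the registry does not by itself keep a discarded bean alive; the sweep only tidies up the leftover entries.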
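With all the SpringValue objects in the registry, where are they used? Following the usages of the get method leads to a listener, whose onChange handler calls it: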
public class AutoUpdateConfigChangeListener implements ConfigChangeListener {
  private static final Logger logger = LoggerFactory.getLogger(AutoUpdateConfigChangeListener.class);
  private final boolean typeConverterHasConvertIfNecessaryWithFieldParameter;
  private final Environment environment;
  private final ConfigurableBeanFactory beanFactory;
  private final TypeConverter typeConverter;
  private final PlaceholderHelper placeholderHelper;
  private final SpringValueRegistry springValueRegistry;
  private final Gson gson;

  public AutoUpdateConfigChangeListener(Environment environment, ConfigurableListableBeanFactory beanFactory) {
    this.typeConverterHasConvertIfNecessaryWithFieldParameter = testTypeConverterHasConvertIfNecessaryWithFieldParameter();
    this.beanFactory = beanFactory;
    this.typeConverter = this.beanFactory.getTypeConverter();
    this.environment = environment;
    this.placeholderHelper = SpringInjector.getInstance(PlaceholderHelper.class);
    this.springValueRegistry = SpringInjector.getInstance(SpringValueRegistry.class);
    this.gson = new Gson();
  }

  @Override
  public void onChange(ConfigChangeEvent changeEvent) {
    // the keys that changed in this event
    Set<String> keys = changeEvent.changedKeys();
    if (CollectionUtils.isEmpty(keys)) {
      return;
    }
    for (String key : keys) {
      // 1. check whether the changed key is relevant
      Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
      if (targetValues == null || targetValues.isEmpty()) {
        continue;
      }
      // 2. update the value
      for (SpringValue val : targetValues) {
        updateSpringValue(val);
      }
    }
  }

  // updates a single SpringValue
  private void updateSpringValue(SpringValue springValue) {
    try {
      // resolve the new value for the placeholder expression
      Object value = resolvePropertyValue(springValue);
      // apply it through SpringValue#update
      springValue.update(value);
      logger.info("Auto update apollo changed value successfully, new value: {}, {}", value,
          springValue);
    } catch (Throwable ex) {
      logger.error("Auto update apollo changed value failed, {}", springValue.toString(), ex);
    }
  }
  // (remaining methods omitted)
}
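The shape of onChange (iterate the changed keys, skip keys nobody registered, re-resolve the value, push it to each target) can be sketched without Spring. The names below (ChangeDispatchSketch, register, setProperty) are hypothetical, and a plain Map stands in for the Spring Environment that the real listener resolves values from.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of dispatching changed keys to registered targets.
class ChangeDispatchSketch {
  // key -> callbacks that apply a new value to some target
  private final Map<String, List<Consumer<String>>> targets = new HashMap<>();
  // stand-in for the environment the new value is resolved from
  private final Map<String, String> environment = new HashMap<>();

  void register(String key, Consumer<String> updater) {
    targets.computeIfAbsent(key, k -> new ArrayList<>()).add(updater);
  }

  void setProperty(String key, String value) {
    environment.put(key, value);
  }

  // Returns how many targets were updated.
  int onChange(Set<String> changedKeys) {
    int updated = 0;
    for (String key : changedKeys) {
      List<Consumer<String>> updaters = targets.get(key);
      if (updaters == null || updaters.isEmpty()) {
        continue; // the changed key is not used by any registered target
      }
      String newValue = environment.get(key); // re-resolve instead of trusting the event
      for (Consumer<String> updater : updaters) {
        updater.accept(newValue);
        updated++;
      }
    }
    return updated;
  }

  static String demo() {
    ChangeDispatchSketch listener = new ChangeDispatchSketch();
    Map<String, String> bean = new HashMap<>();
    bean.put("timeout", "100");
    bean.put("name", "old");
    listener.register("timeout", v -> bean.put("timeout", v));
    listener.register("name", v -> bean.put("name", v));
    listener.setProperty("timeout", "200");
    listener.setProperty("unrelated", "x");
    int updated = listener.onChange(Set.of("timeout", "unrelated"));
    return updated + ":" + bean.get("timeout") + ":" + bean.get("name");
  }

  public static void main(String[] args) {
    System.out.println(demo()); // 1:200:old
  }
}
```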
Finally, SpringValue's update method is called. It uses reflection to change the value on the object that the SpringValue references.
public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {
  // update the field directly, or call the setter for method injection
  if (isField()) {
    injectField(newVal);
  } else {
    injectMethod(newVal);
  }
}

// sets the new value on the referenced bean's field
private void injectField(Object newVal) throws IllegalAccessException {
  Object bean = beanRef.get();
  if (bean == null) {
    return;
  }
  boolean accessible = field.isAccessible();
  field.setAccessible(true);
  field.set(bean, newVal);
  field.setAccessible(accessible);
}

// invokes the @Value-annotated setter with the new value
private void injectMethod(Object newVal)
    throws InvocationTargetException, IllegalAccessException {
  Object bean = beanRef.get();
  if (bean == null) {
    return;
  }
  methodParameter.getMethod().invoke(bean, newVal);
}
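The two reflective paths can be tried in isolation. This is a minimal sketch with a hypothetical DemoBean and hypothetical helper signatures; it looks fields and setters up by name for brevity, whereas the excerpt above keeps the Field and Method objects captured at registration time.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Hypothetical sketch: updating a live object through a field or a setter.
class ReflectiveUpdateSketch {
  static class DemoBean {
    private int timeout = 100;
    private String name = "old";

    public void setName(String name) {
      this.name = name;
    }
  }

  // Field path: write the private field directly.
  static void injectField(Object bean, String fieldName, Object newVal)
      throws ReflectiveOperationException {
    Field field = bean.getClass().getDeclaredField(fieldName);
    field.setAccessible(true);
    field.set(bean, newVal); // an Integer is unwrapped for the int field
  }

  // Method path: call the single-argument setter with the new value.
  static void injectMethod(Object bean, String setterName, Class<?> paramType, Object newVal)
      throws ReflectiveOperationException {
    Method setter = bean.getClass().getMethod(setterName, paramType);
    setter.setAccessible(true);
    setter.invoke(bean, newVal);
  }

  static String demo() {
    try {
      DemoBean bean = new DemoBean();
      injectField(bean, "timeout", 200);
      injectMethod(bean, "setName", String.class, "new");
      return bean.timeout + " " + bean.name;
    } catch (ReflectiveOperationException ex) {
      throw new IllegalStateException(ex);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // 200 new
  }
}
```

Because the update writes into the existing bean instance, any code that has copied the old value elsewhere will not see the change.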
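The notification that triggers the listener can be traced back to the remote configuration class RemoteConfigRepository, whose constructor does the following: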
public RemoteConfigRepository(String namespace) {
  ....
  // sync the configuration once up front
  this.trySync();
  // schedule the periodic refresh
  this.schedulePeriodicRefresh();
  // schedule the long-polling refresh
  this.scheduleLongPollingRefresh();
}
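One JDK detail is relevant to the periodic refresh: a task passed to scheduleAtFixedRate is not run again once one execution throws. That is presumably why the scheduled work goes through a trySync-style wrapper rather than calling sync directly. The sketch below is an assumption-laden illustration (PeriodicRefreshSketch, the 10 ms period, and the simulated failure are all made up), not Apollo's scheduling code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a periodic refresh that survives a failing run.
class PeriodicRefreshSketch {
  static boolean demo() {
    AtomicInteger attempts = new AtomicInteger();
    CountDownLatch threeAttempts = new CountDownLatch(3);
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "refresh-sketch");
      t.setDaemon(true); // do not keep the JVM alive for the refresher
      return t;
    });

    // "sync" fails on its first run, e.g. a simulated network error
    Runnable sync = () -> {
      if (attempts.incrementAndGet() == 1) {
        throw new IllegalStateException("simulated network error");
      }
    };
    // "trySync" swallows the failure so the schedule keeps running
    Runnable trySync = () -> {
      try {
        sync.run();
      } catch (Throwable ex) {
        System.err.println("sync failed, will retry: " + ex.getMessage());
      } finally {
        threeAttempts.countDown();
      }
    };

    scheduler.scheduleAtFixedRate(trySync, 0, 10, TimeUnit.MILLISECONDS);
    try {
      // true only if the task kept running after the first failure
      return threeAttempts.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      scheduler.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The same reasoning applies to the cleanup task in SpringValueRegistry, whose run method also catches Throwable.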
All three paths end up executing the same sync method:
protected synchronized void sync() {
  Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
  try {
    // the previously cached configuration
    ApolloConfig previous = m_configCache.get();
    // load the configuration from the remote server
    ApolloConfig current = loadApolloConfig();
    //reference equals means HTTP 304
    if (previous != current) {
      logger.info("[ARCH_APOLLO_CLIENT]Remote Config refreshed!");
      m_configCache.set(current);
      // notify listeners of the refreshed configuration
      this.fireRepositoryChange(m_namespace, this.getConfig());
    }
    if (current != null) {
      Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
          current.getReleaseKey());
    }
    transaction.setStatus(Transaction.SUCCESS);
  } catch (Throwable ex) {
    transaction.setStatus(ex);
    throw ex;
  } finally {
    transaction.complete();
  }
}
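The change check in sync relies on reference identity: if the loader hands back the very same cached object, nothing changed. A minimal sketch of that idea follows, with hypothetical names (SyncSketch, Snapshot) and a fake loader that returns the previous instance when the release key is unchanged, mimicking the HTTP 304 case mentioned in the comment.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical sketch: detect "nothing changed" by reference identity.
class SyncSketch {
  static final class Snapshot {
    final String releaseKey;

    Snapshot(String releaseKey) {
      this.releaseKey = releaseKey;
    }
  }

  private final AtomicReference<Snapshot> cache = new AtomicReference<>();
  private int changeEvents = 0;

  // The loader receives the cached snapshot and returns it unchanged on "304".
  synchronized void sync(UnaryOperator<Snapshot> loader) {
    Snapshot previous = cache.get();
    Snapshot current = loader.apply(previous);
    if (previous != current) { // identity, not equals()
      cache.set(current);
      changeEvents++; // stands in for firing the repository change
    }
  }

  static int demo() {
    SyncSketch repo = new SyncSketch();
    String[] remoteReleaseKey = {"r1"};
    UnaryOperator<Snapshot> loader = previous ->
        (previous != null && previous.releaseKey.equals(remoteReleaseKey[0]))
            ? previous
            : new Snapshot(remoteReleaseKey[0]);
    repo.sync(loader);          // first load: one change event
    repo.sync(loader);          // unchanged remote: no event
    remoteReleaseKey[0] = "r2"; // a new release is published
    repo.sync(loader);          // second change event
    return repo.changeEvents;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // 2
  }
}
```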
Inside, the refresh calls fireRepositoryChange, which is what eventually triggers the listener event:
protected void fireRepositoryChange(String namespace, Properties newProperties) {
  for (RepositoryChangeListener listener : m_listeners) {
    try {
      listener.onRepositoryChange(namespace, newProperties);
    } catch (Throwable ex) {
      Tracer.logError(ex);
      logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
    }
  }
}
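The try/catch inside the loop means one misbehaving listener cannot prevent the others from being notified. Here is a small self-contained sketch of that fan-out; ListenerFanOutSketch and its ChangeListener interface are hypothetical, and the CopyOnWriteArrayList is an assumption chosen so listeners can be added while a notification is in progress.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: notify every listener, isolating failures.
class ListenerFanOutSketch {
  interface ChangeListener {
    void onRepositoryChange(String namespace, Properties newProperties);
  }

  private final List<ChangeListener> listeners = new CopyOnWriteArrayList<>();

  void addListener(ChangeListener listener) {
    if (!listeners.contains(listener)) {
      listeners.add(listener);
    }
  }

  void fire(String namespace, Properties newProperties) {
    for (ChangeListener listener : listeners) {
      try {
        listener.onRepositoryChange(namespace, newProperties);
      } catch (Throwable ex) {
        // log and move on so the remaining listeners still run
        System.err.println("listener failed: " + ex.getMessage());
      }
    }
  }

  static List<String> demo() {
    ListenerFanOutSketch repository = new ListenerFanOutSketch();
    List<String> received = new ArrayList<>();
    repository.addListener((ns, p) -> {
      throw new IllegalStateException("boom");
    });
    repository.addListener((ns, p) -> received.add(ns + ":" + p.getProperty("timeout")));

    Properties props = new Properties();
    props.setProperty("timeout", "200");
    repository.fire("application", props);
    return received;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [application:200]
  }
}
```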
The resulting call chain:
trySync
->sync
->loadApolloConfig
->fireRepositoryChange
->DefaultConfig#onRepositoryChange
->AbstractConfig#fireConfigChange
->AutoUpdateConfigChangeListener#onChange
->AutoUpdateConfigChangeListener#updateSpringValue
->SpringValue#update
RemoteConfigRepository itself is created when PropertySourcesProcessor, another post-processor, initializes the configuration.

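Having read the source, let's return to the opening question and summarize:
- At startup, a BeanPostProcessor records beans in SpringValueRegistry (the registry of SpringValue objects).
- SpringValueProcessor reads the @Value fields and methods of every bean and wraps each one in a SpringValue stored in SpringValueRegistry.
- At startup, PropertySourcesProcessor loads the configuration; the scheduled refresh then calls trySync in RemoteConfigRepository, which eventually triggers the onChange handler of AutoUpdateConfigChangeListener.
- The listener takes the changed keys from the event, looks up the matching SpringValue objects in SpringValueRegistry, and sets the new values through reflection.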