对dubbo的DubboReference.check的参数进行剖析
背景
在使用dubbo的时候,发现当消费者启动的时候,如果提供者没有启动,即使提供者后来启动了,消费者也调不通提供者提供的接口了。
注册中心使用都是nacos
dubbo版本是3.0.4
例子
接口
public interface DemoService {
String sayHello();
}
提供者
@DubboService
public class DemoServiceImpl implements DemoService {
@Override
public String sayHello() {
return "hello";
}
}
@EnableDubbo
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class ReferenceCheckProviderStarter {
public static void main(String[] args) {
new SpringApplicationBuilder(ReferenceCheckProviderStarter.class)
.web(WebApplicationType.NONE) // .REACTIVE, .SERVLET
.run(args);
System.out.println("dubbo service started");
}
}
消费者
@EnableDubbo
@RestController
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class ReferenceCheckConsumerStarter {
@DubboReference
private DemoService demoService;
@GetMapping("/dubbo/nacos/test")
public Object test() {
return demoService.sayHello();
}
public static void main(String[] args) {
SpringApplication.run(ReferenceCheckConsumerStarter.class, args);
}
}
1. 先启动provider,再启动consumer
a. 启动provider
nacos出现provider的服务
b. 启动consumer
nacos出现consumer的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
hello
c. 终止provider
nacos上provider的服务消失了
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
No provider available from registry
d. 重新启动provider
nacos出现provider的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
hello
可以看出:先启动provider,再启动consumer,整个过程是没问题。
2. 先启动consumer,再启动provider
a. 启动consumer
nacos出现consumer的服务,但立即又消失了
b. 启动provider
nacos出现provider的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
Directory already destroyed .
可以看出:当consumer先启动时,如果provider此时没有启动,consumer就再也访问不到provider的服务了。
3. 先启动consumer,再启动provider (check=false)
修改一下注解@DubboRefere
的参数
@DubboReference(check = false)
private DemoService demoService;
a. 启动consumer
nacos出现consumer的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
No provider available from registry
b. 启动provider
nacos出现provider的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
hello
可以看出:即使是consumer先启动,当provider启动后,consumer还是能够访问到provider的服务的。
关于报错
org.apache.dubbo.rpc.RpcException: No provider available from registry
public class RegistryDirectory<T> extends DynamicDirectory<T> {
@Override
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
", please check status of providers(disabled, not registered or in blacklist).");
}
// ......
}
}
public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
String EMPTY_PROTOCOL = "empty";
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty url list to clear address.");
this.originalUrls = invokerUrls;
if (invokerUrls.size() == 0) {
logger.info("Received empty url list...");
this.forbidden = true; // Forbid to access // 这里
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow accessing // 这里
if (CollectionUtils.isEmpty(invokerUrls)) {
return;
}
// can't use local reference because this.urlInvokerMap might be accessed at isAvailable() by main thread concurrently.
Map<String, Invoker<T>> oldUrlInvokerMap = null;
if (this.urlInvokerMap != null) {
// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + this.urlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
this.urlInvokerMap.forEach(oldUrlInvokerMap::put);
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map // 这里
logger.info("Refreshed invoker size " + newUrlInvokerMap.size());
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap; // 这里
if (oldUrlInvokerMap != null) {
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
// notify invokers refreshed
this.invokersChanged();
}
private synchronized void refreshOverrideAndInvoker(List<URL> instanceUrls) {
// mock zookeeper://xxx?mock=return null
if (enableConfigurationListen) {
overrideDirectoryUrl();
}
refreshInvoker(instanceUrls); // 这里
}
}
public class RegistryDirectory<T> extends DynamicDirectory<T> {
@Override
public synchronized void notify(List<URL> urls) {
if (isDestroyed()) {
return;
}
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
// 3.x added for extend URL address
ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
refreshOverrideAndInvoker(providerURLs); // 这里
}
}
public abstract class AbstractRegistry implements Registry {
/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());
}
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>(); // 这里
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getCategory(DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); // 这里
categoryList.add(u); // 这里
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
listener.notify(categoryList); // 这里
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
if (localCacheEnabled) {
saveProperties(url);
}
}
}
}
public class NacosRegistry extends FailbackRegistry {
private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
List<Instance> enabledInstances = new LinkedList<>(instances);
if (enabledInstances.size() > 0) {
// Instances
filterEnabledInstances(enabledInstances);
}
List<URL> urls = toUrlWithEmpty(url, enabledInstances);
NacosRegistry.this.notify(url, listener, urls); // 这里
}
String EMPTY_PROTOCOL = "empty";
private List<URL> toUrlWithEmpty(URL consumerURL, Collection<Instance> instances) {
List<URL> urls = buildURLs(consumerURL, instances);
if (urls.size() == 0) { // 这里
URL empty = URLBuilder.from(consumerURL)
.setProtocol(EMPTY_PROTOCOL)
.addParameter(CATEGORY_KEY, DEFAULT_CATEGORY)
.build();
urls.add(empty);
}
return urls;
}
}
当没有可用的服务时,instances是空的
当有可用的服务时,instances是不为空的
是怎么通知的
public class ServiceInfoHolder implements Closeable {
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) { // 这里
NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts())); // 这里
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
}
public class DefaultPublisher extends Thread implements EventPublisher {
private BlockingQueue<Event> queue;
@Override
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<>(bufferSize); // 这里
start();
}
@Override
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event); // 这里
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}
@Override
public void run() {
openEventHandler();
}
void openEventHandler() {
try {
// This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
for (; ; ) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
ThreadUtils.sleep(1000L);
waitTimes--;
}
for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take(); // 这里
receiveEvent(event); // 这里
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.");
return;
}
// Notification single event listener
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
notifySubscriber(subscriber, event); // 这里
}
}
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job); // 这里
} else {
try {
job.run(); // 这里
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
}
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
@Override
public void onEvent(InstancesChangeEvent event) {
String key = ServiceInfo
.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (CollectionUtils.isEmpty(eventListeners)) {
return;
}
for (final EventListener listener : eventListeners) {
final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent)); // 这里
} else {
listener.onEvent(namingEvent); // 这里
}
}
}
}
public class NacosRegistry extends FailbackRegistry {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
notifier.notify(e.getInstances()); // 这里
}
}
}
public abstract class RegistryNotifier {
public synchronized void notify(Object rawAddresses) {
this.rawAddresses = rawAddresses;
long notifyTime = System.currentTimeMillis();
this.lastEventTime = notifyTime;
long delta = (System.currentTimeMillis() - lastExecuteTime) - delayTime;
// more than 10 calls && next execute time is in the future
boolean delay = shouldDelay.get() && delta < 0;
if (delay) {
scheduler.schedule(new NotificationTask(this, notifyTime), -delta, TimeUnit.MILLISECONDS); // 这里
} else {
// check if more than 10 calls
if (!shouldDelay.get() && executeTime.incrementAndGet() > DEFAULT_DELAY_EXECUTE_TIMES) {
shouldDelay.set(true);
}
scheduler.submit(new NotificationTask(this, notifyTime)); // 这里
}
}
public static class NotificationTask implements Runnable {
private final RegistryNotifier listener;
private final long time;
public NotificationTask(RegistryNotifier listener, long time) {
this.listener = listener;
this.time = time;
}
@Override
public void run() {
try {
if (this.time == listener.lastEventTime) {
listener.doNotify(listener.rawAddresses); // 这里
listener.lastExecuteTime = System.currentTimeMillis();
synchronized (listener) {
if (this.time == listener.lastEventTime) {
listener.rawAddresses = null;
}
}
}
} catch (Throwable t) {
logger.error("Error occurred when notify directory. ", t);
}
}
}}
}
public class NacosRegistry extends FailbackRegistry {
private class RegistryChildListenerImpl implements EventListener {
private RegistryNotifier notifier;
public RegistryChildListenerImpl(String serviceName, URL consumerUrl, NotifyListener listener) {
notifier = new RegistryNotifier(getUrl(), NacosRegistry.this.getDelay()) {
@Override
protected void doNotify(Object rawAddresses) {
List<Instance> instances = (List<Instance>) rawAddresses;
if (isServiceNamesWithCompatibleMode(consumerUrl)) {
/**
* Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
* in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
*/
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
}
NacosRegistry.this.notifySubscriber(consumerUrl, listener, instances); // 这里
}
};
}
}
然后就调用了上面的????
什么时候添加监听器的?
public class NacosRegistry extends FailbackRegistry {
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
EventListener eventListener = new RegistryChildListenerImpl(serviceName, url, listener); // 这里
namingService.subscribe(serviceName,
getUrl().getGroup(Constants.DEFAULT_GROUP),
eventListener); // 这里
}
private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
try {
if (isServiceNamesWithCompatibleMode(url)) {
List<Instance> allCorrespondingInstanceList = Lists.newArrayList();
/**
* Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned
* in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
*
* namingService.getAllInstances with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
* default {@link DEFAULT_GROUP}
*
* in https://github.com/apache/dubbo/issues/5978
*/
for (String serviceName : serviceNames) {
List<Instance> instances = namingService.getAllInstances(serviceName,
getUrl().getGroup(Constants.DEFAULT_GROUP));
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
allCorrespondingInstanceList.addAll(instances);
}
notifySubscriber(url, listener, allCorrespondingInstanceList);
for (String serviceName : serviceNames) {
subscribeEventListener(serviceName, url, listener); // 这里
}
} else {
for (String serviceName : serviceNames) {
List<Instance> instances = new LinkedList<>();
instances.addAll(namingService.getAllInstances(serviceName
, getUrl().getGroup(Constants.DEFAULT_GROUP)));
String serviceInterface = serviceName;
String[] segments = serviceName.split(SERVICE_NAME_SEPARATOR, -1);
if (segments.length == 4) {
serviceInterface = segments[SERVICE_INTERFACE_INDEX];
}
URL subscriberURL = url.setPath(serviceInterface).addParameters(INTERFACE_KEY, serviceInterface,
CHECK_KEY, String.valueOf(false));
notifySubscriber(subscriberURL, listener, instances);
subscribeEventListener(serviceName, subscriberURL, listener);
}
}
} catch (Throwable cause) {
throw new RpcException("Failed to subscribe " + url + " to nacos " + getUrl() + ", cause: " + cause.getMessage(), cause);
}
}
}
org.apache.dubbo.rpc.RpcException: Directory already destroyed
public abstract class AbstractDirectory<T> implements Directory<T> {
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
return doList(invocation);
}
@Override
public void destroy() {
destroyed = true; // 这里
}
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private void checkInvokerAvailable() throws IllegalStateException {
if (shouldCheck() && !invoker.isAvailable()) {
invoker.destroy(); // 这里
throw new IllegalStateException("Should has at least one way to know which services this interface belongs to," +
" subscription url: " + invoker.getUrl());
}
}
protected synchronized void init() {
// ......
checkInvokerAvailable(); // 这里
}
}
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
public boolean shouldCheck() {
checkDefault();
Boolean shouldCheck = isCheck(); // 这里
if (shouldCheck == null && getConsumer() != null) {
shouldCheck = getConsumer().isCheck();
}
if (shouldCheck == null) {
// default true // 这里
shouldCheck = true;
}
return shouldCheck;
}
}
public class RegistryDirectory<T> extends DynamicDirectory<T> {
@Override
public boolean isAvailable() {
if (isDestroyed() || this.forbidden) { // 这里
return false;
}
Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap; // 这里
return CollectionUtils.isNotEmptyMap(localUrlInvokerMap)
&& localUrlInvokerMap.values().stream().anyMatch(Invoker::isAvailable);
}
}
如果没有设置check
字段,那么就会在启动的时候检查提供方是否可用,如果不可用,就销毁了。