摘要:如果有多個注冊中心,多個服務提供者,這個時候會得到一組實例,此時需要通過集群管理類將多個合并成一個實例。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。
1. 簡介
在上一篇文章中,我詳細的分析了服務導出的原理。本篇文章我們趁熱打鐵,繼續(xù)分析服務引用的原理。在 Dubbo 中,我們可以通過兩種方式引用遠程服務。第一種是使用服務直聯(lián)的方式引用服務,第二種方式是基于注冊中心進行引用。服務直聯(lián)的方式僅適合在調試或測試服務的場景下使用,不適合在線上環(huán)境使用。因此,本文我將重點分析通過注冊中心引用服務的過程。從注冊中心中獲取服務配置只是服務引用過程中的一環(huán),除此之外,服務消費者還需要經歷 Invoker 創(chuàng)建、代理類創(chuàng)建等步驟。這些步驟,我將在后續(xù)章節(jié)中一一進行分析。
2.服務引用原理Dubbo 服務引用的時機有兩個,第一個是在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務,第二個是在 ReferenceBean 對應的服務被注入到其他類中時引用。這兩個引用服務的時機區(qū)別在于,第一個是餓漢式的,第二個是懶漢式的。默認情況下,Dubbo 使用懶漢式引用服務。如果需要使用餓漢式,可通過配置
以上就是 Dubbo 引用服務的大致原理,下面我們深入到代碼中,詳細分析服務引用細節(jié)。
3.源碼分析服務引用的入口方法為 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實現了這個方法。實現代碼如下:
public Object getObject() throws Exception { return get(); } public synchronized T get() { if (destroyed) { throw new IllegalStateException("Already destroyed!"); } // 檢測 ref 是否為空,為空則通過 init 方法創(chuàng)建 if (ref == null) { // init 方法主要用于處理配置,以及調用 createProxy 生成代理類 init(); } return ref; }
這里兩個方法代碼都比較簡短,并不難理解。不過這里需要特別說明一下,如果大家從 getObject 方法進行代碼調試時,會碰到比較詫異的問題。這里假設你使用 IDEA,且保持了 IDEA 的默認配置。當你面調試到 get 方法的if (ref == null)時,你會驚奇的發(fā)現 ref 不為空,導致你無法進入到 init 方法中繼續(xù)調試。導致這個現象的原因是 Dubbo 框架本身有點小問題,這個小問題會引發(fā)一些讓人詫異的現象。關于這個問題,我進行了將近兩個小時的排查。查明問題后,我給 Dubbo 提交了一個 pull request (#2754) 修復了此問題。另外,beiwei30 前輩開了一個 issue (#2757) 介紹這個問題,有興趣的朋友可以去看看。大家如果想規(guī)避這個問題,可以修改一下 IDEA 的配置。在配置面板中搜索 toString,然后取消Enable "toString" object view前的對號。具體如下:
講完需要注意的點,我們繼續(xù)向下分析,接下來將分析配置的處理過程。
3.1 處理配置Dubbo 提供了豐富的配置,用于調整和優(yōu)化框架行為,性能等。Dubbo 在引用或導出服務時,首先會對這些配置進行檢查和處理,以保證配置到正確性。如果大家不是很熟悉 Dubbo 配置,建議先閱讀以下官方文檔。配置解析的方法為 ReferenceConfig 的 init 方法,下面來看一下方法邏輯。
private void init() { if (initialized) { return; } initialized = true; if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("interface not allow null!"); } // 檢測 consumer 變量是否為空,為空則創(chuàng)建 checkDefault(); appendProperties(this); if (getGeneric() == null && getConsumer() != null) { // 設置 generic setGeneric(getConsumer().getGeneric()); } // 檢測是否為泛化接口 if (ProtocolUtils.isGeneric(getGeneric())) { interfaceClass = GenericService.class; } else { try { // 加載類 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } checkInterfaceAndMethods(interfaceClass, methods); } // -------------------------------? 分割線1 ?------------------------------ // 從系統(tǒng)變量中獲取與接口名對應的屬性值 String resolve = System.getProperty(interfaceName); String resolveFile = null; if (resolve == null || resolve.length() == 0) { // 從系統(tǒng)屬性中獲取解析文件路徑 resolveFile = System.getProperty("dubbo.resolve.file"); if (resolveFile == null || resolveFile.length() == 0) { // 從指定位置加載配置文件 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); if (userResolveFile.exists()) { // 獲取文件絕對路徑 resolveFile = userResolveFile.getAbsolutePath(); } } if (resolveFile != null && resolveFile.length() > 0) { Properties properties = new Properties(); FileInputStream fis = null; try { fis = new FileInputStream(new File(resolveFile)); // 從文件中加載配置 properties.load(fis); } catch (IOException e) { throw new IllegalStateException("Unload ..., cause:..."); } finally { try { if (null != fis) fis.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } // 獲取與接口名對應的配置 resolve = properties.getProperty(interfaceName); } } if (resolve != null && resolve.length() > 0) { // 將 resolve 賦值給 url url = resolve; } // -------------------------------? 分割線2 ?------------------------------ if (consumer != null) { if (application == null) { // 從 consumer 中獲取 Application 實例,下同 application = consumer.getApplication(); } if (module == null) { module = consumer.getModule(); } if (registries == null) { registries = consumer.getRegistries(); } if (monitor == null) { monitor = consumer.getMonitor(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } // 檢測本地 Application 和本地存根配置合法性 checkApplication(); checkStubAndMock(interfaceClass); // -------------------------------? 分割線3 ?------------------------------ Mapmap = new HashMap (); Map
上面的代碼很長,做的事情比較多。這里我根據代碼邏輯,對代碼進行了分塊,下面我們一起來看一下。
首先是方法開始到分割線1之間的代碼。這段代碼主要用于檢測 ConsumerConfig 實例是否存在,如不存在則創(chuàng)建一個新的實例,然后通過系統(tǒng)變量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。接著是檢測泛化配置,并根據配置設置 interfaceClass 的值。本段代碼邏輯大致就是這些,接著來看分割線1到分割線2之間的邏輯。這段邏輯用于從系統(tǒng)屬性或配置文件中加載與接口名相對應的配置,并將解析結果賦值給 url 字段。url 字段的作用一般是用于點對點調用。繼續(xù)向下看,分割線2和分割線3之間的代碼用于檢測幾個核心配置類是否為空,為空則嘗試從其他配置類中獲取。分割線3與分割線4之間的代碼主要是用于收集各種配置,并將配置存儲到 map 中。分割線4和分割線5之間的代碼用于處理 MethodConfig 實例。該實例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。分割線5到方法結尾的代碼主要用于解析服務消費者 ip,以及調用 createProxy 創(chuàng)建代理對象。關于該方法的詳細分析,將會在接下來的章節(jié)中展開。
到這里,關于配置的檢查與處理過長就分析完了。這部分邏輯不是很難理解,但比較繁雜,大家需要耐心看一下。好了,本節(jié)先到這,接下來分析服務引用的過程。
3.2 引用服務本節(jié)我們要從 createProxy 開始看起。createProxy 這個方法表面上看起來只是用于創(chuàng)建代理對象,但實際上并非如此。該方法還會調用其他方法構建以及合并 Invoker 實例。具體細節(jié)如下。
private T createProxy(Mapmap) { URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { // url 配置被指定,則不做本地引用 if (url != null && url.length() > 0) { isJvmRefer = false; // 根據 url 的協(xié)議、scope 以及 injvm 等參數檢測是否需要本地引用 // 比如如果用戶顯式配置了 scope=local,此時 isInjvmRefer 返回 true } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { isJvmRefer = true; } else { isJvmRefer = false; } } else { // 獲取 injvm 配置值 isJvmRefer = isInjvm().booleanValue(); } // 本地引用 if (isJvmRefer) { // 生成本地引用 URL,協(xié)議為 injvm URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); // 調用 refer 方法構建 InjvmInvoker 實例 invoker = refprotocol.refer(interfaceClass, url); // 遠程引用 } else { // url 不為空,表明用戶可能想進行點對點調用 if (url != null && url.length() > 0) { // 當需要配置多個 url 時,可用分號進行分割,這里會進行切分 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { // 設置接口全限定名為 url 路徑 url = url.setPath(interfaceName); } // 檢測 url 協(xié)議是否為 registry,若是,表明用戶想使用指定的注冊中心 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 將 map 轉換為查詢字符串,并作為 refer 參數的值添加到 url 中 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { // 合并 url,移除服務提供者的一些配置(這些配置來源于用戶配置的 url 屬性), // 比如線程池相關配置。并保留服務提供者的部分配置,比如版本,group,時間戳等 // 最后將合并后的配置設置為 url 查詢字符串中。 urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 加載注冊中心 url List us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 添加 refer 參數到 url 中,并將 url 添加到 urls 中 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } // 未配置注冊中心,拋出異常 if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference..."); } } // 單個注冊中心或服務提供者(服務直聯(lián),下同) if (urls.size() == 1) { // 調用 RegistryProtocol 的 refer 構建 Invoker 實例 invoker = refprotocol.refer(interfaceClass, urls.get(0)); // 多個注冊中心或多個服務提供者,或者兩者混合 } else { List > invokers = new ArrayList >(); URL registryURL = null; // 獲取所有的 Invoker for (URL url : urls) { // 通過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時 // 根據 url 協(xié)議頭加載指定的 Protocol 實例,并調用實例的 refer 方法 invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; } } if (registryURL != null) { // 如果注冊中心鏈接不為空,則將使用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 創(chuàng)建 StaticDirectory 實例,并由 Cluster 對多個 Invoker 進行合并 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; } // invoker 可用性檢查 if (c && !invoker.isAvailable()) { throw new IllegalStateException("No provider available for the service..."); } // 生成代理類 return (T) proxyFactory.getProxy(invoker); }
上面代碼很多,不過邏輯比較清晰。首先根據配置檢查是否為本地調用,若是,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例。若不是,則讀取直聯(lián)配置項,或注冊中心 url,并將讀取到的 url 存儲到 urls 中。然后,根據 urls 元素數量進行后續(xù)操作。若 urls 元素數量為1,則直接通過 Protocol 自適應拓展構建 Invoker 實例接口。若 urls 元素數量大于1,即存在多個注冊中心或服務直聯(lián) url,此時先根據 url 構建 Invoker。然后再通過 Cluster 合并多個 Invoker,最后調用 ProxyFactory 生成代理類。這里,Invoker 的構建過程以及代理類的過程比較重要,因此我將分兩小節(jié)對這兩個過程進行分析。
3.2.1 創(chuàng)建 InvokerInvoker 是 Dubbo 的核心模型,代表一個可執(zhí)行體。在服務提供方,Invoker 用于調用服務提供類。在服務消費方,Invoker 用于執(zhí)行遠程調用。Invoker 在 Dubbo 中的位置十分重要,因此我們有必要去搞懂它。從前面的代碼中可知,Invoker 是由 Protocol 實現類構建的。Protocol 實現類有很多,這里我會分析最常用的兩個,分別是 RegistryProtocol 和 DubboProtocol,其他的大家自行分析。下面先來分析 DubboProtocol 的 refer 方法源碼。如下:
publicInvoker refer(Class serviceType, URL url) throws RpcException { optimizeSerialization(url); // 創(chuàng)建 DubboInvoker DubboInvoker invoker = new DubboInvoker (serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
上面方法看起來比較簡單,不過這里有一個調用需要我們注意一下,即 getClients。這個方法用于獲取客戶端實例,實例類型為 ExchangeClient。ExchangeClient 實際上并不具備通信能力,因此它需要更底層的客戶端實例進行通信。比如 NettyClient、MinaClient 等,默認情況下,Dubbo 使用 NettyClient 進行通信。接下來,我們簡單看一下 getClients 方法的邏輯。
private ExchangeClient[] getClients(URL url) { // 是否共享連接 boolean service_share_connect = false; // 獲取連接數,默認為0,表示未配置 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // 如果未配置 connections,則共享連接 if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { // 獲取共享客戶端 clients[i] = getSharedClient(url); } else { // 初始化新的客戶端 clients[i] = initClient(url); } } return clients; }
這里根據 connections 數量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實例,默認情況下,使用共享客戶端實例。不過 getSharedClient 方法中也會調用 initClient 方法,因此下面我們一起看一下這兩個方法。
private ExchangeClient getSharedClient(URL url) { String key = url.getAddress(); // 獲取帶有“引用計數”功能的 ExchangeClient ReferenceCountExchangeClient client = referenceClientMap.get(key); if (client != null) { if (!client.isClosed()) { // 增加引用計數 client.incrementAndGetCount(); return client; } else { referenceClientMap.remove(key); } } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { if (referenceClientMap.containsKey(key)) { return referenceClientMap.get(key); } // 創(chuàng)建 ExchangeClient 客戶端 ExchangeClient exchangeClient = initClient(url); // 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這里明顯用了裝飾模式 client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); locks.remove(key); return client; } }
上面方法先訪問緩存,若緩存未命中,則通過 initClient 方法創(chuàng)建新的 ExchangeClient 實例,并將該實例傳給 ReferenceCountExchangeClient 構造方法創(chuàng)建一個帶有引用技術功能的 ExchangeClient 實例。ReferenceCountExchangeClient 內部實現比較簡單,就不分析了。下面我們再來看一下 initClient 方法的代碼。
private ExchangeClient initClient(URL url) { // 獲取客戶端類型,默認為 netty String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); // 添加編解碼和心跳包參數到 url 中 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 檢測客戶端類型是否存在,不存在則拋出異常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: ..."); } ExchangeClient client; try { // 獲取 lazy 配置,并根據配置值決定創(chuàng)建的客戶端類型 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { // 創(chuàng)建懶加載 ExchangeClient 實例 client = new LazyConnectExchangeClient(url, requestHandler); } else { // 創(chuàng)建普通 ExchangeClient 實例 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service..."); } return client; }
initClient 方法首先獲取用戶配置的客戶端類型,默認為 netty。然后檢測用戶配置的客戶端類型是否存在,不存在則拋出異常。最后根據 lazy 配置決定創(chuàng)建什么類型的客戶端。這里的 LazyConnectExchangeClient 代碼并不是很復雜,該類會在 request 方法被調用時通過 Exchangers 的 connect 方法創(chuàng)建 ExchangeClient 客戶端,這里就不分析 LazyConnectExchangeClient 的代碼了。下面我們分析一下 Exchangers 的 connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 獲取 Exchanger 實例,默認為 HeaderExchangeClient return getExchanger(url).connect(url, handler); }
如上,getExchanger 會通過 SPI 加載 HeaderExchangeClient 實例,這個方法比較簡單,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實現。
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 這里包含了多個調用,分別如下: // 1. 創(chuàng)建 HeaderExchangeHandler 對象 // 2. 創(chuàng)建 DecodeHandler 對象 // 3. 通過 Transporters 構建 Client 實例 // 4. 創(chuàng)建 HeaderExchangeClient 對象 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
這里的調用比較多,我們這里重點看一下 Transporters 的 connect 方法。如下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { // 如果 handler 數量大于1,則創(chuàng)建一個 ChannelHandler 分發(fā)器 handler = new ChannelHandlerDispatcher(handlers); } // 獲取 Transporter 自適應拓展類,并調用 connect 方法生成 Client 實例 return getTransporter().connect(url, handler); }
這里,getTransporter 方法返回的是自適應拓展類,該類會在運行時根據客戶端類型加載指定的 Transporter 實現類。若用戶未顯示配置客戶端類型,則默認加載 NettyTransporter,并調用該類的 connect 方法。如下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException { // 創(chuàng)建 NettyClient 對象 return new NettyClient(url, listener); }
到這里就不繼續(xù)跟下去了,在往下就是通過 Netty 提供的接口構建 Netty 客戶端了,大家有興趣自己看看。到這里,關于 DubboProtocol 的 refer 方法就分析完了。接下來,繼續(xù)分析 RegistryProtocol 的 refer 方法邏輯。
publicInvoker refer(Class type, URL url) throws RpcException { // 取 registry 參數值,并將其設置為協(xié)議頭 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 獲取注冊中心實例 Registry registry = registryFactory.getRegistry(url); // 這個判斷暫時不知道有什么意圖,為什么要給 RegistryService 類型生成 Invoker ? if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 將 url 查詢字符串轉為 Map Map qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); // 獲取 group 配置 String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // 通過 SPI 加載 MergeableCluster 實例,并調用 doRefer 繼續(xù)執(zhí)行引用服務邏輯 return doRefer(getMergeableCluster(), registry, type, url); } } // 調用 doRefer 繼續(xù)執(zhí)行引用服務邏輯 return doRefer(cluster, registry, type, url); }
上面代碼首先為 url 設置協(xié)議頭,然后根據 url 參數加載注冊中心實例。接下來對 RegistryService 繼續(xù)針對性處理,這個處理邏輯我不是很明白,不知道為什么要為 RegistryService 類型生成 Invoker,有知道同學麻煩告知一下。然后就是獲取 group 配置,根據 group 配置決定 doRefer 第一個參數的類型。這里的重點是 doRefer 方法,如下:
privateInvoker doRefer(Cluster cluster, Registry registry, Class type, URL url) { // 創(chuàng)建 RegistryDirectory 實例 RegistryDirectory directory = new RegistryDirectory (type, url); // 設置注冊中心和協(xié)議 directory.setRegistry(registry); directory.setProtocol(protocol); Map parameters = new HashMap (directory.getUrl().getParameters()); // 生成服務消費者鏈接 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // 注冊服務消費者,在 consumers 目錄下新節(jié)點 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 訂閱 providers、configurators、routers 等節(jié)點數據 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 一個注冊中心可能有多個服務提供者,因此這里需要將多個服務提供者合并為一個 Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
如上,doRefer 方法創(chuàng)建一個 RegistryDirectory 實例,然后生成服務者消費者鏈接,并向注冊中心進行注冊。注冊完畢后,緊接著訂閱 providers、configurators、routers 等節(jié)點下的數據。完成訂閱后,RegistryDirectory 會收到這幾個節(jié)點下的子節(jié)點信息,比如可以獲取到服務提供者的配置信息。由于一個服務可能部署在多臺服務器上,這樣就會在 providers 產生多個節(jié)點,這個時候就需要 Cluster 將多個服務節(jié)點合并為一個,并生成一個 Invoker。關于 RegistryDirectory 和 Cluster,本文不打算進行分析,相關分析將會在隨后的文章中展開。
好了,關于 Invoker 的創(chuàng)建的邏輯就先分析到這。邏輯比較多,大家耐心看一下。
3.2.2 創(chuàng)建代理Invoker 創(chuàng)建完畢后,接下來要做的事情是為服務接口生成代理對象。有了代理對象,我們就可以通過代理對象進行遠程調用。代理對象生成的入口方法為在 ProxyFactory 的 getProxy,接下來進行分析。
publicT getProxy(Invoker invoker) throws RpcException { // 調用重載方法 return getProxy(invoker, false); } public T getProxy(Invoker invoker, boolean generic) throws RpcException { Class>[] interfaces = null; // 獲取接口列表 String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { // 切分接口列表 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class>[types.length + 2]; // 設置服務接口類和 EchoService.class 到 interfaces 中 interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { // 加載接口類 interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class>[]{invoker.getInterface(), EchoService.class}; } // 為 http 和 hessian 協(xié)議提供泛化調用支持,參考 pull request #1827 if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class>[] temp = interfaces; // 創(chuàng)建新的 interfaces 數組 interfaces = new Class>[len + 1]; System.arraycopy(temp, 0, interfaces, 0, len); // 設置 GenericService.class 到數組中 interfaces[len] = GenericService.class; } // 調用重載方法 return getProxy(invoker, interfaces); } public abstract T getProxy(Invoker invoker, Class>[] types);
如上,上面大段代碼都是用來獲取 interfaces 數組的,因此我們需要繼續(xù)往下看。getProxy(Invoker, Class>[]) 這個方法是一個抽象方法,下面我們到 JavassistProxyFactory 類中看一下該方法的實現代碼。
publicT getProxy(Invoker invoker, Class>[] interfaces) { // 生成 Proxy 子類(Proxy 是抽象類)。并調用Proxy 子類的 newInstance 方法生成 Proxy 實例 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
上面代碼并不多,首先是通過 Proxy 的 getProxy 方法獲取 Proxy 子類,然后創(chuàng)建 InvokerInvocationHandler 對象,并將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現自 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調用。該類邏輯比較簡單,這里就不分析了。下面我們重點關注一下 Proxy 的 getProxy 方法,如下。
public static Proxy getProxy(Class>... ics) { // 調用重載方法 return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); } public static Proxy getProxy(ClassLoader cl, Class>... ics) { if (ics.length > 65535) throw new IllegalArgumentException("interface limit exceeded"); StringBuilder sb = new StringBuilder(); // 遍歷接口列表 for (int i = 0; i < ics.length; i++) { String itf = ics[i].getName(); // 檢測類型是否為接口 if (!ics[i].isInterface()) throw new RuntimeException(itf + " is not a interface."); Class> tmp = null; try { // 重新加載接口類 tmp = Class.forName(itf, false, cl); } catch (ClassNotFoundException e) { } // 檢測接口是否相同,這里 tmp 有可能為空 if (tmp != ics[i]) throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); // 拼接接口全限定名,分隔符為 ; sb.append(itf).append(";"); } // 使用拼接后接口名作為 key String key = sb.toString(); Mapcache; synchronized (ProxyCacheMap) { cache = ProxyCacheMap.get(cl); if (cache == null) { cache = new HashMap (); ProxyCacheMap.put(cl, cache); } } Proxy proxy = null; synchronized (cache) { do { // 從緩存中獲取 Reference 實例 Object value = cache.get(key); if (value instanceof Reference>) { proxy = (Proxy) ((Reference>) value).get(); if (proxy != null) { return proxy; } } // 多線程控制,保證只有一個線程可以進行后續(xù)操作 if (value == PendingGenerationMarker) { try { // 其他線程在此處進行等待 cache.wait(); } catch (InterruptedException e) { } } else { // 放置標志位到緩存中,并跳出 while 循環(huán)進行后續(xù)操作 cache.put(key, PendingGenerationMarker); break; } } while (true); } long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null; ClassGenerator ccp = null, ccm = null; try { // 創(chuàng)建 ClassGenerator 對象 ccp = ClassGenerator.newInstance(cl); Set worked = new HashSet (); List methods = new ArrayList (); for (int i = 0; i < ics.length; i++) { // 檢測接口訪問級別是否為 protected 或 privete if (!Modifier.isPublic(ics[i].getModifiers())) { // 獲取接口包名 String npkg = ics[i].getPackage().getName(); if (pkg == null) { pkg = npkg; } else { if (!pkg.equals(npkg)) // 非 public 級別的接口必須在同一個包下,否者拋出異常 throw new IllegalArgumentException("non-public interfaces from different packages"); } } // 添加接口到 ClassGenerator 中 ccp.addInterface(ics[i]); // 遍歷接口方法 for (Method method : ics[i].getMethods()) { // 獲取方法描述,可理解為方法簽名 String desc = ReflectUtils.getDesc(method); // 如果已包含在 worked 中,則忽略。考慮這種情況, // A 接口和 B 接口中包含一個完全相同的方法 if (worked.contains(desc)) continue; worked.add(desc); int ix = methods.size(); // 獲取方法返回值類型 Class> rt = method.getReturnType(); // 獲取參數列表 Class>[] pts = method.getParameterTypes(); // 生成 Object[] args = new Object[1...N] StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];"); for (int j = 0; j < pts.length; j++) // 生成 args[1...N] = ($w)$1...N; code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); // 生成 InvokerHandler 接口的 invoker 方法調用語句,如下: // Object ret = handler.invoke(this, methods[1...N], args); code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);"); // 返回值不為 void if (!Void.TYPE.equals(rt)) // 生成返回語句,形如 return (java.lang.String) ret; code.append(" return ").append(asArgument(rt, "ret")).append(";"); methods.add(method); // 添加方法名、訪問控制符、參數列表、方法代碼等信息到 ClassGenerator 中 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); } } if (pkg == null) pkg = PACKAGE_NAME; // 構建接口代理類名稱:pkg + ".proxy" + id,比如 com.tianxiaobo.proxy0 String pcn = pkg + ".proxy" + id; ccp.setClassName(pcn); ccp.addField("public static java.lang.reflect.Method[] methods;"); // 生成 private java.lang.reflect.InvocationHandler handler; ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); // 為接口代理類添加帶有 InvocationHandler 參數的構造方法,比如: // porxy0(java.lang.reflect.InvocationHandler arg0) { // handler=$1; // } ccp.addConstructor(Modifier.PUBLIC, new Class>[]{InvocationHandler.class}, new Class>[0], "handler=$1;"); // 為接口代理類添加默認構造方法 ccp.addDefaultConstructor(); // 生成接口代理類 Class> clazz = ccp.toClass(); clazz.getField("methods").set(null, methods.toArray(new Method[0])); // 構建 Proxy 子類名稱,比如 Proxy1,Proxy2 等 String fcn = Proxy.class.getName() + id; ccm = ClassGenerator.newInstance(cl); ccm.setClassName(fcn); ccm.addDefaultConstructor(); ccm.setSuperClass(Proxy.class); // 為 Proxy 的抽象方法 newInstance 生成實現代碼,形如: // public Object newInstance(java.lang.reflect.InvocationHandler h) { // return new com.tianxiaobo.proxy0($1); // } ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); // 生成 Proxy 實現類 Class> pc = ccm.toClass(); // 通過反射創(chuàng)建 Proxy 實例 proxy = (Proxy) pc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } finally { if (ccp != null) // 釋放資源 ccp.release(); if (ccm != null) ccm.release(); synchronized (cache) { if (proxy == null) cache.remove(key); else // 寫緩存 cache.put(key, new WeakReference (proxy)); // 喚醒其他等待線程 cache.notifyAll(); } } return proxy; }
上面代碼比較復雜,我也寫了很多注釋。大家在閱讀這段代碼時,要搞清楚 ccp 和 ccm 的用途,不然會被搞暈。ccp 用于為服務接口生成代理類,比如我們有一個 DemoService 接口,這個接口代理類就是由 ccp 生成的。ccm 則是用于為 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實現 Proxy 的抽象方法。下面以 org.apache.dubbo.demo.DemoService 這個接口為例,來看一下該接口代理類代碼大致是怎樣的(忽略 EchoService 接口)。
package org.apache.dubbo.common.bytecode; public class proxy0 implements org.apache.dubbo.demo.DemoService { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public proxy0() { } public proxy0(java.lang.reflect.InvocationHandler arg0) { handler = $1; } public java.lang.String sayHello(java.lang.String arg0) { Object[] args = new Object[1]; args[0] = ($w) $1; Object ret = handler.invoke(this, methods[0], args); return (java.lang.String) ret; } }
好了,到這里代理類生成邏輯就分析完了。整個過程比較復雜,大家需要耐心看一下,本節(jié)點到這里。
4.總結本篇文章對服務引用的過程進行了較為詳盡的分析,之所以說是較為詳盡,是因為還有一些地方沒有分析到。比如 Directory、Cluster 等實現類的代碼并未進行詳細分析,由于這些類功能比較獨立,因此我打算后續(xù)多帶帶成文進行分析。暫時我們可以先把這些類看成黑盒,只要知道這些類的用途即可。引用服務過程涉及到的調用也非常多,大家在閱讀相關代碼的中耐心些,并多進行調試。
好了,本篇文章就先到這里了。謝謝閱讀。
本文在知識共享許可協(xié)議 4.0 下發(fā)布,轉載需在明顯位置處注明出處
作者:田小波
本文同步發(fā)布在我的個人博客:http://www.tianxiaobo.com
本作品采用知識共享署名-非商業(yè)性使用-禁止演繹 4.0 國際許可協(xié)議進行許可。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://www.ezyhdfw.cn/yun/72127.html
摘要:服務引用過程目標從源碼的角度分析服務引用過程。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。的可以參考源碼解析二十三遠程調用的一的源碼分析。 dubbo服務引用過程 目標:從源碼的角度分析服務引用過程。 前言 前面服務暴露過程的文章講解到,服務引用有兩種方式,一種就是直連,也就是直接指定服務的地址來進行引用,這種方式更多的時候被用來做服務測試,不...
摘要:服務提供者代碼上面這個類會被封裝成為一個實例,并新生成一個實例。這樣當網絡通訊層收到一個請求后,會找到對應的實例,并調用它所對應的實例,從而真正調用了服務提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進行源碼學習??偨Y起來就是先總體,后局部.也就是先把需要注意的概念先拋出來,把整體架構圖先畫出來.讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現在我們在哪,我們下一...
摘要:將標簽的各種子標簽如,存到一個叫的中。內部中一定存在一個方法從中拿到解析器。生成來實現的以方法為例。該方法的一大部分都是在拆解。 DUBBO 加載 spring加載bean的時候,遇到dubbo的命名空間時,會調用DubboNamespaceHandler類。執(zhí)行init方法。將dubbo標簽的各種子標簽如service,reference存到一個叫parsers的HashMap中。 ...
摘要:遠程調用開篇目標介紹之后解讀遠程調用模塊的內容如何編排介紹中的包結構設計以及最外層的的源碼解析。十該類就是遠程調用的上下文,貫穿著整個調用,例如調用,然后調用。十五該類是系統(tǒng)上下文,僅供內部使用。 遠程調用——開篇 目標:介紹之后解讀遠程調用模塊的內容如何編排、介紹dubbo-rpc-api中的包結構設計以及最外層的的源碼解析。 前言 最近我面臨著一個選擇,因為dubbo 2.7.0-...
摘要:源碼分析一該類實現了,是服務引用監(jiān)聽器的包裝類。取消暴露遍歷監(jiān)聽集合監(jiān)聽取消暴露該方法是對每個取消服務暴露的監(jiān)聽。五暴露服務取消暴露服務該類是服務暴露監(jiān)聽器的適配類,沒有做實際的操作。 遠程調用——Listener 目標:介紹dubbo-rpc-api中的各種listener監(jiān)聽器的實現邏輯,內容略少,隨便撇兩眼,不是重點。 前言 本文介紹監(jiān)聽器的相關邏輯。在服務引用和服務發(fā)現中監(jiān)聽器...
閱讀 960·2021-10-09 09:58
閱讀 792·2021-08-27 16:24
閱讀 1869·2019-08-30 14:15
閱讀 2520·2019-08-30 11:04
閱讀 2246·2019-08-29 18:43
閱讀 2307·2019-08-29 15:20
閱讀 2867·2019-08-26 12:20
閱讀 1782·2019-08-26 11:44