博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo源码系列之filter的前生
阅读量:5859 次
发布时间:2019-06-19

本文共 30737 字,大约阅读时间需要 102 分钟。


title: dubbo源码系列之filter的前生 tags:

  • dubbo
  • filter
  • spi
  • extensionLoader categories: dubbo date: 2017-06-25 18:18:53

dubbo的filter类似于spring的aop,提供了环绕增强功能。

参考

缓存是filter的一个很典型的实现。

那么filter是如何被调用的呢?

收到spring荼毒多年很自然的想到了spring的aop实现===》换汤不换药的动态代理

整篇故事要从invoker讲起。

首先介绍一下Protocol接口

/*     * Copyright 1999-2011 Alibaba Group.     *      * Licensed under the Apache License, Version 2.0 (the "License");     * you may not use this file except in compliance with the License.     * You may obtain a copy of the License at     *      *      http://www.apache.org/licenses/LICENSE-2.0     *      * Unless required by applicable law or agreed to in writing, software     * distributed under the License is distributed on an "AS IS" BASIS,     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.     * See the License for the specific language governing permissions and     * limitations under the License.     */    package com.alibaba.dubbo.rpc;         import com.alibaba.dubbo.common.URL;    import com.alibaba.dubbo.common.extension.Adaptive;    import com.alibaba.dubbo.common.extension.SPI;         /**     * Protocol. (API/SPI, Singleton, ThreadSafe)     *     * @author william.liangf     */    @SPI("dubbo")    public interface Protocol {                 /**         * 获取缺省端口,当用户没有配置端口时使用。         *         * @return 缺省端口         */        int getDefaultPort();             /**         * 暴露远程服务:
* 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();
* 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。
* 3. export()传入的Invoker由框架实现并传入,协议不需要关心。
* * @param
服务的类型 * @param invoker 服务的执行体 * @return exporter 暴露服务的引用,用于取消暴露 * @throws RpcException 当暴露服务出错时抛出,比如端口已占用 */ @Adaptive
Exporter
export(Invoker
invoker) throws RpcException; /** * 引用远程服务:
* 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。
* 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。
* 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。
* * @param
服务的类型 * @param type 服务的类型 * @param url 远程服务的URL地址 * @return invoker 服务的本地代理 * @throws RpcException 当连接服务提供方失败时抛出 */ @Adaptive
Invoker
refer(Class
type, URL url) throws RpcException; /** * 释放协议:
* 1. 取消该协议所有已经暴露和引用的服务。
* 2. 释放协议所占用的所有资源,比如连接和端口。
* 3. 协议在释放后,依然能暴露和引用新的服务。
*/ void destroy(); }复制代码

SPI在上文已经描述过,可以得出结论 默认的protocol为dubbo

我们查看查看一下所有的protocol

    registry=com.alibaba.dubbo.registry.integration.RegistryProtocol     filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper     listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper     mock=com.alibaba.dubbo.rpc.support.MockProtocol     injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol     dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol     rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol     hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol     com.alibaba.dubbo.rpc.protocol.http.HttpProtocol     com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol     thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol     memcached=memcom.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol     redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol 复制代码

我们获得了这些Protocol。 而Invoker的产生来自于

@Adaptive        
Invoker
refer(Class
type, URL url) throws RpcException;复制代码

该方法有一个Adaptive标签

package com.alibaba.dubbo.common.extension;         import java.lang.annotation.Documented;    import java.lang.annotation.ElementType;    import java.lang.annotation.Retention;    import java.lang.annotation.RetentionPolicy;    import java.lang.annotation.Target;         import com.alibaba.dubbo.common.URL;         /**     * 在{
@link ExtensionLoader}生成Extension的Adaptive Instance时,为{
@link ExtensionLoader}提供信息。 * * @author ding.lid * @export * * @see ExtensionLoader * @see URL */ @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface Adaptive { /** * 从{
@link URL}的Key名,对应的Value作为要Adapt成的Extension名。 *

* 如果{

@link URL}这些Key都没有Value,使用 用 缺省的扩展(在接口的{
@link SPI}中设定的值)。
* 比如,String[] {"key1", "key2"},表示 *

    *
  1. 先在URL上找key1的Value作为要Adapt成的Extension名; *
  2. key1没有Value,则使用key2的Value作为要Adapt成的Extension名。 *
  3. key2没有Value,使用缺省的扩展。 *
  4. 如果没有设定缺省扩展,则方法调用会抛出{
    @link IllegalStateException}。 *
*

* 如果不设置则缺省使用Extension接口类名的点分隔小写字串。

* 即对于Extension接口{
@code com.alibaba.dubbo.xxx.YyyInvokerWrapper}的缺省值为String[] {"yyy.invoker.wrapper"} * * @see SPI#value() */ String[] value() default {}; }复制代码

该注解可以同时出现在类或者方法上

这个注释也很有意思,当使用该注解的时候会在Url参数上找到对应的param以此作为扩展,如果没有则使用注解的value,否则就使用以点分割单词的方式。

我们从源码分析一下

private void loadFile(Map
> extensionClasses, String dir) { String fileName = dir + type.getName(); try { Enumeration
urls; ClassLoader classLoader = findClassLoader(); if (classLoader != null) { urls = classLoader.getResources(fileName); } else { urls = ClassLoader.getSystemResources(fileName); } if (urls != null) { while (urls.hasMoreElements()) { java.net.URL url = urls.nextElement(); try { BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8")); try { String line = null; while ((line = reader.readLine()) != null) { final int ci = line.indexOf('#'); if (ci >= 0) line = line.substring(0, ci); line = line.trim(); if (line.length() > 0) { try { String name = null; int i = line.indexOf('='); if (i > 0) { name = line.substring(0, i).trim(); line = line.substring(i + 1).trim(); } if (line.length() > 0) { Class
clazz = Class.forName(line, true, classLoader); if (! type.isAssignableFrom(clazz)) { throw new IllegalStateException("Error when load extension class(interface: " + type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + "is not subtype of interface."); } if (clazz.isAnnotationPresent(Adaptive.class)) { if(cachedAdaptiveClass == null) { cachedAdaptiveClass = clazz; } else if (! cachedAdaptiveClass.equals(clazz)) { throw new IllegalStateException("More than 1 adaptive class found: " + cachedAdaptiveClass.getClass().getName() + ", " + clazz.getClass().getName()); } } else { try { clazz.getConstructor(type); Set
> wrappers = cachedWrapperClasses; if (wrappers == null) { cachedWrapperClasses = new ConcurrentHashSet
>(); wrappers = cachedWrapperClasses; } wrappers.add(clazz); } catch (NoSuchMethodException e) { clazz.getConstructor(); if (name == null || name.length() == 0) { name = findAnnotationName(clazz); if (name == null || name.length() == 0) { if (clazz.getSimpleName().length() > type.getSimpleName().length() && clazz.getSimpleName().endsWith(type.getSimpleName())) { name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase(); } else { throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url); } } } String[] names = NAME_SEPARATOR.split(name); if (names != null && names.length > 0) { Activate activate = clazz.getAnnotation(Activate.class); if (activate != null) { cachedActivates.put(names[0], activate); } for (String n : names) { if (! cachedNames.containsKey(clazz)) { cachedNames.put(clazz, n); } Class
c = extensionClasses.get(n); if (c == null) { extensionClasses.put(n, clazz); } else if (c != clazz) { throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName()); } } } } } } } catch (Throwable t) { IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t); exceptions.put(line, e); } } } // end of while read lines } finally { reader.close(); } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", class file: " + url + ") in " + url, t); } } // end of while urls } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName + ").", t); } }复制代码

这段代码负责从指定的文件夹下面加载指定Class的SPI(此处使用Dubbo自己实现而不是ServiceLoader)

为了便于说明 我们从Protocl来说明。

因此查找文件

META-INF/services/com.alibaba.dubbo.rpc.Protocol     META-INF/dubbo/com.alibaba.dubbo.rpc.Protocol     META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 复制代码
    registry=com.alibaba.dubbo.registry.integration.RegistryProtocol     filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper     listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper     mock=com.alibaba.dubbo.rpc.support.MockProtocol     injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol     dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol     rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol     hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol     com.alibaba.dubbo.rpc.protocol.http.HttpProtocol     com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol     thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol     memcached=memcom.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol     redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol 复制代码

此处按照顺序加载

  1. check该类是否在类上标有注解Adaptive 如果有那么将该类标记为Adaptive类(校验是都存在多个,如果存在多个则直接报错)
  2. 如果不包含该注解那么将该类初始化(check是否包含单个参数为com.alibaba.dubbo.rpc.Protocol的构造函数,如果有那么将该类放入wrapper集合,否则使用午餐构造函数进行初始化)===》我们可以得出结论 如果spi文件中对应的类为包含接口类型的单参数构造函数 那么将被标记为wrapper,否则调用无参数构造函数进行初始化也就是真正的spi实现
  3. 那么Adaptive的含义究竟是什么呢?中文叫适配,通常来说我们会把这种理解为自适应。那么自适应的结果是根据Url中的参数返回不同的实现(颇有依赖反转的含义),当spi文件中存在的类型不包含Adaptive的结果时我们会生成代码
private Class
getAdaptiveExtensionClass() { getExtensionClasses(); if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } return cachedAdaptiveClass = createAdaptiveExtensionClass(); } private Class
createAdaptiveExtensionClass() { String code = createAdaptiveExtensionClassCode(); ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); } private String createAdaptiveExtensionClassCode() { StringBuilder codeBuidler = new StringBuilder(); Method[] methods = type.getMethods(); boolean hasAdaptiveAnnotation = false; for(Method m : methods) { if(m.isAnnotationPresent(Adaptive.class)) { hasAdaptiveAnnotation = true; break; } } // 完全没有Adaptive方法,则不需要生成Adaptive类 if(! hasAdaptiveAnnotation) throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!"); codeBuidler.append("package " + type.getPackage().getName() + ";"); codeBuidler.append("\nimport " + ExtensionLoader.class.getName() + ";"); codeBuidler.append("\npublic class " + type.getSimpleName() + "$Adpative" + " implements " + type.getCanonicalName() + " {"); for (Method method : methods) { Class
rt = method.getReturnType(); Class
[] pts = method.getParameterTypes(); Class
[] ets = method.getExceptionTypes(); Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class); StringBuilder code = new StringBuilder(512); if (adaptiveAnnotation == null) { code.append("throw new UnsupportedOperationException(\"method ") .append(method.toString()).append(" of interface ") .append(type.getName()).append(" is not adaptive method!\");"); } else { int urlTypeIndex = -1; for (int i = 0; i < pts.length; ++i) { if (pts[i].equals(URL.class)) { urlTypeIndex = i; break; } } // 有类型为URL的参数 if (urlTypeIndex != -1) { // Null Point check String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"url == null\");", urlTypeIndex); code.append(s); s = String.format("\n%s url = arg%d;", URL.class.getName(), urlTypeIndex); code.append(s); } // 参数没有URL类型 else { String attribMethod = null; // 找到参数的URL属性 LBL_PTS: for (int i = 0; i < pts.length; ++i) { Method[] ms = pts[i].getMethods(); for (Method m : ms) { String name = m.getName(); if ((name.startsWith("get") || name.length() > 3) && Modifier.isPublic(m.getModifiers()) && !Modifier.isStatic(m.getModifiers()) && m.getParameterTypes().length == 0 && m.getReturnType() == URL.class) { urlTypeIndex = i; attribMethod = name; break LBL_PTS; } } } if(attribMethod == null) { throw new IllegalStateException("fail to create adative class for interface " + type.getName() + ": not found url parameter or url attribute in parameters of method " + method.getName()); } // Null point check String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");", urlTypeIndex, pts[urlTypeIndex].getName()); code.append(s); s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");", urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod); code.append(s); s = String.format("%s url = arg%d.%s();",URL.class.getName(), urlTypeIndex, attribMethod); code.append(s); } String[] value = adaptiveAnnotation.value(); // 没有设置Key,则使用“扩展点接口名的点分隔 作为Key if(value.length == 0) { char[] charArray = type.getSimpleName().toCharArray(); StringBuilder sb = new StringBuilder(128); for (int i = 0; i < charArray.length; i++) { if(Character.isUpperCase(charArray[i])) { if(i != 0) { sb.append("."); } sb.append(Character.toLowerCase(charArray[i])); } else { sb.append(charArray[i]); } } value = new String[] {sb.toString()}; } boolean hasInvocation = false; for (int i = 0; i < pts.length; ++i) { if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) { // Null Point check String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"invocation == null\");", i); code.append(s); s = String.format("\nString methodName = arg%d.getMethodName();", i); code.append(s); hasInvocation = true; break; } } String defaultExtName = cachedDefaultName; String getNameCode = null; for (int i = value.length - 1; i >= 0; --i) { if(i == value.length - 1) { if(null != defaultExtName) { if(!"protocol".equals(value[i])) if (hasInvocation) getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName); else getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName); else getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName); } else { if(!"protocol".equals(value[i])) if (hasInvocation) getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName); else getNameCode = String.format("url.getParameter(\"%s\")", value[i]); else getNameCode = "url.getProtocol()"; } } else { if(!"protocol".equals(value[i])) if (hasInvocation) getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName); else getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode); else getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode); } } code.append("\nString extName = ").append(getNameCode).append(";"); // check extName == null? String s = String.format("\nif(extName == null) " + "throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");", type.getName(), Arrays.toString(value)); code.append(s); s = String.format("\n%s extension = (%
0) { codeBuidler.append(", "); } codeBuidler.append(pts[i].getCanonicalName()); codeBuidler.append(" "); codeBuidler.append("arg" + i); } codeBuidler.append(")"); if (ets.length > 0) { codeBuidler.append(" throws "); for (int i = 0; i < ets.length; i ++) { if (i > 0) { codeBuidler.append(", "); } codeBuidler.append(pts[i].getCanonicalName()); } } codeBuidler.append(" {"); codeBuidler.append(code.toString()); codeBuidler.append("\n}"); } codeBuidler.append("\n}"); if (logger.isDebugEnabled()) { logger.debug(codeBuidler.toString()); } return codeBuidler.toString(); } 这段代码负责生成Java代码,然后使用Compiler进行编译,其次生成对应的class===》注意这段代码首先校验对应的接口包含Adaptive的方法,如果不包含直接不生成(因为无需适配) 其次找到接口对应Adaptive方法的Url参数(也可能在JavaBean的参数中)根据URL获取对应参数 依次来找到对应的适配(通过getExtension方法) /** * 返回指定名字的扩展。如果指定名字的扩展不存在,则抛异常 {
@link IllegalStateException}. * * @param name * @return */ @SuppressWarnings("unchecked") public T getExtension(String name) { if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); if ("true".equals(name)) { return getDefaultExtension(); } Holder
holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { instance = createExtension(name); holder.set(instance); } } } return (T) instance; } 此处获取指定名称的Extension如果缓存中没有则去创建 @SuppressWarnings("unchecked") private T createExtension(String name) { Class
clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance); Set
> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class
wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } } private T injectExtension(T instance) { try { if (objectFactory != null) { for (Method method : instance.getClass().getMethods()) { if (method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())) { Class
pt = method.getParameterTypes()[0]; try { String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : ""; Object object = objectFactory.getExtension(pt, property); if (object != null) { method.invoke(instance, object); } } catch (Exception e) { logger.error("fail to inject via method " + method.getName() + " of interface " + type.getName() + ": " + e.getMessage(), e); } } } } } catch (Exception e) { logger.error(e.getMessage(), e); } return instance; } 创建指定名称的Extension(此处就是将spi文件的对应文件归类后放在map中进行映射查找) 此时会执行injectExtension这样实现了依赖注入。 上文中还有个关键点,使用Wrapper来包装对应的extension,比如Protocol此时就会有两个包装类 ProtocolFilterWrapper和ProtocolListenerWrapper 而filter的主要实现通过ProtocolFilterWrapper来完成。 public
Exporter
export(Invoker
invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } public
Invoker
refer(Class
type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); } public void destroy() { protocol.destroy(); } private static
Invoker
buildInvokerChain(final Invoker
invoker, String key, String group) { Invoker
last = invoker; List
filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker
next = last; last = new Invoker
() { public Class
getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } 复制代码

filter的实现完全由buildInvokerChain来控制

再来细细分说filter的加载过程

转载地址:http://dkljx.baihongyu.com/

你可能感兴趣的文章
用户控件
查看>>
elasticsearch term 查询二:Range Query
查看>>
DTcms 扩展字段标签调用
查看>>
微软职位内部推荐-Software Engineer II
查看>>
camera HSYNC:VSYNC
查看>>
php5.6安装window7安装memcache.dll库所遇到的误区
查看>>
使用Eclipse开发一个web应用
查看>>
三星收购的Viv Labs能否让其再做突破?
查看>>
静态include与动态include的区别
查看>>
Eclipse.Error.gen already exists but is not a source folder.
查看>>
LeetCode:Remove Duplicates from Sorted List I II
查看>>
az nginx install and other
查看>>
杭电1716
查看>>
.Net使用Newtonsoft.Json.dll(JSON.NET)对象序列化成json、反序列化json示例教程
查看>>
Hibernate学习小结
查看>>
51nod 1135 原根 (数论)
查看>>
web设计师和前端设计师的互动—前端工程师应该具备的三种思维
查看>>
说下我费了几个钟头才搞定的myeclipse和tomcat问题
查看>>
Eclipse启动Tomcat后无法访问项目
查看>>
如何高效的使用-Notepad++
查看>>