title: dubbo源码系列之filter的前生 tags:
- dubbo
- filter
- spi
- extensionLoader categories: dubbo date: 2017-06-25 18:18:53
dubbo的filter类似于spring的aop,提供了环绕增强功能。
参考
缓存是filter的一个很典型的实现。
那么filter是如何被调用的呢?
收到spring荼毒多年很自然的想到了spring的aop实现===》换汤不换药的动态代理
整篇故事要从invoker讲起。
首先介绍一下Protocol接口
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.extension.Adaptive; import com.alibaba.dubbo.common.extension.SPI; /** * Protocol. (API/SPI, Singleton, ThreadSafe) * * @author william.liangf */ @SPI("dubbo") public interface Protocol { /** * 获取缺省端口,当用户没有配置端口时使用。 * * @return 缺省端口 */ int getDefaultPort(); /** * 暴露远程服务: * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress(); * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。 * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。 * * @param服务的类型 * @param invoker 服务的执行体 * @return exporter 暴露服务的引用,用于取消暴露 * @throws RpcException 当暴露服务出错时抛出,比如端口已占用 */ @Adaptive Exporter export(Invoker invoker) throws RpcException; /** * 引用远程服务: * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。 * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。 * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。 * * @param 服务的类型 * @param type 服务的类型 * @param url 远程服务的URL地址 * @return invoker 服务的本地代理 * @throws RpcException 当连接服务提供方失败时抛出 */ @Adaptive Invoker refer(Class type, URL url) throws RpcException; /** * 释放协议: * 1. 取消该协议所有已经暴露和引用的服务。 * 2. 释放协议所占用的所有资源,比如连接和端口。 * 3. 协议在释放后,依然能暴露和引用新的服务。 */ void destroy(); }复制代码
SPI在上文已经描述过,可以得出结论 默认的protocol为dubbo
我们查看查看一下所有的protocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper mock=com.alibaba.dubbo.rpc.support.MockProtocol injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol com.alibaba.dubbo.rpc.protocol.http.HttpProtocol com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol memcached=memcom.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol 复制代码
我们获得了这些Protocol。 而Invoker的产生来自于
@AdaptiveInvoker refer(Class type, URL url) throws RpcException;复制代码
该方法有一个Adaptive标签
package com.alibaba.dubbo.common.extension; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import com.alibaba.dubbo.common.URL; /** * 在{ @link ExtensionLoader}生成Extension的Adaptive Instance时,为{ @link ExtensionLoader}提供信息。 * * @author ding.lid * @export * * @see ExtensionLoader * @see URL */ @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface Adaptive { /** * 从{ @link URL}的Key名,对应的Value作为要Adapt成的Extension名。 ** 如果{
@link URL}这些Key都没有Value,使用 用 缺省的扩展(在接口的{ @link SPI}中设定的值)。 * 比如,String[] {"key1", "key2"}
,表示 **
*- 先在URL上找key1的Value作为要Adapt成的Extension名; *
- key1没有Value,则使用key2的Value作为要Adapt成的Extension名。 *
- key2没有Value,使用缺省的扩展。 *
- 如果没有设定缺省扩展,则方法调用会抛出{ @link IllegalStateException}。 *
* 如果不设置则缺省使用Extension接口类名的点分隔小写字串。
* 即对于Extension接口{ @code com.alibaba.dubbo.xxx.YyyInvokerWrapper}的缺省值为String[] {"yyy.invoker.wrapper"}
* * @see SPI#value() */ String[] value() default {}; }复制代码
该注解可以同时出现在类或者方法上
这个注释也很有意思,当使用该注解的时候会在Url参数上找到对应的param以此作为扩展,如果没有则使用注解的value,否则就使用以点分割单词的方式。
我们从源码分析一下
private void loadFile(Map> extensionClasses, String dir) { String fileName = dir + type.getName(); try { Enumeration urls; ClassLoader classLoader = findClassLoader(); if (classLoader != null) { urls = classLoader.getResources(fileName); } else { urls = ClassLoader.getSystemResources(fileName); } if (urls != null) { while (urls.hasMoreElements()) { java.net.URL url = urls.nextElement(); try { BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8")); try { String line = null; while ((line = reader.readLine()) != null) { final int ci = line.indexOf('#'); if (ci >= 0) line = line.substring(0, ci); line = line.trim(); if (line.length() > 0) { try { String name = null; int i = line.indexOf('='); if (i > 0) { name = line.substring(0, i).trim(); line = line.substring(i + 1).trim(); } if (line.length() > 0) { Class clazz = Class.forName(line, true, classLoader); if (! type.isAssignableFrom(clazz)) { throw new IllegalStateException("Error when load extension class(interface: " + type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + "is not subtype of interface."); } if (clazz.isAnnotationPresent(Adaptive.class)) { if(cachedAdaptiveClass == null) { cachedAdaptiveClass = clazz; } else if (! cachedAdaptiveClass.equals(clazz)) { throw new IllegalStateException("More than 1 adaptive class found: " + cachedAdaptiveClass.getClass().getName() + ", " + clazz.getClass().getName()); } } else { try { clazz.getConstructor(type); Set > wrappers = cachedWrapperClasses; if (wrappers == null) { cachedWrapperClasses = new ConcurrentHashSet >(); wrappers = cachedWrapperClasses; } wrappers.add(clazz); } catch (NoSuchMethodException e) { clazz.getConstructor(); if (name == null || name.length() == 0) { name = findAnnotationName(clazz); if (name == null || name.length() == 0) { if (clazz.getSimpleName().length() > type.getSimpleName().length() && clazz.getSimpleName().endsWith(type.getSimpleName())) { name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase(); } else { throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url); } } } String[] names = NAME_SEPARATOR.split(name); if (names != null && names.length > 0) { Activate activate = clazz.getAnnotation(Activate.class); if (activate != null) { cachedActivates.put(names[0], activate); } for (String n : names) { if (! cachedNames.containsKey(clazz)) { cachedNames.put(clazz, n); } Class c = extensionClasses.get(n); if (c == null) { extensionClasses.put(n, clazz); } else if (c != clazz) { throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName()); } } } } } } } catch (Throwable t) { IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t); exceptions.put(line, e); } } } // end of while read lines } finally { reader.close(); } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", class file: " + url + ") in " + url, t); } } // end of while urls } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName + ").", t); } }复制代码
这段代码负责从指定的文件夹下面加载指定Class的SPI(此处使用Dubbo自己实现而不是ServiceLoader)
为了便于说明 我们从Protocl来说明。
因此查找文件
META-INF/services/com.alibaba.dubbo.rpc.Protocol META-INF/dubbo/com.alibaba.dubbo.rpc.Protocol META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 复制代码
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper mock=com.alibaba.dubbo.rpc.support.MockProtocol injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol com.alibaba.dubbo.rpc.protocol.http.HttpProtocol com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol memcached=memcom.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol 复制代码
此处按照顺序加载
- check该类是否在类上标有注解Adaptive 如果有那么将该类标记为Adaptive类(校验是都存在多个,如果存在多个则直接报错)
- 如果不包含该注解那么将该类初始化(check是否包含单个参数为com.alibaba.dubbo.rpc.Protocol的构造函数,如果有那么将该类放入wrapper集合,否则使用午餐构造函数进行初始化)===》我们可以得出结论 如果spi文件中对应的类为包含接口类型的单参数构造函数 那么将被标记为wrapper,否则调用无参数构造函数进行初始化也就是真正的spi实现
- 那么Adaptive的含义究竟是什么呢?中文叫适配,通常来说我们会把这种理解为自适应。那么自适应的结果是根据Url中的参数返回不同的实现(颇有依赖反转的含义),当spi文件中存在的类型不包含Adaptive的结果时我们会生成代码
private Class getAdaptiveExtensionClass() { getExtensionClasses(); if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } return cachedAdaptiveClass = createAdaptiveExtensionClass(); } private Class createAdaptiveExtensionClass() { String code = createAdaptiveExtensionClassCode(); ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); } private String createAdaptiveExtensionClassCode() { StringBuilder codeBuidler = new StringBuilder(); Method[] methods = type.getMethods(); boolean hasAdaptiveAnnotation = false; for(Method m : methods) { if(m.isAnnotationPresent(Adaptive.class)) { hasAdaptiveAnnotation = true; break; } } // 完全没有Adaptive方法,则不需要生成Adaptive类 if(! hasAdaptiveAnnotation) throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!"); codeBuidler.append("package " + type.getPackage().getName() + ";"); codeBuidler.append("\nimport " + ExtensionLoader.class.getName() + ";"); codeBuidler.append("\npublic class " + type.getSimpleName() + "$Adpative" + " implements " + type.getCanonicalName() + " {"); for (Method method : methods) { Class rt = method.getReturnType(); Class [] pts = method.getParameterTypes(); Class [] ets = method.getExceptionTypes(); Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class); StringBuilder code = new StringBuilder(512); if (adaptiveAnnotation == null) { code.append("throw new UnsupportedOperationException(\"method ") .append(method.toString()).append(" of interface ") .append(type.getName()).append(" is not adaptive method!\");"); } else { int urlTypeIndex = -1; for (int i = 0; i < pts.length; ++i) { if (pts[i].equals(URL.class)) { urlTypeIndex = i; break; } } // 有类型为URL的参数 if (urlTypeIndex != -1) { // Null Point check String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"url == null\");", urlTypeIndex); code.append(s); s = String.format("\n%s url = arg%d;", URL.class.getName(), urlTypeIndex); code.append(s); } // 参数没有URL类型 else { String attribMethod = null; // 找到参数的URL属性 LBL_PTS: for (int i = 0; i < pts.length; ++i) { Method[] ms = pts[i].getMethods(); for (Method m : ms) { String name = m.getName(); if ((name.startsWith("get") || name.length() > 3) && Modifier.isPublic(m.getModifiers()) && !Modifier.isStatic(m.getModifiers()) && m.getParameterTypes().length == 0 && m.getReturnType() == URL.class) { urlTypeIndex = i; attribMethod = name; break LBL_PTS; } } } if(attribMethod == null) { throw new IllegalStateException("fail to create adative class for interface " + type.getName() + ": not found url parameter or url attribute in parameters of method " + method.getName()); } // Null point check String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");", urlTypeIndex, pts[urlTypeIndex].getName()); code.append(s); s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");", urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod); code.append(s); s = String.format("%s url = arg%d.%s();",URL.class.getName(), urlTypeIndex, attribMethod); code.append(s); } String[] value = adaptiveAnnotation.value(); // 没有设置Key,则使用“扩展点接口名的点分隔 作为Key if(value.length == 0) { char[] charArray = type.getSimpleName().toCharArray(); StringBuilder sb = new StringBuilder(128); for (int i = 0; i < charArray.length; i++) { if(Character.isUpperCase(charArray[i])) { if(i != 0) { sb.append("."); } sb.append(Character.toLowerCase(charArray[i])); } else { sb.append(charArray[i]); } } value = new String[] {sb.toString()}; } boolean hasInvocation = false; for (int i = 0; i < pts.length; ++i) { if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) { // Null Point check String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"invocation == null\");", i); code.append(s); s = String.format("\nString methodName = arg%d.getMethodName();", i); code.append(s); hasInvocation = true; break; } } String defaultExtName = cachedDefaultName; String getNameCode = null; for (int i = value.length - 1; i >= 0; --i) { if(i == value.length - 1) { if(null != defaultExtName) { if(!"protocol".equals(value[i])) if (hasInvocation) getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName); else getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName); else getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName); } else { if(!"protocol".equals(value[i])) if (hasInvocation) getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName); else getNameCode = String.format("url.getParameter(\"%s\")", value[i]); else getNameCode = "url.getProtocol()"; } } else { if(!"protocol".equals(value[i])) if (hasInvocation) getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName); else getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode); else getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode); } } code.append("\nString extName = ").append(getNameCode).append(";"); // check extName == null? String s = String.format("\nif(extName == null) " + "throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");", type.getName(), Arrays.toString(value)); code.append(s); s = String.format("\n%s extension = (%0) { codeBuidler.append(", "); } codeBuidler.append(pts[i].getCanonicalName()); codeBuidler.append(" "); codeBuidler.append("arg" + i); } codeBuidler.append(")"); if (ets.length > 0) { codeBuidler.append(" throws "); for (int i = 0; i < ets.length; i ++) { if (i > 0) { codeBuidler.append(", "); } codeBuidler.append(pts[i].getCanonicalName()); } } codeBuidler.append(" {"); codeBuidler.append(code.toString()); codeBuidler.append("\n}"); } codeBuidler.append("\n}"); if (logger.isDebugEnabled()) { logger.debug(codeBuidler.toString()); } return codeBuidler.toString(); } 这段代码负责生成Java代码,然后使用Compiler进行编译,其次生成对应的class===》注意这段代码首先校验对应的接口包含Adaptive的方法,如果不包含直接不生成(因为无需适配) 其次找到接口对应Adaptive方法的Url参数(也可能在JavaBean的参数中)根据URL获取对应参数 依次来找到对应的适配(通过getExtension方法) /** * 返回指定名字的扩展。如果指定名字的扩展不存在,则抛异常 { @link IllegalStateException}. * * @param name * @return */ @SuppressWarnings("unchecked") public T getExtension(String name) { if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); if ("true".equals(name)) { return getDefaultExtension(); } Holder
filter的实现完全由buildInvokerChain来控制
再来细细分说filter的加载过程