tigase插件 – packets是如何被session manager和plugins处理的(十二)

一、理解插件是如何工作的对开发插件是非常重要的,在不同的场景下由不同类型的插件来负责处理packet。

IQ的意思是Info/Query:它是一种请求和应答机制,和http有一些类似的地方。 IQ的语意允许一个实体向另一个实体发送请求,并从另一个实体获取应答。请求和应答当中的数据在IQ元素的第一级子节点(命名空间的声明)当中被定义,请求方实体可以通过id标签来跟踪交互过程。如此一来,IQ交互的数据交换模式就类似于“get/result” 或者 “set/result”(在某些情况下,也可能是get/error和set/error)。如有困惑请参考XMPP官方英文文档:http://xmpp.org/rfcs/rfc3920.html#stanzas-semantics-iq。

Requesting                 Responding
  Entity                     Entity
----------                 ----------
    |                           |
    | <iq type='get' id='1'>    |
    | ------------------------> |
    |                           |
    | <iq type='result' id='1'> |
    | <------------------------ |
    |                           |
    | <iq type='set' id='2'>    |
    | ------------------------> |
    |                           |
    | <iq type='error' id='2'>  |
    | <------------------------ |
    |                           |

为了能够确保这种方式被执行,有如下规则需要遵守:

1、每一个IQ stanza都必须含有“id”标签和值
2、每一个IQ stanza都必须含有“type”标签和值,且值必须为如下的几种:
get — 是一个请求信息
set — 提供了所需要的数据,设置了新的值或者替换某些已经存在的值
result — 一个对应get或set并且执行成功的应答
error — 之前发送的一个get或set发生了处理或者传输错误
3、如果一个实体接收到了“set”或“get” IQ请求,那么它必须回复一个“result”或“error”应答,并且应答的id标签值必须与请求的id标签值保持一致。
4、如果一个实体接收到了“result”或“error” stanza,那么它一定不能再返回“result”或“error”应答;但是发出请求的实体可以可以继续发送下一个请求。
5、一个“get” 或 “set”类型的 IQ stanza必须包含并只包含一个子元素指明特定请求或应答的语义。
6、一个“result”类型的 IQ stanza必须包含零或一个子元素。
7、一个“error”类型的IQ stanza应该包含有两个子元素。第一个子元素包含着与之发生错误相对应的“get”或“set”,第二个子元素是<error/>元素。

##plugin简介 插件是一段负责处理特定XMPP stanza的代码。有专门负责处理消息的插件,有专门负责处理在线状态的插件,有专门负责处理iq通讯录的插件,也有专门负责处理版本的插件。

插件通过xmlns和元素名来声明所有它“感兴趣”的那些在特定命名空间下的特定XML元素名,所以你可以创建一个对那些包含caps元素的packet“感兴趣”的插件。

对于那些没有插件“感兴趣”的stanza元素,它们会被直接传递到消息的目标地址。相反的,也有一些特定的stanza可能会被多个插件“感兴趣”,在这种情况下,多个插件可能会被多个线程同时进行处理,所以无法确保哪个插件先进行处理哪个插件后进行处理。

每一个stanza都会被session manager一步一步得进行处理,看看下面的图:  stanza在SM中的处理

就像图片里面展示的那样,stanza在session manager里面分四步进行处理:

1、预处理 – 所有已加载的预处理器都会接收stanza并进行处理。它们在session manager的内部线程里被调用,插件自己本身没有消息队列。需要注意的是:由于它们在session manager的内部线程中被调用,算法的执行效率会直接限制SM的执行最小时间(多多少少都会拖慢SM的执行速度)。设置预处理器的目的是为了阻止packet完成后续处理。如果预处理器返回的结果是true,那么这个packet就会被阻塞,被阻塞的packet不再进行后续处理。
2、处理 – 如果packet没有被预处理器阻塞,那么它会进入这一步进行处理。它会被插入到那些对它的特定元素“感兴趣”的处理器队列中。每一个处理器都拥有一个固定长度的处理队列,并在独立的线程里被调用。
3、投递处理 – 如果一个stanza没有被任何一个处理器处理,那么它就进入了这一步进行处理。内建的最后一个投递处理器会对它执行默认处理。通常默认处理是直接把这个packet发送到目的地地址。大多数情况这个packet是<message/>。
4、过滤 – 最后,如果上面的任何一个步骤产生了输出或result packet,那么这个输出或result packet会进入这一步进行处理,每一个过滤器都可能产生“阻塞”或“允许通过”的结果。

需要提醒两点:

(1)有两个地方或两种方法能够阻塞/过滤packet。一个地方是在“处理”前被“预处理”过程阻塞;另一个在“处理”完毕后,处理结果被“过滤”。
(2)session manager和处理器的执行方式像是一个packet消费者。packet先被处理,处理一旦结束,它就被销毁了。所以在把一个packet发送到处理器之前,必须对packet进行复制,设置好所有的属性然后把它作为结果输出。当然处理器也可以输出任意多个packet作为结果,上面四个步骤当中的任何一个步骤都可以产生结果packet。看看下图:

用户发送packet 如果packet p1要向服务器外部发送(比如要发送给另外一台服务器的一个用户,或者另外一个组件 – MUC/PubSub/transport之类的),那么服务器中的第一个处理器必须为p1创建一个备份p2,并且正确设置好所有的所有的属性和目的地地址。当p1被SM在处理过程中销毁,最终服务器中的插件还能够产生一个新的packet。

如果是组件向用户发送也是一样:  用户向服务器发送一个请求并获取应答

这种设计产生惊人的结果。如果你看完下面两个用户之间的交互,你会发现,packet在投递到目的地之前被拷贝了两次:

一个用户向另一个用户发送packet

就像图片所展示的那样,packet被SM处理了两次。第一次是作为用户A的传出packet进行处理,第二次是作为用户B的传入packet进行处理。

这样做的目的是为了首先确定用户A有权限发送packet,然后是确定用户B有权限接收数据。如果用户B不在线,那么离线消息处理器会把packet保存到数据库当中。

二、处理分为四个步骤,每个步骤都有相对应类型的插件负责处理。

第一步 – 预处理 – XMPPPreprocessorIfc:这是预处理器插件需要实现的接口

第二步 – 处理 – XMPPProcessorIfc:这是处理器插件需要实现的接口

第三步 – 投递 – XMPPPostProcessorIfc:这是投递处理器插件需要实现的接口

第四步 – 过滤 – XMPPPacketFilterIfc:这是结果过滤器插件需要实现的接口

如果你已经看过这四个接口的代码,你会发现每个接口都只有一个方法需要实现。 这个方法就是处理packet的地方它们具有非常相似的入口参数,下面对这些参数进行介绍:

Packet packet – 需要被处理的packet,这个参数不可以为null。即使这个对象不是immutable的,在方法里也不能对它进行修改。它的任何一个变亮都不能发生改变。

XMPPResourceConnection session – session里面包含所有的用户会话数据和访问用户数据库的方法。它允许向持久化数据库中存储信息,但如果用户在线只允许向内存中存储数据。在方法调用时,如果没有在线的用户会话,那么这个参数可以为null。 NonAuthUserRespository repo – 当上面的参数-即用户会话为空的时候,这个参数通常用来存储用户数据。它只允许非常有限的数据访问。比如在用户离线时存储用户的离线消息(对已经存在的数据不允许覆写),比如读取用户的公共Vcard信息。 Queue results – 这是处理产生的结果packet队列。不管怎样,都必须对输入的packet进行备份,并把备份存储到结果队别里面。 Map<String, Object> setting – map里面保存着tigase服务器专为插件准备的配置信息。在大多数情况下,插件并不需要这些配置信息,但如果某个插件需要访问外部数据库,那么tigase服务器可以通过这个参数向它传递数据库的连接字符串。

/**
 * 需要添加XMPPImplIfc接口的描述
 */
public interface XMPPImplIfc {
int concurrentQueuesNo();

@Deprecated
int concurrentThreadsPerQueue();

/**
 * id()返回插件的唯一ID。每一个插件都拥有唯一ID:它在配置文件中用来指定哪个插件需要加载,哪个不需要。
 * 在大多数情况,ID就是该插件感兴趣的packet的XMLNS。
 *
 * @return id,字符串格式
 */
String id();

/**
 * init()方法在插件被加载到内存之后立即被执行,检查数据库是否可用和其他的初始化过程都可以写到这个方法里。
 * 这对于那些通过非标准存储方式访问数据库或需要对数据库scheme进行升级的插件来说非常有用。
 *
 * @param settings 初始化配置信息
 * @throws TigaseDBException
 */
void init(Map settings)throwsTigaseDBException;

//~--- get methods ----------------------------------------------------------

/**
 * isSupporting方法传入元素的名称和命名空间,返回这个元素是否被该插件“感兴趣”
 *
 * @param elem 元素名称,字符串格式
 * @param ns   命名空间,字符串格式
 * @return 一个布尔类型,true:感兴趣;false:不感兴趣
 */
boolean isSupporting(String elem, String ns);

//~--- methods --------------------------------------------------------------

/**
 * supDiscoFeatures()方法向请求的发起者返回一个XML元素数组格式的服务发现(service discovery)特性信息。
 * 返回的服务发现特性取决于该插件支持哪些服务。
 *
 * @param session 一个XMPPResourceConnection实例
 * @return 一个XML元素数组
 */
Element[] supDiscoFeatures(XMPPResourceConnection session);

/**
 * supElements()方法返回该插件“感兴趣”的XML元素名数组,数组当中的每一个元素名都依次对应着supNamespaces()返回的命名空间
 *
 * @return 字符串数组
 */
String[] supElements();

/**
 * supNamespaces()方法返回该插件“感兴趣”的stanza命名空间,数组当中的每一个命名空间都依次对应着supElements()方法返回的XML元素名
 *
 * @return 字符串数组
 */
String[] supNamespaces();

/**
 * supStreamFeatures()方法对请求的发起者返回一个XML元素数组格式的流特性信息。
 * 返回的流特性取决于该插件支持哪些特性。
 *
 * @param session 一个XMPPResourceConnection实例
 * @return XML元素数组
 */
Element[] supStreamFeatures(XMPPResourceConnection session);
}

接下来,我们实现一个专门处理 packet的简单插件,插件的工作就是把packet投递到目的地地址。传入packet会被转发给用户,而传出packet会被转发到一个外部目的地地址。这个插件其实已经实现了,它保存在我们的SVN服务器上(https://svn.tigase.org/reps/tigase-server/trunk/src/main/java/tigase/xmpp/impl/Message.java)。代码当中有一些备注,但是这篇文档会更深入的介绍实现细节。

在开始之前你需要选择一个插件类型。如果要开发一个处理器插件,那么就需要实现XMPPProcessorIfc接口;如果是预处理插件,就需要实现XMPPPreprocessorIfc接口;当然你也可以实现多个接口,这个取决于你的需求和情况,你也可以使用helper抽象类作为基类来实现所有的插件。插件类的声明应该像下面那样(假如你要实现一个处理器插件):

要做的第一件事情就是确定插件ID。它是唯一的,需要放到配置文件里面,告诉服务器在启动时加载并使用相对应的插件。如果这个插件只对特定命名空间下特定名称的元素“感兴趣”,在多数情况下,可以直接以命名空间来作为ID,当然了谁也无法保证这个名称的元素不会出现在其他的packet里面。因为我们想开发一个能够处理所有的的处理器插件,但是又不想花费一整天来考虑如何为这个插件起一个很酷的ID,所以我们干脆就叫它“message”吧。

@Id(ELEM_NAME)  
protected static final String     ELEM_NAME = tigase.server.Message.ELEM_NAME;
public static final String ELEM_NAME = "message";

用下面的代码来声明插件的ID:就像之前我们描述的那样,插件只接收并处理它“感兴趣”的packet。我们的插件只对“jabber:client”命名空间下的元素感兴趣。声明插件所感兴趣的东西,需要添加两个方法:

public String[] supElements() {
  returnnewString[] {"message"};
}
 
public String[] supNamespaces() {
  return newString[] {"jabber:client"};
}

现在我们已经准备好了把插件加载到tigase服务器。下一步就是实现packet处理的方,请参考SVN中的代码(https://svn.tigase.org/reps/tigase-server/trunk/src/main/java/tigase/xmpp/impl/Message.java)。我只会在容易造成困惑的代码上面添加注释,然后添加一两行代码帮助你理解。
public void process(finalPacket packet,
final XMPPResourceConnection session,
final NonAuthUserRepository repo,
final Queue<Packet> results,
final Map<String, Object> settings)
throwsXMPPException {

  // 出于性能的考虑,最好在打印日志之前现检查一下日志级别
  if(log.isLoggable(Level.FINEST)) {
    log.finest("Processing packet: "+ packet.toString());
  }
 
  // 如果用户不在线,你也许想跳过后面的处理环节
  if(session ==null) {
    return;
  }// end of if (session == null)
 
  // 当插件在第一次处理这个用户的会话信息的时候,还有另外一种方法可以执行必要的操作
  if(session.getSessionData(ID) ==null) {
    session.putSessionData(ID, ID);
    // 你可以把你的代码放到这里
    .....
    // 如果你不希望终止操作,那么就把return语句去掉
    return;
  }
 
  // 如果用户的会话没有授权,那么每一次调用session.getUserId()方法都会抛出异常
  try{
 
    // 在比较JID之前一定记得要去掉resource部分
    // JID的组成:jid = [ node "@" ] domain [ "/" resource ]
    // 比如:chutianxing@gmail.com/home
    String id = JIDUtils.getNodeID(packet.getElemTo());
    // 检查一下这个packet是否是发给会话的拥有者
    if(session.getUserId().equals(id)) {
      // 如果是,那么这个消息的确是要发送给这个客户端的
      Element elem = packet.getElement().clone();
      Packet result =newPacket(elem);
      // 这里就是我们为最终收到消息的用户设置客户端组件地址的地方了
      // 在大多数情况,这可能是一个能够保持于客户端连接的c2s或Bosh组件
      result.setTo(session.getConnectionId(packet.getElemTo()));
      // 在大多数情况,这一步可以跳过,但是当packet的投递过程出现了什么问题,这么做可以为调用者返回一个错误
      result.setFrom(packet.getTo());
      // 最后不要忘记把结果packet放到结果队列里面去,否则结果会丢失
      results.offer(result);
    }// end of else
 
    // 在比较JID之前一定记得要去掉resource部分
    id = JIDUtils.getNodeID(packet.getElemFrom());
    // 检查一下这个packet是否由会话的拥有者发出
    if(session.getUserId().equals(id)) {
      // 这是一个由客户端发出的packet,最简单的处理就是把packet转发到packet的目的地地址:
      // 简单的对XML元素进行克隆,然后……
      Element result = packet.getElement().clone();
      // 把他放到传出packet队列里面就行了
      results.offer(newPacket(result));
      return;
    }
 
    // 程序真的会运行到这里吗?
    // 是的,一些packet即没有from也没有to地址。最容易理解的一个例子是向服务器发送的获取某些数据的IQ请求。这类packet没有任何地址,并且需要对它做很多复杂的处理
    // 下面的代码展示了如何确定这个seesion就是请求发起者的session
    id = packet.getFrom();
    // 下面的处理和检查getElementFrom差不多
    if(session.getConnectionId().equals(id)) {
      // 这里需要针对IQ packet做一些特别处理,但是我们需要处理的是message,所以这里只需要对它进行转发
      Element result = packet.getElement().clone();
      // 如果程序运行到这里说明packet的from地址是没有的,现在对from属性就行设置
      result.setAttribute("from", session.getJID());
      // 最后把传出packet放到结果队列里面就ok乐
      results.offer(newPacket(result));
    }

  }catch(NotAuthorizedException e) {
    log.warning("NotAuthorizedException for packet: "  +
      packet.getStringData());
    results.offer(Authorization.NOT_AUTHORIZED.getResponseMessage(packet,
      "You must authorize session first.",true));
  }// end of try-catch
 
}
 

三、Tigase插件 – 配置

在对Tigase载入插件的配置方式当中,最佳最简单的是通过init.properties文件。 –sm-plugins属性值标明了哪些插件会在运行时被加载,多个插件之间使用逗号“,”分割。 –sm-plugins = list of plugins – 被服务器加载的插件列表。通常你不需要指定这个参数,服务器会自动加载默认的插件,默认的插件列表包含了所有可用的插件。如果你不想加载所有的插件:比如你的用户管理是由第三方的系统提供,tigase仅作为XMPP服务与其集成,这时你也许不希望用户通过XMPP来注册新用户,那么你就不应该加载用户注册插件;另一种情况是你开发了一个自己的插件来替换tigase的默认插件,比如vCard或者通讯录管理…… 如果不希望加载,就在插件id之前加上一个“-”;如果希望加载,就在插件之前加上一个“+”, “+”是可以省略的。比如,下面的一个设置就是关闭了用户注册,并且添加了一个你自己的插件“your-plugin”:

--sm-plugins=-jabber:iq:register,+your-plugin

另外需要说明的是,每一个插件都会在一个或多个线程内运行。大多数访问数据库的插件都会在N个线程里运行,N为cpu或cpu的核心个数。在某些情况下,这样简单的配置方式还不够:比如数据库比较慢,或者你的服务负载压力比较大,你需要调节插件线程的个数。那么可以在插件id后面添加“=N”,N就是线程的个数。还是上面的那个例子,修改为8个线程执行“your-plugin”,16个线程执行认证服务:

--sm-plugins=-jabber:iq:register,+your-plugin=8,jabber:iq:auth=16

很明显,如果想自如的修改配置,必须要知道插件列表的id。获得插件列表id有两种途径: 一是通过日志文件:logs/tigase-console.log。如果你看一下日志文件,你会找到类似下面的输入:

Loading plugin: jabber:iq:register ...
Loading plugin: jabber:iq:auth ...
Loading plugin: urn:ietf:params:xml:ns:xmpp-sasl ...
Loading plugin: urn:ietf:params:xml:ns:xmpp-bind ...
Loading plugin: urn:ietf:params:xml:ns:xmpp-session ...
Loading plugin: roster-presence ...
Loading plugin: jabber:iq:privacy ...
Loading plugin: jabber:iq:version ...
Loading plugin: http://jabber.org/protocol/stats ...
Loading plugin: starttls ...
Loading plugin: vcard-temp ...
Loading plugin: http://jabber.org/protocol/commands ...
Loading plugin: jabber:iq:private ...
Loading plugin: urn:xmpp:ping ...

在日志中会找到已经被加载的插件列表。

二是查看session manager的源码,那里硬编码了默认的插件列表:

private static final String[] PLUGINS_FULL_PROP_VAL = {"jabber:iq:register", "jabber:iq:auth", "urn:ietf:params:xml:ns:xmpp-sasl", "urn:ietf:params:xml:ns:xmpp-bind", "urn:ietf:params:xml:ns:xmpp-session", "roster-presence", "jabber:iq:privacy", "jabber:iq:version", "http://jabber.org/protocol/stats", "starttls", "msgoffline", "vcard-temp", "http://jabber.org/protocol/commands", "jabber:iq:private", "urn:xmpp:ping", "basic-filter", "domain-filter"};

无论怎样,你都必须把插件id正确地配置到“–sm-plugins”属性值当中,如果插件在正确的类路径下,那么它就会在运行时被加载。

还有一种加载插件的方式,如果看过之前的一章:Tigase插件-编写插件,应该还记得插件接口定义的方法中都有一个入口参数:Map setting,这个map里面包含了写在配置文件当中的那些配置项,它们会在插件执行时被传递给插件。 init.properties是你放置配置信息的地方,这些配置项都以字符串 sess-man/plugins-conf 开头,那么就可以把你的插件id放到你的配置项里:

sess-man/plugins-conf/message/key1=val1 sess-man/plugins-conf/message/key2=val2 sess-man/plugins-conf/message/key3=val3

##Tigase数据流和数据处理

tigase通过tigase.io包当中的代码读取网络中的字节数组,然后通过tigase.net包当中的类把字节数组转换为字符,最后通过tigase.xml包当中的XML解析器把这些字符转换成XML DOM对象。

所有服务之间的通讯均使用XML DOM对象,这是因为XMPP协议使用的就是XML DOM对象。tigase使用“Tigase XML parser and DOM builder”来进行基本的XML数据处理(转换字符流,构建DOM对象,读写XML元素和属性)

所有的stanza都保存在tigase.xml.Element对象里面,所有的Element属性和子节点都可以通过类的API来进行访问。

如果想让解析/生成DOM的工作简单一些,比如类似“基于stanza当中的元素操作(互换from/to的值,设置type=result等)对应答stanza进行初始化”的操作可以直接使用Packet类来完成,大部分的对Element的操作都已经被封装到tigase.server.Packet类里。

学海无涯、何时是岸
原文地址:https://www.cnblogs.com/veblen/p/14703856.html