分布式Java应用
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

1.1 基于消息方式实现系统间的通信

1.1.1 基于Java自身技术实现消息方式的系统间通信

基于Java自身包实现消息方式的系统间通信的方式有:TCP/IP+BIO、TCP/IP+NIO、UDP/IP+BIO以及UDP/IP+NIO 4种,下面分别介绍如何实现这4种方式的系统间通信。

TCP/IP+BIO

在Java中可基于Socket、ServerSocket来实现TCP/IP+BIO的系统间通信。Socket主要用于实现建立连接及网络IO的操作,ServerSocket主要用于实现服务器端端口的监听及Socket对象的获取。基于Socket实现客户端的关键代码如下:

    // 创建连接,如果域名解析不了会抛出UnknownHostException,当连接不上时会抛出IOException,
    如果希望控制建立连接的超时,可先调用new Socket(),然后调用socket.connect(SocketAddress
    类型的目标地址,以毫秒为单位的超时时间)
    Socket socket=new Socket(目标IP或域名,目标端口);
    // 创建读取服务器端返回流的BufferedReader
    BufferedReader in=new BufferedReader(new
    InputStreamReader(socket.getInputStream()));
    // 创建向服务器写入流的PrintWriter
    PrintWriter out=new PrintWriter(socket.getOutputStream(),true);
    // 向服务器发送字符串信息,要注意的是,此处即使写失败也不会抛出异常信息,并且一直会阻塞到写入
    操作系统或网络IO出现异常为止
    out.println(“hello”);
    // 阻塞读取服务端的返回信息,以下代码会阻塞到服务端返回信息或网络IO出现异常为止,如果希望在超
    过一段时间后就不阻塞了,那么要在创建Socket对象后调用socket.setSoTimeout(以毫秒为单位的超
    时时间)
    in.readLine();

服务器端关键代码如下:

    // 创建对本地指定端口的监听,如端口冲突则抛出SocketException,其他网络IO方面的异常则抛出
    IOException ServerSocket ss=new ServerSocket(监听的端口)
    // 接受客户端建立连接的请求,并返回Socket对象,以便和客户端进行交互,交互的方式和客户端相同,
    也是通过Socket.getInputStream和Socket.getOutputStream来进行读写操作,此方法会一直阻
    塞到有客户端发送建立连接的请求,如果希望此方法最多阻塞一定的时间,则要在创建ServerSocket后
    调用其setSoTimeout(以毫秒为单位的超时时间)
    Socket socket=ss.accept();

上面是基于Socket、ServerSocket实现的一个简单的系统间通信的例子。而在实际的系统中,通常要面对的是客户端同时要发送多个请求到服务器端,服务器端则同时要接受多个连接发送的请求,上面的代码显然是无法满足的。

为了满足客户端能同时发送多个请求到服务器端,最简单的方法就是生成多个Socket。但这里会产生两个问题:一是生成太多的Socket会消耗过多的本地资源,在客户端机器多,服务器端机器少的情况下,客户端生成太多Socket会导致服务器端须要支撑非常高的连接数;二是生成Socket(建立连接)通常是比较慢的,因此频繁地创建会导致系统性能不足。鉴于这两个问题,通常采用连接池的方式来维护Socket是比较好的,一方面限制了能创建的Socket的个数;另一方面由于将Socket放入了池中,避免了重复创建Socket带来的性能下降问题。数据库连接池就是这种方式的典型代表,但连接池的方式会带来另一个问题,连接池中的Socket个数是有限的,但同时要用Socket的请求可能会很多,在这种情况下就会造成激烈的竞争和等待;还有一个需要注意的问题是合理控制等待响应的超时时间,如不设定超时会导致当服务器端处理变慢时,客户端相关的请求都在做无限的等待,而客户端的资源必然是有限的。因此这种情况下很容易造成当服务器端出现问题时,客户端挂掉的现象。超时时间具体设置为多少取决于客户端能承受的请求量及服务器端的处理时间。既要保证性能,又要保证出错率不会过高,对于直接基于TCP/IP+BIO的方式,可采用Socket.setSoTimeout来设置等待响应的超时时间。

为了满足服务器端能同时接受多个连接发送的请求,通常采用的方法是在accept获取Socket后,将此Socket放入一个线程中处理,通常将此方式称为一连接一线程。这样服务器端就可接受多个连接发送请求了,这种方式的缺点是无论连接上是否有真实的请求,都要耗费一个线程。为避免创建过多的线程导致服务器端资源耗尽,须限制创建的线程数量,这就造成了在采用BIO的情况下服务器端所能支撑的连接数是有限的。

TCP/IP+NIO

在Java中可基于java.nio.channels中的Channel和Selector的相关类来实现TCP/IP+NIO方式的系统间通信。Channel有SocketChannel和ServerSocketChannel两种,SocketChannel用于建立连接、监听事件及操作读写,ServerSocketChannel用于监听端口及监听连接事件;程序通过Selector来获取是否有要处理的事件。基于这两个类实现客户端的关键代码如下:

    SocketChannel channel=SocketChannel.open();
    // 设置为非阻塞模式
    channel.configureBlocking(false);
    //对于非阻塞模式,立刻返回false,表示连接正在建立中
    channel.connect(SocketAddress);
    Selector selector=Selector.open();
    // 向channel注册selector以及感兴趣的连接事件
    channel.register(selector,SelectionKey.OP_CONNECT);
    // 阻塞至有感兴趣的IO事件发生,或到达超时时间,如果希望一直等至有感兴趣的IO事件发生,可调用
    无参数的select方法,如果希望不阻塞直接返回目前是否有感兴趣的事件发生,可调用selectNow方法
    int nKeys=selector.select(以毫秒为单位的超时时间)
    // 如nKeys大于零,说明有感兴趣的IO事件发生
    SelectionKey sKey=null;
    if(nKeys>0){
      Set<SelectionKey> keys=selector.selectedKeys();
    for(SelectionKey key:keys){
        // 对于发生连接的事件
                if(key.isConnectable()){
                    SocketChannel sc=(SocketChannel) key.channel();
                    sc.configureBlocking(false);
                    // 注册感兴趣的IO读事件,通常不直接注册写事件,在发送缓冲区未满的情况下,一
    直是可写的,因此如注册了写事件,而又不用写数据,很容易造成CPU消耗100%的现象
    sKey = sc.register(selector, SelectionKey.OP_READ);
    // 完成连接的建立
                    sc.finishConnect();
            }
            // 有流可读取
                else if(key.isReadable()){
                    ByteBuffer buffer=ByteBuffer.allocate(1024);
                    SocketChannel sc=(SocketChannel) key.channel();
                    int readBytes=0;
                    try{
                        int ret=0;
                        try{
                            // 读取目前可读的流,sc.read返回的为成功复制到bytebuffer中的字
    节数,此步为阻塞操作,值可能为0;当已经是流的结尾时,返回-1
    while((ret=sc.read(buffer))>0){
                              readBytes+=ret;
                            }
                         }
                         finally{
                             buffer.flip();
                         }
                      }
                      finally{
                         if(buffer!=null){
                             buffer.clear();
                         }
                      }
                   }
                   // 可写入流
                   else if(key.isWritable()){
                       // 取消对OP_WRITE事件的注册
                       key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                       SocketChannel sc=(SocketChannel) key.channel();
                       // 此步为阻塞操作,直到写入操作系统发送缓冲区或网络IO出现异常,返回的为成功
    写入的字节数,当操作系统的发送缓冲区已满,此处会返回0
    int writtenedSize=sc.write(ByteBuffer);
    // 如未写入,则继续注册感兴趣的OP_WRITE事件
    if(writtenedSize==0){
       key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }
    }
          }
       selector.selectedKeys().clear();
    }
    // 对于要写入的流,可直接调用channel.write来完成,只有在写入未成功时才要注册OP_WRITE事件
    int wSize=channel.write(ByteBuffer);
    if(wSize==0){
       key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

从上可见,NIO是典型的Reactor模式的实现,通过注册感兴趣的事件及扫描是否有感兴趣的事件发生,从而做相应的动作。

服务器端关键代码如下:

    ServerSocketChannel ssc=ServerSocketChannel.open();
      ServerSocket serverSocket=ssc.socket();
      // 绑定要监听的端口
    serverSocket.bind(new InetSocketAddress(port));
      ssc.configureBlocking(false);
    // 注册感兴趣的连接建立事件
    ssc.register(selector, SelectionKey.OP_ACCEPT);

之后则可采取和客户端同样的方式对selector.select进行轮询,只是要增加一个对key.isAcceptable的处理,代码如下:

    if(key.isAcceptable()){
      ServerSocketChannel server=(ServerSocketChannel)key.channel();
      SocketChannel sc=server.accept();
      if(sc==null){
            continue;
    }
    sc.configureBlocking(false);
    sc.register(selector,SelectionKey.OP_READ);
    }

上面只是基于TCP/IP+NIO实现的一个简单例子,同样来看看基于TCP/IP+NIO如何支撑客户端同时发送多个请求及服务器端接受多个连接发送的请求。

对于客户端发送多个请求的需求,采用TCP/IP+NIO和采用TCP/IP+BIO的方式没有任何不同。但NIO方式可做到不阻塞,因此如果服务器端返回的响应能带上请求标识,那么客户端则可采用连接复用的方式,即每个SocketChannel在发送消息后,不用等响应即可继续发送其他消息,这种方式可降低连接池带来的资源争抢的问题,从而提升系统性能;对于连接不复用的情况,可基于Socket.setSoTimeout的方式来控制同步请求的超时;对于连接复用的情况,同步请求的超时可基于BlockingQueue、对象的wait/notify机制或Future机制来实现。

对于服务器端接受多个连接请求的需求,通常采用的是由一个线程来监听连接的事件,另一个或多个线程来监听网络流读写的事件。当有实际的网络流读写事件发生后,再放入线程池中处理。这种方式比TCP/IP+BIO的好处在于可接受很多的连接,而这些连接只在有真实的请求时才会创建线程来处理,这种方式通常又称为一请求一线程。当连接数不多,或连接数较多,且连接上的请求发送非常频繁时,TCP/IP+NIO的方式不会带来太大的优势,但在实际的场景中,通常是服务器端要支持大量的连接数,但这些连接同时发送的请求并不会非常多。

在基于Sun JDK开发Java NIO程序时,尤其要注意selector.select抛出IOException异常的处理http://bugs.sun.com/view_bug.do?bug_id=6693490及selector.select不阻塞就直接返回的情况http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933。这两种状况都有可能造成CPU消耗达到100%,对于selector.select抛出IOException的状况,可以采用绕过的方法为捕捉异常并将当前Thread sleep一段时间,或是重新创建Selector。为避免selector.select不阻塞就直接返回,可采用bug库中提到的修改建议。

从上面可以看出,对于高访问量的系统而言,TCP/IP+NIO方式结合一定的改造在客户端能够带来更高的性能,在服务器端能支撑更高的连接数。

UDP/IP+BIO

Java对UDP/IP方式的网络数据传输同样采用Socket机制,只是UDP/IP下的Socket没有建立连接的要求,由于UDP/IP是无连接的,因此无法进行双向的通信。这也就要求如果要双向通信的话,必须两端都成为UDP Server。

在Java中可基于DatagramSocket和DatagramPacket来实现UDP/IP+BIO方式的系统间通信, DatagramSocket负责监听端口及读写数据。DatagramPacket作为数据流对象进行传输,基于这两个类实现客户端的关键代码如下:

    // 由于UDP/IP是无连接的,如果希望双向通信,就必须启动一个监听端口,承担服务器的职责,如不能
    绑定到指定端口,则抛出SocketException
    DatagramSocket serverSocket=new DatagramSocket(监听的端口);
    byte[] buffer=new byte[65507];
    DatagramPacket receivePacket=new DatagramPacket(buffer,buffer.length);
    DatagramSocket socket=new DatagramSocket();
    DatagramPacket packet=new DatagramPacket(datas,datas.length,server,port);
    // 阻塞发送packet到指定的服务器和端口,当出现网络IO异常时抛出IOException,当连不上目标地
    址和端口时,抛出PortUnreachableException
    socket.send(packet)
    // 阻塞并同步读取流信息,如接收到的流信息比packet长度长,则删除更长的信息,可通过调用
    DatagramSocket.setSoTimeout(以毫秒为单位的超时时间)来设置读取流的超时时间
    serverSocket.receive(receivePacket)

服务器端代码和客户端代码的结构基本一致,这里就不列了。

由于UDP/IP通信的两端不建立连接,就不会有TCP/IP通信连接竞争的问题,只是最终读写流的动作是同步的。

对于服务器端同时接收多请求的需求,通常采取每接收到一个packet就放入一个线程中进行处理的方式来实现。

UDP/IP+NIO

在Java中可通过DatagramChannel和ByteBuffer来实现UDP/IP+NIO方式的系统间通信, DatagramChannel负责监听端口及进行读写,ByteBuffer则用于数据流传输。基于这两个类实现客户端的关键代码如下:

    DatagramChannel receiveChannel=DatagramChannel.open();
      receiveChannel.configureBlocking(false);
      DatagramSocket socket=receiveChannel.socket();
      socket.bind(new InetSocketAddress(rport));
      Selector selector=Selector.open();
      receiveChannel.register(selector, SelectionKey.OP_READ);
      之后即可采取和TCP/IP+NIO中对selector遍历一样的方式进行流信息的读取。
      DatagramChannel sendChannel=DatagramChannel.open();
      sendChannel.configureBlocking(false);
      SocketAddress target=new InetSocketAddress("127.0.0.1",sport);
    sendChannel.connect(target);
    // 阻塞写入流,如发送缓冲区已满,则返回0,此时可通过注册SelectionKey.OP_WRITE事件,以便
    在可写入时再进行写操作,方式和TCP/IP+NIO基本一致
    sendChannel.write(ByteBuffer);

服务端代码和客户端代码基本一致,就不再一一描述。

从以上代码来看,对于UDP/IP方式,NIO带来的好处是只在有流要读取或可写入流时才做相应的IO操作,而不用像BIO方式直接阻塞当前线程。

以上列举了基于Java包实现一对一的系统间通信的方式,在实际的场景中,通常还会要将消息发送到多台机器,此时可以选择为每个目标机器建立一个连接,这种方式对于发送消息端会造成很大的网络流量压力。例如传输的消息是视频数据的场景,在网络协议上还有一个基于UDP/IP扩展出来的多播协议,多播协议的传输方式是一份数据在网络上进行传输,而不是由发送者给每个接收者都传一份数据,这样,网络的流量就大幅度下降了。

在Java中可基于MulticastSocket和DatagramPacket来实现多播网络通信,MulticastSocket是基于DatagramSocket派生出来的类,其作用即为基于UDP/IP实现多播方式的网络通信。在多播通信中,不同的地方在于接收数据端通过加入到多播组来进行数据的接收,同样发送数据也要求加入到多播组进行发送,多播的目标地址具有指定的地址范围,在224.0.0.0和239.255.255.255之间。基于多播方式实现网络通信的服务器端关键代码如下:

    // 组播地址
    InetAddress groupAddress=InetAddress.getByName("224.1.1.1");
    MulticastSocket server=new MulticastSocket(port);
      // 加入组播,如地址为非组播地址,则抛出IOException,当已经不希望再发送数据到组播地址,或
    不希望再读取数据时,可调用server.leaveGroup(组播地址)
    server.joinGroup(groupAddress);
      MulticastSocket client=new MulticastSocket();
    client.joinGroup(groupAddress);

之后则可和UDP/IP+BIO一样通过receive和send方法来进行读写操作。

Client端代码和服务端代码基本一致,就不再列举了。

在Java应用中,多播通常用于多台机器的状态的同步。例如JGroups,默认基于UDP/IP多播协议,由于UDP/IP协议在数据传输时不够可靠,对于可靠性要求很高的系统,会希望采用多播方式,同时又要做到可靠。对于这样的需求,业界提出了一些能够确保可靠实现多播的方式:SRM(Scalable Reliable Multicast)http://ee.lbl.gov/papers/srm_ton.pdf、URGCP(Uniform Reliable Group Communication Protocol),其中SRM是在UDP/IP多播的基础上增加了确认机制,从而保证可靠,eBay采用了SRM框架来实现将数据从主数据库同步到各个搜索节点机器http://highscalability.com/ebay-architecture

从上面的介绍来看,使用Java包来实现基于消息方式的系统间通信还是比较麻烦。为了让开发人员能更加专注对数据进行业务处理,而不用过多关注纯技术细节,开源业界诞生了很多优秀的基于以上各种协议的系统间通信的框架。这其中的佼佼者就是Mina了,1.1.2节将会介绍这个佼佼者。

1.1.2 基于开源框架实现消息方式的系统间通信

这一节讲述基于Mina如何实现消息方式的系统间通信,同时分析开源通信框架的优势。

Mina是Apache的顶级项目http://mina.apache.org,基于Java NIO构建,同时支持TCP/IP和UDP/IP两种协议。Mina对外屏蔽了Java NIO使用的复杂性,并在性能上做了不少的优化。

在使用Mina时,关键的类为IoConnector、IoAcceptor、IoHandler及IoSession,Mina采用Filter Chain的方式封装消息发送和接收的流程,在这个Filter Chain过程中可进行消息的处理、消息的发送和接收等。

IoConnector负责配置客户端的消息处理器、IO事件处理线程池、消息发送/接收的Filter Chain等。

IoAcceptor负责配置服务器端的IO事件处理线程池、消息发送/接收的Filter Chain等。

IoHandler作为Mina和应用的接口,当发生了连接事件、IO事件或异常事件时,Mina都会通知应用所实现的IoHandler。

IoSession有点类似SocketChannel的封装,不过Mina对连接做了进一步的抽象,因此可进行更多连接的控制及流信息的输出。

基于Mina实现TCP/IP+NIO客户端的关键代码如下:

    // 创建一个线程池大小为CPU核数+1的SocketConnector对象
    SocketConnector ioConnector = new
    SocketConnector(Runtime.getRuntime().availableProcessors() + 1,
    Executors.newCachedThreadPool());
    // 设置TCP NoDelay为true
    ioConnector.getDefaultConfig().getSessionConfig().setTcpNoDelay(true);
    // 增加一个将发送对象进行序列化以及接收字节流进行反序列化的类至filter Chain
    ioConnector.getFilterChain().addLast("stringserialize", new
    ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
    // IoHandler的实现,以便当mina建立连接、接收到消息后通知应用
    IoHandler handler=new IoHandlerAdapter(){
                public void messageReceived(IoSession session, Object message)
                        throws Exception {
                    System.out.println(message);
                }
      };
            // 异步建立连接
    ConnectFuture connectFuture = ioConnector.connect(socketAddress,handler);
            // 阻塞等待连接建立完毕,如须设置连接创建的超时时间,可调用
    SocketConnectorConfig.setConnectTimeout(以秒为单位的超时时间)
      connectFuture.join();
    IoSession session=connectFuture.getSession();
    // 发送对象
    session.write(Object);

使用Mina后,客户端的代码变得简单多了。

服务器端关键代码如下:

    // 创建一个线程池大小为CPU核数+1的IoAcceptor对象
    final IoAcceptor acceptor=new
    SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1,
                                  Executors.newCachedThreadPool());
    acceptor.getFilterChain().addLast("stringserialize", new
    ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
    IoHandler handler=new IoHandlerAdapter(){
                public void messageReceived(IoSession session, Object message)
                        throws Exception {
                    // 接收到客户端发送的对象
                }
            };
    // 绑定监听的端口以及当有连接建立、接收到对象等事件发送时需要通知的IoHandler对象
    acceptor.bind(new InetSocketAddress(port), handler);

采用Mina后,服务器端代码无须再关注建立连接时的OP_ACCEPT的事件,同样也无须去注册OP_READ、OP_WRITE这些Key的事件,取而代之的是更友好地使用接口。通过这些封装使用者可以非常方便地使用,而无须过多考虑Java NIO的用法。但Mina 2.0之前的版本中并未提供连接的管理(连接的创建、自动重连、连接的心跳、连接池等)、同步发送数据支持,因此在实际使用中通常还需在Mina的基础上进行封装。

在使用Mina 2.0之前的版本时,以下几个方面值得注意:

● 使用自定义的ThreadModel

通过SocketConnectorConfig.setThreadMode(l ThreadModel.MANUAL)将线程模式改为自定义模式,否则Mina会自行启动一个最大线程数为16个的线程池来处理具体的消息,这对于多数应用而言都不适用,因此最好是自行控制具体处理消息的线程池。

● 合理配置IO处理线程池

在创建SocketAcceptor或SocketConnector时要提供一个线程池及最大的线程数,也就是Mina用于IO事件处理的线程数,通常建议将这个线程数配置为CPU核数+1。

● 监听是否成功写入操作系统的发送缓冲区

在调用IoSession.write时,Mina并不确保其会成功写入操作系统的发送缓冲区中(例如写入时连接刚好断开),为了确定是否成功,可采用如下方法:

    WriteFuture writeResult=session.write(data);
    writeResult.addListener(new IoFutureListener() {
      public void operationComplete(IoFuture future){
          WriteFuture wfuture=(WriteFuture)future;
          // 写入成功
          if(wfuture.isWritten()){
              return;
          }
              // 写入失败,自行进行处理
      }
    });

这对于同步请求而言特别重要,通常同步请求时都会设置一个等待响应的超时时间,如果不去监听是否成功写入的话,那么同步的请求一直要等到设定的超时时间才能返回。

● 监听写超时

当接收消息方的接收缓冲区占满时,发送方会出现写超时的现象,这时Mina会向外抛出WriteTimeoutException,如有必要,可在IoHandler实现的exceptionCaught方法里进行处理。

● 借助Mina IoSession上未发送的bytes信息实现流控

当IoSession上堆积了过多未发送的byte时,会造成jvm内存消耗过多的现象。因此通常要控制IoSession上堆积的未发送的byte量,此值可通过Mina IoSession的getScheduledWriteBytes来获取,从而进行流控。

● messageReceived方法占用IO处理线程

在使用Thread.MANUAL的情况下,IOHandler里的messageReceived方法会占用Mina的IO处理线程,为了避免业务处理接收消息的速度影响IO处理性能,建议在此方法中另起线程来做业务处理。

● 序列化/反序列化过程会占用IO处理线程

由于Mina的序列化/反序列化过程是在FilterChain上做的,同样会占据IO处理线程。Mina将同一连接上需要发送和接收的消息放在队列中串行处理。如果序列化/反序列化过程耗时较长,就会造成同一连接上其他消息的接收或发送变慢。

● 反序列化时注意继承CumulativeProtocolDecoder

在使用NIO的情况下,每次读取的流并不一定完整,因此要通过继承CumulativeProtocolDecoder来确保当流没读完时,下次接着读,这同时也要求应用在协议头中保持此次发送流的长度信息。

● Mina 1.1.6及以前的版本中sessionClosed可能会不被调用的bug

在某些高压力的情况下,当连接断开时,Mina 1.1.6及以前的版本并不会调用IoHandler中的sessionClosed方法,这对于某些要在sessionClosed做相应处理的应用来说会出现问题,这个bughttps://issues.apache.org/jira/browse/DIRMINA-549在Mina 1.1.7的版本中已修复。

除了Mina之外,JBoss Netty也是现在一个广受关注的Java通信框架,其作者也是Mina的作者(Trustin Lee),据评测JBoss Netty的性能好于Minahttp://www.jboss.org/netty/performance,如读者感兴趣,可访问http://www.jboss.org/netty来了解更多的细节。

以上两节介绍了基于Java自身包及开源通信框架来实现消息方式的系统间通信,Java系统内的通信都是以Java对象调用的方式来实现的,例如A a =new AImpl();a.call();,但当系统变成分布式后,就无法用以上的方式直接调用了,因为在调用端并不会有AImpl这个类。这时如果通过基于以上的消息方式来做,对于开发而言就会显得比较晦涩了,因此Java中也提供了各种各样的支持对象方式的系统间通信的技术,例如RMI、WebService等。同样,在Java中也有众多的开源框架提供了RMI、WebService的实现和封装,例如Spring RMI、CXF等,下面来看看基于远程调用方式如何实现系统间的通信。