`
iwindyforest
  • 浏览: 230258 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

从TcpSocket上读取数据的三种方式

阅读更多
我在一个项目中碰到了一个TcpSocket的应用。在java程序中使用TcpSocket同本机的一个服务进行进程间的通信。

由于通信路径只是单机并没有经过网络,因此两个进程之间的互通相对与网络传输是比较快速的。因此,进程间的交互使用了如下方式:

(见上传图片)

让我们看一下代码实现:

	public synchronized void send(byte[] bytes) throws IOException
	{
		if (bytes != null && bytes.length > 0)
		{
			this.bos.write(bytes);
			this.bos.flush();
		}
	}

	/**
	 * 尝试读取一次,速度最快,但返回的信息可能不全
	 */
	public synchronized byte[] receiveOnce() throws IOException
	{
		byte[] reciveBytes = new byte[0];
		int len = this.bis.read(this.b_buf, 0, this.bufferSize);

		return ArrayUtils.addAll(reciveBytes, ArrayUtils.subarray(this.b_buf, 0, len));
	}


	/**
	 * 用于发送并接收数据,在返回数据比较少的情况下使用
	 */
	public byte[] sendAndReceiveOnce(byte[] bytes) throws IOException
	{
		this.send(bytes);
		return this.receiveOnce();
	}


我们通过调用sendAndReceiveOnce()来接收并返回数据。

这样实现会导致什么问题呢?

1.       最明显的就是发送数据,和接收数据在同一个线程里边,如果运行在主线程,那么主线程将会在执行receiveOnce()的时候停滞,除非接收到数据或者触发超时异常。但之前已经说明了,应用本函数的环境是单机的进程间的通信,因此接收到数据的时间实际上就取决于Server处理并给予响应时间的长短,不存在网络传输的滞留时间。所以,如果在Server响应快速的情况下,客户端Socket几乎感觉不到延迟太多。

2.       再一个明显问题就是,receiveOnce()根据其函数名称就可以得知,它只能读取一次,如果返回信息的长度,大于其缓冲区的长度,那它只能得到部分数据了。

综合分析以上两种情况,我们先解决问题2吧,看看如何读取完整的响应信息?

下面是我的实现方法:

   
    /**
     * 从流的开头开始读取,读取全部的输入数据并返回 如果执行read的时候流中没有数据则阻塞读取直到超时
     * 最少经过一次超时,因此速度比较慢,但读的时候流中没有数据可以等到超时 ,因此获取数据比较准确
     * 
     * @return
     * @throws 除了SocketTimeoutException的一切IOException
     */
    public synchronized byte[] blockReceive() throws IOException
    {
        byte[] reciveBytes = new byte[0];
        // 偏移量
        int offset = 0;
        // 每次读取的字节数
        int len = 0;
        while(len != -1)
        {
            try
            {
                len = this.in.read(this.buffer, 0, this.bufferSize);
            }
            catch(SocketTimeoutException e)
            {
                break;
            }

            if(len != -1)
            {
                reciveBytes = ArrayUtils.addAll(reciveBytes, ArrayUtils
                        .subarray(this.buffer, 0, len));
                offset = offset + len;
            }
        }

        return reciveBytes;
    }


这个方法就如同它的注释所说的那样,它总是尝试去仅可能多的读取信息,但是在触发了一次超时之后将会返回读取到的字节。

有人提问:那判断流是否已经读完不是看读到最后会返回-1么?
我的回答是:返回-1取决于处理的流的源头是什么,如果是文件流,这种想法或许是对的,因为文件流的大小是固定的,持续的读,总会读到文件的末尾(EOF),它总会返回-1的。

就像下面的文件流在读取文件的实现一样,我们这样读取文件流:
    
    protected byte[] receiveBytes() throws IOException
    {
        byte[] reciveBytes = new byte[0];
        // 偏移量
         int offset = 0;
        // 每次读取的字节数
         int len = 0;
        while(len != -1)
        {
            this.buffer = new byte[bufferSize];
            len = this.in.read(this.buffer, 0, this.bufferSize);

            if(len != -1)
            {
                reciveBytes = ArrayUtils.addAll(reciveBytes, this.buffer);
                offset = offset + len;
            }
        }
        return ArrayUtils.subarray(reciveBytes, 0, offset);
    }


但是,如果是网络流,例如TcpSocket,这样的流是没有末尾(EOF)的,如果你想读到-1,或许在远端被关闭了,而你还在读取,还是有可能读到-1的。实际情况是:网络连接状况很复杂,很有可能远端没有正常关闭而是进程死掉了,而是连接的线路断掉了,或者任何一个原因导致连接的通路无法正常传输数据。由于在这种情况下,java中BufferedInputStream的read(byte[] b, int off, int len)函数(其它流对象也一样)总是尝试读取更多的数据,如果没有设置超时,就会一直堵塞在那里,像死掉了一样。而不是像你所期待的那样,返回-1。因此,我们才才用让它最少经过一次超时,来尝试读取更多的数据。当然,这也是仅仅在网络状况足够好的情况下,或者超时对于响应结果不会影响太多的情况下的解决方法。

(加一个小插曲:前段时间本人曾经在电话里面被一个面试我的作开发的兄弟考到“怎么获取tcpScoket远端关闭”,我就是阐述了因为以上观点“检测远端是否关闭的最好方法就是向远端发送数据,看是否发生IO异常”,而那位仁兄一直坚持远端关闭得到-1是对的,我说不对就反问:如果不是关闭,而是把网线切断或者网络不通也能得到-1么?仁兄语塞,面试后来以尴尬结束。后来反思自己当时实在太轻狂了,没给仁兄面子。去不去应聘倒无所谓,但是态度还是不是面试时应该有的态度啊。现在想向那位仁兄道歉,也没机会了)

那现在又有一个问题了,虽然与远端的交互出现无法读取到数据的时候不会一直堵塞在那里,像死掉了一样。但是我在使用blockReceive()的时候总是需要等一个超时的时间才能返回!

那我如果设超时为5秒,操作完了之后,也至少等到5秒才能达到消息的反馈。哦,天哪,慢到要死了!!!!
这个问题必须解决,那么:
让我们用更聪明的实现方法吧,我们不要阻塞了!!!

    /**
     * 如果流中有数据,则从流的开头开始读取,读取全部的输入数据并返回,否则马上返回空 尝试获取当前流中的最大数据量,
     * 由于使用了非阻塞接收,需要保证在执行本函数的时候流中恰好有数据, 在执行此函数之前必须给后台足够的响应时间
     *
     * @return
     * @throws 除了SocketTimeoutException的一切IOException
     */
    public synchronized byte[] unblocReceive() throws IOException
    {
        byte[] reciveBytes = new byte[0];
        // 当前流中的最大可读数
        int contentLength = this.in.available();
        // 偏移量
        int offset = 0;
        // 每次读取的字节数
        int len = 0;

        while(contentLength > 0 && offset < contentLength && len != -1)
        {
            try
            {
                len = this.in.read(this.buffer, 0, this.bufferSize);
            }
            catch(SocketTimeoutException e)
            {
                break;
            }

            if(len != -1)
            {
                reciveBytes = ArrayUtils.addAll(reciveBytes, ArrayUtils
                        .subarray(this.buffer, 0, len));
                offset = offset + len;
            }
        } 

        return reciveBytes;
    } 


我们发现:这个方法真是不错!我们每次在读取数据之前,总是先用available()方法获取流中当前的最大可读字节数,然后再读。否则,我直接返回。但是为了以防我在读取数据的时候也出现超时问题导致堵塞,我还是小心的加入了超时的处理,虽然它在绝大部分情况下并不会发生。

好了!现在我们满怀希望的来调用所谓的“完美”解决方案:
	public byte[] sendAndUnblockReceive(byte[] bytes) throws IOException
	{
		this.send(bytes);		
		return this.unblocReceive();
	}


然后,测试,却发现了一个奇怪的现象:

我们在程序里面连续调用了两次sendAndUnblockReceive(),并期待每次发送都会迅速并完整准确的接收它们每次的响应。但是,没有效果。事实是:我第一次发送的请求,并没有接收到它需要的正确响应。而我们第二次发送的请求,却接收到了第一次的响应,还要第二次的响应,这样两条数据!!!

这是为什么呢?因为:


我们第一次在发送完数据之后,马上就调用了unblocReceive()。但是由于这次我们调用的实在太快了,Server那一端没来的及处理,甚至没来的及接收,更不用说响应了。因此unblocReceive()里面我们用available()方法获取流中当前的最大可读字节数为0!!!因此,当然就不会读取了!!!而第二次再发送时,第一次的响应刚刚到达,因此,unblocReceive()再被第二次调用的时候“尽最大可能”的读取到了这两次的响应信息。


唉,看来就没有更好的方法了么?或许,还是有的吧!!!

我们先这样改一下:

	public byte[] sendAndUnblockReceive(byte[] bytes) throws IOException
	{
		this.send(bytes);
		// 由于使用了非阻塞接收,为保证在执行read的时候流中恰好有数据,
		// 必须给后台足够的响应时间
		try
		{
			Thread.sleep(500);
		}
		catch (InterruptedException e)
		{
			logger.error("InterruptedException error.", e);
		}
		return this.unblocReceive();
	}


强制的让线程在发送完后sleep一端时间,半秒钟,给Server足够的响应时间,然后再去读取,或许,这样比那个blockReceive()的实现要好一点吧。



最后来一下总结:

我们在这个TcpScoket中,在发送和读取使用同一线程的情况下,使用了三种读取方式:

一次读取,阻塞式完整读取,非阻塞式完整读取

这三种读取方式的优缺点分析如下:

一次读取receiveOnce():   是最快速,并且在缓冲区足够大的情况下能够完整读取的方法,当然如果没有设置超时,它仍然用可能存在阻塞。

阻塞式完整读取blockReceive():在返回数据之前总是至少经过一次超时以读取更多数据,因此在网络状况足够好的情况下,速度仍然比较慢。

非阻塞式完整读取unblocReceive(): 在尝试读取数据之前,首先判断可以读取的最大字节数,如果有数据则尝试去读,否则直接返回。所有是这一种不用考虑缓冲区大小,还能兼顾速度的方法。但是如果远端响应慢的情况下,依然会错过读取数据。

综合上述三中读取方式:我们可以在确定返回数据量较少,而又要求速度快而准确的情况下,使用receiveOnce()。在返回数据量较多,而又要求速度快而准确的情况下,使用unblocReceive(),不过需要留给远端足够的响应时间。在不需要响应速度很快,而需要返回大量数据,而且准确的情况下使用blockReceive()。

现在我们抛开这些,想一想:

我们真的需要这样三种读取方式吗?需要吗?

我们为什么这么罗里罗嗦使用这三种方式?

因为,我们把发送数据,和接收数据这两个功能放在一个线程里执行了!!!

这才是最主要的问题!

因此,:

尽量不要在使用Socket的流的时候,把发送数据和接收数据的调用放在一个线程里。

因为,网络上的流是不稳定的,因此java在设计流的时候也是尽量去读取尽可能多的数据,很可能发生堵塞。如果放在一个线程里面,试图我发送了就会想当然的又快又准确的接收到,就会像上面的解决方案一样,用尽招数,仍然束手无策。
  • 大小: 16.4 KB
分享到:
评论
1 楼 presses 2008-07-08  
一、TCP层很多都实现了保活定时器,当客户端socket断开时,服务端对应的socket会自动抛出异常。
  所以不需要靠读到"-1"来判断socket的关闭。
二、如果客户端和服务端用Socket传输信息,最好有自已的协议,例如用一串特殊字符(magic number)时来分割数据,当  一端读到magic number时,就把数据返回给业务层处理。
三、对这类Socket应用,最好还是多线程吧。用多线程的原因并不仅仅是处理网络延时。处理并发等,也是原因。

相关推荐

Global site tag (gtag.js) - Google Analytics