栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Socket(AIO)实现的客户端与服务端之间通信(用socket实现客户端与服务端通信)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Socket(AIO)实现的客户端与服务端之间通信(用socket实现客户端与服务端通信)

AIO 介绍

        AIO是在NIO基础上引入了异步通道的概念,并提高了异步文件和异步套接字通道的实现,从而在真正意义上实现了异步阻塞,之前我们学习的NIO只是非阻塞而并非异步。而AIO它不需要通过多路复用器对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO编程模型。也可以称之为NIO 2.0,这种模式才真正的属于我们异步非阻塞的模型。

AIO 设计原理

        AIO 并没有采用NIO的多路复用器,而是使用异步通道的概念。其read,write方法的返回类型都是Future对象。而Future模型是异步的,其核心思想是:去主函数等待时间。

服务端
package com.example.netty.socket.aio;

import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class Server {

    //线程池
    private ExecutorService executorService;
    //线程组
    private AsynchronousChannelGroup channelGroup;
    //服务器通道
    public AsynchronousServerSocketChannel serverSocketChannel;

    public Server(int port) {
        try {
            //创建一个线程池
            executorService = Executors.newCachedThreadPool();
            //创建一个线程组
            channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
            //创建服务器通道
            serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
            //进行端口绑定
            serverSocketChannel.bind(new InetSocketAddress(port));
            //进行阻塞 (内部递归调用)
            serverSocketChannel.accept(this,new ServerCompletionHandler());
            //一直阻塞 不让服务停止
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Server(8765);
    }
}
package com.example.netty.socket.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;


public class ServerCompletionHandler implements CompletionHandler {


    
    @Override
    public void completed(AsynchronousSocketChannel result, Server attachment) {

        
        attachment.serverSocketChannel.accept(attachment, this);
        read(result);
    }

    @Override
    public void failed(Throwable exc, Server attachment) {

    }

    
    private void read(AsynchronousSocketChannel asc) {
        //读取数据缓冲区
        ByteBuffer buf = ByteBuffer.allocate(1024);
        //异步读取数据
        asc.read(buf, buf, new CompletionHandler() {
            
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                //进行读取数据后,重置标识位
                attachment.flip();
                //获得读取数据的长度
                System.out.println("服务端读取客户端数据长度:" + result);
                //获得读取的数据
                String data = new String(attachment.array()).trim();
                System.out.println("服务端读取客户端数据:" + data);
                String response = "服务器响应,接收到客户端发送的数据:" + data;
                write(asc, response);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
    }

    
    private void write(AsynchronousSocketChannel asc, String response) {
        try {
            //写入数据缓冲区
            ByteBuffer buf = ByteBuffer.allocate(1024);
            //写入缓冲区
            buf.put(response.getBytes());
            //进行读取数据后,重置标识位 复位
            buf.flip();
            //写出数据 异步写数据
            asc.write(buf).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }


}
客户端
package com.example.netty.socket.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;


public class Client implements Runnable {

    //客户端通道
    private AsynchronousSocketChannel asc;

    public Client() {
        try {
            //打开客户端通道
            asc = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    private void connect() {
        asc.connect(new InetSocketAddress("127.0.0.1", 8765));
    }

    
    private void write(String request) {
        try {
            //写入数据
            asc.write(ByteBuffer.wrap(request.getBytes())).get();
            read();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    
    private void read() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            //读取数据到缓冲区
            asc.read(buf).get();
            //缓冲区数据复位
            buf.flip();
            //根据缓冲区可用长度初始化byte数据
            byte[] respByte = new byte[buf.remaining()];
            //读取缓冲区数据到respByte
            buf.get(respByte);
            //打印读取到的数据
            System.out.println("读取服务端返回数据:" + new String(respByte, "utf-8").trim());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true) {

        }
    }

    public static void main(String[] args) throws InterruptedException {

        Client c1 = new Client();
        c1.connect();
        Client c2 = new Client();
        c2.connect();
        Client c3 = new Client();
        c3.connect();

        new Thread(c1, "c1").start();
        new Thread(c2, "c2").start();
        new Thread(c3, "c3").start();

        Thread.sleep(1000);
        c1.write("c1 data");
        c2.write("c2 data");
        c3.write("c3 data");
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773306.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号