Uploading Files in Java with Zero Copy

Contents
  • Introduction
  • zero copy
    • Server
    • Client
  • traditional
    • Server
    • Client

Introduction

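Frameworks such as Kafka and Netty use zero copy to improve performance. How can our own projects use zero copy the way they do? In Java the key call is FileChannel.transferTo, which, where the platform supports it, lets the operating system move file data to the socket without copying it through a user-space buffer. Simple demonstration code is given below.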

zero copy

Server

The server listens on port 5230 and receives the file.

package org.nefu.copy.zero;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class TransferToServer {
    ServerSocketChannel listener = null;

    protected void mySetup() {
        InetSocketAddress listenAddr = new InetSocketAddress(5230);

        try {
            listener = ServerSocketChannel.open();
            ServerSocket ss = listener.socket();
            ss.setReuseAddress(true);
            ss.bind(listenAddr);
            System.out.println("Listening on port : " + listenAddr.toString());
        } catch (IOException e) {
            System.out.println("Failed to bind, is port : " + listenAddr.toString()
                    + " already in use ? Error Msg : " + e.getMessage());
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        TransferToServer dns = new TransferToServer();
        dns.mySetup();
        dns.readData();
    }

    private void readData() {
        ByteBuffer dst = ByteBuffer.allocate(4096);
        try {
            while (true) {
                SocketChannel conn = listener.accept();
                System.out.println("Accepted : " + conn);
                conn.configureBlocking(true);
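                long sum = 0;
                int nread = 0;
                while (nread != -1) {
                    try {
                        nread = conn.read(dst);
                    } catch (IOException e) {
                        e.printStackTrace();
                        nread = -1;
                    }
                    // count only real reads; -1 marks end of stream and must not be added
                    if (nread > 0) {
                        sum += nread;
                    }
                    // discard the received bytes so the buffer can be reused
                    dst.clear();
                }
                // release the connection before accepting the next one
                conn.close();
                System.out.println("received total " + sum + " bytes");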
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
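
The server above only counts the incoming bytes and then discards them. If the upload should be kept, one option is to let a FileChannel pull from the connection with transferFrom. The following is a minimal sketch under assumptions: the class name ReceiveToFileSketch, the method receiveToFile, and the 8 MiB chunk size are hypothetical and not from the original code, and the source channel is assumed to be in blocking mode. Whether the JDK can avoid an intermediate copy when the source is a socket depends on the JDK version and platform, so this is best seen as a convenience rather than a guaranteed zero-copy path.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not part of the original article.
final class ReceiveToFileSketch {

    // Upper bound on bytes requested per transferFrom call (an arbitrary choice).
    private static final long CHUNK = 8L * 1024 * 1024;

    /** Drains src into target and returns the number of bytes written. */
    static long receiveToFile(ReadableByteChannel src, Path target) throws IOException {
        try (FileChannel out = FileChannel.open(target,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            long position = 0;
            long n;
            // Assumption: src is blocking, so a return value of 0 means end of stream.
            while ((n = out.transferFrom(src, position, CHUNK)) > 0) {
                position += n;
            }
            return position;
        }
    }
}
```

Inside readData, the counting loop could then be replaced by a call such as receiveToFile(conn, Paths.get("upload.bin")), where the file name is again only an illustration.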

Client

The client sends the file.

package org.nefu.copy.zero;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class TransferToClient {

    public static void main(String[] args) throws IOException {
        TransferToClient sfc = new TransferToClient();
        sfc.testSendfile();
    }

    public void testSendfile() {
        SocketAddress sad = new InetSocketAddress("localhost", 5230);
        String fname = "/Users/jiajunbernoulli/Downloads/Softwares/jdk-11.0.1_windows-x64_bin.exe";
        try (SocketChannel sc = SocketChannel.open();
            FileChannel fc = new FileInputStream(fname).getChannel()) {
            // Socket
            sc.connect(sad);
            sc.configureBlocking(true);
            // Use the real file size instead of a hard-coded constant
            long fsize = fc.size();
            // Transfer
            long start = System.currentTimeMillis();
            // transferTo may send fewer bytes than requested, so repeat until the file is done
            long curnset = 0;
            while (curnset < fsize) {
                long n = fc.transferTo(curnset, fsize - curnset, sc);
                if (n <= 0) {
                    break; // nothing more could be transferred
                }
                curnset += n;
            }
            System.out.println("total bytes transferred--" + curnset + " and time taken in MS--" + (
                System.currentTimeMillis() - start));
        } catch (IOException e) {
            System.out.println(e);
        }
    }
}
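
To try the transfer without starting two separate programs, both ends can run in one JVM over the loopback interface. The sketch below assumes a hypothetical class LoopbackTransferSketch with a method sendOverLoopback; it binds to port 0 so the operating system picks a free port instead of 5230, receives on a background thread, and sends with a transferTo loop. It is meant as a quick sanity check of the byte counts, not as a benchmark.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

// Hypothetical harness for illustration; not part of the original article.
final class LoopbackTransferSketch {

    /** Sends file over a loopback connection and returns {bytesSent, bytesReceived}. */
    static long[] sendOverLoopback(Path file) throws Exception {
        try (ServerSocketChannel listener = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port.
            listener.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Receiver: accept one connection and count bytes until end of stream.
            CompletableFuture<Long> received = CompletableFuture.supplyAsync(() -> {
                try (SocketChannel conn = listener.accept()) {
                    ByteBuffer dst = ByteBuffer.allocate(4096);
                    long sum = 0;
                    int nread;
                    while ((nread = conn.read(dst)) != -1) {
                        sum += nread;
                        dst.clear();
                    }
                    return sum;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });

            // Sender: transferTo may move fewer bytes than requested, so loop.
            long sent = 0;
            try (SocketChannel sc = SocketChannel.open(listener.getLocalAddress());
                 FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
                long size = fc.size();
                while (sent < size) {
                    long n = fc.transferTo(sent, size - sent, sc);
                    if (n <= 0) {
                        break;
                    }
                    sent += n;
                }
            }
            // Closing the sender's channel signals end of stream to the receiver.
            return new long[] {sent, received.get()};
        }
    }
}
```

Both numbers in the returned array should equal the file size; a mismatch would point to a truncated transfer or a counting error on the receiving side.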

traditional

The code for the traditional upload approach is provided as well, so that the performance of the two can be compared.

Server

package org.nefu.copy.traditional;

import java.io.DataInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;


public class TraditionalServer {

    public static void main(String[] args) {

        try (ServerSocket serverSocket = new ServerSocket(5230)) {
            System.out.println(
                String.format("Server listening on port %d, waiting for client",
                    serverSocket.getLocalPort()));
            // server infinite loop for receiving data from clients
            while (true) {
                // try-with-resources closes the stream and socket even if reading fails
                try (Socket socket = serverSocket.accept();
                    DataInputStream inputStream = new DataInputStream(socket.getInputStream())) {
                    System.out.println(
                        "New connection accepted " + socket.getInetAddress() + ":" + socket.getPort());

                    byte[] readBuf = new byte[4096];
                    long total = 0;
                    int readN;
                    // read until the client closes the connection (read returns -1)
                    while ((readN = inputStream.read(readBuf, 0, readBuf.length)) != -1) {
                        total += readN;
                    }
                    System.out.println("received total " + total + " bytes");
                }
            }
        } catch (IOException e) {
            System.out.println("io exception " + e.getMessage());
        }
    }
}

Client

package org.nefu.copy.traditional;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import org.apache.commons.io.FileUtils;

public class TraditionalClient {

    public static void main(String[] args) {
        String fileName = "/Users/jiajunbernoulli/Downloads/Softwares/jdk-11.0.1_windows-x64_bin.exe";
        try (Socket socket = new Socket("localhost", 5230);
            FileInputStream inputStream = new FileInputStream(fileName);
            DataOutputStream output = new DataOutputStream(socket.getOutputStream())) {
            // socket
            socket.setTcpNoDelay(true);
            socket.setReuseAddress(true);
            socket.setSendBufferSize(4096);
            System.out.println("Connected with server " +
                socket.getInetAddress() +
                ":" + socket.getPort());
            // file
            File file = new File(fileName);
            System.out.println("file size is " + FileUtils.sizeOf(file));
            long start = System.currentTimeMillis();
            byte[] b = new byte[4096];
            int read;
            long total = 0;
            while ((read = inputStream.read(b)) >= 0) {
                total = total + read;
                // write only the bytes actually read; the last chunk is usually shorter than b
                output.write(b, 0, read);
            }
            output.flush();
            System.out.println(
                "bytes sent--" + total + " and total time in ms--" + (System.currentTimeMillis() - start));
        } catch (UnknownHostException e) {
            System.out.println(e);
        } catch (IOException e) {
            System.out.println(e);
        }
    }
}
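
The copy loop in the traditional client has to write exactly the bytes each read returned, otherwise the receiver sees more data than the file contains. A small sketch with in-memory streams makes this easy to check; the class BufferedCopySketch and its copy method are hypothetical names introduced here for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper for illustration; not part of the original article.
final class BufferedCopySketch {

    /** Copies in to out through a 4096-byte buffer and returns the bytes copied. */
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[4096];
        long total = 0;
        int read;
        while ((read = in.read(buf)) != -1) {
            // Write only the filled part of the buffer, not buf.length bytes.
            out.write(buf, 0, read);
            total += read;
        }
        out.flush();
        return total;
    }
}
```

Copying a 10000-byte ByteArrayInputStream into a ByteArrayOutputStream this way yields 10000 bytes on both sides. Writing the whole buffer on every iteration would instead emit 3 × 4096 = 12288 bytes, because the third read fills only 1808 bytes of the buffer.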
When reposting, please credit the source: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/307282.html