栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java多线程启动BIO和NIO服务端同时启动接受图片和字符数据

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java多线程启动BIO和NIO服务端同时启动接受图片和字符数据

main函数:

@MapperScan("com.naughty.userlogin02.dao")
@SpringBootApplication
public class Userlogin02Application  {

    private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
    private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS, AVALIABLE_PROCESSORS * 2,
            1, TimeUnit.MINUTES, new linkedBlockingQueue<>(5),
            new ThreadPoolExecutor.CallerRunsPolicy());

//extends SpringBootServletInitializer
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
//      ServerSocket socket = new ServerSocket(8800);

        SpringApplication.run(Userlogin02Application.class, args);

       TCPserver tcPserver =SpringUtils.getBean(TCPserver.class);
        FutureTask future = new FutureTask(tcPserver);
        new Thread(future).start();

Myselect myselect =SpringUtils.getBean(Myselect.class);
        FutureTask future2 = new FutureTask(myselect);
        new Thread(future2).start();


//Thread thread = new Thread();

    // System.out.println("future get"+future.get());
// new Myselect().getSelector();
}


}
BIO服务端:
@Component
public class TCPserver implements Callable {

private AtomicInteger pic = new AtomicInteger(0);


    public TCPserver() throws IOException {
    }
//@Async


    @Override
    public Object call() throws Exception {
        ServerSocket server = new ServerSocket(8003);
        while(true){
            //(2)开始在这里暂停等待接收客户端的连接,得到一个端到端的Socket管道
            Socket socket = server.accept();
            new ServerReadThread(socket).start();
            System.out.println(socket.getRemoteSocketAddress()+"上线了!");
        }

// 使用accept()阻塞当前线程,等待客户端请求
       // Socket socket = server.accept();


    }
    class ServerReadThread extends Thread {
        private Socket socket;

        public ServerReadThread(Socket socket) {
            this.socket = socket;
        }
        @Override
        public void run()
        {

            try ( // 创建一个ServerSocket监听8080端口的客户端请求

// 由Socket获得输入流,并创建缓冲输入流
                  BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
// 由文件输出流创建缓冲输出流
                  //   System.getProperty("user.dir");
                  FileOutputStream out = new FileOutputStream("C:\Users\14172\Desktop\myback\resources\static\"+String.valueOf(pic)+".png")) {
                pic.addAndGet(1);
// 准备一个缓冲区
                byte[] buffer = new byte[1024];
// 首次从Socket读取数据
                int sum = 0;
                int len = in.read(buffer);
                while (len != -1) {
// 写入数据到文件

                    out.write(buffer, 0, len);
// 再次从Socket读取数据
                    len = in.read(buffer);
                    sum+=len;

                }
                System.out.println("sum"+sum);
                System.out.println("接收完成!");
                System.out.println(Thread.currentThread().getName());
                // return 1;
            } catch (IOException e) {
                e.printStackTrace();

                //  return 0;
            }finally {
                try {
                    socket.close();
                    System.out.println("closed");
                }catch (Exception e){
                      e.printStackTrace();

                }
            }

        }

}


}
NIO服务端:
package com.naughty.userlogin02.bean;

import com.alibaba.druid.sql.ast.statement.SQLIfStatement;
import com.naughty.userlogin02.service.WashService;
import com.naughty.userlogin02.service.WashServiceImpl;
import com.naughty.userlogin02.util.SpringUtils;
import com.naughty.userlogin02.util.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;

import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;


@RestController("Myselect")
public class Myselect implements Callable {
//
    @Autowired
    private WashService washService;
    @GetMapping("/washbase")
    public List getwash()
    {
        washService.allWash().stream().forEach(System.out::println);
        return washService.allWash();
    }

//@GetMapping("/select")
//    @Bean
//    @Qualifier("select")
    public void getSelector() throws IOException {
        //创建 Selector
       Selector selector = Selector.open();//用open方法创建
        ServerSocketChannel channel = ServerSocketChannel.open();//获取通道
        channel.configureBlocking(false);//切换为非阻塞模式
        channel.bind(new InetSocketAddress(8004));
        channel.register(selector, SelectionKey.OP_ACCEPT);//注册通道到选择器上,第二个参数为指定的事件为”监听接收事件“
//
//        * 读 : SelectionKey.OP_READ (1)
//* 写 : SelectionKey.OP_WRITE (4)
//* 连接 : SelectionKey.OP_ConNECT (8)
//* 接收 : SelectionKey.OP_ACCEPT (16)
//* 若注册时不止监听一个事件,则可以使用“位或”操作符连接。

        //轮询式的获取选择器上已经“准备就绪”的事件
        while (selector.select() > 0)
        {

            //7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
            Iterator it = selector.selectedKeys().iterator();
            while (it.hasNext())
            {
                //8. 获取准备“就绪”的是事件
                SelectionKey sk = it.next();
                //9. 判断具体是什么事件准备就绪
                if (sk.isAcceptable()) {
                    //10. 若“接收就绪”,获取客户端连接
                    SocketChannel sChannel = channel.accept();
                    //11. 切换非阻塞模式
                    sChannel.configureBlocking(false);
                    //12. 将该通道注册到选择器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                } else if (sk.isReadable()) {
                    //13. 获取当前选择器上“读就绪”状态的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();
                    //14. 读取数据

                    ByteBuffer buf = ByteBuffer.allocate(5000);
                    int len = 0;
                    FileOutputStream out = new FileOutputStream("C:\Users\14172\Desktop\myback\resources\static\0.jpg");
                  //  FileOutputStream in = new FileOutputStream("C:\Users\14172\Desktop\myback\mydiary.txt");
                    FileChannel fileChannel = out.getChannel();

                    int length=0;
                    if(sChannel.socket().getPort() == 8000)
                    {

                       try{
                       // 暂时没有成功,无法接受到图片
                           //FileChannel的read方法 ,从通道读取数据并放到缓冲区中
                        while ((len = sChannel.read(buf)) > 0) {//把channel的东西读到buf
                            buf.flip();
                  //out.write(buf.array(),0,len);
                            while (buf.hasRemaining())
                            length += fileChannel.write(buf);//buf的数据写到filechannel

                            buf.clear();
                         out.flush();
                            System.out.println("长度" + length);
                        }
                    }finally {

                       fileChannel.close();

                           out.close();
                       }

                        byte[] buffer = new byte[1024];
// 首次从Socket读取数据

                    //    System.out.println("文件读写完毕");
                    }
                 else  if(sChannel.socket().getPort() == 8002)
                    {
                        try {
                            while ((len = sChannel.read(buf)) > 0) {
                                int alltime = 10;

                                buf.flip();
                                //  out.write(buf.array(),0,len);

                               String readfrompython = new String(buf.array(), 0, len);
                              //  fileChannel.write(buf);
                                 String[] arr = readfrompython.trim().split(",");
                                System.out.println("数据"+readfrompython+"长度"+readfrompython.length());
                            Arrays.stream(arr).forEach(System.out::print);
                                int[] time = new int[arr.length];

                       for (int i = 0; i < arr.length; i++) {
                           if(!StringUtils.isEmpty(arr[i])) {
                               time[i] = Integer.parseInt(arr[i]);
                               alltime += time[i];
                           }
                       }

                                System.out.println("total:" + alltime);

                                //wash.setTime(new Date());

                                Wash  wash2 = new Wash();
                                wash2.setLastTime(alltime);

                    //   WashService washService = SpringUtils.getBean("washservice");

                         washService.allWash().stream().forEach(System.out::println);
                           wash2.setTimes(washService.getWahNums()+1);
                           wash2.setAllTime(washService.gettotal()+alltime);//总共清洗时间

                                wash2.setWater(1.2);
                                wash2.setId(100);
                       wash2.setTime(new Date());

                                if(washService.save(wash2)) System.out.println("插入数据成功!");
                                else{
                                    System.out.println("插入失败");
                                }
                        System.out.println(new String(buf.array(), 0, len));

                                buf.clear();
                            }
                        } finally {
                        //    if(sChannel.read(buf)<0)
                            fileChannel.close();
                            out.flush();
                            out.close();
                        }
                    }
                }
                //15. 取消选择键 SelectionKey
                it.remove();
            }
        }
    }


    @Override
    public Object call() {

        System.out.println(Thread.currentThread().getName());
        try {
getSelector();
        }catch (IOException e)
        {
            e.printStackTrace();
        }
        return 0;
    }
}

客户端1(接收图片数据)
import os
import sys
import struct

# import win32ui

import socket

host = 'localhost'
port = 8003
bufsize = 1024
addr = (host, port)
mydaar = ('localhost',8012)
filepath = 'C:\Users\14172\PycharmProjects\pythonProject1\0.png'

def socket_client():
    try:

        optval = struct.pack("ii", 1, 0)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(mydaar)
        # s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, optval)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # level:选项定义的层次。支持SOL_SOCKET、IPPROTO_TCP、IPPROTO_IP和IPPROTO_IPV6。
        #
        # optname:需设置的选项。
        #
        # value:设置选项的值。
        s.connect((host, port))

    except socket.error as msg:
        print(msg)
        sys.exit(1)

    # print(s.recv(1024).decode("utf-8"))

    while 1:

        fp = open(filepath, 'rb')
        while 1:
            data = fp.read(1024)
            if not data:
                print('{0} file send over...'.format(filepath))
                break
            s.send(data)
        # print(s.recv(1024).decode("utf-8"))
        break

    s.close()


# HTTP断开连接需要经过四次挥手,第四次挥手后为了保证发送的ACK被对方接受需要等待2MSL时间 MSL:报文的最长生存时间

if __name__ == '__main__':
    socket_client()
客户端2(字符数据)
import os
import sys
import struct
# import win32ui

import socket

host = 'localhost'
port = 8004
bufsize = 1024
addr = (host, port)
mydaar = ('localhost',8002)
filepath = 'C:\Users\14172\PycharmProjects\pythonProject1\time.txt'

def socket_client():
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(mydaar)
        s.connect((host, port))
    except socket.error as msg:
        print(msg)
        sys.exit(1)

    # print(s.recv(1024).decode("utf-8"))

    while 1:

        fp = open(filepath, 'rb')
        while 1:
            data = fp.read(1024)
            if not data:
                print('{0} file send over...'.format(filepath))
                break
            s.send(data)
        # print(s.recv(1024).decode("utf-8"))
        break


if __name__ == '__main__':
    socket_client()


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/295387.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号