栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java8新特性 - Stream - 20 - Stream的并行流与安全性处理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java8新特性 - Stream - 20 - Stream的并行流与安全性处理

1.说明

本文主要对并行流的常规操作进行了纪录,并没有详细的讲解。

2.代码
package com.northcastle.I_stream;



import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;


public class StreamParallel {

    
    @Test
    public void test01(){
        String[] strs = {"aaa","bbb","ccc","ddd","eee","fff"};
        Stream.of(strs).map(s -> {
            System.out.println(Thread.currentThread()+" : "+s);
            return Thread.currentThread().getName()+"->"+s;
        }).count();
    }

    
    @Test
    public void test02(){
        //方式一 : parallelStream() 方法
        ArrayList list = new ArrayList<>();
        Stream parallelStream01 = list.parallelStream();
        System.out.println(parallelStream01);

        //方式二 :串行流转化为并行流
        Stream stream = list.stream();
        Stream parallelStream02 = stream.parallel();
        System.out.println(parallelStream02);
    }

    
    @Test
    public void test03(){
        String[] strs = {"aaa","bbb","ccc","ddd","eee","fff"};

        Stream.of(strs).parallel() // 串行流转换为并行流
                .map(s -> {
            System.out.println(Thread.currentThread()+" : "+s);
            return Thread.currentThread().getName()+"->"+s;
        }).count();

    }

    

    long beginTime = 0; // 开始时间
    long endTime = 0;//结束时间
    long times = 500000000; // 运行次数
    @Before
    public void before(){
        beginTime = System.currentTimeMillis();
        System.out.println("begin time : "+beginTime);
    }

    
    @Test
    public void commonFor(){
        long sum = 0;
        for (int i = 0; i <= times ; i++) {
            sum+=i;
        }
        System.out.println("sum = "+sum);
    }

    
    @Test
    public void serialStream(){
        long reduce = LongStream.rangeClosed(0, times)
                .reduce(0, Long::sum);
        System.out.println("sum = "+ reduce);
    }

    
    @Test
    public void parallelStream(){
        long reduce = LongStream.rangeClosed(0, times)
                .parallel() // 获取一个串行的流
                .reduce(0, Long::sum);
        System.out.println("sum = "+reduce);
    }

    @After
    public void after(){
        endTime = System.currentTimeMillis();
        System.out.println("end time : "+endTime);
        System.out.println("cost : "+(endTime - beginTime));
    }

    


    

    
    @Test
    public void parallelBug(){
        ArrayList list01 = new ArrayList<>();
        ArrayList list02 = new ArrayList<>();

        int times = 1000; // 定义元素的多少
        LongStream.rangeClosed(0, times).forEach(list01::add);
        System.out.println("串行流元素数量 = "+list01.size());

        LongStream.rangeClosed(0,times).parallel().forEach(list02::add);
        System.out.println("并行流元素数量 = "+list02.size());
    }

    
    @Test
    public void parallelResolve01(){

        ArrayList list01 = new ArrayList<>();
        ArrayList list02 = new ArrayList<>();

        int times = 1000; // 定义元素的多少
        LongStream.rangeClosed(0, times).forEach(list01::add);
        System.out.println("串行流元素数量 = "+list01.size());

        Object obj = new Object(); // 定义一个对象锁
        LongStream.rangeClosed(0,times)
                .parallel()
                .forEach(i ->{
                    synchronized (obj){
                        list02.add(i);
                    }
                });
        System.out.println("并行流元素数量 = "+list02.size());

    }
    
    @Test
    public void parallelResolve02(){

        ArrayList list01 = new ArrayList<>();
        Vector list02 = new Vector<>();

        int times = 1000; // 定义元素的多少
        LongStream.rangeClosed(0, times).forEach(list01::add);
        System.out.println("串行流元素数量 = "+list01.size());

        LongStream.rangeClosed(0,times)
                .parallel()
                .forEach(list02::add);
        System.out.println("并行流元素数量 = "+list02.size());

    }

    
    @Test
    public void parallelResolve03(){

        ArrayList list01 = new ArrayList<>();
        ArrayList list02 = new ArrayList<>();
        List synchronizedList02 = Collections.synchronizedList(list02); // 将list02 转为 线程安全的容器

        int times = 1000; // 定义元素的多少
        LongStream.rangeClosed(0, times).forEach(list01::add);
        System.out.println("串行流元素数量 = "+list01.size());

        LongStream.rangeClosed(0,times)
                .parallel()
                .forEach(synchronizedList02::add);
        System.out.println("并行流元素数量 = "+list02.size());

    }

    
    @Test
    public void parallelResolve04(){

        ArrayList list01 = new ArrayList<>();
        List list02 = new ArrayList<>();

        int times = 1000; // 定义元素的多少
        LongStream.rangeClosed(0, times).forEach(list01::add);
        System.out.println("串行流元素数量 = "+list01.size());

        list02 = LongStream.rangeClosed(0, times)
                .parallel() // 转成 并行流
                .boxed() // 把一个 long 类型转换成了 Long 封装类型的数据
                .collect(Collectors.toList());
        System.out.println("并行流元素数量 list02 = "+list02.size());

        Object[] objects = LongStream.rangeClosed(0, times)
                .parallel() // 转成 并行流
                .boxed() // 把一个 long 类型转换成了 Long 封装类型的数据
                .toArray();
        System.out.println("并行流元素数量 objects = "+objects.length);

    }

    


}

3.完成

Congratulatioins!
You are one step closer to success!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/770381.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号