jdk1.8种parallel并发问题

背景:在使用无序并发执行一些循环方法的时候,并发执行可以有效的避免串行方法调用链路的等待时间

例如:

@Test
public void testParallel() {
    List<Integer> list = Lists.newArrayList();
    for (int i = 0; i < 10000; i++) {
        list.add(i);
    }
    System.out.println(list.size());
    List<Integer> streamList = new ArrayList<>();
    list.parallelStream().forEach(streamList::add);
    System.out.println(streamList.size());
}

编译结果:

观察发现,原来集合中的数据有10000条,但是使用并行流遍历数据插入到新集合streamList中后,新的集合中只有5746条数据。并且会在多次之后可能会出现数组下标越界异常和null,显然这里的代码是不合逻辑的。

分析:

parallelStream中使用的是ForkJobTask。Fork/Join的框架是通过把一个大任务不断fork成许多子任务,然后多线程执行这些子任务,最后再Join这些子任务得到最终结果。从程序上看,就是先将list集合fork成多段,然后多线程添加到streamList的结合中,而streamList是ArrayList类型,它的add方法并不能保证原子性。

ArrayList种add源码如下:

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        ensureCapacityInternal(size + 1);  // Increments modCount!!
        elementData[size++] = e;
        return true;
    }

在并发情况下,如果同时有A、B两个线程同时执行add,在第一步ensureCapacityInternal校验数组容量时,A、B线程都发现当前容量还可以添加最有一个元素,不需扩容;因此进入第二步,此时,A线程先执行完,数组容量已满,然后B线程再对elementData赋值时,就会抛出“ArrayIndexOutOfBoundsException”。

解决方法:

第一种:将parallelStream改成stream,或者直接使用foreach处理。这可以通过判断并发处理真实能带来多大的好处,做取舍。

第二种:使用resultList =new CopyOnWriteArrayList<>(); 这是个线程安全的类。从源码上看,CopyOnWriteArrayList在add操作时,通过ReentrantLock进行加锁,防止并发写。不给过CopyOnWriteArrayList,每次add操作都是把原数组中的元素拷贝一份到新数组中,然后在新数组中添加新元素,最后再把引用指向新数组。这会导致频繁的对象创建,况且数组还是需要一块连续的内存空间,如果有大量add操作,慎用。

第三种:使用包装类 resultList = Collections.synchronizedList(Arrays.asList());