jdk1.8种parallel并发问题
背景:在使用无序并发执行一些循环方法的时候,并发执行可以有效的避免串行方法调用链路的等待时间
例如:
@Test
public void testParallel() {
List<Integer> list = Lists.newArrayList();
for (int i = 0; i < 10000; i++) {
list.add(i);
}
System.out.println(list.size());
List<Integer> streamList = new ArrayList<>();
list.parallelStream().forEach(streamList::add);
System.out.println(streamList.size());
}
编译结果:
观察发现,原来集合中的数据有10000条,但是使用并行流遍历数据插入到新集合streamList中后,新的集合中只有5746条数据。并且会在多次之后可能会出现数组下标越界异常和null,显然这里的代码是不合逻辑的。
分析:
parallelStream中使用的是ForkJobTask。Fork/Join的框架是通过把一个大任务不断fork成许多子任务,然后多线程执行这些子任务,最后再Join这些子任务得到最终结果。从程序上看,就是先将list集合fork成多段,然后多线程添加到streamList的结合中,而streamList是ArrayList类型,它的add方法并不能保证原子性。
ArrayList种add源码如下:
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
在并发情况下,如果同时有A、B两个线程同时执行add,在第一步ensureCapacityInternal校验数组容量时,A、B线程都发现当前容量还可以添加最有一个元素,不需扩容;因此进入第二步,此时,A线程先执行完,数组容量已满,然后B线程再对elementData赋值时,就会抛出“ArrayIndexOutOfBoundsException”。
解决方法:
第一种:将parallelStream改成stream,或者直接使用foreach处理。这可以通过判断并发处理真实能带来多大的好处,做取舍。
第二种:使用resultList =new CopyOnWriteArrayList<>(); 这是个线程安全的类。从源码上看,CopyOnWriteArrayList在add操作时,通过ReentrantLock进行加锁,防止并发写。不给过CopyOnWriteArrayList,每次add操作都是把原数组中的元素拷贝一份到新数组中,然后在新数组中添加新元素,最后再把引用指向新数组。这会导致频繁的对象创建,况且数组还是需要一块连续的内存空间,如果有大量add操作,慎用。
第三种:使用包装类 resultList = Collections.synchronizedList(Arrays.asList());
评论区