文档

Java™ 教程-Java Tutorials 中文版
并行
Trail: Collections
Lesson: Aggregate Operations

并行

并行计算涉及将问题划分为子问题,同时解决这些问题(并行地,每个子问题在单独的线程中运行),然后组合子问题解决方案的结果。Java SE 提供 fork/join framework,使你可以更轻松地在应用程序中实现并行计算。但是,使用此框架,你必须指定如何细分(分区)问题。通过聚合操作,Java 运行时为你执行此分区和组合解决方案。

在使用集合的应用程序中实现并行性的一个难点是集合不是线程安全的,这意味着多个线程无法在不引入 线程干扰内存一致性错误的情况下操作集合。集合框架提供了 同步包装器,它将自动同步添加到任意集合,使其成为线程安全的。但是,同步会引入 线程争用。你希望避免线程争用,因为它会阻止线程并行运行。通过聚合操作和并行流,你可以使用非线程安全集合实现并行性,前提是在操作集合时不要修改集合。

请注意,并行不会自动地比串行执行操作更快,尽管如果你有足够的数据和处理器核心,并行可能会更快。虽然聚合操作使你能够更轻松地实现并行性,但你仍有责任确定你的应用程序是否适合并行性。

本节包括以下主题:

你可以在示例 ParallelismExamples 中找到本节中描述的代码片段。

并行执行流

你可以串行或并行执行流。当流并行执行时,Java 运行时将流分区为多个子流。聚合操作迭代并并行处理这些子流,然后组合结果。

创建流时,除非另行指定,否则它始终是串行流。要创建并行流,请调用操作 Collection.parallelStream。或者,调用操作 BaseStream.parallel。例如,以下语句并行计算所有男性成员的平均年龄:

double average = roster
    .parallelStream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();

并发缩减

再次考虑以下示例(在 Reduction 部分中描述),该示例按性别对成员进行分组。此示例调用 collect 操作,该操作将集合 roster 缩减为 Map

Map<Person.Sex, List<Person>> byGender =
    roster
        .stream()
        .collect(
            Collectors.groupingBy(Person::getGender));

以下是等效的并行实现:

ConcurrentMap<Person.Sex, List<Person>> byGender =
    roster
        .parallelStream()
        .collect(
            Collectors.groupingByConcurrent(Person::getGender));

这称为 concurrent reduction (并发缩减)。如果对于包含 collect 操作的特定管道,以下所有条件均为 true,则 Java 运行时将执行并发缩减:

注意:此示例返回 ConcurrentMap 的实例,而不是 Map,并调用 groupingByConcurrent 操作而不是 groupingBy。(有关 ConcurrentMap 的更多信息,请参阅 Concurrent Collections 一节。)与 groupingByConcurrent 操作不同,操作 groupingBy 执行并行流很差。(这是因为它通过按键合并两个 map 来运行,这在计算上很昂贵。)同样,操作 Collectors.toConcurrentMap 对并行流的执行效果优于 Collectors.toMap

排序

管道处理流的元素的顺序取决于流是以串行还是并行方式执行,流的源和中间操作。例如,请考虑以下示例,该示例使用 forEach 操作多次打印 ArrayList 实例的元素:

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers =
    new ArrayList<>(Arrays.asList(intArray));

System.out.println("listOfIntegers:");
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("listOfIntegers sorted in reverse order:");
Comparator<Integer> normal = Integer::compare;
Comparator<Integer> reversed = normal.reversed(); 
Collections.sort(listOfIntegers, reversed);  
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("Parallel stream");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
    
System.out.println("Another parallel stream:");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");
     
System.out.println("With forEachOrdered:");
listOfIntegers
    .parallelStream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

此示例包含五个管道。它打印输出类似于以下内容:

listOfIntegers:
1 2 3 4 5 6 7 8
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1
Parallel stream:
3 4 1 6 2 5 7 8
Another parallel stream:
6 3 1 5 7 8 4 2
With forEachOrdered:
8 7 6 5 4 3 2 1

此示例执行以下操作:

副作用

如果方法或表达式除了返回或产生值之外还修改计算机的状态,则该方法或表达式具有副作用。示例包括可变缩减(使用 collect 操作的操作;有关更多信息,请参阅 Reduction 部分)以及调用 System.out.println 调试方法。JDK 很好地处理了管道中的某些副作用。特别地,collect 方法被设计为以并行安全的方式执行具有副作用的最常见的流操作。forEachpeek 等操作是针对副作用而设计的;返回 void 的 lambda 表达式,例如调用 System.out.println 的表达式,只能产生副作用。即便如此,你应该谨慎使用 forEachpeek 操作;如果你对并行流使用其中一个操作,那么 Java 运行时可以从多个线程同时调用你指定为其参数的 lambda 表达式。此外,永远不要传递作为参数的 lambda 表达式,这些表达式在 filtermap 等操作中具有副作用。以下部分讨论 interferencestateful lambda expressions,这两者都可能是副作用的来源,并且可能返回不一致或不可预测的结果,尤其是在并行流中。然而,首先讨论 laziness 的概念,因为它对干扰有直接影响。

懒惰

所有中间操作都是 lazy (懒惰的)。如果表达式、方法或算法的值仅在需要时才进行计算,则它是懒惰的。(如果立即计算或处理算法,则算法是 eager (急切的)。)中间操作是懒惰的,因为它们在终端操作开始之前不会开始处理流的内容。懒惰地处理流使 Java 编译器和运行时能够优化它们处理流的方式。例如,在 Aggregate Operations 部分中描述的 filter-mapToInt-average 示例中,average 操作可以从 mapToInt 操作创建的流中获取前几个整数,该操作从 filter 操作中获取元素。average 操作将重复此过程,直到它从流中获得所有必需元素,然后它将计算平均值。

干扰

流操作中的 Lambda 表达式不应该 interfere (干扰)。当管道处理流时修改流的源时,会发生干扰。例如,以下代码尝试连接 List listOfStrings 中包含的字符串。但是,它会抛出 ConcurrentModificationException

try {
    List<String> listOfStrings =
        new ArrayList<>(Arrays.asList("one", "two"));
         
    // This will fail as the peek operation will attempt to add the
    // string "three" to the source after the terminal operation has
    // commenced. 
             
    String concatenatedString = listOfStrings
        .stream()
        
        // Don't do this! Interference occurs here.
        .peek(s -> listOfStrings.add("three"))
        
        .reduce((a, b) -> a + " " + b)
        .get();
                 
    System.out.println("Concatenated string: " + concatenatedString);
         
} catch (Exception e) {
    System.out.println("Exception caught: " + e.toString());
}

此示例使用 reduce 操作将 listOfStrings 中包含的字符串连接到 Optional<String> 值,这是一个终端操作。但是,此处的管道调用中间操作 peek,它尝试向 listOfStrings 添加新元素。请记住,所有中间操作都是懒惰的。这意味着此示例中的管道在调用操作 get 时开始执行,并在 get 操作完成时结束执行。peek 操作的参数尝试在执行管道期间修改流源,这会导致 Java 运行时抛出 ConcurrentModificationException

有状态 Lambda 表达式

避免在流操作中使用 stateful lambda expressions (有状态 lambda 表达式) 作为参数。有状态 lambda 表达式的结果取决于在执行管道期间可能更改的任何状态。以下示例使用 map 中间操作将 List listOfIntegers 中的元素添加到新的 List 实例中。它执行两次,首先使用串行流,然后使用并行流:

List<Integer> serialStorage = new ArrayList<>();
     
System.out.println("Serial stream:");
listOfIntegers
    .stream()
    
    // Don't do this! It uses a stateful lambda expression.
    .map(e -> { serialStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
serialStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("Parallel stream:");
List<Integer> parallelStorage = Collections.synchronizedList(
    new ArrayList<>());
listOfIntegers
    .parallelStream()
    
    // Don't do this! It uses a stateful lambda expression.
    .map(e -> { parallelStorage.add(e); return e; })
    
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
     
parallelStorage
    .stream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

lambda 表达式 e -> { parallelStorage.add(e); return e; } 是一个有状态的 lambda 表达式。每次运行代码时,其结果都会有所不同。此示例打印以下内容:

Serial stream:
8 7 6 5 4 3 2 1
8 7 6 5 4 3 2 1
Parallel stream:
8 7 6 5 4 3 2 1
1 3 6 2 4 5 8 7

无论流是以串行还是并行方式执行,操作 forEachOrdered 都按流指定的顺序处理元素。但是,当并行执行流时,map 操作会处理由 Java 运行时和编译器指定的流的元素。因此,lambda 表达式 e -> { parallelStorage.add(e); return e; } 将元素添加到 List parallelStorage 的顺序,每次运行代码时都会有所不同。对于确定性和可预测的结果,请确保流操作中的 lambda 表达式参数不具有状态。

注意:此示例调用方法 synchronizedList,以便 List parallelStorage 是线程安全的。请记住,集合不是线程安全的。这意味着多个线程不应同时访问特定集合。假设你在创建 parallelStorage 时未调用方法 synchronizedList

List<Integer> parallelStorage = new ArrayList<>();

该示例行为不正常,因为多个线程访问和修改 parallelStorage 而没有像同步这样的机制来安排特定线程何时可以访问 List 实例。因此,该示例可以打印类似于以下内容的输出:

Parallel stream:
8 7 6 5 4 3 2 1
null 3 5 4 7 8 1 2

Previous page: Reduction
Next page: Questions and Exercises: Aggregate Operations