翻译 Chapter 15 How Spark Runs on a Cluster

1

Chapter 15 How Spark Runs on a Cluster Spark如何在集群上的运行

译者https://snaildove.github.io
Thus far in the book, we focused on Spark’s properties as a programming interface. We have discussed how the structured APIs take a logical operation, break it up into a logical plan, and convert that to a physical plan that actually consists of Resilient Distributed Dataset (RDD) operations that execute across the cluster of machines. This chapter focuses on what happens when Spark goes about executing that code. We discuss this in an implementation-agnostic way—this depends on neither the cluster manager that you’re using nor the code that you’re running. At the end of the day, all Spark code runs the same way.

到目前为止,在书中,我们将重点放在Spark作为编程接口的属性上。我们已经讨论了结构化API如何执行逻辑操作,将其分解为逻辑计划,并将其转换为实际由跨机器集群执行的弹性分布式数据集(RDD)操作组成的物理计划。本章主要讨论 Spark 执行该代码时会发生什么。我们以一种不知实现的方式讨论这个问题,这既不依赖于您正在使用的集群管理器,也不依赖于您正在运行的代码。一天结束时,所有 Spark 代码都以相同的方式运行。

This chapter covers several key topics:

本章包括几个关键主题:

  • The architecture and components of a Spark Application
    Spark应用程序的体系结构和组件

  • The life cycle of a Spark Application inside and outside of Spark

    Spark 内外 Spark 应用的生命周期

  • Important low-level execution properties, such as pipelining

    重要的低级执行属性,如管道

  • What it takes to run a Spark Application, as a segue into Chapter 16.

    运行Spark应用程序需要什么,作为转到第16章的桥接。

Let’s begin with the architecture.

让我们从架构开始

The Architecture of a Spark Application Spark应用程序的架构

In Chapter 2, we discussed some of the high-level components of a Spark Application. Let’s review those again:

在第2章中,我们讨论了 Spark 应用程序的一些高级组件。让我们再次回顾一下:

The Spark driver Spark驱动器

The driver is the process “in the driver seat” of your Spark Application. It is the controller of the execution of a Spark Application and maintains all of the state of the Spark cluster (the state and tasks of the executors). It must interface with the cluster manager in order to actually get physical resources and launch executors. At the end of the day, this is just a process on a physical machine that is responsible for maintaining the state of the application running on the cluster.

驱动器是 Spark 应用程序处在“驾驶员席位”的进程。它是Spark应用程序执行的控制器,维护Spark集群的所有状态(执行器的状态和任务)。它必须与群集管理器接口,以便实际获得物理资源和启动执行器。最后,这只是一个物理机器上的进程,负责维护集群上运行的应用程序的状态。

The Spark executors Spark执行器

Spark executors are the processes that perform the tasks assigned by the Spark driver. Executors have one core responsibility: take the tasks assigned by the driver, run them, and report back their state (success or failure) and results. Each Spark Application has its own separate executor processes.

Spark 执行器是执行 Spark 驱动程序分配的任务的进程。执行者有一个核心责任:承担驱动程序分配的任务,运行它们,并报告它们的状态(成功或失败)和结果。每个Spark应用程序都有自己的独立执行器进程。

The cluster manager 集群管理员

The Spark Driver and Executors do not exist in a void, and this is where the cluster manager comes in. The cluster manager is responsible for maintaining a cluster of machines that will run your Spark Application(s). Somewhat confusingly, a cluster manager will have its own “driver” (sometimes called master) and “worker” abstractions. The core difference is that these are tied to physical machines rather than processes (as they are in Spark). Figure 15-1 shows a basic cluster setup. The machine on the left of the illustration is the Cluster Manager Driver Node. The circles represent daemon processes running on and managing each of the individual worker nodes. There is no Spark Application running as of yet—these are just the processes from the cluster manager.

Spark驱动程序和执行器不存在于一个空间,这就是集群管理器所处的位置。集群管理器负责维护运行Spark应用程序的机器集群。有些令人困惑的是,集群管理器将有自己的“驱动程序(driver)”(有时称为master)和“工作者(worker)”的抽象结构。核心区别在于,它们与物理机器而不是进程(如 Spark 中的进程)联系在一起。 图15-1显示了一个基本的集群设置。图左侧的机器是群集管理器驱动程序节点。圆圈表示运行在每个工作节点上并管理每个工作节点的守护进程。到目前为止还没有运行spark应用程序,这些只是来自集群管理器的进程。

![1566491986663](http://rufzo5fy8.hn-bkt.clouddn.com/SparkTheDefinitiveGuide/Chapter15/1566491986663.png)

When it comes time to actually run a Spark Application, we request resources from the cluster manager to run it. Depending on how our application is configured, this can include a place to run the Spark driver or might be just resources for the executors for our Spark Application. Over the course of Spark Application execution, the cluster manager will be responsible for managing the underlying machines that our application is running on.

在实际运行Spark应用程序时,我们从集群管理器请求资源来运行它。根据应用程序的配置方式,这可能包括一个运行Spark驱动程序的位置,或者可能只是Spark应用程序的执行者的资源。在Spark应用程序执行过程中,集群管理员将负责管理应用程序运行的底层机器。

Spark currently supports three cluster managers: a simple built-in standalone cluster manager, Apache Mesos, and Hadoop YARN. However, this list will continue to grow, so be sure to check the documentation for your favorite cluster manager. Now that we’ve covered the basic components of an application, let’s walk through one of the first choices you will need to make when running your applications: choosing the execution mode.

Spark目前支持三个集群管理器:一个简单的内置独立集群管理器、Apache Mesos 和 Hadoop Yarn。但是,这个列表将继续增长,因此一定要检查您最喜欢的集群管理器的文档。既然我们已经介绍了应用程序的基本组件,那么让我们来看看在运行应用程序时需要做的第一个选择:选择执行模式。

### Execution Modes 执行模式

An execution mode gives you the power to determine where the aforementioned resources are physically located when you go to run your application. You have three modes to choose from:

执行模式使您能够在运行应用程序时确定上述资源的物理位置。您有三种模式可供选择:

- Cluster mode - Client mode - Local mode

We will walk through each of these in detail using Figure 15-1 as a template. In the following section, rectangles with solid borders represent Spark driver process whereas those with dotted borders represent the executor processes.

我们将使用图15-1作为模板详细介绍每种方法。在下面的部分中,带实心边框的矩形表示 Spark 驱动程序进程,而带虚线边框的矩形表示执行程序进程。

Cluster mode 集群模式

Cluster mode is probably the most common way of running Spark Applications. In cluster mode, a user submits a pre-compiled JAR, Python script, or R script to a cluster manager. The cluster manager then launches the driver process on a worker node inside the cluster, in addition to the executor processes. This means that the cluster manager is responsible for maintaining all Spark Application–related processes. Figure 15-2 shows that the cluster manager placed our driver on a worker node and the executors on other worker nodes.

集群模式可能是运行Spark应用程序的最常见方式。在集群模式下,用户向集群管理器提交预编译的JAR、Python脚本或R脚本。然后,除了执行器进程之外,集群管理员在集群内的工作节点上启动驱动程序进程。这意味着集群管理员负责维护所有与Spark应用程序相关的流程。图15-2显示集群管理器将我们的驱动程序放在一个工作节点上,而执行器放在其他工作节点上。

1566492359588

Client mode 客户端模式

Client mode is nearly the same as cluster mode except that the Spark driver remains on the client machine that submitted the application. This means that the client machine is responsible for maintaining the Spark driver process, and the cluster manager maintains the executor processses. In Figure 15-3, we are running the Spark Application from a machine that is not colocated on the cluster. These machines are commonly referred to as gateway machines or edge nodes. In Figure 15-3, you can see that the driver is running on a machine outside of the cluster but that the workers are located on machines in the cluster.

客户端模式与集群模式几乎相同,只是Spark驱动程序保留在提交应用程序的客户端上。这意味着客户端负责维护Spark 驱动程序进程,集群管理员维护执行器进程。在图15-3中,我们运行的Spark应用程序来自一台未在集群上并置的机器。这些机器通常被称为网关机器或边缘节点。在图15-3中,您可以看到驱动程序(driver)在集群外部的一台机器上运行,但工作人员(worker)位于集群中的机器上。

![1566493012713](http://rufzo5fy8.hn-bkt.clouddn.com/SparkTheDefinitiveGuide/Chapter15/1566493012713.png)

Local mode 本地模式

Local mode is a significant departure from the previous two modes: it runs the entire Spark Application on a single machine. It achieves parallelism through threads on that single machine. This is a common way to learn Spark, to test your applications, or experiment iteratively with local development. However, we do not recommend using local mode for running production applications.

本地模式与前两种模式有很大的不同:它在一台机器上运行整个Spark应用程序。它通过单个机器上的线程实现并行性。这是学习Spark、测试应用程序或使用本地开发进行迭代实验的常用方法。但是,我们不建议在运行生产应用程序时使用本地模式。

The Life Cycle of a Spark Application (Outside Spark) Spark 应用的生命周期(Spark外部)

This chapter has thus far covered the vocabulary necessary for discussing Spark Applications. It’s now time to talk about the overall life cycle of Spark Applications from “outside” the actual Spark code. We will do this with an illustrated example of an application run with spark-submit (introduced in Chapter 3). We assume that a cluster is already running with four nodes, a driver (not a Spark driver but cluster manager driver) and three worker nodes. The actual cluster manager does not matter at this point: this section uses the vocabulary from the previous section to walk through a step-by-step Spark Application life cycle from initialization to program exit.

本章迄今为止涵盖了讨论 Spark 应用程序所需的词汇。现在是时候从实际的 Spark 代码“外部”来讨论 Spark 应用程序的整个生命周期了。我们将通过一个使用 spark-submit 运行的应用程序的示例(在第3章中介绍)来实现这一点。我们假设一个集群已经运行了四个节点、一个驱动程序(不是 Spark 驱动程序,而是集群管理器驱动程序)和三个工作节点。此时,实际的集群管理器并不重要:本节使用上一节中的词汇表逐步遍历从初始化到程序退出的 Spark 应用程序生命周期。


NOTE 注意
This section also makes use of illustrations and follows the same notation that we introduced previously. Additionally, we now introduce lines that represent network communication. Darker arrows represent communication by Spark or Spark related processes, whereas dashed lines represent more general communication (like cluster management communication).

本节还使用了插图,并遵循我们前面介绍的相同的符号。此外,我们现在引入表示网络通信的线。较暗的箭头表示通过 Spark 或 Spark 相关进程进行的通信,而虚线表示更一般的通信(如集群管理通信)。


Client Request 客户端请求

The first step is for you to submit an actual application. This will be a pre-compiled JAR or library. At this point, you are executing code on your local machine and you’re going to make a request to the cluster manager driver node (Figure 15-4). Here, we are explicitly asking for resources for the Spark driver process only. We assume that the cluster manager accepts this offer and places the driver onto a node in the cluster. The client process that submitted the original job exits and the application is off and running on the cluster.

第一步是提交实际的申请。这将是一个预编译的 JAR 或库。此时,您正在本地计算机上执行代码,并将向集群管理员驱动程序节点发出请求(图15-4)。这里,我们明确地要求只为 Spark 驱动程序进程提供资源。我们假设集群管理员接受这个提议,并将驱动程序放在集群中的一个节点上。提交原始作业的客户端进程退出,应用程序在集群上关闭并运行。

1566494008338

To do this, you’ll run something like the following command in your terminal:

为此,您将在终端中运行如下命令:

1
2
3
4
5
6
7
8
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode cluster \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

Launch 启动(应用程序)

Now that the driver process has been placed on the cluster, it begins running user code (Figure 15-5). This code must include a SparkSession that initializes a Spark cluster (e.g., driver + executors). The SparkSession will subsequently communicate with the cluster manager (the darker line), asking it to launch Spark executor processes across the cluster (the lighter lines). The number of executors and their relevant configurations are set by the user via the command-line arguments in the original spark-submit call.

现在驱动程序进程已经放置在集群上,它开始运行用户代码(图15-5)。此代码必须包含初始化 Spark 集群的SparkSession(例如,驱动程序+执行器)。SparkSession 随后将与集群管理器(较暗的线)通信,要求它在集群中启动 Spark executor进程(较亮的线)。执行器(executor)的数量及其相关配置由用户通过原始 spark-submit 调用中的命令行参数设置。

1566494720815

The cluster manager responds by launching the executor processes (assuming all goes well) and sends the relevant information about their locations to the driver process. After everything is hooked upcorrectly, we have a “Spark Cluster” as you likely think of it today.

集群管理器通过启动执行器进程(假设一切正常)进行响应,并将有关其位置的相关信息发送到驱动程序进程。在所有的东西都连接正确之后,我们就有了一个“Spark 集群”,就像你今天想象的那样。

Execution 执行

Now that we have a “Spark Cluster,” Spark goes about its merry way executing code, as shown in Figure 15-6. The driver and the workers communicate among themselves, executing code and moving data around. The driver schedules tasks onto each worker, and each worker responds with the status of those tasks and success or failure. (We cover these details shortly.)

既然我们有了一个“Spark 集群”,Spark就会以一种愉快的方式执行代码,如图15-6所示。驱动程序和工作人员(workers )之间进行通信,执行代码并移动数据。驱动程序将任务调度到每个工作人员(workers )身上,每个工作人员对这些任务的状态以及成功或失败作出响应。(我们将很快介绍这些细节。)

1566495102805

Completion 完成

After a Spark Application completes, the driver processs exits with either success or failure (Figure 15-7). The cluster manager then shuts down the executors in that Spark cluster for the driver. At this point, you can see the success or failure of the Spark Application by asking the cluster manager for this information.

Spark应用程序完成后,驱动程序进程以成功或失败退出(图15-7)。然后,集群管理员为驱动程序关闭该 Spark 集群中的执行器。此时,通过向集群管理器询问这些信息,您可以看到 Spark 应用程序的成功或失败。

1566495233939

The Life Cycle of a Spark Application (Inside Spark) Spark应用程序的生命周期(Spark内部)

We just examined the life cycle of a Spark Application outside of user code (basically the infrastructure that supports Spark), but it’s arguably more important to talk about what happens within Spark when you run an application. This is “user-code” (the actual code that you write that defines your Spark Application). Each application is made up of one or more Spark jobs. Spark jobs within an application are executed serially (unless you use threading to launch multiple actions in parallel).

我们刚刚研究了 Spark 应用程序在用户代码之外的生命周期(基本上是支持 Spark 的基础设施),但是讨论运行应用程序时 Spark 内发生的事情可能更重要。这是“用户代码”(定义 Spark 应用程序的实际代码)。每个应用程序由一个或多个 Spark 作业组成。应用程序中的 Spark 作业是串行执行的(除非使用线程并行启动多个操作)。

### The SparkSession

The first step of any Spark Application is creating a SparkSession. In many interactive modes, this is done for you, but in an application, you must do it manually. Some of your legacy code might use the new SparkContext pattern. This should be avoided in favor of the builder method on the SparkSession, which more robustly instantiates the Spark and SQL Contexts and ensures that there is no context conflict, given that there might be multiple libraries trying to create a session in the same Spark Appication:

任何Spark应用程序的第一步都是创建 SparkSession。在许多交互模式中,这是为您完成的,但在应用程序中,您必须手动完成。一些遗留代码可能使用新的 SparkContext 模式。应该避免这样做,因为 SparkSession 上的builder方法更能有力地实例化 Spark 和 SQL 上下文,并确保没有上下文冲突,因为可能有多个库试图在同一Spark应用程序中创建会话:

1
2
3
4
// Creating a SparkSession in Scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Databricks Spark Example").config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.getOrCreate()
1
2
3
4
5
# Creating a SparkSession in Python
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()

After you have a SparkSession, you should be able to run your Spark code. From the SparkSession, you can access all of low-level and legacy contexts and configurations accordingly, as well. Note that the SparkSession class was only added in Spark 2.X. Older code you might find would instead directly create a SparkContext and a SQLContext for the structured APIs.

在进行 SparkSession 之后,您应该能够运行spark代码。通过 SparkSession,您还可以相应地访问所有低阶和遗留上下文和配置。请注意,SparkSession 类只添加在 Spark 2.x 中。您可能会发现,较旧的代码将直接为结构化API创建 SparkContext 和 sqlContext。

The SparkContext

A SparkContext object within the SparkSession represents the connection to the Spark cluster. This class is how you communicate with some of Spark’s lower-level APIs, such as RDDs. It is commonly stored as the variable sc in older examples and documentation. Through a SparkContext, you can create RDDs, accumulators, and broadcast variables, and you can run code on the cluster. For the most part, you should not need to explicitly initialize a SparkContext; you should just be able to access it through the SparkSession. If you do want to, you should create it in the most general way, through the getOrCreate method:

SparkSession 中的 SparkContext 对象表示与 Spark 群集的连接。这个类是如何与Spark的一些低阶API(如RDD)通信的。在旧的示例和文档中,它通常存储为变量 sc 。通过 SparkContext,您可以创建RDD、累加器(accumulators)和广播(broadcast)变量,并且可以在集群上运行代码。在大多数情况下,您不需要显式初始化 SparkContext;您只需要能够通过 SparkSession 访问它。如果您愿意,您应该通过 getOrCreate 方法以最一般的方式创建它:

1
2
3
// in Scala
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()

THE SPARKSESSION, SQLCONTEXT, AND HIVECONTEXT

In previous versions of Spark, the SQLContext and HiveContext provided the ability to work with DataFrames and Spark SQL and were commonly stored as the variable sqlContext in examples, documentation, and legacy code. As a historical point, Spark 1.X had effectively two contexts. The SparkContext and the SQLContext. These two each performed different things. The former focused on more fine-grained control of Spark’s central abstractions, whereas the latter focused on the higher-level tools like Spark SQL. In Spark 2.X, the communtiy combined the two APIs into the centralized SparkSession that we have today. However, both of these APIs still exist and you can access them via the SparkSession. It is important to note that you should never need to use the SQLContext and rarely need to use the SparkContext.

在Spark的早期版本中,SQLContext 和 HiveContext提供了使用 DataFrame 和 Spark SQL的能力,并且通常作为变量SQLContext存储在示例、文档和旧代码中。作为一个历史点,spark 1.x实际上有两个上下文。SparkContext和SQLContext。这两个人的表现各不相同。前者侧重于对Spark的中心抽象进行更细粒度的控制,而后者则侧重于更高级的工具,如Spark SQL。在spark 2.x中,社区将这两个API合并到了我们今天使用的集中式 SparkSession 中。但是,这两个API仍然存在,您可以通过SparkSession访问它们。需要注意的是,您不应该需要使用 SQLContext,而且很少需要使用 SparkContext。

After you initialize your SparkSession, it’s time to execute some code. As we know from previous chapters, all Spark code compiles down to RDDs. Therefore, in the next section, we will take some logical instructions (a DataFrame job) and walk through, step by step, what happens over time.

初始化 SparkSession 之后,该执行一些代码了。正如我们从前面的章节所知道的,所有 Spark 代码都编译成RDD。因此,在下一节中,我们将接受一些逻辑指令(一个 DataFrame 作业)并逐步了解随着时间的推移会发生什么。

### Logical Instructions 逻辑指令

As you saw in the beginning of the book, Spark code essentially consists of transformations and actions. How you build these is up to you—whether it’s through SQL, low-level RDD manipulation, or machine learning algorithms. Understanding how we take declarative instructions like DataFrames and convert them into physical execution plans is an important step to understanding how Spark runs on a cluster. In this section, be sure to run this in a fresh environment (a new Spark shell) to follow along with the job, stage, and task numbers.

正如您在书的开头所看到的,Spark代码基本上由转换(transformation )和动作(action)组成。无论是通过SQL、低阶的RDD操作还是机器学习算法,如何构建这些都取决于您。了解我们如何使用声明性指令(如DataFrame)并将其转换为物理执行计划是了解Spark如何在集群上运行的重要步骤。在本节中,请确保在新的环境(新的 Spark shell)中运行此程序,以跟踪作业(job)、阶段(stage)和任务(task)编号。

Logical instructions to physical execution 物理执行的逻辑指令

We mentioned this in Part II, but it’s worth reiterating so that you can better understand how Spark takes your code and actually runs the commands on the cluster. We will walk through some more code, line by line, explain what’s happening behind the scenes so that you can walk away with a better understanding of your Spark Applications. In later chapters, when we discuss monitoring, we will perform a more detailed tracking of a Spark job through the Spark UI. In this current example, we’ll take a simpler approach. We are going to do a three-step job: using a simple DataFrame, we’ll repartition it, perform a value-by-value manipulation, and then aggregate some values and collect the final result.

我们在第二部分中提到了这一点,但是值得重申,这样您就可以更好地理解 Spark 是如何使用代码并在集群上实际运行命令的。我们将一行一行地介绍更多的代码,解释幕后发生的事情,以便您能够更好地了解 Spark 应用程序。在后面的章节中,当我们讨论监控时,我们将通过 Spark UI 对 Spark 作业执行更详细的跟踪。在当前的示例中,我们将采用更简单的方法。我们要做一个三步的工作:使用一个简单的数据框架,我们将对它重新分区,执行一个值一个值的操作,然后聚合一些值并收集最终的结果。


NOTE 注意

This code was written and runs with Spark 2.2 in Python (you’ll get the same result in Scala, so we’ve omitted it). The number of jobs is unlikely to change drastically but there might be improvements to Spark’s underlying optimizations that change physical execution strategies.

这段代码是用 Python 中的 Spark 2.2 编写和运行的(您将在 Scala 中得到相同的结果,所以我们省略了它)。工作数量不太可能大幅度改变,但 Spark 的底层优化可能会有所改进,从而改变物理执行策略。


1
2
3
4
5
6
7
8
9
# in Python
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect() # 2500000000000

When you run this code, we can see that your action triggers one complete Spark job. Let’s take a look at the explain plan to ground our understanding of the physical execution plan. We can access this information on the SQL tab (after we actually run a query) in the Spark UI, as well:

当您运行此代码时,我们可以看到您的操作触发了一个完整的Spark作业。让我们看一下解释计划,以加深我们对实际执行计划的理解。我们可以在Spark UI中的SQL选项卡(在实际运行查询之后)上访问这些信息,以及:

1
step4.explain()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#15L)])
+- Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_sum(id#15L)])
+- *Project [id#15L]
+- *SortMergeJoin [id#15L], [id#10L], Inner
:- *Sort [id#15L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#15L, 200)
: +- *Project [(id#7L * 5) AS id#15L]
: +- Exchange RoundRobinPartitioning(5)
: +- *Range (2, 10000000, step=2, splits=8)
+- *Sort [id#10L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#10L, 200)
+- Exchange RoundRobinPartitioning(6)
+- *Range (2, 10000000, step=4, splits=8)

What you have when you call collect (or any action) is the execution of a Spark job that individually consist of stages and tasks. Go to localhost:4040 if you are running this on your local machine to see the Spark UI. We will follow along on the “jobs” tab eventually jumping to stages and tasks as we proceed to further levels of detail.

当您调用 collect(或任何操作)时,您所拥有的是 Spark作业的执行,它分别由阶段(stage)和任务(task)组成。如果您在本地机器上运行这个程序,请转到 localhost:4040 查看Spark用户界面。我们将继续关注“jobs”选项卡,最终跳到阶段(stage)和任务(task),继续深入到更详细的层次。

### A Spark Job 一个Spark作业

In general, there should be one Spark job for one action. Actions always return results. Each job breaks down into a series of stages, the number of which depends on how many shuffle operations need to take place.

通常,一个动作(action)应该有一个Spark作业。操作始终返回结果。每项工作分为一系列阶段(stage),其数量取决于需要进行多少次数据再分配(shuffle)操作。

This job breaks down into the following stages and tasks:
这项工作分为以下几个阶段(stage)和任务(task):

  • Stage 1 with 8 Tasks
    第1阶段,8个任务
  • Stage 2 with 8 Tasks
    第2阶段,8个任务
  • Stage 3 with 6 Tasks
    第3阶段有6个任务
  • Stage 4 with 5 Tasks
    第4阶段有5个任务
  • Stage 5 with 200 Tasks
    第5阶段,200个任务
  • Stage 6 with 1 Task
    第6阶段,1个任务

I hope you’re at least somewhat confused about how we got to these numbers so that we can take the time to better understand what is going on!

我希望你至少对我们如何得到这些数字感到困惑,以便我们可以花时间更好地了解正在发生的事情!

Stages 阶段

Stages in Spark represent groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark will try to pack as much work as possible (i.e., as many transformations as possible inside your job) into the same stage, but the engine starts new stages after operations called shuffles. A shuffle represents a physical repartitioning of the data—for example, sorting a DataFrame, or grouping data that was loaded from a file by key (which requires sending records with the same key to the same node). This type of repartitioning requires coordinating across executors to move data around. Spark starts a new stage after each shuffle, and keeps track of what order the stages must run in to compute the final result.

Spark中的阶段(stage)表示可以一起执行以在多台计算机上计算相同操作的任务(task)组。一般来说,Spark会尝试将尽可能多的工作(即工作中尽可能多的转换)打包到同一个阶段(stage),但引擎会在称为数据再分配(shuffle)的操作后启动新的阶段(stage)。 shuffle 表示数据的物理重新分区——例如,对 DataFrame 进行排序,或者根据键(key)分组从文件加载的数据(这需要将具有相同键的记录发送到同一节点)。这种类型的重新分区需要跨执行器(executor)进行协调以移动数据。 Spark在每次shuffle之后开始一个新阶段(stage),并跟踪阶段(stage)必须运行的顺序以计算最终结果。

In the job we looked at earlier, the first two stages correspond to the range that you perform in order to create your DataFrames. By default when you create a DataFrame with range, it has eight partitions. The next step is the repartitioning. This changes the number of partitions by shuffling the data. These DataFrames are shuffled into six partitions and five partitions, corresponding to the number of tasks in stages 3 and 4.

在我们之前查看的工作中,前两个阶段(stage)对应于您为创建DataFrame而执行的范围(range)。默认情况下,当您使用范围(range)创建DataFrame时,它有八个分区。下一步是重新分区(repartitioning)。这会通过对数据再分配(shuffle)来更改分区数。这些DataFrame被数据再分配到六个分区和五个分区,对应于阶段3和4中的任务数。

Stages 3 and 4 perform on each of those DataFrames and the end of the stage represents the join (a shuffle). Suddenly, we have 200 tasks. This is because of a Spark SQL configuration. The spark.sql.shuffle.partitions default value is 200, which means that when there is a shuffle performed during execution, it outputs 200 shuffle partitions by default. You can change this value, and the number of output partitions will change.

阶段(stage)3和4对每个DataFrame执行,阶段(stage)结束表示连接(join)。突然间,我们有200个任务(task)。这是因为Spark SQL配置。 `spark.sql.shuffle.partitions` 默认值为200,这意味着当执行期间执行了数据再分配(shuffle)时,它默认输出200个数据再分配(shuffle)分区。您可以更改此值,并且输出分区的数量将更改

。 ---

TIP 提示

We cover the number of partitions in a bit more detail in Chapter 19 because it’s such an important parameter. This value should be set according to the number of cores in your cluster to ensure efficient execution. Here’s how to set it:

我们在第19章中更详细地介绍了分区的数量,因为它是一个非常重要的参数。这个值应该根据集群中核心的数量来设置,以确保高效执行。设置方法如下:

1
spark.conf.set("spark.sql.shuffle.partitions", 50)

A good rule of thumb is that the number of partitions should be larger than the number of executors on your cluster, potentially by multiple factors depending on the workload. If you are running code on your local machine, it would behoove you to set this value lower because your local machine is unlikely to be able to execute that number of tasks in parallel. This is more of a default for a cluster in which there might be many more executor cores to use. Regardless of the number of partitions, that entire stage is computed in parallel. The final result aggregates those partitions individually, brings them all to a single partition before finally sending the final result to the driver. We’ll see this configuration several times over the course of this part of the book.

一个好的经验法则是分区数应该大于集群上执行器(executor)的数量,可能由多个因素决定,具体取决于工作负载。如果您在本地计算机上运行代码,那么您可以将此值设置得更低,因为本地计算机不太可能并行执行该数量的任务。对于可能需要使用更多执行程序核心的集群,这更像是一个默认设置。无论分区数量如何,整个阶段都是并行计算的。最终结果单独聚合这些分区,在最后将最终结果发送给驱动程序之前将它们全部带到一个分区。在本书的这一部分过程中,我们会多次看到这种配置。

### Tasks 任务

Stages in Spark consist of tasks. Each task corresponds to a combination of blocks of data and a set of transformations that will run on a single executor. If there is one big partition in our dataset, we will have one task. If there are 1,000 little partitions, we will have 1,000 tasks that can be executed in parallel. A task is just a unit of computation applied to a unit of data (the partition). Partitioning your data into a greater number of partitions means that more can be executed in parallel. This is not a panacea, but it is a simple place to begin with optimization.

Spark中的阶段(stage)由任务(task)组成。每个任务(task)对应于将在单个执行器(executor)上运行的数据块和一组转换的组合。如果我们的数据集中有一个大分区,我们将有一个任务。如果有1000个小分区,我们将有1,000个可以并行执行的任务。任务只是应用于数据单元(分区)的计算单位。将数据划分为更多数量的分区意味着可以并行执行更多数据。这不是灵丹妙药,但它是一个简单的开始优化的入手之处。

## Execution Details 执行细节

Tasks and stages in Spark have some important properties that are worth reviewing before we close out this chapter. First, Spark automatically pipelines stages and tasks that can be done together, such as a map operation followed by another map operation. Second, for all shuffle operations, Spark writes the data to stable storage (e.g., disk), and can reuse it across multiple jobs. We’ll discuss these concepts in turn because they will come up when you start inspecting applications through the Spark UI.

在我们结束本章之前,Spark中的任务和阶段具有一些值得检查的重要属性。首先,Spark自动管理可以一起完成的阶段和任务,例如映射(map)操作,然后是另一个映射(map)操作。其次,对于所有数据再分配(shuffle)操作,Spark将数据写入稳定存储(例如,磁盘),并且可以在多个作业中重复使用它。我们将依次讨论这些概念,因为当您开始通过Spark UI检查应用程序时,它们会出现。

Pipelining 管道化

An important part of what makes Spark an “in-memory computation tool” is that unlike the tools that came before it (e.g., MapReduce), Spark performs as many steps as it can at one point in time before writing data to memory or disk. One of the key optimizations that Spark performs is pipelining, which occurs at and below the RDD level. With pipelining, any sequence of operations that feed data directly into each other, without needing to move it across nodes, is collapsed into a single stage of tasks that do all the operations together. For example, if you write an RDD-based program that does a map, then a filter, then another map, these will result in a single stage of tasks that immediately read each input record, pass it through the first map, pass it through the filter, and pass it through the last map function if needed. This pipelined version of the computation is much faster than writing the intermediate results to memory or disk after each step. The same kind of pipelining happens for a DataFrame or SQL computation that does a select, filter, and select.

使Spark成为“内存计算工具”的一个重要部分是,与之前的工具(例如,MapReduce)不同,Spark在将数据写入内存或磁盘之前的一个时间点执行尽可能多的步骤。 Spark执行的一个关键优化是流水线操作,它发生在RDD级别和低于RDD级别。通过流水线操作,将数据直接相互馈送而无需跨节点移动的任何操作序列都会折叠为一起完成所有操作的任务。例如,如果你编写一个基于RDD的程序来执行一个映射(map),然后是一个过滤器(filter),然后是另一个映射(map),这些将导致一个阶段的任务立即读取每个输入记录,将其传递通过第一个映射,传递给它过滤器,如果需要,将其传递给最后一个映射(map)函数。这个流水线版的计算比在每个步骤之后将中间结果写入内存或磁盘要快得多。对于执行select,filter和select的DataFrame或SQL计算,会发生相同类型的流水线操作。

From a practical point of view, pipelining will be transparent to you as you write an application—the Spark runtime will automatically do it—but you will see it if you ever inspect your application through the Spark UI or through its log files, where you will see that multiple RDD or DataFrame operations were pipelined into a single stage.

从实际的角度来看,在编写应用程序时,流水线操作对您来说是透明的——Spark运行时会自动执行——但如果您通过Spark UI或其日志文件检查应用程序,您将看到它将看到多个RDD或DataFrame操作被流水线化为单个阶段。

### Shuffle Persistence 数据再分配的持久化

The second property you’ll sometimes see is shuffle persistence. When Spark needs to run an operation that has to move data across nodes, such as a reduce-by-key operation (where input data for each key needs to first be brought together from many nodes), the engine can’t perform pipelining anymore, and instead it performs a cross-network shuffle. Spark always executes shuffles by first having the “source” tasks (those sending data) write shuffle files to their local disks during their execution stage. Then, the stage that does the grouping and reduction launches and runs tasks that fetch their corresponding records from each shuffle file and performs that computation (e.g., fetches and processes the data for a specific range of keys). Saving the shuffle files to disk lets Spark run this stage later in time than the source stage (e.g., if there are not enough executors to run both at the same time), and also lets the engine re-launch reduce tasks on failure without rerunning all the input tasks.

你有时会看到的第二个属性是随机持久性。当 Spark 需要运行必须跨节点移动数据的操作时,例如 reduce-by-key 操作(每个键的输入数据需要首先从许多节点聚集在一起),引擎不能再执行流水线操作了,而是它执行交叉网络数据再分配(shuffle)。 在执行阶段,Spark总是首先通过让“源”任务(那些发送数据的任务)将数据再分配(shuffle)文件写入本地磁盘来执行数据再分配(shuffle)操作。然后,执行分组和减少启动项,并运行从每个数据再分配文件获取其相应记录的任务并执行该计算(例如,获取和处理特定范围的键的数据)。将数据再分配(shuffle)文件保存到磁盘允许Spark比源阶段更晚地运行此阶段(例如,如果没有足够的执行器(executor)同时运行两者),并且还允许引擎重新启动以在故障时且不用重新运行所有输入任务的情况下减少任务。

One side effect you’ll see for shuffle persistence is that running a new job over data that’s already been shuffled does not rerun the “source” side of the shuffle. Because the shuffle files were already written to disk earlier, Spark knows that it can use them to run the later stages of the job, and it need not redo the earlier ones. In the Spark UI and logs, you will see the pre-shuffle stages marked as “skipped”. This automatic optimization can save time in a workload that runs multiple jobs over the same data, but of course, for even better performance you can perform your own caching with the DataFrame or RDD cache method, which lets you control exactly which data is saved and where. You’ll quickly grow accustomed to this behavior after you run some Spark actions on aggregated data and inspect them in the UI.

您将看到的随机持久性的一个副作用是,对已经被数据再分配的数据运行新作业不会重新运行“源”端的数据再分配操作。因为数据再分配(shuffle)文件早先已经写入磁盘,所以Spark知道它可以使用它们来运行作业的后期阶段,并且它不需要重做早期的那些(任务)。在Spark UI和日志中,您将看到标记为“已跳过”的预数据再分配阶段。这种自动优化可以节省在同一数据上运行多个作业的工作负载的时间,但当然,为了获得更好的性能,您可以使用DataFrame或RDD缓存方法执行自己的缓存,这样您就可以精确控制保存的数据和哪里。在对聚合数据运行一些Spark操作并在UI中检查它们之后,您将很快习惯于此行为。

Conclusion 总结

In this chapter, we discussed what happens to Spark Applications when we go to execute them on a cluster. This means how the cluster will actually go about running that code as well as what happens within Spark Applications during the process. At this point, you should feel quite comfortable understanding what happens within and outside of a Spark Application. This will give you a starting point for debugging your applications. Chapter 16 will discuss writing Spark Applications and the things you should consider when doing so.

在本章中,我们讨论了当我们在集群上执行它们时Spark应用程序会发生什么。 这意味着集群将如何实际运行该代码以及在此过程中Spark应用程序中发生的事情。 此时,您应该非常自如地了解Spark应用程序内部和外部发生的情况。 这将为您调试应用程序提供一个起点。 第16章将讨论编写Spark应用程序以及执行此操作时应考虑的事项。

查看本网站请使用全局科学上网
欢迎打赏来支持我的免费分享
0%