10个必要的火花面试问题 *

寻找 自由职业工作?用顶尖的自由职业者设计你的生活方式。

提交面试问题

描述以下代码以及输出将是什么。

case class User(userId: Long, userName: String)

case class UserActivity(userId: Long, activityTypeId: Int, timestampEpochSec: Long)

val LoginActivityTypeId = 0
val LogoutActivityTypeId = 1

private def readUserData(sparkSession: SparkSession): RDD[User] = {
 sparkSession.sparkContext.parallelize(
   Array(
     User(1, " 迪伊,约翰 "),
     User(2, " 母鹿,简 "),
     User(3, "X, Mr."))
 )
}

private def readUserActivityData(sparkSession: SparkSession): RDD[UserActivity] = {
 sparkSession.sparkContext.parallelize(
   Array(
     UserActivity(1, LoginActivityTypeId, 1514764800L),
     UserActivity(2, LoginActivityTypeId, 1514808000L),
     UserActivity(1, LogoutActivityTypeId, 1514829600L),
     UserActivity(1, LoginActivityTypeId, 1514894400L))
 )
}

def calculate(sparkSession: SparkSession): Unit = {
 val userRdd: RDD[(Long, User)] =
   readUserData(sparkSession).map(e => (e.userId, e))
 val userActivityRdd: RDD[(Long, UserActivity)] =
   readUserActivityData(sparkSession).map(e => (e.userId, e))

 val result = userRdd
   .leftOuterJoin(userActivityRdd)
   .filter(e => e._2._2.isDefined && e._2._2.get.activityTypeId == LoginActivityTypeId)
   .map(e => (e._2._1.userName, e._2._2.get.timestampEpochSec))
   .reduceByKey((a, b) => if (a < b) a else b)

 result
   .foreach(e => println(s"${e._1}: ${e._2}"))

}

The main method, calculate, reads two sets of data. (In the example they are provided from a constant inline data structure that is converted into a distributed dataset using parallelize.) The map applied to each of them transforms them into tuples, each consisting of a userId and the object itself. The userId is used to join the two datasets.

The joined dataset is filtered by all users with all their login activities. It is then transformed into a tuple consisting of 用户名 and the event timestamp.

这最终仅减少到每个用户的第一个登录条目并写入控制台。

结果将是:

 迪伊,约翰 : 1514764800
Doe, Jane: 1514808000

以下代码提供了具有以下结构的两个准备的Dataframe:

DF1: userId, userName
DF2: userId, pageId, timestamp, eventType

Add the code to join the two dataframes and count the number of events per 用户名 . It should output in the format 用户名 ; totalEventCount and only for users that have events.

def calculate(sparkSession: SparkSession): Unit = {
 val UserIdColName = "userId"
 val UserNameColName = " 用户名 "
 val CountColName = "totalEventCount"

 val userRdd: DataFrame = readUserData(sparkSession)
 val userActivityRdd: DataFrame = readUserActivityData(sparkSession)


 val result = userRdd
   .repartition(col(UserIdColName))
   // ???????????????
   .select(col(UserNameColName))
   // ???????????????

 result.show()
}

完整的代码应该如下所示:

def calculate(sparkSession: SparkSession): Unit = {
 val UserIdColName = "userId"
 val UserNameColName = " 用户名 "
 val CountColName = "totalEventCount"

 val userRdd: DataFrame = readUserData(sparkSession)
 val userActivityRdd: DataFrame = readUserActivityData(sparkSession)


 val result = userRdd
   .repartition(col(UserIdColName))
   .join(userActivityRdd, UserIdColName)
   .select(col(UserNameColName))
   .groupBy(UserNameColName)
   .count()
   .withColumnRenamed("count", CountColName)

 result.show()
}

连接表达式可以采用不同类型的参数。以下替代品产生相同的结果:

.join(userActivityRdd,UserIdColName, "inner")
.join(userActivityRdd, Seq(UserIdColName))
.join(userActivityRdd, Seq(UserIdColName), "inner")

Passing “left” as the last parameter would return a wrong result (users without events will be included, showing an event count of 1).

随着最后一个参数传递“右”,将返回正确的结果,但将是语义误导。

   .groupBy(UserNameColName)

This is required. Without it, the total number of rows would be counted and result would be a Long instead of a DataFrame (so the code wouldn’t even compile, since the show() method does not exist for Long.)

   .count()

This is the actual aggregation that adds a new column (count) to the DataFrame.

   .withColumnRenamed("count", CountColName)

This renames the count column to totalEventCount, as requested in the question.

以下代码注册了用户定义的函数(UDF)并在查询中使用它。 (一般业务逻辑与问题无关紧要。)关于代码的问题是什么问题,使得它可能会拆除整个群集,以及如何解决?

(Hint: It has to do with the usage of the categoryNodesWithChildren Map variable.)

def calculate(sparkSession: SparkSession): Unit = {
 val UserIdColumnName = "userId"
 val CategoryIdColumnName = "categoryId"
 val NumActionsColumnName = "numActions"
 val OtherCategoryIdColumnName = "otherCategoryId"
 val OtherNumActionsColumnName = "otherNumActions"


 val categoryNodesWithChildren: Map[Int, Set[Int]] =
   Map(0 -> Set(1, 2, 3),
     1 -> Set(4, 5),
     2 -> Set(6, 7),
     3 -> Set(8),
     7 -> Set(9, 10)
   )

 sparkSession.udf.register("isChildOf", (nodeId: Int, parentNodeId: Int) =>  
 nodeId != parentNodeId && categoryNodesWithChildren.getOrElse(nodeId, Set[Int]()).contains(parentNodeId))


 val userCategoryActions = readUserCategoryActions(sparkSession)

 val otherUserCategoryActions = userCategoryActions
   .select(
     col(UserIdColumnName),
     col(CategoryIdColumnName).alias(OtherCategoryIdColumnName),
     col(NumActionsColumnName).alias(OtherNumActionsColumnName)
   )

 val joinedUserActions = userCategoryActions
   .join(otherUserCategoryActions, UserIdColumnName)
   .where("!(isChildOf(categoryId,otherCategoryId) or isChildOf(otherCategoryId,categoryId))")
   .groupBy(UserIdColumnName, CategoryIdColumnName, OtherCategoryIdColumnName)
   .sum(OtherNumActionsColumnName)
   .withColumnRenamed(s"sum($OtherNumActionsColumnName)", OtherNumActionsColumnName)

 joinedUserActions.show()

}

注册UDF的线应替换为此代码片段:

def calculate(sparkSession: SparkSession): Unit = {
...


 val categoryNodesWithChildrenBC = sparkSession.sparkContext.broadcast(categoryNodesWithChildren)

 sparkSession.udf.register("isChildOf", (nodeId: Int, parentNodeId: Int) =>
   nodeId != parentNodeId && categoryNodesWithChildrenBC.value.getOrElse(nodeId, Set[Int]()).contains(parentNodeId))

...

}

第一种方法的问题是它使用不可用的驱动程序应用程序的变量 本身 on the worker nodes. Spark will fetch the variable (meaning, the whole Map) from the master node each time the UDF is called. This can result in a very high load on the master and the whole cluster might become unresponsive. The adaptation of the code sends (broadcasts) a copy of the variable to each of the worker nodes where it is accessible as an org.apache.spark.broadcast object that holds the actual Map.

申请加入Toptal'S开发网络

并享受可靠,稳定,远程自由职业的火花开发商工作。

申请自由职业者

概述Apache Spark的概述。框架是如何结构化的?什么是主要模块?

火花群的基本结构:

火花群的基本结构:驱动程序程序的SparkContext与群集管理器接口,两者都有哪些与彼此访问的工作节点的接口。每个工人节点都有一个具有任务的executor和缓存。

群集管理器不是Spark Framework本身的一部分 - 即使Spark Ships拥有自己,也不应在生产中使用。支持的集群经理是Mesos,Yarn和Kybernetes。

驱动程序程序是Java,Scala或Python应用程序,该应用程序在Spark Master上执行。

作为该程序的一部分,将调用某些Spark框架方法,在工作节点上执行本身。

每个工作台都可能运行多个执行器(如已配置的:通常每个可用的CPU核心)。每个执行者都将从要执行的调度程序接收任务。

Apache Spark的模块直接运行其核心顶部:

Apache Spark具有上面的一层,由模块Spark SQL,Spark Streaming,MLLIB(机器学习)和Graphx(图)组成。

Apache Spark(核心)

火花 由管理数据内部表示的核心框架,包括:

  • 序列化
  • 内存分配
  • 缓存
  • 通过在磁盘上存储中间快照来增加弹性
  • 自动重试
  • 工作节点之间的数据交换(Shuffling)
  • 等等。

It also provides a bunch of methods to transform data (like map and reduce). All of these methods work on resilient distributed datasets (RDD).

火花 自动识别单个步骤之间的依赖关系,从而知道哪一个可以并行执行。

这是通过构建定向的非循环图(DAG)来实现的,这也意味着转换不会立即执行,但是当调用动作函数时。

所以基本上,该方法可分为两种类型:RDD转换和动作。

这些是RDD转换:

  • map(func)
  • flatMap()
  • filter(func)
  • mapPartitions(func)
  • mapPartitionWithIndex()
  • union(dataset)
  • intersection(dataset)
  • distinct()
  • groupByKey()
  • reduceByKey(func, [numTasks])
  • sortByKey()
  • join()
  • coalesce()

这些是RDD行动:

  • count()
  • collect()
  • take(n)
  • top()
  • countByValue()
  • reduce()
  • fold()
  • aggregate()
  • foreach()

可以在Spark UI Web界面中查看运行作业的DAG。它还显示了待处理的作业,任务列表和当前资源使用和配置。

如果配置历史记录服务器,也可以在完成(或失败)作业中审查大多数信息。

火花 SQL.

这是Spark的核心API的抽象。虽然核心API适用于RDD,但所有转换都由开发人员明确定义,Spark SQL表示RDD为所谓的DataFrame。 DataFrame API更像是一个看起来像SQL的DSL。

开发人员甚至可以通过将DataFrame注册为名为内存表中的DataFrame来更备注。然后可以查询此表将使用SQL查询关系数据库中的表。

火花流

这可以调用Apache Kafka或Amazon Kinesis(以及一些其他消息传递系统,如ActiveMQ)等分布式日志来处理微批处理中的消息。 (几乎)Spark Ba​​tch作业可用的所有功能也可以应用于Spark Streaming提供的RDD。

mllib.

Mllib提供了通常用于常规数据分析(如聚类和回归)和机器学习中的高级算法。它提供了定义管道,列车模型和持续物体的功能,并读取训练型模型以将它们应用于实时数据。

graphx.

This lets you represent RDD as a graph (nodes are connected via edges) and perform some basic graph operations on it. Currently (only) three more advanced algorithms are provided: PageRank, ConnectedComponents, and TriangleCounting.

完成丢失的SQL查询以返回结果,如示例数据所示:

case class User(userId: Long, userName: String)

case class UserActivity(userId: Long, activityTypeId: Int, timestampEpochMs: Long)

val LoginActivityTypeId = 0
val LogoutActivityTypeId = 1

private def readUserData(sparkSession: SparkSession): DataFrame = {
 sparkSession.createDataFrame(
   sparkSession.sparkContext.parallelize(
     Array(
       User(1, " 迪伊,约翰 "),
       User(2, " 母鹿,简 "),
       User(3, "X, Mr."))
   )
 )
}

private def readUserActivityData(sparkSession: SparkSession): DataFrame = {
 sparkSession.createDataFrame(
   sparkSession.sparkContext.parallelize(
     Array(
       UserActivity(1, LoginActivityTypeId, 1514764800000L),
       UserActivity(2, LoginActivityTypeId, 1514808000000L),
       UserActivity(1, LogoutActivityTypeId, 1514829600000L),
       UserActivity(1, LoginActivityTypeId, 1514894400000L))
   )
 )
}

def calculate(sparkSession: SparkSession): Unit = {
 val UserTableName = "user"
 val UserActivityTableName = "userActivity"

 val userDf: DataFrame = readUserData(sparkSession)
 val userActivityDf: DataFrame = readUserActivityData(sparkSession)

 userDf.createOrReplaceTempView(UserTableName)
 userActivityDf.createOrReplaceTempView(UserActivityTableName)

 val result = sparkSession
   .sql(s"SELECT ...")

 result.show()
}

输出应该是这样的:

用户名 Firstlogin.
迪伊,约翰 1514764800000
母鹿,简 1514808000000

缺少的SQL应该看起来像这样:

 val result = sparkSession
   .sql(s"SELECT u.userName, MIN(ua.timestampEpochMs) AS firstLogin " +
     s"FROM $UserTableName u " +
     s"JOIN $UserActivityTableName ua ON u.userId=ua.userId " +
     s"WHERE ua.activityTypeId=$LoginActivityTypeId " +
     s"GROUP BY u.userName")

(The $ notation is a Scala feature to replace expressions within a string and instead the table names might be there “hard-coded” as well. Note that you would never do this with user-supplied variables because it would open your code to injection vulnerabilities.)

请突出显示在主设备上执行以下哪一部分以下代码,并且将在每个工作节点上运行。

val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy/MM")

def getEventCountOnWeekdaysPerMonth(data: RDD[(LocalDateTime, Long)]): Array[(String, Long)] = {

 val result = data
   .filter(e => e._1.getDayOfWeek.getValue < DayOfWeek.SATURDAY.getValue)
   .map(mapDateTime2Date)
   .reduceByKey(_ + _)
   .collect()

 result
   .map(e => (e._1.format(formatter), e._2))
}

private def mapDateTime2Date(v: (LocalDateTime, Long)): (LocalDate, Long) = {
 (v._1.toLocalDate.withDayOfMonth(1), v._2)
}

The call of this function is performed by the driver application. The assignment to the result value is the definition of the DAG, including its execution, triggered by the collect() call. All parts of this (including the logic of the function mapDateTime2Date) are executed on the worker nodes.

The resulting value that is stored in result is an array that is collected on the master, so the map performed on this value is run on the master.

将火花流与Kafka Streams和Flink进行比较。突出各种技术的差异和优点,以及每个流处理框架的使用情况最佳。

下表概述了每个框架的某些特征。何时使用哪个框架并不总是清楚的答案。特别是因为它经常在它们不同的不重要细节中。

但是理解:

  • Kafka Streams.只是一个库(没有其他基础架构组件,但它有责任部署和缩放流应用程序)。
  • 当目前,当涉及低延迟流处理时,传递是最优越/功能丰富的框架(当流在实时服务之间的服务之间的核心通信很重要)时,这是一个很重要的框架。
  • 这一火花的主要好处是整个现有的生态系统,包括MLLIB / Graphx抽象,并且可以重复使用批处理和流处理功能的代码部分。
fl Kafka Streams. 火花流
部署 也是在群集中负责部署的框架 可以包含在任何Java程序中的库。部署取决于如何部署Java应用程序。 也是在群集中负责部署的框架
生命周期 流处理逻辑作为Flink集群中的作业运行 流处理逻辑作为“标准”Java应用程序的一部分运行 流处理逻辑作为Spark群集的作业运行
责任 专用基础设施团队 应用程序开发人员 专用基础设施团队
协调 fl Master(jobManager),流程的一部分 利用KAFKA集群进行协调,负载平衡和容错 火花大师
连续数据来源 Kafka,文件系统,其他消息队列 kafka只有 像Kafka,Flume,Kinesis等常用流平台
下沉结果 任何使用Flink Sink API的实现的存储 Kafka或使用Kafka Connect API实现Kafka接收器的任何其他存储 文件和kafka作为预定义的接收器,任何其他目的地使用foreach-sink(手动实现)
有界和无限的数据流 无限和有界 无界限 无限制(使用Spark Ba​​tch作业有限)
语义保障 恰好用于内部传导状态;结束于结束的一旦使用所选来源和沉没(例如,Kafka传递给HDFS);当Kafka用作水槽时至少一次 一旦曾经,结束与Kafka 恰好一件事
流处理方法 单一记录 单一记录 微批次
国家管理层 键值存储,对开发人员透明 不,必须手动实施 不,无论自然无国籍
功能集 丰富的功能集,包括活动时间(反对处理时间),滑动窗口和水印 简单的功能设置为在翻滚窗口中聚合 广泛的功能集,但缺少Flink优惠的一些更高级功能
低延迟 是的 是的
何时选择作为流处理器的示例 设置需要高级流处理功能的新事件驱动的架构,并具有低延迟要求 JVM应用程序应消耗现有的Kafka事件流 添加流处理当火花已用于批处理时,低延迟不是强制性的

Graphx库与RDD创建的元素有哪些元素?完成以下代码以计算页面排名。

def calculate(sparkSession: SparkSession): Unit = {

 val pageRdd: RDD[(???, Page)] =
   readPageData(sparkSession)
     .map(e => (e.pageId, e))
     .cache()
 val pageReferenceRdd: RDD[???[PageReference]] = readPageReferenceData(sparkSession)

 val graph = Graph(pageRdd, pageReferenceRdd)
 val PageRankTolerance = 0.005
 val ranks = graph.???

 ranks.take(1000)
   .foreach(println)
}

结果将是一个如下所示的元组列表:

(1,1.4537951595091907)
(2,0.7731024202454048)
(3,0.7731024202454048)
def calculate(sparkSession: SparkSession): Unit = {

 val pageRdd: RDD[(VertexId, Page)] =
   readPageData(sparkSession)
     .map(e => (e.pageId, e))
     .cache()
 val pageReferenceRdd: RDD[Edge[PageReference]] = readPageReferenceData(sparkSession)

 val graph = Graph(pageRdd, pageReferenceRdd)
 val PageRankTollerance = 0.005
 val ranks = graph.pageRank(PageRankTollerance).vertices

 ranks.take(1000)
   .foreach(println)
}

A graph consists of Vertex objects and Edge objects that are passed to the Graph object as RDDs of type RDD[VertexId, VT] and RDD[Edge[ET]] (where VT and ET are any user-defined types that should be associated with a given Vertex or Edge). The constructor of the Edge type is Edge[ET](srcId: VertexId, dstId: VertexId, attr: ET). The type VertexId is basically an alias for Long.

您有一个包含10个节点的群集,每个节点都有24个CPU核心。

以下代码有效,但可能会在大数据集上崩溃,或者至少不会利用群集的完整处理能力。哪个是有问题的部分以及如何调整它?

def calculate(sparkSession: SparkSession): Unit = {
 val NumNodes = 10
 val userActivityRdd: RDD[UserActivity] =
   readUserActivityData(sparkSession)
     .repartition(NumNodes)

 val result = userActivityRdd
   .map(e => (e.userId, 1L))
   .reduceByKey(_ + _)

 result
   .take(1000)
}

The repartition statement generates 10 partitions (no matter if it were more or less when they were loaded from wherever). These might become quite large on huge datasets and probably won’t fit into the allocated memory for one executor.

此外,每个执行程序只能分配一个分区。这意味着使用240个执行器中的10个(10个节点,其中24个核心,每个节点运行一个执行者)。

如果数字被选中太高,则调度程序管理分区的开销会增加并降低性能。在某些情况下,对于非常小的分区,它甚至可能超过执行时间本身。

推荐的分区数量是 执行者数量的两到三倍之间。在我们的情况下,600 = 10 x 24 x 2.5将是一个适当数量的分区。

描述模型创建如何与MLLIB合作以及如何应用模型。

火花 MLlib has two basic components: Transformers and Estimators.

A Transformer reads a DataFrame and returns a new DataFrame with a specific transformation applied (e.g. new columns added). An Estimator is some machine learning algorithm that takes a DataFrame to train a model and returns the model as a Transformer.

火花 Mllib让您将多个转换组合成管道 应用复杂数据转换:

以下图像显示此类管道用于培训模型:

由令牌用品,哈希特,最终回归的管道(估算器)。在它下面是pipeline.fit()函数,将原始文本更改为单词,然后将特征向量,然后最后一个逻辑回归模型。

然后可以将生产的模型应用于实时数据:

由令牌化器,HashingTF组成的管道透模(变压器),最后是一个逻辑回归模型。在它下面是pipelinemodel.transform()函数,将原始文本更改为单词,然后将特征向量,然后终于进入预测。

有更多的采访,而不是棘手的技术问题,因此这些是仅作为指导。不是每个值得招聘的“A”候选人将能够回答他们所有人,也不回答他们所有保证“A”候选人。在一天结束时, 招聘仍然是艺术,科学 - 以及很多工作.

提交面试问题

提交的问题和答案可能需要审查和编辑,可能会或可能不会被选中以在Toptal,LLC的唯一自行决定酌情选择。

* 各个领域都需要

Luigi Crispo.

自由职业者Spark开发商

荷兰 自2019年11月12日以来的Toptal会员

Luigi是一个经验丰富的云和领导专家,拥有超过两十年的各种环境的专业经验。他对技术和价值驱动的项目充满热情,他很适应。他一直是直接推动数字时代的领导者的重要产业转型浪潮的一部分。

展示更多

史蒂夫福克斯

自由职业者Spark开发商

美国自2019年7月25日以来Toptal会员

Steve是一个认证的AWS解决方案架构师专业,具有大数据和机器学习专业认证。他拥有不同的背景,并在AWS中经验架构,建筑和经营大数据机学习应用。史蒂夫曾在CTO和首席执行官举办了技术贡献者的角色。

展示更多

Khushali Patel.

自由职业者Spark开发商

印度 自2019年9月14日以来的Toptal会员

Khushali是一名细节的数据工程师,具有替代,准时和高质量的产品交付态度。他在可扩展,机器人和可重复使用的大数据产品和框架方面拥有三年多的经验,为许多创业公司和知名的金融公司。 Khushali Excels在编程(Scala,Java,Python),大数据(Hadoop,Spark,Hive,Impala,德鲁伊)和流媒体技术(Kafka,KSQL)中。

展示更多

寻找Spark开发人员?

寻找 火花开发人员?查看Toptal的Spark开发人员。

toptal连接 最佳  3% 世界各地的自由人才。

加入Toptal社区。

学到更多