spark中的mapper和reducer个数是否可以配置?

理由
举报 取消

如图,这是网上一张讲解shuffle的经典图,不过我感觉这个图有点问题。spark中的task分为两类,一类是shuffleMapTask,另一类是resultTask。shuffle的过程应该是在shuffleMapTask之间,或者shuffleMapTask和resultTask之间。也就是说,图中的map task应该对应的是以shuffle write为结尾的某个task过程,reduce task对应的是以shuffle read为开头的task过程。(不知道理解是否正确) 针对spark,我有一下几个问题,还请大牛帮答疑: 1)我记得hadoop mapreduce中的map和reduce个数是可以设定的。在spark中,一个RDD中的分片partition对应一个task,那么我是否可以认为配置文件中的Parallesim参数(设定了partition)就设定了task的个数,也就是上图中reduce和map task的个数?(查资料我得到,当RDD从textfile中读取后,初始的partiton个数由block个数决定)2)针对上面一个问题,sort based shuffle情况下,bypassMergeThreshold这个阈值和配置参数parallesim是什么关系?3)一个job的stage列表中,最后一个stage称为finalStage?它和普通的stage有什么区别?4)实验环境:3台内网服务器,上面部署了hadoop和spark,其中n1作为namenode和master。要进行pagerank测试,实验数据存放在hdfs上(按理应该是分布在各两个datanode n2和n3上)。这是当运行pagerank时,spark worker需要从hdfs上读取数据,那么问题来了——-n2和n3都会优先读取自己hdfs上的数据吗(物理优先)?

2018年2月9日 4 条回复 1229 次浏览

发起人:suliey 初入职场

回复 ( 4 )

  1. 狗叔
    理由
    举报 取消

    蟹妖 @Ryan Fan@虚静

    今天一天都在攻关问题,趁晚上有空简答一下 :)

    1.Spark中的mapper和reducer个数是否可以配置?

    reduce task的个数是由stage的第一个RDD(ShuffledRDD)的partition数量决定的,而ShuffledRDD的partition数又取决于Partitioner(分区器)中的partition个数,那Partitioner以及其分区数是怎样被指定的呢?我们可以看一下代码:

      def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
        val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.length).reverse
        for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
          return r.partitioner.get
        }
        if (rdd.context.conf.contains("spark.default.parallelism")) {
          new HashPartitioner(rdd.context.defaultParallelism)
        } else {
          new HashPartitioner(bySize.head.partitions.length)
        }
      }
    
  2. 东围居士
    理由
    举报 取消

    谢邀

    我不懂这个,已帮你邀请华为商用Spark开发攻城狮 @狗叔

  3. momisabuilder
    理由
    举报 取消

    map数与分区数成正比,reduce同样也跟着map分区数成正比

  4. 匿名用户
    理由
    举报 取消

    谢邀~

    1. partition数目等于task数目是正确的。

    3. finalStage的主要作用是由它开始,反向搜索逻辑执行图,从而找出各个RDD之间的依赖关系。

    抱歉2和4没有了解过。

我来回答

Captcha 点击图片更换验证码