首页编程java编程java rdd 是什么(spark和java的关系)

java rdd 是什么(spark和java的关系)

编程之家2023-10-14115次浏览

大家好,关于java rdd 是什么很多朋友都还不太明白,今天小编就来为大家分享关于spark和java的关系的知识,希望对各位有所帮助!

java rdd 是什么(spark和java的关系)

spark和java的关系

通常大家只是说Spark是基于内存计算的,速度比MapReduce要快。或者说内存中迭代计算。其实我们要抓住问题的本质。总结有以下几点:

1、Spark vs MapReduce≠内存 vs磁盘

其实Spark和MapReduce的计算都发生在内存中,区别在于:

java rdd 是什么(spark和java的关系)

MapReduce通常需要将计算的中间结果写入磁盘,然后还要读取磁盘,从而导致了频繁的磁盘IO。

Spark则不需要将计算的中间结果写入磁盘,这得益于Spark的RDD(弹性分布式数据集,很强大)和DAG(有向无环图),其中DAG记录了job的stage以及在job执行过程中父RDD和子RDD之间的依赖关系。中间结果能够以RDD的形式存放在内存中,且能够从DAG中恢复,大大减少了磁盘IO。

2、Spark vs MapReduce Shuffle的不同

java rdd 是什么(spark和java的关系)

Spark和MapReduce在计算过程中通常都不可避免的会进行Shuffle,两者至少有一点不同:

MapReduce在Shuffle时需要花费大量时间进行排序,排序在MapReduce的Shuffle中似乎是不可避免的;

Spark在Shuffle时则只有部分场景才需要排序,支持基于Hash的分布式聚合,更加省时;

3、多进程模型 vs多线程模型的区别

MapReduce采用了多进程模型,而Spark采用了多线程模型。多进程模型的好处是便于细粒度控制每个任务占用的资源,但每次任务的启动都会消耗一定的启动时间。就是说MapReduce的Map Task和Reduce Task是进程级别的,而Spark Task则是基于线程模型的,就是说mapreduce中的 map和 reduce都是 jvm进程,每次启动都需要重新申请资源,消耗了不必要的时间(假设容器启动时间大概1s,如果有1200个block,那么单独启动map进程事件就需要20分钟)

Spark则是通过复用线程池中的线程来减少启动、关闭task所需要的开销。(多线程模型也有缺点,由于同节点上所有任务运行在一个进程中,因此,会出现严重的资源争用,难以细粒度控制每个任务占用资源)

总结:关于Spark为什么比MapReduce快,或者Spark速度快于MapReduce的原因,总结至少有这几点不同之处吧。

大数据中的Spark指的是什么

Spark是一种通用的大数据计算框架,和传统的大数据技术MapReduce有本质区别。前者是基于内存并行计算的框架,而mapreduce侧重磁盘计算。Spark是加州大学伯克利分校AMP实验室开发的通用内存并行计算框架,用于构建大型的、低延迟的数据分析应用程序。

Spark同样支持离线计算和实时计算两种模式。Spark离线计算速度要比Mapreduce快10-100倍。而实时计算方面,则依赖于SparkStreaming的批处理能力,吞吐量大。不过相比Storm,SparkStreaming并不能做到真正的实时。

Spark使用强大的函数式语言Scala开发,方便简单。同时,它还提供了对Python、Java和R语言的支持。

作为大数据计算框架MapReduce的继任者,Spark具备以下优势特性。

1,高效性

不同于MapReduce将中间计算结果放入磁盘中,Spark采用内存存储中间计算结果,减少了迭代运算的磁盘IO,并通过并行计算DAG图的优化,减少了不同任务之间的依赖,降低了延迟等待时间。内存计算下,Spark比 MapReduce快100倍。

2,易用性

不同于MapReduce仅支持Map和Reduce两种编程算子,Spark提供了超过80种不同的Transformation和Action算子,如map,reduce,filter,groupByKey,sortByKey,foreach等,并且采用函数式编程风格,实现相同的功能需要的代码量极大缩小。

3,通用性

Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。

4,兼容性

Spark能够跟很多开源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。

scala 中rdd类型用什么头文件

1.RDD介绍:

RDD,弹性分布式数据集,即分布式的元素集合。在spark中,对所有数据的操作不外乎是创建RDD、转化已有的RDD以及调用RDD操作进行求值。在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化。

Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象。

用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序中分发驱动器程序中的对象集合,比如list或者set。

RDD的转化操作都是惰性求值的,这意味着我们对RDD调用转化操作,操作不会立即执行。相反,Spark会在内部记录下所要求执行的操作的相关信息。我们不应该把RDD看做存放着特定数据的数据集,而最好把每个RDD当做我们通过转化操作构建出来的、记录如何计算数据的指令列表。数据读取到RDD中的操作也是惰性的,数据只会在必要时读取。转化操作和读取操作都有可能多次执行。

2.创建RDD数据集

(1)读取一个外部数据集

val input=sc.textFile(inputFileDir)

(2)分发对象集合,这里以list为例

val lines=sc.parallelize(List("hello world","this is a test"));

3.RDD操作

(1)转化操作

实现过滤器转化操作:

val lines=sc.parallelize(List("error:a","error:b","error:c","test"));

val errors=lines.filter(line=> line.contains("error"));

errors.collect().foreach(println);

输出:

error:a

error:b

error:c

可见,列表list中包含词语error的表项都被正确的过滤出来了。

(2)合并操作

将两个RDD数据集合并为一个RDD数据集

接上述程序示例:

val lines=sc.parallelize(List("error:a","error:b","error:c","test","warnings:a"));

val errors=lines.filter(line=> line.contains("error"));

val warnings=lines.filter(line=> line.contains("warnings"));

val unionLines=errors.union(warnings);

unionLines.collect().foreach(println);

输出:

error:a

error:b

error:c

warning:a

可见,将原始列表项中的所有error项和warning项都过滤出来了。

(3)获取RDD数据集中的部分或者全部元素

①获取RDD数据集中的部分元素.take(int num)返回值List<T>

获取RDD数据集中的前num项。

/**

* Take the first num elements of the RDD. This currently scans the partitions*one by one*, so

* it will be slow if a lot of partitions are required. In that case, use collect() to get the

* whole RDD instead.

*/

def take(num: Int): JList[T]

程序示例:接上

unionLines.take(2).foreach(println);

输出:

error:a

error:b

可见,输出了RDD数据集unionLines的前2项

②获取RDD数据集中的全部元素.collect()返回值 List<T>

程序示例:

val all=unionLines.collect();

all.foreach(println);

遍历输出RDD数据集unionLines的每一项

4.向spark传递函数

在scala中,我们可以把定义的内联函数、方法的引用或静态方法传递给Spark,就像Scala的其他函数式API一样。我们还要考虑其他一些细节,必须所传递的函数及其引用的数据需要是可序列化的(实现了Java的Serializable接口)。除此之外,与Python类似,传递一个对象的方法或者字段时,会包含对整个对象的引用。我们可以把需要的字段放在一个局部变量中,来避免包含该字段的整个对象。

class searchFunctions(val query:String){

def isMatch(s: String): Boolean={

s.contains(query)

}

def getMatchFunctionReference(rdd: RDD[String]):RDD[String]={

//问题: isMach表示 this.isMatch,因此我们需要传递整个this

rdd.filter(isMatch)

}

def getMatchesFunctionReference(rdd: RDD[String]):RDD[String]={

//问题: query表示 this.query,因此我们需要传递整个this

rdd.flatMap(line=> line.split(query))

}

def getMatchesNoReference(rdd:RDD[String]):RDD[String]={

//安全,只把我们需要的字段拿出来放入局部变量之中

val query1=this.query;

rdd.flatMap(x=>x.split(query1)

)

}

}

5.针对每个元素的转化操作:

转化操作map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应的元素。关键词:转化

转化操作filter()接受一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。关键词:过滤

示例图如下所示:

①map()

计算RDD中各值的平方

val rdd=sc.parallelize(List(1,2,3,4));

val result=rdd.map(value=> value*value);

println(result.collect().mkString(","));

输出:

1,4,9,16

filter()

②去除RDD集合中值为1的元素:

val rdd=sc.parallelize(List(1,2,3,4));

val result=rdd.filter(value=> value!=1);

println(result.collect().mkString(","));

结果:

2,3,4

我们也可以采取传递函数的方式,就像这样:

函数:

def filterFunction(value:Int):Boolean={

value!=1

}

使用:

val rdd=sc.parallelize(List(1,2,3,4));

val result=rdd.filter(filterFunction);

println(result.collect().mkString(","));

③有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫做flatMap()。和map()类似,我们提供给flatMap()的函数被分别应用到了输入的RDD的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组成的。我们得到的是一个包含各个迭代器可以访问的所有元素的RDD。flatMap()的一个简单用途是将输入的字符串切分成单词,如下所示:

val rdd=sc.parallelize(List("Hello world","hello you","world i love you"));

val result=rdd.flatMap(line=> line.split(""));

println(result.collect().mkString("\n"));

输出:

hello

world

hello

you

world

i

love

you

6.集合操作

RDD中的集合操作

函数

用途

RDD1.distinct()

生成一个只包含不同元素的新RDD。需要数据混洗。

RDD1.union(RDD2)

返回一个包含两个RDD中所有元素的RDD

RDD1.intersection(RDD2)

只返回两个RDD中都有的元素

RDD1.substr(RDD2)

返回一个只存在于第一个RDD而不存在于第二个RDD中的所有元素组成的RDD。需要数据混洗。

集合操作对笛卡尔集的处理:

RDD1.cartesian(RDD2)

返回两个RDD数据集的笛卡尔集

程序示例:生成RDD集合{1,2}和{1,2}的笛卡尔集

val rdd1=sc.parallelize(List(1,2));

val rdd2=sc.parallelize(List(1,2));

val rdd=rdd1.cartesian(rdd2);

println(rdd.collect().mkString("\n"));

输出:

(1,1)

(1,2)

(2,1)

(2,2)

7.行动操作

(1)reduce操作

reduce()接收一个函数作为参数,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就是函数+,可以用它来对我们的RDD进行累加。使用reduce(),可以很方便地计算出RDD中所有元素的总和,元素的个数,以及其他类型的聚合操作。

以下是求RDD数据集所有元素和的程序示例:

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));

val results=rdd.reduce((x,y)=>x+y);

println(results);

输出:55

(2)fold()操作

接收一个与reduce()接收的函数签名相同的函数,再加上一个初始值来作为每个分区第一次调用时的结果。你所提供的初始值应当是你提供的操作的单位元素,也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如+对应的0,*对应的1,或者拼接操作对应的空列表)。

程序实例:

①计算RDD数据集中所有元素的和:

zeroValue=0;//求和时,初始值为0。

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));

val results=rdd.fold(0)((x,y)=>x+y);

println(results);

②计算RDD数据集中所有元素的积:

zeroValue=1;//求积时,初始值为1。

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));

val results=rdd.fold(1)((x,y)=>x*y);

println(results);

(3)aggregate()操作

aggregate()函数返回值类型不必与所操作的RDD类型相同。

与fold()类似,使用aggregate()时,需要提供我们期待返回的类型的初始值。然后通过一个函数把RDD中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。

以下是程序实例:

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));

val result=rdd.aggregate((0,0))(

(acc,value)=>(acc._1+value,acc._2+1),

(acc1,acc2)=>(acc1._1+acc2._1, acc1._2+acc2._2)

)

val average=result._1/result._2;

println(average)

输出:5

最终返回的是一个Tuple2<int,int>对象,他被初始化为(0,0),当遇到一个int值时,将该int数的值加到Tuple2对象的_1中,并将_2值加1,如果遇到一个Tuple2对象时,将这个Tuple2的_1和_2的值归并到最终返回的Tuple2值中去。

表格:对一个数据为{1,2,3,3}的RDD进行基本的RDD行动操作

函数名目的示例结果

collect()返回RDD的所有元素 rdd.collect(){1,2,3,3}

count() RDD的元素个数 rdd.count() 4

countByValue()各元素在RDD中出现的次数 rdd.countByValue(){(1,1),

(2,1),

(3,2)

}

take(num)从RDD中返回num个元素 rdd.take(2){1,2}

top(num)从RDD中返回最前面的num个元素 rdd.takeOrdered(2)(myOrdering){3,3}

takeOrdered(num)

(ordering)从RDD中按照提供的顺序返回最前面的num个元素

rdd.takeSample(false,1)非确定的

takeSample(withReplacement,num,[seed])从RDD中返回任意一些元素 rdd.takeSample(false,1)非确定的

reduce(func)并行整合RDD中所有数据 rdd.reduce((x,y)=> x+y)

9

fold(zero)(func)和reduce()一样,但是需要提供初始值 rdd.fold(0)((x,y)=> x+y)

9

aggregate(zeroValue)(seqOp,combOp)和reduce()相似,但是通常返回不同类型的函数 rdd.aggregate((0,0))

((x,y)=>

(x._1+y,x._2+1),

(x,y)=>

(x._1+y._1,x._2+y._2)

)(9,4)

foreach(func)对RDD中的每个元素使用给定的函数 rdd.foreach(func)无

8.持久化缓存

因为Spark RDD是惰性求值的,而有时我们希望能多次使用同一个RDD。如果简单地对RDD调用行动操作,Spark每次都会重算RDD以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。

为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。

出于不同的目的,我们可以为RDD选择不同的持久化级别。默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中

不同关键字对应的存储级别表

级别

使用的空间

cpu时间

是否在内存

是否在磁盘

备注

MEMORY_ONLY

直接储存在内存

MEMORY_ONLY_SER

序列化后储存在内存里

MEMORY_AND_DISK

中等

部分

部分

如果数据在内存中放不下,溢写在磁盘上

MEMORY_AND_DISK_SER

部分

部分

数据在内存中放不下,溢写在磁盘中。内存中存放序列化的数据。

DISK_ONLY

直接储存在硬盘里面

程序示例:将RDD数据集持久化在内存中。

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)).persist(StorageLevel.MEMORY_ONLY);

println(rdd.count())

println(rdd.collect().mkString(","));

RDD还有unpersist()方法,调用该方法可以手动把持久化的RDD从缓存中移除。

9.不同的RDD类型

在scala中,将RDD转为由特定函数的RDD(比如在RDD[Double]上进行数值操作),是由隐式转换来自动处理的。这些隐式转换可以隐式地将一个RDD转为各种封装类,比如DoubleRDDFunctions(数值数据的RDD)和PairRDDFunctions(键值对RDD),这样我们就有了诸如mean()和variance()之类的额外的函数。

示例程序:

val rdd=sc.parallelize(List(1.0,2.0,3.0,4.0,5.0));

println(rdd.mean());

其实RDD[T]中并没有mean()函数,只是隐式转换自动将其转换为DoubleRDDFunctions。

好了,关于java rdd 是什么和spark和java的关系的问题到这里结束啦,希望可以解决您的问题哈!

java jpanel 是什么 JPanel的定义及作用是什么,清楚举例子解释一下更好java项目上线什么意思(项目上线是什么意思)