Resilient Distributed Datasets (RDDs)
Resilient Distributed Datasets (RDD) là một cấu trúc dữ liệu cơ bản của Spark. Nó là một tập hợp bất biến phân tán của một đối tượng có thể hoạt động song song.
Có hai cách để tạo RDDs:
- Tạo từ một tập hợp dữ liệu có sẵn trong ngôn ngữ sử dụng như Java, Python, Scala.
- Lấy từ dataset hệ thống lưu trữ bên ngoài như HDFS, Hbase hoặc các cơ sở dữ liệu quan hệ.
Ví dụ tạo RDD từ 1 List với Java:
1
2
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
Ví dụ tạo RDD từ 1 file text:
1
JavaRDD<String> distFile = sc.textFile("data.txt");
RDD Operations
RDD hỗ trợ 2 loại operations:
- transformations: tạo tập dữ liệu mới từ tập dữ liệu hiện có
- actions: trả về giá trị cho chương trình điều khiển sau khi thực hiện tính toán trên tập dữ liệu
Tất cả transformations
trong Spark đều lazy, đây cũng là một tính chất quan trọng quyết định tới hiệu năng của Spark và đi phỏng vấn hay thi câu này khá được chú ý tới. Các transformations
không tính toán kết quả ngay lập tức, thay vào đó nó nhớ phép biến đổi được áp dụng cho tập dữ liệu. Transformations chỉ thực hiện tính toán khi actions
yêu cầu một kết quả trả về.
Transformations
Transformation | Ý nghĩa |
---|---|
map(func) | Trả về một tập dữ liệu mới bằng cách chuyển đổi mỗi phần tử cũ sang từng phần tử mới thông qua func |
filter(func) | Trả về một tập dữ liệu mới nếu các phần tử trong tập dữ liệu được chọn có func trả về giá trị true |
flatMap(func) | Giống mới map, tuy nhiên map là chuyển đổi 1-1 còn với flatMap thì có thể là chuyển đổi 1-N, 1-1, 1-0 |
union(otherDataset) | Trả về tập dữ liệu mới là hợp của 2 tập dữ liệu |
intersection(otherDataset) | Trả về tập dữ liệu mới là hợp của 2 tập dữ liệu |
distinct([numPartitions])) | Trả về tập dữ liệu mới với các phần tử riêng biệt |
groupByKey([numPartitions]) | Khi tập dữ liệu ban đầu có dạng cặp (K,V), trả về tập dữ liệu khác có dạng là 1 cặp (K, Iterable |
reduceByKey(func, [numPartitions]) | Tập dữ liệu ban đầu có dạng là 1 cặp (K,V) và kết quả cho ra là 1 cặp (K,V) khác, trong đó giá trị V được tổng hợp từ hàm func. Chúng ta hiểu đơn giản, reduceByKey giống so mới groupByKey, tuy nhiên groupByKey chỉ giúp nhóm các Value lại trong khi đó reduceByKey có thêm 1 hàm func để thực hiện phép toán tổng hợp các Value đó và trả về kết quả Value duy nhất |
sortByKey([ascending], [numPartitions]) | Tập dữ liệu ban đầu có dạng 1 cặp (K,V), trả về 1 cặp (K,V) khác với các K đã được sắp xếp tăng dần hoặc giảm dần |
Còn 1 số các Transformations khác mà do chúng không phổ biến và không được sử dụng thường xuyên nên mình không đề cập ở đây. Nếu bạn muốn tìm hiểu thêm thì đọc tại Spark docs transformations
Actions
Actions | Ý nghĩa |
---|---|
reduce(func) | Tổng hợp các phần tử sử dụng hàm func (nhận vào 2 tham số và trả về một) |
collect() | Trả về tất cả các dữ liệu dưới dạng 1 mảng |
count() | Trả về sống lượng phần tử của tập dữ liệu |
first() | Trả về phần tử đầu tiên |
take(n) | Trả về 1 mảng với n phần tử đầu tiên trong mảng đó |
saveAsTextFile(path) | Ghi dữ liệu vào file text |
foreach(func) | Duyệt từng phần tử của tập hợp |
Còn 1 số các Actions khác mà do chúng không phổ biến và không được sử dụng thường xuyên nên mình không đề cập ở đây. Nếu bạn muốn tìm hiểu thêm thì đọc tại Spark docs Actions
Ví dụ một số Operations
Tạo một maven project bằng Java và thêm vào các phụ thuộc sau:
1
2
3
4
5
6
7
8
9
10
11
12
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.0</version>
</dependency>
map
Ta có một tập dữ liệu có giá trị các phần tử là 10,20,30. Bây giờ chúng ta sẽ dùng map
để gấp đôi từng phần tử trong tập dữ liệu ban đầu.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package demo;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Demo")
.setMaster("local[2]");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
List<Integer> list = Arrays.asList(10,20,30);
JavaRDD<Integer> data = sc.parallelize(list);
data = data.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
data.collect().forEach(v -> System.out.println(v));
}
}
}
Kết quả cho ra lần lượt là:
1
2
3
10
20
30
filter
Ta có tập dữ liệu ban đầu là 10, 11, 12, 13, 14, 15. Sau đó sử dụng flatMap
để lọc ra các phần tử chia hết cho 5.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package filter;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Demo")
.setMaster("local[2]");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
List<Integer> list = Arrays.asList(10,11,12,13,14,15);
JavaRDD<Integer> data = sc.parallelize(list);
data = data.filter(new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Integer v1) throws Exception {
if(v1 % 5 == 0) return true;
return false;
}
});
data.collect().forEach(v -> System.out.println(v));
}
}
}
Kết quả cho ra lần lượt là:
1
2
10
15
groupByKey
Ta có 1 tập dữ liệu ban đầu dạng (K,V). groupByKey
sẽ giúp nhóm khác phần tử cùng key lại với nhau.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package groupbykey;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Demo")
.setMaster("local[2]");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
List<Tuple2<String, Integer>> list = Arrays.asList(
new Tuple2<String, Integer>("C", 3),
new Tuple2<String, Integer>("A", 1),
new Tuple2<String, Integer>("B", 4),
new Tuple2<String, Integer>("A", 2),
new Tuple2<String, Integer>("B", 5));
JavaPairRDD<String, Integer> data = sc.parallelizePairs(list);
data.groupByKey().collect().forEach(s -> System.out.println(s));
}
}
}
Kết quả sẽ là:
1
2
3
(B,[4, 5])
(A,[1, 2])
(C,[3])
reduceByKey
reduceByKey
ngoài việc nhóm các phần tử cùng key lại với nhau nó còn giúp thực hiện tính toán trên các Value cùng Key đó.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package reducebykey;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Demo")
.setMaster("local[2]");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
List<Tuple2<String, Integer>> list = Arrays.asList(
new Tuple2<String, Integer>("C", 3),
new Tuple2<String, Integer>("A", 1),
new Tuple2<String, Integer>("B", 4),
new Tuple2<String, Integer>("A", 2),
new Tuple2<String, Integer>("B", 5));
JavaPairRDD<String, Integer> data = sc.parallelizePairs(list);
data = data.reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
data.collect().forEach(v -> System.out.println(v));
}
}
}
Kết quả là:
1
2
3
(B,9)
(A,3)
(C,3)
Xem toàn bộ mã nguồn của ví dụ tại: https://github.com/demanejar/spark-rdd
Tham khảo: https://laptrinh.vn/, https://spark.apache.org/