Apache Spark: Perbedaan antara Dataframe, Datasets, dan RDD

1. Ikhtisar

Apache Spark adalah sistem pemrosesan data yang cepat dan terdistribusi. Itu melakukan pemrosesan data dalam memori dan menggunakan cache dalam memori dan eksekusi yang dioptimalkan menghasilkan kinerja yang cepat. Ini menyediakan API tingkat tinggi untuk bahasa pemrograman populer seperti Scala, Python, Java, dan R.

Dalam tutorial singkat ini, kita akan membahas tiga konsep dasar Spark: kerangka data, kumpulan data, dan RDD.

2. DataFrame

Spark SQL memperkenalkan abstraksi data tabular yang disebut DataFrame sejak Spark 1.3. Sejak itu, ini telah menjadi salah satu fitur terpenting di Spark. API ini berguna ketika kita ingin menangani data terstruktur dan semi-terstruktur yang terdistribusi.

Di bagian 3, kita akan membahas Set Data Terdistribusi Tangguh (RDD). DataFrames menyimpan data dengan cara yang lebih efisien daripada RDD, ini karena mereka menggunakan kemampuan RDD yang tidak dapat diubah, dalam memori, tangguh, terdistribusi, dan paralel, tetapi mereka juga menerapkan skema ke data. DataFrames juga menerjemahkan kode SQL ke dalam operasi RDD tingkat rendah yang dioptimalkan.

Kita dapat membuat DataFrames dengan tiga cara:

  • Mengonversi RDD yang ada
  • Menjalankan kueri SQL
  • Memuat data eksternal

Tim Spark memperkenalkan SparkSession dalam versi 2.0, ini menyatukan semua konteks yang berbeda memastikan pengembang tidak perlu khawatir tentang membuat konteks yang berbeda:

SparkSession session = SparkSession.builder() .appName("TouristDataFrameExample") .master("local[*]") .getOrCreate(); DataFrameReader dataFrameReader = session.read();

Kami akan menganalisis file Tourist.csv :

Dataset data = dataFrameReader.option("header", "true") .csv("data/Tourist.csv");

Karena Spark 2.0 DataFrame menjadi Kumpulan Data jenis Baris , jadi kita bisa menggunakan DataFrame sebagai alias untuk Set Data .

Kita dapat memilih kolom tertentu yang kita minati. Kita juga dapat memfilter dan mengelompokkan berdasarkan kolom tertentu:

data.select(col("country"), col("year"), col("value")) .show(); data.filter(col("country").equalTo("Mexico")) .show(); data.groupBy(col("country")) .count() .show();

3. Kumpulan Data

Kumpulan data adalah kumpulan data terstruktur dengan tipe kuat . Mereka menyediakan gaya pemrograman berorientasi objek yang sudah dikenal ditambah manfaat keamanan tipe karena kumpulan data dapat memeriksa sintaks dan menangkap kesalahan pada waktu kompilasi.

Dataset adalah ekstensi dari DataFrame, sehingga kita dapat menganggap DataFrame sebagai tampilan set data yang tidak diketik.

Tim Spark merilis API Set Data di Spark 1.6 dan seperti yang mereka sebutkan: "tujuan Set Data Spark adalah untuk menyediakan API yang memungkinkan pengguna untuk dengan mudah mengekspresikan transformasi pada domain objek, sekaligus memberikan keunggulan kinerja dan ketahanan dari eksekusi SQL Spark mesin".

Pertama, kita perlu membuat kelas bertipe TouristData :

public class TouristData { private String region; private String country; private String year; private String series; private Double value; private String footnotes; private String source; // ... getters and setters }

Untuk memetakan setiap record kita ke tipe yang ditentukan kita perlu menggunakan Encoder. Pembuat enkode menerjemahkan antara objek Java dan format biner internal Spark :

// SparkSession initialization and data load Dataset responseWithSelectedColumns = data.select(col("region"), col("country"), col("year"), col("series"), col("value").cast("double"), col("footnotes"), col("source")); Dataset typedDataset = responseWithSelectedColumns .as(Encoders.bean(TouristData.class));

Seperti DataFrame, kita dapat memfilter dan mengelompokkan berdasarkan kolom tertentu:

typedDataset.filter((FilterFunction) record -> record.getCountry() .equals("Norway")) .show(); typedDataset.groupBy(typedDataset.col("country")) .count() .show();

Kita juga dapat melakukan operasi seperti memfilter dengan mencocokkan kolom dengan rentang tertentu atau menghitung jumlah kolom tertentu, untuk mendapatkan nilai totalnya:

typedDataset.filter((FilterFunction) record -> record.getYear() != null && (Long.valueOf(record.getYear()) > 2010 && Long.valueOf(record.getYear())  record.getValue() != null && record.getSeries() .contains("expenditure")) .groupBy("country") .agg(sum("value")) .show();

4. RDD

Set Data Terdistribusi Tangguh atau RDD adalah abstraksi pemrograman utama Spark. Ini mewakili kumpulan elemen yang: tidak berubah, tangguh, dan didistribusikan .

RDD merangkum kumpulan data besar, Spark akan secara otomatis mendistribusikan data yang terdapat dalam RDD di seluruh cluster kami dan memparalelkan operasi yang kami lakukan padanya .

Kami dapat membuat RDD hanya melalui operasi data dalam penyimpanan stabil atau operasi pada RDD lain.

Toleransi kesalahan sangat penting ketika kita menangani kumpulan data yang besar dan data didistribusikan pada mesin cluster. RDD tangguh karena mekanisme pemulihan kesalahan bawaan Spark. Spark mengandalkan fakta bahwa RDD mengingat bagaimana mereka dibuat sehingga kita dapat dengan mudah melacak kembali garis keturunan untuk memulihkan partisi .

Ada dua jenis operasi yang dapat kita lakukan di RDD: Transformasi dan Tindakan .

4.1. Transformasi

Kita dapat menerapkan Transformasi ke RDD untuk memanipulasi datanya. Setelah manipulasi ini dilakukan, kita akan mendapatkan RDD baru, karena RDD adalah objek yang tidak dapat diubah .

Kami akan memeriksa cara menerapkan Peta dan Filter, dua transformasi paling umum.

Pertama, kita perlu membuat JavaSparkContext dan memuat data sebagai RDD dari file Tourist.csv :

SparkConf conf = new SparkConf().setAppName("uppercaseCountries") .setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD tourists = sc.textFile("data/Tourist.csv");

Selanjutnya, mari terapkan fungsi peta untuk mendapatkan nama negara dari setiap record dan ubah namanya menjadi huruf besar. Kami dapat menyimpan kumpulan data yang baru dibuat ini sebagai file teks di disk:

JavaRDD upperCaseCountries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1].toUpperCase(); }).distinct(); upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

If we want to select only a specific country, we can apply the filter function on our original tourists RDD:

JavaRDD touristsInMexico = tourists .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico")); touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2. Actions

Actions will return a final value or save the results to disc, after doing some computation on the data.

Two of the recurrently used actions in Spark are Count and Reduce.

Let's count the total countries on our CSV file:

// Spark Context initialization and data load JavaRDD countries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1]; }).distinct(); Long numberOfCountries = countries.count();

Now, we'll calculate the total expenditure by country. We'll need to filter the records containing expenditure in their description.

Instead of using a JavaRDD, we'll use a JavaPairRDD. A pair of RDD is a type of RDD that can store key-value pairs. Let's check it next:

JavaRDD touristsExpenditure = tourists .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure")); JavaPairRDD expenditurePairRdd = touristsExpenditure .mapToPair(line -> { String[] columns = line.split(COMMA_DELIMITER); return new Tuple2(columns[1], Double.valueOf(columns[6])); }); List
    
      totalByCountry = expenditurePairRdd .reduceByKey((x, y) -> x + y) .collect();
    

5. Conclusion

Singkatnya, kita harus menggunakan DataFrames atau Datasets ketika kita membutuhkan API khusus domain, kita membutuhkan ekspresi tingkat tinggi seperti agregasi, jumlah, atau kueri SQL. Atau ketika kita menginginkan keamanan tipe pada waktu kompilasi.

Di sisi lain, kita harus menggunakan RDD ketika data tidak terstruktur dan kita tidak perlu mengimplementasikan skema tertentu atau ketika kita membutuhkan transformasi dan tindakan tingkat rendah.

Seperti biasa, semua contoh kode tersedia di GitHub.