A Guide to Apache Crunch

1. Introduction

In this tutorial, we'll demonstrate Apache Crunch with an example data processing application. We'll run this application using the MapReduce framework.

We'll start by covering a few Apache Crunch concepts briefly. Then we'll jump into a sample app. In this app we'll do text processing:

  • First of all, we'll read the lines from a text file
  • Later, we'll split them into words and remove some common words
  • Then, we'll group the remaining words to get a list of unique words and their counts
  • Finally, we'll write this list to a text file

2. What Is Crunch?

MapReduce is a distributed, parallel programming framework for processing large amounts of data on a cluster of servers. Software frameworks such as Hadoop and Spark implement MapReduce.

Crunch provides a framework for writing, testing, and running MapReduce pipelines in Java. Here, we don't write the MapReduce jobs directly. Instead, we define the data pipeline (i.e. the operations to perform the input, processing, and output steps) using the Crunch APIs. The Crunch Planner maps them to MapReduce jobs and executes them when needed.

Therefore, every Crunch data pipeline is coordinated by an instance of the Pipeline interface. This interface also defines methods for reading data into a pipeline via Source instances and writing data out from a pipeline to Target instances.
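
To make this concrete, here's a minimal sketch (not part of the sample app we're building) that reads and writes explicitly through Source and Target instances. It assumes the From and To factory classes from org.apache.crunch.io, and the file paths are illustrative:

// Read via an explicit Source and write via an explicit Target.
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = pipeline.read(From.textFile("input.txt"));
pipeline.write(lines, To.textFile("output"));

In the rest of the tutorial we'll mostly use the convenience methods, such as readTextFile, that wrap these.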

We have three interfaces for representing data:

  1. PCollection<T> - an immutable, distributed collection of elements
  2. PTable<K, V> - an immutable, distributed, unordered multi-map of keys and values
  3. PGroupedTable<K, V> - a distributed, sorted map of keys of type K to an Iterable of V that may be iterated over exactly once (see the sketch after this list)
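
To see how the table types relate, here's a hedged sketch (separate from our application, assuming MemPipeline's typedTableOf helper) that builds a small PTable and groups it into a PGroupedTable:

// A PTable is an unordered multi-map; groupByKey sorts and groups it
// into a PGroupedTable whose values are iterated exactly once per key.
PTable<String, Long> table = MemPipeline.typedTableOf(
  Writables.tableOf(Writables.strings(), Writables.longs()),
  "apple", 1L, "banana", 2L, "apple", 3L);
PGroupedTable<String, Long> grouped = table.groupByKey();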

DoFn is the base class for all data processing functions. It corresponds to the Mapper, Reducer, and Combiner classes in MapReduce. We spend most of the development time writing and testing logical computations using it.

Now that we're more familiar with Crunch, let's use it to build the example application.

3. Setting Up a Crunch Project

First of all, let's create a Crunch project with Maven. We can do so in two ways:

  1. Add the required dependencies in the pom.xml file of an existing project
  2. Use an archetype to generate a starter project

Let's have a quick look at both approaches.

3.1. Maven Dependencies

In order to add Crunch to an existing project, let's add the required dependencies in the pom.xml file.

First, let's add the crunch-core library:

<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>0.15.0</version>
</dependency>

Next, let's add the hadoop-client library to communicate with Hadoop. We use the version matching the Hadoop installation:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>

We can check Maven Central for the latest versions of the crunch-core and hadoop-client libraries.

3.2. Maven Archetype

Another approach is to quickly generate a starter project using the Maven archetype provided by Crunch:

mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype 

When prompted by the above command, we provide the Crunch version and the project artifact details.
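
Alternatively, if we want to skip the prompts, we can pass the archetype coordinates and project details directly on the command line (the groupId, artifactId, and versions below are illustrative placeholders, not values mandated by Crunch):

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.crunch \
  -DarchetypeArtifactId=crunch-archetype \
  -DarchetypeVersion=0.15.0 \
  -DgroupId=com.example \
  -DartifactId=crunch-demo \
  -Dversion=1.0-SNAPSHOT \
  -DinteractiveMode=false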

4. Crunch Pipeline Setup

Having set up the project, we need to create a Pipeline object. Crunch has three Pipeline implementations:

  • MRPipeline - executes within Hadoop MapReduce
  • SparkPipeline - executes as a series of Spark pipelines
  • MemPipeline - executes in-memory on the client and is useful for unit testing

Usually, we develop and test using an instance of MemPipeline. Later we use an instance of MRPipeline or SparkPipeline for actual execution.

If we need an in-memory pipeline, we can use the static getInstance method to get a MemPipeline instance:

Pipeline pipeline = MemPipeline.getInstance();

But for now, let's create an instance of MRPipeline to execute the application with Hadoop:

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

5. Reading Input Data

After creating the pipeline object, we want to read input data. The Pipeline interface provides a convenience method to read input from a text file: readTextFile(pathName).

Let's call this method to read the input text file:

PCollection<String> lines = pipeline.readTextFile(inputPath);

The above code reads the text file as a collection of String.

As the next step, let's write a test case for reading input:

@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
    Pipeline pipeline = MemPipeline.getInstance();
    PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);

    assertEquals(21, lines.asCollection()
      .getValue()
      .size());
}

In this test, we verify that we get the expected number of lines when reading a text file.

6. Data Processing Steps

After reading the input data, we need to process it. The Crunch API contains a number of DoFn subclasses to handle common data processing scenarios:

  • FilterFn - filters members of a collection based on a boolean condition
  • MapFn - maps each input record to exactly one output record (see the sketch after this list)
  • CombineFn - combines a number of values into a single value
  • JoinFn - performs joins such as inner join, left outer join, right outer join and full outer join
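
As a quick illustration of one of these, here's a minimal MapFn sketch (the class is our own example, not part of the application we're building) that upper-cases each input word:

// MapFn only requires implementing map, which turns each input
// record into exactly one output record.
public class ToUpperCaseFn extends MapFn<String, String> {
    @Override
    public String map(String input) {
        return input.toUpperCase();
    }
}

We could apply it the same way as any other DoFn, for example words.parallelDo(new ToUpperCaseFn(), Writables.strings()).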

Let's implement the following data processing logic by using these classes:

  1. Split each line in the input file into words
  2. Remove the stop words
  3. Count the unique words

6.1. Split a Line of Text Into Words

First of all, let's create the Tokenizer class to split a line into words.

We'll extend the DoFn class. This class has an abstract method called process. This method processes the input records from a PCollection and sends the output to an Emitter.

We need to implement the splitting logic in this method:

public class Tokenizer extends DoFn<String, String> {
    private static final Splitter SPLITTER = Splitter
      .onPattern("\\s+")
      .omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}

In the above implementation, we've used the Splitter class from the Guava library to extract words from a line.

Next, let's write a unit test for the Tokenizer class:

@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {

    @Mock
    private Emitter<String> emitter;

    @Test
    public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
        Tokenizer splitter = new Tokenizer();
        splitter.process(" hello world ", emitter);

        verify(emitter).emit("hello");
        verify(emitter).emit("world");
        verifyNoMoreInteractions(emitter);
    }
}

The above test verifies that the correct words are returned.

Finally, let's split the lines read from the input text file using this class.

The parallelDo method of the PCollection interface applies the given DoFn to all the elements and returns a new PCollection.

Let's call this method on the lines collection and pass an instance of Tokenizer:

PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

As a result, we get the list of words in the input text file. We'll remove the stop words in the next step.

6.2. Remove Stop Words

Similarly to the previous step, let's create a StopWordFilter class to filter out stop words.

However, we'll extend FilterFn instead of DoFn. FilterFn has an abstract method called accept. We need to implement the filtering logic in this method:

public class StopWordFilter extends FilterFn<String> {

    // English stop words, borrowed from Lucene.
    private static final Set<String> STOP_WORDS = ImmutableSet
      .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
        "for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
        "or", "s", "such", "t", "that", "the", "their", "then", "there",
        "these", "they", "this", "to", "was", "will", "with" });

    @Override
    public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
    }
}

Next, let's write the unit test for the StopWordFilter class:

public class StopWordFilterUnitTest {

    @Test
    public void givenFilter_whenStopWordPassed_thenFalseReturned() {
        FilterFn<String> filter = new StopWordFilter();

        assertFalse(filter.accept("the"));
        assertFalse(filter.accept("a"));
    }

    @Test
    public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
        FilterFn<String> filter = new StopWordFilter();

        assertTrue(filter.accept("Hello"));
        assertTrue(filter.accept("World"));
    }

    @Test
    public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
        PCollection<String> words = MemPipeline
          .collectionOf("This", "is", "a", "test", "sentence");
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        assertEquals(ImmutableList.of("This", "test", "sentence"),
          Lists.newArrayList(noStopWords.materialize()));
    }
}

This test verifies that the filtering logic is performed correctly.

Finally, let's use StopWordFilter to filter the list of words generated in the previous step. The filter method of the PCollection interface applies the given FilterFn to all the elements and returns a new PCollection.

Let's call this method on the words collection and pass an instance of StopWordFilter:

PCollection<String> noStopWords = words.filter(new StopWordFilter());

As a result, we get the filtered collection of words.

6.3. Count Unique Words

After getting the filtered collection of words, we want to count how often each word occurs. The PCollection interface has a number of methods to perform common aggregations:

  • min – returns the minimum element of the collection
  • max – returns the maximum element of the collection
  • length – returns the number of elements in the collection (illustrated in the sketch after this list)
  • count – returns a PTable that contains the count of each unique element of the collection
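
As a quick illustration of the first three (a sketch, separate from our application), these methods return PObject handles whose getValue call materializes the result:

// length() returns a PObject<Long>; getValue() forces evaluation.
PCollection<String> sample = MemPipeline.collectionOf("one", "two", "three");
PObject<Long> size = sample.length();
assertEquals(Long.valueOf(3), size.getValue());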

Let's use the count method to get the unique words along with their counts:

// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();

7. Specify Output

As a result of the previous steps, we have a table of words and their counts. We want to write this result to a text file. The Pipeline interface provides convenience methods to write output:

void write(PCollection<?> collection, Target target);

void write(PCollection<?> collection, Target target,
  Target.WriteMode writeMode);

<T> void writeTextFile(PCollection<T> collection, String pathName);

Therefore, let's call the writeTextFile method:

pipeline.writeTextFile(counts, outputPath); 
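
If the output directory may already exist from a previous run, a hedged alternative is the write overload that takes a Target.WriteMode (this sketch assumes the To factory class for creating a text-file Target):

// Overwrite any existing output instead of failing the job.
pipeline.write(counts, To.textFile(outputPath), Target.WriteMode.OVERWRITE);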

8. Manage Pipeline Execution

All the steps so far have just defined the data pipeline. No input has been read or processed. This is because Crunch uses a lazy execution model.

It doesn't run the MapReduce jobs until a method that controls job planning and execution is invoked on the Pipeline interface:

  • run – prepares an execution plan to create the required outputs and then executes it synchronously
  • done – runs any remaining jobs required to generate outputs and then cleans up any intermediate data files created
  • runAsync – similar to the run method, but executes in a non-blocking fashion (sketched below)
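
For completeness, here's a hedged sketch of the non-blocking variant; in recent Crunch versions runAsync returns a PipelineExecution that we can wait on:

// Start the jobs without blocking the client thread.
PipelineExecution execution = pipeline.runAsync();
// ... other client-side work can happen here ...
execution.waitUntilDone(); // block until all jobs have finished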

Therefore, let's call the done method to execute the pipeline as MapReduce jobs:

PipelineResult result = pipeline.done(); 

The above statement runs the MapReduce jobs to read the input, process it, and write the result to the output directory.

9. Putting the Pipeline Together

So far, we have developed and unit tested the logic to read input data, process it, and write it to the output file.

Next, let's put them together to build the entire data pipeline:

public int run(String[] args) throws Exception {
    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
}

10. Hadoop Launch Configuration

The data pipeline is thus ready.

However, we need the code to launch it. Therefore, let's write the main method to launch the application:

public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    // The run method from the previous section completes this class.
}

ToolRunner.run parses the Hadoop configuration from the command line and executes the MapReduce job.

11. Run Application

The complete application is now ready. Let's run the following command to build it:

mvn package 

As a result of the above command, we get the packaged application and a special job jar in the target directory.

Let's use this job jar to execute the application on Hadoop, passing the input file path and the output directory as arguments:

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar <input file path> <output directory>

The application reads the input file and writes the result to the output file. The output file contains unique words along with their counts similar to the following:

[Add,1]
[Added,1]
[Admiration,1]
[Admitting,1]
[Allowance,1]

In addition to Hadoop, we can run the application within an IDE, as a stand-alone application or as unit tests.

12. Conclusion

In this tutorial, we created a data processing application running on MapReduce. Apache Crunch makes it easy to write, test and execute MapReduce pipelines in Java.

As usual, the full source code can be found over on GitHub.