diskusi.tech (beta) Community

loading...
Cover image for Berkenalan dengan Apache Beam

Berkenalan dengan Apache Beam

mimindeeptech profile image Mimin Deep Tech ・8 min read

Ditulis oleh Imre Nagi

Pada dasarnya pengolahan data bisa dilakukan dengan menggunakan mesin pengolah angka seperti Microsoft Excel, Numbers (Mac-OS). Atau jika pengguna membutuhkan visualisasi yang lebih fleksibel, pengguna juga bisa menggunakan bahasa pemograman seperti Python dengan tambahan beberapa library seperti Pandas, Numpy, dan lain-lain. Namun pada akhirnya penggunaan teknologi tersebut akan dibatasi oleh kemampuan mesin atau komputer yang digunakan untuk melakukan analisis. Jika pengguna menggunakan pinranti lunak tersebut untuk melakukan pemrosesan data yang jumlahnya sangat besar (dalam satuan GB atau TB), pemrosesan data mungkin akan diselesaikan dalam hitungan jam hingga hari. Jika kita sedang beruntung, aplikasi bisa tiba-tiba crash.

Permasalahan ini kemudian mendasari lahirnya teknologi big data yang kita kenal hari ini seperti Hadoop MapReduce, Apache Spark, Spark Streaming, Apache Storm, dan lain-lain. Masing-masing hadir dengan keunggulan dan kekurangan masing-masing. Sayangnya, Hadoop MapReduce kini mulai jarang digunakan karena Hadoop MapReduce relatif lebih lambat bila dibandingkan dengan Apache Spark yang menggunakan memori utama dari mesin pekerja (worker) dengan sebisa mungkin mengurangi operasi I/O ke disk mesin pekerja.

Alt Text
Beberapa teknologi Big Data. Gambar disadur dari matttruck.com

Selain itu, Hadoop MapReduce dan Apache Spark hanya dapat digunakan untuk melakukan Batch Processing. Sementara Apache Storm dan Spark Streaming pada umumnya digunakan untuk melakukan Stream Procsessing. Konsekuensinya adalah jika sebuah perusahaan ingin menerapkan Arsitektur Lambda yang notabene membutuhkan pemrosesan secara batch dan streaming, maka pengembang piranti lunak perlu mengurus dua teknologi berbeda, kombinasi dari MapReduce atau Apache Spark dengan Apache Storm atau Spark Streaming.

In Big Data, no one-size-fits-all, until Apache Beam comes...

Pengenalan

Apache Beam merupakan model pemograman yang digunakan untuk mendefinisikan dan mengeksekusi data processing pipeline termasuk ETL (Extract, Transform, dan Load) pada batch dan stream processing. Apache Beam sendiri merupakan implementasi dari sebuah paper tentang Dataflow model [1] yang dirancang di Google untuk menyelesaikan permasalahan komputasi pada sistem terdistribusi Google.

Dengan menggunakan SDK yang tersedia, kita kemudian dapat mendefiniskan Beam Pipeline dan kemudian melakukan eksekusi pada runner (processing backend)yang didukung oleh Apache Beam seperti Apache Apex, Apache Flink, Apache Samza, Apache Spark, and Google Cloud Dataflow.

Abstraksi

Untuk menggunakan Apache Beam, ada beberapa abstraksi dasar yang perlu dipahami.

Pipeline
Pipeline mengenkapsulasi seluruh data pemrosesan dari awal hingga akhir. Di dalam sebuah pipeline terdapat satu atau lebih proses yang membaca data masukan, satu atau lebih transformer yang akan melakukan transformasi atau komputasi pada data, dan satu atau lebih proses yang menulis data keluaran pada medium tertentu seperti berkas teks, blob, dan lain-lain. Setiap program Beam harus memiliki satu pipeline dan memiliki informasi tentang runner apa yang akan digunakan.

Alt Text
Ilustrasi Apache Beam Pipeline untuk aplikasi penghitung kata. Sumber: Dokumentasi Beam

Inisialisasi pipeline dapat dilakukan dengan cara berikut:

// Define the pipeline option 
PipelineOptions options = PipelineOptionsFactory.create();

// Create the pipeline
Pipeline p = Pipeline.create(options);
Enter fullscreen mode Exit fullscreen mode

Inisialisasi Apache Beam pipeline.

PCollection
PCollection merepresentasikan kumpulan data terdistribusi yang akan diproses oleh Apache Beam. Jika Anda adalah pengembang aplikasi Spark, maka PCollection ekivalen dengan RDD yang digunakan di Spark.

Kumpulan data yang bisa diproses oleh Apache Beam dapat dikelompokan menjadi dua bagian: Bounded dataset dan Unbounded dataset. Bounded data set merupakan data yang berasal dari sumber yang tetap seperti file atau blob. Sementara unbounded data bisa bersumber dari data yang secara kontinu dikirimkan oleh sebuah sumber seperti data subscription dari message bus atau message broker seperti Kafka, Cloud Pub/Sub dan lain-lain.

// Create the PCollection 'lines' by applying a 'Read' transform.
PCollection<String> lines = p.apply(TextIO.read().from("/path/to/some/inputData.txt"));

PCollection<String> linesGCS = p.apply(TextIO.read().from("gs://deeptech/*"));
Enter fullscreen mode Exit fullscreen mode

Bounded Text Collection dengan TextI/O

Pada umumnya, sebuah Beam Pipeline menginisialisasi PCollection dengan membaca data dari sumber eksternal, seperti berkas teks, blob objek, Kafka, Cloud Pub/Sub dan lain-lain. Namun, pengembang pipeline juga bisa menggunakan data yang dimuat di memori program atau dengan menggunakan generator sekuens.

static final List<String> LINES = Arrays.asList(
      "This is the first line",
      "You will say this one is the second", 
      "But it's not. ");
// Generating PCollection from in memory data
PCollection<String> lines = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())

// Generate bounded pcollection
PCollection<Long> bounded = p.apply(GenerateSequence.from(0).to(1000));  

// Generate unbounded pcollection
PCollection<Long> unbounded = p.apply(GenerateSequence.from(0));
Enter fullscreen mode Exit fullscreen mode

PTransform
PTransform merupakan abstraksi transformasi data pada Apache Beam. PTransform bisa memiliki satu atau lebih step yang digunakan untuk melakukan transformasi tertentu. Setiap PTransform menerima satu atau lebih masukan PCollection dari sebuah objek, melakukan pemrosesan yang telah didefinisikan dan menghasilkan keluaran dalam bentuk PCollection objek baru.

public class Greeting extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> expand(PCollection<String> input) {
        return input
                .apply("Hello", ParDo.of(new HelloDoFn()))
                .apply("Ask", ParDo.of(new HowAreYouDoFn()));
    }
}

public class HelloDoFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext context) {
        String name = context.element();
        context.output("Hello, " + name + " ! ");
    }
}

public class HowAreYouDoFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext context) {
        String hello = context.element();
        context.output(hello + "How are you?");
    }
}
Enter fullscreen mode Exit fullscreen mode

Pada contoh diatas, kita memiliki Greeting transform yang menerima masukan PCollection<String> dan akan menghasilkan keluaran PCollection<String> baru. Di dalam PTransform Greeting kita memiliki dua DoFn yaitu HelloDoFn dan HowAreYouDoFn yang masing-masingnya akan menambahkan string berbeda pada setiap step. Selain itu, Beam PTransform juga mendukung nested PTransform. yang berarti PTransform dapat menggunakan PTransform lain selama tipe data yang digunakan atau dihasilkan kompatibel dengan PCollection sebelum atau sesudahnya.

public class Greeting extends PTransform<PCollection<String>, PCollection<String>> {
        @Override
        public PCollection<String> expand(PCollection<String> input) {
            return input
                    .apply("Hello", ParDo.of(new HelloDoFn()))
                    .....
                    .apply(new AnotherPTransform()); //Nested Transformation
        }
}

public class AnotherPTransform extends PTransform<PCollection<String>, PCollection<String>> {
        ...
}
Enter fullscreen mode Exit fullscreen mode

I/O Transforms
I/O Transform merupakan kumpulan PTransform yang digunakan untuk membaca atau menulis data ke berbagai macam media penyimpanan eksternal. Berikut beberapa I/O Transform yang sering digunakan:

  1. TextIO: digunakan untuk membaca dan menulis data dari dan ke sebuah berkas teks yang disimpan pada file sistem di lokal ataupun di cloud seperti GCS, S3, dan lain-lain.

  2. KafkaIO, PubsubIO: digunakan menerima dan mengirimkan data ke sebuah message broker seperti Kafka atau Cloud Pub/Sub.

// Subscribe Pub/Sub data with PubsubIO
PCollection<String> subs = p.
  apply("YoutubeSubscriptionEvent", PubsubIO.readStrings()
    .fromSubscription("projects/deeptech/subscriptions/youtubesubscription));
Enter fullscreen mode Exit fullscreen mode
  1. BigqueryIO, BigtableIO, JdbcIO: digunakan untuk membaca dan menulis data ke basis data seperti Bigquery, Cloud Bigtable atau SQL database.
  2. Dan masih banyak IO Transform lainnya disini

Hal Menarik Dari Apache Beam

Arsitektur Lambda mengajarkan paradigma bahwa streaming analitik adalah adalah sistem yang sangat tidak akurat. Streaming analitik hanya mampu mengestimasi sebuah kejadian dalam rentang waktu tertentu. Oleh karena itu, pada arsitektur lambda, akurasi ini diperbaiki dengan adanya batch processing yang dijalankan secara periodik (setiap jam, harian, atau bahkan sekali sebulan).

Namun, yang menarik adalah real streaming pipeline pada Apache Beam sendiri memungkinkan kita untuk melakukan realtime analytics dengan akurasi yang menyerupai batch processing. Bagaimana? Pada Apache Beam terdapat dua fitur lanjutan berikut:

Windowing
Windowing membagi PCollection berdasarkan timestamp dari setiap elemennya. Transformasi yang mengagregasi beberapa elemenet seperti GroupByKey dan Combine , bekerja secara implisit pada setiap basis window. Selain itu, windowing juga bisa digunakan pada bounded maupun unbounded PCollection.

Alt Text
Contoh penggunaan Windowing pada Unbounded PCollection. Sumber: Dokumentasi Apache Beam

Ada beberapa jenis windowing yang dapat digunakan:

  • Fixed Window
  • Sliding Window
  • Session Window

Alt Text
Ilustrasi Fixed Window. Sumber: Dokumentasi Apache Beam

Triggering
Apache Beam menggunakan trigger untuk menentukan kapan hasil dari sebuah aggregasi akan dikeluarkan untuk dikonsumsi oleh transformasi berikutnya. By default, apache beam trigger akan mengeluarkan hasil aggregasi pada saat beam mengestimasi bahwa semua data telah tiba untuk periode window tertentu. Namun, hal ini tentu dapat dikonfigurasi agar lebih fleksible dan sesuai dengan kebutuhan pengembang.

Alt Text
Ilustrasi pada event fixed window. Sumber: Dokumentasi Apache Beam

// Ilustrasi trigger pada Apache Beam (Accumulating Mode)
First trigger firing:  [5, 8, 3]
Second trigger firing: [5, 8, 3, 15, 19, 23]
Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
Enter fullscreen mode Exit fullscreen mode

Managing Late Data
Sebuah event bisa jadi datang tepat waktu pada window yang sama atau bahkan bisa datang setelah window tersebut berakhir. Hal ini sangat wajar realtime processing. Untuk mengatasi ini, Apache Beam memiliki fitur untuk memperbolehkan data yang terlambat untuk masuk dalam analisis dengan menggunakan API.withAllowedLateness pada strategi windowing yang digunakan.

Pada contoh kode berikut, beam menggunakan strategi fixed window dengan interval 1 menit dan memberikan tolerasi terhadap data yang terlambat hingga dua hari kedepan.

PCollection<String> items = ...;
    PCollection<String> fixedWindowedItems = items.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
              .withAllowedLateness(Duration.standardDays(2)));
Enter fullscreen mode Exit fullscreen mode

Jika ditemukan adanya data yang terlambat, maka pada akhir window tersebut seluruh data (termasuk late data) akan diagregasi kembali sehingga kita bisa mendapatkan hasil yang lebih akurat untuk window tersebut.

Pola Umum Penggunaan Apache Beam

Pipeline linear
Mode linear merupakan mode paling sederhana dari sebuah beam pipeline dimana masukan PCollection ditransformasikan oleh satu atau lebih PTransform secara berurutan.

Alt Text

Satu transform dengan multi keluaran
Pada pola satu transform dengan multi keluaran, sebuah PTransform bisa menghasilkan keluaran beberapa PCollection berbeda. Pola penggunaan ini misalnya dapat digunakan ketika pengembang aplikasi memiliki keluaran sampingan seperti error atau ketika PTransform mengklasifikasikan masukan ke beberapa PCollection keluaran yang berbeda.

Alt Text

final TupleTag<String> dataTag = new TupleTag<String>() {};
final TupleTag<String> errorTag = new TupleTag<String>() {};

PCollectionTuple mixedCollection =
  names.apply(ParDo
    .of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (c.element().startsWith("Foo")) {
          c.output(c.element());
        } else if (c.element().startsWith("Bar")) {
          c.output(errorTag, c.element());
        }
      }
    }).withOutputTags(dataTag, TupleTagList.of(errorTag)));

mixedCollection.get(dataTag).apply(...);
mixedCollection.get(errorTag).apply(...);
Enter fullscreen mode Exit fullscreen mode

Pada contoh ini, keluaran transformasi yang dihasilkan adalah sebuah PCollectionTuple yang menyimpan seluruh PCollection yang dihasilkan oleh PTransform.

Multi transform untuk sebuah PCollection
Alt Text
Keluaran tambahan untuk memisahkan data dengan kriteria tertentu. Sumber: Dokumentasi Beam

Pola penggunaan ini pada dasarnya dapat menghasilkan keluaran yang sama dengan pola penggunaan sebelumnya (satu transform dengan multi keluaran). Namun, pada pola ini alih-alih digunakan satu PTransform, digunakan dua PTransform yang berbeda pada satu PCollection yang sama.

PCollection<String> names = ...;

PCollection<String> fooCollection = names.apply("FooTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("Foo")){
      c.output(c.element());
    }
  }}));

PCollection<String> errCollection = names.apply("BarTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("Bar")){
      c.output(c.element());
    }
  }
}));
Enter fullscreen mode Exit fullscreen mode

Menggabungkan beberapa PCollection
Pada pola ini, pengguna dapat menggabungkan dua atau lebih PCollection dengan tipe data yang sama menjadi sebuah PCollection.

Alt Text

//merge the two PCollections 
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);

//flatten
PCollection<String> mergedCollectionWithFlatten = collectionList.apply(Flatten.<String>pCollections());
Enter fullscreen mode Exit fullscreen mode

Join Beberapa Sumber Data
Pola penggunaan ini menyerupai operasi Join yang dimiliki oleh SQL. Pada ilustrasi dibawah, sumber data 1 (basis data) menghasilkan keluaran PCollection dari pasangan key-value nama dan alamat. Sementara sumber data 2 (berkas teks) menghasilkan keluaran PCollection dari pasangan key-value nama dan pesanan barang tertentu. Operasi transformasi Join pada Apache Beam akan menghasilkan PCollection dari pasangan key-value nama, alamat dan nama pesanan.

Alt Text

Minimal WordCount

public class MinimalWordCount {

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
        .apply(
            FlatMapElements.into(TypeDescriptors.strings())
                .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
        .apply(Filter.by((String word) -> !word.isEmpty()))
        .apply(Count.perElement())
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (KV<String, Long> wordCount) ->
                        wordCount.getKey() + ": " + wordCount.getValue()))
        .apply(TextIO.write().to("wordcounts"));

    p.run();
  }
}
Enter fullscreen mode Exit fullscreen mode

Kesimpulan

  • Apache beam dapat digunakan untuk batch dan stream processing.
  • Dengan menggunakan fitur Windowing dan Triggering, apache beam dapat melakukan realtime analitik dengan hasil yang lebih akurat.

Referensi

  1. http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf
  2. Streaming 101 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
  3. Streaming 102 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

Discussion

pic
Editor guide