Spark

Apache Spark steht für 'Lightning-Fast Cluster Computing'.

Im Vergleich zu Hadoop läuft Map-Reduce auf Spark 100x schneller. Das zentrale Datenobjekt von Spark 2.x ist das Dataset (DataFrame in Spark 1.x). Das Dataset ist das Spark-Analogon zur Tabelle einer relationalen Datenbank. Das Dataset wird im Speicher des Spark-Clusters gespeichert, alle Operationen darauf sind also In-Memory.

Unterschied zu analytischen Datenbanken

Analytischen Datenbanken wie Netezza oder Exasol persistieren die Daten auf die Festplatte und laden sie für den Anwender transparent in den Speicher. Spark ist rein Speicher-basiert. Sollen Teilergebnisse, wie zum Beispiel geladene oder gecleante Daten persistiert werden, so muss dies bei Hadoop / Spark explizit getan werden. Im direkten Vergleich der Performance bei vergleichbaren Servern mögen die analytischen Datenbanken schneller sein, dieser Punkt tritt aber in den Hintergrund, da dem Hadoop / Spark - Cluster normalerweise aufgrund der fehlenden Lizenzkosten deutlich mehr Hardware zur Verfügung steht.

Programmiersprachen

Die folgenden Programmiersprachen bieten sich an, um mit dem Spark-Framework zu arbeiten:

  • Scala
  • Python
  • Java
  • R

Erstellen eines DataSets in Scala:

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Erstellen eines Datasets in Java:

 

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+