Scala pour Apache Spark - Cheatsheet

Nous présentons ici une aide-mémoire de codes en scala pour gérer des traitements de données distribués à l'aide d'Apache Spark.

Scala pour Apache Spark - Cheatsheet

Vous trouverez dans cet article un ensemble de bouts de code, trucs et astuces et retours d'expérience sur l'utilisation d'Apache Spark en Scala.

Créer un objet exécutable comme une app

object MyApp {
  def main(args: Array[String]) {
    // Le code ici sera exécuté
  }
}

ou

object MyApp extends App {
 // le code ici sera exécuté
}

Créer un contexte spark


  def getSC(appName: String , master: String = "local"): SparkContext = {
    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster(master)
      .set("spark.hadoop.validateOutputSpecs", "false")
    new SparkContext(conf)
  }

Créer une session Spark

  def getSS(appName: String , master: String = "local"): SparkSession = {
    SparkSession
      .builder
      .appName(appName)
      .master(master)
      .config("spark.sql.warehouse.dir", "file:///tmp") // A utiliser pour contourner un bug de Spark 2.0.0 sous Windows
      .getOrCreate()
  }

Créer un streaming context

 def getStreamingContext(appName: String, duration: Duration, master: String = "local"): StreamingContext = {

    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster(master)
    new StreamingContext(conf, duration)
 }

Note : Spark Streaming ne fait pas du vrai streaming, mais plutôt regroupe des événements dans de micro-batch. Duration est l'intervalle entre deux batches

Créer un RDD à partir d'un objet scala (pour tester mon algo)

object MyApp extends App {
  val sc = SparkUtils.getSC("CountByValue")
  
  val list = List("value1", "value2", "value3", "value4", "value4")
  
  val rdd: RDD[String] = sc.parallelize(list)
}

Le parallelize crée un RDD qui sera distribué dans Spark (si vous lancez le job sur un cluster). Bien sûr en général on travaille sur de très grosses quantité de données que l'on charge ou reçoit d'une source de données externes à l'application.

Parrallelize est très utile pendant le développement pour tester notre algo dans une session "local".

Charger des données à partir d'un fichier

object WordCountReduceByKey extends App {
  val sc = SparkUtils.getSC(this.getClass.getName)
  val lines: RDD[String] = sc.textFile("hdfs://spark/my-data/ddh.txt")
  // It could have been "file://spark/my-data/ddh.txt"
  
  println(lines.count)
}

Note 1 : le fichier texte est en général placé sur un stockage distribué, type Hadoop HDFS

Note 2 : c'est un super outil pour le développement pour charger des données d'un fichier local aussi

countByValue

object MyApp extends App {
  val sc = SparkUtils.getSC("CountByValue")
  
  val list = List("value1", "value2", "value3", "value4", "value4")
  val rdd = sc.parallelize(list)
  
  val countedByValue = rdd.countByValue
  countedByValue.print()
  
}

Note 1 : take et collect sont de très bons outils pour le développement, et sont moins souvent utilisés en production (ça dépend des cas) car ils ramènent tous les éléments du RDD dans le process courant. Si la collection est trop grosse Spark ne pourra pas allouer assez de mémoire.

Note 2 : On préfère en général utiliser reduceByValue

PairRDDs

Un PairRDD est un RDD  de paires. Certaines fonctions sont proposées pour la manipulation des paires qui va vous simplifier la vie et améliorer les performances des traitements.

object MyApp extends App {
  val sc.SparkUtils.getSC(this.getClass.getName)
  
  val lines = sc.textFile("/path/to/my/file.txt")
  val words: RDD[String] = lines.flatMap( l => l.split(" "))
  
  val pairRDD: RDD[(String, Int)] = words.map((word: String) => (word, 1))
  
  pairRDD.collect.foreach(println)
}

groupByKey

object MyApp extends App {
  val sc.SparkUtils.getSC(this.getClass.getName)
  
  val lines = sc.textFile("/path/to/my/file.txt")
  val words: RDD[String] = lines.flatMap( l => l.split(" "))
  val pairs: RDD[(String, Int)] = words.map((word: String) => (word, 1))
  
  val groupedByKey = pairs.groupByKey()
  
  groupedByKey.collect.foreach(println)
}

// --- output ---
// [...]
// (privé,,CompactBuffer(1))
// (inquiété,CompactBuffer(1))
// (ou,CompactBuffer(1, 1, 1, 1, 1))
// (consentir,CompactBuffer(1))
// [...]

Note : On préfère en général reduceByKey qui réduit le volume de données échangées pendant le shuffle.

reduceByKey

object MyApp extends App {
  val sc = SparkUtils.getSC(this.getClass.getName)
  val words = List("value1", "value2", "value3", "value4", "value4")
  val wordsPairs = words.map(word => (word.toLowerCase, 1))
  
  val wordCounts = wordsPairs.reduceByKey((c1: Int, c2: Int) => c1 + c2)
    .sortByKey()
    
  wordCounts.collect.foreach(println)
}

// --- output ---
// [...]
// (ordres,1)
// (expressément.,1)
// (privé,,1)
// (inquiété,1)
// [...]

map/filter sur RDD

Si vous êtes familier avec map et filter dans n'importe quel langage, par exemple avec les collections Java ou Scala, c'est essentiellement la même chose d'un point de vu programmatique. Prenons un exemple simple, qu'on ne devrait pas trouver un production, mais qui donne une idée:

  • Nous chargeons un fichier csv de liste d'aéroports et on split les colonnes sur ","
  • Nous allons filtrer les aéroports qui ont une altitude supérieure à 1500m et ne retourner que le nom et altitude des aéroports en question.

Vous noterez que comme on ne charge pas le csv très proprement en considérant les chaines de caractères incluant le séparateur, etc... On filtrera la première ligne en éliminant la chaîne qui commence par Airport (pas propre, mais suffisant pour notre exemple).

D'autre part, le nom de l'aéroport est la première colonne de notre fichier csv et l'altitude est à la colonne 4

object AirportsByAltitude extends App {

  val sc = SparkUtils.getSC(this.getClass.getName)

  val maxAltitude = 1500
  val airports = sc.textFile(raw"C:\MyData\spark\data\airports.dat.txt")
    .filter(l => !l.startsWith("Airport")) // pas propre

    val data1 = airports
      .map(l => {
        val s = l.split(",")
        val airportName = s(1)
        val altitude = s(4)
        (airportName, altitude.toDouble)
      })

  val data2 = data1.filter(t => t._2 > maxAltitude)

  data2.take(2).foreach(println)

  println("airpots above 1500m: " + data2.count())
}

Note : faire les filtres sur les données le plus tôt possible pour réduire la taille des données lors des opérations lourdes, surtout lorsqu'un shuffle va être nécessaire. Les données filtrées ne sont jamais ajoutées dans le RDD.

Sauvegarder un RDD

On peut sauvegarder un RDD dans un fichier text (qui peut-être distribué en étant sauvegarder sur HDFS par exemple) ou dans d'autres format éventuellement plus efficace comme les séquences files

[...]

  // On parse un fichier de log apache par exemple
  val logParserRegex = raw"([^\:]+)[^\/]+([^\s\?]+).*".r
  def matchLog(logLine: String): (String, String) = logLine match {
    case logParserRegex(date, url) => (date, url)
    case _ => ("", "")
  }
  val tuples = logs.map(matchLog).distinct

  tuples.saveAsTextFile("hdfs://mon/chemin/tuples.txt")
  
[...]

Il pourra être rechargé plus tard avec :

[...]
  val sc = SparkUtils.getSC(this.getClass.getName)
  val logs = sc.textFile("hdfs://mon/chemin/tuples.txt")

[...]

Note : selon le besoin il est utile d'utiliser un format spécifique pour sauvegarder le RDD, par exemple un format de fichier colonnaire permet de pouvoir accéder de façon très efficace à un colonne entière sans avoir avec lire la totalité du fichier comme c'est le cas avec un fichier plat CSV.
Regardez SequenceFile, RC (Row Colomnar), ORC (Optimized Row Columnar) Avro, Parquet.

Mettre un RDD en cache et le réutiliser

Quand un RDD est créé vous pouvez vous en servir pour faire d'autres opérations dessus. Si vous avez plusieurs opérations à faire il est important de considérer mettre en cache le RDD, c'est à dire qu'il sera conservé en mémoire. Sinon la chaîne complète sera réexécutée lors des opérations suivantes.

Par exemple :

object AnalyseLogs extends App {
  val sc = SparkUtils.getSC(this.getClass.getName)
  val logs = sc.textFile("hdfs://mon/chemin/tuples.txt").cache // on met en cache
  
  val countSite1 = logs.filter(t => t._2.startsWith("https://monsite1.com/")).count

  val countSite2 = logs.filter(t => t._2.startsWith("https://monsite2.com/")).count

  println("count site1 " + countSite1)
  println("count site2 " + countSite2)
  
}

Note : sans l'instruction .cache de la troisième ligne, le fichier text serait parsé 2 fois.

map et flatMap

map  transforme chaque élément de la collection en un autre élément avec éventuellement un type différent

map[U: ClassTag](f: T => U): RDD[U]

flatMap transforme chaque élémenet de la collection en un collection (éventuellement vide) qui est ensuite applitie pour donner une collection entière

flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

# exemple : tous les mots d'un texte

val text = sc.textFile("/path/to/mon_text.txt")
val listOfWords = text.flatMap( line => line.split(" ")) // on retourne bien une collection de mots pour chaque ligne du fichier. Le resultat du flatMap est une collection de mots de tout le fichier

val wordCount = listOfWords.count

GroupByKey vs ReduceByKey

On compte le nomber de mots d'un fichier

object WordCountGroupByKey extends App {

  val sc = SparkUtils.getSC(this.getClass.getName)
  val lines = sc.textFile("/path/to/my_big_text.txt")
  val words = lines.flatMap( l => l.split(" "))
    .map(w => (w, 1) // on crée un tuple2 dans la clé est le mot
    .cache

  # groupBy
  val groupByCount = words.groupByKey()
  groupByCount.take(10).foreach(println)
  
  # reduceByKey
  val reduceByKeyCount = pairs.reduceByKey((count1, count2) => count1 + count2 )
  // pour chaque clé, on fait un reduce de la valeur
  reduceByKeyCount.collect.foreach(println)
}

On obtient le même résultat dans les deux cas, mais le reduceByKey est préféré car le réduce est d'abord fait par partition (sans shuffle / sans échange réseau) alors que le reduce qui est fait par le groupBy et groupByKey est fait dans un reduce job, donc toutes les données de la collection sont échangées via un shuffle

Fusionner deux RDDs

  val rdd1: RDD[Int] = sc.parallelize(List(1,2,3))
  val rdd2: RDD[Int] = sc.parallelize(List(4,5,6))

  val merged = rdd1.union(rdd2)
  merged.collect.foreach(println)

Réduction des logs Apache Spark

Utiliser la configuration de votre logger ou bien en développement faites :

Logger.getLogger("org").setLevel(Level.ERROR)

Ne pas faire ça!

Pourquoi cela ne fonctionne-t-il pas en Prod ?

let monRDD = ... // on charge un RDD
monRdd.foreach(println)

En fait le code compile, et j'arrive même à l'exécuter sur mon cluster, mais les println sont exécutés sur les workers dans spark, dans un sous job. Je ne vais pas les voir apparaître dans le driver (mon application spark).

Ce qu'il faut faire si on veut récupérer un résultat dans notre driver :

let monRDD = ... // on charge un RDD
monRdd.collect.foreach(println)

Le collect est une action finale qui ramène tous les éléments du RDD dans une collection scala dans la JVM courante

Bien pour le développement et prod

  • rdd.take(num: Int): Array[T]
    Pour récupérer des éléments dans une collection du driver
  • rdd.first()
    Pour récupérer le premier élément de la collection
  • rdd.top(num: Int): Array[T]
    par défaut, comme take mais après classement de la collection avec un "ordering" implcit

    On peut surcharger le triage comme suit :
  val customOrdering = new Ordering[Int] {
    override def compare(a: Int, b: Int) = {
      b - a
    }
  }
  rdd.top(10)(customOrdering).foreach(println)
  • rdd.sample(
       withReplacement: Boolean, // accepte-t-on plusieurs fois le même élément
       fraction: Double, // fraction de la collection
       seed: Long = Utils.random.nextLong // initialization du générateur aléatoire
    ): RDD[T] = { // note on reçoit un RDD
  • un peu avec le même principe, mais pour pour récupérer en local dans le driver
    rdd.takeSample(
       withReplacement: Boolean,
       num: Int,
       seed: Long = Utils.random.nextLong): Array[T] // on reçoit bien un Array, qui est une instance de collection dans la JVM du driver
  • rdd.randomSplit(
       weights: Array[Double],
       seed: Long = Utils.random.nextLong
    ) : Array[RDD[T]]

Vocabulaire

  • RDD : Resilient Distributed Dataset
  • RC : Row Colomnar
  • ORC : Optimized Row Columnar