Scala pour Apache Spark - Cheatsheet
Nous présentons ici une aide-mémoire de codes en scala pour gérer des traitements de données distribués à l'aide d'Apache Spark.
Vous trouverez dans cet article un ensemble de bouts de code, trucs et astuces et retours d'expérience sur l'utilisation d'Apache Spark en Scala.
Créer un objet exécutable comme une app
object MyApp {
def main(args: Array[String]) {
// Le code ici sera exécuté
}
}
ou
object MyApp extends App {
// le code ici sera exécuté
}
Créer un contexte spark
def getSC(appName: String , master: String = "local"): SparkContext = {
val conf = new SparkConf()
.setAppName(appName)
.setMaster(master)
.set("spark.hadoop.validateOutputSpecs", "false")
new SparkContext(conf)
}
Créer une session Spark
def getSS(appName: String , master: String = "local"): SparkSession = {
SparkSession
.builder
.appName(appName)
.master(master)
.config("spark.sql.warehouse.dir", "file:///tmp") // A utiliser pour contourner un bug de Spark 2.0.0 sous Windows
.getOrCreate()
}
Créer un streaming context
def getStreamingContext(appName: String, duration: Duration, master: String = "local"): StreamingContext = {
val conf = new SparkConf()
.setAppName(appName)
.setMaster(master)
new StreamingContext(conf, duration)
}
Note : Spark Streaming ne fait pas du vrai streaming, mais plutôt regroupe des événements dans de micro-batch. Duration est l'intervalle entre deux batches
Créer un RDD à partir d'un objet scala (pour tester mon algo)
object MyApp extends App {
val sc = SparkUtils.getSC("CountByValue")
val list = List("value1", "value2", "value3", "value4", "value4")
val rdd: RDD[String] = sc.parallelize(list)
}
Le parallelize
crée un RDD qui sera distribué dans Spark (si vous lancez le job sur un cluster). Bien sûr en général on travaille sur de très grosses quantité de données que l'on charge ou reçoit d'une source de données externes à l'application.
Parrallelize est très utile pendant le développement pour tester notre algo dans une session "local".
Charger des données à partir d'un fichier
object WordCountReduceByKey extends App {
val sc = SparkUtils.getSC(this.getClass.getName)
val lines: RDD[String] = sc.textFile("hdfs://spark/my-data/ddh.txt")
// It could have been "file://spark/my-data/ddh.txt"
println(lines.count)
}
Note 1 : le fichier texte est en général placé sur un stockage distribué, type Hadoop HDFS
Note 2 : c'est un super outil pour le développement pour charger des données d'un fichier local aussi
countByValue
object MyApp extends App {
val sc = SparkUtils.getSC("CountByValue")
val list = List("value1", "value2", "value3", "value4", "value4")
val rdd = sc.parallelize(list)
val countedByValue = rdd.countByValue
countedByValue.print()
}
Note 1 : take
et collect
sont de très bons outils pour le développement, et sont moins souvent utilisés en production (ça dépend des cas) car ils ramènent tous les éléments du RDD dans le process courant. Si la collection est trop grosse Spark ne pourra pas allouer assez de mémoire.
Note 2 : On préfère en général utiliser reduceByValue
PairRDDs
Un PairRDD est un RDD de paires. Certaines fonctions sont proposées pour la manipulation des paires qui va vous simplifier la vie et améliorer les performances des traitements.
object MyApp extends App {
val sc.SparkUtils.getSC(this.getClass.getName)
val lines = sc.textFile("/path/to/my/file.txt")
val words: RDD[String] = lines.flatMap( l => l.split(" "))
val pairRDD: RDD[(String, Int)] = words.map((word: String) => (word, 1))
pairRDD.collect.foreach(println)
}
groupByKey
object MyApp extends App {
val sc.SparkUtils.getSC(this.getClass.getName)
val lines = sc.textFile("/path/to/my/file.txt")
val words: RDD[String] = lines.flatMap( l => l.split(" "))
val pairs: RDD[(String, Int)] = words.map((word: String) => (word, 1))
val groupedByKey = pairs.groupByKey()
groupedByKey.collect.foreach(println)
}
// --- output ---
// [...]
// (privé,,CompactBuffer(1))
// (inquiété,CompactBuffer(1))
// (ou,CompactBuffer(1, 1, 1, 1, 1))
// (consentir,CompactBuffer(1))
// [...]
Note : On préfère en général reduceByKey
qui réduit le volume de données échangées pendant le shuffle.
reduceByKey
object MyApp extends App {
val sc = SparkUtils.getSC(this.getClass.getName)
val words = List("value1", "value2", "value3", "value4", "value4")
val wordsPairs = words.map(word => (word.toLowerCase, 1))
val wordCounts = wordsPairs.reduceByKey((c1: Int, c2: Int) => c1 + c2)
.sortByKey()
wordCounts.collect.foreach(println)
}
// --- output ---
// [...]
// (ordres,1)
// (expressément.,1)
// (privé,,1)
// (inquiété,1)
// [...]
map/filter sur RDD
Si vous êtes familier avec map et filter dans n'importe quel langage, par exemple avec les collections Java ou Scala, c'est essentiellement la même chose d'un point de vu programmatique. Prenons un exemple simple, qu'on ne devrait pas trouver un production, mais qui donne une idée:
- Nous chargeons un fichier csv de liste d'aéroports et on split les colonnes sur ","
- Nous allons filtrer les aéroports qui ont une altitude supérieure à 1500m et ne retourner que le nom et altitude des aéroports en question.
Vous noterez que comme on ne charge pas le csv très proprement en considérant les chaines de caractères incluant le séparateur, etc... On filtrera la première ligne en éliminant la chaîne qui commence par Airport (pas propre, mais suffisant pour notre exemple).
D'autre part, le nom de l'aéroport est la première colonne de notre fichier csv et l'altitude est à la colonne 4
object AirportsByAltitude extends App {
val sc = SparkUtils.getSC(this.getClass.getName)
val maxAltitude = 1500
val airports = sc.textFile(raw"C:\MyData\spark\data\airports.dat.txt")
.filter(l => !l.startsWith("Airport")) // pas propre
val data1 = airports
.map(l => {
val s = l.split(",")
val airportName = s(1)
val altitude = s(4)
(airportName, altitude.toDouble)
})
val data2 = data1.filter(t => t._2 > maxAltitude)
data2.take(2).foreach(println)
println("airpots above 1500m: " + data2.count())
}
Note : faire les filtres sur les données le plus tôt possible pour réduire la taille des données lors des opérations lourdes, surtout lorsqu'un shuffle va être nécessaire. Les données filtrées ne sont jamais ajoutées dans le RDD.
Sauvegarder un RDD
On peut sauvegarder un RDD dans un fichier text (qui peut-être distribué en étant sauvegarder sur HDFS par exemple) ou dans d'autres format éventuellement plus efficace comme les séquences files
[...]
// On parse un fichier de log apache par exemple
val logParserRegex = raw"([^\:]+)[^\/]+([^\s\?]+).*".r
def matchLog(logLine: String): (String, String) = logLine match {
case logParserRegex(date, url) => (date, url)
case _ => ("", "")
}
val tuples = logs.map(matchLog).distinct
tuples.saveAsTextFile("hdfs://mon/chemin/tuples.txt")
[...]
Il pourra être rechargé plus tard avec :
[...]
val sc = SparkUtils.getSC(this.getClass.getName)
val logs = sc.textFile("hdfs://mon/chemin/tuples.txt")
[...]
Note : selon le besoin il est utile d'utiliser un format spécifique pour sauvegarder le RDD, par exemple un format de fichier colonnaire permet de pouvoir accéder de façon très efficace à un colonne entière sans avoir avec lire la totalité du fichier comme c'est le cas avec un fichier plat CSV.
Regardez SequenceFile, RC (Row Colomnar), ORC (Optimized Row Columnar) Avro, Parquet.
Mettre un RDD en cache et le réutiliser
Quand un RDD est créé vous pouvez vous en servir pour faire d'autres opérations dessus. Si vous avez plusieurs opérations à faire il est important de considérer mettre en cache le RDD, c'est à dire qu'il sera conservé en mémoire. Sinon la chaîne complète sera réexécutée lors des opérations suivantes.
Par exemple :
object AnalyseLogs extends App {
val sc = SparkUtils.getSC(this.getClass.getName)
val logs = sc.textFile("hdfs://mon/chemin/tuples.txt").cache // on met en cache
val countSite1 = logs.filter(t => t._2.startsWith("https://monsite1.com/")).count
val countSite2 = logs.filter(t => t._2.startsWith("https://monsite2.com/")).count
println("count site1 " + countSite1)
println("count site2 " + countSite2)
}
Note : sans l'instruction .cache
de la troisième ligne, le fichier text serait parsé 2 fois.
map et flatMap
map
transforme chaque élément de la collection en un autre élément avec éventuellement un type différent
map[U: ClassTag](f: T => U): RDD[U]
flatMap transforme chaque élémenet de la collection en un collection (éventuellement vide) qui est ensuite applitie pour donner une collection entière
flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
# exemple : tous les mots d'un texte
val text = sc.textFile("/path/to/mon_text.txt")
val listOfWords = text.flatMap( line => line.split(" ")) // on retourne bien une collection de mots pour chaque ligne du fichier. Le resultat du flatMap est une collection de mots de tout le fichier
val wordCount = listOfWords.count
GroupByKey vs ReduceByKey
On compte le nomber de mots d'un fichier
object WordCountGroupByKey extends App {
val sc = SparkUtils.getSC(this.getClass.getName)
val lines = sc.textFile("/path/to/my_big_text.txt")
val words = lines.flatMap( l => l.split(" "))
.map(w => (w, 1) // on crée un tuple2 dans la clé est le mot
.cache
# groupBy
val groupByCount = words.groupByKey()
groupByCount.take(10).foreach(println)
# reduceByKey
val reduceByKeyCount = pairs.reduceByKey((count1, count2) => count1 + count2 )
// pour chaque clé, on fait un reduce de la valeur
reduceByKeyCount.collect.foreach(println)
}
On obtient le même résultat dans les deux cas, mais le reduceByKey est préféré car le réduce est d'abord fait par partition (sans shuffle / sans échange réseau) alors que le reduce qui est fait par le groupBy et groupByKey est fait dans un reduce job, donc toutes les données de la collection sont échangées via un shuffle
Fusionner deux RDDs
val rdd1: RDD[Int] = sc.parallelize(List(1,2,3))
val rdd2: RDD[Int] = sc.parallelize(List(4,5,6))
val merged = rdd1.union(rdd2)
merged.collect.foreach(println)
Réduction des logs Apache Spark
Utiliser la configuration de votre logger ou bien en développement faites :
Logger.getLogger("org").setLevel(Level.ERROR)
Ne pas faire ça!
Pourquoi cela ne fonctionne-t-il pas en Prod ?
let monRDD = ... // on charge un RDD
monRdd.foreach(println)
En fait le code compile, et j'arrive même à l'exécuter sur mon cluster, mais les println sont exécutés sur les workers dans spark, dans un sous job. Je ne vais pas les voir apparaître dans le driver (mon application spark).
Ce qu'il faut faire si on veut récupérer un résultat dans notre driver :
let monRDD = ... // on charge un RDD
monRdd.collect.foreach(println)
Le collect
est une action finale qui ramène tous les éléments du RDD dans une collection scala dans la JVM courante
Bien pour le développement et prod
- rdd.take(num: Int): Array[T]
Pour récupérer des éléments dans une collection du driver - rdd.first()
Pour récupérer le premier élément de la collection - rdd.top(num: Int): Array[T]
par défaut, comme take mais après classement de la collection avec un "ordering" implcit
On peut surcharger le triage comme suit :
val customOrdering = new Ordering[Int] {
override def compare(a: Int, b: Int) = {
b - a
}
}
rdd.top(10)(customOrdering).foreach(println)
- rdd.sample(
withReplacement: Boolean, // accepte-t-on plusieurs fois le même élément
fraction: Double, // fraction de la collection
seed: Long = Utils.random.nextLong // initialization du générateur aléatoire
): RDD[T] = { // note on reçoit un RDD - un peu avec le même principe, mais pour pour récupérer en local dans le driver
rdd.takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T] // on reçoit bien un Array, qui est une instance de collection dans la JVM du driver - rdd.randomSplit(
weights: Array[Double],
seed: Long = Utils.random.nextLong
) : Array[RDD[T]]
Vocabulaire
- RDD : Resilient Distributed Dataset
- RC : Row Colomnar
- ORC : Optimized Row Columnar