Apache Spark 3 - Utiliser le shell Spark avec Scala

Il existe déjà beaucoup de littérature de présentation à la fois d'Apache Spark et aussi de Scala.

Juste en quelques mots, les points clés qu'il faut retenir:

  • Apache Spark est un framework conçu pour la manipulation de gros volumes de données, et très versatile, et est généralement distribués sur une plateforme Hadoop, mais pas que.
  • Dans Spark, on manipule un ensemble de données que l'on appelle Datasets, qui se basent sur la notion principale de RDD : Resilient Distributed Datasets. Ce sont des blocks de données distribuées sur le cluster, généralement stockées en mémoire, permettant un traitement très rapide.
  • Scala est un langage de programmation basé sur la JVM et qui présente l’intérêt d'être moins verbeux que Java.
  • Apache Spark est développé en Scala, mais on peut aussi utiliser Spark avec Java, Python, R et SQL.

Dans le reste de l'article nous allons installer Spark 3 et utiliser le shell pour analyser des données. En général, nous n'utilisons pas le shell en production, mais c'est un outil très utile pour analyser un petit ensemble de données avant de passer à plus grand échelle ou bien pour faire des pocs. Le REPL spark (Read Eval Print Loop), comme dans beaucoup d'autres langages, y compris Java, permet de rapidement taper des lignes de codes en ligne de commande sans passer par un éditeur ni la phase de compilation.

Dans un autre article nous mettrons en place un projet Apache Spark dans Intellij et nous ferons des tests sur un cluster spark.

Télécharger la librairie Apache Spark

Allez sur la site d'Apache Spark sur la page de téléchargement et téléchargez une version récente de spark. Au moment d'écrire cet article nous téléchargeons spark 3.0.1

Une fois le ficher tgz télécharger, le dézipper où vous voulez sur votre disque. Puis créer une variable d'environnement SPARK_HOME vers ce répertoire et ajouter le bin dans le path export PATH="$PATH:$SPARK_HOME/bin" pour pouvoir démarrer le shell de n'importe quel répertoire.

Télécharger un jeu de données à analyser

Nous allons utiliser un jeu de données des valeurs foncières du site : https://www.data.gouv.fr/fr/datasets/demandes-de-valeurs-foncieres-geolocalisees/

Voici le lien de téléchargement du jeu de données en question: https://cadastre.data.gouv.fr/data/etalab-dvf/latest/csv/2020/full.csv.gz

Téléchargez le fichier et le dézipper. Il s'agit d'un CSV de 136M dont les colonnes sont:

  • id_mutation : Identifiant de mutation (non stable, sert à grouper les lignes)
  • date_mutation : Date de la mutation au format ISO-8601 (YYYY-MM-DD)
  • numero_disposition : Numéro de disposition
  • valeur_fonciere : Valeur foncière (séparateur décimal = point)
  • adresse_numero : Numéro de l'adresse
  • adresse_suffixe : Suffixe du numéro de l'adresse (B, T, Q)
  • adresse_code_voie : Code FANTOIR de la voie (4 caractères)
  • adresse_nom_voie : Nom de la voie de l'adresse
  • code_postal : Code postal (5 caractères)
  • code_commune : Code commune INSEE (5 caractères)
  • nom_commune : Nom de la commune (accentué)
  • ancien_code_commune : Ancien code commune INSEE (si différent lors de la mutation)
  • ancien_nom_commune : Ancien nom de la commune (si différent lors de la mutation)
  • code_departement : Code département INSEE (2 ou 3 caractères)
  • id_parcelle : Identifiant de parcelle (14 caractères)
  • ancien_id_parcelle : Ancien identifiant de parcelle (si différent lors de la mutation)
  • numero_volume : Numéro de volume
  • lot_1_numero : Numéro du lot 1
  • lot_1_surface_carrez : Surface Carrez du lot 1
  • lot_2_numero : Numéro du lot 2
  • lot_2_surface_carrez : Surface Carrez du lot 2
  • lot_3_numero : Numéro du lot 3
  • lot_3_surface_carrez : Surface Carrez du lot 3
  • lot_4_numero : Numéro du lot 4
  • lot_4_surface_carrez : Surface Carrez du lot 4
  • lot_5_numero : Numéro du lot 5
  • lot_5_surface_carrez : Surface Carrez du lot 5
  • nombre_lots : Nombre de lots
  • code_type_local : Code de type de local
  • type_local : Libellé du type de local
  • surface_reelle_bati : Surface réelle du bâti
  • nombre_pieces_principales : Nombre de pièces principales
  • code_nature_culture : Code de nature de culture
  • nature_culture : Libellé de nature de culture
  • code_nature_culture_speciale : Code de nature de culture spéciale
  • nature_culture_speciale : Libellé de nature de culture spéciale
  • surface_terrain : Surface du terrain
  • longitude : Longitude du centre de la parcelle concernée (WGS-84)
  • latitude : Latitude du centre de la parcelle concernée (WGS-84)

Utiliser spark-shell

Dans cet article nous allons lancer spark sans cluster, juste en local. Pour cela créer une variable d'environnement SPARK_LOCAL comme suit.

export SPARK_LOCAL_IP="127.0.0.1"

Lancer spark-shell

Soit depuis le répertoire de spark : ./bin/spark-shell soit spark-shell si vous avez ajouté les variables d'environnement SPARK_HOME dans votre path.

Charger un fichier text

scala> val donnees_brutes = spark.read.textFile("full.csv")

donnees_brutes: org.apache.spark.sql.Dataset[String] = [value: string]

Nous avons instancié un Dataset dont le contenu sera le contenu du fichier full.csv. Chaque ligne du fichier peut-être traitée.

A ce point, rien n'a été chargé. Seule une action "finale" lancera un traitement. Par exemple, compter le nombre de ligne du Dataset lance le chargement du fichier et effectuer l'action de comptage:

scala> donnees_brutes.count
res0: Long = 827106

Voyons maintenant un échantillon du fichier pour voir de quoi il est fait.

scala> donnees_brutes.show(3, false)

+------------+
|value       |
+------------+
|id_mutation,date_mutation,numero_disposition,nature_mutation,valeur_fonciere,adresse_numero,adresse_suffixe,adresse_nom_voie,adresse_code_voie,code_postal,code_commune,nom_commune,code_departement,ancien_code_commune,ancien_nom_commune,id_parcelle,ancien_id_parcelle,numero_volume,lot1_numero,lot1_surface_carrez,lot2_numero,lot2_surface_carrez,lot3_numero,lot3_surface_carrez,lot4_numero,lot4_surface_carrez,lot5_numero,lot5_surface_carrez,nombre_lots,code_type_local,type_local,surface_reelle_bati,nombre_pieces_principales,code_nature_culture,nature_culture,code_nature_culture_speciale,nature_culture_speciale,surface_terrain,longitude,latitude|
|2020-1,2020-01-07,000001,Vente,8000,,,FORTUNAT,B063,01250,01072,Ceyzériat,01,,,01072000AK0216,,,,,,,,,,,,,0,,,,,T,terres,,,1061,5.323522,46.171899                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|2020-2,2020-01-07,000001,Vente,75000,,,RUE DE LA CHARTREUSE,0064,01960,01289,Péronnas,01,,,01289000AI0210,,,,,,,,,,,,,0,,,,,AB,terrains a bâtir,,,610,5.226197,46.184538                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+----------+
only showing top 3 rows

Nous voyons de longues lignes dont la première est le header du fichier csv puis deux autres lignes.

Nous pouvons filtrer par exemple la colonne 32 qui est 'nombre_pieces_principales'

scala> val col32 = donnees_brutes.map(li => li.split(",")(32))
col32: org.apache.spark.sql.Dataset[String] = [value: string]

Nous utilisons ici le dataset initial et ajoutons un map. A ce stade aucun traitement ne se fait car il ne s'agit pas d'une action finale.

Le map prend une fonction lambda où li est une ligne et l'action sera de spliter la ligne avec le séparateur ',' et extraire la colonne 32.

scala> col32.show(10)

col32.show(10)
+--------------------+
|               value|
+--------------------+
|nombre_pieces_pri...|
|                    |
|                    |
|                    |
|                    |
|                   5|
|                    |
|                    |
|                    |
|                    |
+--------------------+
only showing top 10 rows

On constate que toutes les entrées ne renseignent pas forcément le nombre de pièce. Essayons de montrer plus de lignes :

scala> col32.show(1000)

Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 32 out of bounds for length 29

Nous constatons que la fonction qui split les lignes n'obtient pas 32 valeurs pour certaines lignes.

En effet nous ne parsons pas le fichier très proprement dans le cas où le fichier est mal formaté. Chaque ligne devrait avoir 40 colonnes. Comptons le nombre de colonnes par ligne et montrons le nombre de ligne par taille

scala> val colsCount = donnees_brutes.map(l => l.split(",").length)
colsCount: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> colsCount.groupByKey(identity).count().orderBy("key").show
+---+--------+
|key|count(1)|
+---+--------+
| 31|       3|
| 40|  780120|
| 41|      12|
| 35|       4|
| 38|   39465|
| 29|    4948|
| 33|    2554|
+---+--------+

groupByKey et count nous permettent de calculer le nombre d’occurrences de colsCount.
groupByKey traite chaque entrée, qui dans notre cas, est simplement un Integer. Nous utilisons la fonction 'identité' pour utiliser cette valeur comme clé. La fonction count() compte le nombre d'occurence.

Les Dataset nous permettent de faire des choses plus puissantes que cet exemple. Voyons plus loin.

Autres manipulations du fichier de données csv

Vous pouvez spécifier plus de paramètres au chargement d'un fichier csv que dans l'exemple ci-dessus.

Nous demandons à scala de passer en "paste mode" pour pouvoir coller la section suivante sur plusieurs lignes.

scala> :paste
// Entering paste mode (ctrl-D to finish)
val donnees_brutes = spark.read.format("csv")
  .option("locale", "France")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("full.csv")

Une fois collé tapez Ctrl-D.

Spark a parsé le fichier csv avec les options fournies et à identifié les colonnes du fichier. Ce qui est puissant dans ce cas est que Spark peut identifier automatiquement le schema.

scala> donnees_brutes.printSchema
root
 |-- id_mutation: string (nullable = true)
 |-- date_mutation: string (nullable = true)
 |-- numero_disposition: integer (nullable = true)
 |-- nature_mutation: string (nullable = true)
 |-- valeur_fonciere: double (nullable = true)
 |-- adresse_numero: integer (nullable = true)
 |-- adresse_suffixe: string (nullable = true)
 |-- adresse_nom_voie: string (nullable = true)
 |-- adresse_code_voie: string (nullable = true)
 |-- code_postal: integer (nullable = true)
 |-- code_commune: string (nullable = true)
 |-- nom_commune: string (nullable = true)
 |-- code_departement: string (nullable = true)
 |-- ancien_code_commune: string (nullable = true)
 |-- ancien_nom_commune: string (nullable = true)
 |-- id_parcelle: string (nullable = true)
 |-- ancien_id_parcelle: string (nullable = true)
 |-- numero_volume: string (nullable = true)
 |-- lot1_numero: string (nullable = true)
 |-- lot1_surface_carrez: double (nullable = true)
 |-- lot2_numero: string (nullable = true)
 |-- lot2_surface_carrez: double (nullable = true)
 |-- lot3_numero: string (nullable = true)
 |-- lot3_surface_carrez: double (nullable = true)
 |-- lot4_numero: integer (nullable = true)
 |-- lot4_surface_carrez: double (nullable = true)
 |-- lot5_numero: integer (nullable = true)
 |-- lot5_surface_carrez: double (nullable = true)
 |-- nombre_lots: integer (nullable = true)
 |-- code_type_local: integer (nullable = true)
 |-- type_local: string (nullable = true)
 |-- surface_reelle_bati: integer (nullable = true)
 |-- nombre_pieces_principales: integer (nullable = true)
 |-- code_nature_culture: string (nullable = true)
 |-- nature_culture: string (nullable = true)
 |-- code_nature_culture_speciale: string (nullable = true)
 |-- nature_culture_speciale: string (nullable = true)
 |-- surface_terrain: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)

Notez qu'il est aussi possible de spécifier le schema à spark si on connait des particularités.

Remarquez également que 'nombre_pieces_principales' est bien un integer.

Si on regarde les données ça ressemble fortement à une requête SQL

scala> donnees_brutes.show(3)
+---------+
|id_mutation|date_mutation|numero_disposition|nature_mutation|valeur_fonciere|adresse_numero|adresse_suffixe|    adresse_nom_voie|adresse_code_voie|code_postal|code_commune|nom_commune|code_departement|ancien_code_commune|ancien_nom_commune|   id_parcelle|ancien_id_parcelle|numero_volume|lot1_numero|lot1_surface_carrez|lot2_numero|lot2_surface_carrez|lot3_numero|lot3_surface_carrez|lot4_numero|lot4_surface_carrez|lot5_numero|lot5_surface_carrez|nombre_lots|code_type_local|type_local|surface_reelle_bati|nombre_pieces_principales|code_nature_culture|  nature_culture|code_nature_culture_speciale|nature_culture_speciale|surface_terrain|longitude| latitude|
+---------+
|     2020-1|   2020-01-07|                 1|          Vente|         8000.0|          null|           null|            FORTUNAT|             B063|       1250|       01072|  Ceyzériat|              01|               null|              null|01072000AK0216|              null|         null|       null|               null|       null|               null|       null|               null|       null|               null|       null|               null|          0|           null|      null|               null|                     null|                  T|          terres|                        null|                   null|           1061| 5.323522|46.171899|
|     2020-2|   2020-01-07|                 1|          Vente|        75000.0|          null|           null|RUE DE LA CHARTREUSE|             0064|       1960|       01289|   Péronnas|              01|               null|              null|01289000AI0210|              null|         null|       null|               null|       null|               null|       null|               null|       null|               null|       null|               null|          0|           null|      null|               null|                     null|                 AB|terrains a bâtir|                        null|                   null|            610| 5.226197|46.184538|
|     2020-3|   2020-01-14|                 1|          Vente|        89000.0|          null|           null|           VACAGNOLE|             B112|       1340|       01024|   Attignat|              01|               null|              null|01024000AL0120|              null|         null|       null|               null|       null|               null|       null|               null|       null|               null|       null|               null|          0|           null|      null|               null|                     null|                 AB|terrains a bâtir|                        null|                   null|            600|     null|     null|
+---------+
only showing top 3 rows

On peut même sélectionner une colonne qui nous intéresse

scala> donnees_brutes.select("nombre_pieces_principales").show
+-------------------------+
|nombre_pieces_principales|
+-------------------------+
|                     null|
|                     null|
|                     null|
|                     null|
|                        5|
|                     null|
|                     null|
|                     null|
|                     null|
|                        4|
|                     null|
|                        4|
|                        0|
|                     null|
|                     null|
|                     null|
|                     null|
|                     null|
|                        7|
|                     null|
+-------------------------+
only showing top 20 rows

Nombre de propriétés vendues par nombre de pièces :

scala> donnees_brutes.groupBy(col("nombre_pieces_principales")).count().sort(col("nombre_pieces_principales")).show(100)
+-------------------------+------+
|nombre_pieces_principales| count|
+-------------------------+------+
|                     null|364129|
|                        0|137414|
|                        1| 33767|
|                        2| 58112|
|                        3| 77281|
|                        4| 78304|
|                        5| 47759|
|                        6| 18971|
|                        7|  6853|
|                        8|  2635|
|                        9|   960|
|                       10|   460|
|                       11|   227|
|                       12|    96|
|                       13|    42|
|                       14|    32|
|                       15|    15|
|                       16|    13|
|                       17|     7|
|                       18|     3|
|                       19|     1|
|                       20|     8|
|                       21|     2|
|                       22|     3|
|                       23|     3|
|                       25|     2|
|                       28|     2|
|                       30|     1|
|                       41|     1|
|                       55|     1|
|                       70|     1|
+-------------------------+------+

Nombre de propriétés vendues par département :

scala> donnees_brutes.groupBy("code_departement").count.orderBy("code_departement").show(100)
+----------------+-----+
|code_departement|count|
+----------------+-----+
|              01| 6314|
|              02| 9217|
|              03| 1832|
|              04| 2550|
|              05|  677|
|              06|18859|
|              07| 6165|
|              08| 5256|
|              09|  563|
|              10| 4662|
|              11| 6473|
|              12|  149|
|              13| 9435|
|              14| 5474|
|              15|   81|
|              16| 3848|
|              17| 3857|
|              18| 6185|
|              19| 6346|
|              21|   23|
|              22|12335|
|              23| 5686|
|              24|11069|
|              25| 2419|
|              26| 4990|
|              27| 4136|
|              28| 9412|
|              29| 5251|
|              2A| 1983|
|              2B|   43|
|              30|18654|
|              31| 4439|
|              32| 1942|
|              33|36966|
|              34|13716|
|              35|22680|
|              36| 1096|
|              37| 6864|
|              38| 8015|
|              39| 2817|
|              40| 6351|
|              41|10449|
|              42|11995|
|              43| 4703|
|              44|34684|
|              45|13716|
|              46|  401|
|              47|10392|
|              48| 2941|
|              49|20213|
|              50|13129|
|              51|11161|
|              52|  989|
|              53| 6139|
|              54| 9418|
|              55|  119|
|              56|15274|
|              58| 4529|
|              59|29203|
|              60|13304|
|              61|  940|
|              62|14243|
|              63| 4073|
|              64| 2536|
|              65| 1289|
|              66| 1681|
|              69|16265|
|              70| 4078|
|              71| 3743|
|              72|12665|
|              73| 1118|
|              74| 6771|
|              75|23498|
|              76|12723|
|              77|24965|
|              78|23371|
|              79| 9289|
|              80| 7674|
|              81| 1137|
|              82| 5257|
|              83|11189|
|              84| 4262|
|              85|18288|
|              86| 6425|
|              87|10147|
|              88|10698|
|              89| 6259|
|              90|  383|
|              91|17785|
|              92|14624|
|              93|15855|
|              94|19177|
|              95|19635|
|             971|   31|
|             972| 1241|
|             973| 1236|
|             974|  965|
+----------------+-----+

Notez que l'utilisation de col() dans les cas simples n'est pas obligatoire.

On peut aussi ajouter des colonnes :

scala> donnees_brutes.groupBy("code_departement").agg(count("code_departement"), sum("valeur_fonciere")).orderBy("code_departement").show(20)
+----------------+-----------------------+--------------------+
|code_departement|count(code_departement)|sum(valeur_fonciere)|
+----------------+-----------------------+--------------------+
|              01|                   6314| 1.742594188399994E9|
|              02|                   9217|1.3345704534799988E9|
|              03|                   1832|2.7055697240999997E8|
|              04|                   2550|3.8006166279999995E8|
|              05|                    677|       1.128379445E8|
|              06|                  18859|1.205845898618000...|
|              07|                   6165|      8.0660834657E8|
|              08|                   5256| 5.004314756300006E8|
|              09|                    563|1.3252729695000008E8|
|              10|                   4662| 8.730233068700001E8|
|              11|                   6473|1.1661764984500015E9|
|              12|                    149|       5.617143578E7|
|              13|                   9435|     3.72011788628E9|
|              14|                   5474|1.5801534871200008E9|
|              15|                     81|          4574032.25|
|              16|                   3848| 5.387546528899996E8|
|              17|                   3857| 7.421604203099996E8|
|              18|                   6185| 9.852137856099991E8|
|              19|                   6346| 5.676203941399999E8|
|              21|                     23|           2601610.0|
+----------------+-----------------------+--------------------+
only showing top 20 rows

Autres requêtes sur les données

  • filtrer les données pour un code postal particulier :
scala> val data1 = donnees_brutes.filter("code_postal == 79000")
data1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id_mutation: string, date_mutation: string ... 38 more fields]
  • comme il peut y avoir plusieurs communes pour un même code postal, nous filtrons en plus par nom de commune, qui est typée donc on met entre simple quote. Et nous mettons en cache ce qui permet de ne pas relancer les traitements à chaque opération :
scala> data1.groupBy("nom_commune").count.show
+-----------+-----+                                                             
|nom_commune|count|
+-----------+-----+
|     Sciecq|   15|
|      Niort| 1199|
|   Bessines|   36|
+-----------+-----+

scala> val data2 = data1.filter("nom_commune == 'Niort'").cache
data2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id_mutation: string, date_mutation: string ... 38 more fields]
  • nous regroupons ensuite par adresse et nous affichons les valeurs foncières. Le booleen dans la fonction show permet de ne pas tronquer les noms de rues :
scala> val data3 = data2.groupBy("valeur_fonciere").agg(collect_set("adresse_numero"),collect_set("adresse_nom_voie"))
data3: org.apache.spark.sql.DataFrame = [valeur_fonciere: double, collect_set(adresse_numero): array<int> ... 1 more field]

scala> data3.show(3, false)
+---------------+---------------------------+-----------------------------------------+
|valeur_fonciere|collect_set(adresse_numero)|collect_set(adresse_nom_voie)            |
+---------------+---------------------------+-----------------------------------------+
|181400.0       |[219]                      |[RUE JEAN JAURES]                        |
|300000.0       |[65]                       |[LA JEUNE NOEMIE, RUE LOUISE MICHEL]     |
|330000.0       |[133, 47]                  |[AV DE LA VENISE VERTE, RUE DE FLEURELLE]|
+---------------+---------------------------+-----------------------------------------+
only showing top 3 rows

Utiliser SparkSQL

scala> donnees_brute.createTempView("valeurs")
scala> spark.sql("SELECT code_departement, count(code_departement), format_number(sum(valeur_fonciere),2) as val from valeurs group by code_departement").show
+----------------+-----------------------+----------------+                     
|code_departement|count(code_departement)|             val|
+----------------+-----------------------+----------------+
|              07|                   6165|  806,608,346.57|
|              51|                  11161|2,602,674,825.81|
|              15|                     81|    4,574,032.25|
|              54|                   9418|3,584,392,751.82|
|              11|                   6473|1,166,176,498.45|
|              29|                   5251|  924,555,519.99|
|              69|                  16265|6,919,943,214.33|
|              42|                  11995|2,150,156,062.88|
|              73|                   1118|  322,142,860.18|
|              87|                  10147|1,252,443,498.36|
|             974|                    965|  915,366,405.44|
|              64|                   2536|  469,110,158.79|
|              30|                  18654|5,146,175,384.62|
|              34|                  13716|2,900,615,804.92|
|              59|                  29203|9,800,705,395.83|
|              01|                   6314|1,742,594,188.40|
|              22|                  12335|1,780,289,300.53|
|              28|                   9412|2,595,710,151.61|
|              85|                  18288|2,669,234,383.54|
|              16|                   3848|  538,754,652.89|
+----------------+-----------------------+----------------+
only showing top 20 rows

Beaucoup de sources de données disponibles

Apache spark sait lire des tas de sources de données en natif et avec des modules open source. En voici une liste à titre d'exemple, mais en général n'importe quelle source est possible

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC/ODBC connections
  • Plain-text files
  • Cassandra
  • HBase
  • MongoDB
  • AWS Redshift
  • XML
  • Kafka
  • etc...

Exemple : Lire un fichier JSON

Spark ne va pas lire directement un fichier JSON, mais attend un object JSON par ligne, par example

{"nom":"Garcia","prenom":"Denis"}
{"nom":"Colin","prenom":"Gérald"}

Vous pouvez préparer votre fichier JSON avec un simple commande jq

cat data.json | jq -c '.' > data-ready.json

Maintenant, il nous suffit de faire comme suit avec spark :

scala> val fichier = "data-ready.json"
scala> val personnes = spark.read.json(fichier)

scala> personnes.printSchema()
root
 |-- firstname: string (nullable = true)
 |-- name: string (nullable = true)
 
scala> personnes.show()
+------+------+
|prenom|  nom |
+------+------+
| Denis|Garcia|
|Gérald| Colin|
+------+------+

Voilà pour cette mise en bouche rapide pour l'utilisation d'Apache Spark et le traitement rapide et éphémère de données.