Apache Spark 3 - Utiliser le shell Spark avec Scala
Pour démarrer avec Apache Spark en Scala : voici vos premiers pas afin de pouvoir mettre vos mains dans la manipulation de gros jeux de données avec le shell ou REPL spark. Vous trouverez quelques exemples de prototypage d'analyses de données.
Il existe déjà beaucoup de littérature de présentation à la fois d'Apache Spark et aussi de Scala.
Juste en quelques mots, les points clés qu'il faut retenir:
- Apache Spark est un framework conçu pour la manipulation de gros volumes de données, et très versatile, et est généralement distribués sur une plateforme Hadoop, mais pas que.
- Dans Spark, on manipule un ensemble de données que l'on appelle Datasets, qui se basent sur la notion principale de RDD : Resilient Distributed Datasets. Ce sont des blocks de données distribuées sur le cluster, généralement stockées en mémoire, permettant un traitement très rapide.
- Scala est un langage de programmation basé sur la JVM et qui présente l’intérêt d'être moins verbeux que Java.
- Apache Spark est développé en Scala, mais on peut aussi utiliser Spark avec Java, Python, R et SQL.
Dans le reste de l'article nous allons installer Spark 3 et utiliser le shell pour analyser des données. En général, nous n'utilisons pas le shell en production, mais c'est un outil très utile pour analyser un petit ensemble de données avant de passer à plus grand échelle ou bien pour faire des pocs. Le REPL spark (Read Eval Print Loop), comme dans beaucoup d'autres langages, y compris Java, permet de rapidement taper des lignes de codes en ligne de commande sans passer par un éditeur ni la phase de compilation.
Dans un autre article nous mettrons en place un projet Apache Spark dans Intellij et nous ferons des tests sur un cluster spark.
Télécharger la librairie Apache Spark
Allez sur la site d'Apache Spark sur la page de téléchargement et téléchargez une version récente de spark. Au moment d'écrire cet article nous téléchargeons spark 3.0.1
Une fois le ficher tgz télécharger, le dézipper où vous voulez sur votre disque. Puis créer une variable d'environnement SPARK_HOME vers ce répertoire et ajouter le bin dans le path export PATH="$PATH:$SPARK_HOME/bin"
pour pouvoir démarrer le shell de n'importe quel répertoire.
Télécharger un jeu de données à analyser
Nous allons utiliser un jeu de données des valeurs foncières du site : https://www.data.gouv.fr/fr/datasets/demandes-de-valeurs-foncieres-geolocalisees/
Voici le lien de téléchargement du jeu de données en question: https://cadastre.data.gouv.fr/data/etalab-dvf/latest/csv/2020/full.csv.gz
Téléchargez le fichier et le dézipper. Il s'agit d'un CSV de 136M dont les colonnes sont:
id_mutation
: Identifiant de mutation (non stable, sert à grouper les lignes)date_mutation
: Date de la mutation au format ISO-8601 (YYYY-MM-DD)numero_disposition
: Numéro de dispositionvaleur_fonciere
: Valeur foncière (séparateur décimal = point)adresse_numero
: Numéro de l'adresseadresse_suffixe
: Suffixe du numéro de l'adresse (B, T, Q)adresse_code_voie
: Code FANTOIR de la voie (4 caractères)adresse_nom_voie
: Nom de la voie de l'adressecode_postal
: Code postal (5 caractères)code_commune
: Code commune INSEE (5 caractères)nom_commune
: Nom de la commune (accentué)ancien_code_commune
: Ancien code commune INSEE (si différent lors de la mutation)ancien_nom_commune
: Ancien nom de la commune (si différent lors de la mutation)code_departement
: Code département INSEE (2 ou 3 caractères)id_parcelle
: Identifiant de parcelle (14 caractères)ancien_id_parcelle
: Ancien identifiant de parcelle (si différent lors de la mutation)numero_volume
: Numéro de volumelot_1_numero
: Numéro du lot 1lot_1_surface_carrez
: Surface Carrez du lot 1lot_2_numero
: Numéro du lot 2lot_2_surface_carrez
: Surface Carrez du lot 2lot_3_numero
: Numéro du lot 3lot_3_surface_carrez
: Surface Carrez du lot 3lot_4_numero
: Numéro du lot 4lot_4_surface_carrez
: Surface Carrez du lot 4lot_5_numero
: Numéro du lot 5lot_5_surface_carrez
: Surface Carrez du lot 5nombre_lots
: Nombre de lotscode_type_local
: Code de type de localtype_local
: Libellé du type de localsurface_reelle_bati
: Surface réelle du bâtinombre_pieces_principales
: Nombre de pièces principalescode_nature_culture
: Code de nature de culturenature_culture
: Libellé de nature de culturecode_nature_culture_speciale
: Code de nature de culture spécialenature_culture_speciale
: Libellé de nature de culture spécialesurface_terrain
: Surface du terrainlongitude
: Longitude du centre de la parcelle concernée (WGS-84)latitude
: Latitude du centre de la parcelle concernée (WGS-84)
Utiliser spark-shell
Dans cet article nous allons lancer spark sans cluster, juste en local. Pour cela créer une variable d'environnement SPARK_LOCAL comme suit.
export SPARK_LOCAL_IP="127.0.0.1"
Lancer spark-shell
Soit depuis le répertoire de spark : ./bin/spark-shell
soit spark-shell
si vous avez ajouté les variables d'environnement SPARK_HOME dans votre path.
Charger un fichier text
scala> val donnees_brutes = spark.read.textFile("full.csv")
donnees_brutes: org.apache.spark.sql.Dataset[String] = [value: string]
Nous avons instancié un Dataset dont le contenu sera le contenu du fichier full.csv. Chaque ligne du fichier peut-être traitée.
A ce point, rien n'a été chargé. Seule une action "finale" lancera un traitement. Par exemple, compter le nombre de ligne du Dataset lance le chargement du fichier et effectuer l'action de comptage:
scala> donnees_brutes.count
res0: Long = 827106
Voyons maintenant un échantillon du fichier pour voir de quoi il est fait.
scala> donnees_brutes.show(3, false)
+------------+
|value |
+------------+
|id_mutation,date_mutation,numero_disposition,nature_mutation,valeur_fonciere,adresse_numero,adresse_suffixe,adresse_nom_voie,adresse_code_voie,code_postal,code_commune,nom_commune,code_departement,ancien_code_commune,ancien_nom_commune,id_parcelle,ancien_id_parcelle,numero_volume,lot1_numero,lot1_surface_carrez,lot2_numero,lot2_surface_carrez,lot3_numero,lot3_surface_carrez,lot4_numero,lot4_surface_carrez,lot5_numero,lot5_surface_carrez,nombre_lots,code_type_local,type_local,surface_reelle_bati,nombre_pieces_principales,code_nature_culture,nature_culture,code_nature_culture_speciale,nature_culture_speciale,surface_terrain,longitude,latitude|
|2020-1,2020-01-07,000001,Vente,8000,,,FORTUNAT,B063,01250,01072,Ceyzériat,01,,,01072000AK0216,,,,,,,,,,,,,0,,,,,T,terres,,,1061,5.323522,46.171899 |
|2020-2,2020-01-07,000001,Vente,75000,,,RUE DE LA CHARTREUSE,0064,01960,01289,Péronnas,01,,,01289000AI0210,,,,,,,,,,,,,0,,,,,AB,terrains a bâtir,,,610,5.226197,46.184538 |
+----------+
only showing top 3 rows
Nous voyons de longues lignes dont la première est le header du fichier csv puis deux autres lignes.
Nous pouvons filtrer par exemple la colonne 32 qui est 'nombre_pieces_principales'
scala> val col32 = donnees_brutes.map(li => li.split(",")(32))
col32: org.apache.spark.sql.Dataset[String] = [value: string]
Nous utilisons ici le dataset initial et ajoutons un map. A ce stade aucun traitement ne se fait car il ne s'agit pas d'une action finale.
Le map prend une fonction lambda où li est une ligne et l'action sera de spliter la ligne avec le séparateur ',' et extraire la colonne 32.
scala> col32.show(10)
col32.show(10)
+--------------------+
| value|
+--------------------+
|nombre_pieces_pri...|
| |
| |
| |
| |
| 5|
| |
| |
| |
| |
+--------------------+
only showing top 10 rows
On constate que toutes les entrées ne renseignent pas forcément le nombre de pièce. Essayons de montrer plus de lignes :
scala> col32.show(1000)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 32 out of bounds for length 29
Nous constatons que la fonction qui split les lignes n'obtient pas 32 valeurs pour certaines lignes.
En effet nous ne parsons pas le fichier très proprement dans le cas où le fichier est mal formaté. Chaque ligne devrait avoir 40 colonnes. Comptons le nombre de colonnes par ligne et montrons le nombre de ligne par taille
scala> val colsCount = donnees_brutes.map(l => l.split(",").length)
colsCount: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> colsCount.groupByKey(identity).count().orderBy("key").show
+---+--------+
|key|count(1)|
+---+--------+
| 31| 3|
| 40| 780120|
| 41| 12|
| 35| 4|
| 38| 39465|
| 29| 4948|
| 33| 2554|
+---+--------+
groupByKey
et count
nous permettent de calculer le nombre d’occurrences de colsCount.groupByKey
traite chaque entrée, qui dans notre cas, est simplement un Integer. Nous utilisons la fonction 'identité' pour utiliser cette valeur comme clé. La fonction count() compte le nombre d'occurence.
Les Dataset nous permettent de faire des choses plus puissantes que cet exemple. Voyons plus loin.
Autres manipulations du fichier de données csv
Vous pouvez spécifier plus de paramètres au chargement d'un fichier csv que dans l'exemple ci-dessus.
Nous demandons à scala de passer en "paste mode" pour pouvoir coller la section suivante sur plusieurs lignes.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val donnees_brutes = spark.read.format("csv")
.option("locale", "France")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("full.csv")
Une fois collé tapez Ctrl-D.
Spark a parsé le fichier csv avec les options fournies et à identifié les colonnes du fichier. Ce qui est puissant dans ce cas est que Spark peut identifier automatiquement le schema.
scala> donnees_brutes.printSchema
root
|-- id_mutation: string (nullable = true)
|-- date_mutation: string (nullable = true)
|-- numero_disposition: integer (nullable = true)
|-- nature_mutation: string (nullable = true)
|-- valeur_fonciere: double (nullable = true)
|-- adresse_numero: integer (nullable = true)
|-- adresse_suffixe: string (nullable = true)
|-- adresse_nom_voie: string (nullable = true)
|-- adresse_code_voie: string (nullable = true)
|-- code_postal: integer (nullable = true)
|-- code_commune: string (nullable = true)
|-- nom_commune: string (nullable = true)
|-- code_departement: string (nullable = true)
|-- ancien_code_commune: string (nullable = true)
|-- ancien_nom_commune: string (nullable = true)
|-- id_parcelle: string (nullable = true)
|-- ancien_id_parcelle: string (nullable = true)
|-- numero_volume: string (nullable = true)
|-- lot1_numero: string (nullable = true)
|-- lot1_surface_carrez: double (nullable = true)
|-- lot2_numero: string (nullable = true)
|-- lot2_surface_carrez: double (nullable = true)
|-- lot3_numero: string (nullable = true)
|-- lot3_surface_carrez: double (nullable = true)
|-- lot4_numero: integer (nullable = true)
|-- lot4_surface_carrez: double (nullable = true)
|-- lot5_numero: integer (nullable = true)
|-- lot5_surface_carrez: double (nullable = true)
|-- nombre_lots: integer (nullable = true)
|-- code_type_local: integer (nullable = true)
|-- type_local: string (nullable = true)
|-- surface_reelle_bati: integer (nullable = true)
|-- nombre_pieces_principales: integer (nullable = true)
|-- code_nature_culture: string (nullable = true)
|-- nature_culture: string (nullable = true)
|-- code_nature_culture_speciale: string (nullable = true)
|-- nature_culture_speciale: string (nullable = true)
|-- surface_terrain: integer (nullable = true)
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
Notez qu'il est aussi possible de spécifier le schema à spark si on connait des particularités.
Remarquez également que 'nombre_pieces_principales' est bien un integer.
Si on regarde les données ça ressemble fortement à une requête SQL
scala> donnees_brutes.show(3)
+---------+
|id_mutation|date_mutation|numero_disposition|nature_mutation|valeur_fonciere|adresse_numero|adresse_suffixe| adresse_nom_voie|adresse_code_voie|code_postal|code_commune|nom_commune|code_departement|ancien_code_commune|ancien_nom_commune| id_parcelle|ancien_id_parcelle|numero_volume|lot1_numero|lot1_surface_carrez|lot2_numero|lot2_surface_carrez|lot3_numero|lot3_surface_carrez|lot4_numero|lot4_surface_carrez|lot5_numero|lot5_surface_carrez|nombre_lots|code_type_local|type_local|surface_reelle_bati|nombre_pieces_principales|code_nature_culture| nature_culture|code_nature_culture_speciale|nature_culture_speciale|surface_terrain|longitude| latitude|
+---------+
| 2020-1| 2020-01-07| 1| Vente| 8000.0| null| null| FORTUNAT| B063| 1250| 01072| Ceyzériat| 01| null| null|01072000AK0216| null| null| null| null| null| null| null| null| null| null| null| null| 0| null| null| null| null| T| terres| null| null| 1061| 5.323522|46.171899|
| 2020-2| 2020-01-07| 1| Vente| 75000.0| null| null|RUE DE LA CHARTREUSE| 0064| 1960| 01289| Péronnas| 01| null| null|01289000AI0210| null| null| null| null| null| null| null| null| null| null| null| null| 0| null| null| null| null| AB|terrains a bâtir| null| null| 610| 5.226197|46.184538|
| 2020-3| 2020-01-14| 1| Vente| 89000.0| null| null| VACAGNOLE| B112| 1340| 01024| Attignat| 01| null| null|01024000AL0120| null| null| null| null| null| null| null| null| null| null| null| null| 0| null| null| null| null| AB|terrains a bâtir| null| null| 600| null| null|
+---------+
only showing top 3 rows
On peut même sélectionner une colonne qui nous intéresse
scala> donnees_brutes.select("nombre_pieces_principales").show
+-------------------------+
|nombre_pieces_principales|
+-------------------------+
| null|
| null|
| null|
| null|
| 5|
| null|
| null|
| null|
| null|
| 4|
| null|
| 4|
| 0|
| null|
| null|
| null|
| null|
| null|
| 7|
| null|
+-------------------------+
only showing top 20 rows
Nombre de propriétés vendues par nombre de pièces :
scala> donnees_brutes.groupBy(col("nombre_pieces_principales")).count().sort(col("nombre_pieces_principales")).show(100)
+-------------------------+------+
|nombre_pieces_principales| count|
+-------------------------+------+
| null|364129|
| 0|137414|
| 1| 33767|
| 2| 58112|
| 3| 77281|
| 4| 78304|
| 5| 47759|
| 6| 18971|
| 7| 6853|
| 8| 2635|
| 9| 960|
| 10| 460|
| 11| 227|
| 12| 96|
| 13| 42|
| 14| 32|
| 15| 15|
| 16| 13|
| 17| 7|
| 18| 3|
| 19| 1|
| 20| 8|
| 21| 2|
| 22| 3|
| 23| 3|
| 25| 2|
| 28| 2|
| 30| 1|
| 41| 1|
| 55| 1|
| 70| 1|
+-------------------------+------+
Nombre de propriétés vendues par département :
scala> donnees_brutes.groupBy("code_departement").count.orderBy("code_departement").show(100)
+----------------+-----+
|code_departement|count|
+----------------+-----+
| 01| 6314|
| 02| 9217|
| 03| 1832|
| 04| 2550|
| 05| 677|
| 06|18859|
| 07| 6165|
| 08| 5256|
| 09| 563|
| 10| 4662|
| 11| 6473|
| 12| 149|
| 13| 9435|
| 14| 5474|
| 15| 81|
| 16| 3848|
| 17| 3857|
| 18| 6185|
| 19| 6346|
| 21| 23|
| 22|12335|
| 23| 5686|
| 24|11069|
| 25| 2419|
| 26| 4990|
| 27| 4136|
| 28| 9412|
| 29| 5251|
| 2A| 1983|
| 2B| 43|
| 30|18654|
| 31| 4439|
| 32| 1942|
| 33|36966|
| 34|13716|
| 35|22680|
| 36| 1096|
| 37| 6864|
| 38| 8015|
| 39| 2817|
| 40| 6351|
| 41|10449|
| 42|11995|
| 43| 4703|
| 44|34684|
| 45|13716|
| 46| 401|
| 47|10392|
| 48| 2941|
| 49|20213|
| 50|13129|
| 51|11161|
| 52| 989|
| 53| 6139|
| 54| 9418|
| 55| 119|
| 56|15274|
| 58| 4529|
| 59|29203|
| 60|13304|
| 61| 940|
| 62|14243|
| 63| 4073|
| 64| 2536|
| 65| 1289|
| 66| 1681|
| 69|16265|
| 70| 4078|
| 71| 3743|
| 72|12665|
| 73| 1118|
| 74| 6771|
| 75|23498|
| 76|12723|
| 77|24965|
| 78|23371|
| 79| 9289|
| 80| 7674|
| 81| 1137|
| 82| 5257|
| 83|11189|
| 84| 4262|
| 85|18288|
| 86| 6425|
| 87|10147|
| 88|10698|
| 89| 6259|
| 90| 383|
| 91|17785|
| 92|14624|
| 93|15855|
| 94|19177|
| 95|19635|
| 971| 31|
| 972| 1241|
| 973| 1236|
| 974| 965|
+----------------+-----+
Notez que l'utilisation de col() dans les cas simples n'est pas obligatoire.
On peut aussi ajouter des colonnes :
scala> donnees_brutes.groupBy("code_departement").agg(count("code_departement"), sum("valeur_fonciere")).orderBy("code_departement").show(20)
+----------------+-----------------------+--------------------+
|code_departement|count(code_departement)|sum(valeur_fonciere)|
+----------------+-----------------------+--------------------+
| 01| 6314| 1.742594188399994E9|
| 02| 9217|1.3345704534799988E9|
| 03| 1832|2.7055697240999997E8|
| 04| 2550|3.8006166279999995E8|
| 05| 677| 1.128379445E8|
| 06| 18859|1.205845898618000...|
| 07| 6165| 8.0660834657E8|
| 08| 5256| 5.004314756300006E8|
| 09| 563|1.3252729695000008E8|
| 10| 4662| 8.730233068700001E8|
| 11| 6473|1.1661764984500015E9|
| 12| 149| 5.617143578E7|
| 13| 9435| 3.72011788628E9|
| 14| 5474|1.5801534871200008E9|
| 15| 81| 4574032.25|
| 16| 3848| 5.387546528899996E8|
| 17| 3857| 7.421604203099996E8|
| 18| 6185| 9.852137856099991E8|
| 19| 6346| 5.676203941399999E8|
| 21| 23| 2601610.0|
+----------------+-----------------------+--------------------+
only showing top 20 rows
Autres requêtes sur les données
- filtrer les données pour un code postal particulier :
scala> val data1 = donnees_brutes.filter("code_postal == 79000")
data1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id_mutation: string, date_mutation: string ... 38 more fields]
- comme il peut y avoir plusieurs communes pour un même code postal, nous filtrons en plus par nom de commune, qui est typée donc on met entre simple quote. Et nous mettons en cache ce qui permet de ne pas relancer les traitements à chaque opération :
scala> data1.groupBy("nom_commune").count.show
+-----------+-----+
|nom_commune|count|
+-----------+-----+
| Sciecq| 15|
| Niort| 1199|
| Bessines| 36|
+-----------+-----+
scala> val data2 = data1.filter("nom_commune == 'Niort'").cache
data2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id_mutation: string, date_mutation: string ... 38 more fields]
- nous regroupons ensuite par adresse et nous affichons les valeurs foncières. Le booleen dans la fonction show permet de ne pas tronquer les noms de rues :
scala> val data3 = data2.groupBy("valeur_fonciere").agg(collect_set("adresse_numero"),collect_set("adresse_nom_voie"))
data3: org.apache.spark.sql.DataFrame = [valeur_fonciere: double, collect_set(adresse_numero): array<int> ... 1 more field]
scala> data3.show(3, false)
+---------------+---------------------------+-----------------------------------------+
|valeur_fonciere|collect_set(adresse_numero)|collect_set(adresse_nom_voie) |
+---------------+---------------------------+-----------------------------------------+
|181400.0 |[219] |[RUE JEAN JAURES] |
|300000.0 |[65] |[LA JEUNE NOEMIE, RUE LOUISE MICHEL] |
|330000.0 |[133, 47] |[AV DE LA VENISE VERTE, RUE DE FLEURELLE]|
+---------------+---------------------------+-----------------------------------------+
only showing top 3 rows
Utiliser SparkSQL
scala> donnees_brute.createTempView("valeurs")
scala> spark.sql("SELECT code_departement, count(code_departement), format_number(sum(valeur_fonciere),2) as val from valeurs group by code_departement").show
+----------------+-----------------------+----------------+
|code_departement|count(code_departement)| val|
+----------------+-----------------------+----------------+
| 07| 6165| 806,608,346.57|
| 51| 11161|2,602,674,825.81|
| 15| 81| 4,574,032.25|
| 54| 9418|3,584,392,751.82|
| 11| 6473|1,166,176,498.45|
| 29| 5251| 924,555,519.99|
| 69| 16265|6,919,943,214.33|
| 42| 11995|2,150,156,062.88|
| 73| 1118| 322,142,860.18|
| 87| 10147|1,252,443,498.36|
| 974| 965| 915,366,405.44|
| 64| 2536| 469,110,158.79|
| 30| 18654|5,146,175,384.62|
| 34| 13716|2,900,615,804.92|
| 59| 29203|9,800,705,395.83|
| 01| 6314|1,742,594,188.40|
| 22| 12335|1,780,289,300.53|
| 28| 9412|2,595,710,151.61|
| 85| 18288|2,669,234,383.54|
| 16| 3848| 538,754,652.89|
+----------------+-----------------------+----------------+
only showing top 20 rows
Beaucoup de sources de données disponibles
Apache spark sait lire des tas de sources de données en natif et avec des modules open source. En voici une liste à titre d'exemple, mais en général n'importe quelle source est possible
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files
- Cassandra
- HBase
- MongoDB
- AWS Redshift
- XML
- Kafka
- etc...
Exemple : Lire un fichier JSON
Spark ne va pas lire directement un fichier JSON, mais attend un object JSON par ligne, par example
{"nom":"Garcia","prenom":"Denis"}
{"nom":"Colin","prenom":"Gérald"}
Vous pouvez préparer votre fichier JSON avec un simple commande jq
cat data.json | jq -c '.' > data-ready.json
Maintenant, il nous suffit de faire comme suit avec spark :
scala> val fichier = "data-ready.json"
scala> val personnes = spark.read.json(fichier)
scala> personnes.printSchema()
root
|-- firstname: string (nullable = true)
|-- name: string (nullable = true)
scala> personnes.show()
+------+------+
|prenom| nom |
+------+------+
| Denis|Garcia|
|Gérald| Colin|
+------+------+
Voilà pour cette mise en bouche rapide pour l'utilisation d'Apache Spark et le traitement rapide et éphémère de données.