Apache Spark : Les sources de données externes
Généralité sur Apache Spark :
Spark est un système de calcul distribué sur des collections de données répliquées et résilientes. Les calculs sont exécutés avec un fort parallélisme. Spark supporte plusieurs languages : Scala, Java, Python et R.
Le schéma classique d'une application Spark est le suivant :
- Chargement des données
- Tranformation des données
- Actions pour générer des résultats
Dans cet article nous allons couvrir les sources de données pour montrer la grande diversité des sources, et l'utilisation simple d'une API standard.
Les sources de données supportées
- Connecting to SQL Databases using JDBC
- Cassandra
- Couchbase
- ElasticSearch
- Importing Hive Tables
- MongoDB
- Neo4j
- Oracle
- Reading Avro Files
- Reading CSV Files
- Reading JSON Files
- Reading LZO Compressed Files
- Reading Parquet Files
- Redis
- Riak Time Series
- Zip Files
- Amazon Redshift
- Amazon S3 with Apache Spark
- Azure Blob Storage
- Azure Data Lake Store
- Azure Cosmos DB
- Azure SQL Data Warehouse
Il est relativement facile d'en ajouter des spécifiques si elles ne sont pas déjà dans cette liste, et la liste continue...
Une API très simple et versatile
L'API Spark pour charger ou sauver des données est simple, standard (le même pour toutes les source) et versatile (support déjà aujourd'hui beaucoup de sources de données
Dataset<Row> data = DataFrameReader.format(String inputDataSourceFormat)
.option(...)
.option(...)
.load(...)
inputDataSourceFormat : chaine de caractères définissant le format des données à charger (text, json, xml, jdbc, S3, Cassandra, ...)
Load textFile from HDFS
Dataset<Row> dataset = sparkSession
.read()
.textFile("hdfs://[filepath_on_hdfs]");
long numberOfLines = dataset.count();
Load CSV
Dataset<Row> dataset = sparkSession
.option("header", true)
.option("inferSchema", true)
.csv("hdfs://[filepath_on_hdfs]");
long numberOfLines = dataset.count();
dataset.printSchema();
Load Json
Dataset<Row> dataset = sparkSession
.read()
.json("hdfs://[filepath_on_hdfs]");
long numberOfLines = dataset.count();
ou avec un schéma
StructType userSchema = new StructType()
.add("name", "string")
.add("age", "integer", true)
.add("food", "string", true);
Dataset<People> people = sparkSession
.read()
.json("[filepath on S3, HDFS, url, ...)");
Load XML
Example de chargement de données mappé sur un object métier.
Dataset<People> people = sparkSession
.read()
.xml("[filepath on S3, HDFS, url, ...)")
.as(Encoders.bean(People.class));
ou
Dataset<Row> propertiesStats = sparkSession
.read()
.option("rootTag", "Properties")
.option("rowTag", "Property")
.xml("[filepath on S3, HDFS, url, ...)")
.groupBy("BedroomsCount")
.agg( avg("Price").alias("Average Price"),
count("Price").alias("Prices Count"));
Load data from a database with JDBC
Properties dbProperties = new Properties();
dbProperties.put("driver", "com.mysql.jdbc.Driver");
dbProperties.put("user", "userName");
dbProperties.put("password", "userSecret");
Dataset<Row> persons = spark.read()
.format("jdbc")
.load("jdbc:mysql://dbIP:3306/databaseName,
tableName, dbProperties);
ou write to JSBC
Dataset<Row> persons = spark.write()
.jdbc(jdbcURL, tableName, dbProperties);
Les streams
On peut charger des données en mode streaming (en fait micro-batch) quasi temps-réel
Ecouter sur un port
Dataset<Row> socketDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
Ecouter un répertoire (HDFS, etc...)
// Read all the csv files written atomically in a directory
StructType userSchema = new StructType()
.add("name", "string")
.add("age", "integer", true)
.add("food", "string", true);
Dataset<Row> df = spark
.readStream()
.schema(userSchema)
.json(folderPath);
StreamingQuery query = df
.where("age > 40")
.groupBy("age")
.agg(collect_list("name"))
.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();