Apache Spark : Les sources de données externes

Apache Spark : Les sources de données externes

Généralité sur Apache Spark :

Spark est un système de calcul distribué sur des collections de données répliquées et résilientes. Les calculs sont exécutés avec un fort parallélisme. Spark supporte plusieurs languages : Scala, Java, Python et R.

Le schéma classique d'une application Spark est le suivant :

  • Chargement des données
  • Tranformation des données
  • Actions pour générer des résultats

Spark-Workflow

Dans cet article nous allons couvrir les sources de données pour montrer la grande diversité des sources, et l'utilisation simple d'une API standard.

Les sources de données supportées

  • Connecting to SQL Databases using JDBC
  • Cassandra
  • Couchbase
  • ElasticSearch
  • Importing Hive Tables
  • MongoDB
  • Neo4j
  • Oracle
  • Reading Avro Files
  • Reading CSV Files
  • Reading JSON Files
  • Reading LZO Compressed Files
  • Reading Parquet Files
  • Redis
  • Riak Time Series
  • Zip Files
  • Amazon Redshift
  • Amazon S3 with Apache Spark
  • Azure Blob Storage
  • Azure Data Lake Store
  • Azure Cosmos DB
  • Azure SQL Data Warehouse

Il est relativement facile d'en ajouter des spécifiques si elles ne sont pas déjà dans cette liste, et la liste continue...

Une API très simple et versatile

L'API Spark pour charger ou sauver des données est simple, standard (le même pour toutes les source) et versatile (support déjà aujourd'hui beaucoup de sources de données

Dataset<Row> data = DataFrameReader.format(String inputDataSourceFormat)
    .option(...)
    .option(...)
    .load(...)

inputDataSourceFormat : chaine de caractères définissant le format des données à charger (text, json, xml, jdbc, S3, Cassandra, ...)

Load textFile from HDFS

Dataset<Row> dataset = sparkSession
                .read()
                .textFile("hdfs://[filepath_on_hdfs]");
long numberOfLines = dataset.count();

Load CSV

Dataset<Row> dataset = sparkSession
                .option("header", true)
                .option("inferSchema", true)
                .csv("hdfs://[filepath_on_hdfs]");
long numberOfLines = dataset.count();
dataset.printSchema();

Load Json

Dataset<Row> dataset = sparkSession
                .read()
                .json("hdfs://[filepath_on_hdfs]");
long numberOfLines = dataset.count();

ou avec un schéma

StructType userSchema = new StructType()
    .add("name", "string")
    .add("age", "integer", true)
	.add("food", "string", true);


Dataset<People> people = sparkSession
     .read()
     .json("[filepath on S3, HDFS, url, ...)");

Load XML

Example de chargement de données mappé sur un object métier.

Dataset<People> people = sparkSession
     .read()
     .xml("[filepath on S3, HDFS, url, ...)")
     .as(Encoders.bean(People.class));

ou

Dataset<Row> propertiesStats = sparkSession
     .read()
     .option("rootTag", "Properties")
     .option("rowTag", "Property")
     .xml("[filepath on S3, HDFS, url, ...)")
     
     .groupBy("BedroomsCount")
     .agg(   avg("Price").alias("Average Price"),
             count("Price").alias("Prices Count"));

Load data from a database with JDBC

Properties dbProperties = new Properties();
dbProperties.put("driver", "com.mysql.jdbc.Driver");
dbProperties.put("user", "userName");
dbProperties.put("password", "userSecret");

Dataset<Row> persons = spark.read()
    .format("jdbc")
    .load("jdbc:mysql://dbIP:3306/databaseName,
           tableName, dbProperties);

ou write to JSBC

Dataset<Row> persons = spark.write()
	.jdbc(jdbcURL, tableName, dbProperties);

Les streams

On peut charger des données en mode streaming (en fait micro-batch) quasi temps-réel

Ecouter sur un port

Dataset<Row> socketDF = spark
	.readStream()
	.format("socket")
	.option("host", "localhost")
	.option("port", 9999)
	.load();

Ecouter un répertoire (HDFS, etc...)

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType()
	.add("name", "string")
	.add("age", "integer", true)
	.add("food", "string", true);

Dataset<Row> df = spark
	.readStream()
	.schema(userSchema)
	.json(folderPath);

StreamingQuery query = df
	.where("age > 40")
	.groupBy("age")
	.agg(collect_list("name"))
    .writeStream()
	.outputMode("complete")
	.format("console")
	.start();

query.awaitTermination();