DéveloppeurWeb.Com
    DéveloppeurWeb.Com
    • Agile Zone
    • AI Zone
    • Cloud Zone
    • Database Zone
    • DevOps Zone
    • Integration Zone
    • Web Dev Zone
    DéveloppeurWeb.Com
    Home»Uncategorized»Charger des données dans Redshift à l’aide de PySpark
    Uncategorized

    Charger des données dans Redshift à l’aide de PySpark

    mars 17, 2023
    Charger des données dans Redshift à l'aide de PySpark
    Share
    Facebook Twitter Pinterest Reddit WhatsApp Email

    Les données sont le moteur de nombreuses entreprises d’aujourd’hui. Avec les quantités toujours croissantes de données disponibles, les entreprises doivent créer des pipelines de données optimisés capables de gérer de gros volumes de données de manière fiable et efficace. Dans cet article, nous expliquerons comment créer un pipeline de données optimisé à l’aide de PySpark et charger les données dans une table de base de données Redshift. Nous couvrirons également le nettoyage, la transformation, le partitionnement et la validation de la qualité des données.

    Avant de plonger dans le code, jetons un coup d’œil aux outils que nous allons utiliser :

    • PySpark : PySpark est une API Python pour Apache Spark, un système informatique distribué open source. PySpark fournit une interface pour programmer Spark avec Python.
    • Décalage vers le rouge : Amazon Redshift est un entrepôt de données rapide, entièrement géré et à l’échelle du pétaoctet qui permet d’analyser simplement et à moindre coût toutes vos données à l’aide de SQL standard et de vos outils de Business Intelligence (BI) existants.

    Avec ces outils à l’esprit, commençons par définir le problème que nous voulons résoudre.

    Définition du problème

    Supposons que nous disposions d’un grand ensemble de données contenant des informations sur les clients et leurs commandes. Nous souhaitons charger cet ensemble de données dans une table Redshift et effectuer les tâches suivantes :

    1. Nettoyage des données: Supprimez tous les enregistrements contenant des données manquantes ou non valides.
    2. Transformation de données : Transformez les données dans un format adapté à Redshift.
    3. Partitionnement : Partitionnez les données en sous-ensembles plus petits pour améliorer les performances des requêtes.
    4. Validation de la qualité des données : Validez la qualité des données de l’ensemble de données avant de le charger dans Redshift.
    5. Chargement des données dans Redshift : Chargez l’ensemble de données nettoyé et transformé dans une table Redshift.

    Maintenant que nous avons défini le problème, commençons à construire notre pipeline de données optimisé. Voici le code complet pour créer un pipeline de données optimisé avec nettoyage, transformation, partitionnement et validation de la qualité des données à l’aide de PySpark et chargement dans la table de base de données Redshift :

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, when 
    
    # create a spark session
    spark = SparkSession.builder.appName("DataPipeline").getOrCreate() 
    
    # load the data
    data = spark.read.csv("customer_orders.csv", header=True, inferSchema=True) 
    
    # drop records with missing or invalid data
    data = data.dropna() 
    
    # rename columns
    data = data.withColumnRenamed("Customer ID", "customer_id") \
        .withColumnRenamed("Order ID", "order_id") \
        .withColumnRenamed("Order Date", "order_date") \
        .withColumnRenamed("Order Amount", "order_amount") 
    
    # convert data types
    data = data.withColumn("customer_id", col("customer_id").cast("int")) \
        .withColumn("order_id", col("order_id").cast("int")) \
        .withColumn("order_amount", col("order_amount").cast("double")) 
    
    # partition data
    data = data.repartition("customer_id") 
    
    # data quality validation
    if data.count() == 0:    
        print("Error: No data to process.")    
        exit() 
    
    # load data into Redshift
    data.write \
        .format("jdbc") \
        .option("url", "jdbc:redshift://redshift-cluster-1.cvboublcdews.us-west-2.redshift.amazonaws.com:5439/dev") \
        .option("dbtable", "customer_orders") \
        .option("user", "username") \
        .option("password", "password") \
        .option("aws_iam_role", "arn:aws:iam::0123456789012:role/myRedshiftRole") \
        .option("tempdir", "s3a://my-s3-bucket/temp/") \
        .mode("overwrite") \
        .save() 
    
    # stop the spark session
    spark.stop()

    Maintenant, expliquons chaque étape du code.

    Étape 1 : Configurer PySpark et Redshift

    Nous commençons par importer les bibliothèques nécessaires et configurons PySpark.

    Nous importons également le col et when fonctions de pyspark.sql.functions bibliothèque. Ces fonctions seront utilisées plus tard dans l’étape de transformation des données.

    Étape 2 : Charger les données

    L’étape suivante consiste à charger les données dans PySpark.

    Nous chargeons les données à partir d’un fichier CSV en utilisant le read.csv() méthode. Nous spécifions également que le fichier a une ligne d’en-tête et déduisons le schéma à partir des données.

    Étape 3 : nettoyage des données

    L’étape suivante consiste à nettoyer les données en supprimant tous les enregistrements contenant des données manquantes ou non valides.

    Nous utilisons le dropna() méthode pour supprimer tous les enregistrements avec des données manquantes ou invalides.

    Étape 4 : transformation des données et validation de la qualité des données

    L’étape suivante consiste à transformer les données dans un format adapté à Redshift. Nous allons renommer les colonnes pour se conformer aux conventions de dénomination Redshift et convertir les types de données pour qu’ils correspondent au schéma de table Redshift. Les données sont validées à l’aide du <dataframe>.count() pour vous assurer que la trame de données n’est pas vide avant de lancer l’écriture dans la base de données Redshift.

    Étape 5 : Écrire dans la base de données Redshift

    Enfin, nous utilisons data.write pour écrire les données du PySpark DataFrame dans Redshift. Nous spécifions les propriétés de connexion Redshift telles que l’URL, l’utilisateur, le mot de passe, le rôle IAM et le répertoire S3 temporaire où les données sont stockées avant d’être chargées dans Redshift. Nous spécifions également le nom de la table et définissons les options pour tronquer la table avant d’écrire les données (c’est-à-dire supprimer toutes les données existantes) et le mode d’écrasement (c’est-à-dire remplacer la table si elle existe déjà). Enfin, nous utilisons spark.stop() pour arrêter la SparkSession.

    Share. Facebook Twitter Pinterest LinkedIn WhatsApp Reddit Email
    Add A Comment

    Leave A Reply Cancel Reply

    Catégories

    • Politique de cookies
    • Politique de confidentialité
    • CONTACT
    • Politique du DMCA
    • CONDITIONS D’UTILISATION
    • Avertissement
    © 2023 DéveloppeurWeb.Com.

    Type above and press Enter to search. Press Esc to cancel.