DéveloppeurWeb.Com
    DéveloppeurWeb.Com
    • Agile Zone
    • AI Zone
    • Cloud Zone
    • Database Zone
    • DevOps Zone
    • Integration Zone
    • Web Dev Zone
    DéveloppeurWeb.Com
    Home»Uncategorized»Utiliser Golang pour le traitement des données avec Amazon Kinesis et AWS Lambda
    Uncategorized

    Utiliser Golang pour le traitement des données avec Amazon Kinesis et AWS Lambda

    mars 14, 2023
    Utiliser Golang pour le traitement des données avec Amazon Kinesis et AWS Lambda
    Share
    Facebook Twitter Pinterest Reddit WhatsApp Email

    Ce billet de blog est destiné aux personnes souhaitant apprendre à utiliser Golang et AWS Lambda pour créer une solution sans serveur. Vous utiliserez la bibliothèque aws-lambda-go avec AWS Go SDK v2 pour une application qui traitera les enregistrements d’un flux de données Amazon Kinesis et les stockera dans une table DynamoDB. Mais ce n’est pas tout! Vous utiliserez également les liaisons Go pour AWS CDK afin d’implémenter « l’infrastructure en tant que code » pour l’ensemble de la solution et de la déployer avec l’interface de ligne de commande AWS CDK.

    Introduction

    Amazon Kinesis est une plateforme de traitement, d’ingestion et d’analyse de données en temps réel. Kinesis Data Streams est un sans serveur données en continu service (qui fait partie de la plateforme de données de streaming Kinesis, avec Kinesis Data Firehose, Kinesis Video Streams et Kinesis Data Analytics) qui permet aux développeurs de collecter, traiter et analyser de grandes quantités de données en temps réel à partir de diverses sources telles que les médias sociaux , appareils IoT, journaux, etc. AWS Lambda, en revanche, est un sans serveur calculer service qui permet aux développeurs d’exécuter leur code sans avoir à gérer l’infrastructure sous-jacente.

    L’intégration d’Amazon Kinesis à AWS Lambda offre un moyen efficace de traiter et d’analyser de grands flux de données en temps réel. Un flux de données Kinesis est un ensemble de partitions et chaque partition contient une séquence d’enregistrements de données. Une fonction Lambda peut agir comme une application consommateur et traiter les données d’un flux de données Kinesis. Vous pouvez mapper une fonction Lambda à un consommateur à débit partagé (itérateur standard) ou à un consommateur à débit dédié avec diffusion améliorée. Pour les itérateurs standard, Lambda interroge chaque partition de votre flux Kinesis pour les enregistrements utilisant le protocole HTTP. Le mappage de source d’événement partage le débit de lecture avec d’autres consommateurs de la partition.

    Amazon Kinesis et AWS Lambda peuvent être utilisés ensemble pour créer de nombreuses solutions, notamment l’analyse en temps réel (permettant aux entreprises de prendre des décisions éclairées), le traitement des journaux (utilisez les journaux pour identifier et résoudre de manière proactive les problèmes des serveurs/applications, etc. avant qu’ils ne deviennent critiques) , le traitement des données IoT (analysez les données de l’appareil en temps réel et déclenchez des actions en fonction des résultats), l’analyse du flux de clics (fournissez des informations sur le comportement des utilisateurs), la détection des fraudes (détectez et empêchez les transactions par carte frauduleuses) et plus encore.

    Comme toujours, le code est disponible sur GitHub.

    Flux Kinesis-Lambda-DynamoDB

    Conditions préalables

    Avant de continuer, assurez-vous que vous disposez du langage de programmation Go (v1.18 ou supérieur) et AWS CDK installé.

    Clonez le dépôt GitHub et accédez au bon répertoire :

    git clone https://github.com/abhirockzz/kinesis-lambda-events-golang
    
    cd kinesis-lambda-events-golang
    

    Utiliser AWS CDK pour déployer la solution

    Pour démarrer le déploiement, invoquez simplement cdk deploy et attendez un peu. Vous verrez une liste de ressources qui seront créées et vous devrez fournir votre confirmation pour continuer.

    cd cdk
    
    cdk deploy
    
    # output
    
    Bundling asset KinesisLambdaGolangStack/kinesis-function/Code/Stage...
    
    ✨  Synthesis time: 5.94s
    
    This deployment will make potentially sensitive changes according to your current security approval level (--require-approval broadening).
    Please confirm you intend to make the following modifications:
    
    //.... omitted
    
    Do you wish to deploy these changes (y/n)? y
    

    Cela commencera à créer les ressources AWS requises pour notre application.

    Si vous souhaitez voir le modèle AWS CloudFormation qui sera utilisé en arrière-plan, exécutez cdk synth et vérifier le cdk.out dossier.

    Vous pouvez suivre la progression dans le terminal ou accéder à la console AWS : CloudFormation > Stacks > KinesisLambdaGolangStack.

    Une fois toutes les ressources créées, vous pouvez tester l’application. Tu aurais dû:

    • Une fonction Lambda
    • Un flux Kinesis
    • Une table DynamoDB
    • Avec quelques autres composants (comme les rôles IAM, etc.)

    Vérifier la solution

    Vous pouvez vérifier la table et les informations de flux Kinesis dans la sortie de la pile (dans le terminal ou le Les sorties onglet dans la console AWS CloudFormation pour votre pile) :

    KinesisLambdaGolangStack

    Publiez quelques messages sur le flux Kinesis. Pour les besoins de cette démonstration, vous pouvez utiliser l’AWS CLI :

    export KINESIS_STREAM=<enter the Kinesis stream name from cloudformation output>
    
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user1@foo.com --data $(echo -n '{"name":"user1", "city":"seattle"}' | base64)
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user2@foo.com --data $(echo -n '{"name":"user2", "city":"new delhi"}' | base64)
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user3@foo.com --data $(echo -n '{"name":"user3", "city":"new york"}' | base64)
    

    Vérifiez la table DynamoDB pour confirmer que les métadonnées du fichier ont été stockées. Vous pouvez utiliser la console AWS ou l’AWS CLI aws dynamodb scan --table-name <enter the table name from cloudformation output>.

    Vérifiez la table DynamoDB : confirmez que les métadonnées du fichier ont été stockées

    N’oubliez pas de nettoyer

    Une fois que vous avez terminé, pour supprimer tous les services, utilisez simplement :

    cdk destroy
    
    #output prompt (choose 'y' to continue)
    
    Are you sure you want to delete: KinesisLambdaGolangStack (y/n)?
    

    Vous avez pu configurer et essayer la solution complète. Avant de conclure, parcourons rapidement certaines des parties importantes du code pour mieux comprendre ce qui se passe dans les coulisses.

    Procédure pas à pas

    Une partie du code (gestion des erreurs, journalisation, etc.) a été omise par souci de brièveté car nous ne voulons nous concentrer que sur les parties importantes.

    CDK AWS

    Vous pouvez vous référer au code CDK ici.

    On commence par créer le DynamoDB tableau:

        table := awsdynamodb.NewTable(stack, jsii.String("dynamodb-table"),
            &awsdynamodb.TableProps{
                PartitionKey: &awsdynamodb.Attribute{
                    Name: jsii.String("email"),
                    Type: awsdynamodb.AttributeType_STRING},
            })
    
        table.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)
    

    Nous créons la fonction Lambda (CDK se chargera de créer et de déployer la fonction) et nous nous assurons de lui fournir les autorisations appropriées pour écrire dans le DynamoDB tableau.

        function := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("kinesis-function"),
            &awscdklambdagoalpha.GoFunctionProps{
                Runtime:     awslambda.Runtime_GO_1_X(),
                Environment: &map[string]*string{"TABLE_NAME": table.TableName()},
                Entry:       jsii.String(functionDir),
            })
    
        table.GrantWriteData(function)
    

    Ensuite, nous créons le flux Kinesis et l’ajoutons en tant que source d’événement à la fonction Lambda.

        kinesisStream := awskinesis.NewStream(stack, jsii.String("lambda-test-stream"), nil)
    
        function.AddEventSource(awslambdaeventsources.NewKinesisEventSource(kinesisStream, &awslambdaeventsources.KinesisEventSourceProps{
            StartingPosition: awslambda.StartingPosition_LATEST,
        }))
    

    Enfin, nous exportons le flux Kinesis et le nom de la table DynamoDB en tant que sorties CloudFormation.

        awscdk.NewCfnOutput(stack, jsii.String("kinesis-stream-name"),
            &awscdk.CfnOutputProps{
                ExportName: jsii.String("kinesis-stream-name"),
                Value:      kinesisStream.StreamName()})
    
        awscdk.NewCfnOutput(stack, jsii.String("dynamodb-table-name"),
            &awscdk.CfnOutputProps{
                ExportName: jsii.String("dynamodb-table-name"),
                Value:      table.TableName()})
    

    Fonction lambda

    Vous pouvez vous référer au code de la fonction Lambda ici.

    Le gestionnaire de fonctions Lambda effectue une itération sur chaque enregistrement du flux Kinesis, et pour chacun d’entre eux :

    • Désorganise le JSON charge utile dans le flux Kinesis dans un Go struct
    • Stocke la clé de partition de données de flux en tant qu’attribut de clé primaire (email) de la DynamoDB tableau
    • Le reste des informations est extrait des données de flux et également stocké dans la table.
    func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
    
        for _, record := range kinesisEvent.Records {
    
            data := record.Kinesis.Data
    
            var user CreateUserInfo
            err := json.Unmarshal(data, &user)
    
            item, err := attributevalue.MarshalMap(user)
            if err != nil {
                return err
            }
    
            item["email"] = &types.AttributeValueMemberS{Value: record.Kinesis.PartitionKey}
    
            _, err = client.PutItem(context.Background(), &dynamodb.PutItemInput{
                TableName: aws.String(table),
                Item:      item,
            })
        }
    
        return nil
    }
    
    type CreateUserInfo struct {
        Name string `json:"name"`
        City string `json:"city"`
    }
    

    Conclure

    Dans ce blog, vous avez vu un exemple d’utilisation de Lambda pour traiter des messages dans un flux Kinesis et les stocker dans DynamoDB, grâce à l’intégration Kinesis et Lamdba. L’ensemble du cycle de vie de l’infrastructure a été automatisé à l’aide d’AWS CDK.

    Tout cela a été fait à l’aide du langage de programmation Go, qui est bien pris en charge dans DynamoDB, AWS Lambda et AWS CDK.

    Bonne construction !

    Share. Facebook Twitter Pinterest LinkedIn WhatsApp Reddit Email
    Add A Comment

    Leave A Reply Cancel Reply

    Catégories

    • Politique de cookies
    • Politique de confidentialité
    • CONTACT
    • Politique du DMCA
    • CONDITIONS D’UTILISATION
    • Avertissement
    © 2023 DéveloppeurWeb.Com.

    Type above and press Enter to search. Press Esc to cancel.