introduction
Nous verrons comment configurer un index OpenSearch à partir d’une table DynamoDb. Nous supposerons que vous avez une certaine connaissance de DynamoDB et Lambdas et que vous êtes également familiarisé avec l’utilisation de CDK pour déployer l’infrastructure dans AWS.
DynamoDB
Tout d’abord, réfléchissons à notre table DynamoDB et comment la configurer de manière à ce qu’elle soit prête à être indexée. C’est en fait très simple et utilise un flux DynamoDB.
"A DynamoDB stream is an ordered flow of information about changes to items in a DynamoDB table." - AWS
En utilisant CDK, votre table pourrait ressembler à :
const userTable = new dynamodb.Table(this, "UserTable", {
tableName: "user-table",
billingMode: BillingMode.PAY_PER_REQUEST,
partitionKey: {name: "partitionKey", type: AttributeType.STRING},
sortKey: {name: "sortKey", type: AttributeType.STRING},
pointInTimeRecovery: true,
stream: StreamViewType.NEW_IMAGE // This is the important line!
});
Si vous utilisez la console au lieu de CDK, consultez ceci.
Il existe différents types de flux :
KEYS_ONLY - Only the key attributes of the modified item are written to the stream.
NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream.
OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream.
NEW_AND_OLD_IMAGES - Both new and old item images of the item are written to the stream.
Ici, nous avons choisi NEW_IMAGE
car nous avons seulement besoin de connaître le nouvel élément à indexer.
Cela créera une table avec un flux DynamoDB, ce qui signifie que tout événement d’élément nouveau, mis à jour ou supprimé sera diffusé dans un endroit de votre choix ; Nous choisissons un lambda.
Lambda
Donc, ensuite, nous devons penser à l’indexation lambda. Il y a actuellement aucun moyen direct d’indexer vos données d’un flux vers le domaine OpenSearch, nous devons donc ajouter un intermédiaire pour faire le travail. Le code de ce Lambda et de quelques autres Lambda utiles peut être trouvé ici sur Github. Cette Lambda vit dans le index-stream
annuaire.
Voici un extrait de code de ce gestionnaire Lambda :
export const handler = async (event: DynamoDBStreamEvent): Promise<void> => {
console.log("Received event from the user table");
// In later setup we set the batchSize of this event source to be 1
// so in reality this for loop will only run once each time
// but if you wanted to increase the batch size this Lambda can handle that
for (const record of event.Records) {
if (!record.eventName || !record.dynamodb || !record.dynamodb.Keys) continue;
const partitionKey = record.dynamodb.Keys.partitionKey.S;
const sortKey = record.dynamodb.Keys.sortKey.S;
// Note here that we are using a pk and sk
// but maybe you are using only an id, this would look like:
// const id = record.dynamodb.Keys.id.S;
try {
if (record.eventName === "REMOVE") {
// removeDocumentFromOpenSearch will perform a DELETE request to your index
return await removeDocumentFromOpenSearch(partitionKey, sortKey);
} else {
// There are 2 types of events left to handle, INSERT and MODIFY,
// which will both contain a NewImage
if (!record.dynamodb.NewImage) continue;
const userDocument = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage) as User;
// indexDocumentInOpenSearch will perform a PUT request to your index
return await indexDocumentInOpenSearch(userDocument, partitionKey, sortKey);
}
} catch (error) {
console.error("Error occurred updating OpenSearch domain", error);
throw error;
}
}
};
Dans CDK, vous pouvez créer votre lambda comme suit :
const userTableIndexingFunction = new Function(this, "UserTableIndexingFunction", {
functionName: "UserTableIndexingFunction",
code: Code.fromAsset("user-table-indexing-lambda-dist-folder"),
runtime: Runtime.NODEJS_14_X,
handler: "index.handler"
});
Ensuite, nous pouvons ajouter le flux DynamoDB en tant qu’événement source à ce Lambda.
userTableIndexingFunction.addEventSource(new DynamoEventSource(userTable, {
startingPosition: StartingPosition.TRIM_HORIZON,
batchSize: 1,
retryAttempts: 3
}));
Il existe 2 types de positions de départ :
TRIM_HORIZON - Start reading at the last untrimmed record in the shard in the system,
which is the oldest data record in the shard.
In other words, the stream will look at all the item events and
deal with them in chronological order (oldest event to most recent event)
LATEST - Start reading just after the most recent record in the shard,
so that you always read the most recent data in the shard.
In other words, the stream will look at all the item events and
deal with the most recent first and work down until the oldest event.
Pour cet exemple, on utilise donc TRIM_HORIZON
afin que l’index reflète les données dans leur état actuel.
Ouvrir la recherche
Examinons maintenant la configuration réelle du domaine OpenSearch. Maintenant, AWS suggère une puissance substantielle (et donc de l’argent) pour un domaine prêt pour la production. Vous pouvez trouver les meilleures pratiques ici. Pour cet exemple, nous utiliserons une très petite configuration sans redondance, cependant, n’hésitez pas à la faire évoluer en fonction de vos besoins :
const openSearchDomain = new Domain(this, "OpenSearchDomain", {
version: EngineVersion.OPENSEARCH_1_0,
capacity: {dataNodeInstanceType: "t3.small.search",
dataNodes: 1,
masterNodes: 0
},
ebs: {enabled: true,
volumeSize: 50,
volumeType: EbsDeviceVolumeType.GENERAL_PURPOSE_SSD
},
logging: {slowSearchLogEnabled: true,
appLogEnabled: true,
slowIndexLogEnabled: true,
},
});
Cela déploiera votre domaine OpenSearch, cela peut prendre un certain temps, alors soyez patient.
Une dernière chose à laquelle vous devez penser est d’accorder à votre Lambda les droits de lecture et d’écriture sur votre domaine :
openSearchDomain.grantIndexReadWrite("user-index", userTableIndexingFunction);
Cela permettra à votre Lambda de faire son travail.
C’est tout, vous êtes prêt à indexer toutes les nouvelles données dans votre index OpenSearch. Pour indexer les données existantes, vous pouvez trouver un Lambda utile sous le index-data
répertoire ici sur Github.