Les clients Nebula fournissent aux utilisateurs des API dans plusieurs langages de programmation pour interagir avec Nebula Graph et reconditionner la structure de données renvoyée par le serveur pour une meilleure utilisation.
Actuellement, les clients Nebula prennent en charge C++, Java, Python, Golang et Rust.
Cadre de communication des services
Les clients de la nébuleuse utilisent fbthrift
https://github.com/facebook/fbthrift en tant que cadre RPC pour la communication de service entre les serveurs et les clients afin de mettre en œuvre une interaction multilingue.
À un haut niveau, fbthrift
est:
- Un générateur de code :
fbthrift
possède un générateur de code qui génère des structures de données pouvant être sérialisées à l’aide de Thrift dans différentes langues. - Un framework de sérialisation :
fbthrift
dispose d’un ensemble de protocoles pour sérialiser les structures générées créées à partir du générateur de code. - Un cadre RPC :
fbthrift
a un cadre pour envoyer des messages entre les clients et les serveurs et pour appeler des fonctions définies par l’application lors de la réception de messages dans différentes langues.
Exemples
Prenons l’exemple du client Golang pour montrer l’application de fbthrift
dans le graphique de la nébuleuse.
1. La définition du Vertex
structure en serveurs :
struct Vertex {
Value vid;
std::vector<Tag> tags;
Vertex() = default;
};
2. Définir des structures de données dans src/interface/common.thrift
:
struct Tag {
1: binary name,
// List of <prop_name, prop_value>
2: map<binary, Value> (cpp.template = "std::unordered_map") props,
} (cpp.type = "nebula::Tag")
struct Vertex {
1: Value vid,
2: list<Tag> tags,
} (cpp.type = "nebula::Vertex")
Dans l’exemple ci-dessus, nous définissons une structure Vertex. (cpp.type = "nebula::Vertex")
indique que cette structure correspond à nebula::Vertex
le serveur.
3. fbthrift
générera automatiquement la structure de données dans Golang :
// Attributes:
// - Vid
// - Tags
type Vertex struct {
Vid *Value `thrift:"vid,1" db:"vid" json:"vid"`
Tags []*Tag `thrift:"tags,2" db:"tags" json:"tags"`
}
func NewVertex() *Vertex {
return &Vertex{}
}
func (p *Vertex) Read(iprot thrift.Protocol) error { // 反序列化
...
}
func (p *Vertex) Write(oprot thrift.Protocol) error { // 序列化
...
}
4. Dans MATCH (v:Person) WHERE id(v) == "ABC" RETURN v
, le client demande un sommet (nebula::Vertex
) du serveur. Le serveur va sérialiser après l’avoir trouvé. Une fois que le serveur a trouvé ce sommet, il sera sérialisé et envoyé au client via le transport
du cadre de communication RPC. Lorsque le client recevra ces données, il sera désérialisé pour générer la structure de données correspondante (type Vertex struct
) défini dans le client.
Clients
Dans cette section, nous prendrons comme exemple nebula-go pour présenter différents modules du client et leurs interfaces principales.
1. Configurations fournit toutes les options de configuration.
type PoolConfig struct {
// Set the timeout threshold. The default value 0 means it does not time out. Unit: ms
TimeOut time.Duration
// The maximum idle time of each connection. When the idle time exceeds this threshold, the connection will be disconnected and deleted. The default value 0 means permanently idle and the connection will not be disconnected
IdleTime time.Duration
// max_connection_pool_size: Set the maximum number of connections in the connection pool. The default value is 10
MaxConnPoolSize int
// The minimum number of idle connections. The default value is 0
MinConnPoolSize int
}
2. Séance fournit une interface permettant aux utilisateurs d’appeler directement.
// Manage the specific information of Session
type Session struct {
// Use for identity verification or message retry when executing commands
sessionID int64
// Currently held connections
connection *connection
// Currently used connection pools
connPool *ConnectionPool
// Log tools
log Logger
// Use to save the time zone used by the current session
timezoneInfo
}
- La définition des interfaces est la suivante :
// Execute nGQL. The return data type is ResultSet. This interface is non-thread-safe
func (session *Session) Execute(stmt string) (*ResultSet, error) {...}
// Re-acquire a connection from the connection pool for the current Session
func (session *Session) reConnect() error {...}
// Signout, release the Session ID, and return the connection to the pool
func (session *Session) Release() {
3. Piscine de connexion gère toutes les connexions. Les principales interfaces sont les suivantes :
// Create a new connection pool and complete the initialization with the entered service address
func NewConnectionPool(addresses []HostAddress, conf PoolConfig, log Logger) (*ConnectionPool, error) {...}
// Validate and get the Session example
func (pool *ConnectionPool) GetSession(username, password string) (*Session, error) {...}
4. Connexion emballe le réseau de épargne et fournit les interfaces suivantes :
// Establish a connection with the specified ip and port
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration) error {...}
// Authenticate the username and password
func (cn *connection) authenticate(username, password string) (*graph.AuthResponse, error) {
// Execute query
func (cn *connection) execute(sessionID int64, stmt string) (*graph.ExecutionResponse, error) {...}
// Generate a temp sessionID 0 and send the query "YIELD 1" to test if the connection is usable.
func (cn *connection) ping() bool {...}
// Release sessionId to the graphd process.
func (cn *connection) signOut(sessionID int64) error {...}
// Disconnect.
func (cn *connection) close() {...}
5. Équilibre de charge est utilisé dans le pool de connexions.
Interaction des modules
Pool de connexions
Initialiser
Lors de son utilisation, l’utilisateur doit créer et initialiser un pool de connexion. Lors de l’initialisation, le pool de connexions établira un lien à l’adresse du service Nebula indiquée par l’utilisateur. Si plusieurs services Graph sont déployés dans une méthode de déploiement de cluster, le pool de connexions utilisera un politique de sondage pour équilibrer la charge et établir un nombre presque égal de connexions pour chaque adresse.
Gérer les connexions
Deux files d’attente sont maintenues dans le pool de connexions, file d’attente de connexion inactive et file d’attente de connexion active. Le pool de connexions détectera périodiquement les connexions inactives expirées et les fermera. Ces deux files d’attente utiliseront verrouillage en lecture-écriture pour garantir l’exactitude de l’exécution multi-thread lors de l’ajout ou de la suppression d’éléments.
Lorsque Session demande une connexion au pool de connexions, il vérifie s’il existe des connexions utilisables dans la file d’attente des connexions inactives. S’il existe des connexions utilisables, elles seront directement renvoyées à la session pour que les utilisateurs puissent les utiliser. S’il n’y a pas de connexions utilisables et que le nombre total actuel de connexions ne dépasse pas le nombre maximum de connexions défini dans la configuration, une nouvelle connexion est créée pour la session. S’il atteint le nombre maximum de connexions, une erreur est renvoyée.
En règle générale, le pool de connexions doit être fermé uniquement lorsque vous fermez le programme. Toutes les connexions du pool seront déconnectées à la fermeture du programme.
Session
Session est généré via le pool de connexions. L’utilisateur doit fournir le mot de passe pour l’authentification. Une fois l’authentification réussie, l’utilisateur obtiendra un exemple de session et communiquera avec le serveur via la connexion dans la session. L’interface la plus couramment utilisée est execute()
. Si une erreur se produit lors de l’exécution, le client vérifiera le type d’erreur. S’il s’agit d’une erreur réseau, il se reconnecter automatiquement et essayez à nouveau d’exécuter l’instruction.
Notez qu’une session ne prend pas en charge l’utilisation par plusieurs threads en même temps. La méthode correcte est que plusieurs sessions sont appliquées par plusieurs threads et qu’une session est utilisée par chaque thread.
Lorsque la session est libérée, la connexion détenue par elle sera remise dans le file d’attente de connexion inactive du pool de connexions afin qu’il puisse être réutilisé ultérieurement par d’autres sessions.
Lien
Chaque exemple de connexion est équivalent et peut être détenu par n’importe quelle session. Le but de cette conception est de permettre à ces connexions d’être réutilisées par différentes sessions, réduisant l’activation et la désactivation répétées du transport.
La connexion enverra la requête du client au serveur et renverra le résultat à la session.
Exemple
// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
if err != nil {
log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
}
// Close all connections in the pool when program exits
defer pool.Close()
// Create session
session, err := pool.GetSession(username, password)
if err != nil {
log.Fatal(fmt.Sprintf("Fail to create a new session from connection pool, username: %s, password: %s, %s",
username, password, err.Error()))
}
// Release session and return connection back to connection pool when program exits
defer session.Release()
// Excute a query
resultSet, err := session.Execute(query)
if err != nil {
fmt.Print(err.Error())
}
Structure des données renvoyées
Le client regroupe les résultats de requête renvoyés par une partie des serveurs complexes et ajoute une interface pour une utilisation pratique.

nebula::Value
sera emballé comme ValueWrapper
dans le client et convertis en d’autres structures via des interfaces. (c’est-à-dire node = ValueWrapper.asNode()
)
Analyse de la structure des données
Pour MATCH p= (v:player{name:"Tim Duncan"})-[]->(v2) RETURN p
, le résultat renvoyé est :
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| p |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| <("Tim Duncan" :bachelor{name: "Tim Duncan", speciality: "psychology"} :player{age: 42, name: "Tim Duncan"})<-[:teammate@0 {end_year: 2016, start_year: 2002}]-("Manu Ginobili" :player{age: 41, name: "Manu Ginobili"})> |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Got 1 rows (time spent 11550/12009 us)
Nous pouvons voir que le résultat renvoyé contient une ligne et que son type est un chemin. À ce stade, vous pouvez exécuter comme suit pour obtenir les propriétés du sommet de destination du chemin (v2).
// Execute a query
resultSet, _ := session.Execute("MATCH p= (v:player{name:""Tim Duncan""})-[]->(v2) RETURN p")
// Get the first row of the result. The index of the first row is 0
record, err :=...