Démarrer CockroachDB
Démarrer CRDB avec --max-sql-memory
approprié pour la RAM de votre machine. 128 Mo par défaut ne suffisent pas pour cette charge de travail.
cockroach start --insecure --store=node1 --listen-addr=localhost:26257 --http-addr=localhost:8080 --join=localhost:26257,localhost:26258,localhost:26259 --background --max-sql-memory=.25
Installer Ray
pip3 install ray
J’ai écrit le code suivant pour exécuter un IMPORT
dans CRDB, la mise en œuvre de Ray pour ajouter le multitraitement consistait à suivre leur fichier README.
#! /usr/bin/python3 import subprocess as sb import sys, ray, os, uuid init = """ cockroach sql --insecure --host=localhost:26257 --execute="CREATE DATABASE IF NOT EXISTS parallelload;" """ def normalize_statement(): id = str(uuid.uuid1()).replace("-", "", 4) return """cockroach sql --insecure --host=localhost:26257 --database="parallelload" --execute="IMPORT TABLE orders{} ( o_orderkey INTEGER NOT NULL PRIMARY KEY, o_custkey INTEGER NOT NULL, o_orderstatus CHAR(1) NOT NULL, o_totalprice FLOAT NOT NULL, o_orderdate DATE NOT NULL, o_orderpriority CHAR(15) NOT NULL, o_clerk CHAR(15) NOT NULL, o_shippriority INTEGER NOT NULL, o_comment VARCHAR(79) NOT NULL, INDEX o_ck (o_custkey ASC), INDEX o_od (o_orderdate ASC) ) CSV DATA ('https://storage.googleapis.com/cockroach-fixtures/tpch-csv/sf-1/orders.tbl.1') WITH delimiter="|";"""".format(id) def initialization(): print("Initial Step: Creating Database") sb.run(init, shell=True, check=True) @ray.remote def execute_import(): statement = normalize_statement() print("Running process: {}".format(os.getpid())) try: sb.run(statement, shell=True, check=True) except sb.CalledProcessError as e: sys.stderr.write( "common::run_command() : [ERROR]: output = %s, error code = %sn" % (e.output, e.returncode)) if __name__ == "__main__": initialization() # run ray uncapped without arguments [ray.init()] ray.init(memory=52428800, object_store_memory=78643200) futures = [execute_import.remote() for i in range(1000)] print(ray.get(futures))
Pièces spécifiques aux rayons :
import ray ray.init() @ray.remote def f(x): return x * x futures = [f.remote(i) for i in range(4)] print(ray.get(futures))
Pièces spécifiques aux cafards :
IMPORT TABLE orders{} ( o_orderkey INTEGER NOT NULL PRIMARY KEY, o_custkey INTEGER NOT NULL, o_orderstatus CHAR(1) NOT NULL, o_totalprice FLOAT NOT NULL, o_orderdate DATE NOT NULL, o_orderpriority CHAR(15) NOT NULL, o_clerk CHAR(15) NOT NULL, o_shippriority INTEGER NOT NULL, o_comment VARCHAR(79) NOT NULL, INDEX o_ck (o_custkey ASC), INDEX o_od (o_orderdate ASC) ) CSV DATA ('https://storage.googleapis.com/cockroach-fixtures/tpch-csv/sf-1/orders.tbl.1') WITH delimiter="|";
Exécutez ensuite le code et regardez-le aller :
python3 import_with_ray.py
(pid=85485) job_id status fraction_completed rows index_entries system_records bytes (pid=85485) 509795360310591489 succeeded 1 187500 375000 0 27768880 (pid=85485) Running process: 85485 (pid=85495) job_id status fraction_completed rows index_entries system_records bytes (pid=85495) 509795420722757633 succeeded 1 187500 375000 0 27768880 (pid=85495) Running process: 85495
Enfin, allez à cockroach sql
shell et vérifiez que vos tables sont chargées. Notez que la sortie est réduite par souci de concision :
root@localhost:26257/parallelload> show tables; ... ordersfef573fc183c11eaa7b2acde48001122 ordersff03fb32183e11ea9700acde48001122 ordersff479678183c11ea9700acde48001122 ordersff58aa96183f11eaa1e9acde48001122 ordersff8e417c183c11ea99bbacde48001122 ordersff931a5a183f11ea8265acde48001122 (1004 rows) Time: 15.318ms