Scenariebeskrivelse
Antag, at der findes en mysql-tabel, der er delt vandret, fordelt over flere værter, hver med n shardede tabeller. Hvad skal jeg gøre, hvis jeg har brug for at tilgå disse tabeller samtidig og hurtigt få forespørgselsresultaterne? Her er en løsning til at implementere dette krav ved hjælp af asyncio asyncio-biblioteket og aiomysql asynkrone bibliotek i python3.
Kodedemo
importlogning importer tilfældigt Import Asyncio Fra AIOMYSQL Import create_pool
# Antag, at mysql-tabellen er fordelt over 8 værter, hver med 16 undertabeller TBLES = { "192.168.1.01": "table_000-015", #000-015 angiver, at angivelsen under denne IP er kontinuerlig fra table_000 til table_015 "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} BRUGER = "xxx" PASSWD = "xxxx"
# wrapper-funktion til at fange undtagelser def query_wrapper(func): Async Def Wrapper(*args, **kwargs): Prøv: await func(*args, **kwargs) undtagen undtagelse som e: print(e) returindpakning
# Den faktiske SQL-adgangshåndteringsfunktion implementerer asynkrone, ikke-blokerende forespørgsler via AIOMYSQL @query_wrapper Async Def query_do_something(IP, DB, Table): asynkron med create_pool(vært=ip, db=db, bruger=BRUGER, adgangskode=PASSWD) som pool: asynkron med pool.get() som conn: asynkron med conn.cursor() som cur: SQL = ("Vælg xxx fra {} hvor xxxx") await cur.execute(sql.format(table)) res = afvent cur.fetchall() # så gør noget...
# Generer en SQL-adgangskø, hvert element i køen indeholder funktioner og parametre til at tilgå en tabel def gen_tasks(): opgaver = [] for ip, tbls i TBLES.items(): cols = re.split('_|-', tbls) tblpre = "_".join(kols[:-2]) min_num = int(kols[-2]) max_num = int(kols[-1]) for num i interval(min_num, max_num+1): tasks.append( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(tasks) Returneringsopgaver
# Kør SQL-adgangsanmodningskøen i batches def run_tasks(opgaver, batch_len): Prøv: for idx i interval(0, len(tasks), batch_len): batch_tasks = tasks[idx:idx+batch_len] logging.info("Current batch, start_idx:%s len:%s" % (idx, len(batch_tasks))) for i i i interval(0, len(batch_tasks)): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) undtagen undtagelse som e: logging.warn(e)
# hovedmetode, som implementerer asynkron kald af funktioner via asynkio def main(): loop = asyncio.get_event_loop()
opgaver = gen_tasks() batch_len = len(TBLES.keys()) * 5 # det er op til dig run_tasks(opgaver, batch_len)
loop.close()
|