Scenariobeskrivning
Anta att det finns en mysql-tabell som är uppdelad horisontellt, spridd över flera värdar, var och en med n skärvade tabeller. Vad ska jag göra om jag behöver komma åt dessa tabeller samtidigt och snabbt få ut sökresultaten? Här är en lösning för att implementera detta krav med hjälp av asyncio asyncio-biblioteket och aiomysql-asynkronbiblioteket i python3.
Koddemo
Importloggning importera slumpmässigt importera asyncio Från AIOMYSQL Import create_pool
# Antag att mysql-tabellen är utspridd över 8 värdar, var och en med 16 deltabeller TBLES = { "192.168.1.01": "table_000-015", #000-015 indikerar att indikationen under denna IP är kontinuerlig från table_000 till table_015 "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} ANVÄNDARE = "xxx" PASSWD = "xxxx"
# wrapper-funktion för att fånga undantag def query_wrapper(func): Asynkron Def-omslag(*args, **kwargs): Försök: await func(*args, **kwargs) förutom undantag som e: print(e) Returneringsförpackning
# Den faktiska SQL-åtkomsthanteringsfunktionen implementerar asynkrona, icke-blockerande förfrågningar via AIOMYSQL @query_wrapper Async Def query_do_something(IP, DB, Table): asynk med create_pool(host=ip, db=db, user=user, password=PASSWD) som pool: Asynk med pool.get() som conn: Asynk med conn.cursor() som cur: SQL = ("Välj xxx från {} där xxxx") await cur.execute(sql.format(table)) res = vänta på cur.fetchall() # Gör då något...
# Generera en SQL-åtkomstkö, varje element i kön innehåller funktioner och parametrar för att komma åt en tabell def gen_tasks(): Tasks = [] för ip, tbls i TBLES.items(): cols = re.split('_|-', tbls) tblpre = "_".join(cols[:-2]) min_num = int(kols[-2]) max_num = int(kol[-1]) för num i intervall(min_num, max_num+1): tasks.append( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(tasks) Återvändningsuppgifter
# Kör SQL-åtkomstförfrågan i batcher def run_tasks(uppgifter, batch_len): Försök: för idx i intervall(0, len(tasks), batch_len): batch_tasks = tasks[idx:idx+batch_len] logging.info("nuvarande batch, start_idx:%s len:%s" % (idx, len(batch_tasks))) för i i i intervall(0, len(batch_tasks)): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) förutom undantag som e: logging.warn(e)
# huvudmetod, som implementerar asynkron anrop av funktioner via asyncio def main(): loop = asyncio.get_event_loop()
Tasks = gen_tasks() batch_len = len(TBLES.keys()) * 5 # helt upp till dig run_tasks(uppgifter, batch_len)
loop.close()
|