Stsenaariumi kirjeldus
Oletame, et on olemas mysql tabel, mis on horisontaalselt lõigatud, hajutatud mitme peremehe vahel, igaühel n killustatud tabelit. Mida peaksin tegema, kui pean neid tabeleid samaaegselt kasutama ja päringutulemused kiiresti kätte saama? Siin on lahendus selle nõude rakendamiseks, kasutades asyncio asyncio teeki ja python3 aiomysql asünkroonset teeki.
Koodidemo
Impordilogimine Import juhuslik import asyncio Aiomysql import create_pool
# Oletame, et mysql tabel on jaotatud 8 hosti vahel, igaühel 16 alamtabelit TBLES = { "192.168.1.01": "table_000-015", #000-015 näitab, et selle IP all olev näitaja on pidev table_000 kuni table_015 "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} KASUTAJA = "xxx" PASSWD = "xxxx"
# Wrapperi funktsioon erandite püüdmiseks def query_wrapper(funkt): async def wrapper(*args, **kwargs): Proovi: Await func(*args, **kwargs) välja arvatud erand kui e: print(e) Tagastuspakk
# Tegelik SQL juurdepääsu haldamise funktsioon rakendab asünkroonseid mitteblokeerivaid päringuid AIOMYSQL kaudu @query_wrapper Asünkroonne definitsioon query_do_something(IP, DB, tabel): asünkroonne create_pool(host=ip, db=db, user=USER, password=PASSWD) kui bassein: Asünkroonne pool.get() kui Conn: asünkroonne conn.cursor() kui cur: sql = ("vali xxx {} hulgast, kus xxxx") await cur.execute(sql.format(table)) res = oota cur.fetchall() # siis tee midagi...
# Genereeri SQL-i ligipääsujärjekord, iga järjekorra element sisaldab funktsioone ja parameetreid tabeli ligipääsuks def gen_tasks(): ülesanded = [] IP jaoks TBLES.items(): cols = re.split('_|-', tbls) tblpre = "_".join(cols[:-2]) min_num = int(cols[-2]) max_num = int(cols[-1]) num vahemikus (min_num, max_num+1): tasks.append( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(ülesanded) Tagastamisülesanded
# Käivita SQL-i ligipääsupäringu järjekord partiidena def run_tasks(ülesanded, batch_len): Proovi: idx puhul vahemikus (0, len(ülesanded), batch_len): batch_tasks = ülesanded[idx:idx+batch_len] logging.info("praegune partii, start_idx:%s len:%s" % (idx, len(batch_tasks)))) i korral vahemikus (0, len(batch_tasks)): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) välja arvatud erand kui e: logimine.warn(e)
# peamine meetod, mis rakendab asünkroonset funktsioonide kutsumist asünkroonse funktsioonide kaudu def main(): loop = asyncio.get_event_loop()
ülesanded = gen_tasks() batch_len = len(TBLES.keys()) * 5 # kõik sinu otsustada run_tasks(ülesanded, batch_len)
loop.close()
|