Scenarijaus aprašymas
Tarkime, kad yra mysql lentelė, kuri yra supjaustyta horizontaliai, paskirstyta keliuose pagrindiniuose kompiuteriuose, kiekviename iš jų yra n skeveldrų lentelių. Ką daryti, jei reikia vienu metu pasiekti šias lenteles ir greitai gauti užklausos rezultatus? Čia yra sprendimas, kaip įgyvendinti šį reikalavimą naudojant asyncio asyncio biblioteką ir aiomysql asinchroninę python3 biblioteką.
Kodo demonstracija
Importo registravimas importuoti atsitiktinai Importuoti asyncio iš aiomysql importo create_pool
# Tarkime, kad mysql lentelė yra paskirstyta per 8 šeimininkus, kiekvienas su 16 sub-lentelių TBLES = { "192.168.1.01": "table_000-015", #000-015 rodo, kad indikacija pagal šį IP yra nepertraukiama nuo table_000 iki table_015 "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} USER = "xxx" PASSWD = "xxxx"
# įvyniojimo funkcija, skirta išimtims sugauti def query_wrapper(func): asinchroninis def wrapper(*args, **kwargs): Pabandykite: laukti func(*args, **kwargs) išskyrus išimtį kaip e: spausdinti (e) grąžinimo įvyniojimas
# Faktinė SQL prieigos tvarkymo funkcija įgyvendina asinchronines neblokuojančias užklausas per AIOMYSQL @query_wrapper Asinchroninis def query_do_something(IP, DB, lentelė): asinchronizuoti su create_pool(host=ip, db=db, user=USER, password=PASSWD) kaip telkinį: asinchronizuoti su pool.get() kaip conn: asinchroninis su conn.cursor() kaip cur: sql = ("pasirinkite xxx iš {}, kur xxxx") laukti cur.execute(sql.format(table)) res = laukti cur.fetchall() # tada daryk ką nors...
# Sukurkite SQL prieigos eilę, kiekviename eilės elemente yra funkcijos ir parametrai, skirti pasiekti lentelę def gen_tasks(): užduotys = [] ip, tbls TBLES.items(): cols = re.split('_|-', tbls) tblpre = "_".join(cols[:-2]) min_num = int(stulpeliai[-2]) max_num = int(stulpeliai[-1]) NUM diapazone(min_num, max_num+1): tasks.append( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(užduotys) Grąžinimo užduotys
# Paleiskite SQL prieigos užklausų eilę paketais def run_tasks(užduotys, batch_len): Pabandykite: IDX diapazone(0, len(užduotys), batch_len): batch_tasks = užduotys[idx:idx+batch_len] logging.info("dabartinis paketas, start_idx:%s len:%s" % (idx, len(batch_tasks))) i diapazone(0, len(batch_tasks)): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) išskyrus išimtį kaip e: loggging.warn(e)
# Pagrindinis metodas, kuris įgyvendina asinchroninį funkcijų iškvietimą per Asyncio def pagrindinis(): kilpa = asyncio.get_event_loop()
užduotys = gen_tasks() batch_len = len(TBLES.keys()) * 5 # viskas priklauso nuo jūsų run_tasks(užduotys, batch_len)
loop.close()
|