Deskripsi skenario
Misalkan ada tabel mysql yang diiris secara horizontal, tersebar di beberapa host, masing-masing dengan n tabel pecahan. Apa yang harus saya lakukan jika saya perlu mengakses tabel ini secara bersamaan dan mendapatkan hasil kueri dengan cepat? Berikut adalah solusi untuk mengimplementasikan persyaratan ini menggunakan pustaka asyncio asyncio dan pustaka asinkron aiomysql python3.
Kode demo
Impor Pencatatan impor acak Impor Asyncio Dari AIOMYSQL Impor create_pool
# Misalkan tabel mysql tersebar di 8 host, masing-masing dengan 16 sub-tabel TBLES = { "192.168.1.01": "table_000-015", #000-015 menunjukkan bahwa indikasi di bawah IP ini bersifat berkelanjutan dari table_000 ke table_015 "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} PENGGUNA = "xxx" PASSWD = "xxxx"
# Fungsi pembungkus untuk menangkap pengecualian def query_wrapper(func): pembungkus def asinkron(*args, **kwargs): Coba: tunggu func(*args, **kwargs) kecuali Pengecualian sebagai e: cetak (e) pembungkus kembali
# Fungsi penanganan akses SQL yang sebenarnya mengimplementasikan permintaan non-pemblokiran asinkron melalui AIOMYSQL @query_wrapper query_do_something def asinkron (ip, db, tabel): asinkron dengan create_pool(host=ip, db=db, user=USER, password=PASSWD) sebagai kumpulan: asinkron dengan pool.get() sebagai conn: asinkron dengan conn.cursor() sebagai cur: sql = ("pilih xxx dari {} di mana xxxx") await cur.execute(sql.format(table)) res = tunggu cur.fetchall() # lalu lakukan sesuatu...
# Hasilkan antrean akses SQL, setiap elemen antrean berisi fungsi dan parameter untuk mengakses tabel definisi gen_tasks(): tugas = [] untuk ip, tbls di TBLES.items(): cols = re.split('_|-', tbls) tblpre = "_".join(cols[:-2]) min_num = int(cols[-2]) max_num = int(cols[-1]) untuk num dalam rentang (min_num, max_num+1): tugas.melampirkan( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
acak.acak(tugas) Tugas Pengembalian
# Jalankan antrean permintaan akses SQL dalam batch def run_tasks(tugas, batch_len): Coba: Untuk IDX dalam Range(0, Len(Tasks), batch_len): batch_tasks = tugas[idx:idx+batch_len] logging.info("batch saat ini, start_idx:%s len:%s" % (idx, len(batch_tasks))) untuk i dalam rentang (0, len(batch_tasks)): l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) kecuali Pengecualian sebagai e: pencatatan.memperingatkan(e)
# Metode utama, yang mengimplementasikan panggilan fungsi asinkron melalui asyncio definisi utama (): perulangan = asyncio.get_event_loop()
tugas = gen_tasks() batch_len = len(TBLES.keys()) * 5 # semuanya terserah Anda run_tasks(tugas, batch_len)
loop.close()
|