シナリオの説明
例えば、複数のホストに横方向にスライスされたmysqlテーブルがあり、それぞれにn個のシャードテーブルがあるとします。 これらのテーブルに同時にアクセスし、クエリ結果を素早く取得する必要がある場合、どうすればよいでしょうか? ここにasyncio asyncioライブラリとpython3のaiomysql非同期ライブラリを用いてこの要件を実装する解決策があります。
コードデモ
インポートログ インポートランダム import asyncio aiomySQL インポートcreate_poolより
# mysqlテーブルが8台のホストに分散し、それぞれに16のサブテーブルがあるとします TBLES = { 「192.168.1.01」:「table_000-015」、#000-015は、このIPの下で表示がtable_000からtable_015まで連続していることを示しています 「192.168.1.02」:「table_016-031」 「192.168.1.03」:「table_032-047」 「192.168.1.04」:「table_048-063」、 「192.168.1.05」:「table_064-079」 「192.168.1.06」:「table_080-095」 「192.168.1.07」:「table_096-0111」 「192.168.1.08」:「table_112-0127」
} ユーザー = "xxx" PASSWD = "xxxx"
# 例外をキャッチするラッパー関数 def query_wrapper(func): async def ラッパー(*args, **kwargs): 試してみて: await func(*args, **kwargs) 例外はe: 印刷(e) リターンラッパー
# 実際のSQLアクセス処理関数は、AIOMYSQLを通じて非同期非ブロッキング要求を実装しています @query_wrapper 非同期 def query_do_something(IP、DB、テーブル): create_pool(host=ip, db=db, user=USER, password=PASSWD)をプールとしてasync: async with pool.get() as conn: CON.cursor() と ACR を async で行います: SQL = (「{} からXXXを選択し、ここでxxxx」) await cur.execute(sql.format(table)) res = wait. cur.fetchall() # じゃあ何かして...
# SQLアクセスキューを生成する。キューの各要素にはテーブルにアクセスする関数とパラメータが含まれています def gen_tasks(): タスク = [] IPについてはTBLES.items(): cols = re.split('_|-', TBLS) tblpre = "_".join(cols[:-2]) min_num = int(cols[-2]) max_num = int(cols[-1]) num が範囲(min_num, max_num+1)の場合: tasks.append( (query_do_something、ip、'your_dbname'、'{}_{}'.format(tblpre, num)) )
random.shuffle(tasks) 返却タスク
# SQLアクセスリクエストキューをバッチで実行してください def run_tasks(tasks, batch_len): 試してみて: idxの場合、range(0, len(tasks), batch_len: batch_tasks = tasks[idx:idx+batch_len] logging.info("current batch, start_idx:%s len:%s" % (idx, len(batch_tasks))) レンジ(0, len(batch_tasks))のIの場合: l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) 例外はe: logging.warn(e)
# メインメソッドは、asyncioを通じて関数の非同期呼び出しを実装します def main(): ループ = asyncio.get_event_loop()
タスク = gen_tasks() batch_len = len(TBLES.keys()) * 5 # すべてあなた次第です run_tasks(タスク、batch_len)
loop.close()
|