시나리오 설명
가로로 슬라이스되어 여러 호스트에 분산된 mysql 테이블이 있다고 가정해 봅시다. 각 호스트는 n개의 샤드 테이블을 가지고 있습니다. 이 테이블들을 동시에 접근하고 쿼리 결과를 빠르게 받아야 한다면 어떻게 해야 하나요? asyncio asyncio 라이브러리와 python3의 aiomysql 비동기 라이브러리를 사용하여 이 요구사항을 구현하는 해결책이 있습니다.
코드 데모
수입 기록 기록 랜덤 가져오기 import asyncio AiomySQL 가져오기 create_pool에서
# mysql 테이블이 8개의 호스트에 걸쳐 있고, 각 호스트마다 16개의 서브 테이블이 있다고 가정해 봅시다 TBLES = { "192.168.1.01": "table_000-015", #000-015는 이 IP에 따른 표시가 table_000부터 table_015까지 연속적임을 나타냅니다 "192.168.1.02": "table_016-031", "192.168.1.03": "table_032-047", "192.168.1.04": "table_048-063", "192.168.1.05": "table_064-079", "192.168.1.06": "table_080-095", "192.168.1.07": "table_096-0111", "192.168.1.08": "table_112-0127",
} 사용자 = "xxx" PASSWD = "xxxx"
# 예외를 포착하는 래퍼 함수 def query_wrapper(func): async def wrapper(*args, **kwargs): 시도해보세요: await func(*args, **kwargs) 예외는 e: 인쇄(e) 리턴 래퍼
# 실제 SQL 접근 처리 함수는 AIOMYSQL을 통해 비동기 비차단 요청을 구현합니다 @query_wrapper 비동기 query_do_something(IP, DB, 테이블): create_pool(host=ip, db=db, user=USER, password=PASSWD)를 풀로 사용하여 비동기: conn으로서 pool.get() 비동기: CORN.cursor() 와 비동기 상태: SQL = ("{}에서 xxxx를 선택하세요") await cur.execute(sql.format(table)) res = wait. cur.fetchall() # 그럼 뭔가 해봐...
# SQL 접근 큐를 생성하세요. 큐의 각 요소는 테이블에 접근하는 함수와 매개변수를 포함합니다 def gen_tasks(): 작업 = [] IP는 TBLES.items(): cols = re.split('_|-', TBLS) tblpre = "_".join(cols[:-2]) min_num = int(콜스[-2]) max_num = int(콜s[-1]) 범위(min_num, max_num+1) 내 num에 대해: tasks.append( (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num)) )
random.shuffle(tasks) 임무 반환
# SQL 접근 요청 큐를 배치로 실행하세요 def run_tasks(tasks, batch_len): 시도해보세요: idx 범위(0, len(tasks), batch_len 기준: batch_tasks = tasks[idx:idx+batch_len] logging.info("current batch, start_idx:%s len:%s" % (idx, len(batch_tasks))) range(0, len(batch_tasks)에 있는 i에 대해: l = batch_tasks batch_tasks= asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) 예외는 e: logging.warn(e)
# 주요 메서드는 asyncio를 통해 비동기 함수 호출을 구현합니다 def 메인(): 루프 = asyncio.get_event_loop()
작업 = gen_tasks() batch_len = len(TBLES.keys()) * 5 # 모두 당신에게 달려 있습니다 run_tasks(작업, batch_len)
loop.close()
|