Python, asyncioを使った 同期・非同期処理の実装例
- Python 3.9.13 (pyenv)
- Ubuntu 20.04 (WSL2)
- https://github.com/aoirint/python_asyncio_examples
なんかこう書くとうまく動くということしかわからん、という実装例です。
基本実装
同期 関数から非同期関数を同期的に呼び出す
- asyncio.run
import asyncioimport timedef main():async def func():await asyncio.sleep(3)print('func exited') # 1asyncio.run(func())time.sleep(1)print('main exited') # 2main()print('exited') # 3
同期関数から非同期関数を非同期的に呼び出す
- threading.Thread + asyncio.run
import asynciofrom concurrent.futures import ThreadPoolExecutorimport threadingimport timedef main():async def func():await asyncio.sleep(3)print('func exited') # 3thread = threading.Thread(target=lambda: asyncio.run(func()))thread.start()time.sleep(1)print('main exited') # 1main()print('exited') # 2
非同期関数から同期関数を同期的に呼び出す
- ふつうに呼び出す
import asyncioimport timeasync def main():def func():time.sleep(3)print('func exited') # 1func()await asyncio.sleep(1)print('main exited') # 2asyncio.run(main())print('exited') # 3
非同期関数から同期関数を非同期的に呼び出す
- asyncio.new_event_loop + ThreadPoolExecutor + EventLoop.run_in_executor
import asynciofrom concurrent.futures import ThreadPoolExecutorimport timeasync def main():def func():time.sleep(3)print('func exited') # 3loop = asyncio.new_event_loop()executor = ThreadPoolExecutor()loop.run_in_executor(executor, func)await asyncio.sleep(1)print('main exited') # 1asyncio.run(main())print('exited') # 2
非同期関数から非同期関数を同期的に呼び出す
- awaitキーワード
import asyncioasync def main():async def func():await asyncio.sleep(3)print('func exited') # 1await func()await asyncio.sleep(1)print('main exited') # 2asyncio.run(main())print('exited') # 3
非同期関数から非同期関数を非同期的に呼び出す
- threading.Thread + asyncio.run
import asyncioimport threadingasync def main():async def func():await asyncio.sleep(3)print('func exited') # 3thread = threading.Thread(target=lambda: asyncio.run(func()))thread.start()await asyncio.sleep(1)print('main exited') # 1asyncio.run(main())print('exited') # 2
非同期関数から同期間数を非同期的に3つずつ呼び出す
- asyncio.new_event_loop + ThreadPoolExecutor + EventLoop.run_in_executor
import asynciofrom concurrent.futures import ThreadPoolExecutorimport timeasync def main():def func():time.sleep(3)print('func exited') # 3loop = asyncio.new_event_loop()executor = ThreadPoolExecutor(max_workers=3)loop.run_in_executor(executor, func)loop.run_in_executor(executor, func)loop.run_in_executor(executor, func)loop.run_in_executor(executor, func)loop.run_in_executor(executor, func)loop.run_in_executor(executor, func)loop.run_in_executor(executor, func)loop.run_in_executor(executor, func)loop.run_in_executor(executor, func)await asyncio.sleep(1)print('main exited') # 1asyncio.run(main())print('exited') # 2
よく見るエラー
RuntimeError: Event loop is closed
TBW
RuntimeError: This event loop is already running
TBW
アプリケーション
FastAPI + schedule
- asyncio.new_event_loop + ThreadPoolExecutor + EventLoop.run_in_executor
- threading.Event
- Dependencies
- fastapi==0.78.0
- schedule==1.1.0
- uvicorn==0.18.2
実行コマンド例
uvicorn fastapi_schedule:app
コード例
import timeimport threadingimport asynciofrom concurrent.futures import ThreadPoolExecutorimport schedulefrom fastapi import FastAPIapp = FastAPI()schedule_event = threading.Event()@app.on_event('startup')async def startup_schedule():loop = asyncio.new_event_loop()executor = ThreadPoolExecutor()def loop_schedule(event):while True:if event.is_set():breakschedule.run_pending()time.sleep(1)print('run all existing scheduled jobs')schedule.run_all()print('exit schedule')loop.run_in_executor(executor, loop_schedule, schedule_event)schedule.every(1).second.do(lambda: print('tick'))@app.on_event('shutdown')async def shutdown_schedule():schedule_event.set()
FFmpegを子プロセスとして同期的に実行しつつログを非同期にキャプチャする
- FFmpeg 4.2.7-0ubuntu0.1
- libx264: --enable-libx264
- aac: native
実行コマンド例
python3 ffmpeg_async.py a.mp4 b.mp4
コード例
from asyncio import create_subprocess_execimport asynciofrom concurrent.futures import ThreadPoolExecutorfrom pathlib import Pathimport tempfileimport timeasync def main(input_video_path: Path,output_video_path: Path,):output_video_path.parent.mkdir(exist_ok=True, parents=True)vcodec: str = 'libx264'acodec: str = 'aac'report_tempfile = tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8')report_loglevel = 32 # 32: info, 48: debugreport = f'file={report_tempfile.name}:level={report_loglevel}'command = ['ffmpeg','-nostdin','-i',str(input_video_path),'-vcodec',vcodec,'-acodec',acodec,'-map','0','-report',str(output_video_path),]proc = await create_subprocess_exec(command[0],*command[1:],env={'FFREPORT': report,},)loop = asyncio.new_event_loop()executor = ThreadPoolExecutor()report_lines = []def read_report(report_file):report_file.seek(0)while True:line = report_file.readline()if len(line) == 0: # EOFif proc.returncode is not None: # process closed and EOFbreaktime.sleep(0.1)continue # for next line writtenif line.endswith('\n'):line = line[:-1] # strip linebreakreport_lines.append(line)print(f'REPORT: {line}', flush=True)print('report closed') # closed when process exitedloop.run_in_executor(executor, read_report, report_tempfile)returncode = await proc.wait()# stdout, stderr may be not closedprint(f'exited {returncode}')# here, report_lines: ffmpeg logif __name__ == '__main__':import argparseparser = argparse.ArgumentParser()parser.add_argument('input', type=str)parser.add_argument('output', type=str)args = parser.parse_args()input_video_path = Path(args.input)output_video_path = Path(args.output)asyncio.run(main(input_video_path=input_video_path,output_video_path=output_video_path,))