threading

# python 3.9
import subprocess
from threading import Thread

def main(command=None):
  """Run a command and echo its stdout/stderr line by line until it exits.

  Each pipe is drained on its own thread so that neither stream can fill up
  and stall the child while the other one is being read.

  Args:
    command: Argument list handed to ``subprocess.Popen``. Defaults to
      ``['mycommand', '-opt1']``.
  """
  if command is None:
    command = [
      'mycommand',
      '-opt1',
    ]

  def pump(stream, label):
    # Iterating a pipe yields one bytes line at a time and stops at EOF.
    for line in stream:
      # errors='replace' keeps the reader alive on undecodable bytes; a dead
      # reader would stop draining its pipe.
      line_text = line.decode('utf-8', errors='replace').strip()
      print(f'{label.upper()}: {line_text}', flush=True)
    print(f'{label} closed') # closed when process exited

  # The context manager closes both pipes and reaps the child on exit.
  with subprocess.Popen(
    command,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
  ) as proc:
    readers = [
      Thread(target=pump, args=(proc.stdout, 'stdout')),
      Thread(target=pump, args=(proc.stderr, 'stderr')),
    ]
    for reader in readers:
      reader.start()

    # A blocking wait replaces the poll/sleep loop. It is safe with PIPE here
    # because the reader threads keep both pipes drained.
    returncode = proc.wait()

    # Join before reporting so every output line precedes the exit message.
    for reader in readers:
      reader.join()

  print(f'exited {returncode}')

asyncio

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main(delay=1):
  """Run four blocking jobs on a two-worker thread pool and wait for them.

  Args:
    delay: Seconds each job sleeps before printing ``aaa``. Defaults to 1.
  """
  loop = asyncio.get_running_loop()

  def operation():
    time.sleep(delay)
    print('aaa', flush=True)

  with ThreadPoolExecutor(2) as executor:
    # Jobs run two at a time, so with the default delay the whole batch
    # (and therefore asyncio.run) takes about 2 seconds.
    pending = [loop.run_in_executor(executor, operation) for _ in range(4)]

    # Submitting the jobs does not block main, so this line is reached
    # while the workers are still sleeping.
    print('before aaa')

    # Await the futures so the loop is still open when the workers report
    # their results; otherwise asyncio.run() may close the loop first.
    await asyncio.gather(*pending)


if __name__ == '__main__':
  asyncio.run(main())
  import time
  import asyncio
  from asyncio.subprocess import create_subprocess_exec, PIPE
  from concurrent.futures import ThreadPoolExecutor
  
  async def main(command=None):
    """Run a command under asyncio and echo its stdout/stderr line by line.

    Both streams are read by coroutines on the running loop and awaited
    together with the process exit, so nothing is still using the loop when
    this coroutine returns.

    Args:
      command: Program and arguments for ``create_subprocess_exec``.
        Defaults to ``['mycommand', '-opt1']``.
    """
    if command is None:
      command = [
        'mycommand',
        '-opt1',
      ]

    proc = await create_subprocess_exec(
      *command,
      stdout=PIPE,
      stderr=PIPE,
    )

    async def pump(stream, label):
      # Async iteration over a StreamReader yields one line at a time and
      # stops at EOF.
      async for line in stream:
        line_text = line.decode('utf-8', errors='replace').strip()
        print(f'{label.upper()}: {line_text}', flush=True)
      print(f'{label} closed') # closed when process exited

    # Drain both pipes while waiting for the exit. No worker threads are
    # needed because readline() is already non-blocking on the loop.
    await asyncio.gather(
      pump(proc.stdout, 'stdout'),
      pump(proc.stderr, 'stderr'),
      proc.wait(),
    )

    print(f'exited {proc.returncode}')

終了時にEvent loop is closedというエラーが出ることがある?

# https://stackoverflow.com/questions/45600579/asyncio-event-loop-is-closed-when-getting-loop
# NOTE: the selector event loop on Windows does not support subprocesses, so
# this workaround cannot be combined with create_subprocess_exec.
import platform

if platform.system() == 'Windows':
  asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())