본문 바로가기
IT/Internals

[번역] Speed Up Your Python Program With Concurrency Part 2

by 물통꿀꿀이 2019. 8. 11.

지난 번에 이어서 좀 더 자세히 알아보려고 한다.

*의역이 있을 수 있고 이해한 바를 바탕으로 정리했기 때문에 원문과 의미가 조금 달라 질 수 있다. (원문은 아래에 첨부하였다.)

https://realpython.com/python-concurrency/#how-to-speed-up-an-io-bound-program


I/O Bound 프로그램의 속도를 높이는 방법

I/O Bound 관련 프로그램으로 네트워크 상에서 컨텐츠를 내려 받는 문제에 대해 알아보자. 


동기(Synchronous) 버전

먼저 해당 작업을 비동시성 작업으로 시작해보자. 프로그램은 requests 모듈이 필요하다. 때문에 프로그램을 실행하기전에 pip install requests를 실행해야 한다. 

import requests
import time


def download_site(url, session):
with session.get(url) as response:
print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
with requests.Session() as session:
for url in sites:
download_site(url, session)


if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")


위 코드를 확인해보면, download_site()는 URL에서 컨텐츠를 다운로드하고 크기를 출력해준다. 주목해야한 부분은 requests에서 Session 객체를 사용하는 부분이다.

requests에서 직접 get()을 단순히 사용할 수 있지만 Session 객체를 사용하는 것은 requests에서 몇몇 네트워크 트릭을 통해 실제로 속도를 높일 수 있기 때문이다.


이 프로그램의 처리 과정에 대한 다이어그램은 이 전 포스팅에서 알아본 I/O-bound 그림과 매우 비슷하다. (아래 그림 참조)



* 참고로 네트워크 트래픽은 매 초마다 네트워크 많은 내부 요소에 의존적이다. 


동기 버전이 인기가 있는 이유

단순하게 말해서 일단 쉽다. 즉, 작성하기도 쉽고 디버깅하기도 쉽다. 더군다나 생각하기도 쉽다. 순차적인 과정이므로 다음 과정이 무엇이고 어떻게 행동하는지 쉽게 예측할 수 있다.


동기 버전의 문제점

가장 큰 문제는 앞으로 보여줄 다른 해법과 비교해서 상대적으로 느리다는 것이다. 다음은 해당 코드를 실행했을 때 얻을 수 있는 결과이다.


$ ./io_non_concurrent.py
   [most output skipped]
Downloaded 160 in 14.289619207382202 seconds

그렇지만 느리다는 것은 항상 큰 문제는 아니다. 만일 프로그램이 동기 버전으로 2초 밖에 걸리지 않고 자주 실행하는 것이 아니라면, 아마 동시성을 추가할 필요는 없을 것이다. 

그런데 이 프로그램을 자주 실행한다면 어떨까? 실행하는데 한 시간 걸리면 어떨까? Threading을 사용하여 프로그램을 재작성하면서 동시성으로 넘어가 보자.


Threading 버전

Threading이 적용된 프로그램을 작성하는 것은 더 많은 노력을 필요로 한다. 다음은 스레딩과 같은 프로그램의 모습이다.


import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()


def get_session():
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session


def download_site(url):
session = get_session()
with session.get(url) as response:
print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_site, sites)


if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds"


Threading을 추가함에 있어서 전체적인 구조는 같고 약간의 변경이 필요했다. download_all_sites()는 사이트 당 함수 호출을 좀더 복잡한 구조로 변경했다.


이 버전에는 ThreadPoolExecutor를 사용한다. ThreadPoolExecutor를 살펴보면 ThreadPoolExecutor = Thread + Pool + Executor로 분리해서 볼 수 있다.

먼저 Thread 부분에 대해서는 알고 있다. Pool은 흥미로운 부분으로 ThreadPoolExecutor 객체는 Thread의 Pool을 만들고, 각각의 Thread를 동시에 실행한다. 마지막으로 Executor는 Pool에 있는 각각의 Thread가 언제, 어떻게 실행하는지 제어하는 부분이다. 즉, Executor는 Pool에서 요청을 실행한다.


유용하게도 표준 라이브러리가 Context Manager 로서 ThreadPoolExecutor를 구현한다. 때문에 with 문법을 사용하여 Threads Pool을 만들 수도, 해제할 수도 있다.

또한 ThreadPoolExecutor를 사용하면 편리한 .map() 함수를 사용할 수 있다. 해당 함수는 각 사이트에 인자로 받은 함수를 실행시킨다. 놀라운 부분은 관리하는 Threads 의 Pool을 사용하여 자동적으로 동시에 실행시킨다는 것이다.


다른 언어나 Python 2.x를 사용했던 사람들이라면 Threading을 다룰 때 Thread.start(), Thread.join() 그리고 Queue과 같은 익숙했던 부분을 다루는 객체와 함수가 어디에 있는지 궁금할 것이다. 이 객체와 함수들은 여전히 존재하고 Thread를 실행하는 방법을 세분화하여 제어 할 수 있다. 그러나 파이썬 3.2부터는 표준 라이브러리에서 Excecutors 로 불리는 한층 더 추상화된 객체를 추가했다. (Executors는 세부 제어를 원하지 않으면 대신에 세부 제어를 한다.)


이 예제에서 흥미로운 부분은 각 Thread에서 requests.Session() 객체를 사용한다는 것이다. requests 문서를 살펴볼 때 특히,

https://github.com/not-kennethreitz/requests/issues/2766 해당 이슈를 확인하면 각 Thread마다 Session이 필요한 이유가 분명해진다.


보통 OS는 현재 동작하는 Task를 중단시키고, 다른 Task를 실행할 때 Thread 사이에 공유된 어떤 데이터든지 보호할 수 있도록 관리한다. (다시 말해서 thread-safe) 그러나 requests.Session은 thread-safe가 아니다. 물론 데이터가 무엇이고 어떻게 사용되는지에 따라 데이터를 thread-safe하게 만들 수 있는 몇몇 방법이 있다. 그 중에 하나가 Python에서 지원하는 queue 모듈과 같은 비슷한 자료 구조를 thread-safe로 사용하는 것이다. 이들 객체는 threading.Lock과 같은 low-level primitives을 사용하는데 동시간에 오직 하나의 Thread만이 코드 블록 및 메모리에 접근하는 것을 보장해준다. 그렇기 때문에 Threading은 흥미롭지만 어려운 부분이기도하다. 

그러나 위와 같은 부분들을 ThreadPoolExecutor 객체를 통해 간접적으로 사용할 수 있다.


또 다른 방법은 스레드 로컬 저장소로 일컫는 것이다. Threading.local()은 전역 변수 같은 객체를 생성하지만 실질적으로는 각 Thread 마다 고유하다. 예를 들어 위의 예제에서 threadLocal와 get_session()을 사용해보았다.


threadLocal = threading.local()


def get_session():
if not hasattr(threadLocal, "session"):
threadLocal.session = requests.Session()
return threadLocal.session


get_session()이 호출되었을 때, Session은 현재 동작하는 Thread에 고유하다. 그래서 각 Thread는 get_session()을 첫 번째 호출할 때 단일 Session을 만든다. 그리고나서 Thread 작업 내내 만들어진 Session을 사용한다.


마지막으로 Thread의 갯수에 대해 주목해보자. 예제 코드에서는 5개의 Thread를 사용한다. 다운로드당 하나의 스레드를 가지는 것은 매우 빠를 것이라고 예상할 수 있겠지만 해당 시스템에서는 그렇지 않다. 가장 빠른 결과는 5~10개의 Thread 언저리에 있는 것을 알 수 있다. 만일 그 이상으로 가면 Thread를 만들고 삭제하는 추가 오버헤드가 time saving을 없앨 수 있다. 때문에 올바른 Thread의 개수는 Task 단위의 상수가 아니라는 것이다. 즉, 실험이 필요하다.


Threading 버전이 인기가 있는 이유

빠르다. 테스트 중에 가장 빠르다. non-concurrent 버전은 14초 이상 걸렸던 것을 기억해라


$ ./io_threading.py
   [most output skipped]
Downloaded 160 in 3.7238826751708984 seconds

실행 시간 다이어그램을 살펴보자.


여기서는 동시간에 웹 사이트에 여러 개의 Open 요청을 보내기 위해 여러 개의 Thread를 사용했다. 프로그램은 대기 시간을 Overlap 하고 결과를 빠르게 얻을 수 있다. 


Threading 버전을 사용할 때의 문제점

예시에서 본 것처럼 추가 코드가 필요하다. 그리고 Thread 사이에 어떤 데이터를 공유해야 할지 생각 좀 해봐야한다. 특히 Thread간에 Interaction은 알아내기 쉽지 않다. 더군다나임의로 Race Condition을 야기시키기도 하고 찾기 매우 어려운 간헐적인 버그를 발생시키기도 한다.

* Race Condition 관련해서는 이후 추가로 번역하겠다.


Asyncio 버전

Asyncio 예제 코드로 들어가기전에 잠시 Asyncio에 대해 알아보려고 한다.


1) Asyncio 기초

여기서는 asyncio의 단순화한 부분으로 더 많은 세부 사항이 있겠지만 여기서는 어떻게 동작하는지에 대한 부분에 대해서만 알아보겠다.

Asyncio의 일반적인 개념은 Event Loop로 일컫는 단일 Python Object가 각각의 작업을 언제, 어떻게 실행시킬지 관리하는 것이다. Event Loop는 각 작업을 인식하고 어떤 상태인지 알고 있다. 현실적으로 작업들이 많은 상태들이 있지만 여기서는 2개의 상태만을 가질 수 있다고 가정하자.


준비 상태는 작업이 수행해야 할 작업이 있고 실행할 준비가 되어있다는 것을 의미한다고 생각해보자. 또한 대기 상태는 작업은 네트워크 작업과 같이 외부 작업이 끝나는 것을 기다리고 있다는 것을 의미한다고 생각해보자. 이렇게 Event loop는 각 작업에 대해 2 가지 상태를 유지하고 준비된 작업 중에 하나를 선택하고 실행시킨다. 해당 작업은 Event Loop로 반환될 때까지 완전히 제어된다.


실행중인 작업이 Event Loop로 제어를 반환할 때, Event Loop는 해당 작업을 준비 또는 대기 목록에 위치시키고 대기 목록에 있는 Task가 준비되었는지 확인한다.

이렇듯 모든 Task가 올바른 목록으로 다시 정렬되면, Event loop는 실행시켜야 할 다음(Next) Task를 선택하다. 그리고 이제까지의 과정을 반복한다. 단순화된 Event loop는 가장 오랫동안 기다린 Task를 선택하고 실행시킨다. 이 과정은 Event loop가 끝날 때가지 반복한다.


Asyncio의 중요한 점은 Task가 의도적으로 제어권을 반환하지 않는 이상 절대 제어권을 놓지 않는 다는 것이다. 이러한 이유는 Task는 절대 실행 중간에 중단되지 않기 때문이다. 그러므로 Asyncio가 Threading 보다 조금 더 쉽게 자원을 공유할 수 있게 한다. thread-safe 코드에 대해서는 걱정할 필요가 없다.


Asyncio에서 일어나는 문제들은 high-level 단에서 일어나는 것이다. 보다 자세히 알고 싶다면 아래 페이지를 참고하라.

https://realpython.com/python-concurrency/#what-is-parallelism


2) async 와 await

Python에 추가된 2가지 키워드 async, await에 대해 알아보자. 위에서 알아본 것을 바탕으로 await는 Task가 Event loop에 제어권을 반환시킬 수있는 일종의 magic 키워드로 볼 수 있다. 코드에서 Function call을 await하게 되면, 일종의 신호로서 해당 Function call을 잠시동안 포기한다는 것과 같다. 즉, await를 사용하는 것은 Python의 일종의 async Flag로 여기는 것이 이해하기 쉽다. asynchronous generators와 같이 엄격하지 않는 몇몇 경우가 있지만 대부분은 단순 모델을 사용한다.


다음 코드에서 볼 수 있는 부분은 async with 구문이다. (해당 구문은 await 중인 객체로부터 Context Manager를 만드는 것이다.) 의미는 조금 다를 수 있으나 주요 개념은 동일하다. (Context manager를 Swap out 할 수 있도록 플래그 지정)


생각한 것처럼 Event loop와 Task 사이의 상호 통신을 관리하는 것은 복잡하다. 그러나 개발자가 asyncio를 사용할 때 세부 사항을 아는 것이 중요한 것은 아니지만 await를 호출한 어떤 Function이든 async로 표기되야한다. 그렇지 않으면 syntax error가 발생할 것이다.


코드로 돌아와서 지금까지 Asyncio가 무엇인지에 대해 알아보았다. 그럼 Asyncio 예제 코드로 돌아가서 어떻게 동작하는지 확인해보자 해당 버전에서는 aiohttp를 추가했다. 해당 코드를 실행하기 전에 pip install aiohttp 실행시켜야 한다.


import asyncio
import time
import aiohttp


async def download_site(session, url):
async with session.get(url) as response:
print("Read {0} from {1}".format(response.content_length, url))


async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
tasks = []
for url in sites:
task = asyncio.ensure_future(download_site(session, url))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
duration = time.time() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds"


이전 것보다 더 복잡하다. ThreadPoolExecutor 보다 약간의 설정이 더 필요하지만 구조는 비슷하다. 그럼 예제 코드를 위에서부터 살펴보도록 하자.


download_site에서는 threading 버전과 거의 동일하지만 async 및 async with 키워드가 추가되었다. download_all_sites은 threading 버전과 비교했을 때 가장 많은 변화가 있는 부분이다. 


Session은 Context manager로 생성되는데 모든 Task에서 Session을 공유할 수 있다. 그 이유는 같은 Thread에서 동작하기 때문이다. 대신에 하나의 Task가 다른 Task의 Session이 비록 나쁜 상태를 가지더라도 인터럽트 할 수 없다. Context manager 내부를 보면, asyncio.ensure_future()를 사용하여 Task의 목록을 만든다. 모든 Task가 만들어지면 Function을 asyncio.gather()을 사용하여 모든 Task가 완료될 때까지 Session context를 유지하게 한다.


Threading 코드는 비슷한 부분이 있지만 Threading은 세부 사항을 ThreadPoolExecutor에서 편리하게 처리하는 반면 Asyncio는 AsyncioPoolExecutor class가 없다.

그러나 좀 더 자세히 살펴보면 Threading과 비교하여 중요한 점은 Threading은 Thread의 최적의 개수에 대해 명확하지 않는 것이다. 이 부분이 Threading과 비교했을 때 Asyncio의 큰 장점으로 Threading 보다 확장하기 더 좋다는 것이다. 특히 각각의 Task는 Thread를 만드는 것보다 더 적은 리소스와 시간이 든다. 


마지막으로 asyncio의 본질은 event loop를 시작하고 어떤 Task가 동작하고 있는지 알려주는 것이다. __main__은 get_event_loop(), run_until_complete() 코드를 포함한다. 

그런데 Python 3.7에서는 해당 syntax가 간소화 되었다. 때문에 asyncio.get_event_lopp().run_until_complete를 사용하는 대신에 asyncio.run()을 사용하면 된다.


Asyncio 버전이 인기가 있는 이유

일단 빠르다. 


$ ./io_asyncio.py
   [most output skipped]
Downloaded 160 in 2.5727896690368652 seconds

실행 시간 다이어그램은 Threading 예제와 거의 비슷하다. I/O 요청은 모두 동일한 Thread에 의해 수행된다는 점이다.



ThreadPoolExecutor와 같은 Wrapper가 없다는 것은 해당 코드를 threading 예제보다 좀 더 복잡해지게 할 수 있다. (물론 더 좋은 성능을 위해서는 추가 작업이 필요하다.)

또한 async와 await를 적절한 위치에 추가하는 것 또한 비슷한 복잡성을 가진다는 의견도 있다. 그러나 이 부분은 언제 Task가 Swap Out 되는지 생각할 수 있기 때문에 더 좋고 더 빠르게 만드는데 도움을 준다.


Scale 관련하여 수 백개의 Task를 돌려도 Asyncio는 전혀 느려지지 않는다.


Asyncio 버전을 사용할 때의 문제

Asyncio 는 몇몇 문제가 있다. asyncio의 모든 장점을 얻기 위해서는 async 라이브러리 버전이 필요하다. 만일 사이트를 다운로드 하는데 requests를 사용했다면 많이 느릴 것이다. (requests는 Event loop에 Block되었다고 알릴 수 있게 설계되지 않았기 때문에) 물론 해당 이슈는 점점 라이브러리가 asyncio를 차용하면서 점점 낮아지고 있다.


또 다른 문제는 Task 중의 어느 하나가 협력(Cooperate)하지 않으면 Multitasking의 모든 장점이 사라진다. 코드에서 약간의 실수로 Task가 실행되어 오랫동안 프로세서를 사용하고 있기 때문에 실행해야할 다른 Task가 Starving 될 수 있다. Event loop는 만일 Task가 제어권을 반환하지 않을 때 Task를 중단시킬 수 있는 방법이 없다.


이를 염두해두고, Concurrency, Multiprocessing에 대해 근본적으로 다른 접근에 대해 살펴보겠다.


Multiprocessing 버전

이전에 알아본 버전과 다르게 Multiprocessing 버전은 다수의 CPU의 모든 장점을 사용한다. 그럼 코드를 살펴보자.


import requests
import multiprocessing
import time

session = None


def set_global_session():
global session
if not session:
session = requests.Session()


def download_site(url):
with session.get(url) as response:
name = multiprocessing.current_process().name
print(f"{name}:Read {len(response.content)} from {url}")


def download_all_sites(sites):
with multiprocessing.Pool(initializer=set_global_session) as pool:
pool.map(download_site, sites)


if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds"


Asyncio 예제보다 코드가 짧고 Threading 예제와 매우 비슷하다. 그럼 Multiprocessing이 무엇을 하는지 코드를 살펴보도록 하자.


*지금까지 살펴본 모든 Concurrency 예제는 단일 CPU 또는 Core에서 동작한다. 이런 이유는 CPython의 현재 구조와 GIL(Global Interpreter Lock) 때문이다.

하지만 여기서는 GIL에 대해 깊이있게 다루지는 않을 것이다. 단일 CPU에서 Synchronous, Threading, Asyncio 버전의 예제에 대해 아는 것만으로도 충분하기 때문이다.


표준 라이브러리 상에서 Multiprocessing은 다수의 CPU 상에서 코드를 실행시키도록 설계되어있다. 그런데 각각의 Python interpreter의 인스턴스를 생성하는 것(Multiprocessing)은 현재 Python interpreter에서 Thread를 생성하는 것(Threading)처럼 빠르지 않다. 


Multiprocessing 코드

해당 코드는 synchronous 버전과 비교했을 때 약간의 차이가 있다. 첫 번째는 download_all_sites()이다. 단순히 반복적으로 download_site()를 호출하는 대신에 multiprocessing.Pool 객체를 만들고 download_site를 iterable 타입인 sites에 매핑했다. threading 예제와 비슷하다.


여기서는 Pool이 여러 개의 Python interpreter 프로세스를 생성하고 각각 특정 Function(iterable 속성을 가진 값으로 site 목록)에서 동작한다. 메인 프로세스와 다른 프로세스 간의 상호 통신은 multiprocessing 모듈에 의해 다뤄진다.


Pool을 생성하는 코드에 주목해보자. 첫 번째 얼마나 많은 프로세스가 Pool에서 만들어지는지 구체화하지 않았다. (물론 옵션 파라미터로 설정할 수는 있다.) 디폴트로 multiprocessing.Pool은 사용하는 컴퓨터의 CPU의 개수로 결정한다. 이것이 보통 최고의 방안으로 해당 예제에서 사용하는 방법이다.


이것의 문제는 프로세스 개수를 늘려도 빨라지지 않는다는 점이다. 도리어 프로세스를 구성하고 내리는 비용이 I/O 요청을 병렬적으로 수행해서 얻는 이익보다 더 크기 때문에 실제로 느리다.


다음으로 initializer=set_global_session 부분이다. Pool의 각 프로세스는 자신만의 메모리 공간을 가진다. 이것이 의미하는 바는 Session 객체와 같은 것들을 공유하지 않는다는 것이다. Function이 호출될 때 마다 Session 만들어지는 것을 원하지 않으므로 각 프로세스 마다 하나씩 만든다.


이 경우에 initializer Function의 매개변수가 만들어진다. 반환 값을 initializer에서 download_site()에 의해 호출된 Function으로 되돌릴 방법은 없다. 그렇지만 각 프로세스의 단일 Session을 유지하기 위해서 global session 변수를 초기화 할 수 있다. 각 프로세스는 자신만의 메모리 공간을 가지고 있고 각 프로세스의 global이 다르기 때문이다.


Multiprocessing 버전이 인기가 있는 이유

해당 예제에서 multiprocessing 버전은 약간의 코드만 추가하면 되기 때문에 상대적으로 쉽다. 게다가 사용자 컴퓨터의 CPU 파워를 전부 사용할 수 있다. 그럼 실행 시간 다이어그램을 살펴보도록 하자.



Multiprocessing 버전의 문제

해당 예제 버전은 추가 설정이 필요하고 global session 객체가 낯설다. 그래서 어떤 변수가 각 프로세스에 접근 할 것이지에 대해 많은 생각을 해야한다.

마지막으로 해당 예제는 asyncio와 threading 버전보다 분명하게 느리다.


$ ./io_mp.py [most output skipped]  Downloaded 160 in 5.718175172805786 seconds

놀라운 것은 아니다. I/O-bound 문제는 Multiprocessing이 존재하는 이유가 아니다. 다음 섹션에서 CPU-bound 문제에 대해 알아보자.


CPU-Bound 프로그램의 속도를 높이는 방법

이제까지 I/O-bound 프로그램에 대해서 다루었다. 그럼 CPU-bound 프로그램에 대해 알아보자. I/O-bound 프로그램은 대부분의 시간을 네트워크 호출과 같은 외부 작업이 완료되기를 기다리는데 보낸다. 반면에 CPU-bound 프로그램은 거의 I/O 작업을 하지 않고 대부분의 시간을 요구된 데이터를 빠르게 처리하는지에 대한 부분이다.


이 예제의 목적은 CPU에서 실행하는데 오랜 시간이 걸리는 함수를 만든다. 해당 Function은 사각형의 합을 계산한다.


 def cpu_bound(number):

    return sum(i * i for i in range(number))


CPU-Bound Synchronous 버전

non-concurrent 버전의 예시를 보자


import time


def cpu_bound(number):
return sum(i * i for i in range(number))


def find_sums(numbers):
for number in numbers:
cpu_bound(number)


if __name__ == "__main__":
numbers = [5_000_000 + x for x in range(20)]

start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"Duration {duration} seconds"


해당 코드는 CPU-bound()를 20번 호출한다. 단일 CPU의 단일 프로세스, 단일 스레드에서 작업한다. 실행 시간 다이어그램은 아래와 같다.



I/O-Bound 예제와 달리 CPU-Bound 예제는 일반적으로 실행시간이 일정하다. 이 예제는 7.8초 걸렸다.


$ ./cpu_non_concurrent.py
Duration 7.834432125091553 seconds

분명히 이것보다 더 나아질 수 있다. 해당 코드는 단일 CPU에서 동작하는 것이다. 더 나은 방법을 살펴보자.


Threading & Asyncio 버전

Threading이나 Asyncio를 사용하는 것이 속도를 얼마나 빠르게 할 것이라고 생각하는가?


I/O Bound 예제에서 전체 시간의 대부분을 느린 작업이 끝나길 기다리는데 보냈다. Threading과 Asyncio는 순차적으로 작업을 수행하는 대신 대기 중인 시간을 overlap하여 작업을 가속화 할 수 있다. 그러나 CPU bound 문제는 대기가 없다. CPU는 문제를 최대한 빨리 끝내기 위해 작업한다. Python에서는 thread와 task는 같은 프로세스에서 같은 CPU에서 동작한다. 즉, 하나의 CPU는 non-concurrent 코드 작업을 수행하고 thread 또는 task를 구성하는데 추가 작업을 한다. 때문에 10초 정도 더 많은 시간이 걸린다.


$ ./cpu_threading.py
Duration 10.407078266143799 seconds

CPU-Bound Multiprocessing 버전

다른 concurrency 라이브러리와 달리 multiprocessing은 암묵적으로 다수의 CPU에서 CPU workload를 공유하도록 설계되어있다. 그럼 실행 시간 다이어그램을 보도록 하자.



import multiprocessing
import time


def cpu_bound(number):
return sum(i * i for i in range(number))


def find_sums(numbers):
with multiprocessing.Pool() as pool:
pool.map(cpu_bound, numbers)


if __name__ == "__main__":
numbers = [5_000_000 + x for x in range(20)]

start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"Duration {duration} seconds"


몇몇 코드가 non-concurrent 버전에서 변경되었다. import multiprocessing를 하고 multiprocessing.Pool 객체를 생성하고 map() 메소드를 사용하여 각 번호를 워커 프로세스에 전달한다.


이건 단순히 I/O-Bound multiprocessing 코드에서 했던 것이고 Session 객체에 대해 걱정할 필요가 없다. 위에서 언급한 것처럼 multiprocessing.Pool의 생성자에 processes 옵션 매개변수에 주의를 기울여야 한다. 얼마나 많은 프로세스 객체가 Pool에서 생성되고 관리되는지 구체화 할 수있다. 기본적으로 당신의 머신에 얼마나 많은 CPU가 있는지 찾고 각각 프로세스를 만든다. 


Multiprocessing 버전이 인기가 많은 이유

multiprocessing은 약간의 코드 추가로 상대적으로 설정하기 쉽다. 그리고 CPU power의 장점을 사용할 수 있다.


$ ./cpu_mp.py
Duration 2.5175397396087646 seconds

Multiprocessing 버전의 문제

multiprocessing의 몇몇 단점이 있다. 해당 예제에서 드러나진 않지만 프로그램을 독립적으로 각 프로세서로 분리하는 것은 어렵다. 또한 프로세스 사이에 상호 통신을 요구하고, 비 동시성 프로그램에 필요없는 약간의 복잡도가 추가될 수 있다.


언제 동시성을 사용해야 하는가


많은 부분을 다루었으므로 몇 가지 주요 아이디어를 검토한 다음 프로젝트에서 사용할 동시성 모듈을 결정하는데 도움이 되는 몇가지 Decision Point을 논의해보자.


프로세스의 첫번째는 당신은 동시성 모듈을 사용해야 하는지에 대한 부분이다. 해당 예제에서는 각 라이브러리가 간단하게 보였지만 동시성은 항상 추가적인 복잡성이 있고 찾기 어려운 버그를 발생시킨다. 알려진 성능 문제가 발생하고 필요한 동시성 유형을 결정할 때까지 기존것을 사용해야 한다. Donald knuth가 말한 것처럼 너무 이른 최적화는 프로그래밍에서 악의 뿌리(the root of all evil) (적어도 대부분)이다. 


프로그램을 최적화해야 한다고 결정을 했다면 프로그램이 CPU-bound 인지 I/O bound인지 찾는 것이 다음 단계이다. I/O-bound 프로그램은 대부분의 시간을 무엇가를 기다리는데 보내는 것이고 CPU-bound 그램은 대부분의 시간은 가능한 빠르게 데이터 프로세싱하는데 보내는 것이다.


CPU-bound 프로그램은 multiprocessing 을 사용하는 편이 낫고 threading과 asyncio는 프로그램에 도움이 되지 않을 것이다.

I/O-bound 프로그램에 대해서는 파이썬 커뮤니티에 일반적인 규칙이 있다. "Use asyncio when you can, threading when you must"

댓글