Python 병렬 프로그래밍 - (1) thread, ThreadPoolExecutor
목차
thread란?
thread란 무엇일까? 
 thread는 프로그램의 실행 흐름을 나누어 병렬로 작업을 수행할 수 있도록 하는 기능이다.
 thread를 사용하면 하나의 프로세스 내에서 여러 작업을 동시에 실행할 수 있다.
thread 장,단점
1) 장점
- 동일한 프로세스에서 thread간 통신 속도, 데이터 위치, 정보 공유가 빠름
 - 스레드 생성은 프로세스 생성에 비해 생성 비용이 적음 
 - 프로세서의 캐시 메모리를 통해 메모리 접근을 최적화
 
2) 단점
- CPU bound thread를 사용할 때 GIL로 인해 multi threading의 성능이 제한될 수 있음 
 - GIL(Global Interpreter Lock)은 한 번에 하나의 thread만이 python 바이트코드를 실행할 수 있도록 하는 잠금
 
thread 종류
1) 커널 수준 Thread(Kernel-Level Thread)
커널 수준 Thread는 운영 체제에 의해 관리된다.
 운영 체제의 스케줄러에 의해 스케줄링되며, 각 Thread는 독립적으로 스케줄링되고 관리된다.
 장점
         - 하나의 커널 수준 Thread는 하나의 프로세스를 참조하기 때문에
           특정 커널 수준 Thread가 Block 되더라도 다른 커널 수준 Thread들은 정상적으로 실행될 수 있다.
         - 여러 CPU에서 병렬로 실행될 수 있다.
단점
         - 생성과 동기화 루틴, 컨텍스트 스위칭 비용이 비싸다.
         - thread 구현이 플랫폼(OS)에 종속된다. 
2) 사용자 수준 Thread(User-Level Thtread)
사용자 수준 Thread는 라이브러리에 의해 관리된다.
 운영 체제는 사용자 수준 Thread를 인식하지 못하며, 하나의 커널 수준 Thread에 여러 사용자 수준 Thread가 매핑될 수 있다.
 장점
         - 생성과 동기화 루틴, 컨텍스트 비용이 저렴하다.
         - thread가 플랫폼(OS)에 종속되지 않는다.
단점
         - 하나의 사용자 수준 Thred가 Block될 경우 전체 프로세스가 Block될 수 있다.
         - 하나의 CPU에서만 실행될 수 있다.
3) Hybrid Thread
커널 수준 Thread와 사용자 수준 Thread의 조합으로 사용자 수준 Thread가 커널 수준 Thread로 매핑된다.
 커널 수준 Thread의 장점과 사용자 수준 Thread의 장점을 결합한 것 이다.
thread 상태 정의
thread의 생명주기는 5단계로 나타낼 수 있다.
 1) 생성
         - 메인 프로세스가 thread를 생성한다.
 2) 실행
         - 이 단계에서 thread는 CPU를 사용한다.
 3) 준비
         - 이 단계에서 thread는 실행될 의무가 있다.
 4) 블록
         - 이 단계에서 thread는 I/O 연산을 기다리기 위해 Block 된다.
 5) 종료
         - 이 단계에서 thread는 실행에 사용했던 자원을 해제한 후, 수명이 끝난다
threading 모듈 사용하는 방법
threading 모듈은 _thread 모듈의 추상화 계층을 제공해서 thread 기반의 병렬 시스템을 개발할 수 있는 함수들을 제공한다. 
 threading 모듈은 thread를 개발자가 직접 관리할 수 있게 해준다.
 그래서 thread의 생명주기, 동기화 문제, 예외 처리 등을 개발자가 직접 관리해야 한다.
 장점은 thread를 세밀하게 제어할 수 있어 복잡한 thread 관리가 필요한 경우에 유리하다.
 단점은 thread 수가 많아지면 관리가 복잡해질 수 있고, thread pool 과 같은 추상화가 없기 때문에 더 많은 코딩을 해야 한다.
 threading 모듈에 대한 자세한 내용은 threading 에서 확인해볼 수 있다. 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from queue import Queue
import threading
import requests
import re
shared_queue = Queue()
input_list = ['https://www.samsung.com', 'https://www.samsung.com/sec/event/samsung-monitor/', 'https://www.samsung.com/sec/event/2024-tv-launching/',
              'https://www.samsung.com/sec/event/best_items/', 'https://www.samsung.com/sec/event/bespoke-refrigerator/', 'https://www.samsung.com/sec/event/kimchi-refrigerator/',
              'https://www.samsung.com/sec/event/bespoke-grande-ai/', 'https://www.samsung.com/sec/event/bespoke-jet-air-cleaner/', 'https://www.samsung.com/sec/event/air-conditioners-inhome/',
              'https://www.samsung.com/sec/event/kitchen-appliance/']
input_list = input_list * 100
results = {}
queue_condition = threading.Condition()
button_id_regex = re.compile(r'<button\s(?:.*?\s)*?id=[\'"](.*?)[\'"].*?>', re.IGNORECASE)
link_regex = re.compile(r'<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')
def crawl_website(condition):
    with condition:
        while shared_queue.empty():
            condition.wait()
        else:
            url = shared_queue.get()
            request_data = requests.get(url)
            button = button_id_regex.findall(request_data.text)
            
            link = link_regex.findall(request_data.text)
            results[url] = {'button': button, 'link': link}
            shared_queue.task_done()
def setup_queue(condition):
    with condition:
        for item in input_list:
            shared_queue.put(item)
        condition.notify_all()
if __name__  == '__main__':
    threads_1 = [threading.Thread(name=f'crawl_task_{i}',daemon=True, target=crawl_website, args=(queue_condition,)) for i in range(len(input_list))]
    [thread.start() for thread in threads_1]
    threads_2 = threading.Thread(name='setup_queue', daemon=True, target=setup_queue, args=(queue_condition,))
    threads_2.start()
    [thread.join() for thread in threads_1]
ThreadPoolExecutor 사용하는 방법
concurrent.futures 모듈에서 제공하는 클래스 중 하나로, thread 기반의 병렬 처리를 손쉽게 구현할 수 있게 해주는 함수를 제공한다.
 thread pool을 제공하여 thread 관리를 더 간편하게 해준다.
 thread pool은 일정 수의 thread를 미리 생성하고, 작업을 queue에 넣어 관리를 한다.
 그렇기 때문에 thread 수를 개발자가 직접 제어할 필요가 없고, thread 관리의 복잡함을 줄일 수 있다.
 장점은 thread pool을 이용해 thread 수를 조절하고 관리하는 것이 간편하다.
 단점은 thread를 직접 제어하거나 thread 관리가 필요한 경우에는 유연성이 부족하다.
 또한 ThreadPoolExecutor에 대한 자세한 내용은 ThreadPoolExecutor 에서 확인해볼 수 있다. 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import requests
import re
import os
shared_queue = Queue()
input_list = ['https://www.samsung.com', 'https://www.samsung.com/sec/event/samsung-monitor/', 'https://www.samsung.com/sec/event/2024-tv-launching/',
              'https://www.samsung.com/sec/event/best_items/', 'https://www.samsung.com/sec/event/bespoke-refrigerator/', 'https://www.samsung.com/sec/event/kimchi-refrigerator/',
              'https://www.samsung.com/sec/event/bespoke-grande-ai/', 'https://www.samsung.com/sec/event/bespoke-jet-air-cleaner/', 'https://www.samsung.com/sec/event/air-conditioners-inhome/',
              'https://www.samsung.com/sec/event/kitchen-appliance/']
input_list = input_list * 100
results = {}
button_id_regex = re.compile(r'<button\s(?:.*?\s)*?id=[\'"](.*?)[\'"].*?>', re.IGNORECASE)
link_regex = re.compile(r'<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')
def crawl_website(url):
    request_data = requests.get(url)
    button = button_id_regex.findall(request_data.text)
            
    link = link_regex.findall(request_data.text)
    return {url: {'button': button, 'link': link}}
        
def setup_queue(url):
    shared_queue.put(url)
if __name__  == '__main__':
    max_workers = os.cpu_count()
    with ThreadPoolExecutor(max_workers=max_workers) as setup_queue_threads:
        future_tasks = [setup_queue_threads.submit(setup_queue, item) for item in input_list]
        for future in as_completed(future_tasks):
            pass
        
    with ThreadPoolExecutor(max_workers=max_workers) as crawl_website_threads:
        future_tasks = []
        while not shared_queue.empty():
            url = shared_queue.get()
            future_tasks.append(crawl_website_threads.submit(crawl_website, url))
            
        for future in as_completed(future_tasks):
            results.update(future.result())