본문 바로가기
점프 투 파이썬 정리

병렬처리

by 자동매매 2022. 4. 29.

 

 

10. 동시성 프로그래밍 (Concurrency Programming)

이번 장에서는 파이썬으로 Concurrency(동시성) 프로그래밍을 다룹니다.

wikidocs.net

 

 

파이썬(python) Multiprocessing 사용법

멀티 프로세싱을 활용하면 여러 작업을 별도의 프로세스를 생성 후 병렬처리해서 더 빠르게 결과를 얻을 수 있다. 멀티 프로세싱을 잘 활용하면 멀티코어의 CPU 장점을 잘 살릴 수 있지만, 병렬

light-tree.tistory.com

 

[Python/파이썬] PyQt5 - 사용자 정의 시그널(Custom Signal)과 Emit 사용법

1편 : 2021.03.06 - [코딩/Python] - [Python/파이썬] PyQT5 및 QT Designer 소개, .ui 파일 .py로 변환 방법 2편 : 2021.03.28 - [코딩/Python] - [Python/파이썬] PyQt5를 통한 GUI 구성 및 사용법 이해하기 3편..

ybworld.tistory.com

Process.docx
0.05MB

프로그램이 메모리에 올라가서 실행 중인 것을 프로세스(process)라고 부릅니다.

프로세스의 실행 단위를 스레드라고 합니다.

프로세스는 최소 하나 이상의 스레드를 갖으며 경우에 따라 여러 스레드를 가질 수도 있습니다.

1. 파이썬 스레드

1.1 threading 모듈 사용하기

import threading
import time


class Worker(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name            # thread 이름 지정

    def run(self):
        print("sub thread start ", threading.currentThread().getName())
        time.sleep(3)
        print("sub thread end ", threading.currentThread().getName())


print("main thread start")
for i in range(5):
    name = "thread {}".format(i)
    t = Worker(name)                # sub thread 생성
    t.start()                       # sub thread의 run 메서드를 호출


print("main thread end")

 

main thread start
sub thread start  thread 0
sub thread start  thread 1
sub thread start  thread 2
sub thread start  thread 3
sub thread start  thread 4
main thread end
sub thread end  thread 1
sub thread end  thread 4
sub thread end  thread 0
sub thread end  thread 3
sub thread end  thread 2

 

Fork와 Join

import threading
import time


class Worker(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name            # thread 이름 지정

    def run(self):
        print("sub thread start ", threading.currentThread().getName())
        time.sleep(5)
        print("sub thread end ", threading.currentThread().getName())


print("main thread start")

t1 = Worker("1")        # sub thread 생성
t1.start()              # sub thread의 run 메서드를 호출

t2 = Worker("2")        # sub thread 생성
t2.start()              # sub thread의 run 메서드를 호출

t1.join()
t2.join()

print("main thread post job")
print("main thread end")

 

main thread start
sub thread start  1
sub thread start  2
sub thread end  1
sub thread end  2
main thread post job
main thread end

 

스레드 List 가능

import threading
import time


class Worker(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name            # thread 이름 지정

    def run(self):
        print("sub thread start ", threading.currentThread().getName())
        time.sleep(5)
        print("sub thread end ", threading.currentThread().getName())


print("main thread start")

threads = []
for i in range(3):
    thread = Worker(i)
    thread.start()              # sub thread의 run 메서드를 호출
    threads.append(thread)


for thread in threads:
    thread.join()

print("main thread post job")
print("main thread end")

 

main thread start
sub thread start  0
sub thread start  1
sub thread start  2
sub thread end  2
sub thread end  0
sub thread end  1
main thread post job
main thread end

 

# 데몬 스레드 만들기 : Daemon속성

import threading
import time


class Worker(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name            # thread 이름 지정

    def run(self):
        print("sub thread start ", threading.currentThread().getName())
        time.sleep(3)
        print("sub thread end ", threading.currentThread().getName())


print("main thread start")
for i in range(5):
    name = "thread {}".format(i)
    t = Worker(name)                # sub thread 생성
    t.daemon =True                   # 메인 스레드 종료 시 서브 스레드 종료
    t.start()                       # sub thread의 run 메서드를 호출


print("main thread end")

 

main thread start
sub thread start  thread 0
sub thread start  thread 1
sub thread start  thread 2
sub thread start  thread 3
sub thread start  thread 4
main thread end

 

2. 병렬성 프로그래밍

프로세스 스포닝(spawning)

부모 프로세스(Parent Proecess)가 운영체제에 요청하여 자식 프로세스(Child Process)를 새로 만들어내는 과정을 스포닝이라고 부릅니다. 파이썬의 multiprocessing 모듈을 이용하면 이러한 프로세스 스포닝을 쉽게 수행할 수 있습니다. 보통 부모 프로세스가 처리할 작업이 많은 경우 자식 프로세스를 새로 만들어 일부 작업을 자식 프로세스에게 위임하여 처리합니다.

 

process = multiprocessing.Process( name= , target= , args=( 인자1 , 인자2, .... , ) )

import multiprocessing as mp
import time

def worker():
    proc = mp.current_process()  #current_process 객체 생성
    print(proc.name)
    print(proc.pid)
    time.sleep(5)
    print("SubProcess End")


if __name__ == "__main__":
    # main process
    proc = mp.current_process()
    print(proc.name)
    print(proc.pid)

    # process spawning
    p = mp.Process(name="SubProcess", target=worker)
    p.start()

    print("MainProcess End")

 

MainProcess
13164
MainProcess End
SubProcess
34164
SubProcess End

 

import multiprocessing
import os
import time
from multiprocessing import Process,Pool,Queue

num = 42

def f(name):
	global num
	num += 1
	print('-'*20)
	print('PID of parent:', os.getppid())                  #Parent  process ID 반환
	print('PID of %s : %d' %(name, os.getpid()))   #Current  process ID 반환
	print('%d' %num)
 
if __name__ == '__main__':
	print('PID of %s : %d' %(multiprocessing.current_process().name, os.getpid()))   #Current  process ID 반환
	print('%d' %num)

 	# Process 생성
	p1 = Process(target=f, args=("proc_1",))
	p2 = Process(target=f, args=("proc_2",))

	# 프로세서 실행(start), 프로세서 종료(join)
	p1.start(); p1.join()
	p2.start(); p2.join()

 

PID of MainProcess : 2972
42
--------------------
PID of parent: 2972
PID of proc_1 : 32324
43
--------------------
PID of parent: 2972
PID of proc_2 : 20224
43

Pool 사용하기

pool = multiprocessing.Pool( name= , target= , args=( 인자1 , 인자2, .... , ) )

pool = Pool(processes =)     # Pool객체 생성
pool.map(
함수,인수)            # map함수 이용

from multiprocessing import Pool


def work(x):
    print(x)


if __name__ == "__main__":
    pool = Pool(4)
    data = range(1, 100)
    pool.map(work, data)

1. 인자 전달하기

1) 함수 경우

import multiprocessing as mp


def work(value):
    pname = mp.current_process().name
    print(pname)
    print(value)


if __name__ == "__main__":
    p = mp.Process(name="Sub Process", target=work, args=("hello",))
    p.start()
    p.join()
    print("Main Process")

 

Sub Process 
hello
Main Process

2) Class 경우

import multiprocessing as mp


class Worker:
    def __init__(self):
        pass 

    def run(self, value):
        pname = mp.current_process().name
        print(pname)
        print(value)


if __name__ == "__main__":
    w = Worker()
    p = mp.Process(name="Sub Process", target=w.run, args=("hello",))
    p.start()
    p.join()
    print("Main Process")

 

Sub Process 
hello
Main Process

2. join과 데몬

import multiprocessing as mp 
import time


def work():
    print("Sub Process start")
    time.sleep(5) 
    print("Sub Process end")


if __name__ == "__main__":
    print("Main Process start")
    proc = mp.Process(name="Sub Process", target=work)
    proc.start()
    print("Main Process end")

 

Main Process start
Main Process end
Sub Process start
Sub Process end

1) Join

import multiprocessing as mp 
import time


def work():
    print("Sub Process start")
    time.sleep(5) 
    print("Sub Process end")


if __name__ == "__main__":
    print("Main Process start")
    proc = mp.Process(name="Sub Process", target=work)
    proc.start()
    proc.join()
    print("Main Process end")

 

Main Process start
Sub Process start
Sub Process end
Main Process end

2) Daemon속성

process = multiprocessing.Process(name=, target=, daemon=True)

# Daemon Process
# 메인 프로세스가 종료시 서브 프로세스도 종료됨
import multiprocessing as mp 
import time


def work():
    print("Sub Process start")
    time.sleep(5) 
    print("Sub Process end")


if __name__ == "__main__":
    print("Main Process start")
    proc = mp.Process(name="Sub Process", target=work, daemon=True)
    proc.start()
    print("Main Process end")

 

Main Process start
Main Process end

3. 서브 프로세스 상태 확인 및 종료

is alive : Process객체.is_alive()

kill      : Process객체.kill() 

import multiprocessing as mp 
import time

def work():
    while True:
        print("sub process is running") 
        time.sleep(1)


if __name__ == "__main__":
    p = mp.Process(target=work, name="SubProcess")
    print("Status: ", p.is_alive())

    p.start()
    print("Status: ", p.is_alive())

    time.sleep(5)   # 메인 프로세스 3초 대기
    p.kill()        # 서브 프로세스 종료
    print("Status: ", p.is_alive())

    p.join()        # 메인 프로세스는 서브 프로세스가 종료될 때까지 블록됨
    print("Status: ", p.is_alive())

 

Status:  False
Status:  True
sub process is running
sub process is running
sub process is running
sub process is running
sub process is running
Status:  True
Status:  False

4. 프로세스 간 객체 교환

1) 큐(Queue)

프로세스내의 스레드는 한 프로세스에 할당된 자원을 공유하지만 프로세스와 프로세스는 자원을 공유하지 않습니다. 따라서 프로세스와 프로세스 사이에서 자원을 전달하려면 특별한 통신이 필요합니다. 보통 프로세스 사이의 자원 공유를 위해 큐(Queue)라는 자료구조를 사용합니다.

import multiprocessing as mp

q = mp.Queue() 
q.put(1) 
q.put(2) 
q.put(3)

data1 = q.get() 
data2 = q.get() 
data3 = q.get()

print(data1) 
print(data2) 
print(data3)

 

1
2
3

 

 

import os
import time
from multiprocessing import Process,Pool,Queue

def f(q, l=list):
	q.put(l)
 
if __name__ == '__main__':
	# Queue생성 
	q = Queue()
	p1 = Process(target=f, args=(q,[42, None, 'process 1']))
	p2 = Process(target=f, args=(q,[31, None, 'process 2']))


	#  Queue공유
	p1.start()
	print(q.get())    # prints "[42, None, 'process 1']"
	p1.join()

	p2.start()
	print(q.get())    # prints "[31, None, 'process 2']"
	p2.join()

2) Pipe()

import os
import time
from multiprocessing import Process,Pool,Queue,Pipe


def f(conn):
	print(conn.recv()) # prints "[31, None, 'send from parent_conn']"
	conn.send([42, None, 'send from child_conn'])
	conn.close()
 
if __name__ == '__main__':

	# Pipe() 생성
	parent_conn, child_conn = Pipe()
	parent_conn.send([31, None, 'send from parent_conn'])
	p = Process(target=f, args=(child_conn,))
	p.start()
	print(parent_conn.recv())   # prints "[42, None, 'send from child_conn']"
	p.join()

3) Value , Array

import os
import time
from multiprocessing import Process,Pool,Queue,Pipe,Value,Array

def f(n, a, num):
	n.value = num
	for i in range(len(a)):
		a[i] = -a[i]

 
if __name__ == '__main__':
	num = Value('d', 0.0)
	arr = Array('i', range(10))

	p1 = Process(target=f, args=(num, arr, 1))
	p1.start()
	p1.join()

	p2 = Process(target=f, args=(num, arr, 2))
	p2.start()
	p2.join()

	print(num.value)   # 2.0
	print(arr[:])             # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

5. 멀티프로세싱과 PyQt

GUI를 구성하는 경우 특정 함수나 메소드에서 오랫동안 작업을 수행하게되면 GUI가 멈추는 현상이 발생합니다. 이런 경우에는 작업 시간이 오래 걸리는 메소드를 처리하기 위해 별도의 스레드를 생성하는 것이 좋습니다. 일반적으로 I/O 작업 위주의 경우 스레드를 사용해도 성능에 문제가 없습니다. 왜냐하면 I/O 위주의 작업의 경우 실제로 CPU가 처리하는 시간보다 인터넷이나 하드 드라이브에서 데이터를 읽는데 많은 시간을 사용하게 때문입니다. 이 경우 파이썬은 다른 스레드를 실행함으로써 동시에 여러 스레드가 효율적으로 동작하게 됩니다.

그런데 시간이 오래 걸리는 작업이 I/O 작업 위주가 아니라 실제로 CPU를 사용하는 복잡한 계산에 의한 것이라면 스레드보다는 프로세스를 사용하는 것이 좋습니다. 특히 파이썬은 GIL (Global Interpeter Lock)의 제한이 있어서 스레드를 여러개 사용하는 멀티 스레드의 성능이 더 좋지 않습니다. 물론 I/O 작업의 경우 GIL이 있더라도 성능 저하기 크지 않도록 설계되어 있습니다. 간단히 정리해보면 복잡한 작업이 I/O 위주의 작업이라면 멀티 스레드도 좋은 방법이지만 계산 위주의 작업이라면 멀티 프로세스 사용을 권장합니다.

생산자의 반대 역할이 바로 소비자(Consumer)입니다. 소비자는 큐만 쳐다보고 있다가 큐에 데이터가 입력되면 데이터를 큐에서 빼옵니다. 그리고 큐에서 빼온 데이터를 PyQt와 같은 GUI 담당 스레드에게 전달해줍니다. 소비자 역시 언제 생산자가 데이터를 큐에 넣어줄 지 모르기 때문에 계속해서 큐를 쳐다보고 있어야 합니다. 그래서 GUI와 달리 별도의 스레드로 구성합니다.

사용된 프로세스는 Main Process와 생산자를 위해 생성한 Sub Process 두 개입니다. Main Process는 다시 GUI를 위한 스레드와 Consumer를 위한 스레드가 있습니다.

import sys
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from multiprocessing import Process, Queue
import multiprocessing as mp
import datetime
import time


def producer(q):
    proc = mp.current_process()
    print(proc.name)

    while True:
        now = datetime.datetime.now()
        data = str(now)
        q.put(data)
        time.sleep(1)


class Consumer(QThread):
    poped = pyqtSignal(str)

    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        while True:
            if not self.q.empty():
                data = q.get()
                self.poped.emit(data)


class MyWindow(QMainWindow):
    def __init__(self, q):
        super().__init__()
        self.setGeometry(200, 200, 300, 200)

        # thread for data consumer
        self.consumer = Consumer(q)
        self.consumer.poped.connect(self.print_data)
        self.consumer.start()


    @pyqtSlot(str)
    def print_data(self, data):
        self.statusBar().showMessage(data)


if __name__ == "__main__":
    q = Queue()

    # producer process
    p = Process(name="producer", target=producer, args=(q, ), daemon=True)
    p.start()

    # Main process
    app = QApplication(sys.argv)
    mywindow = MyWindow(q)
    mywindow.show()
    app.exec_()

6. 멀티프로세싱과 클래스

프로세스와 클래스

Process의 target 파라미터로 Woker 클래스 객체의 run 함수를 전달합니다. 이 경우 Worker 클래스 자체는 MainProcess에서 생성된 것이며 생성자에서 프로세스 이름을 출력하면 MainProces로 출력됩니다. 이와달리 run 함수는 SubProcess로 출력됩니다. 한가지 주의할 점은 클래스 내부의 변수를 읽는 동작입니다. 이런 동작은 다른 프로세스에 있는 값을 읽는 것인데 한 프로세스에서 다른 프로세스의 값을 직접 읽을 수 없음에 유의해야합니다.

import multiprocessing as mp

class Worker:
    def __init__(self):
        print("__init__", mp.current_process().name)
        self.name = None

    def run(self, name):
        self.name = name
        print("run", self.name)
        print("run", mp.current_process().name)


if __name__ == "__main__":
    w = Worker()
    p = mp.Process(target=w.run, name="SubProcess", args=("bob",))
    p.start()       # target function is called
    p.join()
    # 서브 프로세스는 종료된 상태
    # 생성자는 메인 프로세스이므로 self.name은 None
    print(w.name)

 

__init__ MainProcess
run bob
run SubProcess
None

 

import multiprocessing as mp

class Worker:
    def __init__(self):
        print("__init__", mp.current_process().name)
        self.name = None

    def run(self, name):
        self.name = name
        print("run", self.name)
        print("run", mp.current_process().name)


if __name__ == "__main__":
    w = Worker()
    p = mp.Process(target=w.run, name="SubProcess", args=("bob",))
    p.start()       # target function is called

    print("before join")
    print('SubProcess: ', p.is_alive())
    print(w.name)

    p.join()

    print("after join")
    print('SubProcess: ', p.is_alive())
    print(w.name)

 

__init__ MainProcess
before join
SubProcess:  True
None
run bob
run SubProcess
after join
SubProcess:  False
None

타켓으로 클래스 전달

이번에는 target 파라미터로 클래스 이름을 전달해봅시다. 이 경우 start 함수가 호출될 때 해당 클래스의 생성자가 호출됩니다. 생성자에서 프로세스 이름을 출력하면 이번에는 SubProcess가 됩니다.

import multiprocessing as mp

class Worker:
    def __init__(self, name):
        print("__init__", mp.current_process().name)
        self.name = name
        self.run()

    def run(self):
        print("run", self.name)
        print("run", mp.current_process().name)


if __name__ == "__main__":
    p = mp.Process(target=Worker, name="SubProcess", args=("bob",))
    p.start()       # __init__ is called
    p.join()

 

__init__ SubProcess
run bob
run SubProcess

 

'점프 투 파이썬 정리' 카테고리의 다른 글

library  (0) 2022.10.23
내장함수  (0) 2022.10.23
11.5. 로깅  (0) 2022.04.27

댓글