본문 바로가기
PyQt5_

Using the thread pool

by 자동매매 2023. 3. 13.

QApplication 객체에서 .exec() 를 호출하여 시작된 이벤트 루프는 파이썬 코드와 동일한 스레드 내에서 실행됩니다.
이 이벤트 루프를 실행하는 스레드(GUI 스레드)는 호스트 운영 체제와의 모든 창 통신도 처리합니다.

기본적으로 이벤트 루프에 의해 트리거된 모든 실행은 이 스레드 내에서도 동기적으로 실행됩니다.
실제로 이것은 PyQt6 응용 프로그램이 코드에서 무언가를 할 때마다  창 통신 및 GUI 상호 작용이 정지된다는 것을 의미합니다.

수행중인 작업이 간단하고 GUI 루프에 제어를 신속하게 반환하면이 동결은 사용자가 감지 할 수 없습니다. 그러나 큰 파일 열기 / 쓰기, 일부 데이터 다운로드 또는 복잡한 이미지 렌더링과 같이 더 오래 실행되는 작업을 수행해야하는 경우 문제가 발생할 수 있습니다. 사용자에게 응용 프로그램이 응답하지 않는 것처럼 보입니다

해결책은 간단합니다 - GUI 스레드에서 작업을 수행하십시오. PyQt6는 이를 정확히 달성할 수 있는 간단한 인터페이스를 제공합니다.

 

Introduction to Threads & Processes

bad_example_1.py

import sys
import time

from PyQt6.QtCore import QTimer
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.counter = 0

        layout = QVBoxLayout()

        self.l = QLabel("Start")
        b = QPushButton("DANGER!")
        b.pressed.connect(self.oh_no)

        layout.addWidget(self.l)
        layout.addWidget(b)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()

        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def oh_no(self):
        time.sleep(5)

    def recurring_timer(self):
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)


app = QApplication(sys.argv)
window = MainWindow()
app.exec()

frozen interface 발생

 

The wrong approach

Qt는 호스트 OS에 계속 응답 할 수 있으며 응용 프로그램은 응답성을 유지합니다. QApplication 클래스에서 정적 .processEvents() 함수를 사용하여 이 작업을 쉽게 수행할 수 있습니다.

    def oh_no(self):
        for n in range(5):
            QApplication.processEvents()
            time.sleep(1)

 

OS 이벤트에 정상적으로 응답 있습니다. Qt 이제 이벤트를 수락하고 나머지 코드를 실행하기 위해 돌아 오기 전에 이벤트를 처리합니다.

이것은 작동하지만 몇 가지 이유로 끔찍합니다.

Firstly, when you pass control back to Qt, your code is no longer running. This means that whatever long-running thing you’re trying to do will take longer. That is probably not what you want.

Secondly, processing events outside the main event loop causes your application to branch off into handling code (e.g. for triggered slots, or events) while in your loop. If your code depends on/responds to external state this can cause undefined behavior. The code below demonstrates this in action:

 

bad_example_2.py

import sys
import time

from PyQt6.QtCore import QTimer
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.counter = 0

        layout = QVBoxLayout()

        self.l = QLabel("Start")
        b = QPushButton("DANGER!")
        b.pressed.connect(self.oh_no)

        c = QPushButton("?")
        c.pressed.connect(self.change_message)

        layout.addWidget(self.l)
        layout.addWidget(b)

        layout.addWidget(c)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()

    def change_message(self):
        self.message = "OH NO"

    def oh_no(self):
        self.message = "Pressed"

        for _ in range(100):
            time.sleep(0.1)
            self.l.setText(self.message)
            QApplication.processEvents()


app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

Threads and Processes

QRunnable 수행하려는 작업의 컨테이너이고 QThreadPool은 작업 스레드의 관리자입니다.

 

Using the thread pool

 

# class 전달

concurrent/qrunnable_1.py

3. qrunnable_1.py
0.00MB

 

 

import sys
import time

from PyQt6.QtCore import QRunnable, QThreadPool, QTimer, pyqtSlot
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

class Worker(QRunnable):
    """
    Worker thread
    """

    @pyqtSlot()
    def run(self):
        """
        Your code goes in this method
        """
        print("Thread start")
        time.sleep(5)
        print("Thread complete")

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.threadpool = QThreadPool()
        print(
            "Multithreading with maximum %d threads"
            % self.threadpool.maxThreadCount()
        )
        # end::init[]
        self.counter = 0

        layout = QVBoxLayout()

        self.l = QLabel("Start")
        b = QPushButton("DANGER!")
        b.pressed.connect(self.oh_no)

        layout.addWidget(self.l)
        layout.addWidget(b)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()

        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    # tag::oh_no[]
    def oh_no(self):
        worker = Worker()
        self.threadpool.start(worker)

    # end::oh_no[]

    def recurring_timer(self):
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)


app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

threat수 설정

        self.threadpool.setMaxThreadCount(16)

 

# method 전달

concurrent/qthreadpool_start_1.py

4. qthreadpool_start_1.py
0.00MB

 

 

    def oh_no(self):
        self.threadpool.start(self.do_some_work)

    @pyqtSlot()
    def do_some_work(self):
        print("Thread start")
        time.sleep(5)
        print("Thread complete")
 
이 방법은 많은 간단한 작업에 적합합니다. 실행된 함수 내에서 신호에 액세스할 수 있으며 이를 사용하여 데이터를 내보낼 수 있습니다. 신호를 수신 할 수는 없지만 self 변수를 통하여 상호 작용 가능
코드를 업데이트하여 다음 custom_signal 추가하고 work 메서드를 수정하여 이 신호를 내보내고 self.counter 변수를 업데이트합니다.

# custom signal

concurrent/qthreadpool_start_2.py

5. qthreadpool_start_2.py
0.00MB

 

 

class MainWindow(QMainWindow):

    custom_signal = pyqtSignal()

    def __init__(self):
        super().__init__()

        # Connect our custom signal to a handler.
        self.custom_signal.connect(self.signal_handler)
        
        
    def oh_no(self):
        self.threadpool.start(self.do_some_work)

    @pyqtSlot()
    def do_some_work(self):
        print("Thread start")
        # Emit our custom signal.
        self.custom_signal.emit()
        for n in range(5):
            time.sleep(1)
        self.counter = self.counter - 10
        print("Thread complete")

    def signal_handler(self):
        print("Signal received!")

    def recurring_timer(self):
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)

 

Extending QRunnable

If you want to pass custom data into the execution function you can set up your runner to take arguments or keywords and then store that data on the QRunnable self object. The data will then be accessible from within the run method.

 

qrunnable_2.py

6. qrunnable_2.py
0.00MB

 

 

class Worker(QRunnable):
    """
    Worker thread

    :param args: Arguments to make available to the run code
    :param kwargs: Keywords arguments to make available to the run
    :code
    :
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.args = args
        self.kwargs = kwargs

    @pyqtSlot()
    def run(self):
        """
                Initialize the runner function with passed self.args,
        self.kwargs.
        """
        print(self.args, self.kwargs)
        

class MainWindow(QMainWindow): 

    # tag::oh_no[]
    def oh_no(self):
        worker = Worker("some", "arguments", keywords=2)
        self.threadpool.start(worker)

 

Thread IO

Custom signal는 QObject에서 파생된 객체에서만 정의할 수 있습니다.
QRunnable
 QObject에서 파생되지 않기 때문에 신호를 직접 정의 할 수 없습니다.

 

concurrent/qrunnable_3.py

import random
import sys
import time

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.

    Supported signals are:

    finished
        No data

    error
        `str` Exception string

    result
        `dict` data returned from processing

    """

    finished = pyqtSignal()
    error = pyqtSignal(str)
    result = pyqtSignal(dict)



class Worker(QRunnable):
    """
    Worker thread

    :param args: Arguments to make available to the run code
    :param kwargs: Keywords arguments to make available to the run
    :code
    :
    """

    def __init__(self, iterations=5):
        super().__init__()
        self.signals = (
            WorkerSignals()
        )  # Create an instance of our signals class.
        self.iterations = iterations

    @pyqtSlot()
    def run(self):
        """
                Initialize the runner function with passed self.args,
        self.kwargs.
        """
        try:
            for n in range(self.iterations):
                time.sleep(0.01)
                v = 5 / (40 - n)

        except Exception as e:
            self.signals.error.emit(str(e))

        else:
            self.signals.finished.emit()
            self.signals.result.emit({"n": n, "value": v})


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.threadpool = QThreadPool()
        print(
            "Multithreading with maximum %d threads"
            % self.threadpool.maxThreadCount()
        )

        self.counter = 0

        layout = QVBoxLayout()

        self.l = QLabel("Start")
        b = QPushButton("DANGER!")
        b.pressed.connect(self.oh_no)

        layout.addWidget(self.l)
        layout.addWidget(b)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()

        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def oh_no(self):
        worker = Worker(iterations=random.randint(10, 50))
        worker.signals.result.connect(self.worker_output)
        worker.signals.finished.connect(self.worker_complete)
        worker.signals.error.connect(self.worker_error)
        self.threadpool.start(worker)

    def worker_output(self, s):
        print("RESULT", s)

    def worker_complete(self):
        print("THREAD COMPLETE!")

    def worker_error(self, t):
        print("ERROR: %s" % t)

    def recurring_timer(self):
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)


app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

'PyQt5_' 카테고리의 다른 글

Long-running threads  (0) 2023.03.13
QRunnable examples  (0) 2023.03.13
Using Custom Widgets in Qt Designer  (0) 2023.03.13
Creating Custom Widgets  (0) 2023.03.13
Bitmap Graphics in Qt  (0) 2023.03.13

댓글