본문 바로가기
PyQt5_

QRunnable examples

by 자동매매 2023. 3. 13.

QThreadPool과 QRunnable은 다른 스레드에서 작업을 실행하는 매우 유연한 방법입니다. 신호와 파라미터를 조정하여 상상할 수 있는 모든 작업을 수행할 수 있습니다. 이 장에서는 특정 시나리오에 대한 러너를 구성하는 방법에 대한 몇 가지 예를 살펴보겠습니다.

 

All the examples follow the same general pattern: a custom QRunnable class with custom WorkerSignals. The difference is in what we pass to the runner, what it does with those parameters, and how we hook up the signals.

 

[basic]

 

concurrent/qrunnable_base.py

import sys
import time
import traceback

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import QApplication, QMainWindow


class WorkerSignals(QObject):
    """Container for the signals a worker can emit; none are defined yet."""


class Worker(QRunnable):
    """Skeleton runner that remembers its constructor arguments."""

    def __init__(self, *args, **kwargs):
        super().__init__()
        # Every runner gets its own signals object.
        self.signals = WorkerSignals()
        # Keep whatever was passed in so run() can use it later.
        self.args, self.kwargs = args, kwargs

    @pyqtSlot()
    def run(self):
        """Work performed when the runner is executed; intentionally empty."""
        return None


class MainWindow(QMainWindow):
    """Empty main window that displays itself as soon as it is built."""

    def __init__(self):
        QMainWindow.__init__(self)
        self.show()


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

The progress watcher

스레드를 사용하여 장기 실행 작업을 수행하는 경우 사용자에게 작업 진행 상황을 알려야 합니다. 이를 수행하는 일반적인 방법은 왼쪽에서 오른쪽으로 막대가 채워져 작업이 얼마나 완료되었는지를 나타내는 진행률 표시줄을 사용자에게 표시하는 것입니다. 작업에 대한 진행률 표시줄을 표시하려면 작업자로부터 현재 진행률 상태를 내보내야 합니다.

 

To do this we can define another signal called progress on the WorkerSignals object. This signal emits on each loop a number from 0..100 as the "task" progresses. The output of this progress signal is connected to a standard QProgressBar shown on the statusbar of our main window.

 

concurrent/qrunnable_progress.py

qrunnable_progress.py
0.00MB

 

 

import sys
import time

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QProgressBar,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """
    Signals that a running worker can emit.

    progress
        int, percentage of the task completed (0-100)
    """

    progress = pyqtSignal(int)


class Worker(QRunnable):
    """
    Runner that simulates a long task and reports how far along it is.

    Progress is published through ``self.signals.progress``.
    """

    def __init__(self):
        super().__init__()
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """Perform 1000 short steps, emitting the percentage done at each one."""
        total_n = 1000
        for done in range(1, total_n + 1):
            # Whole-number percentage of the steps completed so far.
            self.signals.progress.emit(int(100 * done / total_n))
            time.sleep(0.01)


class MainWindow(QMainWindow):
    """Window with a progress bar and a button that launches a worker."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.progress = QProgressBar()

        start_button = QPushButton("START IT UP")
        start_button.pressed.connect(self.execute)

        # Stack the bar above the button inside the central widget.
        vbox = QVBoxLayout()
        for widget in (self.progress, start_button):
            vbox.addWidget(widget)

        container = QWidget()
        container.setLayout(vbox)
        self.setCentralWidget(container)

        self.show()

        self.threadpool = QThreadPool()
        max_threads = self.threadpool.maxThreadCount()
        print("Multithreading with maximum %d threads" % max_threads)

    def execute(self):
        """Create a worker, route its progress to the bar and start it."""
        worker = Worker()
        worker.signals.progress.connect(self.update_progress)
        self.threadpool.start(worker)

    def update_progress(self, progress):
        """Show the received percentage on the progress bar."""
        self.progress.setValue(progress)


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

concurrent/qrunnable_progress_many.py

import random
import sys
import time
import uuid

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QProgressBar,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """
    Signals that a running worker can emit.

    progress
        (str, int): job id and percentage complete (0-100)

    finished
        str: job id of the worker that has completed
    """

    progress = pyqtSignal(str, int)
    finished = pyqtSignal(str)


class Worker(QRunnable):
    """
    Runner that simulates a task and reports progress tagged with its job id.

    Emits ``progress(job_id, percent)`` on every step and
    ``finished(job_id)`` once all steps are done.
    """

    def __init__(self):
        super().__init__()
        self.signals = WorkerSignals()
        self.job_id = uuid.uuid4().hex  # <1>

    @pyqtSlot()
    def run(self):
        """Run 1000 steps with a per-worker random pause between them."""
        total_n = 1000
        # Each worker picks its own pause, so workers advance at different speeds.
        delay = random.random() / 100
        for done in range(1, total_n + 1):
            percent = int(100 * done / total_n)  # <2>
            self.signals.progress.emit(self.job_id, percent)
            time.sleep(delay)

        self.signals.finished.emit(self.job_id)


class MainWindow(QMainWindow):
    """Window showing the average progress of all currently running workers."""

    def __init__(self):
        super().__init__()

        # Progress of the workers currently running, keyed by job id.
        self.worker_progress = {}

        self.progress = QProgressBar()

        start_button = QPushButton("START IT UP")
        start_button.pressed.connect(self.execute)

        self.status = QLabel("0 workers")

        vbox = QVBoxLayout()
        for widget in (self.progress, start_button, self.status):
            vbox.addWidget(widget)

        container = QWidget()
        container.setLayout(vbox)
        self.setCentralWidget(container)

        self.show()

        self.threadpool = QThreadPool()
        max_threads = self.threadpool.maxThreadCount()
        print("Multithreading with maximum %d threads" % max_threads)

        # Redraw the combined progress every 100 ms.
        self.timer = QTimer()
        self.timer.timeout.connect(self.refresh_progress)
        self.timer.setInterval(100)
        self.timer.start()

    def execute(self):
        """Start a new worker and subscribe to its progress/finished signals."""
        worker = Worker()
        worker.signals.progress.connect(self.update_progress)
        worker.signals.finished.connect(self.cleanup)  # <3>
        self.threadpool.start(worker)

    def cleanup(self, job_id):
        """Forget a finished worker's progress and refresh the display."""
        if job_id not in self.worker_progress:
            return

        del self.worker_progress[job_id]  # <4>
        # The set of tracked workers changed, so update the bar right away.
        self.refresh_progress()

    def update_progress(self, job_id, progress):
        """Record the latest percentage reported by a worker."""
        self.worker_progress[job_id] = progress

    def calculate_progress(self):
        """Return the mean progress of tracked workers as an int (0 if none)."""
        tracked = self.worker_progress
        if not tracked:
            return 0

        return int(sum(tracked.values()) / len(tracked))

    def refresh_progress(self):
        """Push the combined progress and worker count to the widgets."""
        progress = self.calculate_progress()

        # Only dump the per-worker state while there is something to show.
        if progress:
            print(self.worker_progress)

        self.progress.setValue(progress)
        worker_count = len(self.worker_progress)
        self.status.setText("%d workers" % worker_count)


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

Use a unique UUID4 identifier for this runner. 

Progress 0-100% as an integer. 

When the job finishes, we need to cleanup (delete) the workers progress. 

Delete the progress for the finished worker.

 

 

 

모든 진행률 표시 줄이 100에 도달 할 때만 항목을 제거합니다

concurrent/qrunnable_progress_many_2.py

import random
import sys
import time
import uuid

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QProgressBar,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """
    Signals that a running worker can emit.

    progress
        (str, int): job id and percentage complete (0-100)

    finished
        str: job id of the worker that has completed
    """

    progress = pyqtSignal(str, int)
    finished = pyqtSignal(str)


class Worker(QRunnable):
    """
    Runner that simulates a task and reports progress tagged with its job id.

    Emits ``progress(job_id, percent)`` on every step and
    ``finished(job_id)`` once all steps are done.
    """

    def __init__(self):
        super().__init__()
        self.signals = WorkerSignals()
        self.job_id = uuid.uuid4().hex  # <1>

    @pyqtSlot()
    def run(self):
        """Run 1000 steps with a per-worker random pause between them."""
        total_n = 1000
        # Each worker picks its own pause, so workers advance at different speeds.
        delay = random.random() / 100
        for done in range(1, total_n + 1):
            percent = int(100 * done / total_n)  # <2>
            self.signals.progress.emit(self.job_id, percent)
            time.sleep(delay)

        self.signals.finished.emit(self.job_id)


class MainWindow(QMainWindow):
    """
    Window showing the average progress of all workers.

    Finished workers stay in the average until every tracked worker has
    reached 100, at which point the whole record is reset.
    """

    def __init__(self):
        super().__init__()

        # Progress of the workers being tracked, keyed by job id.
        self.worker_progress = {}

        self.progress = QProgressBar()

        start_button = QPushButton("START IT UP")
        start_button.pressed.connect(self.execute)

        self.status = QLabel("0 workers")

        vbox = QVBoxLayout()
        for widget in (self.progress, start_button, self.status):
            vbox.addWidget(widget)

        container = QWidget()
        container.setLayout(vbox)
        self.setCentralWidget(container)

        self.show()

        self.threadpool = QThreadPool()
        max_threads = self.threadpool.maxThreadCount()
        print("Multithreading with maximum %d threads" % max_threads)

        # Redraw the combined progress every 100 ms.
        self.timer = QTimer()
        self.timer.timeout.connect(self.refresh_progress)
        self.timer.setInterval(100)
        self.timer.start()

    def execute(self):
        """Start a new worker and subscribe to its progress/finished signals."""
        worker = Worker()
        worker.signals.progress.connect(self.update_progress)
        worker.signals.finished.connect(self.cleanup)  # <3>
        self.threadpool.start(worker)

    def cleanup(self, job_id):
        """Reset the progress record once every tracked worker is at 100."""
        still_running = any(
            value != 100 for value in self.worker_progress.values()
        )
        if still_running:
            return

        self.worker_progress.clear()  # Empty the dict.
        # The record was emptied, so update the bar right away.
        self.refresh_progress()

    def update_progress(self, job_id, progress):
        """Record the latest percentage reported by a worker."""
        self.worker_progress[job_id] = progress

    def calculate_progress(self):
        """Return the mean progress of tracked workers (0 if there are none)."""
        tracked = self.worker_progress
        if not tracked:
            return 0

        return sum(tracked.values()) / len(tracked)

    def refresh_progress(self):
        """Push the combined progress and worker count to the widgets."""
        progress = self.calculate_progress()
        print(self.worker_progress)
        # The mean may be fractional; the bar takes whole numbers.
        self.progress.setValue(int(progress))
        worker_count = len(self.worker_progress)
        self.status.setText("%d workers" % worker_count)


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

The calculator

concurrent/qrunnable_calculator.py

import random
import sys
import time
import uuid

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)
import pyqtgraph as pg


class WorkerSignals(QObject):
    """
    Signals that a running worker can emit.

    data
        tuple holding one data point as (worker_id, x, y)
    """

    data = pyqtSignal(tuple)  # <1>


class Worker(QRunnable):
    """
    Runner that produces a stream of dummy data points.

    Every point is emitted on ``self.signals.data`` as
    ``(worker_id, n, value)``.
    """

    def __init__(self):
        super().__init__()
        self.signals = WorkerSignals()
        self.worker_id = uuid.uuid4().hex  # Unique ID for this worker.

    @pyqtSlot()
    def run(self):
        """Emit 1000 points of a randomly drifting running total."""
        total_n = 1000
        y2 = random.randint(0, 10)
        delay = random.random() / 100  # Random pause between points.
        running_total = 0

        for n in range(total_n):
            # Dummy calculation: the random y and y2 values make every
            # worker produce a different series.
            y = random.randint(0, 10)
            running_total += n * (y2 - y)

            point = (self.worker_id, n, running_total)
            self.signals.data.emit(point)  # <2>
            time.sleep(delay)


class MainWindow(QMainWindow):
    """Window that plots one line per worker from the data points received."""

    def __init__(self):
        super().__init__()

        self.threadpool = QThreadPool()

        # Per-worker storage, all keyed by worker id.
        self.x = {}  # Keep timepoints.
        self.y = {}  # Keep data.
        self.lines = {}  # Keep references to plotted lines, to update.

        self.graphWidget = pg.PlotWidget()
        self.graphWidget.setBackground("w")

        new_worker_button = QPushButton("Create New Worker")
        new_worker_button.pressed.connect(self.execute)

        vbox = QVBoxLayout()
        vbox.addWidget(self.graphWidget)
        vbox.addWidget(new_worker_button)

        container = QWidget()
        container.setLayout(vbox)
        self.setCentralWidget(container)

        self.show()

    def execute(self):
        """Start a new worker and subscribe to its data points."""
        worker = Worker()
        worker.signals.data.connect(self.receive_data)
        self.threadpool.start(worker)

    def receive_data(self, data):
        """Store a received point and draw or extend that worker's line."""
        worker_id, x, y = data  # <3>

        if worker_id in self.lines:
            # Known worker: extend its series and redraw its line.
            xs, ys = self.x[worker_id], self.y[worker_id]
            xs.append(x)
            ys.append(y)
            self.lines[worker_id].setData(xs, ys)
            return

        # First point from this worker: start its series.
        self.x[worker_id] = [x]
        self.y[worker_id] = [y]

        # Generate a random color.
        rgb = tuple(random.randint(100, 255) for _ in range(3))
        pen = pg.mkPen(width=2, color=rgb)

        self.lines[worker_id] = self.graphWidget.plot(
            self.x[worker_id], self.y[worker_id], pen=pen
        )


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

Setup a custom signal to pass out the data. Using tuple allows you to send out any number of values wrapped in a tuple.

Here we’re emitting a worker_id, x and y value. 

Receiver slot unpacks the data.

 

작업자로부터 데이터를 받으면 테이블 또는 모델 뷰에 추가할 수 있습니다. 여기서 우리는 x와 y 값을 worker_id에 의해 키가 지정된 dict 객체에 저장합니다. 이렇게 하면 작업자에 대한 데이터가 별도로 유지되고 개별적으로 플롯할 수 있습니다.

 

 

Stopping a running QRunnable

플래그를 사용하여 러너에게 중지해야 함을 나타내는 방법을 살펴보겠습니다

컴퓨팅에서 플래그는 현재 상태 또는 상태 변경을 알리는 데 사용되는 변수입니다. 선박이 깃발을 사용하여 서로 통신하는 방법을 생각해보십시오.

 

concurrent/qrunnable_stop.py

import sys
import time

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    Qt,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QHBoxLayout,
    QMainWindow,
    QProgressBar,
    QPushButton,
    QWidget,
)


class WorkerKilledException(Exception):
    """Raised inside a runner's loop when the runner has been asked to stop."""


class WorkerSignals(QObject):
    """Signals emitted by the job runner."""

    # Carries an int; the runner emits the values 1..100.
    progress = pyqtSignal(int)


class JobRunner(QRunnable):
    """
    Runner that counts to 100 and can be stopped early through a flag.

    Progress (1..100) is emitted on ``self.signals.progress``. Call
    ``kill()`` from any thread to make the loop exit at its next check.
    """

    def __init__(self):
        super().__init__()

        # Created per instance: a class-level signals object would be shared
        # by every runner, so connecting to one would connect to all.
        self.signals = WorkerSignals()

        self.is_killed = False  # <1>

    @pyqtSlot()
    def run(self):
        """Emit progress every 0.1 s until done or until the kill flag is set."""
        try:
            for n in range(100):
                self.signals.progress.emit(n + 1)
                time.sleep(0.1)

                if self.is_killed:  # <2>
                    raise WorkerKilledException

        except WorkerKilledException:
            # Deliberately swallowed: being killed is a normal way to finish.
            # A finished or error signal could be emitted here.
            pass  # <3>

    def kill(self):  # <4>
        """Request that the runner stops at its next flag check."""
        self.is_killed = True


class MainWindow(QMainWindow):
    """Window with a Stop button and a status-bar progress bar for one runner."""

    def __init__(self):
        super().__init__()

        # Central widget holding the single control button.
        container = QWidget()
        button_row = QHBoxLayout()
        container.setLayout(button_row)

        btn_stop = QPushButton("Stop")
        button_row.addWidget(btn_stop)

        self.setCentralWidget(container)

        # The progress bar sits permanently in the status bar.
        self.progress = QProgressBar()
        self.status = self.statusBar()
        self.status.addPermanentWidget(self.progress)

        # Thread runner
        self.threadpool = QThreadPool()

        # Create the runner, listen to its progress and start it immediately.
        self.runner = JobRunner()
        self.runner.signals.progress.connect(self.update_progress)
        self.threadpool.start(self.runner)

        btn_stop.pressed.connect(self.runner.kill)

        self.show()

    def update_progress(self, n):
        """Show the received progress value on the bar."""
        self.progress.setValue(n)


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
w = MainWindow()
app.exec()

The flag to indicate whether the runner should be killed is called .is_killed.

On each loop we test to see whether .is_killed is True in which case we throw an exception.

Catch the exception, we could emit a finished or error signal here.

.kill() convenience function so we can call worker.kill() to kill it.

 

오류를 발생시키지 않고 작업자를 중지하려면 run method등에서 return문을 실행한다. 
def run(self):
    """Emit progress 1..100, returning early once the kill flag is set."""
    for n in range(100):
        self.signals.progress.emit(n + 1)
        time.sleep(0)

        # Leaving with a plain return stops the worker without raising.
        if self.is_killed:
            return
위의 예에서는 작업자가 한 명뿐입니다. 그러나 많은 응용 프로그램에서 더 많은 것을 갖게 될 것입니다. 여러 러너가 실행 중일 때 작업자 중지를 어떻게 처리합니까?

If you want the stop to stop all workers, then nothing is changed. You can simply hook all the workers up to the same "Stop" signal, and when that signal is fired e.g. by pressing a button all the workers will stop simultaneously.

If you want to be able to stop individual workers you would either need to create a separate button somewhere in your UI for each runner, or implement a manager to keep track of workers and provide a nicer interface to kill them.

 

Pausing a runner

concurrent/qrunnable_pause.py

import sys
import time

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    Qt,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QHBoxLayout,
    QMainWindow,
    QProgressBar,
    QPushButton,
    QWidget,
)


class WorkerKilledException(Exception):
    """Raised inside a runner's loop when the runner has been asked to stop."""


class WorkerSignals(QObject):
    """Signals emitted by the job runner."""

    # Carries an int; the runner emits the values 1..100.
    progress = pyqtSignal(int)


class JobRunner(QRunnable):
    """
    Runner that counts to 100 and can be paused, resumed or stopped.

    Progress (1..100) is emitted on ``self.signals.progress``. The
    ``pause()``, ``resume()`` and ``kill()`` methods only set flags; the
    loop in ``run()`` reacts to them at its next check.
    """

    def __init__(self):
        super().__init__()

        # Created per instance: a class-level signals object would be shared
        # by every runner, so connecting to one would connect to all.
        self.signals = WorkerSignals()

        self.is_paused = False
        self.is_killed = False

    @pyqtSlot()
    def run(self):
        """Emit progress every 0.1 s, honouring the pause and kill flags."""
        try:
            for n in range(100):
                self.signals.progress.emit(n + 1)
                time.sleep(0.1)

                # Wait here while paused. Also leave the wait when killed,
                # so Stop works on a paused runner without a Resume first.
                while self.is_paused and not self.is_killed:
                    time.sleep(0)  # <1>

                if self.is_killed:
                    raise WorkerKilledException

        except WorkerKilledException:
            # Being killed is a normal way to finish; do not let the
            # exception escape run().
            pass

    def pause(self):
        """Request that the runner waits after its current step."""
        self.is_paused = True

    def resume(self):
        """Let a paused runner continue."""
        self.is_paused = False

    def kill(self):
        """Request that the runner stops at its next flag check."""
        self.is_killed = True


class MainWindow(QMainWindow):
    """Window with Stop/Pause/Resume buttons and a status-bar progress bar."""

    def __init__(self):
        super().__init__()

        # Central widget holding the row of control buttons.
        container = QWidget()
        button_row = QHBoxLayout()
        container.setLayout(button_row)

        btn_stop = QPushButton("Stop")
        btn_pause = QPushButton("Pause")
        btn_resume = QPushButton("Resume")

        for button in (btn_stop, btn_pause, btn_resume):
            button_row.addWidget(button)

        self.setCentralWidget(container)

        # The progress bar sits permanently in the status bar.
        self.progress = QProgressBar()
        self.status = self.statusBar()
        self.status.addPermanentWidget(self.progress)

        # Thread runner
        self.threadpool = QThreadPool()

        # Create the runner, listen to its progress and start it immediately.
        self.runner = JobRunner()
        self.runner.signals.progress.connect(self.update_progress)
        self.threadpool.start(self.runner)

        # Each button sets the matching flag on the runner.
        btn_stop.pressed.connect(self.runner.kill)
        btn_pause.pressed.connect(self.runner.pause)
        btn_resume.pressed.connect(self.runner.resume)

        self.show()

    def update_progress(self, n):
        """Show the received progress value on the bar."""
        self.progress.setValue(n)


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
w = MainWindow()
app.exec()

You can put a higher value than 0 in the sleep call if you don’t want to check if it’s time to wake up very often.

 

If you click [ Stop ] the worker will stop, permanently, as before.

Python에는 여러 스레드가 동일한 GIL(Global Interpreter Lock )에 의해 바인딩된다는 문제가 있습니다. 즉, GIL을 해제하지 않는 Python 코드는 한 번에 하나의 스레드에서만 실행할 수 있습니다. 

You must include the time.sleep() call. This zero-second pause allows for Python to release the GIL, so this loop will not block other execution. Without that sleep you have a busy loop which will waste resources while doing nothing. Increase the sleep value if you want to check less often.

 

The Communicator

스레드를 실행할 때는 실행되는 동안 일어나는 일의 출력을 얻고 싶은 경우가 많습니다.

이 예제에서는 별도의 스레드에서 원격 서버에 대한 요청을 수행하고 출력을 로거에 덤프하는 실행기를 만듭니다. 또한 사용자 지정 파서를 러너에 전달하여 요청에서 관심 있는 추가 데이터에 전달하는 방법도 살펴보겠습니다.

 

스레드가 아닌 외부 프로세스의 데이터를 기록하려면 외부 프로세스 실행 및 외부 명령 및 프로세스 실행을 살펴보십시오.

 

You must include the time.sleep() call. This zero-second pause allows for Python to release the GIL, so this loop will not block other execution. Without that sleep you have a busy loop which will waste resources while doing nothing. Increase the sleep value if you want to check less often

 

Dumping data

concurrent/qrunnable_io.py

import sys

import requests
from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPlainTextEdit,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """
    Signals that a running worker can emit.

    data
        tuple of (identifier, data)
    """

    data = pyqtSignal(tuple)


class Worker(QRunnable):
    """
    Runner that downloads a URL and emits the response text line by line.

    Every line is emitted on ``self.signals.data`` as ``(id, line)``. If
    the request fails, a single ``(id, "ERROR: ...")`` tuple is emitted
    instead.

    :param id: The id for this worker
    :param url: The url to retrieve
    :param timeout: Seconds to wait for the server before giving up
    """

    def __init__(self, id, url, timeout=10):
        super().__init__()
        self.id = id
        self.url = url
        self.timeout = timeout

        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """Fetch the URL and emit each line of the response body."""
        try:
            # Without a timeout an unresponsive server would occupy this
            # pool thread indefinitely.
            r = requests.get(self.url, timeout=self.timeout)
        except requests.RequestException as exc:
            # Report the failure through the normal data channel instead of
            # letting the exception escape run().
            self.signals.data.emit((self.id, "ERROR: %s" % exc))
            return

        for line in r.text.splitlines():
            self.signals.data.emit((self.id, line))


class MainWindow(QMainWindow):
    """Window that downloads a list of URLs and logs every line received."""

    def __init__(self):
        super().__init__()

        self.urls = [
            "https://www.pythonguis.com/",
            "https://www.mfitzp.com/",
            "https://www.google.com",
            "https://academy.pythonguis.com/",
        ]

        # Read-only log that collects the workers' output.
        self.text = QPlainTextEdit()
        self.text.setReadOnly(True)

        go_button = QPushButton("GO GET EM!")
        go_button.pressed.connect(self.execute)

        vbox = QVBoxLayout()
        vbox.addWidget(self.text)
        vbox.addWidget(go_button)

        container = QWidget()
        container.setLayout(vbox)
        self.setCentralWidget(container)

        self.show()

        self.threadpool = QThreadPool()
        max_threads = self.threadpool.maxThreadCount()
        print("Multithreading with maximum %d threads" % max_threads)

    def execute(self):
        """Start one worker per URL, numbered by position in the list."""
        for index, url in enumerate(self.urls):
            worker = Worker(index, url)
            worker.signals.data.connect(self.display_output)
            self.threadpool.start(worker)

    def display_output(self, data):
        """Append a received (worker id, text) pair to the log."""
        worker_id, line = data
        self.text.appendPlainText("WORKER %d: %s" % (worker_id, line))


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

Parsing data

concurrent/qrunnable_io_parser.py

17.9 qrunnable_io_parser.py
0.00MB

 

 

import re
import sys

import requests
from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPlainTextEdit,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """
    Signals that a running worker can emit.

    data
        tuple of (identifier, data)
    """

    data = pyqtSignal(tuple)


class Worker(QRunnable):
    """
    Runner that downloads a URL and extracts values using the given parsers.

    Emits one ``(id, data)`` tuple on ``self.signals.data``, where ``data``
    maps each parser name to the first captured group of its first match.
    Parsers that do not match are left out. If the request fails, ``data``
    is ``{"error": <message>}``.

    :param id: The id for this worker
    :param url: The url to retrieve
    :param parsers: Mapping of name to compiled regular expression; each
                    pattern needs at least one capture group
    :param timeout: Seconds to wait for the server before giving up
    """

    def __init__(self, id, url, parsers, timeout=10):
        super().__init__()
        self.id = id
        self.url = url
        self.parsers = parsers
        self.timeout = timeout

        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """Fetch the URL, run every parser on the body and emit the results."""
        try:
            # Without a timeout an unresponsive server would occupy this
            # pool thread indefinitely.
            r = requests.get(self.url, timeout=self.timeout)
        except requests.RequestException as exc:
            # Report the failure through the normal data channel instead of
            # letting the exception escape run().
            self.signals.data.emit((self.id, {"error": str(exc)}))
            return

        data = {}
        for name, parser in self.parsers.items():  # <1>
            m = parser.search(r.text)
            if m:  # <2>
                data[name] = m.group(1).strip()

        self.signals.data.emit((self.id, data))




class MainWindow(QMainWindow):
    """Window that downloads a list of URLs and logs the values parsed from each."""

    def __init__(self):
        super().__init__()

        self.urls = [
            "https://www.pythonguis.com/",
            "https://www.mfitzp.com/",
            "https://www.google.com",
            "https://academy.pythonguis.com/",
        ]
        # tag::init[]
        # Regular expression parsers, to extract data from the HTML.
        html_flags = re.M | re.S
        self.parsers = {  # <1>
            "title": re.compile(r"<title.*?>(.*?)<\/title>", html_flags),
            "h1": re.compile(r"<h1.*?>(.*?)<\/h1>", html_flags),
            "h2": re.compile(r"<h2.*?>(.*?)<\/h2>", html_flags),
        }
        # end::init[]

        # Read-only log that collects the workers' output.
        self.text = QPlainTextEdit()
        self.text.setReadOnly(True)

        go_button = QPushButton("GO GET EM!")
        go_button.pressed.connect(self.execute)

        vbox = QVBoxLayout()
        vbox.addWidget(self.text)
        vbox.addWidget(go_button)

        container = QWidget()
        container.setLayout(vbox)
        self.setCentralWidget(container)

        self.show()

        self.threadpool = QThreadPool()
        max_threads = self.threadpool.maxThreadCount()
        print("Multithreading with maximum %d threads" % max_threads)

    # tag::execute[]
    def execute(self):
        """Start one worker per URL, handing each the shared parsers."""
        for index, url in enumerate(self.urls):
            worker = Worker(index, url, self.parsers)  # <1>
            worker.signals.data.connect(self.display_output)
            self.threadpool.start(worker)

    # end::execute[]

    def display_output(self, data):
        """Append a received (worker id, parsed data) pair to the log."""
        worker_id, parsed = data
        self.text.appendPlainText("WORKER %d: %s" % (worker_id, parsed))


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

The Generic

concurrent/qrunnable_generic.py

import sys
import time
import traceback

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


def execute_this_fn(delay=1):
    """
    Simulate a slow job by pausing five times, then report completion.

    :param delay: Seconds to sleep on each of the five steps (default 1,
                  i.e. about five seconds in total)
    :return: The string "Done."
    """
    for _ in range(5):
        time.sleep(delay)

    return "Done."


class WorkerSignals(QObject):
    """
    Signals that a running worker can emit.

    finished
        no payload; emitted when the worker is done, whether or not it failed

    error
        `tuple` of (exctype, value, traceback.format_exc())

    result
        `object`, whatever the processing function returned
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)


class Worker(QRunnable):
    """
    Generic runner that executes an arbitrary function.

    The outcome is reported through ``self.signals``: ``result`` carries
    the return value, ``error`` carries exception details, and ``finished``
    is emitted last in either case.

    :param fn: The function to run on this worker
    :type fn: function
    :param args: Arguments to pass to the function
    :param kwargs: Keywords to pass to the function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """Call the stored function with the stored args and kwargs."""
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception as exc:
            # Catch Exception rather than using a bare except, so that
            # SystemExit and KeyboardInterrupt are not swallowed here.
            traceback.print_exc()
            self.signals.error.emit(
                (type(exc), exc, traceback.format_exc())
            )
        else:
            # Return the result of the processing
            self.signals.result.emit(result)
        finally:
            self.signals.finished.emit()  # Done


class MainWindow(QMainWindow):
    """Window with a once-per-second counter and a button that starts a worker."""

    def __init__(self):
        super().__init__()

        self.counter = 0

        self.l = QLabel("Start")
        danger_button = QPushButton("DANGER!")
        danger_button.pressed.connect(self.oh_no)

        vbox = QVBoxLayout()
        vbox.addWidget(self.l)
        vbox.addWidget(danger_button)

        container = QWidget()
        container.setLayout(vbox)
        self.setCentralWidget(container)

        self.show()

        self.threadpool = QThreadPool()
        max_threads = self.threadpool.maxThreadCount()
        print("Multithreading with maximum %d threads" % max_threads)

        # Tick the on-screen counter once per second.
        self.timer = QTimer()
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.setInterval(1000)
        self.timer.start()

    def print_output(self, s):
        """Print the result delivered by a worker."""
        print(s)

    def thread_complete(self):
        """Announce that a worker has finished."""
        print("THREAD COMPLETE!")

    def oh_no(self):
        """Run ``execute_this_fn`` on a pool thread and listen for its outcome."""
        # Any further args/kwargs given to Worker are passed on to the function.
        worker = Worker(execute_this_fn)
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)

        self.threadpool.start(worker)

    def recurring_timer(self):
        """Advance the counter and show its new value in the label."""
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)


# Create the application, build the main window (it shows itself), then
# enter the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

concurrent/qrunnable_generic_callback.py

import sys
import time
import traceback

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


def execute_this_fn(signals, delay=1):
    """
    Simulate a slow job in five steps, reporting progress after each one.

    :param signals: Object whose ``progress`` signal receives the percentage
                    done (0, 25, 50, 75, 100)
    :param delay: Seconds to sleep on each step (default 1)
    :return: The string "Done."
    """
    for n in range(5):
        time.sleep(delay)
        # Integer division: the progress signal is declared with an int
        # payload, so a float such as 25.0 must not be emitted.
        signals.progress.emit(n * 100 // 4)

    return "Done."


class WorkerSignals(QObject):
    """
    Signals that a running worker can emit.

    finished
        no payload; emitted when the worker is done, whether or not it failed

    error
        `tuple` of (exctype, value, traceback.format_exc())

    result
        `object`, whatever the processing function returned

    progress
        `int` indicating % progress
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
    progress = pyqtSignal(int)


class Worker(QRunnable):
    """
    Generic runner that executes a function and lets it report progress.

    The function receives this worker's signals object as the ``signals``
    keyword argument. The outcome is reported through ``self.signals``:
    ``result`` carries the return value, ``error`` carries exception
    details, and ``finished`` is emitted last in either case.

    :param fn: The function to run on this worker
    :type fn: function
    :param args: Arguments to pass to the function
    :param kwargs: Keywords to pass to the function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.signals = WorkerSignals()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

        # Give the function access to the signals through its kwargs.
        self.kwargs["signals"] = self.signals

    @pyqtSlot()
    def run(self):
        """Call the stored function with the stored args and kwargs."""
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception as exc:
            traceback.print_exc()
            failure = (type(exc), exc, traceback.format_exc())
            self.signals.error.emit(failure)
        else:
            # Return the result of the processing
            self.signals.result.emit(result)
        finally:
            self.signals.finished.emit()  # Done


class MainWindow(QMainWindow):
    """
    Demo window with a counter label that is refreshed by a one-second
    timer and a button that runs ``execute_this_fn`` on the thread pool.
    """

    def __init__(self):
        super().__init__()

        self.counter = 0

        self.l = QLabel("Start")
        danger_button = QPushButton("DANGER!")
        danger_button.pressed.connect(self.oh_no)

        column = QVBoxLayout()
        for widget in (self.l, danger_button):
            column.addWidget(widget)

        container = QWidget()
        container.setLayout(column)
        self.setCentralWidget(container)

        self.show()

        self.threadpool = QThreadPool()
        pool_size = self.threadpool.maxThreadCount()
        print("Multithreading with maximum %d threads" % pool_size)

        # Fires recurring_timer every 1000 ms.
        ticker = QTimer()
        ticker.setInterval(1000)
        ticker.timeout.connect(self.recurring_timer)
        ticker.start()
        self.timer = ticker

    def progress_fn(self, n):
        """Print the percentage reported by the worker."""
        print("%d%% done" % n)

    def print_output(self, s):
        """Print the value delivered by the worker's result signal."""
        print(s)

    def thread_complete(self):
        """Print a notice when the worker signals that it has finished."""
        print("THREAD COMPLETE!")

    def oh_no(self):
        """Wrap execute_this_fn in a Worker and queue it on the pool."""
        # Further positional/keyword arguments given to Worker would be
        # passed on to the function when it runs.
        worker = Worker(execute_this_fn)

        signals = worker.signals
        signals.result.connect(self.print_output)
        signals.finished.connect(self.thread_complete)
        signals.progress.connect(self.progress_fn)

        self.threadpool.start(worker)

    def recurring_timer(self):
        """Increment the counter and show it in the label."""
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)


# Create the application and the main window (which shows itself in its
# constructor), then hand control to the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

 

Running External processes

지금까지 다른 스레드에서 Python 코드를 실행하는 방법을 살펴 보았습니다. 그러나 때로는 다른 프로세스에서 명령 줄 프로그램과 같은 외부 프로그램을 실행해야합니다.

실제로 PyQt6에서 외부 프로세스를 시작하는 방법에는 두 가지 옵션이 있습니다. 파이썬의 내장 subprocess 모듈을 사용하여 프로세스를 시작하거나 Qt의 QProcess를 사용할 수 있습니다.

새 프로세스를 시작하면 항상 약간의 실행 비용이 들며 GUI가 일시적으로 차단됩니다. 이는 일반적으로 인식할 수 없을 정도이지만 사용 사례에 따라 누적될 수 있으며 성능에 영향을 미칠 수 있습니다. 다른 스레드에서 프로세스를 시작하면 이 문제를 해결할 수 있습니다.

실시간으로 프로세스와 통신하려면 GUI를 차단하지 않도록 별도의 스레드가 필요합니다. QProcess는 내부적으로 이 별도의 스레드를 처리하지만, 파이썬 subprocess를 사용하면 이 작업을 직접 수행해야 합니다.

이 QRunnable 예제에서는 작업자 인스턴스를 사용하여 파이썬 subprocess를 통해 외부 프로세스 시작을 처리합니다. 이렇게 하면 프로세스의 시작 비용이 GUI 스레드에서 벗어나고, 파이썬을 통해 프로세스와 직접 상호 작용할 수 있습니다.

concurrent/qrunnable_process.py

dummy_script.py
0.00MB

 

 

import subprocess
import sys

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QMainWindow,
    QPlainTextEdit,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """
    Signals a subprocess worker can emit.

    finished -- no payload.
    result   -- ``str``, the output of the process.
    """

    finished = pyqtSignal()
    # The output from the process, sent back as a single string.
    result = pyqtSignal(str)


class SubProcessWorker(QRunnable):
    """
    Runnable that executes an external command and reports its output.

    :param command: command string passed to `subprocess.getoutput`.
    """

    def __init__(self, command):
        super().__init__()
        self.command = command
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """
        Run the command, emit its output through ``result`` and then
        emit ``finished``.
        """
        signals = self.signals
        signals.result.emit(subprocess.getoutput(self.command))
        signals.finished.emit()


class MainWindow(QMainWindow):
    """
    Window with a text area and an "Execute" button that runs
    ``dummy_script.py`` through a SubProcessWorker.
    """

    def __init__(self):
        super().__init__()

        self.text = QPlainTextEdit()

        run_button = QPushButton("Execute")
        run_button.clicked.connect(self.start)

        column = QVBoxLayout()
        column.addWidget(self.text)
        column.addWidget(run_button)

        container = QWidget()
        container.setLayout(column)
        self.setCentralWidget(container)

        # Pool on which the subprocess workers are run.
        self.threadpool = QThreadPool()

        self.show()

    def start(self):
        """Create a worker for the script and queue it on the pool."""
        runner = SubProcessWorker("python dummy_script.py")
        runner.signals.result.connect(self.result)
        self.runner = runner
        self.threadpool.start(runner)

    def result(self, s):
        """Append the text received from the worker to the text area."""
        self.text.appendPlainText(s)


# Create the application and the main window (which shows itself in its
# constructor), then hand control to the Qt event loop.
app = QApplication(sys.argv)
w = MainWindow()
app.exec()

Running processes have two streams of output — standard out and standard error. The standard output returns the actual result of the execution (if any), while standard error returns any error or logging information.

 

In this example we’re running the external script using subprocess.getoutput. This runs the external program, waiting for it to complete before returning. Once it has completed, getoutput returns both the standard output and standard error together as a single string.

 

 

Parsing the result

출력을 있는 그대로 전달할 필요는 없습니다. 명령의 출력에 대해 수행할 사후 처리가 있는 경우, 작업자 스레드에서 이를 함께 처리하여 독립적으로 완결된 상태로 유지하는 것이 합리적일 수 있습니다. 그런 다음 작업자는 바로 사용할 수 있는 구조화된 형식으로 GUI 스레드에 데이터를 반환할 수 있습니다.

 

다음 예제에서는 데모 스크립트의 결과를 사후 처리하는 함수를 전달하여 관심 있는 값을 딕셔너리(dict)로 추출합니다. 이 데이터는 GUI 측에서 위젯을 업데이트하는 데 사용됩니다.

 

concurrent/qrunnable_process_result.py

import subprocess
import sys
from collections import namedtuple

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLineEdit,
    QMainWindow,
    QPushButton,
    QSpinBox,
    QVBoxLayout,
    QWidget,
)


def extract_vars(l):
    """
    Extract ``key=value`` pairs from multi-line text.

    Every line containing an equals sign is split at the FIRST ``=``
    into a name and a value, so values that themselves contain ``=``
    (for example URLs with query strings) are kept intact.

    :param l: the complete text to parse, possibly spanning many lines.
    :return: dict mapping each name to its value (both ``str``), plus a
             ``"number_of_lines"`` entry holding the number of lines in
             the input as an ``int``.
    """
    lines = l.splitlines()

    data = {}
    for line in lines:
        name, separator, value = line.partition("=")
        if separator:
            data[name] = value

    # Count lines, not characters.
    data["number_of_lines"] = len(lines)
    return data


class WorkerSignals(QObject):
    """
    Signals a subprocess worker can emit.

    finished -- no payload.
    result   -- ``dict``, the processed output.
    """

    finished = pyqtSignal()
    # The output, sent back as a dictionary.
    result = pyqtSignal(dict)


class SubProcessWorker(QRunnable):
    """
    Runnable that executes an external command and reports its
    (optionally post-processed) output.

    :param command: command string passed to `subprocess.getoutput`.
    :param process_result: optional callable applied to the raw output
                           before it is emitted.
    """

    def __init__(self, command, process_result=None):
        super().__init__()
        self.command = command
        self.process_result = process_result
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """
        Run the command, post-process its output when a function was
        supplied, then emit ``result`` followed by ``finished``.
        """
        payload = subprocess.getoutput(self.command)

        post_process = self.process_result
        if post_process:
            payload = post_process(payload)

        # NOTE(review): without a post-processing function the payload is
        # still the raw str, while the result signal in this example is
        # declared for dict -- confirm callers always pass one.
        self.signals.result.emit(payload)
        self.signals.finished.emit()


class MainWindow(QMainWindow):
    """
    Window with one input per extracted value and an "Execute" button
    that runs ``dummy_script.py`` and fills the inputs from its output.
    """

    def __init__(self):
        super().__init__()

        self.name = QLineEdit()
        self.country = QLineEdit()
        self.website = QLineEdit()
        self.number_of_lines = QSpinBox()

        run_button = QPushButton("Execute")
        run_button.clicked.connect(self.start)

        column = QVBoxLayout()
        for widget in (
            self.name,
            self.country,
            self.website,
            self.number_of_lines,
            run_button,
        ):
            column.addWidget(widget)

        container = QWidget()
        container.setLayout(column)
        self.setCentralWidget(container)

        # Pool on which the subprocess workers are run.
        self.threadpool = QThreadPool()

        self.show()

    def start(self):
        """Create a worker that parses the script output and queue it."""
        runner = SubProcessWorker(
            "python dummy_script.py", process_result=extract_vars
        )
        runner.signals.result.connect(self.result)
        self.runner = runner
        self.threadpool.start(runner)

    def result(self, data):
        """Print the received dict and copy its values into the widgets."""
        print(data)
        for field in (self.name, self.country, self.website):
            pass
        self.name.setText(data["name"])
        self.country.setText(data["country"])
        self.website.setText(data["website"])
        self.number_of_lines.setValue(data["number_of_lines"])


# Create the application and the main window (which shows itself in its
# constructor), then hand control to the Qt event loop.
app = QApplication(sys.argv)
w = MainWindow()
app.exec()

The simple parser in this case looks for any lines with a = in them, splits on this to produce a name and a value, which are then stored in a dict. However, you can use any tools you like to extract data from the string output.

 

Because getoutput blocks until the program is complete, we cannot see how the program is running — for example, to get progress information. In the next example we’ll show how to get live output from a running process.

 

Tracking progress

Often external programs will output progress information to the console. You might want to capture this and either show it to your users, or use it to generate a progress bar.

 

For the result of the execution you usually want to capture standard out, and for the progress, standard error. In the following example we capture both. As well as the command, we pass a custom parser function to the worker, to capture the current worker progress and emit it as a number 0-100.

 

This example is quite complex. The full source code is available in the source code with the book, but here we’ll cover the key differences to the simpler one.

 

concurrent/qrunnable_process_parser.py

import re
import subprocess
import sys

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QMainWindow,
    QPlainTextEdit,
    QProgressBar,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

# Raw string, so that ``\d`` is passed to the regex engine unchanged
# instead of being treated as an (invalid) string escape sequence.
progress_re = re.compile(r"Total complete: (\d+)%")


def simple_percent_parser(output):
    """
    Search the text for the progress_re pattern and return the
    percentage it reports.

    :param output: text to search, e.g. one line of process output.
    :return: the matched percentage as an ``int``, or ``None`` when the
             text contains no progress information.
    """
    m = progress_re.search(output)
    if m is None:
        return None
    return int(m.group(1))




class WorkerSignals(QObject):
    """
    Signals a subprocess worker can emit.

    finished -- no payload.
    progress -- ``int``, the current progress (0-100).
    result   -- ``str``, the output of the process.
    """

    finished = pyqtSignal()
    # An integer 0-100 showing the current progress.
    progress = pyqtSignal(int)
    # The output from the process, sent back as a single string.
    result = pyqtSignal(str)


class SubProcessWorker(QRunnable):
    """
    Runnable that executes an external command, streaming its output.

    :param command: command to execute with `subprocess.Popen`; either a
                    sequence of arguments or a single command string.
    :param parser: optional callable that receives each output line and
                   returns an ``int`` progress value, or ``None`` when
                   the line carries no progress information.
    """

    def __init__(self, command, parser=None):
        super().__init__()

        # Signals used to report back to the GUI.
        self.signals = WorkerSignals()

        # The command to be executed.
        self.command = command

        # The parser function to extract the progress information.
        self.parser = parser

    # tag::workerRun[]
    @pyqtSlot()
    def run(self):
        """
        Run the command, emitting ``progress`` as lines arrive, then
        ``result`` with the complete output and finally ``finished``.
        """
        import os
        import shlex

        command = self.command
        # On POSIX, Popen treats a plain string as the program name only,
        # so a string that includes arguments must be split first. On
        # Windows the string is passed through unchanged.
        if isinstance(command, str) and os.name != "nt":
            command = shlex.split(command)

        result = []

        try:
            with subprocess.Popen(  # <1>
                command,
                bufsize=1,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # <2>
                universal_newlines=True,
            ) as proc:
                # Read until end-of-file rather than until the process
                # exits, so lines still buffered in the pipe when the
                # process terminates are not lost.
                for data in proc.stdout:  # <3>
                    result.append(data)
                    if self.parser:  # <4>
                        value = self.parser(data)
                        # Compare against None so that 0% is reported.
                        if value is not None:
                            self.signals.progress.emit(value)

            output = "".join(result)

            self.signals.result.emit(output)
        finally:
            self.signals.finished.emit()

    # end::workerRun[]

    # end::workerRun[]


class MainWindow(QMainWindow):
    """
    Window with a text area, a progress bar and an "Execute" button
    that runs ``dummy_script.py`` through a SubProcessWorker.
    """

    def __init__(self):
        super().__init__()

        self.text = QPlainTextEdit()

        self.progress = QProgressBar()
        self.progress.setRange(0, 100)
        self.progress.setValue(0)

        run_button = QPushButton("Execute")
        run_button.clicked.connect(self.start)

        column = QVBoxLayout()
        for widget in (self.text, self.progress, run_button):
            column.addWidget(widget)

        container = QWidget()
        container.setLayout(column)
        self.setCentralWidget(container)

        # Pool on which the subprocess workers are run.
        self.threadpool = QThreadPool()

        self.show()

    # tag::start[]
    def start(self):
        """Create a worker with a progress parser and queue it."""
        runner = SubProcessWorker(
            parser=simple_percent_parser,
            command="python dummy_script.py",
        )
        # Progress values drive the bar directly; the final output goes
        # to the text area.
        runner.signals.progress.connect(self.progress.setValue)
        runner.signals.result.connect(self.result)
        self.runner = runner
        self.threadpool.start(runner)

    # end::start[]

    def result(self, s):
        """Append the text received from the worker to the text area."""
        self.text.appendPlainText(s)


# Create the application and the main window (which shows itself in its
# constructor), then hand control to the Qt event loop.
app = QApplication(sys.argv)
w = MainWindow()
app.exec()

(1) Run using Popen to give us access to output streams.

(2) We pipe standard error out together with standard output.

(3) Read a line from the process (or wait for one).

(4) Pass the latest line to the parser.

 

Parsing is handled by this simple parser function, which takes in a string and matches the regular expression Total complete: (\d+)%.

# Raw string, so that ``\d`` is passed to the regex engine unchanged
# instead of being treated as an (invalid) string escape sequence.
progress_re = re.compile(r"Total complete: (\d+)%")


def simple_percent_parser(output):
    """
    Search the text for the progress_re pattern and return the
    percentage it reports.

    :param output: text to search, e.g. one line of process output.
    :return: the matched percentage as an ``int``, or ``None`` when the
             text contains no progress information.
    """
    m = progress_re.search(output)
    if m is None:
        return None
    return int(m.group(1))

The parser is passed into the runner along with the command — this means we can use a generic runner for all subprocesses and handle the output differently for different commands.

    def start(self):
        """Create a worker with a progress parser and queue it."""
        runner = SubProcessWorker(
            parser=simple_percent_parser,
            command="python dummy_script.py",
        )
        # Progress values drive the bar directly; the final output goes
        # to the result handler.
        runner.signals.progress.connect(self.progress.setValue)
        runner.signals.result.connect(self.result)
        self.runner = runner
        self.threadpool.start(runner)

In this simple example we only pass the latest line from the process, since our custom script outputs lines like Total complete: 25%. That means that we only need the latest line to be able to calculate the current progress. 

Sometimes, however, scripts can be a bit less helpful. For example, ffmpeg (the video encoder) outputs the duration of the video file to be processed once at the beginning, then outputs the duration that has currently been processed. To calculate the % of progress you need both values.

To do that, you can pass the collected output to the parser instead. There is an example of this in the source code with the book, named concurrent/qrunnable_process_parser_elapsed.py.

qrunnable_process_parser_elapsed.py
0.00MB

 

 

 

The Manager

In the previous examples we’ve created a number of different QRunnable implementations that can be used for different purposes in your application. In all cases you can run as many of these runners as you like, on the same or multiple QThreadPool pools. However, sometimes you will want to keep track of the runners which you have running in order to do something with their output, or provide users with control over the runners directly.

 

QThreadPool itself does not give you access to the currently running runners, so we need to create our own manager ourselves, through which we start and control our workers.

 

The example below brings together some of the other worker features already introduced — progress, pause and stop control — together with the model views to present individual progress bars. This manager will likely work as a drop-in for most use-cases you have for running threads.

 

This is quite a complex example, the full source code is available in  the  resources  for  the  book.  Here  we’ll  go  through  the  key parts of the QRunnable manager in turn.

 

The worker manager

The worker manager class holds the threadpool, our workers and their progress and state information. It is derived from QAbstractListModel meaning it also provides a Qt model-like interface, allowing for it to be used as the model for a QListView providing a per-worker progress bar and status indicator. The status tracking is handled through a number of internal signals, which attach automatically to every added worker.

 

concurrent/qrunnable_manager.py

import random
import subprocess
import sys
import time
import traceback
import uuid

from PyQt6.QtCore import (
    QAbstractListModel,
    QObject,
    QRect,
    QRunnable,
    Qt,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtGui import QBrush, QColor, QPen
from PyQt6.QtWidgets import (
    QApplication,
    QListView,
    QMainWindow,
    QPlainTextEdit,
    QProgressBar,
    QPushButton,
    QStyledItemDelegate,
    QVBoxLayout,
    QWidget,
)

# Status values a job can report through the worker's status signal.
STATUS_WAITING = "waiting"
STATUS_RUNNING = "running"
STATUS_ERROR = "error"
STATUS_COMPLETE = "complete"


# Colour used to draw a job's progress bar, keyed by status.
# STATUS_WAITING has no entry here.
STATUS_COLORS = {
    STATUS_RUNNING: "#33a02c",
    STATUS_ERROR: "#e31a1c",
    STATUS_COMPLETE: "#b2df8a",
}

# State given to a newly enqueued job (the manager stores a copy per job).
DEFAULT_STATE = {"progress": 0, "status": STATUS_WAITING}

class WorkerSignals(QObject):
    """
    Signals a managed worker can emit.

    Every signal carries the job id (``str``) of the emitting worker as
    its first argument, followed by the payload listed below.

    status   -- ``str``, one of the STATUS_* values
    progress -- ``int`` indicating % progress
    result   -- ``object`` data returned from processing, anything
    error    -- ``str``, the text of the exception
    finished -- no further data
    """

    status = pyqtSignal(str, str)
    progress = pyqtSignal(str, int)

    # Any Python object can be sent back as the result.
    result = pyqtSignal(str, object)
    error = pyqtSignal(str, str)

    finished = pyqtSignal(str)


class Worker(QRunnable):
    """
    Dummy worker that produces a series of numbers in 100 steps.

    :param args: exactly two positional values ``(x, y)`` used by the
                 calculation in ``run``.
    :param kwargs: stored on the worker; not used by ``run``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__()

        # Unique ID included in every signal this job emits.
        self.job_id = str(uuid.uuid4())

        self.signals = WorkerSignals()

        # The arguments for the worker
        self.args = args
        self.kwargs = kwargs

        self.signals.status.emit(self.job_id, STATUS_WAITING)

    @pyqtSlot()
    def run(self):
        """
        Perform the calculation, reporting status and progress.

        Emits ``status`` (running), then ``progress`` after each of the
        100 steps. On success emits ``result`` and ``status`` (complete);
        on any exception emits ``error`` and ``status`` (error) instead.
        ``finished`` is emitted last in both cases.
        """
        job_id = self.job_id
        signals = self.signals

        signals.status.emit(job_id, STATUS_RUNNING)

        x, divisor = self.args

        try:
            value = random.randint(0, 100) * x
            delay = random.random() / 10
            series = []

            for step in range(1, 101):
                # The divisor counts down on every step, so this division
                # raises ZeroDivisionError if it reaches zero.
                value = value / divisor
                divisor -= 1

                series.append(value)

                # Pass out the current progress.
                signals.progress.emit(job_id, step)
                time.sleep(delay)

        except Exception as e:
            print(e)
            # The error is reported through the signals, not re-raised.
            signals.error.emit(job_id, str(e))
            signals.status.emit(job_id, STATUS_ERROR)

        else:
            signals.result.emit(job_id, series)
            signals.status.emit(job_id, STATUS_COMPLETE)

        signals.finished.emit(job_id)




class WorkerManager(QAbstractListModel):
    """
    Manager to handle our worker queues and state.
    Also functions as a Qt data model for a view
    displaying progress for each worker.

    ``_workers`` maps job id -> worker for jobs that have not finished.
    ``_state`` maps job id -> dict with ``"progress"`` and ``"status"``
    keys; entries stay until ``cleanup`` removes them.
    """

    # Emitted by notify_status with a text summary of the pool load.
    status = pyqtSignal(str)

    def __init__(self):
        super().__init__()

        # Created per instance: as class-level dicts they would be shared
        # by every WorkerManager.
        self._workers = {}
        self._state = {}

        # Create a threadpool for our workers.
        self.threadpool = QThreadPool()
        # self.threadpool.setMaxThreadCount(1)
        self.max_threads = self.threadpool.maxThreadCount()
        print(
            "Multithreading with maximum %d threads" % self.max_threads
        )

        # Publish the running/waiting counts every 100 ms.
        self.status_timer = QTimer()
        self.status_timer.setInterval(100)
        self.status_timer.timeout.connect(self.notify_status)
        self.status_timer.start()

    def notify_status(self):
        """Emit ``status`` with the current running/waiting counts."""
        n_workers = len(self._workers)
        # At most max_threads jobs can run; the remainder are waiting.
        running = min(n_workers, self.max_threads)
        waiting = max(0, n_workers - self.max_threads)
        self.status.emit(
            "{} running, {} waiting, {} threads".format(
                running, waiting, self.max_threads
            )
        )

    def enqueue(self, worker):
        """
        Enqueue a worker to run (at some point) by passing it to the QThreadPool.

        :param worker: a constructed worker exposing ``job_id`` and a
                       ``signals`` object with error, status, progress
                       and finished signals.
        """
        worker.signals.error.connect(self.receive_error)
        worker.signals.status.connect(self.receive_status)
        worker.signals.progress.connect(self.receive_progress)
        worker.signals.finished.connect(self.done)

        # Register the job before handing it to the pool, so its entries
        # exist by the time any of its signals is handled.
        self._workers[worker.job_id] = worker

        # Set default status to waiting, 0 progress.
        self._state[worker.job_id] = DEFAULT_STATE.copy()

        self.threadpool.start(worker)

        self.layoutChanged.emit()

    def receive_status(self, job_id, status):
        """Store the new status of a job and refresh the view."""
        self._state[job_id]["status"] = status
        self.layoutChanged.emit()

    def receive_progress(self, job_id, progress):
        """Store the new progress of a job and refresh the view."""
        self._state[job_id]["progress"] = progress
        self.layoutChanged.emit()

    def receive_error(self, job_id, message):
        """Print the error message reported by a job."""
        print(job_id, message)

    def done(self, job_id):
        """
        Task/worker complete. Remove it from the active workers
        dictionary. We leave it in _state, as this is used
        to display past/complete workers too.
        """
        del self._workers[job_id]
        self.layoutChanged.emit()

    def cleanup(self):
        """
        Remove any complete/failed workers from _state.
        """
        # Iterate over a copy, as entries are deleted while looping.
        for job_id, s in list(self._state.items()):
            if s["status"] in (STATUS_COMPLETE, STATUS_ERROR):
                del self._state[job_id]
        self.layoutChanged.emit()

    # Model interface
    def data(self, index, role=Qt.ItemDataRole.DisplayRole):
        """
        Return ``(job_id, state_dict)`` for the row when asked for the
        display role; ``None`` for every other role.
        """
        if role == Qt.ItemDataRole.DisplayRole:
            job_ids = list(self._state.keys())
            job_id = job_ids[index.row()]
            return job_id, self._state[job_id]

    def rowCount(self, index=None):
        """Return the number of tracked jobs (``index`` is not used)."""
        return len(self._state)




class ProgressBarDelegate(QStyledItemDelegate):
    """
    Item delegate that draws each row as a coloured progress bar with
    the job id written on top.
    """

    def paint(self, painter, option, index):
        """
        Paint one row.

        :param painter: the painter to draw with.
        :param option: style option; ``option.rect`` is the row's area.
        :param index: model index of the row; the model's ``data`` must
                      return ``(job_id, state)`` for the display role,
                      where ``state`` has "progress" and "status" keys.
        """
        job_id, data = index.model().data(
            index, Qt.ItemDataRole.DisplayRole
        )

        # Restore the painter afterwards so the pen set below does not
        # leak into the painting of other items.
        painter.save()
        try:
            if data["progress"] > 0:
                color = QColor(STATUS_COLORS[data["status"]])

                brush = QBrush()
                brush.setColor(color)
                brush.setStyle(Qt.BrushStyle.SolidPattern)

                # QRect.setWidth() takes an int; PyQt6 does not accept a
                # float here, so convert explicitly.
                width = int(option.rect.width() * data["progress"] / 100)

                rect = QRect(
                    option.rect
                )  # Copy of the rect, so we can modify.
                rect.setWidth(width)

                painter.fillRect(rect, brush)

            pen = QPen()
            pen.setColor(Qt.GlobalColor.black)
            # The pen only takes effect once it is set on the painter.
            painter.setPen(pen)
            painter.drawText(
                option.rect, Qt.AlignmentFlag.AlignLeft, job_id
            )
        finally:
            painter.restore()




class MainWindow(QMainWindow):
    """
    Window showing the managed jobs in a list view, their results in a
    read-only text area, and buttons to start a job or clear old ones.
    """

    def __init__(self):
        super().__init__()

        self.workers = WorkerManager()
        # Show the manager's status messages on the status bar.
        self.workers.status.connect(self.statusBar().showMessage)

        # The manager doubles as the model of the list view.
        self.progress = QListView()
        self.progress.setModel(self.workers)
        bar_delegate = ProgressBarDelegate()
        self.progress.setItemDelegate(bar_delegate)

        self.text = QPlainTextEdit()
        self.text.setReadOnly(True)

        start_button = QPushButton("Start a worker")
        start_button.pressed.connect(self.start_worker)

        clear_button = QPushButton("Clear")
        clear_button.pressed.connect(self.workers.cleanup)

        column = QVBoxLayout()
        for widget in (
            self.progress,
            self.text,
            start_button,
            clear_button,
        ):
            column.addWidget(widget)

        container = QWidget()
        container.setLayout(column)

        self.setCentralWidget(container)

        self.show()

    # tag::startWorker[]
    def start_worker(self):
        """Create a worker with two random arguments and enqueue it."""
        x = random.randint(0, 1000)
        y = random.randint(0, 1000)

        worker = Worker(x, y)
        # Results and errors are both shown in the text area.
        for signal in (worker.signals.result, worker.signals.error):
            signal.connect(self.display_result)

        self.workers.enqueue(worker)

    # end::startWorker[]

    def display_result(self, job_id, data):
        """Append a line with the job id and its data to the text area."""
        self.text.appendPlainText("WORKER %s: %s" % (job_id, data))


# Create the application and the main window (which shows itself in its
# constructor), then hand control to the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()

Workers are constructed outside the manager and passed in via .enqueue(). This connects all signals and adds the worker to the thread pool. It will be executed, as normal, once a thread is available.

The workers are kept in an internal dictionary _workers keyed by the job id. There is a separate dictionary _state which stores the status and progress information about the workers. We keep them separate so we can delete jobs once complete, keeping an accurate count, yet continue to show information about completed jobs until cleared.

Signals from each submitted worker are connected to slots on the manager, which update the _state dictionary, print error messages or delete the completed job. Once any state is updated, we must emit .layoutChanged to trigger a refresh of the model view. The cleanup method iterates through the _state dictionary and removes any jobs that are complete or have failed.

Lastly, we set up a timer to regularly trigger a method to emit the current thread counts as a status message. The number of active threads is the minimum of the number of _workers and the max_threads. The number of waiting threads is the number of _workers minus the max_threads (as long as it is more than zero). The message is shown on the main window status bar.

 

The worker

The worker itself follows the same pattern as all our previous examples. The only requirement for our manager is the addition of a .job_id property which is set when the worker is created.

The signals from the workers must include this job id so the manager knows which worker sent the signal — updating the correct status, progress and finished states. 

The worker itself is a simple dummy worker, which iterates 100 times (1 for each % progress) and performs a simple calculation. This worker calculation generates a series of numbers, but is constructed to occasionally throw division by zero errors.

 

class WorkerSignals(QObject):
    """
    Signals a managed worker can emit.

    Every signal carries the job id (``str``) of the emitting worker as
    its first argument, followed by the payload listed below.

    status   -- ``str``, one of the STATUS_* values
    progress -- ``int`` indicating % progress
    result   -- ``object`` data returned from processing, anything
    error    -- ``str``, the text of the exception
    finished -- no further data
    """

    status = pyqtSignal(str, str)
    progress = pyqtSignal(str, int)

    # Any Python object can be sent back as the result.
    result = pyqtSignal(str, object)
    error = pyqtSignal(str, str)

    finished = pyqtSignal(str)


class Worker(QRunnable):
    """
    Dummy worker that produces a series of numbers in 100 steps.

    :param args: exactly two positional values ``(x, y)`` used by the
                 calculation in ``run``.
    :param kwargs: stored on the worker; not used by ``run``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__()

        # Unique ID included in every signal this job emits.
        self.job_id = str(uuid.uuid4())

        self.signals = WorkerSignals()

        # The arguments for the worker
        self.args = args
        self.kwargs = kwargs

        self.signals.status.emit(self.job_id, STATUS_WAITING)

    @pyqtSlot()
    def run(self):
        """
        Perform the calculation, reporting status and progress.

        Emits ``status`` (running), then ``progress`` after each of the
        100 steps. On success emits ``result`` and ``status`` (complete);
        on any exception emits ``error`` and ``status`` (error) instead.
        ``finished`` is emitted last in both cases.
        """
        job_id = self.job_id
        signals = self.signals

        signals.status.emit(job_id, STATUS_RUNNING)

        x, divisor = self.args

        try:
            value = random.randint(0, 100) * x
            delay = random.random() / 10
            series = []

            for step in range(1, 101):
                # The divisor counts down on every step, so this division
                # raises ZeroDivisionError if it reaches zero.
                value = value / divisor
                divisor -= 1

                series.append(value)

                # Pass out the current progress.
                signals.progress.emit(job_id, step)
                time.sleep(delay)

        except Exception as e:
            print(e)
            # The error is reported through the signals, not re-raised.
            signals.error.emit(job_id, str(e))
            signals.status.emit(job_id, STATUS_ERROR)

        else:
            signals.result.emit(job_id, series)
            signals.status.emit(job_id, STATUS_COMPLETE)

        signals.finished.emit(job_id)

In addition to the progress signals we’ve seen before, we also have a status signal which emits one of the following statuses. Exceptions are caught and both the exception text and the error state are emitted using error and status.

 

# Status values a job can report through the worker's status signal.
STATUS_WAITING = "waiting"
STATUS_RUNNING = "running"
STATUS_ERROR = "error"
STATUS_COMPLETE = "complete"


# Colour used to draw a job's progress bar, keyed by status.
# STATUS_WAITING has no entry here.
STATUS_COLORS = {
    STATUS_RUNNING: "#33a02c",
    STATUS_ERROR: "#e31a1c",
    STATUS_COMPLETE: "#b2df8a",
}

# State given to a newly enqueued job (the manager stores a copy per job).
DEFAULT_STATE = {"progress": 0, "status": STATUS_WAITING}

 

Each of the active statuses have assigned colors which will be used in drawing on the progress bar.

 

 

Custom row display

We’re using a QListView for the progress bar display. Normally a list view displays a simple text value for each row. To modify this we use a QStyledItemDelegate, which allows us to paint a custom widget for each row.

class ProgressBarDelegate(QStyledItemDelegate):
    """Item delegate that paints a row as a colored progress bar.

    The model's ``DisplayRole`` data for a row is expected to be a
    ``(job_id, state)`` pair, where ``state`` is a dict with ``"progress"``
    and ``"status"`` keys.
    """

    def paint(self, painter, option, index):
        """Draw the progress bar (if any) and the job id for one row.

        Args:
            painter: The QPainter to draw with.
            option: Style option for the row; ``option.rect`` is the area
                available to this item.
            index: Model index of the row being painted.
        """
        # data is our status dict, containing progress and status.
        job_id, data = index.model().data(
            index, Qt.ItemDataRole.DisplayRole
        )

        # Save/restore so the pen set below does not leak into whatever
        # is painted next with the same painter.
        painter.save()
        try:
            if data["progress"] > 0:
                color = QColor(STATUS_COLORS[data["status"]])

                brush = QBrush()
                brush.setColor(color)
                brush.setStyle(Qt.BrushStyle.SolidPattern)

                # QRect.setWidth() takes an int; true division yields a
                # float, so convert explicitly.
                width = int(option.rect.width() * data["progress"] / 100)

                rect = QRect(
                    option.rect
                )  # Copy of the rect, so we can modify.
                rect.setWidth(width)

                painter.fillRect(rect, brush)

            pen = QPen()
            pen.setColor(Qt.GlobalColor.black)
            # The pen only takes effect once it is set on the painter.
            painter.setPen(pen)
            painter.drawText(
                option.rect, Qt.AlignmentFlag.AlignLeft, job_id
            )
        finally:
            painter.restore()

We get the data for the current row from the model using index.model().data(index, Qt.ItemDataRole.DisplayRole). This calls the .data() method on our custom model (the manager), passing in the index and role. In our .data() method we return two pieces of data: the job_id and the state dictionary, which contains the progress and status keys.

For active jobs (progress > 0), status is used to select a color for the bar. The bar is drawn as a rectangle the size of the item row (option.rect), with its width scaled by the percentage completed. Finally, we write the job_id text over the top of this.

 

Starting a job

With everything in place, we can now enqueue jobs by constructing a worker with its arguments and passing it to self.workers.enqueue().

    def start_worker(self):
        """Create a worker with two random arguments and queue it.

        Both the result and the error signal of the new worker are routed
        to ``self.display_result`` before the worker is handed to the
        manager in ``self.workers``.
        """
        # Arguments are evaluated left to right: first x, then y.
        worker = Worker(random.randint(0, 1000), random.randint(0, 1000))

        for signal in (worker.signals.result, worker.signals.error):
            signal.connect(self.display_result)

        self.workers.enqueue(worker)

The .enqueue() method accepts a constructed worker and attaches the internal signals to it to track progress. However, we can still attach any other external signals that we want.

Also, while this example has only a single worker class, you can use this same manager with any other QRunnable-derived classes, as long as they have the same signals available. This means you can use a single worker manager to manage all the workers in your app.

Take a look at the full code in the source files with this book and experiment with modifying the manager to suit your needs — for example, try adding kill and pause functionality, or generic function runners.

 

 

Stopping jobs

We can start jobs, and some of them die due to errors. But what if we want to stop jobs that are taking too long? The QListView allows us to select rows and through the selected row we can kill a specific worker. The method below is linked to a button, and looks up the worker from the current selected item in the list.

 

 

concurrent/qrunnable_manager_stop.py

17.16 qrunnable_manager_stop.py
0.01MB

 

 

    def stop_worker(self):
        """Kill the worker behind every row currently selected in the list."""
        display_role = Qt.ItemDataRole.DisplayRole
        for index in self.progress.selectedIndexes():
            # The model returns a (job_id, state) pair; only the id is needed.
            job_id, _state = self.workers.data(index, display_role)
            self.workers.kill(job_id)

In addition to this we need to modify the delegate to draw the currently selected item and update the worker and manager to pass through the kill signal. Take a look at the full source for this example to see how it all fits together.

'PyQt5_' 카테고리의 다른 글

Running external commands & processes  (0) 2023.03.13
Long-running threads  (0) 2023.03.13
Using the thread pool  (0) 2023.03.13
Using Custom Widgets in Qt Designer  (0) 2023.03.13
Creating Custom Widgets  (0) 2023.03.13

댓글