
Using Custom Widgets in QtDesigner

by 자동매매 2023. 3. 16.

23. Using Custom Widgets in Qt Designer

In the previous chapter we built a custom PowerBar widget. The resulting widget can be used as-is in your own applications by importing and adding to layouts, just as for any built-in widget. But what if you’re building your application UI using Qt Designer? Can you add custom widgets there too?

The answer is — yes!

In this short chapter we’ll step through the process for adding custom widgets to your own Qt Designer applications. The process can be a little confusing, but if you follow the steps below you’ll be able to use any of your custom widgets in UIs you create in Designer.

You can use the same approach for adding custom widgets from other libraries, such as PyQtGraph or matplotlib.

Background

The first thing to understand is that you can’t load and display your custom widgets in Qt Designer. The widgets available in Designer are built-in and it has no way to interpret your Python code to discover what you’ve created.

Instead, to insert your widgets into the UI you add placeholder widgets and then tell Designer that you want to replace the placeholder with your custom widget in the application when it is run.

Inside Qt Designer you will see the placeholder. You can change the same parameters as you would on any widget of the same type and these will be passed to your custom widget. When you load the UI in your Python application PyQt6 will substitute your custom widget where it belongs.

In Qt this process of replacing placeholder widgets is known as promoting. The built-in widget is promoted into your custom widget.

Writing Promotable Custom Widgets

Promoting widgets allows you to switch a placeholder widget used in Qt Designer with your own custom widget. When implementing your custom widget, you must subclass an existing PyQt6 widget, even if that is the base QWidget. You must also ensure that your custom widget implements the default constructor of the widget you subclass. In most cases, that just means accepting parent as the first argument to your __init__ method.

If your custom widget throws an error, check the parameters that PyQt6 is trying to pass it in the compiled UI file.

To promote to a custom widget, the custom widget must be in a separate file from where the compiled UI will be imported. However, you can define multiple custom widgets in the same file if you wish.

This restriction is to avoid circular imports: if your application file imports the compiled UI file and this in turn imports your application file, this will not work.

Once you have your custom widget defined in a file, take a note of the file name and the class name. You will need these to promote the widget in Qt Designer.

Creating & Promoting Widgets in Designer

Choose where you want your custom widget to appear in your UI and add the placeholder widget. There is no rule here, but generally if your custom widget inherits from another Qt widget, use that widget as the placeholder. For example, if you’ve created a custom widget based on QLabel use Label as your placeholder. This allows you to access the label’s standard properties within Designer to customize your custom widget.

Figure 190. Simple UI layout, with a placeholder Widget on the left hand side.

You won’t be able to alter any custom widget properties in Designer, because Qt Designer doesn’t know anything about your custom widget or how it works. Do this in your code!

Once you’ve added the widgets you can promote them. Select the widgets you want to promote, right click and choose Promote to ...

Figure 191. Promoting widgets via the right click menu.

At the bottom of the dialog you can add a New Promoted Class. Enter the class name (the name of your custom widget’s Python class, e.g. PowerBar) and the Python file containing the class as the header file, omitting the .py suffix.

Qt will auto-suggest the filename based on the class name, but will append a .h (the C++ standard suffix for header files). You must remove the .h even if the filename is correct.

If your custom widget is defined in a file in a sub-folder, provide the full Python dot-notation path to the file, the same way you would for other imports. For example, if you placed the file under ui/widgets/powerbar.py then enter ui.widgets.powerbar as the header file.
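To make the mapping concrete, here is a minimal sketch of how the class name and header file fields relate to the import line that ends up at the bottom of the compiled UI file. The helper promotion_import is hypothetical, written only for illustration; it is not part of PyQt6 or pyuic6, and it simply normalises what you would type into the dialog.

```python
def promotion_import(class_name: str, header: str) -> str:
    """Return the import line expected for a promoted widget.

    Hypothetical helper for illustration only: it mirrors the
    `from <header> import <class>` line seen in compiled UI files.
    """
    # Designer suggests a C++-style ".h" suffix; it must be removed.
    if header.endswith(".h"):
        header = header[: -len(".h")]
    # The ".py" suffix is omitted as well.
    if header.endswith(".py"):
        header = header[: -len(".py")]
    # A path such as ui/widgets/powerbar becomes ui.widgets.powerbar.
    module = header.replace("/", ".")
    return f"from {module} import {class_name}"


print(promotion_import("PowerBar", "powerbar.h"))
print(promotion_import("PowerBar", "ui/widgets/powerbar.py"))
```

If the line this produces would not work when typed into your own application file, the promoted widget will not import from the compiled UI file either.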

Figure 192. Adding the class name and header file.

Click "Add" to define the promotion. You can then select the promotion in the list at the top and click Promote to actually promote your widgets.

Figure 193. Selecting the promotion and applying it to your widgets.

The widgets will be promoted, and show their new class name (here PowerBar).

Figure 194. Promoted widgets showing in the UI hierarchy.

Save the UI file and compile it using the pyuic tool as before.

pyuic6 mainwindow.ui -o MainWindow.py

If you open the generated file, you’ll see that the custom PowerBar class is now used to construct a widget in the setupUi method and a new import has been added at the bottom of the file.

class Ui_MainWindow(object):
    def setupUi(self, MainWindow):
        # etc...
        self.widget = PowerBar(self.centralwidget)
        # etc...

    def retranslateUi(self, MainWindow):
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow"))
        self.label.setText(
            _translate(
                "MainWindow",
                "Some custom widgets here next to the PowerBar (left).",
            )
        )
        self.pushButton.setText(_translate("MainWindow", "A button"))


from powerbar import PowerBar

You can use the compiled UI file as normal. You don’t need to import your custom widget into your application since this is handled in the compiled UI file.

Listing 169. custom-widgets/promote_test.py

import random
import sys

from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import QApplication, QMainWindow

from MainWindow import Ui_MainWindow


class MainWindow(QMainWindow, Ui_MainWindow):
    def __init__(self):
        super().__init__()
        self.setupUi(self)
        self.show()


app = QApplication(sys.argv)
w = MainWindow()
app.exec()

When you run the app your custom widgets will be loaded and automatically appear in the right place.

Figure 195. PowerBar custom widget showing in the app.

Most errors you see will be due to imports. The first step should always be to check the import at the bottom of the compiled UI file, to see if it makes sense. Is the target file reachable?
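One way to answer the "is it reachable?" question without launching the whole application is to ask Python whether it can locate the module named in that import. The sketch below uses importlib.util.find_spec from the standard library; the function name is_importable and the module names passed to it are placeholders, so substitute the header file you entered in Designer.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if Python can locate `module_name` on the current path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package in a dotted path is missing,
        # or when the name is malformed.
        return False


# "json" ships with Python; the second name is a made-up placeholder.
print(is_importable("json"))
print(is_importable("ui_widgets_that_do_not_exist.powerbar"))
```

Run the check from the same folder you start your application from, since the result depends on the current import path.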

Third-party widgets

You can use this same technique to add other third-party widgets to your applications too. The process is exactly the same: you just need to refer to the widget by the fully-qualified Python import path and use the appropriate class names. Below are some example configurations for common third-party widgets.

We’ll be covering how to use these libraries in a later chapter!

PyQtGraph

Use PlotWidget as the promoted class name and pyqtgraph as the header file in Qt Designer. Use QWidget as the placeholder widget. The PyQtGraph plot widget will work as-is in the generated UI file.

See the custom-widgets/pyqtgraph_demo.py file in the source code downloads for this book for a working demo.

Figure 196. PyQtGraph plot widget added via widget promotion.

Matplotlib

The matplotlib custom widget FigureCanvasQTAgg cannot be used directly in Qt Designer because the constructor doesn’t accept parent as the first parameter, expecting a Figure object instead.

We can work around this by adding a simple wrapper class, defined below.

Listing 170. custom-widgets/mpl.py

from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from matplotlib.figure import Figure


class MplCanvas(FigureCanvasQTAgg):
    def __init__(self, parent=None, width=5, height=4, dpi=100):
        fig = Figure(figsize=(width, height), dpi=dpi)
        self.axes = fig.add_subplot(111)
        super().__init__(fig)

Add this file to your project named mpl.py and then use MplCanvas as the promoted class name and mpl as the header file in Qt Designer. Use QWidget as the placeholder widget.

See the custom-widgets/matplotlib_demo.py file in the source code downloads for this book for a working demo.

Figure 197. matplotlib plot widget added via widget promotion.

Using these techniques you should be able to use any custom widgets you come across in your PyQt6 applications.

One of the most powerful tools you can exploit when building user interfaces is familiarity. That is, giving your users the sense that your interface is something they have used before. Familiar interfaces are often described as being intuitive. There is nothing naturally intuitive about moving a mouse pointer around a screen and clicking on square-ish bumps. But, after spending years doing exactly that, there is something very familiar about it.

The search for familiarity in user interfaces led to skeuomorphism. Skeuomorphism is the application of non-functional design cues from objects, where those design elements are functional. That can mean using common interface elements, or making interfaces which look like real objects. While in recent years GUI trends have moved back to abstract "flat" designs, all modern user interfaces retain skeuomorphic touches.

The modern desktop calculator is a good example. When we perform calculations we put the result at the bottom. So why is the screen on the top of a calculator? Because otherwise it would be obscured by your hand. The screen position is functional.

For calculators on computers, this position is retained even though it is non-functional: the mouse pointer will not obscure the screen and input is often via the keyboard. But if you opened up a calculator and it had the screen at the bottom you would be confused. It looks upside down. It’s weird or unintuitive despite being perfectly usable. This is the essence of skeuomorphism: making user interfaces feel more intuitive by exploiting the familiarity of users with existing objects.

Where your own software sits on this scale is up to you. The important thing is to be aware of existing interfaces and to exploit them where possible to improve usability of your own apps. Your users will thank you for it!

DO Take inspiration from existing interfaces when designing your own.

DO Include skeuomorphic elements where they help your users.

Familiarity & Skeuomorphism

RealPhone, one of IBM’s RealThings™

Calculator & upside down calculator (Windows 10)

The calculator

Threading is a good option when you need to perform complex calculations. If you’re using the Python numpy, scipy or pandas libraries then these calculations may also release the Python Global Interpreter Lock (GIL), meaning both your GUI and calculation thread can run at full speed.

In this example we’ll create a number of workers which are all performing some simple calculations. The results of these calculations will be returned to the GUI thread and displayed in a plot.

We cover PyQtGraph in detail in Plotting with PyQtGraph; for now just focus on the QRunnable.

Listing 185. concurrent/qrunnable_calculator.py

import random
import sys
import time
import uuid

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

import pyqtgraph as pg


class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.

    data
        tuple data point (worker_id, x, y)
    """

    data = pyqtSignal(tuple)  # ①


class Worker(QRunnable):
    """
    Worker thread

    Inherits from QRunnable to handle worker thread setup, signals
    and wrap-up.
    """

    def __init__(self):
        super().__init__()
        self.worker_id = uuid.uuid4().hex  # Unique ID for this worker.
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        total_n = 1000
        y2 = random.randint(0, 10)
        delay = random.random() / 100  # Random delay value.
        value = 0

        for n in range(total_n):
            # Dummy calculation, each worker will produce different values,
            # because of the random y & y2 values.
            y = random.randint(0, 10)
            value += n * y2 - n * y

            self.signals.data.emit((self.worker_id, n, value))  # ②
            time.sleep(delay)


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.threadpool = QThreadPool()

        self.x = {}  # Keep timepoints.
        self.y = {}  # Keep data.
        self.lines = {}  # Keep references to plotted lines, to update.

        layout = QVBoxLayout()
        self.graphWidget = pg.PlotWidget()
        self.graphWidget.setBackground("w")
        layout.addWidget(self.graphWidget)

        button = QPushButton("Create New Worker")
        button.pressed.connect(self.execute)

        layout.addWidget(button)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()

    def execute(self):
        worker = Worker()
        worker.signals.data.connect(self.receive_data)

        # Execute
        self.threadpool.start(worker)

    def receive_data(self, data):
        worker_id, x, y = data  # ③

        if worker_id not in self.lines:
            self.x[worker_id] = [x]
            self.y[worker_id] = [y]

            # Generate a random color.
            pen = pg.mkPen(
                width=2,
                color=(
                    random.randint(100, 255),
                    random.randint(100, 255),
                    random.randint(100, 255),
                ),
            )
            self.lines[worker_id] = self.graphWidget.plot(
                self.x[worker_id], self.y[worker_id], pen=pen
            )
            return

        # Update existing plot/data
        self.x[worker_id].append(x)
        self.y[worker_id].append(y)

        self.lines[worker_id].setData(
            self.x[worker_id], self.y[worker_id]
        )


app = QApplication(sys.argv)
window = MainWindow()
app.exec()

① Set up a custom signal to pass out the data. Using tuple allows you to send out any number of values wrapped in a tuple.

② Here we’re emitting a worker_id, x and y value.

③ Receiver slot unpacks the data.

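Once you’ve received the data from a worker, you could add it to a table or a model view. Here we store the x and y values in dict objects keyed by worker_id. This keeps the data for each worker separate and allows us to plot each one individually.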
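The bookkeeping can be seen in isolation, without Qt or plotting, in the minimal sketch below. It assumes the same (worker_id, x, y) tuples as the listing, but uses collections.defaultdict in place of the listing’s explicit "first time seen" check; the worker IDs "a" and "b" are made up.

```python
from collections import defaultdict

# One list of x values and one list of y values per worker.
x = defaultdict(list)
y = defaultdict(list)


def receive_data(data):
    """Store a (worker_id, x, y) tuple under its worker's key."""
    worker_id, x_value, y_value = data
    x[worker_id].append(x_value)
    y[worker_id].append(y_value)


# Interleaved data points, as they might arrive from two workers.
for point in [("a", 0, 0), ("b", 0, 0), ("a", 1, -3), ("b", 1, 5)]:
    receive_data(point)

print(dict(y))  # {'a': [0, -3], 'b': [0, 5]}
```

Even though the points arrive interleaved, each worker’s series stays in order under its own key.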

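If you run this example and press the button you’ll see a line appear on the plot and gradually extend. If you press the button again, another worker will start, returning more data and adding another line to the plot. Each worker generates data at a different rate, and each produces total_n values (1000 in the listing above).

Figure 204. Plot output from a single runner after a few iterations.

You can start new workers up to the maximum number of threads available on your system. Once a worker has generated all of its values it shuts down, and the next queued worker starts up and adds its values as a new line.

Figure 205. Plot from multiple runners.

The tuple is of course optional: you could send back bare strings if you have only one runner, or don’t need to associate outputs with a source. It is also possible to send a bytestring, or any other type of data, by setting up the signals appropriately.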

Stopping a running QRunnable

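Once you’ve started a QRunnable there is, by default, no way to stop it. This isn’t great from a usability point of view: if a user starts a task by mistake, they have to sit and wait for it to finish. Unfortunately there is no way to kill a runner, but we can ask it nicely to stop. In this example we’ll look at how we can use flags to indicate to the runner that it needs to stop.

In computing, flags are variables that are used to signal a current state or a change in state. Think of how ships use flags to communicate with each other.

Figure 206. Lima, "You should stop your vessel instantly."

The code below implements a simple runner with a progress bar which fills from left to right, advancing every 0.1 seconds, and a [ Stop ] button. If you click [ Stop ] the worker exits and the progress bar stops permanently.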

Listing 186. concurrent/qrunnable_stop.py

import sys
import time

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    Qt,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QHBoxLayout,
    QMainWindow,
    QProgressBar,
    QPushButton,
    QWidget,
)


class WorkerKilledException(Exception):
    pass


class WorkerSignals(QObject):
    progress = pyqtSignal(int)


class JobRunner(QRunnable):
    signals = WorkerSignals()

    def __init__(self):
        super().__init__()

        self.is_killed = False  # ①

    @pyqtSlot()
    def run(self):
        try:
            for n in range(100):
                self.signals.progress.emit(n + 1)
                time.sleep(0.1)

                if self.is_killed:  # ②
                    raise WorkerKilledException

        except WorkerKilledException:
            pass  # ③

    def kill(self):  # ④
        self.is_killed = True


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        # Some buttons
        w = QWidget()
        l = QHBoxLayout()
        w.setLayout(l)

        btn_stop = QPushButton("Stop")

        l.addWidget(btn_stop)

        self.setCentralWidget(w)

        # Create a statusbar.
        self.status = self.statusBar()
        self.progress = QProgressBar()
        self.status.addPermanentWidget(self.progress)

        # Thread runner
        self.threadpool = QThreadPool()

        # Create a runner
        self.runner = JobRunner()
        self.runner.signals.progress.connect(self.update_progress)
        self.threadpool.start(self.runner)

        btn_stop.pressed.connect(self.runner.kill)

        self.show()

    def update_progress(self, n):
        self.progress.setValue(n)


app = QApplication(sys.argv)
w = MainWindow()
app.exec()

① The flag to indicate whether the runner should be killed is called .is_killed.

② On each loop we test to see whether .is_killed is True, in which case we throw an exception.

③ Catch the exception; we could emit a finished or error signal here.

④ .kill() is a convenience function so we can call worker.kill() to kill it.

If you want to stop the worker without throwing an error, you can simply return from the run method, e.g.

def run(self):
    for n in range(100):
        self.signals.progress.emit(n + 1)
        time.sleep(0)

        if self.is_killed:
            return

In the above example we only have a single worker. However, in many applications you will have more. How do you handle stopping workers when you have multiple runners running?

If you want the Stop button to stop all workers, then nothing changes. You can simply hook all the workers up to the same "Stop" signal, and when that signal is fired (e.g. by pressing a button) all the workers will stop simultaneously.

If you want to be able to stop individual workers you would either need to create a separate button somewhere in your UI for each runner, or implement a manager to keep track of workers and provide a nicer interface to kill them. Take a look at The Manager for a working example.
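As a rough idea of what such a manager involves, the sketch below keeps jobs in a dictionary keyed by ID so that one job, or all of them, can be asked to stop. This is not the implementation from The Manager: Job and JobManager are hypothetical names, and plain threading.Thread stands in for QRunnable and QThreadPool so the sketch runs without Qt.

```python
import threading
import time


class Job:
    """Stand-in for a runner: counts steps until done or asked to stop."""

    def __init__(self, steps=1000):
        self.steps = steps
        self.completed = 0
        self.is_killed = False

    def run(self):
        for _ in range(self.steps):
            if self.is_killed:
                return
            self.completed += 1
            time.sleep(0.001)

    def kill(self):
        self.is_killed = True


class JobManager:
    """Keeps track of running jobs by ID so they can be stopped later."""

    def __init__(self):
        self._jobs = {}
        self._threads = {}

    def start(self, job_id, job):
        thread = threading.Thread(target=job.run)
        self._jobs[job_id] = job
        self._threads[job_id] = thread
        thread.start()

    def kill(self, job_id):
        self._jobs[job_id].kill()

    def kill_all(self):
        for job in self._jobs.values():
            job.kill()

    def wait(self):
        for thread in self._threads.values():
            thread.join()


manager = JobManager()
first, second = Job(), Job()
manager.start("first", first)
manager.start("second", second)

manager.kill("first")  # Stop one job by its ID...
manager.kill_all()  # ...or every job at once.
manager.wait()

print(first.completed < first.steps, second.completed < second.steps)
```

In a GUI, a per-job Stop button would call kill with that job’s ID, while a global Stop button would call kill_all.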

Pausing a runner

Pausing a runner is a rarer requirement — normally you want things to go as fast as possible. But sometimes you may want to put a worker to "sleep" so it temporarily stops reading from a data source. You can do this with a small modification to the approach used to stop the runner. The code to do this is shown below.

The paused runner still takes up a slot in the thread pool, limiting the number of concurrent tasks that can be run. Use carefully!

Listing 187. concurrent/qrunnable_pause.py

import sys
import time

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    Qt,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QHBoxLayout,
    QMainWindow,
    QProgressBar,
    QPushButton,
    QWidget,
)


class WorkerKilledException(Exception):
    pass


class WorkerSignals(QObject):
    progress = pyqtSignal(int)


class JobRunner(QRunnable):
    signals = WorkerSignals()

    def __init__(self):
        super().__init__()

        self.is_paused = False
        self.is_killed = False

    @pyqtSlot()
    def run(self):
        try:
            for n in range(100):
                self.signals.progress.emit(n + 1)
                time.sleep(0.1)

                while self.is_paused:
                    time.sleep(0)  # ①

                if self.is_killed:
                    raise WorkerKilledException

        except WorkerKilledException:
            # Exit quietly, as in the previous listing. Left uncaught,
            # the exception could bring down the application.
            pass

    def pause(self):
        self.is_paused = True

    def resume(self):
        self.is_paused = False

    def kill(self):
        self.is_killed = True


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        # Some buttons
        w = QWidget()
        l = QHBoxLayout()
        w.setLayout(l)

        btn_stop = QPushButton("Stop")
        btn_pause = QPushButton("Pause")
        btn_resume = QPushButton("Resume")

        l.addWidget(btn_stop)
        l.addWidget(btn_pause)
        l.addWidget(btn_resume)

        self.setCentralWidget(w)

        # Create a statusbar.
        self.status = self.statusBar()
        self.progress = QProgressBar()
        self.status.addPermanentWidget(self.progress)

        # Thread runner
        self.threadpool = QThreadPool()

        # Create a runner
        self.runner = JobRunner()
        self.runner.signals.progress.connect(self.update_progress)
        self.threadpool.start(self.runner)

        btn_stop.pressed.connect(self.runner.kill)
        btn_pause.pressed.connect(self.runner.pause)
        btn_resume.pressed.connect(self.runner.resume)

        self.show()

    def update_progress(self, n):
        self.progress.setValue(n)


app = QApplication(sys.argv)
w = MainWindow()
app.exec()

① You can put a higher value than 0 in the sleep call if you don’t want to check if it’s time to wake up very often.

If you run this example you’ll see a progress bar moving from left to right. If you click [ Pause ] the worker will pause. If you then click [ Resume ] the worker will continue from where it left off. If you click [ Stop ] the worker will stop, permanently, as before.

Rather than throw an exception when the is_paused flag is set, we enter a pause loop. This stops execution of the worker, but does not exit the run method or terminate the worker.

By using while self.is_paused: for this loop, we will exit the loop as soon as the worker is unpaused, and resume what we were doing before.

You must include the time.sleep() call. This zero-second pause allows Python to release the GIL, so this loop will not block other execution. Without that sleep you have a busy loop which will waste resources while doing nothing. Increase the sleep value if you want to check less often.
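An alternative to polling a flag in a sleep loop is threading.Event from the standard library, whose wait() method blocks until the event is set. The sketch below is not the approach used in the listing: it swaps the is_paused flag for an event, and uses a plain thread rather than a QRunnable so that it runs without Qt. PausableJob is a hypothetical name.

```python
import threading


class PausableJob:
    """Counts to `steps`, blocking whenever it has been paused."""

    def __init__(self, steps=5):
        self.steps = steps
        self.completed = 0
        self._running = threading.Event()
        self._running.set()  # Start unpaused.

    def pause(self):
        self._running.clear()

    def resume(self):
        self._running.set()

    def run(self):
        for _ in range(self.steps):
            # Blocks here while paused, without spinning.
            self._running.wait()
            self.completed += 1


job = PausableJob()
job.pause()  # Pause before starting, so no step can run yet.

thread = threading.Thread(target=job.run)
thread.start()
thread.join(timeout=0.2)  # Gives up after 0.2 s: the job is still paused.
print(thread.is_alive(), job.completed)  # True 0

job.resume()
thread.join()
print(job.completed)  # 5
```

As with the sleep loop, the paused job still occupies its thread; the difference is only that it waits without repeatedly waking up to check.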

The Communicator

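When running threads you frequently want to be able to get output from what is happening, while it’s happening.

In this example we’ll create a runner which performs requests to remote servers in a separate thread, and dumps the output to a logger. We’ll also take a look at how we can pass custom parsers into the runner to extract the additional data we’re interested in from the requests.

If you want to log data from external processes, rather than threads, take a look at Running External processes and Running external commands & processes.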

Dumping data

In this first example we’ll just dump the raw data (HTML) from each request to the output, using a custom signal.

Listing 188. concurrent/qrunnable_io.py

import sys

import requests
from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPlainTextEdit,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.

    data
        tuple of (identifier, data)
    """

    data = pyqtSignal(tuple)


class Worker(QRunnable):
    """
    Worker thread

    Inherits from QRunnable to handle worker thread setup, signals
    and wrap-up.

    :param id: The id for this worker
    :param url: The url to retrieve
    """

    def __init__(self, id, url):
        super().__init__()
        self.id = id
        self.url = url

        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        r = requests.get(self.url)

        for line in r.text.splitlines():
            self.signals.data.emit((self.id, line))


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.urls = [
            "https://www.pythonguis.com/",
            "https://www.mfitzp.com/",
            "https://www.google.com",
            "https://academy.pythonguis.com/",
        ]

        layout = QVBoxLayout()

        self.text = QPlainTextEdit()
        self.text.setReadOnly(True)

        button = QPushButton("GO GET EM!")
        button.pressed.connect(self.execute)

        layout.addWidget(self.text)
        layout.addWidget(button)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()

        self.threadpool = QThreadPool()
        print(
            "Multithreading with maximum %d threads"
            % self.threadpool.maxThreadCount()
        )

    def execute(self):
        for n, url in enumerate(self.urls):
            worker = Worker(n, url)
            worker.signals.data.connect(self.display_output)

            # Execute
            self.threadpool.start(worker)

    def display_output(self, data):
        id, s = data
        self.text.appendPlainText("WORKER %d: %s" % (id, s))


app = QApplication(sys.argv)
window = MainWindow()
app.exec()

If you run this example and press the button you’ll see the HTML output from a number of websites, each line prefixed with the ID of the worker that retrieved it. Note that output from different workers is interleaved.

Figure 207. Logging output from multiple workers to the main window.

The tuple is of course optional: you could send back bare strings if you have only one runner, or don’t need to associate outputs with a source. It is also possible to send a bytestring, or any other type of data, by setting up the signals appropriately.

Parsing data

Often you are not interested in the raw data from the thread (whether from a server or other external device) and instead want to process the data in some way first. In this example we create custom parsers, which can extract specific data from pages requested. We can create multiple workers, each receiving a different list of sites and parsers.

Listing 189. concurrent/qrunnable_io_parser.py

        self.parsers = {  # ①
            # Regular expression parsers, to extract data from the HTML.
            "title": re.compile(
                r"<title.*?>(.*?)<\/title>", re.M | re.S
            ),
            "h1": re.compile(r"<h1.*?>(.*?)<\/h1>", re.M | re.S),
            "h2": re.compile(r"<h2.*?>(.*?)<\/h2>", re.M | re.S),
        }

① The parsers are defined as a series of compiled regular expressions. But you can define parsers however you like.

Listing 190. concurrent/qrunnable_io_parser.py

    def execute(self):
        for n, url in enumerate(self.urls):
            worker = Worker(n, url, self.parsers)  # ①
            worker.signals.data.connect(self.display_output)

            # Execute
            self.threadpool.start(worker)

①Pass the list of parsers to each worker.

Listing 191. concurrent/qrunnable_io_parser.py

class Worker(QRunnable):
    """
    Worker thread

    Inherits from QRunnable to handle worker thread setup, signals
    and wrap-up.

    :param id: The id for this worker
    :param url: The url to retrieve
    """

    def __init__(self, id, url, parsers):
        super().__init__()
        self.id = id
        self.url = url
        self.parsers = parsers

        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        r = requests.get(self.url)

        data = {}
        for name, parser in self.parsers.items():  # ①
            m = parser.search(r.text)
            if m:  # ②
                data[name] = m.group(1).strip()

        self.signals.data.emit((self.id, data))

① Iterate the parser list we passed to the worker. Run each parser on the data for this page.

②If the regular expression matched, add the data to our data dictionary.

Running this, you’ll see the output from each worker, with the H1, H2 and TITLE tags extracted.
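To see what the parsers return without making any network requests, the sketch below runs the same three regular expressions over a small HTML string. The page content is made up for illustration, and the loop mirrors the body of the worker’s run method.

```python
import re

parsers = {
    "title": re.compile(r"<title.*?>(.*?)</title>", re.M | re.S),
    "h1": re.compile(r"<h1.*?>(.*?)</h1>", re.M | re.S),
    "h2": re.compile(r"<h2.*?>(.*?)</h2>", re.M | re.S),
}

# Made-up page, used here instead of a network request.
html = """<html><head><title>Example page</title></head>
<body><h1 class="main"> Welcome </h1><p>No second-level heading.</p></body>
</html>"""

data = {}
for name, parser in parsers.items():
    m = parser.search(html)
    if m:
        # Only the first match for each parser is kept.
        data[name] = m.group(1).strip()

print(data)  # {'title': 'Example page', 'h1': 'Welcome'}
```

Note that there is no "h2" key in the result: a parser that finds no match adds nothing to the dictionary, so the receiving slot should not assume every key is present.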

Figure 208. Displaying parsed output from multiple workers.

If you are building tools to extract data from websites, take a look at BeautifulSoup 4, which is far more robust than using regular expressions.
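BeautifulSoup is a third-party package, so as a dependency-free illustration of the same idea (parsing the markup rather than pattern-matching it) the sketch below uses html.parser from the standard library instead. It is a swapped-in technique, not BeautifulSoup’s API; HeadingParser is a hypothetical class and the input HTML is made up.

```python
from html.parser import HTMLParser


class HeadingParser(HTMLParser):
    """Collect the text of the first <title>, <h1> and <h2> elements."""

    WANTED = {"title", "h1", "h2"}

    def __init__(self):
        super().__init__()
        self.data = {}
        self._current = None  # Tag currently being read, if any.

    def handle_starttag(self, tag, attrs):
        if tag in self.WANTED and tag not in self.data:
            self._current = tag
            self.data[tag] = ""

    def handle_data(self, text):
        if self._current:
            self.data[self._current] += text

    def handle_endtag(self, tag):
        if tag == self._current:
            self.data[tag] = self.data[tag].strip()
            self._current = None


parser = HeadingParser()
parser.feed(
    "<title>Example</title>"
    "<h1>First <em>big</em> heading</h1>"
    "<h1>Second</h1>"
)
print(parser.data)  # {'title': 'Example', 'h1': 'First big heading'}
```

Unlike the regular expression, this returns the heading text with the nested em tags removed, which is the kind of detail a real parser handles for you.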

The Generic

You won’t always know ahead of time what you want to use workers for. Or you may have a number of similar functions to perform and want a regular API for running them. In that case you can take advantage of the fact that in Python functions are objects to build a generic runner which accepts not just arguments, but also the function to run.

In the following example we create a single Worker class and then use it to run a number of different functions. With this setup you can pass in any Python function and have it executed in a separate thread.

The complete working example is given below, showcasing the custom QRunnable worker together with the worker & progress signals. You should be able to adapt this code to any application you develop.

Listing 192. concurrent/qrunnable_generic.py

import sys
import time
import traceback

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


def execute_this_fn():
    for _ in range(0, 5):
        time.sleep(1)

    return "Done."


class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.

    Supported signals are:

    finished
        No data

    error
        tuple (exctype, value, traceback.format_exc())

    result
        object data returned from processing, anything
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)


class Worker(QRunnable):
    """
    Worker thread

    Inherits from QRunnable to handle worker thread setup, signals
    and wrap-up.

    :param callback: The function callback to run on this worker thread.
                     Supplied args and kwargs will be passed through to
                     the runner.
    :type callback: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """
        Initialize the runner function with passed args, kwargs.
        """

        # Retrieve args/kwargs here; and fire processing using them
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        else:
            self.signals.result.emit(result)  # Return the result of the processing
        finally:
            self.signals.finished.emit()  # Done


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.counter = 0

        layout = QVBoxLayout()

        self.l = QLabel("Start")
        b = QPushButton("DANGER!")
        b.pressed.connect(self.oh_no)

        layout.addWidget(self.l)
        layout.addWidget(b)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()

        self.threadpool = QThreadPool()
        print(
            "Multithreading with maximum %d threads"
            % self.threadpool.maxThreadCount()
        )

        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def print_output(self, s):
        print(s)

    def thread_complete(self):
        print("THREAD COMPLETE!")

    def oh_no(self):
        # Pass the function to execute
        worker = Worker(
            execute_this_fn
        )  # Any other args, kwargs are passed to the run function
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)

        # Execute
        self.threadpool.start(worker)

    def recurring_timer(self):
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)


app = QApplication(sys.argv)
window = MainWindow()
app.exec()

The generic function approach adds a limitation that may not be immediately obvious — the run function does not have access to the self object of your runner, and therefore cannot access the signals to emit the data itself. We can only emit the return value of the function, once it has ended. While you can return a compound type, such as a tuple, to return multiple values, you can’t get progress signals or in-progress data.
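The order in which the result, error and finished signals fire follows directly from the try/except/else/finally structure of run. The sketch below isolates that control flow, with plain Python callbacks standing in for the Qt signals so that it runs without PyQt6. The names run_job, divide and the on_* callbacks are hypothetical.

```python
import traceback


def run_job(fn, *args, on_result, on_error, on_finished, **kwargs):
    """Call fn and report the outcome through plain callbacks."""
    try:
        result = fn(*args, **kwargs)
    except Exception as exc:
        # Same tuple shape as the error signal in the listing.
        on_error((type(exc), exc, traceback.format_exc()))
    else:
        on_result(result)  # Only reached if fn returned normally.
    finally:
        on_finished()  # Always runs, after result or error.


def divide(a, b):
    return a / b


events = []
callbacks = dict(
    on_result=lambda r: events.append(("result", r)),
    on_error=lambda e: events.append(("error", e[0].__name__)),
    on_finished=lambda: events.append(("finished",)),
)

run_job(divide, 4, 2, **callbacks)
run_job(divide, 4, 0, **callbacks)

print(events)
```

Each job reports exactly one of result or error, followed by finished, and nothing is reported until the function has returned or raised.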

However, there is a way around this. Since you can pass anything you want into the custom function, you can also pass self or the self.signals object to make them available to you.

Listing 193. concurrent/qrunnable_generic_callback.py

import sys
import time
import traceback

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


def execute_this_fn(signals):
    for n in range(0, 5):
        time.sleep(1)
        # The progress signal carries an int, so convert explicitly.
        signals.progress.emit(int(n * 100 / 4))

    return "Done."


class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.

    Supported signals are:

    finished
        No data

    error
        tuple (exctype, value, traceback.format_exc())

    result
        object data returned from processing, anything

    progress
        int indicating % progress
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
    progress = pyqtSignal(int)


class Worker(QRunnable):
    """
    Worker thread

    Inherits from QRunnable to handle worker thread setup, signals
    and wrap-up.

    :param callback: The function callback to run on this worker thread.
                     Supplied args and kwargs will be passed through to
                     the runner.
    :type callback: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

        # Add the callback to our kwargs
        self.kwargs["signals"] = self.signals

    @pyqtSlot()
    def run(self):
        """
        Initialize the runner function with passed args, kwargs.
        """

        # Retrieve args/kwargs here; and fire processing using them
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        else:
            self.signals.result.emit(result)  # Return the result of the processing
        finally:
            self.signals.finished.emit()  # Done


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.counter = 0

        layout = QVBoxLayout()

        self.l = QLabel("Start")
        b = QPushButton("DANGER!")
        b.pressed.connect(self.oh_no)

        layout.addWidget(self.l)
        layout.addWidget(b)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()

        self.threadpool = QThreadPool()
        print(
            "Multithreading with maximum %d threads"
            % self.threadpool.maxThreadCount()
        )

        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def progress_fn(self, n):
        print("%d%% done" % n)

    def print_output(self, s):
        print(s)

    def thread_complete(self):
        print("THREAD COMPLETE!")

    def oh_no(self):
        # Pass the function to execute
        worker = Worker(
            execute_this_fn
        )  # Any other args, kwargs are passed to the run function
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        worker.signals.progress.connect(self.progress_fn)

        # Execute
        self.threadpool.start(worker)

    def recurring_timer(self):
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)


app = QApplication(sys.argv)
window = MainWindow()
app.exec()

Note that for this to work, your custom function must be able to accept the additional argument. You can do this by defining the functions with **kwargs to silently swallow the extra arguments if they aren’t used.

def execute_this_fn(**kwargs):  # ①
    for _ in range(0, 5):
        time.sleep(1)

    return "Done."

①The signals keyword argument is swallowed up by **kwargs.
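The effect of **kwargs can be seen without any Qt code at all. The following is a minimal sketch in plain Python: run_callback is a hypothetical stand-in for the worker, and a bare object() takes the place of the real signals object. It shows that a function declared with **kwargs accepts the injected keyword, while one without it raises a TypeError.

```python
def run_callback(fn, *args, **kwargs):
    """Mimic the worker: inject an extra 'signals' keyword before calling fn."""
    kwargs["signals"] = object()  # placeholder standing in for WorkerSignals
    return fn(*args, **kwargs)


def ignores_signals(**kwargs):
    # 'signals' lands in kwargs and is simply never used.
    return "Done."


def strict(value):
    # No **kwargs here, so the injected 'signals' keyword is rejected.
    return value


result = run_callback(ignores_signals)

try:
    run_callback(strict, 1)
    strict_failed = False
except TypeError:
    strict_failed = True
```

An alternative to **kwargs is to name the parameter explicitly, for example signals=None, which documents the extra argument rather than hiding it.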

Running External processes

So far we’ve looked at how we can run Python code in another thread. Sometimes, however, you need to run external programs, such as command line programs, in another process.

You actually have two options when starting external processes with PyQt6. You can either use Python’s built-in subprocess module to start the processes, or you can use Qt’s QProcess.

For more information on running external processes using QProcess, take a look at the Running external commands & processes chapter.

Starting a new process always comes with a small execution cost and will block your GUI momentarily. This is usually not perceptible but it can add up depending on your use case and may have performance impacts. You can get around this by starting your processes in another thread.

If you want to communicate with the process in real-time you will need a separate thread to avoid blocking the GUI. QProcess handles this separate thread for you internally, but with Python subprocess you will need to do this yourself.

In this QRunnable example we use instances of workers to handle starting external processes through Python subprocess. This keeps the startup cost of the process out of the GUI thread and also allows us to interact with the processes directly through Python.

Listing 194. concurrent/qrunnable_process.py

import subprocess
import sys

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QMainWindow,
    QPlainTextEdit,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.

    Supported signals are:

    finished: No data
    result: str
    """

    result = pyqtSignal(str)  # Send back the output from the process as a string.
    finished = pyqtSignal()


class SubProcessWorker(QRunnable):
    """
    ProcessWorker worker thread

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param command: command to execute with subprocess.
    """

    def __init__(self, command):
        super().__init__()

        # Store constructor arguments (re-used for processing).
        self.signals = WorkerSignals()

        # The command to be executed.
        self.command = command

    @pyqtSlot()
    def run(self):
        """
        Execute the command, returning the result.
        """
        output = subprocess.getoutput(self.command)
        self.signals.result.emit(output)
        self.signals.finished.emit()


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        # Some buttons
        layout = QVBoxLayout()

        self.text = QPlainTextEdit()
        layout.addWidget(self.text)

        btn_run = QPushButton("Execute")
        btn_run.clicked.connect(self.start)

        layout.addWidget(btn_run)

        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)

        # Thread runner
        self.threadpool = QThreadPool()

        self.show()

    def start(self):
        # Create a runner
        self.runner = SubProcessWorker("python dummy_script.py")
        self.runner.signals.result.connect(self.result)
        self.threadpool.start(self.runner)

    def result(self, s):
        self.text.appendPlainText(s)


app = QApplication(sys.argv)
w = MainWindow()
app.exec()

The "external program" in this example is a simple Python script, run with python dummy_script.py. However, you can replace this with any other program you like.

Running processes have two streams of output — standard out and standard error. The standard output returns the actual result of the execution (if any) while standard error returns any error or logging information.

In this example we’re running the external script using subprocess.getoutput. This runs the external program, waiting for it to complete before returning. Once it has completed, getoutput returns both the standard output and standard error together as a single string.
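If the two streams need to be kept apart, one option is subprocess.run with capture_output=True, which returns standard output and standard error as separate attributes. The sketch below is not part of the example application; the child program is an invented one-liner run through the current Python interpreter so that the snippet is self-contained.

```python
import subprocess
import sys

# A tiny child program that writes one line to each stream.
child_code = "import sys; print('result'); print('log message', file=sys.stderr)"

completed = subprocess.run(
    [sys.executable, "-c", child_code],
    capture_output=True,  # capture stdout and stderr separately
    text=True,  # decode bytes to str
)

stdout_text = completed.stdout.strip()
stderr_text = completed.stderr.strip()
```

Like getoutput, subprocess.run blocks until the program has finished, so it would still belong inside a worker rather than in the GUI thread.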

Parsing the result

You don’t have to pass the output as-is. If you have post-processing to do on the output from the command, it can make sense to handle this in your worker thread as well, to keep it self-contained. The worker can then return the data to your GUI thread in a structured format, ready to be used.

In the following example, we pass in a function to post-process the result of the demo script to extract the values of interest into a dictionary. This data is used to update widgets on the GUI side.

Listing 195. concurrent/qrunnable_process_result.py

import subprocess
import sys
from collections import namedtuple

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLineEdit,
    QMainWindow,
    QPushButton,
    QSpinBox,
    QVBoxLayout,
    QWidget,
)

def extract_vars(l):
    """
    Extracts variables from lines, looking for lines
    containing an equals, and splitting into key=value.
    """
    data = {}
    for s in l.splitlines():
        if "=" in s:
            # Split on the first "=" only, so values may contain "=".
            name, value = s.split("=", 1)
            data[name] = value

    # Count lines rather than characters.
    data["number_of_lines"] = len(l.splitlines())
    return data

class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.

    Supported signals are:

    finished: No data
    result: dict
    """

    result = pyqtSignal(dict)  # Send back the output as dictionary.
    finished = pyqtSignal()


class SubProcessWorker(QRunnable):
    """
    ProcessWorker worker thread

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param command: command to execute with subprocess.
    """

    def __init__(self, command, process_result=None):
        super().__init__()

        # Store constructor arguments (re-used for processing).
        self.signals = WorkerSignals()

        # The command to be executed.
        self.command = command

        # The post-processing fn.
        self.process_result = process_result

    @pyqtSlot()
    def run(self):
        """
        Execute the command, returning the result.
        """
        output = subprocess.getoutput(self.command)

        if self.process_result:
            output = self.process_result(output)

        self.signals.result.emit(output)
        self.signals.finished.emit()


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        # Some buttons
        layout = QVBoxLayout()

        self.name = QLineEdit()
        layout.addWidget(self.name)

        self.country = QLineEdit()
        layout.addWidget(self.country)

        self.website = QLineEdit()
        layout.addWidget(self.website)

        self.number_of_lines = QSpinBox()
        layout.addWidget(self.number_of_lines)

        btn_run = QPushButton("Execute")
        btn_run.clicked.connect(self.start)

        layout.addWidget(btn_run)

        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)

        # Thread runner
        self.threadpool = QThreadPool()

        self.show()

    def start(self):
        # Create a runner
        self.runner = SubProcessWorker(
            "python dummy_script.py", process_result=extract_vars
        )
        self.runner.signals.result.connect(self.result)
        self.threadpool.start(self.runner)

    def result(self, data):
        print(data)
        self.name.setText(data["name"])
        self.country.setText(data["country"])
        self.website.setText(data["website"])
        self.number_of_lines.setValue(data["number_of_lines"])


app = QApplication(sys.argv)
w = MainWindow()
app.exec()

The simple parser in this case looks for any lines with a = in them, splits on this to produce a name and a value, which are then stored in a dict. However, you can use any tools you like to extract data from the string output.
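To make the parsing step concrete, here is a small standalone sketch of the same idea. The function name parse_key_values and the sample output are invented for illustration, and str.partition is used so that a value containing a further = is kept intact.

```python
def parse_key_values(text):
    """Collect key=value lines into a dict, ignoring everything else."""
    data = {}
    for line in text.splitlines():
        name, sep, value = line.partition("=")
        if sep:  # only lines that actually contain "="
            data[name.strip()] = value.strip()
    return data


# Hypothetical output, invented for this example.
sample_output = (
    "Starting...\n"
    "name=Ada\n"
    "country=UK\n"
    "website=https://example.com/?q=1\n"
    "Done."
)

parsed = parse_key_values(sample_output)
```

Because the function only depends on a string, it can be tested on captured output without starting a process or a GUI.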

Because getoutput blocks until the program is complete, we cannot see how the program is running — for example, to get progress information. In the next example we’ll show how to get live output from a running process.

Tracking progress

Often external programs will output progress information to the console. You might want to capture this and either show it to your users, or use it to generate a progress bar.

For the result of the execution you usually want to capture standard output; for progress information, standard error. In the following example we capture both. As well as the command, we pass a custom parser function to the worker, which extracts the current progress so it can be emitted as a number 0-99.

This example is quite complex. The full source code is available in the source files accompanying the book, but here we’ll cover the key differences from the simpler one.

Listing 196. concurrent/qrunnable_process_parser.py

    @pyqtSlot()
    def run(self):
        """
        Initialize the runner function with passed args, kwargs.
        """
        result = []

        with subprocess.Popen(  # ①
            self.command,
            bufsize=1,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # ②
            universal_newlines=True,
        ) as proc:
            while proc.poll() is None:
                data = proc.stdout.readline()  # ③
                result.append(data)
                if self.parser:  # ④
                    value = self.parser(data)
                    if value:
                        self.signals.progress.emit(value)

        output = "".join(result)

        self.signals.result.emit(output)

①Run using Popen to give us access to output streams.

②We pipe standard error out together with standard output.

③Read a line from the process (or wait for one).

④Pass the latest line to the parser.
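The line-by-line reading can be tried outside Qt. The sketch below is a variation rather than the listing's exact loop: it iterates over the pipe until it closes instead of polling, which also picks up any lines written just before the process exits. The child program is an invented stand-in that prints a few progress lines.

```python
import subprocess
import sys

# Stand-in child process: prints a progress line every few milliseconds.
child_code = (
    "import time\n"
    "for pc in (0, 25, 50, 75, 100):\n"
    "    print('Total complete: %d%%' % pc, flush=True)\n"
    "    time.sleep(0.01)\n"
)

lines = []
with subprocess.Popen(
    [sys.executable, "-c", child_code],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,  # merge stderr into stdout
    text=True,
    bufsize=1,  # line-buffered in text mode
) as proc:
    # Iterating the pipe yields each line as it arrives and stops at EOF.
    for line in proc.stdout:
        lines.append(line.rstrip("\n"))

# Leaving the with-block waits for the process, so returncode is set.
return_code = proc.returncode
```

In a worker, the body of the for loop is where each line would be handed to the parser and a progress signal emitted.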

Parsing is handled by this simple parser function, which takes in a string and matches the regular expression Total complete: (\d+)%.

Listing 197. concurrent/qrunnable_process_parser.py

# Raw string, so that \d is passed to the regex engine unchanged.
progress_re = re.compile(r"Total complete: (\d+)%")


def simple_percent_parser(output):
    """
    Matches lines using the progress_re regex,
    returning a single integer for the % progress.
    """
    m = progress_re.search(output)
    if m:
        pc_complete = m.group(1)
        return int(pc_complete)
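A quick way to check a parser like this is to run it over a few sample lines. The sketch below repeats the same regular expression in a standalone form; the helper name percent_from_line and the sample lines are invented for illustration.

```python
import re

progress_re = re.compile(r"Total complete: (\d+)%")


def percent_from_line(line):
    """Return the percentage as an int, or None if the line has no match."""
    m = progress_re.search(line)
    return int(m.group(1)) if m else None


# Invented sample lines for illustration.
sample_lines = ["Starting job", "Total complete: 25%", "Total complete: 100%"]
values = [percent_from_line(line) for line in sample_lines]
```

Note that a reading of 0 is falsy in Python, so a caller that tests the result with a plain if will skip a 0% update; comparing against None avoids that.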

The parser is passed into the runner along with the command — this means we can use a generic runner for all subprocesses and handle the output differently for different commands.

Listing 198. concurrent/qrunnable_process_parser.py

    def start(self):
        # Create a runner
        self.runner = SubProcessWorker(
            command="python dummy_script.py",
            parser=simple_percent_parser,
        )
        self.runner.signals.result.connect(self.result)
        self.runner.signals.progress.connect(self.progress.setValue)
        self.threadpool.start(self.runner)

In this simple example we only pass the latest line from the process, since our custom script outputs lines like Total complete: 25%. That means that we only need the latest line to be able to calculate the current progress.

Sometimes, however, scripts can be a bit less helpful. For example, ffmpeg, the video encoder, outputs the duration of the video file to be processed once at the beginning, then outputs the duration that has currently been processed. To calculate the % of progress you need both values.

To do that, you can pass the collected output to the parser instead. There is an example of this in the source code with the book, named concurrent/qrunnable_process_parser_elapsed.py.
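As a rough illustration of a parser that works on the collected output, the sketch below looks for a total duration and the most recent elapsed time and turns them into a percentage. It is not the book's implementation: the function names are hypothetical, and the sample text is invented to resemble encoder output with a Duration: line followed by time= entries.

```python
import re

duration_re = re.compile(r"Duration: (\d+):(\d+):(\d+(?:\.\d+)?)")
time_re = re.compile(r"time=(\d+):(\d+):(\d+(?:\.\d+)?)")


def to_seconds(hours, minutes, seconds):
    return int(hours) * 3600 + int(minutes) * 60 + float(seconds)


def elapsed_percent_parser(output):
    """Return % progress from all output collected so far, or None."""
    duration = duration_re.search(output)
    times = time_re.findall(output)
    if not duration or not times:
        return None
    total = to_seconds(*duration.groups())
    if total == 0:
        return None
    done = to_seconds(*times[-1])  # the most recent time= entry
    return min(100, int(done / total * 100))


# Invented sample resembling encoder output.
collected = (
    "Duration: 00:01:40.00, start: 0.000000\n"
    "frame=  100 time=00:00:25.00 bitrate=N/A\n"
    "frame=  200 time=00:00:50.00 bitrate=N/A\n"
)
percent = elapsed_percent_parser(collected)
```

The worker would call such a parser with the joined output so far rather than with the latest line alone.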

The Manager

In the previous examples we’ve created a number of different QRunnable implementations that can be used for different purposes in your application. In all cases you can run as many of these runners as you like, on the same or multiple QThreadPool pools. However, sometimes you will want to keep track of the runners which you have running in order to do something with their output, or provide users with control over the runners directly.

QThreadPool itself does not give you access to the currently running runners, so we need to create our own manager, through which we start and control our workers.

The example below brings together some of the other worker features already introduced — progress, pause and stop control — together with the model views to present individual progress bars. This manager will likely work as a drop-in for most use-cases you have for running threads.

This is quite a complex example. The full source code is available in the resources for the book. Here we’ll go through the key parts of the QRunnable manager in turn.

The worker manager

The worker manager class holds the threadpool, our workers and their progress and state information. It is derived from QAbstractListModel meaning it also provides a Qt model-like interface, allowing for it to be used as the model for a QListView — providing a per-worker progress bar and status indicator. The status tracking is handled through a number of internal signals, which attach automatically to every added worker.

Listing 199. concurrent/qrunnable_manager.py

class WorkerManager(QAbstractListModel):
    """
    Manager to handle our worker queues and state.
    Also functions as a Qt data model for a view
    displaying progress for each worker.
    """

    _workers = {}
    _state = {}

    status = pyqtSignal(str)

    def __init__(self):
        super().__init__()

        # Create a threadpool for our workers.
        self.threadpool = QThreadPool()
        # self.threadpool.setMaxThreadCount(1)
        self.max_threads = self.threadpool.maxThreadCount()
        print(
            "Multithreading with maximum %d threads" % self.max_threads
        )

        self.status_timer = QTimer()
        self.status_timer.setInterval(100)
        self.status_timer.timeout.connect(self.notify_status)
        self.status_timer.start()

    def notify_status(self):
        n_workers = len(self._workers)
        running = min(n_workers, self.max_threads)
        waiting = max(0, n_workers - self.max_threads)
        self.status.emit(
            "{} running, {} waiting, {} threads".format(
                running, waiting, self.max_threads
            )
        )

    def enqueue(self, worker):
        """
        Enqueue a worker to run (at some point) by passing it to the QThreadPool.
        """
        worker.signals.error.connect(self.receive_error)
        worker.signals.status.connect(self.receive_status)
        worker.signals.progress.connect(self.receive_progress)
        worker.signals.finished.connect(self.done)

        self.threadpool.start(worker)
        self._workers[worker.job_id] = worker

        # Set default status to waiting, 0 progress.
        self._state[worker.job_id] = DEFAULT_STATE.copy()

        self.layoutChanged.emit()

    def receive_status(self, job_id, status):
        self._state[job_id]["status"] = status
        self.layoutChanged.emit()

    def receive_progress(self, job_id, progress):
        self._state[job_id]["progress"] = progress
        self.layoutChanged.emit()

    def receive_error(self, job_id, message):
        print(job_id, message)

    def done(self, job_id):
        """
        Task/worker complete. Remove it from the active workers
        dictionary. We leave it in worker_state, as this is used to
        display past/complete workers too.
        """
        del self._workers[job_id]
        self.layoutChanged.emit()

    def cleanup(self):
        """
        Remove any complete/failed workers from worker_state.
        """
        for job_id, s in list(self._state.items()):
            if s["status"] in (STATUS_COMPLETE, STATUS_ERROR):
                del self._state[job_id]
        self.layoutChanged.emit()

    # Model interface
    def data(self, index, role):
        if role == Qt.ItemDataRole.DisplayRole:
            # See below for the data structure.
            job_ids = list(self._state.keys())
            job_id = job_ids[index.row()]
            return job_id, self._state[job_id]

    def rowCount(self, index):
        return len(self._state)

Workers are constructed outside the manager and passed in via .enqueue(). This connects all signals and adds the worker to the thread pool. It will be executed as normal once a thread is available.

The workers are kept in an internal dictionary _workers keyed by the job id. There is a separate dictionary _state which stores the status and progress information about the workers. We keep them separate so we can delete jobs once complete, keeping an accurate count, yet continue to show information about completed jobs until cleared.

Signals from each submitted worker are connected to slots on the manager, which update the _state dictionary, print error messages or delete the completed job. Once any state is updated, we must emit .layoutChanged to trigger a refresh of the model view. The cleanup method iterates through the _state dictionary and removes any jobs that are complete or have failed.
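The reason for keeping two dictionaries is easier to see with the Qt parts stripped away. The following is a plain-Python sketch, not the manager itself: JobBook is a hypothetical class, and for brevity its done method also records the final status, which in the manager arrives through a separate status signal.

```python
STATUS_WAITING = "waiting"
STATUS_COMPLETE = "complete"
STATUS_ERROR = "error"


class JobBook:
    """Plain-Python stand-in for the manager's two dictionaries."""

    def __init__(self):
        self.workers = {}  # active jobs only
        self.state = {}  # status/progress, kept after completion

    def enqueue(self, job_id, worker):
        self.workers[job_id] = worker
        self.state[job_id] = {"progress": 0, "status": STATUS_WAITING}

    def done(self, job_id, status=STATUS_COMPLETE):
        del self.workers[job_id]  # no longer counted as active
        self.state[job_id]["status"] = status  # but still displayed

    def cleanup(self):
        # Copy the items so entries can be deleted while iterating.
        for job_id, s in list(self.state.items()):
            if s["status"] in (STATUS_COMPLETE, STATUS_ERROR):
                del self.state[job_id]


book = JobBook()
book.enqueue("a", object())
book.enqueue("b", object())
book.done("a")
active_after_done = len(book.workers)
shown_after_done = len(book.state)
book.cleanup()
shown_after_cleanup = sorted(book.state)
```

After job "a" finishes, only one job counts as active, but both remain visible until cleanup is called.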

Lastly, we set up a timer to regularly trigger a method to emit the current thread counts as a status message. The number of active threads is the minimum of the number of _workers and the max_threads. The waiting threads is the number of _workers minus the max_threads (as long as it is more than zero). The message is shown on the main window status bar.
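The running and waiting counts are simple arithmetic, shown here as a standalone function. The name thread_counts is hypothetical and a pool of 4 threads is assumed only for the example.

```python
def thread_counts(n_workers, max_threads):
    """Split the number of queued workers into running and waiting."""
    running = min(n_workers, max_threads)
    waiting = max(0, n_workers - max_threads)
    return running, waiting


# With 4 threads: 0, 3, 4 and 7 workers in the queue.
examples = [thread_counts(n, 4) for n in (0, 3, 4, 7)]
```

With 7 workers and 4 threads, 4 are running and 3 are waiting.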

The worker

The worker itself follows the same pattern as all our previous examples. The only requirement for our manager is the addition of a .job_id property which is set when the worker is created.

The signals from the workers must include this job id so the manager knows which worker sent the signal — updating the correct status, progress and finished states.

The worker itself is a simple dummy worker, which iterates 100 times (1 for each % progress) and performs a simple calculation. This worker calculation generates a series of numbers, but is constructed to occasionally throw division by zero errors.

Listing 200. concurrent/qrunnable_manager.py

class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.
    Every signal carries the job id (`str`) as its first argument.

    Supported signals are:

    finished
        job id only

    error
        `str` error message

    result
        `object` data returned from processing, anything

    progress
        `int` indicating % progress

    status
        `str` one of the STATUS_* values
    """

    error = pyqtSignal(str, str)
    result = pyqtSignal(str, object)  # We can send anything back.
    finished = pyqtSignal(str)
    progress = pyqtSignal(str, int)
    status = pyqtSignal(str, str)

class Worker(QRunnable):
    """
    Worker thread

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param args: Arguments to pass for the worker
    :param kwargs: Keywords to pass for the worker
    """

    def __init__(self, *args, **kwargs):
        super().__init__()

        # Store constructor arguments (re-used for processing).
        self.signals = WorkerSignals()

        # Give this job a unique ID.
        self.job_id = str(uuid.uuid4())

        # The arguments for the worker
        self.args = args
        self.kwargs = kwargs

        self.signals.status.emit(self.job_id, STATUS_WAITING)

    @pyqtSlot()
    def run(self):
        """
        Initialize the runner function with passed args, kwargs.
        """
        self.signals.status.emit(self.job_id, STATUS_RUNNING)

        x, y = self.args

        try:
            value = random.randint(0, 100) * x
            delay = random.random() / 10
            result = []

            for n in range(100):
                # Generate some numbers.
                value = value / y
                y -= 1

                # The following will sometimes throw a division by zero error.
                result.append(value)

                # Pass out the current progress.
                self.signals.progress.emit(self.job_id, n + 1)
                time.sleep(delay)

        except Exception as e:
            print(e)
            # We swallow the error and continue.
            self.signals.error.emit(self.job_id, str(e))
            self.signals.status.emit(self.job_id, STATUS_ERROR)

        else:
            self.signals.result.emit(self.job_id, result)
            self.signals.status.emit(self.job_id, STATUS_COMPLETE)

        self.signals.finished.emit(self.job_id)

In addition to the progress signals we’ve seen before, we also have a status signal which emits one of the following statuses. Exceptions are caught and both the exception text and the error state are emitted using error and status.

Listing 201. concurrent/qrunnable_manager.py

STATUS_WAITING = "waiting"

STATUS_RUNNING = "running"

STATUS_ERROR = "error"

STATUS_COMPLETE = "complete"

STATUS_COLORS = {
    STATUS_RUNNING: "#33a02c",
    STATUS_ERROR: "#e31a1c",
    STATUS_COMPLETE: "#b2df8a",
}

DEFAULT_STATE = {"progress": 0 , "status": STATUS_WAITING}

Each of the active statuses has an assigned color which will be used when drawing the progress bar.

Custom row display

We’re using a QListView for the progress bar display. Normally a list view displays a simple text value for each row. To modify this we use a QStyledItemDelegate, which allows us to paint a custom widget for each row.

Listing 202. concurrent/qrunnable_manager.py

class ProgressBarDelegate(QStyledItemDelegate):
    def paint(self, painter, option, index):
        # data is our status dict, containing progress, id, status
        job_id, data = index.model().data(
            index, Qt.ItemDataRole.DisplayRole
        )
        if data["progress"] > 0:
            color = QColor(STATUS_COLORS[data["status"]])

            brush = QBrush()
            brush.setColor(color)
            brush.setStyle(Qt.BrushStyle.SolidPattern)

            # QRect.setWidth() expects an int, so convert explicitly.
            width = int(option.rect.width() * data["progress"] / 100)

            rect = QRect(
                option.rect
            )  # Copy of the rect, so we can modify.
            rect.setWidth(width)

            painter.fillRect(rect, brush)

        pen = QPen()
        pen.setColor(Qt.GlobalColor.black)
        painter.drawText(
            option.rect, Qt.AlignmentFlag.AlignLeft, job_id
        )

We get the data for the current row from the model using index.model().data(index, Qt.ItemDataRole.DisplayRole). This is calling the .data() method on our custom model (manager) passing in the index and role. In our .data() method we are returning two bits of data — job_id and the state dictionary, containing progress and status keys.

For active jobs (progress > 0), status is used to select a color for the bar. This is drawn as a rectangle the size of the item row, option.rect, with the width adjusted by the % completion. Finally, we write the job_id text over the top of this.
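The width calculation on its own looks like this. The sketch uses a hypothetical helper, bar_width, and adds clamping of the progress value to 0..100, which the delegate itself does not do.

```python
def bar_width(row_width, progress):
    """Width in pixels of the filled part of a row, as an int."""
    progress = max(0, min(100, progress))  # clamp to 0..100
    return int(row_width * progress / 100)


# A 200-pixel-wide row at various progress values.
widths = [bar_width(200, p) for p in (0, 25, 50, 100, 150)]
```

Converting to int before passing the value on matters because the rectangle's width is set in whole pixels.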

Starting a job

With everything in place, we can now enqueue jobs by calling self.workers.enqueue(), passing in a worker constructed with its arguments.

Listing 203. concurrent/qrunnable_manager.py

    def start_worker(self):
        x = random.randint(0, 1000)
        y = random.randint(0, 1000)

        w = Worker(x, y)
        w.signals.result.connect(self.display_result)
        w.signals.error.connect(self.display_result)

        self.workers.enqueue(w)

The .enqueue() method accepts a constructed worker and attaches the internal signals to it to track progress. However, we can still attach any other external signals that we want.

Figure 209. The manager interface, where you can start new jobs and see progress.

Also, while this example has only a single worker class, you can use this same manager with any other QRunnable-derived classes, as long as they have the same signals available. This means you can use a single worker manager to manage all the workers in your app.

Take a look at the full code in the source files with this book and experiment with modifying the manager to your needs. For example, try adding kill & pause functionality, or generic function runners.

Stopping jobs

We can start jobs, and some of them die due to errors. But what if we want to stop jobs that are taking too long? The QListView allows us to select rows and through the selected row we can kill a specific worker. The method below is linked to a button, and looks up the worker from the current selected item in the list.

Listing 204. concurrent/qrunnable_manager_stop.py

    def stop_worker(self):
        selected = self.progress.selectedIndexes()
        for idx in selected:
            job_id, _ = self.workers.data(
                idx, Qt.ItemDataRole.DisplayRole
            )
            self.workers.kill(job_id)

In addition to this we need to modify the delegate to draw the currently selected item and update the worker and manager to pass through the kill signal. Take a look at the full source for this example to see how it all fits together.
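One common way to make a worker stoppable is a flag that the worker checks between steps. The sketch below shows that idea with the standard library only; it is an assumption about the general technique, not the code from the book's example. StoppableJob is a hypothetical class, a plain thread stands in for the thread pool, and threading.Event stands in for the kill flag.

```python
import threading


class StoppableJob:
    """Hypothetical worker loop that checks a kill flag between steps."""

    def __init__(self, total_steps):
        self.total_steps = total_steps
        self.steps_done = 0
        self._killed = threading.Event()

    def kill(self):
        self._killed.set()

    def run(self):
        for _ in range(self.total_steps):
            if self._killed.is_set():
                return "killed"
            self.steps_done += 1
            # Stand-in for real work; wakes early if kill() is called.
            self._killed.wait(0.01)
        return "complete"


job = StoppableJob(total_steps=1000)
outcome = []
thread = threading.Thread(target=lambda: outcome.append(job.run()))
thread.start()
job.kill()  # request a stop; the loop notices at its next check
thread.join()
```

The job cannot be interrupted in the middle of a step, so the steps need to be short for the stop to feel responsive.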

Figure 210. The manager, selecting a job allows you to stop it.
