GUI应用程序非常适合测试内容,因为它很容易产生新任务并可视化正在发生的事情,所以我写了一个小示例应用程序(屏幕截图,下面的代码),因为我确实想自己学习。
最初,我采用了与您类似的方法,尝试实现“消费者/生产者”模式,并且我在后台进程中苦苦挣扎,不断循环以等待新工作,并自己来回进行通信。然后我发现了池接口,然后我可以用几行代码替换所有这些令人讨厌的代码。您需要的只是一个池和一些回调:
#!/usr/bin/env python3import multiprocessing, time, random, sysfrom PySide.QtCore import * # equivalent: from PyQt4.QtCore import *from PySide.QtGui import * # equivalent: from PyQt4.QtGui import *def compute(num): print("worker() started at %d" % num) random_number = random.randint(1, 6) if random_number in (2, 4, 6): raise Exception('Random Exception in _%d' % num) time.sleep(random_number) return numclass MainWindow(QMainWindow): def __init__(self): QMainWindow.__init__(self) self.toolBar = self.addToolBar("Toolbar") self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask)) self.list = QListWidget() self.setCentralWidget(self.list) # Pool of Background Processes self.pool = multiprocessing.Pool(processes=4) def addTask(self): num_row = self.list.count() self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult, error_callback=self.receiveException) item = QListWidgetItem("item %d" % num_row) item.setForeground(Qt.gray) self.list.addItem(item) def receiveResult(self, result): assert isinstance(result, int) print("end_work(), where result is %s" % result) self.list.item(result).setForeground(Qt.darkGreen) def receiveException(self, exception): error = str(exception) _pos = error.find('_') + 1 num_row = int(error[_pos:]) item = self.list.item(num_row) item.setForeground(Qt.darkRed) item.setText(item.text() + ' Retry...') self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult, error_callback=self.receiveException)if __name__ == '__main__': app = QApplication(sys.argv) main_window = MainWindow() main_window.show() sys.exit(app.exec_())编辑:我做了另一个示例,使用QTimer而不是回调,定期检查队列中的条目,更新QProgressBar:
#!/usr/bin/env python3import multiprocessing, multiprocessing.pool, time, random, sysfrom PySide.QtCore import *from PySide.QtGui import *def compute(num_row): print("worker started at %d" % num_row) random_number = random.randint(1, 10) for second in range(random_number): progress = float(second) / float(random_number) * 100 compute.queue.put((num_row, progress,)) time.sleep(1) compute.queue.put((num_row, 100))def pool_init(queue): # see http://stackoverflow.com/a/3843313/852994 compute.queue = queueclass MainWindow(QMainWindow): def __init__(self): QMainWindow.__init__(self) self.toolBar = self.addToolBar("Toolbar") self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask)) self.table = QTableWidget() self.table.verticalHeader().hide() self.table.setColumnCount(2) self.setCentralWidget(self.table) # Pool of Background Processes self.queue = multiprocessing.Queue() self.pool = multiprocessing.Pool(processes=4, initializer=pool_init, initargs=(self.queue,)) # Check for progress periodically self.timer = QTimer() self.timer.timeout.connect(self.updateProgress) self.timer.start(2000) def addTask(self): num_row = self.table.rowCount() self.pool.apply_async(func=compute, args=(num_row,)) label = QLabel("Queued") bar = QProgressBar() bar.setValue(0) self.table.setRowCount(num_row + 1) self.table.setCellWidget(num_row, 0, label) self.table.setCellWidget(num_row, 1, bar) def updateProgress(self): if self.queue.empty(): return num_row, progress = self.queue.get() # unpack print("received progress of %s at %s" % (progress, num_row)) label = self.table.cellWidget(num_row, 0) bar = self.table.cellWidget(num_row, 1) bar.setValue(progress) if progress == 100: label.setText('Finished') elif label.text() == 'Queued': label.setText('Downloading') self.updateProgress() # recursionif __name__ == '__main__': app = QApplication(sys.argv) main_window = MainWindow() main_window.show() sys.exit(app.exec_())


