解释下这段代码: def _registerSer(self): reg=KeytoAPISerial.ser_mapping port=self.ser_port if self.ser_port not in reg: reg[port]={} reg[port]['info']={k:v for k,v in self.ser_info.items()} reg[port]['_ser']=serial.Serial(port=port, baudrate=reg['info']['baud'], timeout=reg[port]['info']['timeout']) reg[port]['_devices']=[self.id] else: if len(set(self.ser_info.items())& set(reg[port]['info'].items())) !=3: raise serial.SerialException('KeytoAPISerial conflict:'\ 'another device is already registered to {0} with '\ 'different paramters'.format(port)) else: reg[port]['_devices'].append(self.id_) self._ser=reg[port]['_ser']
时间: 2024-02-14 13:21:24 浏览: 138
这段代码是一个类方法 `_registerSer`,用于注册串口设备。它首先获取一个名为 `ser_mapping` 的字典,然后获取当前设备的串口号 `port`。如果 `port` 不在 `ser_mapping` 中,就将当前设备添加到 `ser_mapping` 中,包括设备的信息和串口的基本参数,如波特率和超时时间,并创建一个串口对象 `_ser`。如果 `port` 已经在 `ser_mapping` 中,就检查设备的信息是否与已注册设备的信息相同,如果不同,则抛出一个 `SerialException` 异常。如果相同,则将当前设备添加到已注册设备列表 `_devices` 中,并将 `_ser` 设置为已注册设备的串口对象。最终,方法返回 `_ser` 对象。
相关问题
pyqt5怎么显示class QueryThread(QThread): query_result = pyqtSignal(object) def __init__(self, id): super().__init__() self.id = id def run(self): conn = pymysql.connect(host='39.99.214.172', user='root', password='Solotion.123', database='jj_tset') cursor = conn.cursor() cursor.execute("SELECT * FROM employee_table WHERE user_id='%s'" % self.id) result = cursor.fetchone() conn.close() self.query_result.emit(result) # 发送查询结果信号 class MainWindow(QtWidgets.QMainWindow): def __init__(self): super().__init__() self.query_thread = None # 查询线程 def read_data(self): data = self.ser.readline() if data: id = data.decode().strip() if not self.query_thread: # 如果查询线程不存在,则创建并启动 self.query_thread = QueryThread(id) self.query_thread.query_result.connect(self.update_ui) self.query_thread.start() else: self.query_thread.id = id # 如果查询线程已存在,则更新查询ID QTimer.singleShot(100, self.read_data) def update_ui(self, result): if result: self.id_label.setText("员工ID:" + result[0]) self.name_label.setText("姓名:" + str(result[1])) self.six_label.setText("性别:" + result[2]) self.sfz_label.setText("身份证:" + str(result[3])) self.tel_label.setText("电话:" + result[4]) else: self.id_label.setText("员工ID:") self.name_label.setText("姓名:") self.six_label.setText("性别:") self.sfz_label.setText("身份证:") self.tel_label.setText("电话:") def closeEvent(self, event): self.ser.close() if self.query_thread: self.query_thread.quit() self.query_thread.wait()
要在 PyQt5 中显示这段代码,可以使用 QTextEdit 或者 QPlainTextEdit 组件。首先,需要在程序中导入相关的模块:
```python
from PyQt5.QtWidgets import QApplication, QMainWindow, QTextEdit
```
然后,创建一个 QTextEdit 组件,并将代码添加到其中:
```python
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.code_edit = QTextEdit(self)
self.code_edit.setReadOnly(True)
self.setCentralWidget(self.code_edit)
self.code_edit.setPlainText('''
class QueryThread(QThread):
query_result = pyqtSignal(object)
def __init__(self, id):
super().__init__()
self.id = id
def run(self):
conn = pymysql.connect(host='39.99.214.172', user='root', password='Solotion.123', database='jj_tset')
cursor = conn.cursor()
cursor.execute("SELECT * FROM employee_table WHERE user_id='%s'" % self.id)
result = cursor.fetchone()
conn.close()
self.query_result.emit(result) # 发送查询结果信号
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.query_thread = None # 查询线程
def read_data(self):
data = self.ser.readline()
if data:
id = data.decode().strip()
if not self.query_thread: # 如果查询线程不存在,则创建并启动
self.query_thread = QueryThread(id)
self.query_thread.query_result.connect(self.update_ui)
self.query_thread.start()
else:
self.query_thread.id = id # 如果查询线程已存在,则更新查询ID
QTimer.singleShot(100, self.read_data)
def update_ui(self, result):
if result:
self.id_label.setText("员工ID:" + result[0])
self.name_label.setText("姓名:" + str(result[1]))
self.six_label.setText("性别:" + result[2])
self.sfz_label.setText("身份证:" + str(result[3]))
self.tel_label.setText("电话:" + result[4])
else:
self.id_label.setText("员工ID:")
self.name_label.setText("姓名:")
self.six_label.setText("性别:")
self.sfz_label.setText("身份证:")
self.tel_label.setText("电话:")
def closeEvent(self, event):
self.ser.close()
if self.query_thread:
self.query_thread.quit()
self.query_thread.wait()
''')
```
这样就可以将代码显示在程序的主窗口中了。需要注意的是,这段代码中的 self.ser 没有定义,如果没有在程序中定义,会导致程序出错。
这段代码没进 thing1()的原因 class Worker(QtCore.QThread): sinOut = pyqtSignal(str) def __init__(self, parent=None): super(Worker, self).__init__(parent) # 设置工作状态与初始num数值 self.working = True self.num = 0 #def __del__(self): # 线程状态改变与线程终止 #self.working = False #self.wait() def stop(self): #线程状态改变与线程终止 self.working = False self.wait() def run(self): self.working = True while self.working == True: #file_str = 'File index{0}'.format(self.num) self.num += 1 # 发射信号 #self.sinOut.emit(file_str) self.sinOut.emit('1') # 线程休眠2秒 self.msleep(5) class parentWindow(QMainWindow): def __init__(self): QMainWindow.__init__(self) self.main_ui = JQ.Ui_MainWindow() self.main_ui.setupUi(self) self.thread1 = Worker() self.main_ui.pushButton_2.clicked.connect(self.thing1) self.thread1.sinOut.connect(self.printt) def thing1(self): #self.main_ui.pushButton.setEnabled(False) print('9999999999') self.thread1.start() self.thread1.wait() print('123') #self.sleep(2) def printt(self): print('7777') def ok(): print('ok') # def hourstest(): # thread1 = Worker() # thread1.start() # com = JQ.Ui_MainWindow().comboBox_2.currentText() # ser = serial.Serial('com3', 1200, timeout=1) # data = bytes.fromhex( # '68 01 00 20 00 00 00 00 00 34 04 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 33 F4 16 ') # ser.write(data) if __name__ == "__main__": app = QtWidgets.QApplication(sys.argv) widget = QtWidgets.QMainWindow() widget = parentWindow() ui = JQ.Ui_MainWindow() # 这是原py中的类,因人而异哦 ui.setupUi(widget) ui.discoverSerial() #串口发现 #ui.pushButton_2.clicked.connect(widget.thing1) widget.show() sys.exit(app.exec_())
在你的代码中,`thing1()` 函数中调用了 `self.thread1.wait()` 方法,这个方法会阻塞主线程,直到子线程执行完毕。因此,如果在 `thing1()` 中调用 `self.thread1.start()` 方法后立即调用 `self.thread1.wait()` 方法,那么子线程就没有机会执行 `run()` 方法,因为主线程一直在等待子线程执行完毕。
解决方法是在 `thing1()` 中删除 `self.thread1.wait()` 方法,让子线程在后台执行。同时,建议在 `Worker` 类的 `__del__()` 方法中加入线程终止的代码,以确保线程能够正确地退出。具体来说,可以将 `__del__()` 方法的注释部分取消注释,代码如下:
```python
class Worker(QtCore.QThread):
sinOut = pyqtSignal(str)
def __init__(self, parent=None):
super(Worker, self).__init__(parent)
# 设置工作状态与初始num数值
self.working = True
self.num = 0
def __del__(self):
# 线程状态改变与线程终止
self.working = False
self.wait()
def stop(self):
#线程状态改变与线程终止
self.working = False
self.wait()
def run(self):
self.working = True
while self.working == True:
#file_str = 'File index{0}'.format(self.num)
self.num += 1
# 发射信号
#self.sinOut.emit(file_str)
self.sinOut.emit('1')
# 线程休眠2秒
self.msleep(5)
class parentWindow(QMainWindow):
def __init__(self):
QMainWindow.__init__(self)
self.main_ui = JQ.Ui_MainWindow()
self.main_ui.setupUi(self)
self.thread1 = Worker()
self.main_ui.pushButton_2.clicked.connect(self.thing1)
self.thread1.sinOut.connect(self.printt)
def thing1(self):
#self.main_ui.pushButton.setEnabled(False)
print('9999999999')
self.thread1.start()
#self.thread1.wait() # 删除这行
print('123')
#self.sleep(2)
def printt(self):
print('7777')
def ok():
print('ok')
# def hourstest():
# thread1 = Worker()
# thread1.start()
# com = JQ.Ui_MainWindow().comboBox_2.currentText()
# ser = serial.Serial('com3', 1200, timeout=1)
# data = bytes.fromhex(
# '68 01 00 20 00 00 00 00 00 34 04 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 33 F4 16 ')
# ser.write(data)
if __name__ == "__main__":
app = QtWidgets.QApplication(sys.argv)
#widget = QtWidgets.QMainWindow() # 这行可删除
widget = parentWindow()
ui = JQ.Ui_MainWindow() # 这是原py中的类,因人而异哦
ui.setupUi(widget)
ui.discoverSerial() #串口发现
#ui.pushButton_2.clicked.connect(widget.thing1)
widget.show()
sys.exit(app.exec_())
```
阅读全文