import subprocess subprocess.Popen(['start', 'path_to_your_exe'])使用os实现
时间: 2024-10-27 12:07:39 浏览: 18
`import subprocess` 是Python中的一个模块,它提供了一种更底层的方式来控制其他进程,包括执行操作系统命令。`subprocess.Popen` 是其中的一个关键函数,用于创建一个新的子进程并运行指定的命令。
`subprocess.Popen(['start', 'path_to_your_exe'])` 这段代码的作用是在Windows环境下通过`start`命令启动指定的应用程序,这里的`['start']`是一个列表,包含命令名称,`'path_to_your_exe'` 是你要运行的可执行文件的实际路径。
如果你想用 `os` 模块来实现同样的功能,在Windows平台下,你可以使用 `os.system` 或 `os.startfile`,例如:
```python
# 使用os.system
os.system('start path_to_your_exe')
# 或者使用os.startfile
os.startfile('path_to_your_exe')
```
这两个函数都能执行指定的程序,不过 `os.system` 返回的是命令的退出状态码,而 `os.startfile` 则会直接启动程序,不会返回值。
相关问题
@app.route('/restruct', methods=['POST']) def restruct(): dch = request.form['dch'] dch_folder = "restr" + dch count = 1 while os.path.exists(os.path.join('D:\\cj', dch_folder)): dch_folder = "restr" + dch + "_" + str(count) count += 1 os.makedirs(os.path.join('D:\\cj', dch_folder, 'photos')) os.makedirs(os.path.join('D:\\cj', dch_folder, 'kmls')) os.makedirs(os.path.join('D:\\cj', dch_folder, 'project')) photos_folder = os.path.join('D:\\cj', dch_folder, 'photos') kmls_folder = os.path.join('D:\\cj', dch_folder, 'kmls') project_folder = os.path.join('D:\\cj', dch_folder, 'project') for file in request.files.getlist('files'): filename = file.filename file.save(os.path.join(photos_folder, filename)) dhs = request.form.getlist('dh') for dh in dhs: src_path = os.path.join(r'D:/911kml/', dh + '.xlsx.kml') dst_path = os.path.join(r'D:/cj', dch_folder, 'kmls/', dh + '.xlsx.kml') shutil.copy(src_path, dst_path) subprocess.Popen(["python", "restruct.py", dch, photos_folder, project_folder, kmls_folder])使用python3.6版本的websocket,将这段代码运行中的输出信息,实时发送给前端
您可以使用 Flask-SocketIO 扩展来实现实时发送输出信息到前端。首先,您需要安装 Flask-SocketIO 扩展:
```
pip install flask-socketio
```
然后,您可以使用以下代码来实现实时发送输出信息到前端:
```python
from flask_socketio import emit
from subprocess import Popen, PIPE
from threading import Thread
@app.route('/restruct', methods=['POST'])
def restruct():
dch = request.form['dch']
dch_folder = "restr" + dch
count = 1
while os.path.exists(os.path.join('D:\\cj', dch_folder)):
dch_folder = "restr" + dch + "_" + str(count)
count += 1
os.makedirs(os.path.join('D:\\cj', dch_folder, 'photos'))
os.makedirs(os.path.join('D:\\cj', dch_folder, 'kmls'))
os.makedirs(os.path.join('D:\\cj', dch_folder, 'project'))
photos_folder = os.path.join('D:\\cj', dch_folder, 'photos')
kmls_folder = os.path.join('D:\\cj', dch_folder, 'kmls')
project_folder = os.path.join('D:\\cj', dch_folder, 'project')
for file in request.files.getlist('files'):
filename = file.filename
file.save(os.path.join(photos_folder, filename))
dhs = request.form.getlist('dh')
for dh in dhs:
src_path = os.path.join(r'D:/911kml/', dh + '.xlsx.kml')
dst_path = os.path.join(r'D:/cj', dch_folder, 'kmls/', dh + '.xlsx.kml')
shutil.copy(src_path, dst_path)
# 使用 Popen 执行命令,并将输出信息实时发送给前端
process = Popen(["python", "restruct.py", dch, photos_folder, project_folder, kmls_folder], stdout=PIPE, stderr=PIPE)
def send_output():
while True:
output = process.stdout.readline().decode().strip()
if output:
emit('output', {'data': output})
else:
break
thread = Thread(target=send_output)
thread.start()
return 'success'
```
在上面的代码中,我们使用 `Popen` 执行命令,并将其的标准输出(stdout)实时发送给前端。为了避免阻塞 Flask 应用程序的主线程,我们在一个新的线程中执行 `send_output` 函数来实现实时发送输出信息到前端。同时,在前端页面中,您需要使用 SocketIO 来接收输出信息并将其显示在页面上:
```javascript
<script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/2.2.0/socket.io.js"></script>
<script type="text/javascript">
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {
console.log('Socket connected');
});
socket.on('output', function(data) {
console.log(data);
// 将输出信息添加到页面上
$('#output').append('<p>' + data.data + '</p>');
});
</script>
```
在上面的代码中,我们使用 SocketIO 连接 Flask 应用程序,并在接收到实时输出信息时,将其添加到页面上。请注意,在上面的代码中,我们使用了 jQuery 库来操作 DOM 元素。如果您不想使用 jQuery,您可以使用纯 JavaScript 来实现相同的效果。
import queue import threading import cv2 as cv import subprocess as sp class Live(object): def __init__(self): self.frame_queue = queue.Queue() self.command = "" # 自行设置 self.rtmpUrl = "" self.camera_path = "" def read_frame(self): print("开启推流") cap = cv.VideoCapture(self.camera_path) # Get video information fps = int(cap.get(cv.CAP_PROP_FPS)) width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT)) # ffmpeg command self.command = ['ffmpeg', '-y', '-f', 'rawvideo', '-vcodec','rawvideo', '-pix_fmt', 'bgr24', '-s', "{}x{}".format(width, height), '-r', str(fps), '-i', '-', '-c:v', 'libx264', '-pix_fmt', 'yuv420p', '-preset', 'ultrafast', '-f', 'flv', self.rtmpUrl] # read webcamera while(cap.isOpened()): ret, frame = cap.read() if not ret: print("Opening camera is failed") break # put frame into queue self.frame_queue.put(frame) def push_frame(self): # 防止多线程时 command 未被设置 while True: if len(self.command) > 0: # 管道配置 p = sp.Popen(self.command, stdin=sp.PIPE) break while True: if self.frame_queue.empty() != True: frame = self.frame_queue.get() # process frame # 你处理图片的代码 # write to pipe p.stdin.write(frame.tostring()) def run(self): threads = [ threading.Thread(target=Live.read_frame, args=(self,)), threading.Thread(target=Live.push_frame, args=(self,)) ] [thread.setDaemon(True) for thread in threads] [thread.start() for thread in threads]
这是一个 Python 类,用于推送摄像头视频流到 RTMP 服务器。该类包含以下方法:
1. `__init__(self)`:初始化方法,设置摄像头路径、RTMP 推流地址和帧队列。
2. `read_frame(self)`:读取摄像头视频流,将视频帧存入帧队列中。
3. `push_frame(self)`:从帧队列中取出视频帧,进行处理后写入到管道中,最终推送到 RTMP 服务器。
4. `run(self)`:开启两个线程,分别执行 `read_frame` 和 `push_frame` 方法。
为了正常运行该类,需要安装 `queue`、`threading`、`cv2` 和 `subprocess` 模块。同时需要在初始化方法中设置 `rtmpUrl` 和 `camera_path` 两个参数,分别为 RTMP 推流地址和摄像头路径。在 `push_frame` 方法中需要根据实际需求对视频帧进行处理。
阅读全文