def get(request): # 实例化SSHClient连接工具 ssh_client = paramiko.SSHClient() # 保存服务器密钥 ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接linux服务器 try: ssh_client.connect(hostname='192.168.233.128', port=22, username='root', password='12356789', timeout=2) except (EOFError, Exception) as e: request_data = {"code": 500, "message": "服务器连接失败", "data": []} return JsonResponse(request_data) # 返回了三个数据,第一个是输入命令,第2个是命今返回的结果,第3个是命令错误时返回的结果 # ps aux是linux服务器查询正在运行的所有应用程序的命令 stdin, stdout, stderr = ssh_client.exec_command('ps aux') # 读取返回结果,将编码转为utf8,避免乱码 df = stdout.read().decode('utf-8') # 创建一个临时文件存放命令返回的结果 # 因为linux服务器命令返回的数据是一个字符串,不能直接按空格截取,最好的方式就是写入文件读取一行再截取、拼接命令 # IO流, 数据存储到本地磁盘 filename = "tempdata.txt" # 打开文件,会在项目根目录打开,如果没有就会创建 w+为写入读取权限 fileObj = open(filename, mode='w+') # 将命令返回的结果写入文件 fileObj.write(df) # flush:保证数据成功写入文件。 fileObj.flush() fileObj.close()
时间: 2023-06-24 10:07:41 浏览: 130
这段代码是一个Django视图函数,实现了连接到一个Linux服务器并执行命令的功能。具体实现步骤如下:
1. 首先导入paramiko模块,该模块提供了SSH客户端的实现,可以用来连接远程Linux服务器。
2. 创建一个SSHClient对象,用来连接到Linux服务器,并设置自动添加新主机密钥的策略。
3. 使用connect()方法连接到指定的Linux服务器,需要传入服务器的IP地址、端口、用户名和密码等信息。
4. 如果连接失败,返回一个Json格式的错误信息。
5. 使用exec_command()方法执行命令,该方法返回三个对象:标准输入、标准输出和标准错误。在这里只需要标准输出对象。
6. 使用read()方法读取标准输出对象的内容,并将编码转换为UTF-8格式。
7. 创建一个临时文件,将命令返回的结果写入该文件。
8. 关闭文件并返回HttpResponse对象,该对象包含了命令返回的结果。
相关问题
检查代码中的错误 class ADB_SHELL: def get_ip(self): # 执行adb shell命令并输出结果 subprocess.check_output( ['adb', 'shell', 'udhcpc'] ) subprocess.check_output( ['adb', 'shell', 'udhcpc -i eth1'] ) self.conf = subprocess.check_output( ['adb', 'shell', 'ifconfig'] ).decode() # conf = str(ip).split(r'\r\r\n') # tmp = conf.replace( "\r\r\n", "\n" ) # print( tmp) self.ip = re.findall( r'addr:(.*?) Bcast', str( self.conf ) ) print(self.ip) for self.i in selfip : speed = subprocess.check_output((['adb', 'shell', f'iperf3 -B {self.i} -c 192.168.102.105'])).decode() print(speed.replace("\r\r\n", "\n")) for i in range(5): write_data = subprocess.check_output(['adb', 'shell', 'time dd if=/dev/zero of=/data/test.data bs=128k count=1024']).decode() print(write_data.replace("\r\r\n", "\n")) read_data = subprocess.check_output(['adb', 'shell', 'time dd if=/data/test.data of=/dev/null bs=128k count=1024']).decode() print(read_data.replace("\r\r\n", "\n")) ls = subprocess.check_output( ['adb', 'shell', 'ls /data'] ).decode() print( ls.replace( '\r', ' ' ) ) dl = subprocess.check_output( ['adb', 'shell', 'rm /data/test.data'] ).decode() l = subprocess.check_output(['adb', 'shell', 'ls /data']).decode() print( l.replace( '\r', ' ' ) ) subprocess.check_output( (['adb', 'shell', f'iperf3 -s']) ) def get_ssh(self): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) for i in self.ip: ssh.connect(hostname='192.168.102.105',port=22,username='root',password='xiayi123456',timeout=30) stdin,stdout,stderr = ssh.exec_command(f'iperf3 -c {i}') print(stdout.read()) # ssh.close() if __name__ == '__main__': # get_ip() # get_ssh() A = ADB_SHELL t1 = threading.Thread(target=A.get_ssh) t2 = threading.Thread(target=A.get_ip) t1.start() t2.start() t1.join() t2.join()
在第12行代码中,将 `selfip` 改为 `self.ip`。此外,第28行代码中的 `ssh.set_missing_host_key_policy` 应该是 `ssh.set_missing_host_key_policy()`。最后,第39行代码中的 `A = ADB_SHELL` 应该是 `A = ADB_SHELL()`,因为 ADB_SHELL 是一个类,需要实例化才能调用其方法。
实例化SSHClient连接工具
要实例化SSHClient连接工具,需要使用Paramiko库。下面是一个简单的示例代码:
```python
import paramiko
ssh = paramiko.SSHClient() # 创建SSHClient实例
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 设置自动添加主机名和主机密钥
ssh.connect(hostname='hostname', port=22, username='username', password='password') # 连接SSH服务器
stdin, stdout, stderr = ssh.exec_command('ls') # 执行命令
print(stdout.read().decode()) # 输出命令执行结果
ssh.close() # 关闭SSH连接
```
在这个示例中,我们首先导入了Paramiko库,然后创建了一个SSHClient实例。接着,我们设置了自动添加主机名和主机密钥的策略,并使用`connect()`方法连接SSH服务器。然后,我们使用`exec_command()`方法执行了一个命令,并使用`stdout.read().decode()`读取了命令执行结果。最后,我们关闭了SSH连接。
阅读全文