:return:
"""
if self.__system is "Windows":
pid_command = self.shell("ps | %s %s$" % (self.__find, package_name)).read()
else:
pid_command = self.shell("ps | %s -w %s" % (self.__find, package_name)).read()
return pid_command
def process_exists(self, package_name):
"""
返回进程是否存在
:param package_name:
:return:
"""
process = self.__get_process(package_name)
return package_name in process
def get_pid(self, package_name):
"""
获取pid
:return:
"""
pid_command = self.__get_process(package_name)
if pid_command == '':
print("The process doesn't exist.")
return pid_command
req = re.compile(r"\d+")
result = str(pid_command).split()
result.remove(result[0])
return req.findall(" ".join(result))[0]
def get_uid(self, pid):
"""
获取uid
:param pid:
:return:
"""
result = self.shell("cat /proc/%s/status" % pid).readlines()
for i in result:
if 'uid' in i.lower():
return i.split()[1]
def get_flow_data_tcp(self, uid):
"""
获取应用tcp流量
:return:(接收, 发送)
"""
tcp_rcv = self.shell("cat proc/uid_stat/%s/tcp_rcv" % uid).read().split()[0] tcp_snd = self.shell("cat proc/uid_stat/%s/tcp_snd"
% uid).read().split()[0] return tcp_rcv, tcp_snd
def get_flow_data_all(self, uid):
"""
获取应用流量全部数据
包含该应用多个进程的所有数据 tcp udp等
(rx_bytes, tx_bytes) >> (接收, 发送)
:param uid:
:return:list(dict)
"""
all_data = [] d = {}
data = self.shell("cat /proc/net/xt_qtaguid/stats | %s %s" % (self.__find, uid)).readlines()
for i in data:
if not i.startswith('\n'):
item = i.strip().split()
d['idx'] = item[0] d['iface'] = item[1] d['acct_tag_hex'] = item[2] d['uid_tag_int'] = item[3] d['cnt_set'] = item[4] d['rx_bytes'] =
item[5] d['rx_packets'] = item[6] d['tx_bytes'] = item[7] d['tx_packets'] = item[8] d['rx_tcp_bytes'] = item[9] d['rx_tcp_packets'] =
item[10] d['rx_udp_bytes'] = item[11] d['rx_udp_packets'] = item[12] d['rx_other_bytes'] = item[13] d['rx_other_packets'] =
item[14] d['tx_tcp_bytes'] = item[15] d['tx_tcp_packets'] = item[16] d['tx_udp_bytes'] = item[17] d['tx_udp_packets'] = item[18]
d['tx_other_bytes'] = item[19] d['tx_other_packets'] = item[20]