class ShowapiRequest: def __init__(self, url, my_appId, my_appSecret): self.url = url self.my_appId = my_appId self.my_appSecret = my_appSecret body["1422355"] = my_appId body["24db7e66fd054eab98c921a5c807d97d"] = my_appSecret headers["User-Agent"] = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2427.7 Safari/537.36"
时间: 2024-02-29 22:55:35 浏览: 134
这段代码是 ShowapiRequest 类的初始化方法,用于初始化一些变量和设置默认的请求头。
在初始化方法中,会传入 Showapi 平台的 appid 和 secret,并在请求体中添加这些参数。此外,还会设置一个默认的 User-Agent 请求头。
`url` 是 API 接口的地址,`my_appId` 和 `my_appSecret` 是 Showapi 平台分配给应用的 appid 和 secret。`body` 和 `headers` 变量分别用于存储请求体和请求头,这些变量在类中可以被其他方法访问和修改。
因此,这段代码的作用是初始化 ShowapiRequest 类的实例,并设置默认的请求头和请求参数。
相关问题
import requests from urllib import parse #全局请求头 files = {} headers = {} body = {} timeouts = {} resHeader = {} class ShowapiRequest: def __init__(self, url, my_appId, my_appSecret): self.url = url self.my_appId = my_appId self.my_appSecret = my_appSecret body["1422355"] = my_appId body["24db7e66fd054eab98c921a5c807d97d"] = my_appSecret headers["User-Agent"] = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2427.7 Safari/537.36" def addFilePara(self, key, value_url): files[key] = open(r"%s" % (value_url), 'rb') return self def addHeadPara(self, key, value): headers[key] = value return self def addBodyPara(self, key, value): body[key] = value return self #设置连接时间和读取时间 def setTimeout(self, connecttimout, readtimeout): timeouts["connecttimout"] = connecttimout timeouts["readtimeout"] = readtimeout return self def get(self): get_url = self.url + "?" + parse.urlencode(body) if not timeouts: res = requests.get(get_url, headers=headers) else: timeout = (timeouts["connecttimout"], timeouts["readtimeout"]) res = requests.get(get_url, headers=headers, timeout=timeouts) return res def post(self): if not timeouts: res = requests.post(self.url, files=files, data=body, headers=headers) else: timeout = (timeouts["connecttimout"], timeouts["readtimeout"]) res = requests.post(self.url, files=files, data=body, headers=headers, timeout=timeout) return res
这段代码是一个 ShowapiRequest 类,用于调用 Showapi 平台的 API 接口。它通过构建请求参数、请求头和超时时间等信息来发送 HTTP GET 或 POST 请求,并返回响应结果。
在类的初始化方法中,会传入 Showapi 平台的 appid 和 secret,并在请求体中添加这些参数。此外,还会设置一个默认的 User-Agent 请求头。
类中的 `addFilePara`、`addHeadPara` 和 `addBodyPara` 方法用于添加文件参数、请求头参数和请求体参数,并返回实例自身。
`setTimeout` 方法用于设置连接和读取的超时时间。
`get` 和 `post` 方法分别用于发送 HTTP GET 和 POST 请求,并返回响应结果。其中涉及到请求参数、请求头、超时时间的使用。
现在我要更改url,使其连接到我自己的服务器上,并且不需要通过api这个操作了,我该怎么修改
要将代码中的 URL 更改为连接到您自己的服务器,并且不使用 API 鉴权,您可以按照以下步骤进行修改:
1. **修改 URL**:将 `__create_url` 方法中的默认 URL 修改为您自己的服务器地址。
2. **移除鉴权相关逻辑**:删除或注释掉与鉴权相关的部分。
具体修改如下:
### 修改 `__create_url` 方法
将 `__create_url` 方法中的 URL 修改为您自己的服务器地址,并删除鉴权相关逻辑。
```python
def __create_url(self):
# 替换为您的服务器地址
url = "ws://your_server_address"
return url
```
### 移除鉴权相关逻辑
在 `__create_url` 方法中删除或注释掉与鉴权相关的部分。
```python
def __create_url(self):
# 替换为您的服务器地址
url = "ws://your_server_address"
# 原始鉴权逻辑已移除
return url
```
### 修改 WebSocket 初始化
在 `__connect` 方法中,初始化 WebSocket 时不再需要传递 SSL 选项,因为您可能使用的是非安全的 WebSocket(ws)而不是安全的 WebSocket(wss)。
```python
def __connect(self):
self.finalResults = ""
self.done = False
self.__frames.clear()
self.__ws = websocket.WebSocketApp(self.__URL, on_message=self.on_message, on_error=self.on_error, on_open=self.on_open)
self.__ws.on_open = self.on_open
self.__ws.run_forever()
```
### 完整的修改后的代码示例
```python
import ssl
import time
import hmac
import json
import base64
import hashlib
import websocket
from time import mktime
import _thread as thread
from threading import Thread
from datetime import datetime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
STATUS_FIRST_FRAME = 0 # 第一帧的标识
STATUS_CONTINUE_FRAME = 1 # 中间帧标识
STATUS_LAST_FRAME = 2 # 最后一帧的标识
class XF_ASR(object):
def __init__(self, cfg):
self.APPID = cfg["APPID"]
self.APIKey = cfg["APIKey"]
self.APISecret = cfg["APISecret"]
# 公共参数(common)
self.CommonArgs = {"app_id": self.APPID}
# 业务参数(business),更多个性化参数可在官网查看
self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo": 1, "vad_eos": 10000}
self.__URL = self.__create_url()
self.__ws = None
self.__connected = False
self.__closing = False
self.__frames = []
self.done = False
self.status = STATUS_FIRST_FRAME
self.finalResults = ""
def __create_url(self):
# 替换为您的服务器地址
url = "ws://your_server_address"
return url
def __on_msg(self):
pass
def on_message(self, ws, message):
try:
code = json.loads(message)["code"]
sid = json.loads(message)["sid"]
if code != 0:
errMsg = json.loads(message)["message"]
print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
else:
self.done = True
data = json.loads(message)["data"]["result"]["ws"]
for i in data:
for w in i["cw"]:
self.finalResults += w["w"]
except Exception as e:
print("receive msg,but parse exception:", e)
if self.__closing:
try:
self.__ws.close()
except Exception as e:
print("1", e)
def on_close(self, ws, code, msg):
self.__connected = False
print("### closed:", msg)
def on_error(self, ws, error):
self.__URL = self.__create_url()
print("### error:", error)
def on_open(self, ws):
self.__connected = True
def run(self, *args):
while self.__connected:
try:
if len(self.__frames) > 1:
frame = self.__frames[0]
self.__frames.pop(0)
if self.status == STATUS_FIRST_FRAME:
d = {
"common": self.CommonArgs,
"business": self.BusinessArgs,
"data": {
"status": 0,
"format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(frame), 'utf-8'),
"encoding": "raw"
}
}
d = json.dumps(d)
self.__ws.send(d)
self.status = STATUS_CONTINUE_FRAME
elif self.status == STATUS_CONTINUE_FRAME:
d = {
"data": {
"status": 1,
"format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(frame), 'utf-8'),
"encoding": "raw"
}
}
self.__ws.send(json.dumps(d))
except Exception as e:
print("2", e)
time.sleep(0.04)
thread.start_new_thread(self.run, ())
def __connect(self):
self.finalResults = ""
self.done = False
self.__frames.clear()
self.__ws = websocket.WebSocketApp(self.__URL, on_message=self.on_message, on_error=self.on_error, on_open=self.on_open)
self.__ws.on_open = self.on_open
self.__ws.run_forever()
def send(self, buf):
self.__frames.append(buf)
def start(self):
Thread(target=self.__connect, args=[]).start()
self.status = STATUS_FIRST_FRAME
def end(self):
if self.__connected:
try:
for frame in self.__frames:
self.__frames.pop(0)
if not self.__frames:
self.status = STATUS_LAST_FRAME
if self.status == STATUS_CONTINUE_FRAME:
d = {
"data": {
"status": 1,
"format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(frame), 'utf-8'),
"encoding": "raw"
}
}
self.__ws.send(json.dumps(d))
elif self.status == STATUS_LAST_FRAME:
d = {
"data": {
"status": 2,
"format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(frame), 'utf-8'),
"encoding": "raw"
}
}
self.__ws.send(json.dumps(d))
time.sleep(0.4)
self.__frames.clear()
except Exception as e:
print("3", e)
self.__closing = True
self.__connected = False
```
这样修改后,您的代码将连接到您指定的服务器地址,并且不会进行任何 API 鉴权。请确保您的服务器能够正确处理这些 WebSocket 请求。
阅读全文
相关推荐












