现在我要更改url,使其连接到我自己的服务器上,并且不需要通过api这个操作了,我该怎么修改
时间: 2024-11-10 15:25:31 浏览: 28
要将代码中的 URL 更改为连接到您自己的服务器,并且不使用 API 鉴权,您可以按照以下步骤进行修改:
1. **修改 URL**:将 `__create_url` 方法中的默认 URL 修改为您自己的服务器地址。
2. **移除鉴权相关逻辑**:删除或注释掉与鉴权相关的部分。
具体修改如下:
### 修改 `__create_url` 方法
将 `__create_url` 方法中的 URL 修改为您自己的服务器地址,并删除鉴权相关逻辑。
```python
def __create_url(self):
# 替换为您的服务器地址
url = "ws://your_server_address"
return url
```
### 移除鉴权相关逻辑
在 `__create_url` 方法中删除或注释掉与鉴权相关的部分。
```python
def __create_url(self):
# 替换为您的服务器地址
url = "ws://your_server_address"
# 原始鉴权逻辑已移除
return url
```
### 修改 WebSocket 初始化
在 `__connect` 方法中,初始化 WebSocket 时不再需要传递 SSL 选项,因为您可能使用的是非安全的 WebSocket(ws)而不是安全的 WebSocket(wss)。
```python
def __connect(self):
self.finalResults = ""
self.done = False
self.__frames.clear()
self.__ws = websocket.WebSocketApp(self.__URL, on_message=self.on_message, on_error=self.on_error, on_open=self.on_open)
self.__ws.on_open = self.on_open
self.__ws.run_forever()
```
### 完整的修改后的代码示例
```python
import ssl
import time
import hmac
import json
import base64
import hashlib
import websocket
from time import mktime
import _thread as thread
from threading import Thread
from datetime import datetime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
STATUS_FIRST_FRAME = 0 # 第一帧的标识
STATUS_CONTINUE_FRAME = 1 # 中间帧标识
STATUS_LAST_FRAME = 2 # 最后一帧的标识
class XF_ASR(object):
def __init__(self, cfg):
self.APPID = cfg["APPID"]
self.APIKey = cfg["APIKey"]
self.APISecret = cfg["APISecret"]
# 公共参数(common)
self.CommonArgs = {"app_id": self.APPID}
# 业务参数(business),更多个性化参数可在官网查看
self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo": 1, "vad_eos": 10000}
self.__URL = self.__create_url()
self.__ws = None
self.__connected = False
self.__closing = False
self.__frames = []
self.done = False
self.status = STATUS_FIRST_FRAME
self.finalResults = ""
def __create_url(self):
# 替换为您的服务器地址
url = "ws://your_server_address"
return url
def __on_msg(self):
pass
def on_message(self, ws, message):
try:
code = json.loads(message)["code"]
sid = json.loads(message)["sid"]
if code != 0:
errMsg = json.loads(message)["message"]
print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
else:
self.done = True
data = json.loads(message)["data"]["result"]["ws"]
for i in data:
for w in i["cw"]:
self.finalResults += w["w"]
except Exception as e:
print("receive msg,but parse exception:", e)
if self.__closing:
try:
self.__ws.close()
except Exception as e:
print("1", e)
def on_close(self, ws, code, msg):
self.__connected = False
print("### closed:", msg)
def on_error(self, ws, error):
self.__URL = self.__create_url()
print("### error:", error)
def on_open(self, ws):
self.__connected = True
def run(self, *args):
while self.__connected:
try:
if len(self.__frames) > 1:
frame = self.__frames[0]
self.__frames.pop(0)
if self.status == STATUS_FIRST_FRAME:
d = {
"common": self.CommonArgs,
"business": self.BusinessArgs,
"data": {
"status": 0,
"format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(frame), 'utf-8'),
"encoding": "raw"
}
}
d = json.dumps(d)
self.__ws.send(d)
self.status = STATUS_CONTINUE_FRAME
elif self.status == STATUS_CONTINUE_FRAME:
d = {
"data": {
"status": 1,
"format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(frame), 'utf-8'),
"encoding": "raw"
}
}
self.__ws.send(json.dumps(d))
except Exception as e:
print("2", e)
time.sleep(0.04)
thread.start_new_thread(self.run, ())
def __connect(self):
self.finalResults = ""
self.done = False
self.__frames.clear()
self.__ws = websocket.WebSocketApp(self.__URL, on_message=self.on_message, on_error=self.on_error, on_open=self.on_open)
self.__ws.on_open = self.on_open
self.__ws.run_forever()
def send(self, buf):
self.__frames.append(buf)
def start(self):
Thread(target=self.__connect, args=[]).start()
self.status = STATUS_FIRST_FRAME
def end(self):
if self.__connected:
try:
for frame in self.__frames:
self.__frames.pop(0)
if not self.__frames:
self.status = STATUS_LAST_FRAME
if self.status == STATUS_CONTINUE_FRAME:
d = {
"data": {
"status": 1,
"format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(frame), 'utf-8'),
"encoding": "raw"
}
}
self.__ws.send(json.dumps(d))
elif self.status == STATUS_LAST_FRAME:
d = {
"data": {
"status": 2,
"format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(frame), 'utf-8'),
"encoding": "raw"
}
}
self.__ws.send(json.dumps(d))
time.sleep(0.4)
self.__frames.clear()
except Exception as e:
print("3", e)
self.__closing = True
self.__connected = False
```
这样修改后,您的代码将连接到您指定的服务器地址,并且不会进行任何 API 鉴权。请确保您的服务器能够正确处理这些 WebSocket 请求。
阅读全文