def logging_config(folder=None, name=None, level=logging.DEBUG, console_level=logging.DEBUG, no_console=True): if not os.path.exists(folder): os.makedirs(folder) for handler in logging.root.handlers: logging.root.removeHandler(handler) logging.root.handlers = [] logpath = os.path.join(folder, name + ".log") print("All logs will be saved to %s" %logpath) logging.root.setLevel(level) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') logfile = logging.FileHandler(logpath) logfile.setLevel(level) logfile.setFormatter(formatter) logging.root.addHandler(logfile) if not no_console: logconsole = logging.StreamHandler() logconsole.setLevel(console_level) logconsole.setFormatter(formatter) logging.root.addHandler(logconsole) return folder
时间: 2024-03-29 17:36:13 浏览: 19
这段代码是用来配置日志记录器的,它可以将日志记录到文件中并且可以在控制台中输出。其中,参数 folder 表示日志文件夹,name 表示日志文件名,level 表示日志级别,console_level 表示控制台日志级别,no_console 表示是否在控制台中输出日志。函数首先检查文件夹是否存在,如果不存在则创建文件夹。然后移除所有的处理器并重新设置处理器,设置日志记录的级别和格式,创建一个文件处理器并添加到根处理器中,如果 no_console 的值为 False,则创建一个控制台处理器并添加到根处理器中。最后返回日志文件夹的路径。
相关问题
cls.params["folderId"] = folder_id res = requests.get(url=cls.url, params=cls.params) js_data = json.loads(res.text)["data"] js_data_folders: list[dict] = js_data["FoldersInfo"] js_data_files: list[dict] = js_data["FilesInfo"] df = pd.concat([df, pd.DataFrame(js_data_files)], axis=0) ft = pd.concat([ft, pd.DataFrame(js_data_folders)], axis=0) folders_to_process = js_data_folders.copy() while folders_to_process: folder_id = folders_to_process.pop(0)["FolderId"] cls.params["folderId"] = folder_id res = requests.get(url=cls.url, params=cls.params) js_data = json.loads(res.text)["data"] js_data_folders: list[dict] = js_data["FoldersInfo"] js_data_files: list[dict] = js_data["FilesInfo"] df = pd.concat([df, pd.DataFrame(js_data_files)], axis=0) ft = pd.concat([ft, pd.DataFrame(js_data_folders)], axis=0) folders_to_process.extend(js_data_folders) return df, ft 优化这段代码
这段代码已经进行了一些优化,但是还有一些可以改进的地方:
1. 需要确保请求返回的数据格式和结构是正确的,否则可能会导致代码出错。可以添加一些异常处理机制,例如使用 try-except 捕获异常并记录错误日志。
2. 对于返回的数据,可以使用 data.get("FoldersInfo", []) 和 data.get("FilesInfo", []) 来获取需要的信息,这样可以避免出现 Key Error 的问题。
3. 可以使用列表推导式来简化代码,例如使用 [x["FolderId"] for x in js_data_folders] 来获取所有文件夹的 ID。
下面是一个可能的优化方案:
```python
def get_all_file_info(cls, folder_id, df, ft):
"""
获取指定文件夹及其子文件夹中的所有文件和文件夹信息
:param cls: 类实例
:param folder_id: 文件夹 ID
:param df: 文件信息 DataFrame
:param ft: 文件夹信息 DataFrame
:return: 文件信息 DataFrame 和文件夹信息 DataFrame
"""
try:
cls.params["folderId"] = folder_id
res = requests.get(url=cls.url, params=cls.params)
res.raise_for_status()
data = json.loads(res.text)["data"]
except (requests.HTTPError, ValueError) as e:
logging.error(f"Failed to get file info for folder {folder_id}: {e}")
return df, ft
js_data_folders = data.get("FoldersInfo", [])
js_data_files = data.get("FilesInfo", [])
df = pd.concat([df, pd.DataFrame(js_data_files)], axis=0)
ft = pd.concat([ft, pd.DataFrame(js_data_folders)], axis=0)
folders_to_process = [x["FolderId"] for x in js_data_folders]
while folders_to_process:
folder_id = folders_to_process.pop(0)
try:
cls.params["folderId"] = folder_id
res = requests.get(url=cls.url, params=cls.params)
res.raise_for_status()
data = json.loads(res.text)["data"]
except (requests.HTTPError, ValueError) as e:
logging.error(f"Failed to get file info for folder {folder_id}: {e}")
continue
js_data_folders = data.get("FoldersInfo", [])
js_data_files = data.get("FilesInfo", [])
df = pd.concat([df, pd.DataFrame(js_data_files)], axis=0)
ft = pd.concat([ft, pd.DataFrame(js_data_folders)], axis=0)
folders_to_process.extend([x["FolderId"] for x in js_data_folders])
return df, ft
```
这个优化方案添加了异常处理机制,使用了 data.get() 方法来获取需要的信息,并使用了列表推导式来获取所有文件夹的 ID。如果请求出错,会记录错误日志并跳过当前文件夹的处理。
downloadFileStream = requests.get(url=userDataUrl) filename = "{bucket}/{folder}/{file_name}.csv".format( bucket=config.Configuration["BUCKET"], folder="pinterestUserFile", file_name=today, ) logging.info("filename:") logging.info(filename) logging.info("downloadFileStream:") logging.info(downloadFileStream.content) logging.info("downloadFileStream.status_code:") logging.info(downloadFileStream.status_code) # getCsvUrl生成csv fwrite = blobstore.open(filename, 'w', content_type='text/csv') fwrite.write(downloadFileStream.content.decode("utf-8")) fwrite.close()
这段代码的作用是从指定的 URL 中下载文件,并将其存储到 Google Cloud Storage 中。具体来说,代码使用 requests 库向指定的 URL 发送 GET 请求,获取文件的字节流(content)。然后,代码将文件内容写入到 Google Cloud Storage 中的指定文件中,文件路径和文件名由代码中的变量 filename 指定。最后,代码关闭文件流。
需要注意的是,代码中使用了 Python 的 logging 模块进行日志记录,可以方便地跟踪代码的执行情况。另外,代码中使用了 Google Cloud Storage 的 blobstore 模块进行文件的写入操作,需要先导入该模块。