dolphinscheduler 使用python组件写一段循环调用SUB_PROCESS组件的代码
时间: 2023-04-11 19:02:03 浏览: 209
可以使用以下代码实现:
```python
from dolphin.scheduler.api import process_definition
from dolphin.scheduler.api import process_instance
# 获取 SUB_PROCESS 组件的定义
sub_process_def = process_definition.get_process_definition_by_name("SUB_PROCESS")
# 循环调用 SUB_PROCESS 组件
for i in range(10):
sub_process_instance = process_instance.create_process_instance(sub_process_def["id"])
process_instance.start_process_instance(sub_process_instance["id"])
```
注意:以上代码仅供参考,具体实现可能需要根据实际情况进行调整。
相关问题
if name == "main": parser = argparse.ArgumentParser(description="Intensity Normalizer") parser.add_argument("-s", "--src", type=str, help="source directory.") parser.add_argument("-d", "--dst", type=str, help="destination directory.") parser.add_argument( "--mm_resolution", type=float, default=0.0, help="spatial resolution [mm].", ) parser.add_argument( "--depth", type=int, default=-1, help="depth of the maximum level to be explored. Defaults to unlimited.", ) args = parser.parse_args() if args.src is None: parser.print_help() exit(0) root_src_dir: Path = Path(args.src).resolve() if not root_src_dir.is_dir(): logger.error("Indicate valid virectory path.") exit() root_dst_dir = Path( args.dst or str(root_src_dir) + "_intensity_normalized" ) mm_resolution = float(args.mm_resolution) depth = int(args.depth) volume_loading_queue = Queue() volume_loading_process = Process( target=volume_loading_func, args=(root_src_dir, root_dst_dir, depth, volume_loading_queue, logger), ) volume_loading_process.start() volume_saving_queue = Queue() volume_saving_process = Process( target=volume_saving_func, args=(volume_saving_queue, logger), ) volume_saving_process.start() while True: ( volume_path, np_volume, volume_info, ) = volume_loading_queue.get() if volume_path is None: break relative_path = volume_path.relative_to(root_src_dir) np_volume = normalize_intensity(np_volume, relative_path, logger) if mm_resolution != 0: volume_info.update({"mm_resolution": mm_resolution}) while volume_saving_queue.qsize() == 1: pass dst_path = Path( root_dst_dir, re.sub(r"cb\d{3}$", "", str(relative_path)) ) volume_saving_queue.put( (dst_path, root_dst_dir, np_volume, volume_info) ) volume_saving_queue.put((None, None, None, None))请完整详细的解释每一行的代码意思
这段代码是一个 Python 程序,主要用于进行图像的强度归一化操作。下面是每一行代码的详细解释。
```python
if name == "main":
```
这是 Python 程序的入口,判断是否为主程序。
```python
parser = argparse.ArgumentParser(description="Intensity Normalizer")
parser.add_argument("-s", "--src", type=str, help="source directory.")
parser.add_argument("-d", "--dst", type=str, help="destination directory.")
parser.add_argument("--mm_resolution", type=float, default=0.0, help="spatial resolution [mm].")
parser.add_argument("--depth", type=int, default=-1, help="depth of the maximum level to be explored. Defaults to unlimited.")
args = parser.parse_args()
```
这段代码定义了命令行选项,用于接收程序运行时传入的参数。其中 `-s` 选项指定输入文件夹路径,`-d` 选项指定输出文件夹路径,`--mm_resolution` 选项指定空间分辨率,`--depth` 选项指定文件夹遍历的深度。然后使用 `parse_args()` 方法解析参数并将结果存放在 `args` 对象中。
```python
if args.src is None:
parser.print_help()
exit(0)
```
判断是否输入了输入文件夹路径,如果没有则打印帮助信息并退出程序。
```python
root_src_dir: Path = Path(args.src).resolve()
if not root_src_dir.is_dir():
logger.error("Indicate valid virectory path.")
exit()
```
将输入文件夹路径转换成 `Path` 对象,并使用 `resolve()` 方法去除路径中的符号链接,然后判断路径是否存在且是一个文件夹,如果不是则输出错误信息并退出程序。
```python
root_dst_dir = Path(args.dst or str(root_src_dir) + "_intensity_normalized")
mm_resolution = float(args.mm_resolution)
depth = int(args.depth)
```
如果指定了输出文件夹路径,则将其转换成 `Path` 对象,否则根据输入文件夹路径生成一个默认输出文件夹路径。然后将空间分辨率和文件夹遍历深度转换成对应的数据类型。
```python
volume_loading_queue = Queue()
volume_loading_process = Process(target=volume_loading_func, args=(root_src_dir, root_dst_dir, depth, volume_loading_queue, logger))
volume_loading_process.start()
```
使用 `multiprocessing` 模块创建一个进程,其中 `volume_loading_func` 是一个函数,主要用于加载图像数据并放入消息队列中。这里将输入文件夹路径、输出文件夹路径、文件夹遍历深度、消息队列和日志记录器作为参数传入进程。
```python
volume_saving_queue = Queue()
volume_saving_process = Process(target=volume_saving_func, args=(volume_saving_queue, logger))
volume_saving_process.start()
```
同样使用 `multiprocessing` 模块创建另一个进程,其中 `volume_saving_func` 是一个函数,主要用于将归一化后的图像数据保存到输出文件夹中。这里只需将消息队列和日志记录器作为参数传入进程即可。
```python
while True:
(volume_path, np_volume, volume_info,) = volume_loading_queue.get()
if volume_path is None:
break
relative_path = volume_path.relative_to(root_src_dir)
np_volume = normalize_intensity(np_volume, relative_path, logger)
if mm_resolution != 0:
volume_info.update({"mm_resolution": mm_resolution})
while volume_saving_queue.qsize() == 1:
pass
dst_path = Path(root_dst_dir, re.sub(r"cb\d{3}$", "", str(relative_path)))
volume_saving_queue.put((dst_path, root_dst_dir, np_volume, volume_info))
volume_saving_queue.put((None, None, None, None))
```
进入无限循环,从消息队列中获取从 `volume_loading_func` 函数中发来的消息。如果消息内容中的 `volume_path` 为 `None`,则表示所有图像数据已经处理完毕,退出循环。否则,从 `volume_path` 中提取相对输入文件夹路径并调用 `normalize_intensity` 函数对图像数据进行归一化处理,并更新 `volume_info` 中的空间分辨率。然后等待消息队列中只有一个等待处理的消息,将处理后的图像数据和相应的数据信息发送到 `volume_saving_func` 函数中进行保存。最后向消息队列中发送一个空消息以表示图像数据处理完毕。
整个程序实现了对文件夹中的图像进行强度归一化操作,并支持指定空间分辨率和文件夹遍历深度等功能。
import argparse import logging import re from multiprocessing import Process, Queue from pathlib import Path import numpy as np from skimage import exposure, filters from modules.config import logger from modules.volume import volume_loading_func, volume_saving_func def normalize_intensity( np_volume: np.ndarray, relative_path: Path, logger: logging.Logger ): logger.info(f"[processing start] {relative_path}") nstack = len(np_volume) stack: np.ndarray = np_volume[nstack // 2 - 16 : nstack // 2 + 16] hist_y, hist_x = exposure.histogram(stack[stack > 0]) thr = filters.threshold_otsu(stack[stack > 0]) peak_air = np.argmax(hist_y[hist_x < thr]) + hist_x[0] peak_soil = np.argmax(hist_y[hist_x > thr]) + (thr - hist_x[0]) + hist_x[0] np_volume = np_volume.astype(np.int64) for i in range(len(np_volume)): np_volume[i] = ( (np_volume[i] - peak_air).clip(0) / (peak_soil - peak_air) * 256 / 2 ) logger.info(f"[processing end] {relative_path}") return exposure.rescale_intensity( np_volume, in_range=(0, 255), out_range=(0, 255) ).astype(np.uint8) if name == "main": parser = argparse.ArgumentParser(description="Intensity Normalizer") parser.add_argument("-s", "--src", type=str, help="source directory.") parser.add_argument("-d", "--dst", type=str, help="destination directory.") parser.add_argument( "--mm_resolution", type=float, default=0.0, help="spatial resolution [mm].", ) parser.add_argument( "--depth", type=int, default=-1, help="depth of the maximum level to be explored. Defaults to unlimited.", ) args = parser.parse_args() if args.src is None: parser.print_help() exit(0) root_src_dir: Path = Path(args.src).resolve() if not root_src_dir.is_dir(): logger.error("Indicate valid virectory path.") exit() root_dst_dir = Path( args.dst or str(root_src_dir) + "_intensity_normalized" ) mm_resolution = float(args.mm_resolution) depth = int(args.depth) volume_loading_queue = Queue() volume_loading_process = Process( target=volume_loading_func, args=(root_src_dir, root_dst_dir, depth, volume_loading_queue, logger), ) volume_loading_process.start() volume_saving_queue = Queue() volume_saving_process = Process( target=volume_saving_func, args=(volume_saving_queue, logger), ) volume_saving_process.start() while True: ( volume_path, np_volume, volume_info, ) = volume_loading_queue.get() if volume_path is None: break relative_path = volume_path.relative_to(root_src_dir) np_volume = normalize_intensity(np_volume, relative_path, logger) if mm_resolution != 0: volume_info.update({"mm_resolution": mm_resolution}) while volume_saving_queue.qsize() == 1: pass dst_path = Path( root_dst_dir, re.sub(r"cb\d{3}$", "", str(relative_path)) ) volume_saving_queue.put( (dst_path, root_dst_dir, np_volume, volume_info) ) volume_saving_queue.put((None, None, None, None))完整详细的解释每一行的代码意思和作用
这段代码是一个 Python 脚本,用于对医学图像进行强度归一化处理。以下是每行代码的详细解释:
第 1 行:导入必要的模块。
第 3 行:定义一个函数 normalize_intensity,用于对给定的医学图像进行强度归一化处理。该函数接受三个参数:np_volume(一个 NumPy 数组,表示医学图像数据)、relative_path(一个 Path 对象,表示该数据在文件系统中的路径)、logger(用于记录日志的 Logger 对象)。
第 4 行:使用日志记录器输出一条日志,表示开始处理医学图像。
第 5 行:获取图像堆栈的数量 nstack。
第 6 行:将图像堆栈中居中的 16 张图像提取出来,作为参考图像,用于计算阈值。
第 7 行:使用 skimage 库中的 exposure.histogram 函数计算图像堆栈中所有非空像素的直方图 hist_y 和 hist_x。
第 8 行:使用 skimage 库中的 filters.threshold_otsu 函数计算一个全局阈值 thr。
第 9 行:计算阈值下面的像素的直方图峰值 peak_air。
第 10 行:计算阈值上面的像素的直方图峰值 peak_soil。
第 11 行:计算从阈值到图像范围最小值的距离 hist_x[0]。
第 12 行:将医学图像数据转换为 int64 类型。
第 13~17 行:循环遍历所有图像,对每个像素进行强度归一化处理,得到新的像素值。
第 18 行:使用 skimage 库中的 exposure.rescale_intensity 函数对强度归一化后的图像进行重新缩放,得到范围在 [0, 255] 内的 uint8 类型像素值。
第 19 行:使用日志记录器输出一条日志,表示完成医学图像处理。
第 20 行:返回经过强度归一化和重新缩放后的医学图像数据。
第 22~36 行:定义脚本的主函数。使用 argparse 模块解析命令行参数,获取源目录、目标目录、空间分辨率和深度等参数值。
第 37 行:如果源目录为空,则打印帮助信息并退出程序。
第 38 行:使用 pathlib 库中的 Path 类,获取源目录的绝对路径。
第 39 行:如果源目录不存在,则使用日志记录器输出一条错误信息,并退出程序。
第 40 行:使用 pathlib 库中的 Path 类,构造目标目录的路径。若目标目录为空,则默认使用源目录路径加上后缀 "_intensity_normalized"。
第 41 行:获取空间分辨率和深度参数值。
第 42 行:创建两个进程,用于对医学图像进行加载和保存。
第 44~48 行:循环遍历医学图像,调用 normalize_intensity 函数对图像进行强度归一化处理。当加载进程队列为空时,跳出循环。
第 49 行:获取该图像在源目录中的相对路径。
第 50 行:如果需要修改空间分辨率,则将该信息添加到图像的元数据中。
第 51~53 行:每当保存进程队列的大小为 1 时,等待一段时间,以防止队列溢出。
第 54~56 行:构造目标目录中的路径,删除文件名中的 cb 前缀,并将归一化后的医学图像保存到该路径中。
第 57 行:使用保存进程队列结束保存进程。
阅读全文