dolphinscheduler 使用python组件写一段循环调用SUB_PROCESS组件的代码

时间: 2023-04-11 19:02:03 浏览: 209
可以使用以下代码实现: ```python from dolphin.scheduler.api import process_definition from dolphin.scheduler.api import process_instance # 获取 SUB_PROCESS 组件的定义 sub_process_def = process_definition.get_process_definition_by_name("SUB_PROCESS") # 循环调用 SUB_PROCESS 组件 for i in range(10): sub_process_instance = process_instance.create_process_instance(sub_process_def["id"]) process_instance.start_process_instance(sub_process_instance["id"]) ``` 注意:以上代码仅供参考,具体实现可能需要根据实际情况进行调整。
相关问题

if name == "main": parser = argparse.ArgumentParser(description="Intensity Normalizer") parser.add_argument("-s", "--src", type=str, help="source directory.") parser.add_argument("-d", "--dst", type=str, help="destination directory.") parser.add_argument( "--mm_resolution", type=float, default=0.0, help="spatial resolution [mm].", ) parser.add_argument( "--depth", type=int, default=-1, help="depth of the maximum level to be explored. Defaults to unlimited.", ) args = parser.parse_args() if args.src is None: parser.print_help() exit(0) root_src_dir: Path = Path(args.src).resolve() if not root_src_dir.is_dir(): logger.error("Indicate valid virectory path.") exit() root_dst_dir = Path( args.dst or str(root_src_dir) + "_intensity_normalized" ) mm_resolution = float(args.mm_resolution) depth = int(args.depth) volume_loading_queue = Queue() volume_loading_process = Process( target=volume_loading_func, args=(root_src_dir, root_dst_dir, depth, volume_loading_queue, logger), ) volume_loading_process.start() volume_saving_queue = Queue() volume_saving_process = Process( target=volume_saving_func, args=(volume_saving_queue, logger), ) volume_saving_process.start() while True: ( volume_path, np_volume, volume_info, ) = volume_loading_queue.get() if volume_path is None: break relative_path = volume_path.relative_to(root_src_dir) np_volume = normalize_intensity(np_volume, relative_path, logger) if mm_resolution != 0: volume_info.update({"mm_resolution": mm_resolution}) while volume_saving_queue.qsize() == 1: pass dst_path = Path( root_dst_dir, re.sub(r"cb\d{3}$", "", str(relative_path)) ) volume_saving_queue.put( (dst_path, root_dst_dir, np_volume, volume_info) ) volume_saving_queue.put((None, None, None, None))请完整详细的解释每一行的代码意思

这段代码是一个 Python 程序,主要用于进行图像的强度归一化操作。下面是每一行代码的详细解释。 ```python if name == "main": ``` 这是 Python 程序的入口,判断是否为主程序。 ```python parser = argparse.ArgumentParser(description="Intensity Normalizer") parser.add_argument("-s", "--src", type=str, help="source directory.") parser.add_argument("-d", "--dst", type=str, help="destination directory.") parser.add_argument("--mm_resolution", type=float, default=0.0, help="spatial resolution [mm].") parser.add_argument("--depth", type=int, default=-1, help="depth of the maximum level to be explored. Defaults to unlimited.") args = parser.parse_args() ``` 这段代码定义了命令行选项,用于接收程序运行时传入的参数。其中 `-s` 选项指定输入文件夹路径,`-d` 选项指定输出文件夹路径,`--mm_resolution` 选项指定空间分辨率,`--depth` 选项指定文件夹遍历的深度。然后使用 `parse_args()` 方法解析参数并将结果存放在 `args` 对象中。 ```python if args.src is None: parser.print_help() exit(0) ``` 判断是否输入了输入文件夹路径,如果没有则打印帮助信息并退出程序。 ```python root_src_dir: Path = Path(args.src).resolve() if not root_src_dir.is_dir(): logger.error("Indicate valid virectory path.") exit() ``` 将输入文件夹路径转换成 `Path` 对象,并使用 `resolve()` 方法去除路径中的符号链接,然后判断路径是否存在且是一个文件夹,如果不是则输出错误信息并退出程序。 ```python root_dst_dir = Path(args.dst or str(root_src_dir) + "_intensity_normalized") mm_resolution = float(args.mm_resolution) depth = int(args.depth) ``` 如果指定了输出文件夹路径,则将其转换成 `Path` 对象,否则根据输入文件夹路径生成一个默认输出文件夹路径。然后将空间分辨率和文件夹遍历深度转换成对应的数据类型。 ```python volume_loading_queue = Queue() volume_loading_process = Process(target=volume_loading_func, args=(root_src_dir, root_dst_dir, depth, volume_loading_queue, logger)) volume_loading_process.start() ``` 使用 `multiprocessing` 模块创建一个进程,其中 `volume_loading_func` 是一个函数,主要用于加载图像数据并放入消息队列中。这里将输入文件夹路径、输出文件夹路径、文件夹遍历深度、消息队列和日志记录器作为参数传入进程。 ```python volume_saving_queue = Queue() volume_saving_process = Process(target=volume_saving_func, args=(volume_saving_queue, logger)) volume_saving_process.start() ``` 同样使用 `multiprocessing` 模块创建另一个进程,其中 `volume_saving_func` 是一个函数,主要用于将归一化后的图像数据保存到输出文件夹中。这里只需将消息队列和日志记录器作为参数传入进程即可。 ```python while True: (volume_path, np_volume, volume_info,) = volume_loading_queue.get() if volume_path is None: break relative_path = volume_path.relative_to(root_src_dir) np_volume = normalize_intensity(np_volume, relative_path, logger) if mm_resolution != 0: volume_info.update({"mm_resolution": mm_resolution}) while volume_saving_queue.qsize() == 1: pass dst_path = Path(root_dst_dir, re.sub(r"cb\d{3}$", "", str(relative_path))) volume_saving_queue.put((dst_path, root_dst_dir, np_volume, volume_info)) volume_saving_queue.put((None, None, None, None)) ``` 进入无限循环,从消息队列中获取从 `volume_loading_func` 函数中发来的消息。如果消息内容中的 `volume_path` 为 `None`,则表示所有图像数据已经处理完毕,退出循环。否则,从 `volume_path` 中提取相对输入文件夹路径并调用 `normalize_intensity` 函数对图像数据进行归一化处理,并更新 `volume_info` 中的空间分辨率。然后等待消息队列中只有一个等待处理的消息,将处理后的图像数据和相应的数据信息发送到 `volume_saving_func` 函数中进行保存。最后向消息队列中发送一个空消息以表示图像数据处理完毕。 整个程序实现了对文件夹中的图像进行强度归一化操作,并支持指定空间分辨率和文件夹遍历深度等功能。

import argparse import logging import re from multiprocessing import Process, Queue from pathlib import Path import numpy as np from skimage import exposure, filters from modules.config import logger from modules.volume import volume_loading_func, volume_saving_func def normalize_intensity( np_volume: np.ndarray, relative_path: Path, logger: logging.Logger ): logger.info(f"[processing start] {relative_path}") nstack = len(np_volume) stack: np.ndarray = np_volume[nstack // 2 - 16 : nstack // 2 + 16] hist_y, hist_x = exposure.histogram(stack[stack > 0]) thr = filters.threshold_otsu(stack[stack > 0]) peak_air = np.argmax(hist_y[hist_x < thr]) + hist_x[0] peak_soil = np.argmax(hist_y[hist_x > thr]) + (thr - hist_x[0]) + hist_x[0] np_volume = np_volume.astype(np.int64) for i in range(len(np_volume)): np_volume[i] = ( (np_volume[i] - peak_air).clip(0) / (peak_soil - peak_air) * 256 / 2 ) logger.info(f"[processing end] {relative_path}") return exposure.rescale_intensity( np_volume, in_range=(0, 255), out_range=(0, 255) ).astype(np.uint8) if name == "main": parser = argparse.ArgumentParser(description="Intensity Normalizer") parser.add_argument("-s", "--src", type=str, help="source directory.") parser.add_argument("-d", "--dst", type=str, help="destination directory.") parser.add_argument( "--mm_resolution", type=float, default=0.0, help="spatial resolution [mm].", ) parser.add_argument( "--depth", type=int, default=-1, help="depth of the maximum level to be explored. Defaults to unlimited.", ) args = parser.parse_args() if args.src is None: parser.print_help() exit(0) root_src_dir: Path = Path(args.src).resolve() if not root_src_dir.is_dir(): logger.error("Indicate valid virectory path.") exit() root_dst_dir = Path( args.dst or str(root_src_dir) + "_intensity_normalized" ) mm_resolution = float(args.mm_resolution) depth = int(args.depth) volume_loading_queue = Queue() volume_loading_process = Process( target=volume_loading_func, args=(root_src_dir, root_dst_dir, depth, volume_loading_queue, logger), ) volume_loading_process.start() volume_saving_queue = Queue() volume_saving_process = Process( target=volume_saving_func, args=(volume_saving_queue, logger), ) volume_saving_process.start() while True: ( volume_path, np_volume, volume_info, ) = volume_loading_queue.get() if volume_path is None: break relative_path = volume_path.relative_to(root_src_dir) np_volume = normalize_intensity(np_volume, relative_path, logger) if mm_resolution != 0: volume_info.update({"mm_resolution": mm_resolution}) while volume_saving_queue.qsize() == 1: pass dst_path = Path( root_dst_dir, re.sub(r"cb\d{3}$", "", str(relative_path)) ) volume_saving_queue.put( (dst_path, root_dst_dir, np_volume, volume_info) ) volume_saving_queue.put((None, None, None, None))完整详细的解释每一行的代码意思和作用

这段代码是一个 Python 脚本,用于对医学图像进行强度归一化处理。以下是每行代码的详细解释: 第 1 行:导入必要的模块。 第 3 行:定义一个函数 normalize_intensity,用于对给定的医学图像进行强度归一化处理。该函数接受三个参数:np_volume(一个 NumPy 数组,表示医学图像数据)、relative_path(一个 Path 对象,表示该数据在文件系统中的路径)、logger(用于记录日志的 Logger 对象)。 第 4 行:使用日志记录器输出一条日志,表示开始处理医学图像。 第 5 行:获取图像堆栈的数量 nstack。 第 6 行:将图像堆栈中居中的 16 张图像提取出来,作为参考图像,用于计算阈值。 第 7 行:使用 skimage 库中的 exposure.histogram 函数计算图像堆栈中所有非空像素的直方图 hist_y 和 hist_x。 第 8 行:使用 skimage 库中的 filters.threshold_otsu 函数计算一个全局阈值 thr。 第 9 行:计算阈值下面的像素的直方图峰值 peak_air。 第 10 行:计算阈值上面的像素的直方图峰值 peak_soil。 第 11 行:计算从阈值到图像范围最小值的距离 hist_x[0]。 第 12 行:将医学图像数据转换为 int64 类型。 第 13~17 行:循环遍历所有图像,对每个像素进行强度归一化处理,得到新的像素值。 第 18 行:使用 skimage 库中的 exposure.rescale_intensity 函数对强度归一化后的图像进行重新缩放,得到范围在 [0, 255] 内的 uint8 类型像素值。 第 19 行:使用日志记录器输出一条日志,表示完成医学图像处理。 第 20 行:返回经过强度归一化和重新缩放后的医学图像数据。 第 22~36 行:定义脚本的主函数。使用 argparse 模块解析命令行参数,获取源目录、目标目录、空间分辨率和深度等参数值。 第 37 行:如果源目录为空,则打印帮助信息并退出程序。 第 38 行:使用 pathlib 库中的 Path 类,获取源目录的绝对路径。 第 39 行:如果源目录不存在,则使用日志记录器输出一条错误信息,并退出程序。 第 40 行:使用 pathlib 库中的 Path 类,构造目标目录的路径。若目标目录为空,则默认使用源目录路径加上后缀 "_intensity_normalized"。 第 41 行:获取空间分辨率和深度参数值。 第 42 行:创建两个进程,用于对医学图像进行加载和保存。 第 44~48 行:循环遍历医学图像,调用 normalize_intensity 函数对图像进行强度归一化处理。当加载进程队列为空时,跳出循环。 第 49 行:获取该图像在源目录中的相对路径。 第 50 行:如果需要修改空间分辨率,则将该信息添加到图像的元数据中。 第 51~53 行:每当保存进程队列的大小为 1 时,等待一段时间,以防止队列溢出。 第 54~56 行:构造目标目录中的路径,删除文件名中的 cb 前缀,并将归一化后的医学图像保存到该路径中。 第 57 行:使用保存进程队列结束保存进程。
阅读全文

相关推荐

最新推荐

recommend-type

使用python执行shell脚本 并动态传参 及subprocess的使用详解

标题和描述中提到的知识点主要涉及使用Python执行shell脚本并动态传递参数,以及Python的`subprocess`模块的使用。`subprocess`模块是Python内置的一个强大的子进程管理工具,能够方便地创建和管理子进程,同时还能...
recommend-type

STM32之光敏电阻模拟路灯自动开关灯代码固件

这是一个STM32模拟天黑天亮自动开关灯代码固件,使用了0.96寸OLED屏幕显示文字,例程亲测可用,视频示例可B站搜索 285902929
recommend-type

PHP在线工具箱源码站长引流+在线工具箱源码+多款有趣的在线工具+一键安装

PHP在线工具箱源码站长引流+在线工具箱源码+多款有趣的在线工具+一键安装 测试环境:nginx+php5.6+mysql5.5 安装说明:上传后访问安装即可
recommend-type

简化填写流程:Annoying Form Completer插件

资源摘要信息:"Annoying Form Completer-crx插件" Annoying Form Completer是一个针对Google Chrome浏览器的扩展程序,其主要功能是帮助用户自动填充表单中的强制性字段。对于经常需要在线填写各种表单的用户来说,这是一个非常实用的工具,因为它可以节省大量时间,并减少因重复输入相同信息而产生的烦恼。 该扩展程序的描述中提到了用户在填写表格时遇到的麻烦——必须手动输入那些恼人的强制性字段。这些字段可能包括但不限于用户名、邮箱地址、电话号码等个人信息,以及各种密码、确认密码等重复性字段。Annoying Form Completer的出现,使这一问题得到了缓解。通过该扩展,用户可以在表格填充时减少到“一个压力……或两个”,意味着极大的方便和效率提升。 值得注意的是,描述中也使用了“抽浏览器”的表述,这可能意味着该扩展具备某种数据提取或自动化填充的机制,虽然这个表述不是一个标准的技术术语,它可能暗示该扩展程序能够从用户之前的行为或者保存的信息中提取必要数据并自动填充到表单中。 虽然该扩展程序具有很大的便利性,但用户在使用时仍需谨慎,因为自动填充个人信息涉及到隐私和安全问题。理想情况下,用户应该只在信任的网站上使用这种类型的扩展程序,并确保扩展程序是从可靠的来源获取,以避免潜在的安全风险。 根据【压缩包子文件的文件名称列表】中的信息,该扩展的文件名为“Annoying_Form_Completer.crx”。CRX是Google Chrome扩展的文件格式,它是一种压缩的包格式,包含了扩展的所有必要文件和元数据。用户可以通过在Chrome浏览器中访问chrome://extensions/页面,开启“开发者模式”,然后点击“加载已解压的扩展程序”按钮来安装CRX文件。 在标签部分,我们看到“扩展程序”这一关键词,它明确了该资源的性质——这是一个浏览器扩展。扩展程序通常是通过增加浏览器的功能或提供额外的服务来增强用户体验的小型软件包。这些程序可以极大地简化用户的网上活动,从保存密码、拦截广告到自定义网页界面等。 总结来看,Annoying Form Completer作为一个Google Chrome的扩展程序,提供了一个高效的解决方案,帮助用户自动化处理在线表单的填写过程,从而提高效率并减少填写表单时的麻烦。在享受便捷的同时,用户也应确保使用扩展程序时的安全性和隐私性。
recommend-type

管理建模和仿真的文件

管理Boualem Benatallah引用此版本:布阿利姆·贝纳塔拉。管理建模和仿真。约瑟夫-傅立叶大学-格勒诺布尔第一大学,1996年。法语。NNT:电话:00345357HAL ID:电话:00345357https://theses.hal.science/tel-003453572008年12月9日提交HAL是一个多学科的开放存取档案馆,用于存放和传播科学研究论文,无论它们是否被公开。论文可以来自法国或国外的教学和研究机构,也可以来自公共或私人研究中心。L’archive ouverte pluridisciplinaire
recommend-type

CC-LINK远程IO模块AJ65SBTB1现场应用指南:常见问题快速解决

# 摘要 CC-LINK远程IO模块作为一种工业通信技术,为自动化和控制系统提供了高效的数据交换和设备管理能力。本文首先概述了CC-LINK远程IO模块的基础知识,接着详细介绍了其安装与配置流程,包括硬件的物理连接和系统集成要求,以及软件的参数设置与优化。为应对潜在的故障问题,本文还提供了故障诊断与排除的方法,并探讨了故障解决的实践案例。在高级应用方面,文中讲述了如何进行编程与控制,以及如何实现系统扩展与集成。最后,本文强调了CC-LINK远程IO模块的维护与管理的重要性,并对未来技术发展趋势进行了展望。 # 关键字 CC-LINK远程IO模块;系统集成;故障诊断;性能优化;编程与控制;维护
recommend-type

使用MATLAB写一个在柱坐标系中实现以下功能的代码:1) 生成具有损耗的平面电磁波模型;2) 调整电场分量Ex和Ey的幅度和相位,以仿真三种极化的形成?

在MATLAB中,处理电磁学问题通常需要利用`physconst`、`polar2cartesian`等函数库。以下是一个简化的示例,展示了如何生成一个基本的平面电磁波模型,并调整电场分量的幅度和相位。请注意,实际的损耗模型通常会涉及到复杂的阻抗和吸收系数,这里我们将简化为理想情况。 ```matlab % 初始化必要的物理常数 c = physconst('LightSpeed'); % 光速 omega = 2*pi * 5e9; % 角频率 (例如 GHz) eps0 = physconst('PermittivityOfFreeSpace'); % 真空介电常数 % 定义网格参数
recommend-type

TeraData技术解析与应用

资源摘要信息: "TeraData是一个高性能、高可扩展性的数据仓库和数据库管理系统,它支持大规模的数据存储和复杂的数据分析处理。TeraData的产品线主要面向大型企业级市场,提供多种数据仓库解决方案,包括并行数据仓库和云数据仓库等。由于其强大的分析能力和出色的处理速度,TeraData被广泛应用于银行、电信、制造、零售和其他需要处理大量数据的行业。TeraData系统通常采用MPP(大规模并行处理)架构,这意味着它可以通过并行处理多个计算任务来显著提高性能和吞吐量。" 由于提供的信息中描述部分也是"TeraData",且没有详细的内容,所以无法进一步提供关于该描述的详细知识点。而标签和压缩包子文件的文件名称列表也没有提供更多的信息。 在讨论TeraData时,我们可以深入了解以下几个关键知识点: 1. **MPP架构**:TeraData使用大规模并行处理(MPP)架构,这种架构允许系统通过大量并行运行的处理器来分散任务,从而实现高速数据处理。在MPP系统中,数据通常分布在多个节点上,每个节点负责一部分数据的处理工作,这样能够有效减少数据传输的时间,提高整体的处理效率。 2. **并行数据仓库**:TeraData提供并行数据仓库解决方案,这是针对大数据环境优化设计的数据库架构。它允许同时对数据进行读取和写入操作,同时能够支持对大量数据进行高效查询和复杂分析。 3. **数据仓库与BI**:TeraData系统经常与商业智能(BI)工具结合使用。数据仓库可以收集和整理来自不同业务系统的数据,BI工具则能够帮助用户进行数据分析和决策支持。TeraData的数据仓库解决方案提供了一整套的数据分析工具,包括但不限于ETL(抽取、转换、加载)工具、数据挖掘工具和OLAP(在线分析处理)功能。 4. **云数据仓库**:除了传统的本地部署解决方案,TeraData也在云端提供了数据仓库服务。云数据仓库通常更灵活、更具可伸缩性,可根据用户的需求动态调整资源分配,同时降低了企业的运维成本。 5. **高可用性和扩展性**:TeraData系统设计之初就考虑了高可用性和可扩展性。系统可以通过增加更多的处理节点来线性提升性能,同时提供了多种数据保护措施以保证数据的安全和系统的稳定运行。 6. **优化与调优**:对于数据仓库而言,性能优化是一个重要的环节。TeraData提供了一系列的优化工具和方法,比如SQL调优、索引策略和执行计划分析等,来帮助用户优化查询性能和提高数据访问效率。 7. **行业应用案例**:在金融、电信、制造等行业中,TeraData可以处理海量的交易数据、客户信息和业务数据,它在欺诈检测、客户关系管理、供应链优化等关键业务领域发挥重要作用。 8. **集成与兼容性**:TeraData系统支持与多种不同的业务应用和工具进行集成。它也遵循行业标准,能够与其他数据源、分析工具和应用程序无缝集成,为用户提供一致的用户体验。 以上便是关于TeraData的知识点介绍。由于文件描述内容重复且过于简略,未能提供更深层次的介绍,如果需要进一步详细的知识,建议参考TeraData官方文档或相关技术文章以获取更多的专业信息。
recommend-type

"互动学习:行动中的多样性与论文攻读经历"

多样性她- 事实上SCI NCES你的时间表ECOLEDO C Tora SC和NCESPOUR l’Ingén学习互动,互动学习以行动为中心的强化学习学会互动,互动学习,以行动为中心的强化学习计算机科学博士论文于2021年9月28日在Villeneuve d'Asq公开支持马修·瑟林评审团主席法布里斯·勒菲弗尔阿维尼翁大学教授论文指导奥利维尔·皮耶昆谷歌研究教授:智囊团论文联合主任菲利普·普雷教授,大学。里尔/CRISTAL/因里亚报告员奥利维耶·西格德索邦大学报告员卢多维奇·德诺耶教授,Facebook /索邦大学审查员越南圣迈IMT Atlantic高级讲师邀请弗洛里安·斯特鲁布博士,Deepmind对于那些及时看到自己错误的人...3谢谢你首先,我要感谢我的两位博士生导师Olivier和Philippe。奥利维尔,"站在巨人的肩膀上"这句话对你来说完全有意义了。从科学上讲,你知道在这篇论文的(许多)错误中,你是我可以依
recommend-type

CC-LINK远程IO模块在环境监控中的应用:技术与案例探讨

![CC-LINK](https://www.mitsubishielectric.com/fa/products/cnt/plcnet/pmerit/cclink_ie/concept/img/main_img.jpg) # 摘要 CC-LINK远程IO模块作为一种先进的工业通信技术,在环境监控系统中具有广泛应用。本文首先概述了CC-LINK远程IO模块的基本概念及其在环境监控系统中的基础理论,包括硬件组成、软件架构及技术优势。随后,详细介绍了其在实时监控与远程控制、系统集成与配置、安全维护方面的具体实践应用。案例分析部分深入探讨了CC-LINK模块在不同环境监控场景中的应用效果与技术解决