试写一个协程类
时间: 2023-08-15 22:03:49 浏览: 44
好的,下面是一个简单的协程类的示例:
```python
import asyncio
class MyCoroutine:
def __init__(self, name):
self.name = name
self.is_running = False
async def run(self):
self.is_running = True
print(f"{self.name} started running.")
while self.is_running:
print(f"{self.name} is running.")
await asyncio.sleep(1)
print(f"{self.name} stopped running.")
def stop(self):
self.is_running = False
```
在上面的代码中,我们定义了一个名为 MyCoroutine 的协程类。在类的构造函数中,我们传入了一个名称参数,并且定义了一个 is_running 实例变量,用于表示协程是否正在运行。
类中定义了一个 run() 方法,使用 async/await 语法来定义协程函数。在函数内部,我们使用一个 while 循环来模拟协程的运行过程,并且在循环中使用 asyncio.sleep() 函数来模拟协程的耗时操作。
类中还定义了一个 stop() 方法,用于停止协程的运行。在该方法中,我们将 is_running 变量设为 False,以便让协程退出 while 循环。
下面是使用该协程类的示例代码:
```python
async def main():
coro1 = MyCoroutine("Coroutine 1")
task1 = asyncio.create_task(coro1.run())
await asyncio.sleep(5)
coro2 = MyCoroutine("Coroutine 2")
task2 = asyncio.create_task(coro2.run())
await asyncio.sleep(5)
coro1.stop()
coro2.stop()
await task1
await task2
asyncio.run(main())
```
在上面的代码中,我们定义了一个名为 main() 的异步函数,用于启动协程任务。在该函数中,我们创建了两个 MyCoroutine 实例,并且使用 asyncio.create_task() 函数来创建协程任务。然后,我们等待一段时间后,停止了这两个协程的运行,并且等待任务完成。
运行该代码后,会输出类似于下面的结果:
```
Coroutine 1 started running.
Coroutine 1 is running.
Coroutine 1 is running.
Coroutine 1 is running.
Coroutine 1 is running.
Coroutine 1 is running.
Coroutine 2 started running.
Coroutine 1 stopped running.
Coroutine 2 is running.
Coroutine 2 is running.
Coroutine 2 is running.
Coroutine 2 is running.
Coroutine 2 is running.
Coroutine 2 stopped running.
```