def dae_mq_get_consumer(self, data_type: str, num: int): vin = self.get_robot_variable("GLOBAL", "VIN") env = self.get_robot_variable("GLOBAL", "项目名称") config = MQ_CONFIG[env] assert data_type in config['topics'], KeyError(f"validation of {data_type} is not supported!") if data_type in self.consumers: return self.consumers[data_type] else: consumer = DAEKafkaConsumer( vin=vin, servers=config.get('servers', None), cluster=config.get('cluster', str()), project=config.get('project', str()), ) consumer.subscribe(topic=config['topics'][data_type], num=num) self.consumers[data_type] = consumer # consumer.seek_to_offset(topic=config['topics'][data_type]) # consumer.seek_to('begin') return consumer
时间: 2024-03-04 19:51:01 浏览: 42
ColladaLoader_sourcecode_1_1.zip_DAE_opengl dae_vc dae
这段代码是关于获取 Kafka 消费者的,接受三个参数,分别是数据类型、数量和一些配置信息。其中,数据类型必须是配置信息中的合法 topic,否则会抛出异常。如果已经存在该数据类型的消费者,则直接返回该消费者实例;否则,新建一个 DAEKafkaConsumer 类的实例,并订阅对应的 topic,最后将这个实例保存在 consumers 字典中,并返回该实例。
阅读全文