定义一个字典类:Dictclass。完成下面的功能: 1.del_dict(key):删除某个key, 如果没有key则返回“not found” 2.has_dict(key):判断某个键是否在字典里,如果在返回键对应的值,不存在则返回"not found" 3.get_keys():返回键组成的列表,返回类型为tuple 4.update_dict(dict2):将该对象与字典dict2的values合并,返回全部values组成的列表,返回类型为list 测试如下: dict1=Dictclass({'A':1,'B':0.1,'C','C'}) dict1.del_dict('A') dict1.del_dict('a') dict1.has_dict('B') dict1.has_dict('b') dict1.get_keys() dict1.update_dict({'D':5,'E':10}) 代码中不用try,excrpt
时间: 2023-06-12 09:02:10 浏览: 186
class Dictclass:
def __init__(self, my_dict):
self.my_dict = my_dict
def del_dict(self, key):
if key in self.my_dict:
del self.my_dict[key]
else:
return "not found"
def has_dict(self, key):
if key in self.my_dict:
return self.my_dict[key]
else:
return "not found"
def get_keys(self):
return tuple(self.my_dict.keys())
def update_dict(self, dict2):
self.my_dict.update(dict2)
return list(self.my_dict.values())
相关问题
给下列代码加注释: def merge_accumulate_client_update(self, list_num_proc, list_state_dict, lr): total_num_proc = sum(list_num_proc) # merged_state_dict = dict() dict_keys = list_state_dict[0].keys() for state_dict in list_state_dict[1:]: assert state_dict.keys() == dict_keys # accumulate extra sgrad and remove from state_dict if self.use_adaptive and self.is_adj_round(): prefix = "extra." for state_dict in list_state_dict: del_list = [] for key, param in state_dict.items(): if key[:len(prefix)] == prefix: sgrad_key = key[len(prefix):] mask_0 = self.model.get_mask_by_name(sgrad_key) == 0. dense_sgrad = torch.zeros_like(mask_0, dtype=torch.float) dense_sgrad.masked_scatter_(mask_0, param) # no need to divide by lr self.control.accumulate(sgrad_key, dense_sgrad) del_list.append(key) for del_key in del_list: del state_dict[del_key]
```python
def merge_accumulate_client_update(self, list_num_proc, list_state_dict, lr):
total_num_proc = sum(list_num_proc)
# merged_state_dict = dict()
dict_keys = list_state_dict[0].keys()
# Check if all state dicts have the same keys
for state_dict in list_state_dict[1:]:
assert state_dict.keys() == dict_keys
# accumulate extra sgrad and remove from state_dict
if self.use_adaptive and self.is_adj_round():
prefix = "extra."
for state_dict in list_state_dict:
del_list = []
for key, param in state_dict.items():
# Check if the key starts with 'extra.'
if key[:len(prefix)] == prefix:
# Get the corresponding sgrad key
sgrad_key = key[len(prefix):]
# Create a mask of zeroes
mask_0 = self.model.get_mask_by_name(sgrad_key) == 0.
# Create a dense tensor and fill it with values from param based on the mask
dense_sgrad = torch.zeros_like(mask_0, dtype=torch.float)
dense_sgrad.masked_scatter_(mask_0, param)
# Accumulate the dense sgrad without dividing by lr
self.control.accumulate(sgrad_key, dense_sgrad)
# Add the key to the delete list
del_list.append(key)
# Remove the keys from the state_dict
for del_key in del_list:
del state_dict[del_key]
```
这段代码实现了一个`merge_accumulate_client_update`方法,主要功能是合并和累加`list_state_dict`中的状态字典。以下是对代码的注释:
- `total_num_proc`:所有进程数的总和。
- `dict_keys`:状态字典的键列表。
- 检查所有状态字典是否具有相同的键。
- 如果使用自适应且处于调整轮次,则累加额外的`sgrad`并从状态字典中删除。
- `prefix`:额外`sgrad`的前缀。
- 对于每个状态字典,遍历键和参数。
- 如果键以`prefix`开头,则获取相应的`sgrad`键。
- 创建一个零填充的掩码。
- 创建一个稠密张量,并根据掩码从参数中填充值。
- 累加不除以`lr`的稠密`sgrad`。
- 将键添加到删除列表。
- 从状态字典中删除键。
class UserCache(UserMixin): REDIS_PREFIX = f"{PROJECT_NAME}-user" def __init__(self, alternative_id, userid=None, username=None, homepage=None, permissions=None): self.alternative_id = alternative_id self.userid = userid self.username = username self.homepage = homepage self.permissions = permissions or [] def get_id(self): return self.alternative_id def add_to_cache(self): values = { "alternative_id": self.alternative_id, "userid": self.userid, "username": self.username, "homepage": self.homepage, "permissions": ",".join(self.permissions), } redis_key = f"{self.REDIS_PREFIX}:{self.alternative_id}" redis_client.hmset(redis_key, values) redis_client.expire(redis_key, SIGNIN_LIFETIME) def del_user_cache(self): redis_client.delete(f"{self.REDIS_PREFIX}:{self.alternative_id}") @classmethod def load_from_cache(cls, alternative_id): redis_key = f"{cls.REDIS_PREFIX}:{alternative_id}" user_cache_dict = redis_client.hgetall(redis_key) if user_cache_dict: user_cache_dict['permissions'] = user_cache_dict.get('permissions', '').split(",") return cls(**user_cache_dict) 优化并提出建议,python3.8
以下是对代码的一些优化和建议:
1. 在类定义中,将 REDIS_PREFIX 定义为类变量,而不是实例变量。这样可以提高代码的可读性,并且只需要在类定义中定义一次。
2. 使用 typing 模块来声明函数和方法的参数和返回值,以提高代码的可读性和可维护性。
3. 在 load_from_cache 方法中,使用 redis_client.hgetall 返回的字典对象的 items 方法来遍历键值对,而不是使用 for 循环和 get 方法。这样可以在一行代码中创建 user_cache_dict。
4. 使用 redis 的 pipeline 方法来优化 add_to_cache 方法,以减少与 redis 的通信次数。
优化后的代码如下所示:
```
from typing import List
class UserCache(UserMixin):
REDIS_PREFIX: str = f"{PROJECT_NAME}-user"
def __init__(self, alternative_id: str, userid: str = None, username: str = None, homepage: str = None, permissions: List[str] = None):
self.alternative_id = alternative_id
self.userid = userid
self.username = username
self.homepage = homepage
self.permissions = permissions or []
def get_id(self) -> str:
return self.alternative_id
def add_to_cache(self) -> None:
values = {
"alternative_id": self.alternative_id,
"userid": self.userid,
"username": self.username,
"homepage": self.homepage,
"permissions": ",".join(self.permissions),
}
redis_key = f"{self.REDIS_PREFIX}:{self.alternative_id}"
with redis_client.pipeline() as pipe:
pipe.hmset(redis_key, values)
pipe.expire(redis_key, SIGNIN_LIFETIME)
pipe.execute()
def del_user_cache(self) -> None:
redis_client.delete(f"{self.REDIS_PREFIX}:{self.alternative_id}")
@classmethod
def load_from_cache(cls, alternative_id: str) -> 'UserCache':
redis_key = f"{cls.REDIS_PREFIX}:{alternative_id}"
user_cache_dict = dict(redis_client.hgetall(redis_key).items())
if user_cache_dict:
user_cache_dict['permissions'] = user_cache_dict.get('permissions', '').split(",")
return cls(**user_cache_dict)
```
阅读全文