diff --git a/deepmd/pd/entrypoints/main.py b/deepmd/pd/entrypoints/main.py index 0cfe610fb5..f397bc358b 100644 --- a/deepmd/pd/entrypoints/main.py +++ b/deepmd/pd/entrypoints/main.py @@ -574,7 +574,7 @@ def change_bias( log.info(f"Saved model to {output_path}") -def main(args: list[str] | argparse.Namespace | None = None): +def main(args: list[str] | argparse.Namespace | None = None) -> None: if not isinstance(args, argparse.Namespace): FLAGS = parse_args(args=args) else: diff --git a/deepmd/pd/infer/deep_eval.py b/deepmd/pd/infer/deep_eval.py index 67c435ab3f..6c0ffed7ec 100644 --- a/deepmd/pd/infer/deep_eval.py +++ b/deepmd/pd/infer/deep_eval.py @@ -474,7 +474,7 @@ def _eval_model( fparam: np.ndarray | None, aparam: np.ndarray | None, request_defs: list[OutputVariableDef], - ): + ) -> tuple[np.ndarray, ...]: if not self.static_model: model = self.dp.to(DEVICE) prec = NP_PRECISION_DICT[RESERVED_PRECISION_DICT[GLOBAL_PD_FLOAT_PRECISION]] diff --git a/deepmd/pd/infer/inference.py b/deepmd/pd/infer/inference.py index ae1b8e8516..2ab3d1b0e2 100644 --- a/deepmd/pd/infer/inference.py +++ b/deepmd/pd/infer/inference.py @@ -23,9 +23,9 @@ class Tester: def __init__( self, - model_ckpt, - head=None, - ): + model_ckpt: str, + head: str | None = None, + ) -> None: """Construct a DeePMD tester. Args: diff --git a/deepmd/pd/loss/ener.py b/deepmd/pd/loss/ener.py index 81d2589844..3f069feb9d 100644 --- a/deepmd/pd/loss/ener.py +++ b/deepmd/pd/loss/ener.py @@ -1,4 +1,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later +from typing import ( + Any, +) import paddle import paddle.nn.functional as F @@ -20,7 +23,9 @@ ) -def custom_huber_loss(predictions, targets, delta=1.0): +def custom_huber_loss( + predictions: paddle.Tensor, targets: paddle.Tensor, delta: float = 1.0 +) -> paddle.Tensor: error = targets - predictions abs_error = paddle.abs(error) quadratic_loss = 0.5 * paddle.pow(error, 2) @@ -32,13 +37,13 @@ def custom_huber_loss(predictions, targets, delta=1.0): class EnergyStdLoss(TaskLoss): def __init__( self, - starter_learning_rate=1.0, - start_pref_e=0.0, - limit_pref_e=0.0, - start_pref_f=0.0, - limit_pref_f=0.0, - start_pref_v=0.0, - limit_pref_v=0.0, + starter_learning_rate: float = 1.0, + start_pref_e: float = 0.0, + limit_pref_e: float = 0.0, + start_pref_f: float = 0.0, + limit_pref_f: float = 0.0, + start_pref_v: float = 0.0, + limit_pref_v: float = 0.0, start_pref_ae: float = 0.0, limit_pref_ae: float = 0.0, start_pref_pf: float = 0.0, @@ -49,10 +54,10 @@ def __init__( limit_pref_gf: float = 0.0, numb_generalized_coord: int = 0, use_l1_all: bool = False, - inference=False, - use_huber=False, - huber_delta=0.01, - **kwargs, + inference: bool = False, + use_huber: bool = False, + huber_delta: float = 0.01, + **kwargs: Any, ) -> None: r"""Construct a layer to compute loss on energy, force and virial. @@ -146,7 +151,15 @@ def __init__( "Huber loss is not implemented for force with atom_pref, generalized force and relative force. " ) - def forward(self, input_dict, model, label, natoms, learning_rate, mae=False): + def forward( + self, + input_dict: dict[str, paddle.Tensor], + model: paddle.nn.Layer, + label: dict[str, paddle.Tensor], + natoms: int, + learning_rate: float, + mae: bool = False, + ) -> tuple[dict[str, paddle.Tensor], paddle.Tensor, dict[str, paddle.Tensor]]: """Return loss on energy and force. Parameters @@ -535,10 +548,10 @@ def deserialize(cls, data: dict) -> "TaskLoss": class EnergyHessianStdLoss(EnergyStdLoss): def __init__( self, - start_pref_h=0.0, - limit_pref_h=0.0, - **kwargs, - ): + start_pref_h: float = 0.0, + limit_pref_h: float = 0.0, + **kwargs: Any, + ) -> None: r"""Enable the layer to compute loss on hessian. Parameters @@ -556,7 +569,15 @@ def __init__( self.start_pref_h = start_pref_h self.limit_pref_h = limit_pref_h - def forward(self, input_dict, model, label, natoms, learning_rate, mae=False): + def forward( + self, + input_dict: dict[str, paddle.Tensor], + model: paddle.nn.Module, + label: dict[str, paddle.Tensor], + natoms: int, + learning_rate: float, + mae: bool = False, + ) -> tuple[dict[str, paddle.Tensor], paddle.Tensor, dict[str, paddle.Tensor]]: model_pred, loss, more_loss = super().forward( input_dict, model, label, natoms, learning_rate, mae=mae ) diff --git a/deepmd/pd/loss/loss.py b/deepmd/pd/loss/loss.py index f825f9ff61..0b6d7f7e46 100644 --- a/deepmd/pd/loss/loss.py +++ b/deepmd/pd/loss/loss.py @@ -3,6 +3,9 @@ ABC, abstractmethod, ) +from typing import ( + Any, +) import paddle @@ -15,11 +18,19 @@ class TaskLoss(paddle.nn.Layer, ABC, make_plugin_registry("loss")): - def __init__(self, **kwargs): + def __init__(self, **kwargs: Any) -> None: """Construct loss.""" super().__init__() - def forward(self, input_dict, model, label, natoms, learning_rate): + def forward( + self, + input_dict: dict[str, paddle.Tensor], + model: paddle.nn.Module, + label: dict[str, paddle.Tensor], + natoms: int, + learning_rate: float, + mae: bool | None = None, + ) -> paddle.Tensor: """Return loss .""" raise NotImplementedError diff --git a/deepmd/pd/model/atomic_model/base_atomic_model.py b/deepmd/pd/model/atomic_model/base_atomic_model.py index 87cb18f6fc..39c55ce8ed 100644 --- a/deepmd/pd/model/atomic_model/base_atomic_model.py +++ b/deepmd/pd/model/atomic_model/base_atomic_model.py @@ -578,7 +578,7 @@ def _default_bias(self) -> paddle.Tensor: device=device ) - def _default_std(self): + def _default_std(self) -> paddle.Tensor: ntypes = self.get_ntypes() return paddle.ones([self.n_out, ntypes, self.max_out_size], dtype=dtype).to( device=device @@ -626,7 +626,7 @@ def _store_out_stat( paddle.assign(out_bias_data, self.out_bias) paddle.assign(out_std_data, self.out_std) - def get_ntypes(self): + def get_ntypes(self) -> int: return len(self.type_map) def get_buffer_ntypes(self) -> paddle.Tensor: diff --git a/deepmd/pd/model/atomic_model/dp_atomic_model.py b/deepmd/pd/model/atomic_model/dp_atomic_model.py index c5aa8b8a56..dc8d82830a 100644 --- a/deepmd/pd/model/atomic_model/dp_atomic_model.py +++ b/deepmd/pd/model/atomic_model/dp_atomic_model.py @@ -385,7 +385,7 @@ def compute_or_load_stat( stat_file_path /= " ".join(self.type_map) @functools.lru_cache - def wrapped_sampler(): + def wrapped_sampler() -> list[dict]: sampled = sampled_func() if self.pair_excl is not None: pair_exclude_types = self.pair_excl.get_exclude_types() diff --git a/deepmd/pd/model/atomic_model/energy_atomic_model.py b/deepmd/pd/model/atomic_model/energy_atomic_model.py index 708ec9db7f..f36bdd3873 100644 --- a/deepmd/pd/model/atomic_model/energy_atomic_model.py +++ b/deepmd/pd/model/atomic_model/energy_atomic_model.py @@ -1,4 +1,14 @@ # SPDX-License-Identifier: LGPL-3.0-or-later +from typing import ( + Any, +) + +from deepmd.pd.model.descriptor.base_descriptor import ( + BaseDescriptor, +) +from deepmd.pd.model.task.base_fitting import ( + BaseFitting, +) from deepmd.pd.model.task.ener import ( EnergyFittingNet, InvarFitting, @@ -10,7 +20,13 @@ class DPEnergyAtomicModel(DPAtomicModel): - def __init__(self, descriptor, fitting, type_map, **kwargs): + def __init__( + self, + descriptor: BaseDescriptor, + fitting: BaseFitting, + type_map: list[str], + **kwargs: Any, + ) -> None: assert isinstance(fitting, EnergyFittingNet) or isinstance( fitting, InvarFitting ) diff --git a/deepmd/pd/model/descriptor/descriptor.py b/deepmd/pd/model/descriptor/descriptor.py index d4ca4bc151..53c7f4dc1a 100644 --- a/deepmd/pd/model/descriptor/descriptor.py +++ b/deepmd/pd/model/descriptor/descriptor.py @@ -8,6 +8,7 @@ Callable, ) from typing import ( + Any, NoReturn, ) @@ -43,7 +44,7 @@ class DescriptorBlock(paddle.nn.Layer, ABC, make_plugin_registry("DescriptorBloc local_cluster = False - def __new__(cls, *args, **kwargs): + def __new__(cls, *args: Any, **kwargs: Any) -> "DescriptorBlock": if cls is DescriptorBlock: try: descrpt_type = kwargs["type"] @@ -126,7 +127,9 @@ def get_stats(self) -> dict[str, StatItem]: """Get the statistics of the descriptor.""" raise NotImplementedError - def share_params(self, base_class, shared_level, resume=False) -> None: + def share_params( + self, base_class: Any, shared_level: int, resume: bool = False + ) -> None: """ Share the parameters of self to the base_class with shared_level during multitask training. If not start from checkpoint (resume is False), @@ -180,7 +183,7 @@ def forward( extended_atype_embd: paddle.Tensor | None = None, mapping: paddle.Tensor | None = None, type_embedding: paddle.Tensor | None = None, - ): + ) -> paddle.Tensor: """Calculate DescriptorBlock.""" pass @@ -194,14 +197,16 @@ def need_sorted_nlist_for_lower(self) -> bool: def make_default_type_embedding( - ntypes, -): + ntypes: int, +) -> tuple[TypeEmbedNet, dict]: aux = {} aux["tebd_dim"] = 8 return TypeEmbedNet(ntypes, aux["tebd_dim"]), aux -def extend_descrpt_stat(des, type_map, des_with_stat=None) -> None: +def extend_descrpt_stat( + des: Any, type_map: list[str], des_with_stat: Any = None +) -> None: r""" Extend the statistics of a descriptor block with types from newly provided `type_map`. diff --git a/deepmd/pd/model/descriptor/dpa1.py b/deepmd/pd/model/descriptor/dpa1.py index 65baa8daa6..ca286ba945 100644 --- a/deepmd/pd/model/descriptor/dpa1.py +++ b/deepmd/pd/model/descriptor/dpa1.py @@ -2,6 +2,9 @@ from collections.abc import ( Callable, ) +from typing import ( + Any, +) import paddle @@ -228,8 +231,8 @@ def __init__( exclude_types: list[tuple[int, int]] = [], env_protection: float = 0.0, scaling_factor: int = 1.0, - normalize=True, - temperature=None, + normalize: bool = True, + temperature: float | None = None, concat_output_tebd: bool = True, trainable: bool = True, trainable_ln: bool = True, @@ -242,7 +245,7 @@ def __init__( use_tebd_bias: bool = False, type_map: list[str] | None = None, # not implemented - spin=None, + spin: Any = None, type: str | None = None, ) -> None: super().__init__() @@ -397,7 +400,9 @@ def get_env_protection(self) -> float: """Returns the protection of building environment matrix.""" return self.se_atten.get_env_protection() - def share_params(self, base_class, shared_level, resume=False) -> None: + def share_params( + self, base_class: Any, shared_level: int, resume: bool = False + ) -> None: """ Share the parameters of self to the base_class with shared_level during multitask training. If not start from checkpoint (resume is False), @@ -425,18 +430,18 @@ def share_params(self, base_class, shared_level, resume=False) -> None: raise NotImplementedError @property - def dim_out(self): + def dim_out(self) -> int: return self.get_dim_out() @property - def dim_emb(self): + def dim_emb(self) -> int: return self.get_dim_emb() def compute_input_stats( self, merged: Callable[[], list[dict]] | list[dict], path: DPPath | None = None, - ): + ) -> None: """ Compute the input statistics (e.g. mean and stddev) for the descriptors from packed data. @@ -469,7 +474,7 @@ def get_stat_mean_and_stddev(self) -> tuple[paddle.Tensor, paddle.Tensor]: return self.se_atten.mean, self.se_atten.stddev def change_type_map( - self, type_map: list[str], model_with_new_type_stat=None + self, type_map: list[str], model_with_new_type_stat: Any = None ) -> None: """Change the type related params to new ones, according to `type_map` and the original one in the model. If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. @@ -569,7 +574,7 @@ def deserialize(cls, data: dict) -> "DescrptDPA1": data["use_tebd_bias"] = True obj = cls(**data) - def t_cvt(xx): + def t_cvt(xx: Any) -> paddle.Tensor: return paddle.to_tensor(xx, dtype=obj.se_atten.prec).to(device=env.DEVICE) obj.type_embedding.embedding = TypeEmbedNetConsistent.deserialize( @@ -620,7 +625,7 @@ def forward( nlist: paddle.Tensor, mapping: paddle.Tensor | None = None, comm_dict: list[paddle.Tensor] | None = None, - ): + ) -> paddle.Tensor: """Compute the descriptor. Parameters diff --git a/deepmd/pd/model/descriptor/dpa2.py b/deepmd/pd/model/descriptor/dpa2.py index 645ad2f2fe..0969c1acae 100644 --- a/deepmd/pd/model/descriptor/dpa2.py +++ b/deepmd/pd/model/descriptor/dpa2.py @@ -2,6 +2,9 @@ from collections.abc import ( Callable, ) +from typing import ( + Any, +) import paddle @@ -149,7 +152,7 @@ def __init__( """ super().__init__() - def init_subclass_params(sub_data, sub_class): + def init_subclass_params(sub_data: dict | Any, sub_class: type) -> Any: if isinstance(sub_data, dict): return sub_class(**sub_data) elif isinstance(sub_data, sub_class): @@ -400,7 +403,9 @@ def get_env_protection(self) -> float: # the env_protection of repinit is the same as that of the repformer return self.repinit.get_env_protection() - def share_params(self, base_class, shared_level, resume=False) -> None: + def share_params( + self, base_class: Any, shared_level: int, resume: bool = False + ) -> None: """ Share the parameters of self to the base_class with shared_level during multitask training. If not start from checkpoint (resume is False), @@ -436,7 +441,7 @@ def share_params(self, base_class, shared_level, resume=False) -> None: raise NotImplementedError def change_type_map( - self, type_map: list[str], model_with_new_type_stat=None + self, type_map: list[str], model_with_new_type_stat: Any = None ) -> None: """Change the type related params to new ones, according to `type_map` and the original one in the model. If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. @@ -491,11 +496,11 @@ def change_type_map( repinit_three_body["dstd"] = repinit_three_body["dstd"][remap_index] @property - def dim_out(self): + def dim_out(self) -> int: return self.get_dim_out() @property - def dim_emb(self): + def dim_emb(self) -> int: """Returns the embedding dimension g2.""" return self.get_dim_emb() @@ -672,7 +677,7 @@ def deserialize(cls, data: dict) -> "DescrptDPA2": if obj.repinit.dim_out != obj.repformers.dim_in: obj.g1_shape_tranform = MLPLayer.deserialize(g1_shape_tranform) - def t_cvt(xx): + def t_cvt(xx: Any) -> paddle.Tensor: return paddle.to_tensor(xx, dtype=obj.repinit.prec, place=env.DEVICE) # deserialize repinit @@ -727,7 +732,7 @@ def forward( nlist: paddle.Tensor, mapping: paddle.Tensor | None = None, comm_dict: list[paddle.Tensor] | None = None, - ): + ) -> paddle.Tensor: """Compute the descriptor. Parameters diff --git a/deepmd/pd/model/descriptor/dpa3.py b/deepmd/pd/model/descriptor/dpa3.py index 205829860f..bdf7464c27 100644 --- a/deepmd/pd/model/descriptor/dpa3.py +++ b/deepmd/pd/model/descriptor/dpa3.py @@ -2,6 +2,9 @@ from collections.abc import ( Callable, ) +from typing import ( + Any, +) import paddle @@ -120,7 +123,7 @@ def __init__( ) -> None: super().__init__() - def init_subclass_params(sub_data, sub_class): + def init_subclass_params(sub_data: dict | Any, sub_class: type) -> Any: if isinstance(sub_data, dict): return sub_class(**sub_data) elif isinstance(sub_data, sub_class): @@ -302,7 +305,9 @@ def get_env_protection(self) -> float: """Returns the protection of building environment matrix.""" return self.repflows.get_env_protection() - def share_params(self, base_class, shared_level, resume=False) -> None: + def share_params( + self, base_class: Any, shared_level: int, resume: bool = False + ) -> None: """ Share the parameters of self to the base_class with shared_level during multitask training. If not start from checkpoint (resume is False), @@ -330,7 +335,7 @@ def share_params(self, base_class, shared_level, resume=False) -> None: raise NotImplementedError def change_type_map( - self, type_map: list[str], model_with_new_type_stat=None + self, type_map: list[str], model_with_new_type_stat: Any = None ) -> None: """Change the type related params to new ones, according to `type_map` and the original one in the model. If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. @@ -359,11 +364,11 @@ def change_type_map( repflow["dstd"] = repflow["dstd"][remap_index] @property - def dim_out(self): + def dim_out(self) -> int: return self.get_dim_out() @property - def dim_emb(self): + def dim_emb(self) -> int: """Returns the embedding dimension g2.""" return self.get_dim_emb() @@ -463,7 +468,7 @@ def deserialize(cls, data: dict) -> "DescrptDPA3": type_embedding ) - def t_cvt(xx): + def t_cvt(xx: Any) -> paddle.Tensor: return paddle.to_tensor(xx, dtype=obj.repflows.prec, place=env.DEVICE) # deserialize repflow @@ -488,7 +493,7 @@ def forward( nlist: paddle.Tensor, mapping: paddle.Tensor | None = None, comm_dict: list[paddle.Tensor] | None = None, - ): + ) -> paddle.Tensor: """Compute the descriptor. Parameters diff --git a/deepmd/pd/model/descriptor/env_mat.py b/deepmd/pd/model/descriptor/env_mat.py index 2cc23fcadf..8e3fd121e2 100644 --- a/deepmd/pd/model/descriptor/env_mat.py +++ b/deepmd/pd/model/descriptor/env_mat.py @@ -9,14 +9,14 @@ def _make_env_mat( - nlist, - coord, + nlist: paddle.Tensor, + coord: paddle.Tensor, rcut: float, ruct_smth: float, radial_only: bool = False, protection: float = 0.0, use_exp_switch: bool = False, -): +) -> tuple[paddle.Tensor, paddle.Tensor, paddle.Tensor]: """Make smooth environment matrix.""" bsz, natoms, nnei = nlist.shape coord = coord.reshape([bsz, -1, 3]) @@ -49,17 +49,17 @@ def _make_env_mat( def prod_env_mat( - extended_coord, - nlist, - atype, - mean, - stddev, + extended_coord: paddle.Tensor, + nlist: paddle.Tensor, + atype: paddle.Tensor, + mean: paddle.Tensor, + stddev: paddle.Tensor, rcut: float, rcut_smth: float, radial_only: bool = False, protection: float = 0.0, use_exp_switch: bool = False, -): +) -> tuple[paddle.Tensor, paddle.Tensor, paddle.Tensor]: """Generate smooth environment matrix from atom coordinates and other context. Args: diff --git a/deepmd/pd/model/descriptor/repflow_layer.py b/deepmd/pd/model/descriptor/repflow_layer.py index eca1659b67..e7264c14f5 100644 --- a/deepmd/pd/model/descriptor/repflow_layer.py +++ b/deepmd/pd/model/descriptor/repflow_layer.py @@ -714,7 +714,7 @@ def forward( a_sw: paddle.Tensor, # switch func, nf x nloc x a_nnei edge_index: paddle.Tensor, # 2 x n_edge angle_index: paddle.Tensor, # 3 x n_angle - ): + ) -> tuple[paddle.Tensor, paddle.Tensor, paddle.Tensor]: """ Parameters ---------- diff --git a/deepmd/pd/model/descriptor/repflows.py b/deepmd/pd/model/descriptor/repflows.py index 21b5984ef3..d40e6b3648 100644 --- a/deepmd/pd/model/descriptor/repflows.py +++ b/deepmd/pd/model/descriptor/repflows.py @@ -56,15 +56,15 @@ if not ENABLE_CUSTOMIZED_OP: def border_op( - argument0, - argument1, - argument2, - argument3, - argument4, - argument5, - argument6, - argument7, - argument8, + argument0: paddle.Tensor, + argument1: paddle.Tensor, + argument2: paddle.Tensor, + argument3: paddle.Tensor, + argument4: paddle.Tensor, + argument5: paddle.Tensor, + argument6: paddle.Tensor, + argument7: paddle.Tensor, + argument8: paddle.Tensor, ) -> paddle.Tensor: raise NotImplementedError( "The 'border_op' operator is unavailable because the custom Paddle OP library was not built when freezing the model.\n" @@ -192,11 +192,11 @@ class DescrptBlockRepflows(DescriptorBlock): def __init__( self, - e_rcut, - e_rcut_smth, + e_rcut: float, + e_rcut_smth: float, e_sel: int, - a_rcut, - a_rcut_smth, + a_rcut: float, + a_rcut_smth: float, a_sel: int, ntypes: int, nlayers: int = 6, @@ -394,7 +394,7 @@ def get_dim_emb(self) -> int: """Returns the embedding dimension e_dim.""" return self.e_dim - def __setitem__(self, key, value) -> None: + def __setitem__(self, key: str, value: paddle.Tensor) -> None: if key in ("avg", "data_avg", "davg"): self.mean = value elif key in ("std", "data_std", "dstd"): @@ -402,7 +402,7 @@ def __setitem__(self, key, value) -> None: else: raise KeyError(key) - def __getitem__(self, key): + def __getitem__(self, key: str) -> paddle.Tensor: if key in ("avg", "data_avg", "davg"): return self.mean elif key in ("std", "data_std", "dstd"): @@ -427,17 +427,17 @@ def get_env_protection(self) -> float: return self.env_protection @property - def dim_out(self): + def dim_out(self) -> int: """Returns the output dimension of this descriptor.""" return self.n_dim @property - def dim_in(self): + def dim_in(self) -> int: """Returns the atomic input dimension of this descriptor.""" return self.n_dim @property - def dim_emb(self): + def dim_emb(self) -> int: """Returns the embedding dimension e_dim.""" return self.get_dim_emb() @@ -456,7 +456,7 @@ def forward( extended_atype_embd: paddle.Tensor | None = None, mapping: paddle.Tensor | None = None, comm_dict: list[paddle.Tensor] | None = None, - ): + ) -> paddle.Tensor: parallel_mode = comm_dict is not None if not parallel_mode: if paddle.in_dynamic_mode(): diff --git a/deepmd/pd/model/descriptor/repformer_layer.py b/deepmd/pd/model/descriptor/repformer_layer.py index efb1881d59..57d619c00a 100644 --- a/deepmd/pd/model/descriptor/repformer_layer.py +++ b/deepmd/pd/model/descriptor/repformer_layer.py @@ -591,12 +591,12 @@ def deserialize(cls, data: dict) -> "LocalAtten": class RepformerLayer(paddle.nn.Layer): def __init__( self, - rcut, - rcut_smth, + rcut: float, + rcut_smth: float, sel: int, ntypes: int, - g1_dim=128, - g2_dim=16, + g1_dim: int = 128, + g2_dim: int = 16, axis_neuron: int = 4, update_chnnl_2: bool = True, update_g1_has_conv: bool = True, @@ -1147,7 +1147,7 @@ def forward( nlist: paddle.Tensor, # nf x nloc x nnei nlist_mask: paddle.Tensor, # nf x nloc x nnei sw: paddle.Tensor, # switch func, nf x nloc x nnei - ): + ) -> tuple[paddle.Tensor, paddle.Tensor, paddle.Tensor]: """ Parameters ---------- diff --git a/deepmd/pd/model/descriptor/repformers.py b/deepmd/pd/model/descriptor/repformers.py index 24f92f1bee..1c07761e6b 100644 --- a/deepmd/pd/model/descriptor/repformers.py +++ b/deepmd/pd/model/descriptor/repformers.py @@ -53,15 +53,15 @@ if not ENABLE_CUSTOMIZED_OP: def border_op( - argument0, - argument1, - argument2, - argument3, - argument4, - argument5, - argument6, - argument7, - argument8, + argument0: paddle.Tensor, + argument1: paddle.Tensor, + argument2: paddle.Tensor, + argument3: paddle.Tensor, + argument4: paddle.Tensor, + argument5: paddle.Tensor, + argument6: paddle.Tensor, + argument7: paddle.Tensor, + argument8: paddle.Tensor, ) -> paddle.Tensor: raise NotImplementedError( "The 'border_op' operator is unavailable because the custom Paddle OP library was not built when freezing the model.\n" @@ -80,13 +80,13 @@ def border_op( class DescrptBlockRepformers(DescriptorBlock): def __init__( self, - rcut, - rcut_smth, + rcut: float, + rcut_smth: float, sel: int, ntypes: int, nlayers: int = 3, - g1_dim=128, - g2_dim=16, + g1_dim: int = 128, + g2_dim: int = 16, axis_neuron: int = 4, direct_dist: bool = False, update_g1_has_conv: bool = True, @@ -359,7 +359,7 @@ def get_dim_emb(self) -> int: """Returns the embedding dimension g2.""" return self.g2_dim - def __setitem__(self, key, value) -> None: + def __setitem__(self, key: str, value: paddle.Tensor) -> None: if key in ("avg", "data_avg", "davg"): self.mean = value elif key in ("std", "data_std", "dstd"): @@ -367,7 +367,7 @@ def __setitem__(self, key, value) -> None: else: raise KeyError(key) - def __getitem__(self, key): + def __getitem__(self, key: str) -> paddle.Tensor: if key in ("avg", "data_avg", "davg"): return self.mean elif key in ("std", "data_std", "dstd"): @@ -392,17 +392,17 @@ def get_env_protection(self) -> float: return self.env_protection @property - def dim_out(self): + def dim_out(self) -> int: """Returns the output dimension of this descriptor.""" return self.g1_dim @property - def dim_in(self): + def dim_in(self) -> int: """Returns the atomic input dimension of this descriptor.""" return self.g1_dim @property - def dim_emb(self): + def dim_emb(self) -> int: """Returns the embedding dimension g2.""" return self.get_dim_emb() @@ -422,7 +422,7 @@ def forward( mapping: paddle.Tensor | None = None, type_embedding: paddle.Tensor | None = None, comm_dict: list[paddle.Tensor] | None = None, - ): + ) -> paddle.Tensor: if (comm_dict is None or len(comm_dict) == 0) and paddle.in_dynamic_mode(): assert mapping is not None assert extended_atype_embd is not None diff --git a/deepmd/pd/model/descriptor/se_a.py b/deepmd/pd/model/descriptor/se_a.py index 17ef5d67c4..63b54626c6 100644 --- a/deepmd/pd/model/descriptor/se_a.py +++ b/deepmd/pd/model/descriptor/se_a.py @@ -71,11 +71,11 @@ class DescrptSeA(BaseDescriptor, paddle.nn.Layer): def __init__( self, - rcut, - rcut_smth, - sel, - neuron=[25, 50, 100], - axis_neuron=16, + rcut: float, + rcut_smth: float, + sel: int | list[int], + neuron: list[int] = [25, 50, 100], + axis_neuron: int = 16, set_davg_zero: bool = False, activation_function: str = "tanh", precision: str = "float64", @@ -88,7 +88,7 @@ def __init__( ntypes: int | None = None, # to be compat with input type_map: list[str] | None = None, # not implemented - spin=None, + spin: object = None, ) -> None: del ntypes if spin is not None: @@ -171,7 +171,7 @@ def get_dim_emb(self) -> int: """Returns the output dimension.""" return self.sea.get_dim_emb() - def mixed_types(self): + def mixed_types(self) -> bool: """Returns if the descriptor requires a neighbor list that distinguish different atomic types or not. """ @@ -189,7 +189,9 @@ def get_env_protection(self) -> float: """Returns the protection of building environment matrix.""" return self.sea.get_env_protection() - def share_params(self, base_class, shared_level, resume=False) -> None: + def share_params( + self, base_class: object, shared_level: int, resume: bool = False + ) -> None: """ Share the parameters of self to the base_class with shared_level during multitask training. If not start from checkpoint (resume is False), @@ -208,12 +210,12 @@ def share_params(self, base_class, shared_level, resume=False) -> None: raise NotImplementedError @property - def dim_out(self): + def dim_out(self) -> int: """Returns the output dimension of this descriptor.""" return self.sea.dim_out def change_type_map( - self, type_map: list[str], model_with_new_type_stat=None + self, type_map: list[str], model_with_new_type_stat: object = None ) -> None: """Change the type related params to new ones, according to `type_map` and the original one in the model. If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. @@ -228,7 +230,7 @@ def compute_input_stats( self, merged: Callable[[], list[dict]] | list[dict], path: DPPath | None = None, - ): + ) -> None: """ Compute the input statistics (e.g. mean and stddev) for the descriptors from packed data. @@ -286,7 +288,7 @@ def forward( nlist: paddle.Tensor, mapping: paddle.Tensor | None = None, comm_dict: list[paddle.Tensor] | None = None, - ): + ) -> paddle.Tensor: """Compute the descriptor. Parameters @@ -389,7 +391,7 @@ def deserialize(cls, data: dict) -> "DescrptSeA": env_mat = data.pop("env_mat") obj = cls(**data) - def t_cvt(xx): + def t_cvt(xx: np.ndarray) -> paddle.Tensor: return paddle.to_tensor(xx, dtype=obj.sea.prec).to(device=env.DEVICE) obj.sea["davg"] = t_cvt(variables["davg"]) @@ -436,11 +438,11 @@ class DescrptBlockSeA(DescriptorBlock): def __init__( self, - rcut, - rcut_smth, - sel, - neuron=[25, 50, 100], - axis_neuron=16, + rcut: float, + rcut_smth: float, + sel: int | list[int], + neuron: list[int] = [25, 50, 100], + axis_neuron: int = 16, set_davg_zero: bool = False, activation_function: str = "tanh", precision: str = "float64", @@ -450,7 +452,7 @@ def __init__( type_one_side: bool = True, trainable: bool = True, seed: int | list[int] | None = None, - **kwargs, + **kwargs: object, ) -> None: """Construct an embedding net of type `se_a`. @@ -596,7 +598,7 @@ def get_env_protection(self) -> float: return self.env_protection @property - def dim_out(self): + def dim_out(self) -> int: """Returns the output dimension of this descriptor.""" return self.filter_neuron[-1] * self.axis_neuron @@ -605,7 +607,7 @@ def dim_in(self) -> int: """Returns the atomic input dimension of this descriptor.""" return 0 - def __setitem__(self, key, value) -> None: + def __setitem__(self, key: str, value: paddle.Tensor) -> None: if key in ("avg", "data_avg", "davg"): self.mean = value elif key in ("std", "data_std", "dstd"): @@ -613,7 +615,7 @@ def __setitem__(self, key, value) -> None: else: raise KeyError(key) - def __getitem__(self, key): + def __getitem__(self, key: str) -> paddle.Tensor: if key in ("avg", "data_avg", "davg"): return self.mean elif key in ("std", "data_std", "dstd"): @@ -725,7 +727,7 @@ def forward( extended_atype_embd: paddle.Tensor | None = None, mapping: paddle.Tensor | None = None, type_embedding: paddle.Tensor | None = None, - ): + ) -> paddle.Tensor: """Calculate decoded embedding for each atom. Args: @@ -763,7 +765,11 @@ def forward( # nfnl x nnei exclude_mask = self.emask(nlist, extended_atype).reshape([nfnl, self.nnei]) for embedding_idx, (ll, compress_data_ii, compress_info_ii) in enumerate( - zip(self.filter_layers.networks, self.compress_data, self.compress_info) + zip( + self.filter_layers.networks, + self.compress_data, + self.compress_info, + ) ): if self.type_one_side: ii = embedding_idx diff --git a/deepmd/pd/model/descriptor/se_atten.py b/deepmd/pd/model/descriptor/se_atten.py index 2c93c35ef8..33d8e8d4cf 100644 --- a/deepmd/pd/model/descriptor/se_atten.py +++ b/deepmd/pd/model/descriptor/se_atten.py @@ -65,12 +65,12 @@ def __init__( attn_layer: int = 2, attn_dotr: bool = True, attn_mask: bool = False, - activation_function="tanh", + activation_function: str = "tanh", precision: str = "float64", resnet_dt: bool = False, - scaling_factor=1.0, - normalize=True, - temperature=None, + scaling_factor: float = 1.0, + normalize: bool = True, + temperature: float | None = None, smooth: bool = True, type_one_side: bool = False, exclude_types: list[tuple[int, int]] = [], @@ -318,7 +318,7 @@ def get_dim_emb(self) -> int: """Returns the output dimension of embedding.""" return self.filter_neuron[-1] - def __setitem__(self, key, value) -> None: + def __setitem__(self, key: str, value: paddle.Tensor) -> None: if key in ("avg", "data_avg", "davg"): self.mean = value elif key in ("std", "data_std", "dstd"): @@ -326,7 +326,7 @@ def __setitem__(self, key, value) -> None: else: raise KeyError(key) - def __getitem__(self, key): + def __getitem__(self, key: str) -> paddle.Tensor: if key in ("avg", "data_avg", "davg"): return self.mean elif key in ("std", "data_std", "dstd"): @@ -351,17 +351,17 @@ def get_env_protection(self) -> float: return self.env_protection @property - def dim_out(self): + def dim_out(self) -> int: """Returns the output dimension of this descriptor.""" return self.filter_neuron[-1] * self.axis_neuron @property - def dim_in(self): + def dim_in(self) -> int: """Returns the atomic input dimension of this descriptor.""" return self.tebd_dim @property - def dim_emb(self): + def dim_emb(self) -> int: """Returns the output dimension of embedding.""" return self.get_dim_emb() @@ -428,10 +428,10 @@ def reinit_exclude( def enable_compression( self, - table_data, - table_config, - lower, - upper, + table_data: dict[str, paddle.Tensor], + table_config: list[float], + lower: dict[str, float], + upper: dict[str, float], ) -> None: net = "filter_net" self.compress_info[0] = paddle.to_tensor( @@ -460,7 +460,9 @@ def forward( extended_atype_embd: paddle.Tensor | None = None, mapping: paddle.Tensor | None = None, type_embedding: paddle.Tensor | None = None, - ): + ) -> tuple[ + paddle.Tensor, paddle.Tensor | None, paddle.Tensor, paddle.Tensor, paddle.Tensor + ]: """Compute the descriptor. Parameters @@ -723,11 +725,11 @@ def __init__( def forward( self, - input_G, - nei_mask, + input_G: paddle.Tensor, + nei_mask: paddle.Tensor, input_r: paddle.Tensor | None = None, sw: paddle.Tensor | None = None, - ): + ) -> paddle.Tensor: """Compute the multi-layer gated self-attention. Parameters @@ -746,13 +748,13 @@ def forward( out = layer(out, nei_mask, input_r=input_r, sw=sw) return out - def __getitem__(self, key): + def __getitem__(self, key: int) -> nn.Layer: if isinstance(key, int): return self.attention_layers[key] else: raise TypeError(key) - def __setitem__(self, key, value) -> None: + def __setitem__(self, key: int, value: nn.Layer | dict) -> None: if not isinstance(key, int): raise TypeError(key) if isinstance(value, self.network_type): @@ -864,11 +866,11 @@ def __init__( def forward( self, - x, - nei_mask, + x: paddle.Tensor, + nei_mask: paddle.Tensor, input_r: paddle.Tensor | None = None, sw: paddle.Tensor | None = None, - ): + ) -> paddle.Tensor: residual = x x, _ = self.attention_layer(x, nei_mask, input_r=input_r, sw=sw) x = residual + x @@ -982,12 +984,12 @@ def __init__( def forward( self, - query, - nei_mask, + query: paddle.Tensor, + nei_mask: paddle.Tensor, input_r: paddle.Tensor | None = None, sw: paddle.Tensor | None = None, attnw_shift: float = 20.0, - ): + ) -> tuple[paddle.Tensor, paddle.Tensor]: """Compute the multi-head gated self-attention. Parameters diff --git a/deepmd/pd/model/descriptor/se_atten_v2.py b/deepmd/pd/model/descriptor/se_atten_v2.py index 8855fb3037..21019de59a 100644 --- a/deepmd/pd/model/descriptor/se_atten_v2.py +++ b/deepmd/pd/model/descriptor/se_atten_v2.py @@ -52,8 +52,8 @@ def __init__( exclude_types: list[tuple[int, int]] = [], env_protection: float = 0.0, scaling_factor: int = 1.0, - normalize=True, - temperature=None, + normalize: bool = True, + temperature: float | None = None, concat_output_tebd: bool = True, trainable: bool = True, trainable_ln: bool = True, @@ -65,7 +65,7 @@ def __init__( use_tebd_bias: bool = False, type_map: list[str] | None = None, # not implemented - spin=None, + spin: object = None, type: str | None = None, ) -> None: r"""Construct smooth version of embedding net of type `se_atten_v2`. @@ -253,7 +253,7 @@ def deserialize(cls, data: dict) -> "DescrptSeAttenV2": data["use_tebd_bias"] = True obj = cls(**data) - def t_cvt(xx): + def t_cvt(xx: object) -> paddle.Tensor: return paddle.to_tensor(xx, dtype=obj.se_atten.prec, place=env.DEVICE) obj.type_embedding.embedding = TypeEmbedNetConsistent.deserialize( diff --git a/deepmd/pd/model/descriptor/se_t_tebd.py b/deepmd/pd/model/descriptor/se_t_tebd.py index b0f409f2bb..64c988e57d 100644 --- a/deepmd/pd/model/descriptor/se_t_tebd.py +++ b/deepmd/pd/model/descriptor/se_t_tebd.py @@ -2,6 +2,9 @@ from collections.abc import ( Callable, ) +from typing import ( + Any, +) import paddle @@ -138,7 +141,7 @@ def __init__( type_map: list[str] | None = None, concat_output_tebd: bool = True, use_econf_tebd: bool = False, - use_tebd_bias=False, + use_tebd_bias: bool = False, smooth: bool = True, ) -> None: super().__init__() @@ -257,7 +260,9 @@ def get_env_protection(self) -> float: """Returns the protection of building environment matrix.""" return self.se_ttebd.get_env_protection() - def share_params(self, base_class, shared_level, resume=False) -> None: + def share_params( + self, base_class: object, shared_level: int, resume: bool = False + ) -> None: """ Share the parameters of self to the base_class with shared_level during multitask training. If not start from checkpoint (resume is False), @@ -285,18 +290,18 @@ def share_params(self, base_class, shared_level, resume=False) -> None: raise NotImplementedError @property - def dim_out(self): + def dim_out(self) -> int: return self.get_dim_out() @property - def dim_emb(self): + def dim_emb(self) -> int: return self.get_dim_emb() def compute_input_stats( self, merged: Callable[[], list[dict]] | list[dict], path: DPPath | None = None, - ): + ) -> None: """ Compute the input statistics (e.g. mean and stddev) for the descriptors from packed data. @@ -329,7 +334,7 @@ def get_stat_mean_and_stddev(self) -> tuple[paddle.Tensor, paddle.Tensor]: return self.se_ttebd.mean, self.se_ttebd.stddev def change_type_map( - self, type_map: list[str], model_with_new_type_stat=None + self, type_map: list[str], model_with_new_type_stat: Any | None = None ) -> None: """Change the type related params to new ones, according to `type_map` and the original one in the model. If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. @@ -409,7 +414,7 @@ def deserialize(cls, data: dict) -> "DescrptSeTTebd": embeddings_strip = None obj = cls(**data) - def t_cvt(xx): + def t_cvt(xx: paddle.Tensor) -> paddle.Tensor: return paddle.to_tensor(xx, dtype=obj.se_ttebd.prec).to(device=env.DEVICE) obj.type_embedding.embedding = TypeEmbedNetConsistent.deserialize( @@ -431,7 +436,7 @@ def forward( nlist: paddle.Tensor, mapping: paddle.Tensor | None = None, comm_dict: list[paddle.Tensor] | None = None, - ): + ) -> paddle.Tensor: """Compute the descriptor. Parameters @@ -539,7 +544,7 @@ def __init__( tebd_dim: int = 8, tebd_input_mode: str = "concat", set_davg_zero: bool = True, - activation_function="tanh", + activation_function: str = "tanh", precision: str = "float64", resnet_dt: bool = False, exclude_types: list[tuple[int, int]] = [], @@ -668,7 +673,7 @@ def get_dim_emb(self) -> int: """Returns the output dimension of embedding.""" return self.filter_neuron[-1] - def __setitem__(self, key, value) -> None: + def __setitem__(self, key: str, value: paddle.Tensor) -> None: if key in ("avg", "data_avg", "davg"): self.mean = value elif key in ("std", "data_std", "dstd"): @@ -676,7 +681,7 @@ def __setitem__(self, key, value) -> None: else: raise KeyError(key) - def __getitem__(self, key): + def __getitem__(self, key: str) -> paddle.Tensor: if key in ("avg", "data_avg", "davg"): return self.mean elif key in ("std", "data_std", "dstd"): @@ -701,17 +706,17 @@ def get_env_protection(self) -> float: return self.env_protection @property - def dim_out(self): + def dim_out(self) -> int: """Returns the output dimension of this descriptor.""" return self.filter_neuron[-1] @property - def dim_in(self): + def dim_in(self) -> int: """Returns the atomic input dimension of this descriptor.""" return self.tebd_dim @property - def dim_emb(self): + def dim_emb(self) -> int: """Returns the output dimension of embedding.""" return self.get_dim_emb() @@ -783,7 +788,7 @@ def forward( extended_atype_embd: paddle.Tensor | None = None, mapping: paddle.Tensor | None = None, type_embedding: paddle.Tensor | None = None, - ): + ) -> paddle.Tensor: """Compute the descriptor. Parameters diff --git a/deepmd/pd/model/model/__init__.py b/deepmd/pd/model/model/__init__.py index cd758add6d..8348ac039e 100644 --- a/deepmd/pd/model/model/__init__.py +++ b/deepmd/pd/model/model/__init__.py @@ -13,6 +13,9 @@ import copy import json +from typing import ( + Any, +) import numpy as np @@ -40,7 +43,9 @@ ) -def _get_standard_model_components(model_params, ntypes): +def _get_standard_model_components( + model_params: dict, ntypes: int +) -> tuple[BaseDescriptor, BaseFitting, str]: # descriptor model_params["descriptor"]["ntypes"] = ntypes model_params["descriptor"]["type_map"] = copy.deepcopy(model_params["type_map"]) @@ -63,7 +68,7 @@ def _get_standard_model_components(model_params, ntypes): return descriptor, fitting, fitting_net["type"] -def _can_be_converted_to_float(value): +def _can_be_converted_to_float(value: Any) -> bool: try: float(value) return True @@ -72,7 +77,9 @@ def _can_be_converted_to_float(value): return False -def _convert_preset_out_bias_to_array(preset_out_bias, type_map): +def _convert_preset_out_bias_to_array( + preset_out_bias: dict | None, type_map: list[str] +) -> dict | None: if preset_out_bias is not None: for kk in preset_out_bias: if len(preset_out_bias[kk]) != len(type_map): @@ -95,7 +102,7 @@ def _convert_preset_out_bias_to_array(preset_out_bias, type_map): return preset_out_bias -def get_standard_model(model_params): +def get_standard_model(model_params: dict) -> BaseModel: model_params_old = model_params model_params = copy.deepcopy(model_params) ntypes = len(model_params["type_map"]) @@ -126,7 +133,7 @@ def get_standard_model(model_params): return model -def get_model(model_params): +def get_model(model_params: dict) -> BaseModel: model_type = model_params.get("type", "standard") if model_type == "standard": return get_standard_model(model_params) diff --git a/deepmd/pd/model/model/dp_model.py b/deepmd/pd/model/model/dp_model.py index fe107263c4..cddbd34cfb 100644 --- a/deepmd/pd/model/model/dp_model.py +++ b/deepmd/pd/model/model/dp_model.py @@ -44,11 +44,11 @@ def update_sel( ) return local_jdata_cpy, min_nbor_dist - def get_fitting_net(self): + def get_fitting_net(self) -> object: """Get the fitting network.""" return self.atomic_model.fitting_net - def get_descriptor(self): + def get_descriptor(self) -> object: """Get the descriptor.""" return self.atomic_model.descriptor diff --git a/deepmd/pd/model/model/ener_model.py b/deepmd/pd/model/model/ener_model.py index 8111ae434d..3a57e79d3a 100644 --- a/deepmd/pd/model/model/ener_model.py +++ b/deepmd/pd/model/model/ener_model.py @@ -1,5 +1,9 @@ # SPDX-License-Identifier: LGPL-3.0-or-later +from typing import ( + Any, +) + import paddle from deepmd.pd.model.atomic_model import ( @@ -25,8 +29,8 @@ class EnergyModel(DPModelCommon, DPEnergyModel_): def __init__( self, - *args, - **kwargs, + *args: Any, + **kwargs: Any, ) -> None: DPModelCommon.__init__(self) DPEnergyModel_.__init__(self, *args, **kwargs) @@ -43,7 +47,7 @@ def get_buffer_type_map(self) -> paddle.Tensor: """ return super().get_buffer_type_map() - def translated_output_def(self): + def translated_output_def(self) -> dict: out_def_data = self.model_output_def().get_data() output_def = { "atom_energy": out_def_data["energy"], @@ -63,8 +67,8 @@ def translated_output_def(self): def forward( self, - coord, - atype, + coord: paddle.Tensor, + atype: paddle.Tensor, box: paddle.Tensor | None = None, fparam: paddle.Tensor | None = None, aparam: paddle.Tensor | None = None, @@ -105,15 +109,15 @@ def forward( def forward_lower( self, - extended_coord, - extended_atype, - nlist, + extended_coord: paddle.Tensor, + extended_atype: paddle.Tensor, + nlist: paddle.Tensor, mapping: paddle.Tensor | None = None, fparam: paddle.Tensor | None = None, aparam: paddle.Tensor | None = None, do_atomic_virial: bool = False, comm_dict: list[paddle.Tensor] | None = None, - ): + ) -> dict[str, paddle.Tensor]: model_ret = self.forward_common_lower( extended_coord, extended_atype, diff --git a/deepmd/pd/model/model/frozen.py b/deepmd/pd/model/model/frozen.py index f585cb1e24..365202dd6c 100644 --- a/deepmd/pd/model/model/frozen.py +++ b/deepmd/pd/model/model/frozen.py @@ -1,5 +1,9 @@ # SPDX-License-Identifier: LGPL-3.0-or-later import json +from typing import ( + Any, + NoReturn, +) import paddle @@ -24,7 +28,7 @@ class FrozenModel(BaseModel): The path to the frozen model """ - def __init__(self, model_file: str, **kwargs): + def __init__(self, model_file: str, **kwargs: Any) -> None: super().__init__(**kwargs) self.model_file = model_file if model_file.endswith(".json"): @@ -96,8 +100,8 @@ def need_sorted_nlist_for_lower(self) -> bool: def forward( self, - coord, - atype, + coord: paddle.Tensor, + atype: paddle.Tensor, box: paddle.Tensor | None = None, fparam: paddle.Tensor | None = None, aparam: paddle.Tensor | None = None, @@ -136,7 +140,7 @@ def serialize(self) -> dict: return model.serialize() @classmethod - def deserialize(cls, data: dict): + def deserialize(cls, data: dict) -> NoReturn: raise RuntimeError("Should not touch here.") def get_nnei(self) -> int: diff --git a/deepmd/pd/model/model/make_model.py b/deepmd/pd/model/model/make_model.py index 03aaf621f8..72811c9e1c 100644 --- a/deepmd/pd/model/model/make_model.py +++ b/deepmd/pd/model/model/make_model.py @@ -1,4 +1,10 @@ # SPDX-License-Identifier: LGPL-3.0-or-later +from collections.abc import ( + Callable, +) +from typing import ( + Any, +) import paddle @@ -36,7 +42,7 @@ ) -def make_model(T_AtomicModel: type[BaseAtomicModel]): +def make_model(T_AtomicModel: type[BaseAtomicModel]) -> type[BaseModel]: """Make a model as a derived class of an atomic model. The model provide two interfaces. @@ -62,10 +68,10 @@ def make_model(T_AtomicModel: type[BaseAtomicModel]): class CM(BaseModel): def __init__( self, - *args, + *args: Any, # underscore to prevent conflict with normal inputs atomic_model_: T_AtomicModel | None = None, - **kwargs, + **kwargs: Any, ) -> None: super().__init__(*args, **kwargs) if atomic_model_ is not None: @@ -77,7 +83,7 @@ def __init__( self.global_pd_float_precision = GLOBAL_PD_FLOAT_PRECISION self.global_pd_ener_float_precision = GLOBAL_PD_ENER_FLOAT_PRECISION - def model_output_def(self): + def model_output_def(self) -> ModelOutputDef: """Get the output def for the model.""" return ModelOutputDef(self.atomic_output_def()) @@ -124,8 +130,8 @@ def enable_compression( def forward_common( self, - coord, - atype, + coord: paddle.Tensor, + atype: paddle.Tensor, box: paddle.Tensor | None = None, fparam: paddle.Tensor | None = None, aparam: paddle.Tensor | None = None, @@ -201,8 +207,8 @@ def set_out_bias(self, out_bias: paddle.Tensor) -> None: def change_out_bias( self, - merged, - bias_adjust_mode="change-by-statistic", + merged: list[dict] | Callable[[], list[dict]], + bias_adjust_mode: str = "change-by-statistic", ) -> None: """Change the output bias of atomic model according to the input data and the pretrained model. @@ -230,16 +236,16 @@ def change_out_bias( def forward_common_lower( self, - extended_coord, - extended_atype, - nlist, + extended_coord: paddle.Tensor, + extended_atype: paddle.Tensor, + nlist: paddle.Tensor, mapping: paddle.Tensor | None = None, fparam: paddle.Tensor | None = None, aparam: paddle.Tensor | None = None, do_atomic_virial: bool = False, comm_dict: list[paddle.Tensor] | None = None, extra_nlist_sort: bool = False, - ): + ) -> dict[str, paddle.Tensor]: """Return model prediction. Lower interface that takes extended atomic coordinates and types, nlist, and mapping as input, and returns the predictions on the extended region. @@ -379,7 +385,7 @@ def format_nlist( extended_atype: paddle.Tensor, nlist: paddle.Tensor, extra_nlist_sort: bool = False, - ): + ) -> paddle.Tensor: """Format the neighbor list. 1. If the number of neighbors in the `nlist` is equal to sum(self.sel), @@ -430,7 +436,7 @@ def _format_nlist( nlist: paddle.Tensor, nnei: int, extra_nlist_sort: bool = False, - ): + ) -> paddle.Tensor: n_nf, n_nloc, n_nnei = nlist.shape # nf x nall x 3 extended_coord = extended_coord.reshape([n_nf, -1, 3]) @@ -498,7 +504,7 @@ def do_grad_c( return self.atomic_model.do_grad_c(var_name) def change_type_map( - self, type_map: list[str], model_with_new_type_stat=None + self, type_map: list[str], model_with_new_type_stat: "CM | None" = None ) -> None: """Change the type related params to new ones, according to `type_map` and the original one in the model. If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. @@ -514,10 +520,10 @@ def serialize(self) -> dict: return self.atomic_model.serialize() @classmethod - def deserialize(cls, data) -> "CM": + def deserialize(cls, data: dict) -> "CM": return cls(atomic_model_=T_AtomicModel.deserialize(data)) - def set_case_embd(self, case_idx: int): + def set_case_embd(self, case_idx: int) -> None: self.atomic_model.set_case_embd(case_idx) def get_dim_fparam(self) -> int: @@ -590,9 +596,9 @@ def atomic_output_def(self) -> FittingOutputDef: def compute_or_load_stat( self, - sampled_func, + sampled_func: Callable, stat_file_path: DPPath | None = None, - ): + ) -> None: """Compute or load the statistics.""" return self.atomic_model.compute_or_load_stat(sampled_func, stat_file_path) @@ -622,8 +628,8 @@ def need_sorted_nlist_for_lower(self) -> bool: def forward( self, - coord, - atype, + coord: paddle.Tensor, + atype: paddle.Tensor, box: paddle.Tensor | None = None, fparam: paddle.Tensor | None = None, aparam: paddle.Tensor | None = None, diff --git a/deepmd/pd/model/model/model.py b/deepmd/pd/model/model/model.py index 5027590a9e..9df3137beb 100644 --- a/deepmd/pd/model/model/model.py +++ b/deepmd/pd/model/model/model.py @@ -1,5 +1,13 @@ # SPDX-License-Identifier: LGPL-3.0-or-later +from collections.abc import ( + Callable, +) +from typing import ( + Any, + NoReturn, +) + import paddle from deepmd.dpmodel.model.base_model import ( @@ -11,7 +19,7 @@ class BaseModel(paddle.nn.Layer, make_base_model()): - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: """Construct a basic model for different tasks.""" paddle.nn.Layer.__init__(self) self.model_def_script = "" @@ -19,9 +27,9 @@ def __init__(self, *args, **kwargs): def compute_or_load_stat( self, - sampled_func, + sampled_func: Callable, stat_file_path: DPPath | None = None, - ): + ) -> NoReturn: """ Compute or load the statistics parameters of the model, such as mean and standard deviation of descriptors or the energy bias of the fitting net. @@ -47,7 +55,7 @@ def get_min_nbor_dist(self) -> float | None: """Get the minimum distance between two atoms.""" return self.min_nbor_dist - def get_ntypes(self): + def get_ntypes(self) -> int: """Returns the number of element types.""" return len(self.get_type_map()) diff --git a/deepmd/pd/model/model/transform_output.py b/deepmd/pd/model/model/transform_output.py index 47004265c7..923ea7b01e 100644 --- a/deepmd/pd/model/model/transform_output.py +++ b/deepmd/pd/model/model/transform_output.py @@ -18,7 +18,7 @@ def atomic_virial_corr( extended_coord: paddle.Tensor, atom_energy: paddle.Tensor, -): +) -> paddle.Tensor: nall = extended_coord.shape[1] nloc = atom_energy.shape[1] coord, _ = paddle.split(extended_coord, [nloc, nall - nloc], axis=1) @@ -69,7 +69,7 @@ def task_deriv_one( do_virial: bool = True, do_atomic_virial: bool = False, create_graph: bool = True, -): +) -> tuple[paddle.Tensor, paddle.Tensor | None]: # faked_grad = paddle.ones_like(energy) # lst = paddle.jit.annotate(List[Optional[paddle.Tensor]], [faked_grad]) extended_force = paddle.autograd.grad( @@ -99,7 +99,7 @@ def task_deriv_one( def get_leading_dims( vv: paddle.Tensor, vdef: OutputVariableDef, -): +) -> list[int]: """Get the dimensions of nf x nloc.""" vshape = vv.shape return list(vshape[: (len(vshape) - len(vdef.shape))]) @@ -113,7 +113,7 @@ def take_deriv( do_virial: bool = False, do_atomic_virial: bool = False, create_graph: bool = True, -): +) -> tuple[paddle.Tensor, paddle.Tensor | None]: size = 1 for ii in vdef.shape: size *= ii diff --git a/deepmd/pd/model/network/init.py b/deepmd/pd/model/network/init.py index 83a16807d7..8d3cdd1756 100644 --- a/deepmd/pd/model/network/init.py +++ b/deepmd/pd/model/network/init.py @@ -26,19 +26,33 @@ PaddleGenerator = paddle.base.libpaddle.Generator -def _no_grad_uniform_(tensor: paddle.Tensor, a, b, generator=None): +def _no_grad_uniform_( + tensor: paddle.Tensor, a: float, b: float, generator: PaddleGenerator | None = None +) -> paddle.Tensor: with paddle.no_grad(): return tensor.uniform_(a, b) -def _no_grad_normal_(tensor: paddle.Tensor, mean, std, generator=None): +def _no_grad_normal_( + tensor: paddle.Tensor, + mean: float, + std: float, + generator: PaddleGenerator | None = None, +) -> paddle.Tensor: with paddle.no_grad(): return tensor.normal_(mean, std) -def _no_grad_trunc_normal_(tensor: paddle.Tensor, mean, std, a, b, generator=None): +def _no_grad_trunc_normal_( + tensor: paddle.Tensor, + mean: float, + std: float, + a: float, + b: float, + generator: PaddleGenerator | None = None, +) -> paddle.Tensor: # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf - def norm_cdf(x): + def norm_cdf(x: float) -> float: # Computes standard normal cumulative distribution function return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0 @@ -73,17 +87,17 @@ def norm_cdf(x): return tensor -def _no_grad_zero_(tensor: paddle.Tensor): +def _no_grad_zero_(tensor: paddle.Tensor) -> paddle.Tensor: with paddle.no_grad(): return tensor.zero_() -def _no_grad_fill_(tensor: paddle.Tensor, val): +def _no_grad_fill_(tensor: paddle.Tensor, val: float) -> paddle.Tensor: with paddle.no_grad(): return tensor.fill_(val) -def calculate_gain(nonlinearity, param=None): +def calculate_gain(nonlinearity: str, param: float | None = None) -> float: r"""Return the recommended gain value for the given nonlinearity function. The values are as follows: @@ -154,7 +168,9 @@ def calculate_gain(nonlinearity, param=None): raise ValueError(f"Unsupported nonlinearity {nonlinearity}") -def _calculate_fan_in_and_fan_out(tensor, reverse=False): +def _calculate_fan_in_and_fan_out( + tensor: paddle.Tensor, reverse: bool = False +) -> tuple[int, int]: dimensions = tensor.ndim if dimensions < 2: raise ValueError( @@ -176,7 +192,9 @@ def _calculate_fan_in_and_fan_out(tensor, reverse=False): return fan_in, fan_out -def _calculate_correct_fan(tensor, mode, reverse=False): +def _calculate_correct_fan( + tensor: paddle.Tensor, mode: str, reverse: bool = False +) -> int: mode = mode.lower() valid_modes = ["fan_in", "fan_out"] if mode not in valid_modes: @@ -292,7 +310,7 @@ def kaiming_uniform_( nonlinearity: str = "leaky_relu", generator: PaddleGenerator | None = None, reverse: bool = False, -): +) -> Tensor: r"""Fill the input `Tensor` with values using a Kaiming uniform distribution. The method is described in `Delving deep into rectifiers: Surpassing @@ -342,7 +360,7 @@ def kaiming_normal_( nonlinearity: str = "leaky_relu", generator: PaddleGenerator | None = None, reverse: bool = False, -): +) -> Tensor: r"""Fill the input `Tensor` with values using a Kaiming normal distribution. The method is described in `Delving deep into rectifiers: Surpassing diff --git a/deepmd/pd/model/network/layernorm.py b/deepmd/pd/model/network/layernorm.py index d3d00beacd..932248fd01 100644 --- a/deepmd/pd/model/network/layernorm.py +++ b/deepmd/pd/model/network/layernorm.py @@ -27,14 +27,14 @@ device = env.DEVICE -def empty_t(shape, precision): +def empty_t(shape: list[int], precision: paddle.dtype) -> paddle.Tensor: return paddle.empty(shape, dtype=precision).to(device=device) class LayerNorm(nn.Layer): def __init__( self, - num_in, + num_in: int, eps: float = 1e-5, uni_init: bool = True, bavg: float = 0.0, @@ -42,7 +42,7 @@ def __init__( precision: str = DEFAULT_PRECISION, trainable: bool = True, seed: int | list[int] | None = None, - ): + ) -> None: super().__init__() self.eps = eps self.uni_init = uni_init @@ -146,7 +146,7 @@ def deserialize(cls, data: dict) -> "LayerNorm": ) prec = PRECISION_DICT[obj.precision] - def check_load_param(ss): + def check_load_param(ss: str) -> paddle.Tensor | None: if nl[ss] is not None: tensor = to_paddle_tensor(nl[ss]) return paddle.create_parameter( diff --git a/deepmd/pd/model/network/mlp.py b/deepmd/pd/model/network/mlp.py index ee408b8719..ebae333d61 100644 --- a/deepmd/pd/model/network/mlp.py +++ b/deepmd/pd/model/network/mlp.py @@ -4,6 +4,7 @@ ) from typing import ( + Any, ClassVar, ) @@ -45,12 +46,12 @@ ) -def empty_t(shape, precision): +def empty_t(shape: list[int], precision: paddle.dtype) -> paddle.Tensor: return paddle.empty(shape, dtype=precision).to(device=device) class Identity(nn.Layer): - def __init__(self): + def __init__(self) -> None: super().__init__() def forward( @@ -74,8 +75,8 @@ def deserialize(cls, data: dict) -> Identity: class MLPLayer(nn.Layer): def __init__( self, - num_in, - num_out, + num_in: int, + num_out: int, bias: bool = True, use_timestep: bool = False, activation_function: str | None = None, @@ -86,7 +87,7 @@ def __init__( init: str = "default", seed: int | list[int] | None = None, trainable: bool = True, - ): + ) -> None: super().__init__() self.trainable = trainable # only use_timestep when skip connection is established. @@ -147,10 +148,10 @@ def __init__( else: raise ValueError(f"Unknown initialization method: {init}") - def check_type_consistency(self): + def check_type_consistency(self) -> None: precision = self.precision - def check_var(var): + def check_var(var: paddle.Tensor | None) -> None: if var is not None: # assertion "float64" == "double" would fail assert PRECISION_DICT[var.dtype.name] is PRECISION_DICT[precision] @@ -170,7 +171,7 @@ def _default_normal_init( bavg: float = 0.0, stddev: float = 1.0, generator: PaddleGenerator | None = None, - ): + ) -> None: normal_( self.matrix.data, std=stddev / np.sqrt(self.num_out + self.num_in), @@ -181,7 +182,9 @@ def _default_normal_init( if self.idt is not None: normal_(self.idt.data, mean=0.1, std=0.001, generator=generator) - def _trunc_normal_init(self, scale=1.0, generator: PaddleGenerator | None = None): + def _trunc_normal_init( + self, scale: float = 1.0, generator: PaddleGenerator | None = None + ) -> None: # Constant from scipy.stats.truncnorm.std(a=-2, b=2, loc=0., scale=1.) TRUNCATED_NORMAL_STDDEV_FACTOR = 0.87962566103423978 _, fan_in = self.matrix.shape @@ -189,17 +192,17 @@ def _trunc_normal_init(self, scale=1.0, generator: PaddleGenerator | None = None std = (scale**0.5) / TRUNCATED_NORMAL_STDDEV_FACTOR trunc_normal_(self.matrix, mean=0.0, std=std, generator=generator) - def _glorot_uniform_init(self, generator: PaddleGenerator | None = None): + def _glorot_uniform_init(self, generator: PaddleGenerator | None = None) -> None: xavier_uniform_(self.matrix, gain=1, generator=generator) - def _zero_init(self, use_bias=True): + def _zero_init(self, use_bias: bool = True) -> None: with paddle.no_grad(): self.matrix.fill_(0.0) if use_bias and self.bias is not None: with paddle.no_grad(): self.bias.fill_(1.0) - def _normal_init(self, generator: PaddleGenerator | None = None): + def _normal_init(self, generator: PaddleGenerator | None = None) -> None: kaiming_normal_(self.matrix, nonlinearity="linear", generator=generator) def forward( @@ -284,7 +287,7 @@ def deserialize(cls, data: dict) -> MLPLayer: ) prec = PRECISION_DICT[obj.precision] - def check_load_param(ss): + def check_load_param(ss: str) -> paddle.Tensor | None: if nl[ss] is not None: tensor = to_paddle_tensor(nl[ss]) return paddle.create_parameter( @@ -304,7 +307,7 @@ def check_load_param(ss): class MLP(MLP_): - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.layers = paddle.nn.LayerList(self.layers) @@ -325,7 +328,7 @@ class NetworkCollection(DPNetworkCollection, nn.Layer): "fitting_network": FittingNet, } - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: # init both two base classes DPNetworkCollection.__init__(self, *args, **kwargs) nn.Layer.__init__(self) diff --git a/deepmd/pd/model/network/network.py b/deepmd/pd/model/network/network.py index 81e2dad710..77149464b6 100644 --- a/deepmd/pd/model/network/network.py +++ b/deepmd/pd/model/network/network.py @@ -23,7 +23,7 @@ ) -def Tensor(*shape): +def Tensor(*shape: int) -> paddle.Tensor: return paddle.empty(shape, dtype=env.GLOBAL_PD_FLOAT_PRECISION).to( device=env.DEVICE ) @@ -32,15 +32,15 @@ def Tensor(*shape): class TypeEmbedNet(nn.Layer): def __init__( self, - type_nums, - embed_dim, - bavg=0.0, - stddev=1.0, - precision="default", + type_nums: int, + embed_dim: int, + bavg: float = 0.0, + stddev: float = 1.0, + precision: str = "default", seed: int | list[int] | None = None, - use_econf_tebd=False, + use_econf_tebd: bool = False, use_tebd_bias: bool = False, - type_map=None, + type_map: list[str] | None = None, trainable: bool = True, ) -> None: """Construct a type embedding net.""" @@ -66,7 +66,7 @@ def __init__( ) # init.normal_(self.embedding.weight[:-1], mean=bavg, std=stddev) - def forward(self, atype): + def forward(self, atype: paddle.Tensor) -> paddle.Tensor: """ Args: atype: Type of each input, [nframes, nloc] or [nframes, nloc, nnei]. @@ -78,7 +78,7 @@ def forward(self, atype): """ return self.embedding(atype.place)[atype] - def get_full_embedding(self, device: str): + def get_full_embedding(self, device: str) -> paddle.Tensor: """ Get the type embeddings of all types. @@ -95,7 +95,9 @@ def get_full_embedding(self, device: str): """ return self.embedding(device) - def share_params(self, base_class, shared_level, resume=False) -> None: + def share_params( + self, base_class: "TypeEmbedNet", shared_level: int, resume: bool = False + ) -> None: """ Share the parameters of self to the base_class with shared_level during multitask training. If not start from checkpoint (resume is False), @@ -112,7 +114,9 @@ def share_params(self, base_class, shared_level, resume=False) -> None: raise NotImplementedError def change_type_map( - self, type_map: list[str], model_with_new_type_stat=None + self, + type_map: list[str], + model_with_new_type_stat: "TypeEmbedNet | None" = None, ) -> None: """Change the type related params to new ones, according to `type_map` and the original one in the model. If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. @@ -198,7 +202,7 @@ def __init__( for param in self.parameters(): param.stop_gradient = not trainable - def forward(self, device: str): + def forward(self, device: str) -> paddle.Tensor: """Caulate type embedding network. Returns @@ -221,7 +225,9 @@ def forward(self, device: str): return embed def change_type_map( - self, type_map: list[str], model_with_new_type_stat=None + self, + type_map: list[str], + model_with_new_type_stat: "TypeEmbedNetConsistent | None" = None, ) -> None: """Change the type related params to new ones, according to `type_map` and the original one in the model. If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. @@ -286,7 +292,7 @@ def change_type_map( self.ntypes = len(type_map) @classmethod - def deserialize(cls, data: dict): + def deserialize(cls, data: dict) -> "TypeEmbedNetConsistent": """Deserialize the model. Parameters diff --git a/deepmd/pd/model/network/utils.py b/deepmd/pd/model/network/utils.py index cfd6753638..cea832e11a 100644 --- a/deepmd/pd/model/network/utils.py +++ b/deepmd/pd/model/network/utils.py @@ -54,7 +54,7 @@ def get_graph_index( a_nlist_mask: paddle.Tensor, nall: int, use_loc_mapping: bool = True, -): +) -> tuple[paddle.Tensor, paddle.Tensor, paddle.Tensor, paddle.Tensor, paddle.Tensor]: """ Get the index mapping for edge graph and angle graph, ready in `aggregate` or `index_select`. diff --git a/deepmd/pd/model/task/ener.py b/deepmd/pd/model/task/ener.py index 3d2025bcc2..c11124df8e 100644 --- a/deepmd/pd/model/task/ener.py +++ b/deepmd/pd/model/task/ener.py @@ -1,6 +1,9 @@ # SPDX-License-Identifier: LGPL-3.0-or-later import copy import logging +from typing import ( + Any, +) import paddle @@ -44,8 +47,8 @@ def __init__( mixed_types: bool = True, seed: int | list[int] | None = None, type_map: list[str] | None = None, - **kwargs, - ): + **kwargs: Any, + ) -> None: super().__init__( "energy", ntypes, diff --git a/deepmd/pd/model/task/fitting.py b/deepmd/pd/model/task/fitting.py index d92ad6e947..ad39e3a0a6 100644 --- a/deepmd/pd/model/task/fitting.py +++ b/deepmd/pd/model/task/fitting.py @@ -6,6 +6,9 @@ from collections.abc import ( Callable, ) +from typing import ( + Any, +) import numpy as np import paddle @@ -51,12 +54,14 @@ class Fitting(paddle.nn.Layer, BaseFitting): # plugin moved to BaseFitting - def __new__(cls, *args, **kwargs): + def __new__(cls, *args: Any, **kwargs: Any) -> "Fitting": if cls is Fitting: return BaseFitting.__new__(BaseFitting, *args, **kwargs) return super().__new__(cls) - def share_params(self, base_class, shared_level, resume=False) -> None: + def share_params( + self, base_class: "Fitting", shared_level: int, resume: bool = False + ) -> None: """ Share the parameters of self to the base_class with shared_level during multitask training. If not start from checkpoint (resume is False), @@ -242,7 +247,7 @@ def __init__( type_map: list[str] | None = None, use_aparam_as_mask: bool = False, default_fparam: list[float] | None = None, - **kwargs, + **kwargs: Any, ) -> None: super().__init__() self.var_name = var_name @@ -365,7 +370,7 @@ def reinit_exclude( self.emask = AtomExcludeMask(self.ntypes, self.exclude_types) def change_type_map( - self, type_map: list[str], model_with_new_type_stat=None + self, type_map: list[str], model_with_new_type_stat: "Fitting | None" = None ) -> None: """Change the type related params to new ones, according to `type_map` and the original one in the model. If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types. @@ -491,7 +496,7 @@ def get_buffer_type_map(self) -> paddle.Tensor: """ return self.buffer_type_map - def set_case_embd(self, case_idx: int): + def set_case_embd(self, case_idx: int) -> None: """ Set the case embedding of this fitting net by the given case_idx, typically concatenated with the output of the descriptor and fed into the fitting net. @@ -503,7 +508,7 @@ def set_case_embd(self, case_idx: int): def set_return_middle_output(self, return_middle_output: bool = True) -> None: self.eval_return_middle_output = return_middle_output - def __setitem__(self, key, value) -> None: + def __setitem__(self, key: str, value: paddle.Tensor) -> None: if key in ["bias_atom_e"]: value = value.reshape([self.ntypes, self._net_out_dim()]) self.bias_atom_e = value @@ -522,7 +527,7 @@ def __setitem__(self, key, value) -> None: else: raise KeyError(key) - def __getitem__(self, key): + def __getitem__(self, key: str) -> paddle.Tensor: if key in ["bias_atom_e"]: return self.bias_atom_e elif key in ["fparam_avg"]: @@ -541,7 +546,7 @@ def __getitem__(self, key): raise KeyError(key) @abstractmethod - def _net_out_dim(self): + def _net_out_dim(self) -> int: """Set the FittingNet output dim.""" pass @@ -560,7 +565,7 @@ def _forward_common( h2: paddle.Tensor | None = None, fparam: paddle.Tensor | None = None, aparam: paddle.Tensor | None = None, - ): + ) -> tuple[paddle.Tensor, paddle.Tensor | None]: # cast the input to internal precsion xx = descriptor.astype(self.prec) fparam = fparam.astype(self.prec) if fparam is not None else None diff --git a/deepmd/pd/model/task/invar_fitting.py b/deepmd/pd/model/task/invar_fitting.py index 6dd5b29cb9..04c66befc1 100644 --- a/deepmd/pd/model/task/invar_fitting.py +++ b/deepmd/pd/model/task/invar_fitting.py @@ -1,6 +1,9 @@ # SPDX-License-Identifier: LGPL-3.0-or-later import copy import logging +from typing import ( + Any, +) import paddle @@ -101,8 +104,8 @@ def __init__( atom_ener: list[paddle.Tensor | None] | None = None, type_map: list[str] | None = None, use_aparam_as_mask: bool = False, - **kwargs, - ): + **kwargs: Any, + ) -> None: self.dim_out = dim_out self.atom_ener = atom_ener super().__init__( @@ -129,7 +132,7 @@ def __init__( **kwargs, ) - def _net_out_dim(self): + def _net_out_dim(self) -> int: """Set the FittingNet output dim.""" return self.dim_out @@ -168,7 +171,7 @@ def forward( h2: paddle.Tensor | None = None, fparam: paddle.Tensor | None = None, aparam: paddle.Tensor | None = None, - ): + ) -> paddle.Tensor: """Based on embedding net output, alculate total energy. Args: diff --git a/deepmd/pd/train/training.py b/deepmd/pd/train/training.py index dd0fbdc94b..7ba0255494 100644 --- a/deepmd/pd/train/training.py +++ b/deepmd/pd/train/training.py @@ -86,16 +86,16 @@ class Trainer: def __init__( self, config: dict[str, Any], - training_data, - stat_file_path=None, - validation_data=None, - init_model=None, - restart_model=None, - finetune_model=None, - force_load=False, - shared_links=None, - finetune_links=None, - init_frz_model=None, + training_data: Any, + stat_file_path: str | Path | None = None, + validation_data: Any | None = None, + init_model: str | None = None, + restart_model: str | None = None, + finetune_model: str | None = None, + force_load: bool = False, + shared_links: dict[str, Any] | None = None, + finetune_links: dict[str, Any] | None = None, + init_frz_model: str | None = None, ) -> None: """Construct a DeePMD trainer. @@ -148,7 +148,7 @@ def __init__( ) self.lcurve_should_print_header = True - def get_opt_param(params): + def get_opt_param(params: dict[str, Any]) -> tuple[str, dict[str, Any]]: opt_type = params.get("opt_type", "Adam") opt_param = { "kf_blocksize": params.get("kf_blocksize", 5120), @@ -159,8 +159,12 @@ def get_opt_param(params): } return opt_type, opt_param - def get_data_loader(_training_data, _validation_data, _training_params): - def get_dataloader_and_buffer(_data, _params): + def get_data_loader( + _training_data: Any, _validation_data: Any, _training_params: dict[str, Any] + ) -> tuple[Any, Any, Any, Any]: + def get_dataloader_and_buffer( + _data: Any, _params: dict[str, Any] + ) -> tuple[Any, Any]: _sampler = get_sampler_from_params(_data, _params) if _sampler is None: log.warning( @@ -207,21 +211,21 @@ def get_dataloader_and_buffer(_data, _params): ) def single_model_stat( - _model, - _data_stat_nbatch, - _training_data, - _validation_data, - _stat_file_path, - _data_requirement, - finetune_has_new_type=False, - ): + _model: Any, + _data_stat_nbatch: int, + _training_data: Any, + _validation_data: Any | None, + _stat_file_path: str | Path | None, + _data_requirement: list[DataRequirementItem], + finetune_has_new_type: bool = False, + ) -> Any: _data_requirement += get_additional_data_requirement(_model) _training_data.add_data_requirement(_data_requirement) if _validation_data is not None: _validation_data.add_data_requirement(_data_requirement) @functools.lru_cache - def get_sample(): + def get_sample() -> dict[str, Any]: sampled = make_stat_input( _training_data.systems, _training_data.dataloaders, @@ -483,11 +487,11 @@ def get_lr(lr_params: dict[str, Any]) -> BaseLR: state_dict = pretrained_model_wrapper.state_dict() def collect_single_finetune_params( - _model_key, - _finetune_rule_single, - _new_state_dict, - _origin_state_dict, - _random_state_dict, + _model_key: str, + _finetune_rule_single: Any, + _new_state_dict: dict, + _origin_state_dict: dict, + _random_state_dict: dict, ) -> None: _new_fitting = _finetune_rule_single.get_random_fitting() _model_key_from = _finetune_rule_single.get_model_branch() @@ -532,10 +536,10 @@ def collect_single_finetune_params( if finetune_model is not None: def single_model_finetune( - _model, - _finetune_rule_single, - _sample_func, - ): + _model: Any, + _finetune_rule_single: Any, + _sample_func: Any, + ) -> Any: _model = model_change_out_bias( _model, _sample_func, @@ -580,7 +584,7 @@ def single_model_finetune( # TODO add lr warmups for multitask # author: iProzd - def warm_up_linear(step, warmup_steps): + def warm_up_linear(step: int, warmup_steps: int) -> float: if step < warmup_steps: return step / warmup_steps else: @@ -728,7 +732,7 @@ def run(self) -> None: core.nvprof_start() core.nvprof_enable_record_event() - def step(_step_id, task_key="Default") -> None: + def step(_step_id: int, task_key: str = "Default") -> None: if self.multi_task: model_index = dp_random.choice( np.arange(self.num_model, dtype=np.int_), @@ -811,7 +815,9 @@ def step(_step_id, task_key="Default") -> None: ): self.wrapper.eval() # Will set to train mode before fininshing validation - def log_loss_train(_loss, _more_loss, _task_key="Default"): + def log_loss_train( + _loss: Any, _more_loss: dict, _task_key: str = "Default" + ) -> dict: results = {} rmse_val = { item: _more_loss[item] @@ -822,7 +828,7 @@ def log_loss_train(_loss, _more_loss, _task_key="Default"): results[item] = rmse_val[item] return results - def log_loss_valid(_task_key="Default"): + def log_loss_valid(_task_key: str = "Default") -> dict: single_results = {} sum_natoms = 0 if not self.multi_task: @@ -1054,7 +1060,7 @@ def log_loss_valid(_task_key="Default"): "files, which can be viewd in NVIDIA Nsight Systems software" ) - def save_model(self, save_path, lr=0.0, step=0) -> None: + def save_model(self, save_path: str, lr: float = 0.0, step: int = 0) -> None: module = ( self.wrapper._layers if dist.is_available() and dist.is_initialized() @@ -1076,7 +1082,9 @@ def save_model(self, save_path, lr=0.0, step=0) -> None: checkpoint_files.sort(key=lambda x: x.stat().st_mtime) checkpoint_files[0].unlink() - def get_data(self, is_train=True, task_key="Default"): + def get_data( + self, is_train: bool = True, task_key: str = "Default" + ) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]: if not self.multi_task: if is_train: try: @@ -1152,7 +1160,9 @@ def get_data(self, is_train=True, task_key="Default"): log_dict["sid"] = batch_data["sid"] return input_dict, label_dict, log_dict - def print_header(self, fout, train_results, valid_results) -> None: + def print_header( + self, fout: Any, train_results: dict[str, Any], valid_results: dict[str, Any] + ) -> None: train_keys = sorted(train_results.keys()) print_str = "" print_str += "# {:5s}".format("step") @@ -1184,7 +1194,12 @@ def print_header(self, fout, train_results, valid_results) -> None: fout.flush() def print_on_training( - self, fout, step_id, cur_lr, train_results, valid_results + self, + fout: Any, + step_id: int, + cur_lr: float, + train_results: dict[str, Any], + valid_results: dict[str, Any], ) -> None: train_keys = sorted(train_results.keys()) print_str = "" @@ -1216,7 +1231,7 @@ def print_on_training( fout.flush() -def get_additional_data_requirement(_model): +def get_additional_data_requirement(_model: Any) -> list[DataRequirementItem]: additional_data_requirement = [] if _model.get_dim_fparam() > 0: fparam_requirement_items = [ @@ -1243,12 +1258,14 @@ def get_additional_data_requirement(_model): return additional_data_requirement -def whether_hessian(loss_params): +def whether_hessian(loss_params: dict[str, Any]) -> bool: loss_type = loss_params.get("type", "ener") return loss_type == "ener" and loss_params.get("start_pref_h", 0.0) > 0.0 -def get_loss(loss_params, start_lr, _ntypes, _model): +def get_loss( + loss_params: dict[str, Any], start_lr: float, _ntypes: int, _model: Any +) -> TaskLoss: loss_type = loss_params.get("type", "ener") if whether_hessian(loss_params): loss_params["starter_learning_rate"] = start_lr @@ -1262,17 +1279,17 @@ def get_loss(loss_params, start_lr, _ntypes, _model): def get_single_model( - _model_params, -): + _model_params: dict[str, Any], +) -> Any: model = get_model(deepcopy(_model_params)).to(DEVICE) return model def get_model_for_wrapper( - _model_params, - resuming=False, - _loss_params=None, -): + _model_params: dict[str, Any], + resuming: bool = False, + _loss_params: dict[str, Any] | None = None, +) -> Any: if "model_dict" not in _model_params: if _loss_params is not None and whether_hessian(_loss_params): _model_params["hessian_mode"] = True @@ -1295,7 +1312,7 @@ def get_model_for_wrapper( return _model -def get_case_embd_config(_model_params): +def get_case_embd_config(_model_params: dict[str, Any]) -> tuple[bool, dict[str, Any]]: assert "model_dict" in _model_params, ( "Only support setting case embedding for multi-task model!" ) @@ -1320,10 +1337,10 @@ def get_case_embd_config(_model_params): def model_change_out_bias( - _model, - _sample_func, - _bias_adjust_mode="change-by-statistic", -): + _model: Any, + _sample_func: Any, + _bias_adjust_mode: str = "change-by-statistic", +) -> None: old_bias = deepcopy(_model.get_out_bias()) _model.change_out_bias( _sample_func, diff --git a/deepmd/pd/train/wrapper.py b/deepmd/pd/train/wrapper.py index 6c8db691c4..bdb22532e1 100644 --- a/deepmd/pd/train/wrapper.py +++ b/deepmd/pd/train/wrapper.py @@ -7,6 +7,9 @@ from collections import ( OrderedDict, ) +from typing import ( + Any, +) import paddle @@ -21,8 +24,8 @@ def __init__( self, model: paddle.nn.Layer | dict, loss: paddle.nn.Layer | dict = None, - model_params=None, - shared_links=None, + model_params: dict[str, Any] | None = None, + shared_links: dict[str, Any] | None = None, ) -> None: """Construct a DeePMD model wrapper. @@ -61,7 +64,7 @@ def __init__( self.loss[task_key] = loss[task_key] self.inference_only = self.loss is None - def share_params(self, shared_links, resume=False) -> None: + def share_params(self, shared_links: dict[str, Any], resume: bool = False) -> None: """ Share the parameters of classes following rules defined in shared_links during multitask training. If not start from checkpoint (resume is False), @@ -134,18 +137,18 @@ def share_params(self, shared_links, resume=False) -> None: def forward( self, - coord, - atype, + coord: paddle.Tensor, + atype: paddle.Tensor, spin: paddle.Tensor | None = None, box: paddle.Tensor | None = None, cur_lr: paddle.Tensor | None = None, label: paddle.Tensor | None = None, task_key: paddle.Tensor | None = None, - inference_only=False, - do_atomic_virial=False, + inference_only: bool = False, + do_atomic_virial: bool = False, fparam: paddle.Tensor | None = None, aparam: paddle.Tensor | None = None, - ): + ) -> dict[str, paddle.Tensor]: if not self.multi_task: task_key = "Default" else: @@ -193,13 +196,13 @@ def set_state_dict( ) -> tuple[list[str], list[str]]: return self.load_state_dict(state_dict) - def state_dict(self): + def state_dict(self) -> dict[str, Any]: state_dict = super().state_dict() extra_state = self.get_extra_state() state_dict.update({"_extra_state": extra_state}) return state_dict - def set_extra_state(self, extra_state: dict): + def set_extra_state(self, extra_state: dict[str, Any]) -> None: self.model_params = extra_state["model_params"] self.train_infos = extra_state["train_infos"] return None diff --git a/deepmd/pd/utils/dataloader.py b/deepmd/pd/utils/dataloader.py index 0cb8adbc63..acaadb67aa 100644 --- a/deepmd/pd/utils/dataloader.py +++ b/deepmd/pd/utils/dataloader.py @@ -4,6 +4,7 @@ import queue import time from collections.abc import ( + Iterable, Iterator, ) from multiprocessing.dummy import ( @@ -12,6 +13,9 @@ from threading import ( Thread, ) +from typing import ( + Any, +) import h5py import numpy as np @@ -53,7 +57,7 @@ # paddle.multiprocessing.set_sharing_strategy("file_system") -def setup_seed(seed): +def setup_seed(seed: int | list | tuple) -> None: if isinstance(seed, (list, tuple)): mixed_seed = mix_entropy(seed) else: @@ -82,12 +86,12 @@ class DpLoaderSet(Dataset): def __init__( self, - systems, - batch_size, - type_map, - seed=None, - shuffle=True, - ): + systems: str | list[str], + batch_size: int, + type_map: list[str], + seed: int | None = None, + shuffle: bool = True, + ) -> None: if seed is not None: setup_seed(seed) if isinstance(systems, str): @@ -98,7 +102,7 @@ def __init__( if len(systems) >= 100: log.info(f"Constructing DataLoaders from {len(systems)} systems") - def construct_dataset(system): + def construct_dataset(system: str) -> DeepmdDataSetForLoader: return DeepmdDataSetForLoader( system=system, type_map=type_map, @@ -203,14 +207,14 @@ def construct_dataset(system): class LazyIter: """Lazy iterator to prevent fetching data when iter(item).""" - def __init__(self, item): + def __init__(self, item: Any) -> None: self.item = item - def __iter__(self): + def __iter__(self) -> "LazyIter": # directly return return self - def __next__(self): + def __next__(self) -> Any: if not isinstance(self.item, Iterator): # make iterator here lazily self.item = iter(self.item) @@ -221,7 +225,7 @@ def __next__(self): for item in self.dataloaders: self.iters.append(LazyIter(item)) - def set_noise(self, noise_settings): + def set_noise(self, noise_settings: dict) -> None: # noise_settings['noise_type'] # "trunc_normal", "normal", "uniform" # noise_settings['noise'] # float, default 1.0 # noise_settings['noise_mode'] # "prob", "fix_num" @@ -234,7 +238,7 @@ def set_noise(self, noise_settings): def __len__(self) -> int: return len(self.dataloaders) - def __getitem__(self, idx): + def __getitem__(self, idx: int) -> dict: # log.warning(str(paddle.distributed.get_rank())+" idx: "+str(idx)+" index: "+str(self.index[idx])) try: batch = next(self.iters[idx]) @@ -244,7 +248,7 @@ def __getitem__(self, idx): batch["sid"] = idx return batch - def add_data_requirement(self, data_requirement: list[DataRequirementItem]): + def add_data_requirement(self, data_requirement: list[DataRequirementItem]) -> None: """Add data requirement for each system in multiple systems.""" for system in self.systems: system.add_data_requirement(data_requirement) @@ -253,7 +257,7 @@ def print_summary( self, name: str, prob: list[float], - ): + ) -> None: rank = dist.get_rank() if dist.is_initialized() else 0 if rank == 0: print_summary( @@ -276,7 +280,9 @@ def print_summary( class BackgroundConsumer(Thread): - def __init__(self, queue, source, max_len) -> None: + def __init__( + self, queue: "queue.Queue[Any]", source: Iterable, max_len: int + ) -> None: Thread.__init__(self) self._queue = queue self._source = source # Main DL iterator @@ -291,7 +297,7 @@ def run(self) -> None: class BufferedIterator: - def __init__(self, iterable) -> None: + def __init__(self, iterable: Iterable) -> None: self._queue = queue.Queue(QUEUESIZE) self._iterable = iterable self._consumer = None @@ -305,13 +311,13 @@ def _create_consumer(self) -> None: self._consumer.daemon = True self._consumer.start() - def __iter__(self): + def __iter__(self) -> "BufferedIterator": return self def __len__(self) -> int: return self.total - def __next__(self): + def __next__(self) -> Any: # Create consumer if not created yet if self._consumer is None: self._create_consumer() @@ -338,7 +344,7 @@ def __next__(self): return item -def collate_batch(batch): +def collate_batch(batch: list[dict]) -> dict: example = batch[0] result = {} for key in example.keys(): @@ -356,7 +362,9 @@ def collate_batch(batch): return result -def get_weighted_sampler(training_data, prob_style, sys_prob=False): +def get_weighted_sampler( + training_data: DpLoaderSet, prob_style: str, sys_prob: bool = False +) -> WeightedRandomSampler: if sys_prob is False: if prob_style == "prob_uniform": prob_v = 1.0 / float(training_data.__len__()) @@ -376,7 +384,9 @@ def get_weighted_sampler(training_data, prob_style, sys_prob=False): return sampler -def get_sampler_from_params(_data, _params): +def get_sampler_from_params( + _data: DpLoaderSet, _params: dict +) -> WeightedRandomSampler | BatchSampler | DistributedBatchSampler: if ( "sys_probs" in _params and _params["sys_probs"] is not None ): # use sys_probs first diff --git a/deepmd/pd/utils/dataset.py b/deepmd/pd/utils/dataset.py index 5accd0315b..bd8a0bc91e 100644 --- a/deepmd/pd/utils/dataset.py +++ b/deepmd/pd/utils/dataset.py @@ -1,6 +1,10 @@ # SPDX-License-Identifier: LGPL-3.0-or-later +from typing import ( + Any, +) + from paddle.io import ( Dataset, ) @@ -15,7 +19,7 @@ class DeepmdDataSetForLoader(Dataset): - def __init__(self, system: str, type_map: list[str] | None = None): + def __init__(self, system: str, type_map: list[str] | None = None) -> None: """Construct DeePMD-style dataset containing frames cross different systems. Args: @@ -30,16 +34,16 @@ def __init__(self, system: str, type_map: list[str] | None = None): self._natoms = self._data_system.get_natoms() self._natoms_vec = self._data_system.get_natoms_vec(self._ntypes) - def __len__(self): + def __len__(self) -> int: return self._data_system.nframes - def __getitem__(self, index): + def __getitem__(self, index: int) -> dict[str, Any]: """Get a frame from the selected system.""" b_data = self._data_system.get_item_paddle(index, max(1, NUM_WORKERS)) b_data["natoms"] = self._natoms_vec return b_data - def add_data_requirement(self, data_requirement: list[DataRequirementItem]): + def add_data_requirement(self, data_requirement: list[DataRequirementItem]) -> None: """Add data requirement for this data system.""" for data_item in data_requirement: self._data_system.add( diff --git a/deepmd/pd/utils/env.py b/deepmd/pd/utils/env.py index 8715b59f54..d7a0278637 100644 --- a/deepmd/pd/utils/env.py +++ b/deepmd/pd/utils/env.py @@ -121,7 +121,7 @@ def to_bool(flag: int | bool | str) -> bool: # os.environ['CPU_NUM'] = str(intra_nthreads) -def enable_prim(enable: bool = True): +def enable_prim(enable: bool = True) -> None: # NOTE: operators in list below will not use composite # operator but kernel instead for better performance EAGER_COMP_OP_BLACK_LIST = [ diff --git a/deepmd/pd/utils/env_mat_stat.py b/deepmd/pd/utils/env_mat_stat.py index 0e41243924..aed5259a50 100644 --- a/deepmd/pd/utils/env_mat_stat.py +++ b/deepmd/pd/utils/env_mat_stat.py @@ -68,7 +68,7 @@ class EnvMatStatSe(EnvMatStat): The descriptor of the model. """ - def __init__(self, descriptor: "DescriptorBlock"): + def __init__(self, descriptor: "DescriptorBlock") -> None: super().__init__() self.descriptor = descriptor self.last_dim = ( @@ -197,7 +197,7 @@ def get_hash(self) -> str: } ) - def __call__(self): + def __call__(self) -> dict[str, paddle.Tensor]: avgs = self.get_avg() stds = self.get_std() diff --git a/deepmd/pd/utils/exclude_mask.py b/deepmd/pd/utils/exclude_mask.py index cde8730c9a..e22379ea3e 100644 --- a/deepmd/pd/utils/exclude_mask.py +++ b/deepmd/pd/utils/exclude_mask.py @@ -32,10 +32,10 @@ def reinit( ) self.type_mask = to_paddle_tensor(self.type_mask).reshape([-1]) - def get_exclude_types(self): + def get_exclude_types(self) -> list[int]: return self.exclude_types - def get_type_mask(self): + def get_type_mask(self) -> paddle.Tensor: return self.type_mask def forward( @@ -98,7 +98,7 @@ def reinit( self.type_mask = to_paddle_tensor(self.type_mask).reshape([-1]) self.no_exclusion = len(self._exclude_types) == 0 - def get_exclude_types(self): + def get_exclude_types(self) -> set[tuple[int, int]]: return self._exclude_types # may have a better place for this method... diff --git a/deepmd/pd/utils/finetune.py b/deepmd/pd/utils/finetune.py index edac72d9c9..7b3bdf615b 100644 --- a/deepmd/pd/utils/finetune.py +++ b/deepmd/pd/utils/finetune.py @@ -14,13 +14,13 @@ def get_finetune_rule_single( - _single_param_target, - _model_param_pretrained, - from_multitask=False, - model_branch="Default", - model_branch_from="", - change_model_params=False, -): + _single_param_target: dict, + _model_param_pretrained: dict, + from_multitask: bool = False, + model_branch: str = "Default", + model_branch_from: str = "", + change_model_params: bool = False, +) -> dict: single_config = deepcopy(_single_param_target) new_fitting = False model_branch_chosen = "Default" @@ -77,8 +77,11 @@ def get_finetune_rule_single( def get_finetune_rules( - finetune_model, model_config, model_branch="", change_model_params=True -): + finetune_model: str, + model_config: dict, + model_branch: str = "", + change_model_params: bool = True, +) -> tuple[dict, str]: """ Get fine-tuning rules and (optionally) change the model_params according to the pretrained one. diff --git a/deepmd/pd/utils/multi_task.py b/deepmd/pd/utils/multi_task.py index 92d7f380e5..2f2c11759f 100644 --- a/deepmd/pd/utils/multi_task.py +++ b/deepmd/pd/utils/multi_task.py @@ -11,7 +11,7 @@ ) -def preprocess_shared_params(model_config): +def preprocess_shared_params(model_config: dict) -> tuple[dict, dict]: """Preprocess the model params for multitask model, and generate the links dict for further sharing. Args: @@ -97,7 +97,11 @@ def preprocess_shared_params(model_config): type_map_keys = [] def replace_one_item( - params_dict, key_type, key_in_dict, suffix="", index=None + params_dict: dict, + key_type: str, + key_in_dict: str, + suffix: str = "", + index: int | None = None, ) -> None: shared_type = key_type shared_key = key_in_dict @@ -155,7 +159,7 @@ def replace_one_item( return model_config, shared_links -def get_class_name(item_key, item_params): +def get_class_name(item_key: str, item_params: dict) -> type: if item_key == "descriptor": return BaseDescriptor.get_class_by_type(item_params.get("type", "se_e2_a")) elif item_key == "fitting_net": diff --git a/deepmd/pd/utils/neighbor_stat.py b/deepmd/pd/utils/neighbor_stat.py index f569999bfc..cc3e99bd0e 100644 --- a/deepmd/pd/utils/neighbor_stat.py +++ b/deepmd/pd/utils/neighbor_stat.py @@ -171,7 +171,7 @@ def _execute( coord: np.ndarray, atype: np.ndarray, cell: np.ndarray | None, - ): + ) -> tuple[np.ndarray, np.ndarray]: """Execute the operation. Parameters diff --git a/deepmd/pd/utils/nlist.py b/deepmd/pd/utils/nlist.py index cd041a345c..4d22e5c910 100644 --- a/deepmd/pd/utils/nlist.py +++ b/deepmd/pd/utils/nlist.py @@ -13,13 +13,13 @@ def extend_input_and_build_neighbor_list( - coord, - atype, + coord: paddle.Tensor, + atype: paddle.Tensor, rcut: float, sel: list[int], mixed_types: bool = False, box: paddle.Tensor | None = None, -): +) -> tuple[paddle.Tensor, paddle.Tensor, paddle.Tensor, paddle.Tensor]: nframes, nloc = atype.shape[:2] if box is not None: box_gpu = box @@ -279,7 +279,7 @@ def nlist_distinguish_types( nlist: paddle.Tensor, atype: paddle.Tensor, sel: list[int], -): +) -> paddle.Tensor: """Given a nlist that does not distinguish atom types, return a nlist that distinguish atom types. diff --git a/deepmd/pd/utils/preprocess.py b/deepmd/pd/utils/preprocess.py index 3be42b522e..e30a61d3a8 100644 --- a/deepmd/pd/utils/preprocess.py +++ b/deepmd/pd/utils/preprocess.py @@ -6,7 +6,9 @@ log = logging.getLogger(__name__) -def compute_smooth_weight(distance, rmin: float, rmax: float): +def compute_smooth_weight( + distance: paddle.Tensor, rmin: float, rmax: float +) -> paddle.Tensor: """Compute smooth weight for descriptor elements.""" if rmin >= rmax: raise ValueError("rmin should be less than rmax.") @@ -17,7 +19,7 @@ def compute_smooth_weight(distance, rmin: float, rmax: float): return vv -def compute_exp_sw(distance, rmin: float, rmax: float): +def compute_exp_sw(distance: paddle.Tensor, rmin: float, rmax: float) -> paddle.Tensor: """Compute the exponential switch function for neighbor update.""" if rmin >= rmax: raise ValueError("rmin should be less than rmax.") diff --git a/deepmd/pd/utils/region.py b/deepmd/pd/utils/region.py index d2600ef16e..237fa84b26 100644 --- a/deepmd/pd/utils/region.py +++ b/deepmd/pd/utils/region.py @@ -75,7 +75,7 @@ def to_face_distance( return dist.reshape(list(cshape[:-2]) + [3]) # noqa:RUF005 -def b_to_face_distance(cell): +def b_to_face_distance(cell: paddle.Tensor) -> paddle.Tensor: volume = paddle.linalg.det(cell) c_yz = paddle.cross(cell[:, 1], cell[:, 2], axis=-1) _h2yz = volume / paddle.linalg.norm(c_yz, axis=-1) diff --git a/deepmd/pd/utils/spin.py b/deepmd/pd/utils/spin.py index 27bc355877..83fa01a8d0 100644 --- a/deepmd/pd/utils/spin.py +++ b/deepmd/pd/utils/spin.py @@ -4,10 +4,10 @@ def concat_switch_virtual( - extended_tensor, - extended_tensor_virtual, + extended_tensor: paddle.Tensor, + extended_tensor_virtual: paddle.Tensor, nloc: int, -): +) -> paddle.Tensor: """ Concat real and virtual extended tensors, and switch all the local ones to the first nloc * 2 atoms. - [:, :nloc]: original nloc real atoms. diff --git a/deepmd/pd/utils/stat.py b/deepmd/pd/utils/stat.py index 3642d309b8..23c6c508a1 100644 --- a/deepmd/pd/utils/stat.py +++ b/deepmd/pd/utils/stat.py @@ -176,7 +176,7 @@ def _compute_model_predict( fparam = system.get("fparam", None) aparam = system.get("aparam", None) - def model_forward_auto_batch_size(*args, **kwargs): + def model_forward_auto_batch_size(*args: Any, **kwargs: Any) -> paddle.Tensor: return auto_batch_size.execute_all( model_forward, nframes, diff --git a/deepmd/pd/utils/utils.py b/deepmd/pd/utils/utils.py index 7224547805..2b6c1933b8 100644 --- a/deepmd/pd/utils/utils.py +++ b/deepmd/pd/utils/utils.py @@ -12,6 +12,13 @@ overload, ) +if TYPE_CHECKING: + from collections.abc import Generator + + from deepmd.pd.model.network.init import ( + PaddleGenerator, + ) + import ml_dtypes import numpy as np import paddle @@ -30,11 +37,6 @@ ) from .env import PRECISION_DICT as PD_PRECISION_DICT -if TYPE_CHECKING: - from deepmd.pd.model.network.init import ( - PaddleGenerator, - ) - def silut_forward( x: paddle.Tensor, threshold: float, slope: float, const_val: float @@ -84,7 +86,7 @@ def silut_double_backward( class SiLUTScript(paddle.nn.Layer): - def __init__(self, threshold: float = 3.0): + def __init__(self, threshold: float = 3.0) -> None: super().__init__() self.threshold = threshold @@ -96,7 +98,7 @@ def __init__(self, threshold: float = 3.0): self.const_val = float(threshold * sigmoid_threshold) self.get_script_code() - def get_script_code(self): + def get_script_code(self) -> None: silut_forward_script = paddle.jit.to_static(silut_forward, full_graph=True) silut_backward_script = paddle.jit.to_static(silut_backward, full_graph=True) silut_double_backward_script = paddle.jit.to_static( @@ -105,7 +107,13 @@ def get_script_code(self): class SiLUTFunction(paddle.autograd.PyLayer): @staticmethod - def forward(ctx, x, threshold, slope, const_val): + def forward( + ctx: paddle.autograd.PyLayerContext, + x: paddle.Tensor, + threshold: float, + slope: float, + const_val: float, + ) -> paddle.Tensor: ctx.save_for_backward(x) ctx.threshold = threshold ctx.slope = slope @@ -113,7 +121,9 @@ def forward(ctx, x, threshold, slope, const_val): return silut_forward_script(x, threshold, slope, const_val) @staticmethod - def backward(ctx, grad_output): + def backward( + ctx: paddle.autograd.PyLayerContext, grad_output: paddle.Tensor + ) -> paddle.Tensor: (x,) = ctx.saved_tensor() threshold = ctx.threshold slope = ctx.slope @@ -123,7 +133,13 @@ def backward(ctx, grad_output): class SiLUTGradFunction(paddle.autograd.PyLayer): @staticmethod - def forward(ctx, x, grad_output, threshold, slope): + def forward( + ctx: paddle.autograd.PyLayerContext, + x: paddle.Tensor, + grad_output: paddle.Tensor, + threshold: float, + slope: float, + ) -> paddle.Tensor: ctx.threshold = threshold ctx.slope = slope grad_input = silut_backward_script(x, grad_output, threshold, slope) @@ -131,7 +147,9 @@ def forward(ctx, x, grad_output, threshold, slope): return grad_input @staticmethod - def backward(ctx, grad_grad_output): + def backward( + ctx: paddle.autograd.PyLayerContext, grad_grad_output: paddle.Tensor + ) -> tuple[paddle.Tensor, paddle.Tensor]: (x, grad_output) = ctx.saved_tensor() threshold = ctx.threshold slope = ctx.slope @@ -143,21 +161,21 @@ def backward(ctx, grad_grad_output): self.SiLUTFunction = SiLUTFunction - def forward(self, x): + def forward(self, x: paddle.Tensor) -> paddle.Tensor: return self.SiLUTFunction.apply(x, self.threshold, self.slope, self.const_val) class SiLUT(paddle.nn.Layer): - def __init__(self, threshold=3.0): + def __init__(self, threshold: float = 3.0) -> None: super().__init__() - def sigmoid(x): + def sigmoid(x: paddle.Tensor) -> paddle.Tensor: return F.sigmoid(x) - def silu(x): + def silu(x: paddle.Tensor) -> paddle.Tensor: return F.silu(x) - def silu_grad(x): + def silu_grad(x: paddle.Tensor) -> paddle.Tensor: sig = sigmoid(x) return sig + x * sig * (1 - sig) @@ -182,7 +200,7 @@ def forward(self, x: paddle.Tensor) -> paddle.Tensor: class ActivationFn(paddle.nn.Layer): - def __init__(self, activation: str | None): + def __init__(self, activation: str | None) -> None: super().__init__() self.activation: str = activation if activation is not None else "linear" if self.activation.lower().startswith( @@ -282,7 +300,9 @@ def to_paddle_tensor( return paddle.to_tensor(xx, dtype=prec, place=DEVICE) -def dict_to_device(sample_dict): +def dict_to_device( + sample_dict: dict[str, paddle.Tensor | list[paddle.Tensor] | None], +) -> None: for key in sample_dict: if isinstance(sample_dict[key], list): sample_dict[key] = [item.to(DEVICE) for item in sample_dict[key]] @@ -304,7 +324,7 @@ def dict_to_device(sample_dict): XSHIFT = 16 -def hashmix(value: int, hash_const: list[int]): +def hashmix(value: int, hash_const: list[int]) -> int: value ^= INIT_A hash_const[0] *= MULT_A value *= INIT_A @@ -315,7 +335,7 @@ def hashmix(value: int, hash_const: list[int]): return value -def mix(x: int, y: int): +def mix(x: int, y: int) -> int: result = MIX_MULT_L * x - MIX_MULT_R * y # prevent overflow result &= 0xFFFF_FFFF_FFFF_FFFF @@ -368,7 +388,7 @@ def get_generator( @contextmanager -def nvprof_context(enable_profiler: bool, name: str): +def nvprof_context(enable_profiler: bool, name: str) -> Generator[None, None, None]: if enable_profiler: core.nvprof_nvtx_push(name) diff --git a/pyproject.toml b/pyproject.toml index 9c6f213cfd..3e9a7f9b18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -433,7 +433,7 @@ runtime-evaluated-base-classes = ["torch.nn.Module"] "deepmd/tf/**" = ["TID253", "ANN"] "deepmd/pt/**" = ["TID253", "B905"] "deepmd/jax/**" = ["TID253"] -"deepmd/pd/**" = ["TID253", "ANN", "B905"] +"deepmd/pd/**" = ["TID253", "B905"] "source/**" = ["ANN"] "source/tests/tf/**" = ["TID253", "ANN"]