1. 数据准备和预处理
# 导入必要的库
import os
import pandas as pd
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Draw
from rdkit import RDLogger
from rdkit.Chem import Descriptors
from rdkit.Chem import Crippen
from rdkit.Chem.Scaffolds import MurckoScaffold
from rdkit.Chem import BRICS
from rdkit.Chem import rdChemReactions
from rdkit.Chem.FilterCatalog import FilterCatalogParams, FilterCatalog
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from tqdm import tqdm
# 禁用 RDKit 日志
RDLogger.DisableLog('rdApp.*')
# 设置随机种子
def seed_all(seed=42):
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
seed_all()
# 检查是否有可用的 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')
# 创建输出目录
os.makedirs('generated_molecules', exist_ok=True)
os.makedirs('data', exist_ok=True)
os.makedirs('checkpoints', exist_ok=True)
# 加载数据集
df = pd.read_csv('egfr_data_ubstructures_matches.csv')
# 移除缺失或无效的 SMILES
df = df[df['smiles'].notnull()]
# 提取 SMILES 和标签(pIC50)
smiles_list = df['smiles'].tolist()
labels = df['pIC50'].tolist()
# 将 SMILES 转换为 RDKit 分子对象,过滤无效的分子
molecules = [Chem.MolFromSmiles(smi) for smi in smiles_list]
valid_indices = [i for i, mol in enumerate(molecules) if mol is not None]
smiles_list = [smiles_list[i] for i in valid_indices]
labels = [labels[i] for i in valid_indices]
molecules = [molecules[i] for i in valid_indices]
print(f'有效分子数量: {len(smiles_list)}')
# 提取 Murcko 骨架
scaffolds = set()
for mol in molecules:
scaffold = MurckoScaffold.GetScaffoldForMol(mol)
if scaffold:
scaffolds.add(Chem.MolToSmiles(scaffold))
print(f'提取到的独特 Murcko 骨架数量: {len(scaffolds)}')
# 提取 BRICS 片段
brics_fragments = set()
for mol in molecules:
frags = BRICS.BRICSDecompose(mol)
brics_fragments.update(frags)
print(f'提取到的独特 BRICS 片段数量: {len(brics_fragments)}')
# 合并所有片段
all_fragments_smiles = list(scaffolds.union(brics_fragments))
# 去重后的总片段数量
print(f'总片段数量(去重后): {len(all_fragments_smiles)}')
# 将片段转换为 RDKit 分子对象
fragment_mols = [Chem.MolFromSmiles(frag) for frag in all_fragments_smiles]
fragment_mols = [frag for frag in fragment_mols if frag is not None]2. 扩展反应 SMARTS 集合
我们将添加约 50 个常用的化学反应的 SMARTS 表示,以供生成新分子时使用。
# 定义扩展的反应 SMARTS 集合
reaction_smarts = [
# 酯化反应
'[C:1](=[O])[O][H].[O:2][C:3]>>[C:1](=[O])[O:2][C:3]',
# 酰胺化反应
'[C:1](=[O])[O][H].[N:2][H][H]>>[C:1](=[O])[N:2][H]',
# 醚化反应
'[O:1][H].[C:2][Br]>>[O:1][C:2]',
# 胺化反应
'[N:1][H][H].[C:2][Br]>>[N:1][C:2]',
# Suzuki 偶联反应
'[B:1]([O])[O].[C:2][Br]>>[C:2][B:1]',
# Heck 反应
'[C:1]=[C:2].[C:3]=[C:4][Br]>>[C:1]=[C:2][C:3]=[C:4]',
# Sonogashira 反应
'[C:1]#[C:2][H].[C:3][Br]>>[C:1]#[C:2][C:3]',
# Mitsunobu 反应
'[O:1][H].[N:2][H][C:3]>>[N:2][C:3]',
# Aldol 缩合反应
'[C:1]=O.[C:2]=O>>[C:1](O)[C:2]=O',
# Knoevenagel 缩合反应
'[C:1]=O.[C:2]=C[H]>>[C:1]=C[C:2]=C',
# Diels-Alder 反应
'[C:1]=[C:2].[C:3]=[C:4]>>[C:1]1[C:2][C:3][C:4]1',
# 取代反应(卤代烃与醇)
'[C:1][Cl].[O:2][H]>>[C:1][O:2]',
# 取代反应(卤代烃与硫醇)
'[C:1][Br].[S:2][H]>>[C:1][S:2]',
# 还原胺化
'[C:1]=O.[N:2][H][H]>>[C:1][N:2][H]',
# Wittig 反应
'[C:1]=O.[P:2][C:3][C:4]>>[C:1]=[C:3]',
# Michael 加成
'[C:1]=[C:2][C:3]=O.[N:4][H][H]>>[C:1][C:2][C:3](O)[N:4][H]',
# Friedel-Crafts 烷基化
'[c:1].[C:2][Cl]>>[c:1][C:2]',
# Friedel-Crafts 酰基化
'[c:1].[C:2](=O)[Cl]>>[c:1][C:2]=O',
# 氯化反应
'[C:1][H].[Cl][Cl]>>[C:1][Cl]',
# 溴化反应
'[C:1][H].[Br][Br]>>[C:1][Br]',
# 硝化反应
'[c:1].[N+](=O)[O-]>>[c:1][N+](=O)[O-]',
# 还原反应(醛到醇)
'[C:1]=O>>[C:1][O][H]',
# 氧化反应(醇到醛)
'[C:1][O][H]>>[C:1]=O',
# 氨解反应
'[C:1](=O)[O][C:2]>>[C:1](=O)[N][H]',
# 成环反应
'[C:1][C:2].[C:3][C:4]>>[C:1][C:2][C:3][C:4]',
# Grignard 反应
'[C:1][Mg][Br].[C:2]=O>>[C:1][C:2][O][H]',
# 保护基引入(甲基化)
'[O:1][H].[C:2][I]>>[O:1][C:2]',
# 保护基去除(脱甲基)
'[O:1][C:2][H]>>[O:1][H]',
# 甲酰化反应
'[C:1][H].[C:2](=O)[Cl]>>[C:1][C:2]=O',
# 脱水反应
'[C:1][O][H].[C:2][O][H]>>[C:1]=[C:2]',
# 氧化反应(伯醇到羧酸)
'[C:1][O][H]>>[C:1](=O)[O][H]',
# Mannich 反应
'[C:1]=O.[C:2]=C.[N:3][H][H]>>[C:1][C:2][N:3][H]',
# Beckmann 重排
'[C:1](=O)[N:2][OH]>>[C:1][N:2]=O',
# Claisen 缩合
'[C:1](=O)[O][C:2].[C:3]=O>>[C:1](=O)[C:3]=O',
# Gabriel 合成
'[C:1][Br].[N:2][H]>>[C:1][N:2][H]',
# Sandmeyer 反应
'[C:1][N+][N-].[Cu][Cl]>>[C:1][Cl]',
# Baeyer-Villiger 氧化
'[C:1](=O)[C:2]>>[C:1](=O)[O][C:2]',
# 脱卤反应
'[C:1][Br]>>[C:1][H]',
# 酰氯合成
'[C:1](=O)[O][H].[Cl][Cl]>>[C:1](=O)[Cl]',
# 二硫键形成
'[S:1][H].[S:2][H]>>[S:1][S:2]',
# 叠氮化反应
'[C:1][Br].[N3]>>[C:1][N3]',
# 环氧化反应
'[C:1]=[C:2]>>[C:1]1[O][C:2]1',
# Ozonolysis
'[C:1]=[C:2]>>[C:1](=O).[C:2](=O)',
# Ester Hydrolysis
'[C:1](=O)[O][C:2].[H][O][H]>>[C:1](=O)[O][H].[C:2][O][H]',
# Amide Hydrolysis
'[C:1](=O)[N][C:2].[H][O][H]>>[C:1](=O)[O][H].[N][C:2]',
# Wittig-Horner 反应
'[C:1]=O.[P:2](=O)[C:3][C:4]>>[C:1]=[C:3]',
# Enamine 反应
'[C:1]=O.[N:2][H][C:3]>>[C:1]=[N:2][C:3]',
# Curtius 重排
'[C:1](=O)[N3]>>[C:1][N][H]',
# Ullmann 反应
'[C:1][Br].[C:2][Br]>>[C:1][C:2]',
# Hantzsch 合成
'[C:1]=O.[C:2]=O.[N:3][H][H]>>[C:1][C:2][N:3]',
# Pechmann 反应
'[C:1]=O.[C:2]=C[O][H]>>[C:1][C:2]=O',
# Skraup 合成
'[C:1]=C.[N:2][H][H].[O]>>[C:1][N:2]=C',
# Tiffeneau-Demjanov 重排
'[C:1][C:2][N][H][H]>>[C:1]=O.[C:2][H]'
]
# 将 SMARTS 转换为 RDKit 反应对象
reactions = []
for rs in reaction_smarts:
try:
rxn = rdChemReactions.ReactionFromSmarts(rs)
reactions.append(rxn)
except Exception as e:
print(f"无法解析反应 SMARTS: {rs}")
continue
print(f"成功加载的反应数量: {len(reactions)}")3. 基于反应的片段组合
使用扩展的反应集合,将片段通过化学反应组合生成新分子。
from itertools import product
def generate_new_molecules(fragments, reactions, num_molecules=1000):
new_molecules = set()
fragment_pairs = list(product(fragments, repeat=2))
random.shuffle(fragment_pairs) # 随机打乱组合顺序
max_attempts = len(fragment_pairs)
attempts = 0
for frag1, frag2 in tqdm(fragment_pairs, desc="Generating molecules"):
if attempts >= max_attempts or len(new_molecules) >= num_molecules:
break
attempts += 1
for rxn in reactions:
try:
reactant1 = frag1
reactant2 = frag2
reactants = (reactant1, reactant2)
# 检查反应物数量
if rxn.GetNumReactantTemplates() != len(reactants):
continue
# 检查反应物是否匹配
matches = [rxn.IsMoleculeReactant(mol) for mol in reactants]
if not all(matches):
continue
products = rxn.RunReactants(reactants)
# 提取生成的产品
for product in products:
mol = product[0]
Chem.SanitizeMol(mol)
smi = Chem.MolToSmiles(mol)
new_molecules.add(smi)
except Exception as e:
continue
if len(new_molecules) >= num_molecules:
break
return list(new_molecules)
# 生成新分子
print("开始生成新分子...")
new_smiles = generate_new_molecules(fragment_mols, reactions, num_molecules=2000)
print(f'生成的新分子数量: {len(new_smiles)}')
# 将 SMILES 转换为分子对象
new_mols = [Chem.MolFromSmiles(smi) for smi in new_smiles]
new_mols = [mol for mol in new_mols if mol is not None]4. 使用图神经网络(GNN)进行 pIC50 预测
4.1 准备数据
# 安装 PyTorch Geometric(如果未安装)
# !pip install torch-geometric
# 导入 PyTorch Geometric 库
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader as GeoDataLoader
from torch_geometric.nn import GCNConv, global_mean_pool
# 将分子转换为图数据
def mol_to_graph_data_obj(mol, label):
# 获取原子特征
atom_features_list = []
for atom in mol.GetAtoms():
atom_features = []
atom_features.append(atom.GetAtomicNum())
atom_features_list.append(atom_features)
x = torch.tensor(atom_features_list, dtype=torch.long)
# 获取边索引和边特征
edge_index = []
for bond in mol.GetBonds():
i = bond.GetBeginAtomIdx()
j = bond.GetEndAtomIdx()
edge_index.append([i, j])
edge_index.append([j, i])
edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
# 构建图数据对象
data = Data(x=x, edge_index=edge_index, y=torch.tensor([label], dtype=torch.float))
return data
# 准备训练数据
graph_data_list = []
for smi, label in zip(smiles_list, labels):
mol = Chem.MolFromSmiles(smi)
if mol is not None:
data = mol_to_graph_data_obj(mol, label)
graph_data_list.append(data)
# 划分训练和验证集
train_graphs, val_graphs = train_test_split(graph_data_list, test_size=0.1, random_state=42)4.2 定义 GNN 模型
class GNNModel(nn.Module):
def __init__(self, num_features=1, hidden_dim=128, num_classes=1):
super(GNNModel, self).__init__()
torch.manual_seed(42)
self.conv1 = GCNConv(num_features, hidden_dim)
self.conv2 = GCNConv(hidden_dim, hidden_dim)
self.conv3 = GCNConv(hidden_dim, hidden_dim)
self.lin = nn.Linear(hidden_dim, num_classes)
def forward(self, data):
x, edge_index, batch = data.x.float(), data.edge_index, data.batch
x = self.conv1(x, edge_index)
x = F.relu(x)
x = self.conv2(x, edge_index)
x = F.relu(x)
x = self.conv3(x, edge_index)
x = F.relu(x)
x = global_mean_pool(x, batch) # 全局平均池化
x = self.lin(x)
return x4.3 训练 GNN 模型
from torch.optim import Adam
# 创建数据加载器
train_loader = GeoDataLoader(train_graphs, batch_size=32, shuffle=True)
val_loader = GeoDataLoader(val_graphs, batch_size=32, shuffle=False)
# 初始化模型和优化器
model = GNNModel(num_features=1, hidden_dim=128, num_classes=1).to(device)
optimizer = Adam(model.parameters(), lr=0.001)
# 定义损失函数
loss_fn = nn.MSELoss()
# 训练模型
def train(model, loader):
model.train()
total_loss = 0
for data in loader:
data = data.to(device)
optimizer.zero_grad()
out = model(data)
loss = loss_fn(out.view(-1), data.y.view(-1))
loss.backward()
optimizer.step()
total_loss += loss.item() * data.num_graphs
return total_loss / len(loader.dataset)
def evaluate(model, loader):
model.eval()
total_loss = 0
with torch.no_grad():
for data in loader:
data = data.to(device)
out = model(data)
loss = loss_fn(out.view(-1), data.y.view(-1))
total_loss += loss.item() * data.num_graphs
return total_loss / len(loader.dataset)
# 训练循环
num_epochs = 30
for epoch in range(1, num_epochs + 1):
train_loss = train(model, train_loader)
val_loss = evaluate(model, val_loader)
print(f'Epoch: {epoch}, Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}')
# 保存模型
torch.save(model.state_dict(), 'checkpoints/gnn_model.pt')5. 对生成的分子进行性质预测和筛选
5.1 pIC50 预测
# 对生成的新分子进行 pIC50 预测
generated_graphs = []
for mol in new_mols:
data = mol_to_graph_data_obj(mol, label=0) # 标签此处不重要
generated_graphs.append(data)
# 创建数据加载器
generated_loader = GeoDataLoader(generated_graphs, batch_size=32, shuffle=False)
# 预测 pIC50
model.eval()
predicted_pIC50 = []
with torch.no_grad():
for data in generated_loader:
data = data.to(device)
out = model(data)
predicted_pIC50.extend(out.view(-1).cpu().numpy())
# 将预测结果与 SMILES 对应起来
generated_smiles = [Chem.MolToSmiles(mol) for mol in new_mols]5.2 计算 ADMET 和其他性质
# 初始化 PAINS 过滤器
params = FilterCatalogParams()
params.AddCatalog(FilterCatalogParams.FilterCatalogs.PAINS)
catalog = FilterCatalog(params)
def calculate_properties(mols):
properties = []
for mol in mols:
mol_weight = Descriptors.MolWt(mol)
logp = Crippen.MolLogP(mol)
num_h_donors = Descriptors.NumHDonors(mol)
num_h_acceptors = Descriptors.NumHAcceptors(mol)
rotatable_bonds = Descriptors.NumRotatableBonds(mol)
tpsa = Descriptors.TPSA(mol)
qed = Descriptors.qed(mol)
# CNS 符合性(简单判断,TPSA < 90,分子量 < 450)
cns_pass = tpsa < 90 and mol_weight < 450
# Lipinski 规则符合性
lipinski_pass = (mol_weight <= 500 and
logp <= 5 and
num_h_donors <= 5 and
num_h_acceptors <= 10)
# PAINS 过滤器
pains_matches = catalog.GetMatches(mol)
pains_pass = len(pains_matches) == 0
properties.append({
'MolWt': mol_weight,
'LogP': logp,
'NumHDonors': num_h_donors,
'NumHAcceptors': num_h_acceptors,
'RotatableBonds': rotatable_bonds,
'TPSA': tpsa,
'QED': qed,
'CNS_Pass': cns_pass,
'Lipinski_Pass': lipinski_pass,
'PAINS_Pass': pains_pass
})
return properties
# 计算生成分子的性质
generated_properties = calculate_properties(new_mols)5.3 整合结果
# 整合 SMILES、预测的 pIC50 和计算的性质
results = []
for smi, pIC50, props in zip(generated_smiles, predicted_pIC50, generated_properties):
result = {
'SMILES': smi,
'Predicted_pIC50': pIC50,
'MolWt': props['MolWt'],
'LogP': props['LogP'],
'NumHDonors': props['NumHDonors'],
'NumHAcceptors': props['NumHAcceptors'],
'RotatableBonds': props['RotatableBonds'],
'TPSA': props['TPSA'],
'QED': props['QED'],
'CNS_Pass': props['CNS_Pass'],
'Lipinski_Pass': props['Lipinski_Pass'],
'PAINS_Pass': props['PAINS_Pass']
}
results.append(result)
# 将结果转换为 DataFrame
results_df = pd.DataFrame(results)
# 筛选符合条件的分子
filtered_df = results_df[
(results_df['Lipinski_Pass']) &
(results_df['CNS_Pass']) &
(results_df['PAINS_Pass']) &
(results_df['Predicted_pIC50'] > 7) # 假设我们关注 pIC50 > 7 的分子
]
print(f'符合筛选条件的分子数量: {len(filtered_df)}')6. 结果保存和可视化
# 保存所有生成的分子及其性质
results_df.to_csv('generated_molecules/generated_molecules_properties.csv', index=False)
# 保存筛选后的分子
filtered_df.to_csv('generated_molecules/filtered_molecules.csv', index=False)
print("结果已保存到 'generated_molecules' 目录下。")
# 可视化筛选后的前 16 个分子
filtered_mols = [Chem.MolFromSmiles(smi) for smi in filtered_df['SMILES'].tolist()[:16]]
if filtered_mols:
img = Draw.MolsToGridImage(filtered_mols, molsPerRow=4, subImgSize=(200, 200))
img.show()
else:
print("没有符合条件的分子可供可视化。")代码说明
扩展反应 SMARTS 集合:添加了约 50 个常用的化学反应的 SMARTS 表示,包括各种缩合、取代、加成、环化、氧化、还原等反应。这些反应可用于将片段组合成化学合理的新分子。
基于反应的片段组合:使用 RDKit 的
ReactionFromSmarts和RunReactants方法,将片段作为反应物,应用反应 SMARTS 进行反应,生成新分子。通过遍历片段对和反应集合,生成大量的新分子。使用 GNN 进行 pIC50 预测:
数据准备:将分子转换为图数据对象,包括原子特征、边索引等。
模型定义:使用 PyTorch Geometric 定义一个简单的 GCN 模型,包括三层图卷积层和全局平均池化。
模型训练:在训练集上训练模型,在验证集上评估模型性能。
预测:对生成的新分子进行 pIC50 预测。
性质计算和筛选:计算生成分子的物化性质,应用 Lipinski 规则、CNS 符合性、PAINS 过滤器等进行筛选。
结果保存和可视化:将所有生成的分子及其性质保存到 CSV 文件,筛选符合条件的分子,并可视化部分分子。
注意事项
反应匹配:在应用反应时,需要确保反应物的数量和类型与反应模板匹配。代码中增加了对反应物数量和匹配的检查。
计算时间:由于片段数量和反应数量较大,生成新分子的过程可能需要较长时间。可以根据需要调整
num_molecules参数和尝试次数。GNN 模型的性能:此处使用了一个简单的 GCN 模型,您可以根据需要调整模型架构、增加层数或更改超参数,以提高预测性能。
环境配置:使用 PyTorch Geometric 需要安装相应的依赖库,确保您的 Python 环境中已正确安装。
反应 SMARTS 的准确性:反应 SMARTS 字符串可能存在语法错误或化学错误,建议在实际使用中仔细验证每个反应的正确性。
进一步优化:您可以根据需要,对代码进行优化,例如并行化计算、优化数据结构等。
结论
评论
发表评论