😀 相信大家阅读完前面的训练教程,已经对大模型训练有所熟悉,并且已经得到了自己训练的模型。接下来的文章将讲解如何进一步优化咱们的模型~
1. 学习率调度策略
# 余弦退火(Cosine Annealing)
--lr_scheduler cosine \
--warmup_steps 1000 \
--min_lr 1e-6
# 阶梯式下降
--lr_scheduler step \
--lr_decay_steps 2000 \
--lr_decay_rate 0.9
理解: 就像学习要有节奏,不能一直高强度,要有张有弛
2. 梯度裁剪(Gradient Clipping)
--grad_clip 1.0  # 防止梯度爆炸
理解: 就像给学习速度设上限,防止学过头
3. 数据增强技巧
# Back-translation augmentation (Chinese -> English -> Chinese)
def back_translate(text):
    """Return *text* after a Chinese -> English -> Chinese round trip.

    Relies on ``translate_to_english`` and ``translate_to_chinese`` being
    available in the enclosing scope; neither is defined in this snippet.
    """
    # Chinese -> English
    en_text = translate_to_english(text)
    # English -> Chinese
    zh_text = translate_to_chinese(en_text)
    return zh_text
# Synonym replacement
import random

import jieba
import synonyms


def synonym_replace(text, replace_rate=0.1):
    """Randomly swap words of *text* for near-synonyms.

    Args:
        text: Sentence to augment; it is segmented with ``jieba.lcut``.
        replace_rate: Probability, per segmented word, of attempting a swap.

    Returns:
        The words joined back together without separators.
    """
    words = jieba.lcut(text)
    new_words = []
    for word in words:
        if random.random() < replace_rate:
            # nearby() returns (words, scores); only the words are needed.
            candidates = synonyms.nearby(word)[0]
            # NOTE(review): the candidate list appears to contain the query
            # word itself; exclude it so a "replacement" is never a no-op.
            candidates = [c for c in candidates if c != word][:3]
            if candidates:
                word = random.choice(candidates)
        new_words.append(word)
    return ''.join(new_words)
1. 注意力机制改进
# Rotary positional embedding (RoPE)
class RotaryPositionalEmbedding(nn.Module):
    """Precompute RoPE frequencies and apply them to an input tensor.

    Args:
        dim: Embedding width; one frequency is created per pair of channels.
        max_seq_len: Stored on the instance; not used by ``forward``.
    """

    def __init__(self, dim, max_seq_len=512):
        super().__init__()
        self.dim = dim
        self.max_seq_len = max_seq_len
        # Precompute frequencies 10000^(-k/dim) for k = 0, 2, 4, ...
        freqs = torch.exp(torch.arange(0, dim, 2).float() *
                          -(math.log(10000.0) / dim))
        self.register_buffer('freqs', freqs)

    def forward(self, x, seq_len):
        """Apply the rotary encoding for positions ``0 .. seq_len - 1`` to *x*.

        ``apply_rotary_pos_emb`` must be provided by the surrounding code; it
        receives *x* and a ``(seq_len, dim // 2)`` table of angles.
        """
        # Build positions in the same dtype as the frequencies so the outer
        # product does not depend on integer/float type promotion.
        t = torch.arange(seq_len, device=x.device, dtype=self.freqs.dtype)
        freqs = torch.outer(t, self.freqs)
        return apply_rotary_pos_emb(x, freqs)
2. 激活函数优化
# SwiGLU activation (the article presents it as working better than ReLU)
class SwiGLU(nn.Module):
    """Gated activation: split the last dimension in two and gate one half."""

    def forward(self, x):
        """Return ``silu(x1) * x2`` where ``x1, x2`` are the halves of *x*.

        The last dimension of *x* is split into two chunks, so the output's
        last dimension is half as wide as the input's.
        """
        x1, x2 = x.chunk(2, dim=-1)
        return F.silu(x1) * x2
3. 归一化层改进
# RMSNorm (the article presents it as faster than LayerNorm)
class RMSNorm(nn.Module):
    """Root-mean-square normalisation with a learnable per-channel scale.

    Args:
        dim: Size of the last (normalised) dimension.
        eps: Added to the mean square before the reciprocal square root.
    """

    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.eps = eps

    def forward(self, x):
        """Scale *x* by the reciprocal RMS of its last dimension, then by ``weight``."""
        # Mean of squares over the last dimension (no mean subtraction).
        variance = x.pow(2).mean(-1, keepdim=True)
        x = x * torch.rsqrt(variance + self.eps)
        return self.weight * x
1. 数据并行
# 单机多卡
python -m torch.distributed.launch \
--nproc_per_node=4 \
trainer/train_pretrain.py \
--distributed \
--local_rank $LOCAL_RANK
# 多机多卡
torchrun \
--nnodes=2 \
--nproc_per_node=4 \
--node_rank=0 \
--master_addr="192.168.1.100" \
--master_port=12345 \
trainer/train_pretrain.py
2. 模型并行
# Split the model across several GPUs
class ModelParallel(nn.Module):
    """Place the embedding, transformer and LM head on three different GPUs.

    ``vocab_size``, ``embed_dim`` and ``Transformer`` come from the
    surrounding code; ``Transformer(...)`` is a placeholder call whose real
    arguments must be filled in.
    """

    def __init__(self):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim).to('cuda:0')
        self.transformer = Transformer(...).to('cuda:1')
        self.lm_head = nn.Linear(embed_dim, vocab_size).to('cuda:2')

    def forward(self, x):
        """Run the three stages, moving the activations to each stage's device."""
        x = self.embed(x.to('cuda:0'))
        x = self.transformer(x.to('cuda:1'))
        return self.lm_head(x.to('cuda:2'))
3. 混合精度训练
from torch.cuda.amp import autocast, GradScaler
# Mixed-precision training loop. `dataloader`, `model`, `optimizer` and
# `criterion` are expected to exist already.
scaler = GradScaler()
for batch in dataloader:
    optimizer.zero_grad()
    # Only the forward pass and the loss run under autocast.
    with autocast():
        outputs = model(batch)
        # NOTE(review): `targets` is not defined anywhere in this snippet;
        # it has to be taken from `batch` (or elsewhere) before this line.
        loss = criterion(outputs, targets)
    # Scale the loss before backward, then let the scaler drive the
    # optimizer step and update its scale factor.
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
1. 流式数据加载
class StreamingDataset(torch.utils.data.IterableDataset):
    """Stream tokenised samples from JSONL files without loading them fully.

    Args:
        data_files: Iterable of paths; each line of a file is a JSON object
            with a ``'text'`` field.
        tokenizer: Object exposing ``encode(text)``.
    """

    def __init__(self, data_files, tokenizer):
        self.data_files = data_files
        self.tokenizer = tokenizer

    def __iter__(self):
        """Yield one tensor of token ids per non-blank line."""
        import itertools
        import json

        files = self.data_files
        worker = torch.utils.data.get_worker_info()
        if worker is not None:
            # Inside a DataLoader worker: take every num_workers-th file so
            # the workers do not all emit the same samples.
            files = itertools.islice(files, worker.id, None, worker.num_workers)

        for file in files:
            with open(file, 'r', encoding='utf-8') as f:
                for line in f:
                    # Blank lines would make json.loads raise.
                    if not line.strip():
                        continue
                    text = json.loads(line)['text']
                    tokens = self.tokenizer.encode(text)
                    yield torch.tensor(tokens)
2. 动态批处理
def dynamic_batching(samples, max_tokens=4096):
    """Group samples into batches whose summed lengths stay within a budget.

    Samples are visited from longest to shortest. A sample that is by itself
    longer than *max_tokens* is emitted as a batch of one.

    Args:
        samples: Iterable of sized items (``len(sample)`` is its token count).
        max_tokens: Upper bound on the summed lengths of one batch.

    Yields:
        Non-empty lists of samples.
    """
    batch = []
    current_tokens = 0
    for sample in sorted(samples, key=len, reverse=True):
        sample_tokens = len(sample)
        # Only flush when something has been collected; otherwise an
        # over-long first sample would produce an empty batch.
        if batch and current_tokens + sample_tokens > max_tokens:
            yield batch
            batch = []
            current_tokens = 0
        batch.append(sample)
        current_tokens += sample_tokens
    if batch:
        yield batch
1. 困惑度(Perplexity)
def calculate_perplexity(model, test_data):
    """Compute next-token perplexity of *model* over *test_data*.

    Args:
        model: Callable module; ``model(batch)`` must return a logits tensor
            whose last dimension is the vocabulary size.
        test_data: Iterable of integer token tensors.
            # assumes each batch is (batch, seq_len) -- TODO confirm

    Returns:
        ``exp`` of the mean per-token cross-entropy, as a float.

    Raises:
        ValueError: If no target tokens were seen.
    """
    import math

    import torch.nn.functional as F

    model.eval()
    total_loss = 0.0
    total_tokens = 0
    with torch.no_grad():
        for batch in test_data:
            logits = model(batch)
            # The logits at position t are scored against the token at
            # t + 1, so drop the last prediction and the first label.
            # NOTE(review): padding tokens are not masked out.
            shift_logits = logits[:, :-1, :]
            shift_labels = batch[:, 1:]
            loss = F.cross_entropy(
                shift_logits.reshape(-1, shift_logits.size(-1)),
                shift_labels.reshape(-1),
                reduction='sum',
            )
            total_loss += loss.item()
            total_tokens += shift_labels.numel()
    if total_tokens == 0:
        raise ValueError("test_data contains no target tokens")
    avg_loss = total_loss / total_tokens
    try:
        return math.exp(avg_loss)
    except OverflowError:
        return float('inf')
2. BLEU分数(翻译质量)
from nltk.translate.bleu_score import sentence_bleu
def calculate_bleu(reference, hypothesis):
    """Compute a smoothed sentence-level BLEU score.

    Args:
        reference: Reference sentence as a string.
        hypothesis: Candidate sentence as a string.

    Returns:
        The value returned by ``sentence_bleu``.
    """
    from nltk.translate.bleu_score import SmoothingFunction

    def tokenize(sentence):
        # Whitespace splitting turns an unsegmented CJK sentence into a
        # single token, which makes n-gram overlap meaningless; fall back to
        # character tokens in that case.
        parts = sentence.split()
        has_cjk = any('\u4e00' <= ch <= '\u9fff' for ch in sentence)
        if has_cjk and len(parts) <= 1:
            return list(sentence.strip())
        return parts

    references = [tokenize(reference)]
    candidate = tokenize(hypothesis)
    # Smoothing avoids a zero score when a higher-order n-gram has no match,
    # which is the common case for short sentences.
    return sentence_bleu(references, candidate,
                         smoothing_function=SmoothingFunction().method1)
3. 人工评估
def human_evaluation(model, test_questions):
    """Collect 1-5 human ratings for the model's answers, interactively.

    Args:
        model: Object exposing ``generate(question)``.
        test_questions: Iterable of question strings.

    Returns:
        A list of dicts with ``question``, ``answer`` and the integer scores
        ``fluency``, ``relevance`` and ``usefulness``.
    """
    def ask_score(prompt):
        # Re-prompt until an integer in 1..5 is entered, so the stored
        # ratings are numeric and in range.
        while True:
            raw = input(prompt).strip()
            if raw.isdigit() and 1 <= int(raw) <= 5:
                return int(raw)
            print("请输入1到5之间的整数。")

    results = []
    for question in test_questions:
        answer = model.generate(question)
        # Fluency (1-5); this first prompt also shows the question and answer.
        fluency = ask_score(f"问题: {question}\n回答: {answer}\n流畅度评分(1-5): ")
        # Relevance (1-5)
        relevance = ask_score("相关性评分(1-5): ")
        # Usefulness (1-5)
        usefulness = ask_score("有用性评分(1-5): ")
        results.append({
            'question': question,
            'answer': answer,
            'fluency': fluency,
            'relevance': relevance,
            'usefulness': usefulness,
        })
    return results
1. 阅读理解
def reading_comprehension_evaluation(model, passages, questions):
    """Return exact-match accuracy on passage/question pairs.

    Args:
        model: Object exposing ``answer_question(passage_text, question)``.
        passages: Sequence of dicts with a ``'text'`` key.
        questions: Sequence of dicts with ``'question'`` and ``'answer'``
            keys, aligned with *passages*.

    Returns:
        Fraction of answers equal to the reference after stripping
        whitespace; ``0.0`` when there is nothing to evaluate.
    """
    scores = []
    for passage, question_data in zip(passages, questions):
        passage_text = passage['text']
        question = question_data['question']
        correct_answer = question_data['answer']
        # Let the model answer the question.
        model_answer = model.answer_question(passage_text, question)
        # Exact match after trimming surrounding whitespace.
        scores.append(model_answer.strip() == correct_answer.strip())
    if not scores:
        return 0.0
    return sum(scores) / len(scores)
2. 常识推理
def commonsense_reasoning_evaluation(model, questions):
    """Return multiple-choice accuracy of *model*.

    Args:
        model: Object exposing ``generate(prompt)`` returning a string.
        questions: Sequence of dicts with ``'question'``, ``'options'`` and
            ``'correct'`` (the label of the right option; options are
            numbered from 1 in the prompt).

    Returns:
        Fraction of correctly answered questions; ``0.0`` for empty input.
    """
    import re

    total = len(questions)
    if total == 0:
        return 0.0

    correct = 0
    for question in questions:
        prompt = f"问题: {question['question']}\n选项:\n"
        for i, option in enumerate(question['options']):
            prompt += f"{i+1}. {option}\n"
        prompt += "答案: "

        model_answer = model.generate(prompt)
        # If the model echoes the prompt, drop it so the option numbers in
        # the prompt are not mistaken for the answer.
        if model_answer.startswith(prompt):
            model_answer = model_answer[len(prompt):]

        expected = str(question['correct'])
        first_number = re.search(r'\d+', model_answer)
        if expected.isdigit() and first_number:
            # Compare whole numbers so that "1" does not match "10".
            is_correct = first_number.group() == expected
        else:
            is_correct = expected in model_answer
        if is_correct:
            correct += 1
    return correct / total
1. 模型量化
# 8-bit quantization
def quantize_model_8bit(model):
    """Round every parameter of *model* to a signed 8-bit grid, in place.

    Each tensor uses its own scale (largest absolute value / 127), so weights
    outside [-1, 1] are not clipped. Values are written back in the
    parameter's existing dtype; the model is returned for convenience.
    """
    for name, param in model.named_parameters():
        if param.numel() == 0:
            continue
        max_abs = param.data.abs().max()
        if max_abs == 0:
            # An all-zero tensor is already exactly representable.
            continue
        scale = max_abs / 127
        param.data = (param.data / scale).round().clamp(-128, 127) * scale
    return model
# 4-bit quantization
def quantize_model_4bit(model):
    """Round every parameter of *model* to a signed 4-bit grid, in place.

    Each tensor uses its own scale (largest absolute value / 7), so weights
    outside [-1, 1] are not clipped. Values are written back in the
    parameter's existing dtype; the model is returned for convenience.
    """
    for name, param in model.named_parameters():
        if param.numel() == 0:
            continue
        max_abs = param.data.abs().max()
        if max_abs == 0:
            # An all-zero tensor is already exactly representable.
            continue
        scale = max_abs / 7
        param.data = (param.data / scale).round().clamp(-8, 7) * scale
    return model
2. 知识蒸馏
class KnowledgeDistillation(nn.Module):
    """Compute a temperature-scaled KL distillation loss.

    Args:
        teacher_model: Module producing the target logits.
        student_model: Module being trained.
        temperature: Both sets of logits are divided by this value.
    """

    def __init__(self, teacher_model, student_model, temperature=3.0):
        super().__init__()
        self.teacher = teacher_model
        self.student = student_model
        self.temperature = temperature

    def forward(self, x):
        """Return the distillation loss for input *x*."""
        # Teacher output: no gradients are tracked for the teacher.
        with torch.no_grad():
            teacher_logits = self.teacher(x) / self.temperature
        # Student output
        student_logits = self.student(x) / self.temperature
        # KL divergence between the softened distributions, multiplied by
        # temperature squared.
        distill_loss = F.kl_div(
            F.log_softmax(student_logits, dim=-1),
            F.softmax(teacher_logits, dim=-1),
            reduction='batchmean'
        ) * (self.temperature ** 2)
        return distill_loss
3. 缓存优化
class KVCache:
    """Per-layer key/value cache for autoregressive generation."""

    def __init__(self):
        self.key_cache = []
        self.value_cache = []

    def update(self, key, value, layer_idx):
        """Append *key*/*value* to the cache of layer *layer_idx*.

        New entries are concatenated along dimension ``-2``. Layers may be
        seen in any order; slots for layers not seen yet hold ``None``.

        Returns:
            The full cached ``(key, value)`` tensors for that layer.
        """
        # Grow the lists so that layer_idx is always a valid index.
        while len(self.key_cache) <= layer_idx:
            self.key_cache.append(None)
            self.value_cache.append(None)

        if self.key_cache[layer_idx] is None:
            self.key_cache[layer_idx] = key
            self.value_cache[layer_idx] = value
        else:
            self.key_cache[layer_idx] = torch.cat(
                [self.key_cache[layer_idx], key], dim=-2)
            self.value_cache[layer_idx] = torch.cat(
                [self.value_cache[layer_idx], value], dim=-2)
        return self.key_cache[layer_idx], self.value_cache[layer_idx]

    def clear(self):
        """Drop all cached tensors."""
        self.key_cache = []
        self.value_cache = []
1. FastAPI服务
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class ChatRequest(BaseModel):
    """Request body accepted by the chat endpoints."""

    # Text sent to the model.
    message: str
    # Forwarded to model.generate as max_length.
    max_length: int = 512
class ChatResponse(BaseModel):
    """Response body returned by ``/chat``."""

    # Generated reply.
    response: str
    # Number of tokens obtained by encoding the reply.
    tokens_used: int
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """Generate a reply for ``request.message``.

    Raises:
        HTTPException: Status 500 carrying the error text if generation or
            tokenisation fails.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        # Generation is blocking; run it in a worker thread so the event
        # loop can keep serving other requests.
        response = await run_in_threadpool(
            model.generate,
            request.message,
            max_length=request.max_length,
        )
        # Count the tokens of the reply.
        tokens = tokenizer.encode(response)
        return ChatResponse(
            response=response,
            tokens_used=len(tokens)
        )
    except Exception as e:
        # NOTE(review): str(e) is returned to the client; consider a generic
        # message if internal details must not leak.
        raise HTTPException(status_code=500, detail=str(e)) from e
# Start the API server when this file is run as a script.
if __name__ == "__main__":
    import uvicorn
    # Listens on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
2. 流式输出
@app.post("/chat/stream")
async def chat_stream_endpoint(request: ChatRequest):
    """Stream the reply as server-sent events.

    Each chunk is sent as ``data: {"chunk": ...}``; a final
    ``data: {"done": true}`` event marks the end of the stream.
    """
    import json

    from fastapi.responses import StreamingResponse

    # A plain (non-async) generator is used because model.generate_stream is
    # a blocking iterator; it would stall the event loop if driven from an
    # async generator.
    def generate_stream():
        for chunk in model.generate_stream(request.message):
            yield f"data: {json.dumps({'chunk': chunk})}\n\n"
        yield f"data: {json.dumps({'done': True})}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream"
    )
训练一个了解你个人经历、兴趣爱好、说话方式的专属AI助手
1. 聊天记录收集
import json
import re
from datetime import datetime
def extract_wechat_chat(file_path, my_nickname="你的昵称", max_turns=10):
    """Extract your own messages from an exported WeChat log.

    The file is expected to hold entries of the form::

        YYYY-MM-DD HH:MM:SS
        speaker: message

    Args:
        file_path: Path of the UTF-8 text export.
        my_nickname: Messages are kept when this text occurs in the speaker.
        max_turns: A conversation is closed once it holds more than this
            many messages.

    Returns:
        A list of conversations, each a list of
        ``{'time', 'speaker', 'message'}`` dicts with speaker ``'me'``.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # A message runs until the next line starting with a year, or the end.
    pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\n(.*?): (.*?)(?=\n\d{4}-|$)'
    matches = re.findall(pattern, content, re.DOTALL)

    conversations = []
    current_conv = []
    for time_str, speaker, message in matches:
        # Keep only the messages you sent.
        if my_nickname in speaker:
            current_conv.append({
                'time': time_str,
                'speaker': 'me',
                'message': message.strip()
            })
        # Start a new conversation once the current one is long enough.
        if len(current_conv) > max_turns:
            conversations.append(current_conv)
            current_conv = []

    # Keep the trailing, shorter conversation instead of dropping it.
    if current_conv:
        conversations.append(current_conv)
    return conversations
def extract_qq_chat(file_path, my_nickname="你的QQ昵称"):
    """Extract your own messages from an exported QQ log.

    Each relevant line has the form ``YYYY-MM-DD HH:MM:SS nickname message``.

    Args:
        file_path: Path of the UTF-8 text export.
        my_nickname: Messages are kept when this text occurs in the speaker.

    Returns:
        A flat list of ``{'time', 'speaker', 'message'}`` dicts with speaker
        ``'me'``.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    line_pattern = re.compile(
        r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+(.*?)\s+(.*)')
    conversations = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # QQ format: time, nickname, message
        match = line_pattern.match(line)
        if match:
            time_str, speaker, message = match.groups()
            # Keep only your own messages.
            if my_nickname in speaker:
                conversations.append({
                    'time': time_str,
                    'speaker': 'me',
                    'message': message.strip()
                })
    return conversations
2. 个人写作收集
def collect_personal_writing(folder_path):
    """Collect personal writing (essays, diaries, blog posts, ...) from a folder.

    Walks *folder_path* recursively and reads ``.txt``, ``.md`` and ``.docx``
    files (suffix compared case-insensitively).

    Returns:
        A list of ``{'filename', 'content', 'type'}`` dicts, where ``type``
        is the name of the file's parent folder. Files that cannot be read
        are reported with ``print`` and skipped.
    """
    from pathlib import Path

    writing_data = []
    # Supported file types
    text_suffixes = ('.txt', '.md')
    for file_path in sorted(Path(folder_path).rglob('*')):
        suffix = file_path.suffix.lower()
        if suffix not in text_suffixes and suffix != '.docx':
            continue
        try:
            if suffix == '.docx':
                import docx
                doc = docx.Document(file_path)
                content = '\n'.join([para.text for para in doc.paragraphs])
            else:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            writing_data.append({
                'filename': str(file_path),
                'content': content,
                'type': file_path.parent.name  # folder name doubles as the type
            })
        except Exception as e:
            # Best effort: one unreadable file must not abort the scan.
            print(f"处理文件 {file_path} 时出错: {e}")
    return writing_data
def extract_social_media_posts(weibo_file="weibo_posts.json",
                               moments_file="wechat_moments.json"):
    """Collect your own posts from Weibo and WeChat Moments exports.

    Args:
        weibo_file: JSON file holding a list of Weibo post dicts.
        moments_file: JSON file holding a list of Moments post dicts.

    Returns:
        A list of post dicts. Only entries flagged ``is_my_post`` that have
        a ``content`` field are kept; a missing file is skipped.
    """
    import json
    import os

    posts = []

    # Weibo data
    if os.path.exists(weibo_file):
        with open(weibo_file, 'r', encoding='utf-8') as f:
            weibo_data = json.load(f)
        for post in weibo_data:
            if 'content' in post and post.get('is_my_post'):
                posts.append({
                    'platform': 'weibo',
                    'content': post['content'],
                    'time': post.get('created_at', ''),
                    'likes': post.get('like_count', 0)
                })

    # WeChat Moments data
    if os.path.exists(moments_file):
        with open(moments_file, 'r', encoding='utf-8') as f:
            moments_data = json.load(f)
        for moment in moments_data:
            # Same guard as for Weibo: an entry without text is skipped
            # rather than raising KeyError.
            if 'content' in moment and moment.get('is_my_post'):
                posts.append({
                    'platform': 'wechat_moments',
                    'content': moment['content'],
                    'time': moment.get('created_at', ''),
                    'images': moment.get('image_count', 0)
                })
    return posts
3. 个人资料整理
def create_persona_profile():
    """Return a template persona profile filled with placeholder text.

    The result is a fresh dict on every call with the sections
    ``basic_info``, ``personality_traits``, ``interests_hobbies``,
    ``communication_style``, ``knowledge_domains`` and ``typical_responses``.
    Replace the placeholder strings with your own information.
    """
    profile = {
        'basic_info': {
            'name': '你的名字',
            'age': '你的年龄',
            'location': '你的城市',
            'occupation': '你的职业/专业',
            'education': '你的教育背景'
        },
        'personality_traits': [
            '性格特点1(如:乐观开朗)',
            '性格特点2(如:细心认真)',
            '性格特点3(如:善于思考)'
        ],
        'interests_hobbies': [
            '兴趣爱好1(如:编程)',
            '兴趣爱好2(如:音乐)',
            '兴趣爱好3(如:运动)'
        ],
        'communication_style': {
            'tone': '说话语气(如:温和、幽默、直接)',
            'common_phrases': [
                '常用表达1',
                '常用表达2',
                '常用表达3'
            ],
            'emoji_usage': '表情使用习惯',
            'response_length': '回复长度偏好(简短/详细)'
        },
        'knowledge_domains': [
            '专业领域1',
            '专业领域2',
            '熟悉的主题1',
            '熟悉的主题2'
        ],
        'typical_responses': {
            'greeting': '你通常怎么打招呼',
            'thanks': '如何表达感谢',
            'apology': '如何道歉',
            'goodbye': '如何告别'
        }
    }
    return profile
1. 数据清洗
def clean_personal_data(data):
    """Clean raw chat/post items and drop short or duplicate texts.

    Args:
        data: Iterable of dicts carrying the text under ``'message'`` or
            ``'content'``; ``'platform'``, ``'time'`` and ``'type'`` are
            optional.

    Returns:
        A list of ``{'text', 'source', 'timestamp', 'type'}`` dicts.
    """
    url_pattern = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    # Everything except CJK/kana, word characters, whitespace and common
    # punctuation is removed. The quote characters are written as escapes so
    # they cannot terminate the string literal.
    symbol_pattern = re.compile(
        r"[^\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\w\s。,!?:;\x22\x27\u201c\u201d\u2018\u2019()]")

    cleaned_data = []
    seen_texts = set()  # O(1) duplicate lookup
    for item in data:
        text = item.get('message', '') or item.get('content', '')
        # Remove URLs
        text = url_pattern.sub('', text)
        # Remove special symbols and emoji (common punctuation is kept)
        text = symbol_pattern.sub('', text).strip()
        # Drop content that is too short
        if len(text) < 5:
            continue
        # Drop duplicates
        if text in seen_texts:
            continue
        seen_texts.add(text)
        cleaned_data.append({
            'text': text,
            'source': item.get('platform', 'unknown'),
            'timestamp': item.get('time', ''),
            'type': item.get('type', 'chat')
        })
    return cleaned_data
def remove_sensitive_info(data):
    """Mask ID numbers, bank cards, e-mail addresses and phone numbers.

    Args:
        data: Iterable of dicts with a ``'text'`` key.

    Returns:
        A new list of shallow copies with the masked text; the input dicts
        are left untouched.
    """
    # Longer, more specific patterns run first, and digit runs are anchored
    # with look-arounds, so the 11-digit phone rule cannot eat part of an
    # 18-character ID number.
    sensitive_patterns = [
        (r'(?<!\d)\d{17}[\dXx](?!\d)', '[身份证号]'),  # ID number
        (r'(?<!\d)\d{4}-\d{4}-\d{4}-\d{4}(?!\d)', '[银行卡号]'),  # bank card
        (r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[邮箱]'),  # e-mail
        (r'(?<!\d)\d{11}(?!\d)', '[手机号]'),  # mobile number
    ]
    cleaned_data = []
    for item in data:
        text = item['text']
        for pattern, replacement in sensitive_patterns:
            text = re.sub(pattern, replacement, text)
        masked = dict(item)
        masked['text'] = text
        cleaned_data.append(masked)
    return cleaned_data
2. 数据增强
def augment_personal_data(data):
    """Augment cleaned items with synonym-replaced and reordered variants.

    Every original item is kept. Variants are shallow copies with a new
    ``'text'`` and ``'augmented': True``. ``synonym_replace`` must be
    available in the enclosing scope.
    """
    import random

    augmented_data = []
    for item in data:
        text = item['text']
        # Original data
        augmented_data.append(item)

        # Synonym replacement
        if len(text) > 10:
            augmented_text = synonym_replace(text, replace_rate=0.1)
            if augmented_text != text:
                new_item = item.copy()
                new_item['text'] = augmented_text
                new_item['augmented'] = True
                augmented_data.append(new_item)

        # Sentence reordering. Empty pieces (e.g. after a trailing full
        # stop) are dropped first so they cannot be shuffled into the middle
        # of the text and create "。。".
        sentences = [s for s in text.split('。') if s.strip()]
        if len(sentences) >= 2:
            random.shuffle(sentences)
            shuffled_text = '。'.join(sentences)
            if text.endswith('。'):
                shuffled_text += '。'
            # A shuffle can reproduce the original order; skip the no-op.
            if shuffled_text != text:
                new_item = item.copy()
                new_item['text'] = shuffled_text
                new_item['augmented'] = True
                augmented_data.append(new_item)
    return augmented_data
1. 指令格式(推荐)
{
"instruction": "描述你的兴趣爱好",
"input": "",
"output": "我喜欢编程、打篮球、听周杰伦的歌。编程让我能够创造新的东西,打篮球让我保持健康,而周杰伦的音乐总能让我放松。"
}
2. 对话格式
{
"conversations": [
{
"from": "human",
"value": "你平时喜欢做什么?"
},
{
"from": "gpt",
"value": "我平时喜欢编程、打篮球、还有听音乐。编程让我很有成就感,打篮球能让我放松,听音乐则是我享受生活的方式。"
},
{
"from": "human",
"value": "你最喜欢的编程语言是什么?"
},
{
"from": "gpt",
"value": "我最喜欢Python,因为它的语法简洁优雅,而且有很多强大的库。用Python写代码就像写诗一样流畅。"
}
]
}
3. 问答格式
{
"question": "你的性格是怎样的?",
"answer": "我是一个比较乐观开朗的人,遇事喜欢往好的方面想。同时我也很细心,做事认真负责。朋友们都说我很可靠,有问题都愿意找我商量。"
}
def split_personal_data(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
    """Split items chronologically into train / validation / test sets.

    Args:
        data: Sequence of dicts; sorted by their ``'timestamp'`` (missing
            timestamps sort as the empty string). The input is not modified.
        train_ratio: Fraction of items for training.
        val_ratio: Fraction of items for validation.
        test_ratio: Kept for interface compatibility; the test set is simply
            everything left after the train and validation slices.

    Returns:
        ``(train_data, val_data, test_data)`` as lists.
    """
    # Sort a copy so the caller's list keeps its original order.
    ordered = sorted(data, key=lambda x: x.get('timestamp', ''))
    n = len(ordered)
    train_size = int(n * train_ratio)
    val_size = int(n * val_ratio)
    train_data = ordered[:train_size]
    val_data = ordered[train_size:train_size + val_size]
    test_data = ordered[train_size + val_size:]
    return train_data, val_data, test_data
# Save as JSONL
def save_as_jsonl(data, filename):
    """Write *data* to *filename* as UTF-8 JSON Lines, one object per line.

    Non-ASCII characters are written as-is (``ensure_ascii=False``). An
    existing file is overwritten.
    """
    with open(filename, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')
1. 使用通用中文数据
# 第1步:预训练(使用通用中文数据)
python trainer/train_pretrain.py \
--dim 512 \
--n_layers 8 \
--n_heads 8 \
--batch_size 64 \
--max_epochs 10 \
--lr 5e-4 \
--max_seq_len 512 \
--data_path "dataset/chinese_corpus.jsonl" \
--save_name "personal_base"
2. 监控训练过程
# Training monitor script
def monitor_training(log_file):
    """Plot training loss and perplexity parsed from a training log.

    Lines containing both ``Epoch`` and ``Loss`` are parsed by whitespace
    position: the epoch is the part before ``/`` in the second field and the
    loss is the fourth field. Lines that do not fit are skipped. The figure
    is saved to ``training_monitor.png`` and then shown.
    """
    import matplotlib.pyplot as plt
    import numpy as np

    epochs = []
    losses = []
    perplexities = []
    with open(log_file, 'r') as f:
        for line in f:
            if 'Epoch' in line and 'Loss' in line:
                parts = line.split()
                try:
                    epoch = int(parts[1].split('/')[0])
                    loss = float(parts[3])
                except (IndexError, ValueError):
                    # One malformed line must not abort the whole plot.
                    continue
                epochs.append(epoch)
                losses.append(loss)
                # Perplexity is exp(loss).
                perplexities.append(np.exp(loss))

    # Draw the two charts side by side.
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

    ax1.plot(epochs, losses, 'b-', label='Training Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.set_title('Training Loss over Time')
    ax1.grid(True)

    ax2.plot(epochs, perplexities, 'r-', label='Perplexity')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Perplexity')
    ax2.set_title('Perplexity over Time')
    ax2.grid(True)

    plt.tight_layout()
    plt.savefig('training_monitor.png')
    plt.show()
1. 准备配置文件
{
"lora_config": {
"rank": 16,
"alpha": 32,
"dropout": 0.1,
"target_modules": ["q_proj", "k_proj", "v_proj", "o_proj"]
},
"training_config": {
"batch_size": 32,
"max_epochs": 5,
"learning_rate": 2e-4,
"warmup_steps": 100,
"save_steps": 500
}
}
2. 执行LoRA微调
# 第2步:个性化LoRA微调
python trainer/train_lora.py \
--base_model "checkpoints/personal_base_512.pth" \
--data_path "personal_data/train.jsonl" \
--lora_rank 16 \
--lora_alpha 32 \
--lora_dropout 0.1 \
--batch_size 32 \
--max_epochs 5 \
--lr 2e-4 \
--save_name "personal_lora"
3. 合并LoRA权重
def merge_lora_weights(base_model, lora_weights, lora_alpha=1.0):
    """Return a copy of *base_model* with LoRA updates folded into its weights.

    Args:
        base_model: Module whose parameters are to be updated; it is deep
            copied and left unchanged.
        lora_weights: Mapping holding, for a parameter called ``name``, the
            factors under ``f"{name}.lora_A"`` and ``f"{name}.lora_B"``.
        lora_alpha: Multiplier applied to ``B @ A``.
            # NOTE(review): pass alpha / rank here if training used the
            # conventional LoRA scaling -- confirm against the trainer.

    Returns:
        The merged copy.
    """
    import copy

    merged_model = copy.deepcopy(base_model)
    for name, param in merged_model.named_parameters():
        key_a = f"{name}.lora_A"
        key_b = f"{name}.lora_B"
        # Merge only parameters for which both factors are present.
        if key_a in lora_weights and key_b in lora_weights:
            lora_A = lora_weights[key_a]
            lora_B = lora_weights[key_b]
            # LoRA formula: W' = W + alpha * B * A
            delta = lora_alpha * (lora_B @ lora_A)
            param.data += delta.to(param.data.dtype)
    return merged_model
1. 数据混合策略
def mix_training_data(general_data, personal_data, mix_ratio=0.3):
    """Mix general and personal samples into one shuffled list.

    Args:
        general_data: Sequence of general-purpose samples to draw from.
        personal_data: Personal samples; all of them are kept.
        mix_ratio: Target share of personal samples in the result,
            in ``(0, 1]``.

    Returns:
        A new shuffled list. Fewer general samples are used when
        *general_data* is too small to reach the target share.

    Raises:
        ValueError: If *mix_ratio* is outside ``(0, 1]``.
    """
    import random

    if not 0 < mix_ratio <= 1:
        raise ValueError("mix_ratio must be in the interval (0, 1]")

    mixed_data = []
    # General data. round() is used instead of int() because floating-point
    # error (e.g. 6.999...) would otherwise drop one sample.
    general_samples = round(len(personal_data) * (1 - mix_ratio) / mix_ratio)
    mixed_data.extend(
        random.sample(general_data, min(general_samples, len(general_data))))
    # Personal data
    mixed_data.extend(personal_data)
    # Shuffle the order
    random.shuffle(mixed_data)
    return mixed_data
2. 执行全参数微调
# 第2步:全参数微调(需要更多显存)
python trainer/train_full_sft.py \
--dim 512 \
--n_layers 8 \
--n_heads 8 \
--batch_size 16 \
--max_epochs 3 \
--lr 1e-4 \
--max_seq_len 512 \
--data_path "personal_data/mixed_train.jsonl" \
--base_model "checkpoints/personal_base_512.pth" \
--save_name "personal_full"
1. 基础对话测试
def test_basic_conversation(model, tokenizer):
    """Print the model's answers to a fixed set of everyday questions.

    ``generate_response(model, tokenizer, question)`` must be available in
    the enclosing scope.
    """
    test_questions = [
        "你好,请介绍一下你自己",
        "你叫什么名字?",
        "你今年多大了?",
        "你是哪里人?",
        "你平时喜欢做什么?",
        "你的性格是怎样的?",
        "你有什么兴趣爱好?",
        "你最喜欢的食物是什么?"
    ]
    print("=== 基础对话测试 ===")
    for question in test_questions:
        response = generate_response(model, tokenizer, question)
        print(f"问:{question}")
        print(f"答:{response}")
        print("-" * 50)
2. 专业知识测试
def test_professional_knowledge(model, tokenizer):
    """Print the model's answers to questions about your field of expertise.

    ``generate_response(model, tokenizer, question)`` must be available in
    the enclosing scope.
    """
    professional_questions = [
        # Customise these for your own field.
        "请解释一下你最擅长的技能",
        "你在学习过程中遇到过什么挑战?",
        "你对未来的职业规划是什么?",
        "你如何看待你专业领域的发展趋势?",
        "请分享一个你解决问题的经历"
    ]
    print("=== 专业知识测试 ===")
    for question in professional_questions:
        response = generate_response(model, tokenizer, question)
        print(f"问:{question}")
        print(f"答:{response}")
        print("-" * 50)
3. 个性特征测试
def test_personality_traits(model, tokenizer):
    """Print the model's answers to questions that probe personality.

    ``generate_response(model, tokenizer, question)`` must be available in
    the enclosing scope.
    """
    personality_questions = [
        "如果朋友遇到困难,你会怎么安慰他?",
        "当你压力很大的时候,你会怎么放松?",
        "你如何看待失败?",
        "你通常怎么表达对家人的爱?",
        "你生气的时候会怎么做?"
    ]
    print("=== 个性特征测试 ===")
    for question in personality_questions:
        response = generate_response(model, tokenizer, question)
        print(f"问:{question}")
        print(f"答:{response}")
        print("-" * 50)
1. 相似度评估
def evaluate_similarity(model_responses, real_responses):
    """Score how close model answers are to real answers (TF-IDF cosine).

    Args:
        model_responses: Sequence of generated answers.
        real_responses: Sequence of reference answers, aligned by position.

    Returns:
        A dict with ``average_similarity``, ``max_similarity`` and
        ``min_similarity``; all ``0.0`` when there are no pairs.
    """
    import numpy as np
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity

    # Character n-grams are used because the default word tokenizer splits
    # on whitespace/punctuation and therefore treats a run of unsegmented
    # Chinese text as one token.
    vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(1, 2))
    similarities = []
    for model_resp, real_resp in zip(model_responses, real_responses):
        try:
            # Vectorise the pair
            vectors = vectorizer.fit_transform([model_resp, real_resp])
        except ValueError:
            # Raised for an empty vocabulary (both texts blank).
            similarities.append(0.0)
            continue
        # Cosine similarity between the two rows
        similarity = cosine_similarity(vectors[0:1], vectors[1:2])[0][0]
        similarities.append(float(similarity))

    if not similarities:
        return {
            'average_similarity': 0.0,
            'max_similarity': 0.0,
            'min_similarity': 0.0
        }
    return {
        'average_similarity': np.mean(similarities),
        'max_similarity': np.max(similarities),
        'min_similarity': np.min(similarities)
    }
2. 风格一致性评估
def evaluate_style_consistency(model_responses, training_data):
    """Compare surface style features of model answers and training texts.

    Args:
        model_responses: Sequence of generated answers (strings).
        training_data: Sequence of dicts with a ``'text'`` key; only the
            first 100 are used.

    Returns:
        A dict mapping each feature name to the absolute difference between
        the two averaged feature values.

    Raises:
        ValueError: If either input is empty.
    """
    import jieba
    import numpy as np

    if not model_responses or not training_data:
        raise ValueError("model_responses and training_data must be non-empty")

    common_words = {'的', '了', '是', '我', '你', '他', '她', '它'}
    emoji_pattern = re.compile(r'[\U00010000-\U0010ffff]')

    # Extract language features from one text.
    def extract_features(text):
        features = {}
        n_chars = len(text)

        # Average sentence length
        sentences = [s for s in text.split('。') if s]
        features['avg_sentence_length'] = (
            np.mean([len(s) for s in sentences]) if sentences else 0.0)

        # Share of common function words
        words = jieba.lcut(text)
        features['common_word_ratio'] = (
            sum(1 for w in words if w in common_words) / len(words)
            if words else 0.0)

        # Exclamation mark frequency (full-width and ASCII)
        exclamations = text.count('!') + text.count('!')
        features['exclamation_ratio'] = exclamations / n_chars if n_chars else 0.0

        # Emoji usage (characters outside the Basic Multilingual Plane)
        features['emoji_ratio'] = (
            len(emoji_pattern.findall(text)) / n_chars if n_chars else 0.0)
        return features

    # Features of the training data
    training_features = [extract_features(d['text']) for d in training_data[:100]]
    avg_training_features = {k: np.mean([f[k] for f in training_features])
                             for k in training_features[0].keys()}

    # Features of the model answers
    model_features = [extract_features(resp) for resp in model_responses]
    avg_model_features = {k: np.mean([f[k] for f in model_features])
                          for k in model_features[0].keys()}

    # Per-feature differences
    differences = {}
    for feature in avg_training_features:
        differences[feature] = abs(avg_training_features[feature] -
                                   avg_model_features[feature])
    return differences
1. 错误分析
def analyze_bad_responses(bad_examples):
    """Count simple failure categories among poor answers.

    Args:
        bad_examples: Iterable of dicts with ``'question'``,
            ``'bad_response'`` and optionally ``'expected'``.

    Returns:
        A dict of counters. Only ``too_generic``, ``off_topic`` and
        ``inconsistent`` are ever incremented here; the other keys stay 0.
    """
    analysis = {
        'too_generic': 0,    # too generic
        'off_topic': 0,      # off topic
        'inconsistent': 0,   # inconsistent with the expected answer
        'too_formal': 0,     # too formal
        'too_casual': 0,     # too casual
        'missing_info': 0    # missing information
    }
    for example in bad_examples:
        question = example['question']
        bad_response = example['bad_response']
        expected = example.get('expected', '')

        # Too generic: contains a stock phrase.
        if any(word in bad_response for word in ['一般来说', '通常情况下', '每个人']):
            analysis['too_generic'] += 1

        # Off topic: none of the question's whitespace-separated keywords
        # appears in the answer.
        # NOTE(review): an unsegmented question is one single keyword.
        if not any(keyword in bad_response for keyword in question.split()):
            analysis['off_topic'] += 1

        # Inconsistent: fewer than 3 tokens shared with the expected answer.
        if expected and len(set(bad_response.split()) & set(expected.split())) < 3:
            analysis['inconsistent'] += 1
    return analysis
2. 针对性数据增强
def targeted_augmentation(analysis_results, training_data, threshold=5):
    """Append corrective examples for the failure types that occur often.

    Args:
        analysis_results: Counter dict as produced by the error analysis;
            missing counters are treated as 0.
        training_data: List that is extended in place.
        threshold: A failure type is addressed when its count exceeds this.

    Returns:
        The same *training_data* list.
    """
    if analysis_results.get('too_generic', 0) > threshold:
        # Add more concrete, specific training examples.
        specific_examples = [
            {
                "instruction": "具体描述你的一个经历",
                "input": "",
                "output": "上周六下午,我在图书馆三楼靠窗的位置看《深度学习》,阳光透过窗户洒在书页上,那一刻我觉得特别充实。"
            },
            {
                "instruction": "用具体的例子说明",
                "input": "",
                "output": "比如上周我遇到一个问题,我的代码运行特别慢,后来我发现是算法复杂度太高,通过优化数据结构,速度提升了10倍。"
            }
        ]
        training_data.extend(specific_examples)

    if analysis_results.get('inconsistent', 0) > threshold:
        # Add examples that reinforce a consistent speaking style.
        consistency_examples = [
            {
                "instruction": "保持回答风格一致",
                "input": "你好",
                "output": "嗨!很高兴见到你~今天过得怎么样呀?"
            },
            {
                "instruction": "用你一贯的说话方式",
                "input": "谢谢你的帮助",
                "output": "哎呀你太客气啦!这点小事儿,举手之劳而已~"
            }
        ]
        training_data.extend(consistency_examples)
    return training_data
3. 自动化优化流程
def auto_optimization_pipeline(model, tokenizer, test_questions, real_answers):
    """Run the evaluate -> analyse -> augment loop once.

    Generates an answer per question, reports similarity to the real
    answers, analyses the weak answers and, if corrective data was produced,
    writes it to ``improved_training_data.jsonl``.

    ``generate_response``, ``evaluate_similarity``, ``analyze_bad_responses``,
    ``targeted_augmentation`` and ``save_as_jsonl`` must be available in the
    enclosing scope.

    Returns:
        The list of generated answers.
    """
    print("=== 开始自动化优化 ===")

    # Step 1: generate answers for the test questions.
    model_answers = [
        generate_response(model, tokenizer, question)
        for question in test_questions
    ]

    # Step 2: evaluate quality.
    similarity_scores = evaluate_similarity(model_answers, real_answers)
    print(f"相似度评估: {similarity_scores}")

    # Step 3: find the examples that need improvement.
    bad_examples = []
    for question, model_answer, real_answer in zip(
            test_questions, model_answers, real_answers):
        expected_tokens = set(real_answer.split())
        if not expected_tokens:
            # An empty reference gives no basis for comparison (and would
            # divide by zero below).
            continue
        # Simple token-overlap similarity (could be improved).
        similarity = len(set(model_answer.split()) & expected_tokens) / \
            len(expected_tokens)
        if similarity < 0.3:  # adjustable threshold
            bad_examples.append({
                'question': question,
                'bad_response': model_answer,
                'expected': real_answer
            })

    # Step 4: analyse the errors.
    if bad_examples:
        analysis = analyze_bad_responses(bad_examples)
        print(f"错误分析: {analysis}")

        # Step 5: generate corrective data.
        improved_training_data = targeted_augmentation(analysis, [])

        # Step 6: retrain (incremental training is possible here).
        if improved_training_data:
            print(f"生成了 {len(improved_training_data)} 条改进数据")
            # Save the corrective data.
            save_as_jsonl(improved_training_data, "improved_training_data.jsonl")
            print("请使用改进数据重新训练模型:")
            print("python trainer/train_lora.py --data_path improved_training_data.jsonl")
    return model_answers
#!/usr/bin/env python3
"""
个性化AI训练脚本 - 训练"你自己"的AI助手
"""
import os
import json
import argparse
from pathlib import Path
def main():
    """Command-line entry point: prepare data, train, and print test hints.

    Steps: parse arguments, create the data directory, write sample data,
    optionally pre-train a base model (skipped with ``--quick_mode``),
    fine-tune with LoRA or full SFT, then print suggested test questions.

    Raises:
        subprocess.CalledProcessError: If a training script exits non-zero.
    """
    import subprocess
    import sys

    parser = argparse.ArgumentParser(description='训练个性化AI助手')
    parser.add_argument('--name', type=str, required=True, help='你的名字')
    parser.add_argument('--data_dir', type=str, default='personal_data', help='个人数据目录')
    parser.add_argument('--model_size', type=str, default='small', choices=['small', 'medium'], help='模型大小')
    parser.add_argument('--training_method', type=str, default='lora', choices=['lora', 'full'], help='训练方法')
    parser.add_argument('--quick_mode', action='store_true', help='快速模式(使用更少数据)')
    args = parser.parse_args()

    is_small = args.model_size == 'small'
    dim = 512 if is_small else 768
    n_layers = 8 if is_small else 12
    n_heads = 8 if is_small else 12
    train_path = f"{args.data_dir}/train.jsonl"

    def run_step(script, options):
        """Run one training script as a child process; raise if it fails."""
        # An argument list (no shell) keeps user input such as --name from
        # being interpreted by a shell.
        command = [sys.executable, script]
        for flag, value in options.items():
            command += [f"--{flag}", str(value)]
        subprocess.run(command, check=True)

    print(f"=== 开始训练 {args.name} 的个性化AI助手 ===")

    # Step 1: data collection and preparation
    print("📥 正在收集和准备数据...")
    # Create the data directory
    os.makedirs(args.data_dir, exist_ok=True)
    # Collecting personal data requires the user to provide the files.
    print("请确保你的个人数据已经放在以下位置:")
    print(f" - {args.data_dir}/chat_history.txt # 聊天记录")
    print(f" - {args.data_dir}/writings/ # 个人写作文件夹")
    print(f" - {args.data_dir}/profile.json # 个人资料")
    input("按回车键继续...")

    # Step 2: data processing
    print("🧹 正在处理数据...")
    # The real data-processing functions should be called here; for the
    # demo, sample data is generated instead.
    create_sample_data(args.name, args.data_dir)

    # Step 3: base model training (skipped when a pretrained model is used)
    if not args.quick_mode:
        print("🏗️ 正在训练基础模型...")
        run_step("trainer/train_pretrain.py", {
            "dim": dim,
            "n_layers": n_layers,
            "n_heads": n_heads,
            "batch_size": 64 if is_small else 32,
            "max_epochs": 8,
            "lr": "5e-4",
            "max_seq_len": 512,
            "data_path": train_path,
            "save_name": f"{args.name}_base",
        })

    # Quick mode starts from the pretrained checkpoint; otherwise use the
    # base model trained in step 3, for both fine-tuning methods.
    # NOTE(review): checkpoints are assumed to be saved as
    # "<save_name>_<dim>.pth" -- confirm against the trainer.
    if args.quick_mode:
        base_model = "checkpoints/minimind_base_512.pth"
    else:
        base_model = f"checkpoints/{args.name}_base_{dim}.pth"

    # Step 4: personalised fine-tuning
    print("🎯 正在进行个性化微调...")
    if args.training_method == 'lora':
        run_step("trainer/train_lora.py", {
            "base_model": base_model,
            "data_path": train_path,
            "lora_rank": 16,
            "lora_alpha": 32,
            "lora_dropout": 0.1,
            "batch_size": 32 if is_small else 16,
            "max_epochs": 3 if args.quick_mode else 5,
            "lr": "2e-4",
            "save_name": f"{args.name}_personal",
        })
    else:
        run_step("trainer/train_full_sft.py", {
            "dim": dim,
            "n_layers": n_layers,
            "n_heads": n_heads,
            "batch_size": 16 if is_small else 8,
            "max_epochs": 2 if args.quick_mode else 3,
            "lr": "1e-4",
            "max_seq_len": 512,
            "data_path": train_path,
            "base_model": base_model,
            "save_name": f"{args.name}_personal",
        })

    # Step 5: testing
    print("🧪 正在测试模型...")
    test_questions = [
        f"你好,请问你是{args.name}吗?",
        "能介绍一下你自己吗?",
        "你平时有什么爱好?",
        "你最喜欢的食物是什么?",
        "你的性格是怎样的?"
    ]
    print("测试问题:")
    for i, question in enumerate(test_questions, 1):
        print(f"{i}. {question}")

    # "\n" (not "\\n") so a blank line is printed rather than a literal
    # backslash-n.
    print("\n你可以这样测试模型:")
    print(f"python scripts/chat_openai_api.py --weight {args.name}_personal_lora")
    print(f"\n=== {args.name} 的个性化AI助手训练完成! ===")
    print("🎉 现在你可以和你的AI助手聊天了!")
def create_sample_data(name, data_dir):
    """Write demo instruction data to ``train.jsonl`` and ``val.jsonl``.

    Args:
        name: Persona name inserted into the sample questions and answers.
        data_dir: Target directory; created if it does not exist.
    """
    # Sample conversation data
    sample_conversations = [
        {
            "instruction": f"你好,请问你是{name}吗?",
            "input": "",
            "output": f"是的,我就是{name}!很高兴认识你~我是一个比较开朗的人,平时喜欢交朋友。"
        },
        {
            "instruction": "能介绍一下你自己吗?",
            "input": "",
            "output": f"我是{name},一个对生活充满好奇心的人。我喜欢学习新知识,也乐于分享自己的想法。我的朋友们都说我很可靠,有问题都愿意找我商量。"
        },
        {
            "instruction": "你平时有什么爱好?",
            "input": "",
            "output": "我有很多爱好呢!比如编程、阅读、听音乐、运动等等。编程让我能够创造新的东西,阅读让我开阔视野,听音乐能让我放松,运动让我保持健康。"
        },
        {
            "instruction": "你的性格是怎样的?",
            "input": "",
            "output": "我觉得我是一个比较乐观开朗的人,遇事喜欢往好的方面想。同时我也很细心,做事认真负责。朋友们都说我很可靠,有问题都愿意找我商量。"
        },
        {
            "instruction": "你最喜欢的食物是什么?",
            "input": "",
            "output": "这个问题好难选择啊!不过如果要选一个的话,我会说妈妈做的红烧肉。那种家的味道,是任何餐厅都比不上的。当然,我也很喜欢各种美食,愿意去尝试不同的菜系。"
        }
    ]

    os.makedirs(data_dir, exist_ok=True)

    # Save the training data.
    with open(f"{data_dir}/train.jsonl", 'w', encoding='utf-8') as f:
        for item in sample_conversations:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')

    # Save the validation data (a subset of the training data here; real
    # validation data should be different).
    with open(f"{data_dir}/val.jsonl", 'w', encoding='utf-8') as f:
        for item in sample_conversations[:3]:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')

    print(f"✅ 已为{name}创建示例训练数据")
# Run the trainer only when this file is executed as a script.
if __name__ == "__main__":
    main()
# 快速开始(使用示例数据)
python personal_ai_trainer.py --name "小明" --quick_mode
# 完整训练(使用你的真实数据)
python personal_ai_trainer.py --name "你的名字" --model_size medium --training_method lora
# 高级选项
python personal_ai_trainer.py --name "小红" \
--model_size small \
--training_method full \
--data_dir "my_personal_data"
用户:你好,请问你是小明吗?
AI:是的,我就是小明!很高兴认识你~我是一个比较开朗的人,平时喜欢交朋友。
用户:能介绍一下你自己吗?
AI:我是小明,一个对生活充满好奇心的人。我喜欢学习新知识,也乐于分享自己的想法。我的朋友们都说我很可靠,有问题都愿意找我商量。
用户:你平时喜欢做什么?
AI:我有很多爱好呢!比如编程、阅读、听音乐、运动等等。编程让我能够创造新的东西,阅读让我开阔视野,听音乐能让我放松,运动让我保持健康。
用户:你的性格是怎样的?
AI:我觉得我是一个比较乐观开朗的人,遇事喜欢往好的方面想。同时我也很细心,做事认真负责。朋友们都说我很可靠,有问题都愿意找我商量。
1. 个性化问候
2. 专业知识回答
3. 情感共鸣
恭喜你!通过本教程,你已经完成了:
记住:最好的学习方式就是动手去做!
现在,开始你的AI训练之旅吧!期待看到属于你的独特AI模型!🤖✨
祝你训练顺利!加油! 💪
本教程会持续更新,如有问题或建议,欢迎反馈!