EVILOM
游戏故事集博客关于
GitHubTwitter
🤖
Back to Blog

🤖

2025-12-10
•
笔记LLMAI

😀 相信大家阅读完前面的训练教程,已经对大模型训练有所熟悉,并且已经得到了自己训练模型,接下来的文章将讲解如何进一步优化咱们的模型~

🚀 第十一部分:后续提升教程(从入门到精通)

11.1 模型性能优化进阶

📈 训练技巧提升

1. 学习率调度策略

# 余弦退火(Cosine Annealing)
--lr_scheduler cosine \
--warmup_steps 1000 \
--min_lr 1e-6

# 阶梯式下降
--lr_scheduler step \
--lr_decay_steps 2000 \
--lr_decay_rate 0.9

理解: 就像学习要有节奏,不能一直高强度,要有张有弛

2. 梯度裁剪(Gradient Clipping)

--grad_clip 1.0# 防止梯度爆炸

理解: 就像给学习速度设上限,防止学过头

3. 数据增强技巧

# 回译增强(中文→英文→中文)def back_translate(text):
# 中文→英文
    en_text = translate_to_english(text)
# 英文→中文
    zh_text = translate_to_chinese(en_text)
    return zh_text

# 同义词替换import synonyms
def synonym_replace(text, replace_rate=0.1):
    words = jieba.lcut(text)
    new_words = []
    for word in words:
        if random.random() < replace_rate:
            syns = synonyms.nearby(word)[0]
            if syns:
                word = random.choice(syns[:3])
        new_words.append(word)
    return ''.join(new_words)

🎯 模型架构优化

1. 注意力机制改进

# 旋转位置编码(RoPE)class RotaryPositionalEmbedding(nn.Module):
    def __init__(self, dim, max_seq_len=512):
        super().__init__()
        self.dim = dim
        self.max_seq_len = max_seq_len
# 预计算频率
        freqs = torch.exp(torch.arange(0, dim, 2).float() *
                         -(math.log(10000.0) / dim))
        self.register_buffer('freqs', freqs)

    def forward(self, x, seq_len):
# 应用旋转位置编码
        t = torch.arange(seq_len, device=x.device)
        freqs = torch.outer(t, self.freqs)
        return apply_rotary_pos_emb(x, freqs)

2. 激活函数优化

# SwiGLU激活函数(比ReLU效果更好)class SwiGLU(nn.Module):
    def forward(self, x):
        x1, x2 = x.chunk(2, dim=-1)
        return F.silu(x1) * x2

3. 归一化层改进

# RMSNorm(比LayerNorm更快)class RMSNorm(nn.Module):
    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.eps = eps

    def forward(self, x):
        variance = x.pow(2).mean(-1, keepdim=True)
        x = x * torch.rsqrt(variance + self.eps)
        return self.weight * x

11.2 大规模训练策略

🏭 分布式训练

1. 数据并行

# 单机多卡
python -m torch.distributed.launch \
    --nproc_per_node=4 \
    trainer/train_pretrain.py \
    --distributed \
    --local_rank $LOCAL_RANK

# 多机多卡
torchrun \
    --nnodes=2 \
    --nproc_per_node=4 \
    --node_rank=0 \
    --master_addr="192.168.1.100" \
    --master_port=12345 \
    trainer/train_pretrain.py

2. 模型并行

# 将模型分割到不同GPUclass ModelParallel(nn.Module):
    def __init__(self):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim).to('cuda:0')
        self.transformer = Transformer(...).to('cuda:1')
        self.lm_head = nn.Linear(embed_dim, vocab_size).to('cuda:2')

    def forward(self, x):
        x = self.embed(x.to('cuda:0'))
        x = self.transformer(x.to('cuda:1'))
        return self.lm_head(x.to('cuda:2'))

3. 混合精度训练

from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
for batch in dataloader:
    optimizer.zero_grad()

    with autocast():
        outputs = model(batch)
        loss = criterion(outputs, targets)

    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()

📊 高效数据处理

1. 流式数据加载

class StreamingDataset(torch.utils.data.IterableDataset):
    def __init__(self, data_files, tokenizer):
        self.data_files = data_files
        self.tokenizer = tokenizer

    def __iter__(self):
        for file in self.data_files:
            with open(file, 'r', encoding='utf-8') as f:
                for line in f:
                    text = json.loads(line)['text']
                    tokens = self.tokenizer.encode(text)
                    yield torch.tensor(tokens)

2. 动态批处理

def dynamic_batching(samples, max_tokens=4096):
    """根据序列长度动态调整批次大小"""
    batch = []
    current_tokens = 0

    for sample in sorted(samples, key=lambda x: len(x), reverse=True):
        sample_tokens = len(sample)
        if current_tokens + sample_tokens > max_tokens:
            yield batch
            batch = []
            current_tokens = 0

        batch.append(sample)
        current_tokens += sample_tokens

    if batch:
        yield batch

11.3 模型评估与测试

📋 全面评估指标

1. 困惑度(Perplexity)

def calculate_perplexity(model, test_data):
    """计算模型在测试集上的困惑度"""
    model.eval()
    total_loss = 0
    total_tokens = 0

    with torch.no_grad():
        for batch in test_data:
            outputs = model(batch)
            loss = criterion(outputs.view(-1, vocab_size), batch.view(-1))
            total_loss += loss.item() * batch.numel()
            total_tokens += batch.numel()

    avg_loss = total_loss / total_tokens
    perplexity = torch.exp(torch.tensor(avg_loss))
    return perplexity.item()

2. BLEU分数(翻译质量)

from nltk.translate.bleu_score import sentence_bleu

def calculate_bleu(reference, hypothesis):
    """计算BLEU分数"""
    reference = [reference.split()]
    hypothesis = hypothesis.split()
    return sentence_bleu(reference, hypothesis)

3. 人工评估

def human_evaluation(model, test_questions):
    """人工评估模型回答质量"""
    results = []
    for question in test_questions:
        answer = model.generate(question)

# 流畅度评分 (1-5)
        fluency = input(f"问题: {question}\n回答: {answer}\n流畅度评分(1-5): ")

# 相关性评分 (1-5)
        relevance = input(f"相关性评分(1-5): ")

# 有用性评分 (1-5)
        usefulness = input(f"有用性评分(1-5): ")

        results.append({
            'question': question,
            'answer': answer,
            'fluency': fluency,
            'relevance': relevance,
            'usefulness': usefulness
        })

    return results

🎯 特定任务评估

1. 阅读理解

def reading_comprehension_evaluation(model, passages, questions):
    """评估阅读理解能力"""
    scores = []

    for passage, question_data in zip(passages, questions):
        passage_text = passage['text']
        question = question_data['question']
        correct_answer = question_data['answer']

# 模型生成答案
        model_answer = model.answer_question(passage_text, question)

# 计算准确率
        is_correct = (model_answer.strip() == correct_answer.strip())
        scores.append(is_correct)

    return sum(scores) / len(scores)

2. 常识推理

def commonsense_reasoning_evaluation(model, questions):
    """评估常识推理能力"""
    correct = 0
    total = len(questions)

    for question in questions:
        prompt = f"问题: {question['question']}\n选项:\n"
        for i, option in enumerate(question['options']):
            prompt += f"{i+1}. {option}\n"
        prompt += "答案: "

        model_answer = model.generate(prompt)

# 提取答案if str(question['correct']) in model_answer:
            correct += 1

    return correct / total

11.4 模型部署与优化

🚀 推理加速

1. 模型量化

# 8位量化def quantize_model_8bit(model):
    """将模型量化为8位"""
    for name, param in model.named_parameters():
        param.data = (param.data * 127).round().clamp(-128, 127) / 127
    return model

# 4位量化def quantize_model_4bit(model):
    """将模型量化为4位"""
    for name, param in model.named_parameters():
        param.data = (param.data * 7).round().clamp(-8, 7) / 7
    return model

2. 知识蒸馏

class KnowledgeDistillation(nn.Module):
    def __init__(self, teacher_model, student_model, temperature=3.0):
        super().__init__()
        self.teacher = teacher_model
        self.student = student_model
        self.temperature = temperature

    def forward(self, x):
# 教师模型输出(不计算梯度)with torch.no_grad():
            teacher_logits = self.teacher(x) / self.temperature

# 学生模型输出
        student_logits = self.student(x) / self.temperature

# 蒸馏损失
        distill_loss = F.kl_div(
            F.log_softmax(student_logits, dim=-1),
            F.softmax(teacher_logits, dim=-1),
            reduction='batchmean'
        ) * (self.temperature ** 2)

        return distill_loss

3. 缓存优化

class KVCache:
    """键值缓存,加速自回归生成"""
    def __init__(self):
        self.key_cache = []
        self.value_cache = []

    def update(self, key, value, layer_idx):
        """更新缓存"""
        if layer_idx >= len(self.key_cache):
            self.key_cache.append(key)
            self.value_cache.append(value)
        else:
            self.key_cache[layer_idx] = torch.cat([self.key_cache[layer_idx], key], dim=-2)
            self.value_cache[layer_idx] = torch.cat([self.value_cache[layer_idx], value], dim=-2)

        return self.key_cache[layer_idx], self.value_cache[layer_idx]

    def clear(self):
        """清空缓存"""
        self.key_cache = []
        self.value_cache = []

🌐 Web部署

1. FastAPI服务

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    max_length: int = 512

class ChatResponse(BaseModel):
    response: str
    tokens_used: int

@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    try:
# 生成回复
        response = model.generate(
            request.message,
            max_length=request.max_length
        )

# 计算使用的token数
        tokens = tokenizer.encode(response)

        return ChatResponse(
            response=response,
            tokens_used=len(tokens)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

2. 流式输出

@app.post("/chat/stream")
async def chat_stream_endpoint(request: ChatRequest):
    async def generate_stream():
        for chunk in model.generate_stream(request.message):
            yield f"data: {json.dumps({'chunk': chunk})}\n\n"
        yield f"data: {json.dumps({'done': True})}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream"
    )

🎯 第十二部分:个性化训练案例——训练"你自己"的AI

12.1 案例概述

🎯 目标

训练一个了解你个人经历、兴趣爱好、说话方式的专属AI助手

📋 训练步骤

  1. 收集个人数据(你的聊天记录、作文、日记等)
  2. 数据预处理(清洗、格式化、增强)
  3. 基础模型训练(使用通用数据)
  4. 个性化微调(用你的数据微调)
  5. 测试和优化(不断迭代改进)

12.2 数据收集与准备

📥 数据收集

1. 聊天记录收集

import json
import re
from datetime import datetime

def extract_wechat_chat(file_path):
    """提取微信聊天记录"""
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

# 解析微信格式
    pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\n(.*?): (.*?)(?=\n\d{4}-|$)'
    matches = re.findall(pattern, content, re.DOTALL)

    conversations = []
    current_conv = []

    for time_str, speaker, message in matches:
# 只保留你发送的消息if "你的昵称" in speaker:
            current_conv.append({
                'time': time_str,
                'speaker': 'me',
                'message': message.strip()
            })

# 如果对话太长,重新开始if len(current_conv) > 10:
            conversations.append(current_conv)
            current_conv = []

    return conversations

def extract_qq_chat(file_path):
    """提取QQ聊天记录"""
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    conversations = []
    current_speaker = None
    current_message = []

    for line in lines:
        line = line.strip()
        if not line:
            continue

# QQ格式:时间 昵称 消息match = re.match(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+(.*?)\s+(.*)', line)
        if match:
            time_str, speaker, message = match.groups()

# 只保留你的消息if "你的QQ昵称" in speaker:
                conversations.append({
                    'time': time_str,
                    'speaker': 'me',
                    'message': message.strip()
                })

    return conversations

2. 个人写作收集

def collect_personal_writing(folder_path):
    """收集个人写作(作文、日记、博客等)"""
    writing_data = []

# 支持的文件类型
    file_extensions = ['.txt', '.md', '.docx']

    for file_path in Path(folder_path).rglob('*'):
        if file_path.suffix in file_extensions:
            try:
                if file_path.suffix == '.txt':
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                elif file_path.suffix == '.md':
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                elif file_path.suffix == '.docx':
                    import docx
                    doc = docx.Document(file_path)
                    content = '\n'.join([para.text for para in doc.paragraphs])

                writing_data.append({
                    'filename': str(file_path),
                    'content': content,
                    'type': file_path.parent.name# 文件夹名作为类型
                })
            except Exception as e:
                print(f"处理文件 {file_path} 时出错: {e}")

    return writing_data

def extract_social_media_posts():
    """提取社交媒体帖子(微博、朋友圈等)"""
    posts = []

# 微博数据
    weibo_file = "weibo_posts.json"
    if os.path.exists(weibo_file):
        with open(weibo_file, 'r', encoding='utf-8') as f:
            weibo_data = json.load(f)
            for post in weibo_data:
                if 'content' in post and post.get('is_my_post'):
                    posts.append({
                        'platform': 'weibo',
                        'content': post['content'],
                        'time': post.get('created_at', ''),
                        'likes': post.get('like_count', 0)
                    })

# 朋友圈数据
    moments_file = "wechat_moments.json"
    if os.path.exists(moments_file):
        with open(moments_file, 'r', encoding='utf-8') as f:
            moments_data = json.load(f)
            for moment in moments_data:
                if moment.get('is_my_post'):
                    posts.append({
                        'platform': 'wechat_moments',
                        'content': moment['content'],
                        'time': moment.get('created_at', ''),
                        'images': moment.get('image_count', 0)
                    })

    return posts

3. 个人资料整理

def create_persona_profile():
    """创建个人资料档案"""
    profile = {
        'basic_info': {
            'name': '你的名字',
            'age': '你的年龄',
            'location': '你的城市',
            'occupation': '你的职业/专业',
            'education': '你的教育背景'
        },
        'personality_traits': [
            '性格特点1(如:乐观开朗)',
            '性格特点2(如:细心认真)',
            '性格特点3(如:善于思考)'
        ],
        'interests_hobbies': [
            '兴趣爱好1(如:编程)',
            '兴趣爱好2(如:音乐)',
            '兴趣爱好3(如:运动)'
        ],
        'communication_style': {
            'tone': '说话语气(如:温和、幽默、直接)',
            'common_phrases': [
                '常用表达1',
                '常用表达2',
                '常用表达3'
            ],
            'emoji_usage': '表情使用习惯',
            'response_length': '回复长度偏好(简短/详细)'
        },
        'knowledge_domains': [
            '专业领域1',
            '专业领域2',
            '熟悉的主题1',
            '熟悉的主题2'
        ],
        'typical_responses': {
            'greeting': '你通常怎么打招呼',
            'thanks': '如何表达感谢',
            'apology': '如何道歉',
            'goodbye': '如何告别'
        }
    }

    return profile

🧹 数据清洗和预处理

1. 数据清洗

def clean_personal_data(data):
    """清洗个人数据"""
    cleaned_data = []

    for item in data:
        text = item.get('message', '') or item.get('content', '')

# 移除URL
        text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)

# 移除特殊符号和表情(保留常用标点)
        text = re.sub(r'[^\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\w\s。,!?:;""''()]', '', text)

# 移除过短的内容if len(text.strip()) < 5:
            continue

# 移除重复内容if text.strip() in [d['text'] for d in cleaned_data]:
            continue

        cleaned_data.append({
            'text': text.strip(),
            'source': item.get('platform', 'unknown'),
            'timestamp': item.get('time', ''),
            'type': item.get('type', 'chat')
        })

    return cleaned_data

def remove_sensitive_info(data):
    """移除敏感信息"""
    sensitive_patterns = [
        (r'\d{11}', '[手机号]'),# 手机号
        (r'\d{17}[\dXx]', '[身份证号]'),# 身份证号
        (r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[邮箱]'),# 邮箱
        (r'\d{4}-\d{4}-\d{4}-\d{4}', '[银行卡号]'),# 银行卡号
    ]

    cleaned_data = []
    for item in data:
        text = item['text']
        for pattern, replacement in sensitive_patterns:
            text = re.sub(pattern, replacement, text)

        item['text'] = text
        cleaned_data.append(item)

    return cleaned_data

2. 数据增强

def augment_personal_data(data):
    """增强个人数据"""
    augmented_data = []

    for item in data:
        text = item['text']

# 原始数据
        augmented_data.append(item)

# 同义词替换if len(text) > 10:
            augmented_text = synonym_replace(text, replace_rate=0.1)
            if augmented_text != text:
                new_item = item.copy()
                new_item['text'] = augmented_text
                new_item['augmented'] = True
                augmented_data.append(new_item)

# 句式变换if '。' in text and len(text.split('。')) > 1:
            sentences = text.split('。')
# 随机打乱句子顺序if len(sentences) > 2:
                random.shuffle(sentences)
                shuffled_text = '。'.join(sentences)
                new_item = item.copy()
                new_item['text'] = shuffled_text
                new_item['augmented'] = True
                augmented_data.append(new_item)

    return augmented_data

12.3 训练数据格式

📝 对话格式

1. 指令格式(推荐)

{
    "instruction": "描述你的兴趣爱好",
    "input": "",
    "output": "我喜欢编程、打篮球、听周杰伦的歌。编程让我能够创造新的东西,打篮球让我保持健康,而周杰伦的音乐总能让我放松。"
}

2. 对话格式

{
    "conversations": [
        {
            "from": "human",
            "value": "你平时喜欢做什么?"
        },
        {
            "from": "gpt",
            "value": "我平时喜欢编程、打篮球、还有听音乐。编程让我很有成就感,打篮球能让我放松,听音乐则是我享受生活的方式。"
        },
        {
            "from": "human",
            "value": "你最喜欢的编程语言是什么?"
        },
        {
            "from": "gpt",
            "value": "我最喜欢Python,因为它的语法简洁优雅,而且有很多强大的库。用Python写代码就像写诗一样流畅。"
        }
    ]
}

3. 问答格式

{
    "question": "你的性格是怎样的?",
    "answer": "我是一个比较乐观开朗的人,遇事喜欢往好的方面想。同时我也很细心,做事认真负责。朋友们都说我很可靠,有问题都愿意找我商量。"
}

📊 数据分割

def split_personal_data(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
    """分割个人数据"""
# 按时间排序
    data.sort(key=lambda x: x.get('timestamp', ''))

    n = len(data)
    train_size = int(n * train_ratio)
    val_size = int(n * val_ratio)

    train_data = data[:train_size]
    val_data = data[train_size:train_size + val_size]
    test_data = data[train_size + val_size:]

    return train_data, val_data, test_data

# 保存为JSONL格式def save_as_jsonl(data, filename):
    """保存为JSONL格式"""
    with open(filename, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')

12.4 基础模型训练

🎯 预训练阶段

1. 使用通用中文数据

# 第1步:预训练(使用通用中文数据)
python trainer/train_pretrain.py \
    --dim 512 \
    --n_layers 8 \
    --n_heads 8 \
    --batch_size 64 \
    --max_epochs 10 \
    --lr 5e-4 \
    --max_seq_len 512 \
    --data_path "dataset/chinese_corpus.jsonl" \
    --save_name "personal_base"

2. 监控训练过程

# 训练监控脚本def monitor_training(log_file):
    """监控训练过程"""
    import matplotlib.pyplot as plt

    epochs = []
    losses = []
    perplexities = []

    with open(log_file, 'r') as f:
        for line in f:
            if 'Epoch' in line and 'Loss' in line:
                parts = line.split()
                epoch = int(parts[1].split('/')[0])
                loss = float(parts[3])

                epochs.append(epoch)
                losses.append(loss)

# 计算困惑度
                perplexity = np.exp(loss)
                perplexities.append(perplexity)

# 绘制图表
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

    ax1.plot(epochs, losses, 'b-', label='Training Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.set_title('Training Loss over Time')
    ax1.grid(True)

    ax2.plot(epochs, perplexities, 'r-', label='Perplexity')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Perplexity')
    ax2.set_title('Perplexity over Time')
    ax2.grid(True)

    plt.tight_layout()
    plt.savefig('training_monitor.png')
    plt.show()

12.5 个性化微调

🔧 LoRA微调(推荐)

1. 准备配置文件

{
    "lora_config": {
        "rank": 16,
        "alpha": 32,
        "dropout": 0.1,
        "target_modules": ["q_proj", "k_proj", "v_proj", "o_proj"]
    },
    "training_config": {
        "batch_size": 32,
        "max_epochs": 5,
        "learning_rate": 2e-4,
        "warmup_steps": 100,
        "save_steps": 500
    }
}

2. 执行LoRA微调

# 第2步:个性化LoRA微调
python trainer/train_lora.py \
    --base_model "checkpoints/personal_base_512.pth" \
    --data_path "personal_data/train.jsonl" \
    --lora_rank 16 \
    --lora_alpha 32 \
    --lora_dropout 0.1 \
    --batch_size 32 \
    --max_epochs 5 \
    --lr 2e-4 \
    --save_name "personal_lora"

3. 合并LoRA权重

def merge_lora_weights(base_model, lora_weights):
    """合并LoRA权重到基础模型"""
    merged_model = base_model.copy()

    for name, param in merged_model.named_parameters():
        if name in lora_weights:
            lora_A = lora_weights[f"{name}.lora_A"]
            lora_B = lora_weights[f"{name}.lora_B"]

# LoRA公式: W' = W + α * B * A
            delta = lora_alpha * (lora_B @ lora_A)
            param.data += delta

    return merged_model

🎯 全参数微调(高级)

1. 数据混合策略

def mix_training_data(general_data, personal_data, mix_ratio=0.3):
    """混合通用数据和个人数据"""
    mixed_data = []

# 通用数据
    general_samples = int(len(personal_data) * (1 - mix_ratio) / mix_ratio)
    mixed_data.extend(random.sample(general_data, min(general_samples, len(general_data))))

# 个人数据
    mixed_data.extend(personal_data)

# 打乱顺序
    random.shuffle(mixed_data)

    return mixed_data

2. 执行全参数微调

# 第2步:全参数微调(需要更多显存)
python trainer/train_full_sft.py \
    --dim 512 \
    --n_layers 8 \
    --n_heads 8 \
    --batch_size 16 \
    --max_epochs 3 \
    --lr 1e-4 \
    --max_seq_len 512 \
    --data_path "personal_data/mixed_train.jsonl" \
    --base_model "checkpoints/personal_base_512.pth" \
    --save_name "personal_full"

12.6 测试和优化

🧪 个性化测试

1. 基础对话测试

def test_basic_conversation(model, tokenizer):
    """测试基础对话能力"""
    test_questions = [
        "你好,请介绍一下你自己",
        "你叫什么名字?",
        "你今年多大了?",
        "你是哪里人?",
        "你平时喜欢做什么?",
        "你的性格是怎样的?",
        "你有什么兴趣爱好?",
        "你最喜欢的食物是什么?"
    ]

    print("=== 基础对话测试 ===")
    for question in test_questions:
        response = generate_response(model, tokenizer, question)
        print(f"问:{question}")
        print(f"答:{response}")
        print("-" * 50)

2. 专业知识测试

def test_professional_knowledge(model, tokenizer):
    """测试专业知识"""
    professional_questions = [
# 根据你的专业领域定制"请解释一下你最擅长的技能",
        "你在学习过程中遇到过什么挑战?",
        "你对未来的职业规划是什么?",
        "你如何看待你专业领域的发展趋势?",
        "请分享一个你解决问题的经历"
    ]

    print("=== 专业知识测试 ===")
    for question in professional_questions:
        response = generate_response(model, tokenizer, question)
        print(f"问:{question}")
        print(f"答:{response}")
        print("-" * 50)

3. 个性特征测试

def test_personality_traits(model, tokenizer):
    """测试个性特征"""
    personality_questions = [
        "如果朋友遇到困难,你会怎么安慰他?",
        "当你压力很大的时候,你会怎么放松?",
        "你如何看待失败?",
        "你通常怎么表达对家人的爱?",
        "你生气的时候会怎么做?"
    ]

    print("=== 个性特征测试 ===")
    for question in personality_questions:
        response = generate_response(model, tokenizer, question)
        print(f"问:{question}")
        print(f"答:{response}")
        print("-" * 50)

📊 质量评估

1. 相似度评估

def evaluate_similarity(model_responses, real_responses):
    """评估模型回答与真实回答的相似度"""
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity

    vectorizer = TfidfVectorizer()
    similarities = []

    for model_resp, real_resp in zip(model_responses, real_responses):
# 向量化
        vectors = vectorizer.fit_transform([model_resp, real_resp])

# 计算余弦相似度
        similarity = cosine_similarity(vectors[0:1], vectors[1:2])[0][0]
        similarities.append(similarity)

    return {
        'average_similarity': np.mean(similarities),
        'max_similarity': np.max(similarities),
        'min_similarity': np.min(similarities)
    }

2. 风格一致性评估

def evaluate_style_consistency(model_responses, training_data):
    """评估风格一致性"""
# 提取语言特征def extract_features(text):
        features = {}

# 平均句长
        sentences = text.split('。')
        features['avg_sentence_length'] = np.mean([len(s) for s in sentences if s])

# 常用词比例
        words = jieba.lcut(text)
        common_words = ['的', '了', '是', '我', '你', '他', '她', '它']
        features['common_word_ratio'] = sum(1 for w in words if w in common_words) / len(words)

# 感叹号使用频率
        features['exclamation_ratio'] = text.count('!') / len(text)

# 表情符号使用(如果有)
        emoji_pattern = re.compile(r'[\U00010000-\U0010ffff]')
        features['emoji_ratio'] = len(emoji_pattern.findall(text)) / len(text)

        return features

# 提取训练数据特征
    training_features = [extract_features(d['text']) for d in training_data[:100]]
    avg_training_features = {k: np.mean([f[k] for f in training_features])
                           for k in training_features[0].keys()}

# 提取模型回答特征
    model_features = [extract_features(resp) for resp in model_responses]
    avg_model_features = {k: np.mean([f[k] for f in model_features])
                         for k in model_features[0].keys()}

# 计算特征差异
    differences = {}
    for feature in avg_training_features:
        differences[feature] = abs(avg_training_features[feature] -
                                 avg_model_features[feature])

    return differences

12.7 持续优化

🔄 迭代改进

1. 错误分析

def analyze_bad_responses(bad_examples):
    """分析不好的回答,找出改进方向"""
    analysis = {
        'too_generic': 0,# 太通用'off_topic': 0,# 跑题'inconsistent': 0,# 不一致'too_formal': 0,# 太正式'too_casual': 0,# 太随意'missing_info': 0# 信息缺失
    }

    for example in bad_examples:
        question = example['question']
        bad_response = example['bad_response']
        expected = example.get('expected', '')

# 判断是否太通用if any(word in bad_response for word in ['一般来说', '通常情况下', '每个人']):
            analysis['too_generic'] += 1

# 判断是否跑题if not any(keyword in bad_response for keyword in question.split()):
            analysis['off_topic'] += 1

# 判断是否不一致if expected and len(set(bad_response.split()) & set(expected.split())) < 3:
            analysis['inconsistent'] += 1

    return analysis

2. 针对性数据增强

def targeted_augmentation(analysis_results, training_data):
    """根据分析结果进行针对性数据增强"""

    if analysis_results['too_generic'] > 5:
# 增加具体化的训练数据
        specific_examples = [
            {
                "instruction": "具体描述你的一个经历",
                "input": "",
                "output": "上周六下午,我在图书馆三楼靠窗的位置看《深度学习》,阳光透过窗户洒在书页上,那一刻我觉得特别充实。"
            },
            {
                "instruction": "用具体的例子说明",
                "input": "",
                "output": "比如上周我遇到一个问题,我的代码运行特别慢,后来我发现是算法复杂度太高,通过优化数据结构,速度提升了10倍。"
            }
        ]
        training_data.extend(specific_examples)

    if analysis_results['inconsistent'] > 5:
# 增加一致性训练数据
        consistency_examples = [
            {
                "instruction": "保持回答风格一致",
                "input": "你好",
                "output": "嗨!很高兴见到你~今天过得怎么样呀?"
            },
            {
                "instruction": "用你一贯的说话方式",
                "input": "谢谢你的帮助",
                "output": "哎呀你太客气啦!这点小事儿,举手之劳而已~"
            }
        ]
        training_data.extend(consistency_examples)

    return training_data

3. 自动化优化流程

def auto_optimization_pipeline(model, tokenizer, test_questions, real_answers):
    """自动化优化流程"""

    print("=== 开始自动化优化 ===")

# 第1步:生成测试回答
    model_answers = []
    for question in test_questions:
        answer = generate_response(model, tokenizer, question)
        model_answers.append(answer)

# 第2步:评估质量
    similarity_scores = evaluate_similarity(model_answers, real_answers)
    print(f"相似度评估: {similarity_scores}")

# 第3步:找出需要改进的例子
    bad_examples = []
    for i, (question, model_answer, real_answer) in enumerate(
            zip(test_questions, model_answers, real_answers)):

# 简单相似度判断(可以改进)
        similarity = len(set(model_answer.split()) & set(real_answer.split())) / \
                    len(set(real_answer.split()))

        if similarity < 0.3:# 阈值可以调整
            bad_examples.append({
                'question': question,
                'bad_response': model_answer,
                'expected': real_answer
            })

# 第4步:分析错误if bad_examples:
        analysis = analyze_bad_responses(bad_examples)
        print(f"错误分析: {analysis}")

# 第5步:生成改进数据
        improved_training_data = targeted_augmentation(analysis, [])

# 第6步:重新训练(这里可以增量训练)if improved_training_data:
            print(f"生成了 {len(improved_training_data)} 条改进数据")
# 保存改进数据
            save_as_jsonl(improved_training_data, "improved_training_data.jsonl")

            print("请使用改进数据重新训练模型:")
            print("python trainer/train_lora.py --data_path improved_training_data.jsonl")

    return model_answers

12.8 完整训练脚本

📜 一键训练脚本

#!/usr/bin/env python3"""
个性化AI训练脚本 - 训练"你自己"的AI助手
"""

import os
import json
import argparse
from pathlib import Path

def main():
    parser = argparse.ArgumentParser(description='训练个性化AI助手')
    parser.add_argument('--name', type=str, required=True, help='你的名字')
    parser.add_argument('--data_dir', type=str, default='personal_data', help='个人数据目录')
    parser.add_argument('--model_size', type=str, default='small', choices=['small', 'medium'], help='模型大小')
    parser.add_argument('--training_method', type=str, default='lora', choices=['lora', 'full'], help='训练方法')
    parser.add_argument('--quick_mode', action='store_true', help='快速模式(使用更少数据)')

    args = parser.parse_args()

    print(f"=== 开始训练 {args.name} 的个性化AI助手 ===")

# 第1步:数据收集和准备print("📥 正在收集和准备数据...")

# 创建数据目录
    os.makedirs(args.data_dir, exist_ok=True)

# 收集个人数据(这里需要你提供数据文件)print("请确保你的个人数据已经放在以下位置:")
    print(f"  - {args.data_dir}/chat_history.txt  # 聊天记录")
    print(f"  - {args.data_dir}/writings/  # 个人写作文件夹")
    print(f"  - {args.data_dir}/profile.json  # 个人资料")

    input("按回车键继续...")

# 第2步:数据处理print("🧹 正在处理数据...")

# 这里应该调用数据处理函数# processed_data = process_personal_data(args.data_dir)# train_data, val_data, test_data = split_personal_data(processed_data)

# 为了演示,我们创建示例数据
    create_sample_data(args.name, args.data_dir)

# 第3步:基础模型训练(如果使用预训练模型可以跳过)if not args.quick_mode:
        print("🏗️ 正在训练基础模型...")
        os.system(f"""
            python trainer/train_pretrain.py \\
                --dim {512 if args.model_size == 'small' else 768} \\
                --n_layers {8 if args.model_size == 'small' else 12} \\
                --n_heads {8 if args.model_size == 'small' else 12} \\
                --batch_size {64 if args.model_size == 'small' else 32} \\
                --max_epochs 8 \\
                --lr 5e-4 \\
                --max_seq_len 512 \\
                --data_path "{args.data_dir}/train.jsonl" \\
                --save_name "{args.name}_base"
        """)

# 第4步:个性化微调print("🎯 正在进行个性化微调...")

    if args.training_method == 'lora':
        os.system(f"""
            python trainer/train_lora.py \\
                --base_model "checkpoints/{'minimind_base_512.pth' if args.quick_mode else args.name + '_base_512.pth'}" \\
                --data_path "{args.data_dir}/train.jsonl" \\
                --lora_rank 16 \\
                --lora_alpha 32 \\
                --lora_dropout 0.1 \\
                --batch_size {32 if args.model_size == 'small' else 16} \\
                --max_epochs {3 if args.quick_mode else 5} \\
                --lr 2e-4 \\
                --save_name "{args.name}_personal"
        """)
    else:
        os.system(f"""
            python trainer/train_full_sft.py \\
                --dim {512 if args.model_size == 'small' else 768} \\
                --n_layers {8 if args.model_size == 'small' else 12} \\
                --n_heads {8 if args.model_size == 'small' else 12} \\
                --batch_size {16 if args.model_size == 'small' else 8} \\
                --max_epochs {2 if args.quick_mode else 3} \\
                --lr 1e-4 \\
                --max_seq_len 512 \\
                --data_path "{args.data_dir}/train.jsonl" \\
                --base_model "checkpoints/minimind_base_512.pth" \\
                --save_name "{args.name}_personal"
        """)

# 第5步:测试print("🧪 正在测试模型...")

    test_questions = [
        f"你好,请问你是{args.name}吗?",
        "能介绍一下你自己吗?",
        "你平时有什么爱好?",
        "你最喜欢的食物是什么?",
        "你的性格是怎样的?"
    ]

    print("测试问题:")
    for i, question in enumerate(test_questions, 1):
        print(f"{i}. {question}")

    print("\\n你可以这样测试模型:")
    print(f"python scripts/chat_openai_api.py --weight {args.name}_personal_lora")

    print(f"\\n=== {args.name} 的个性化AI助手训练完成! ===")
    print("🎉 现在你可以和你的AI助手聊天了!")

def create_sample_data(name, data_dir):
    """创建示例训练数据"""

# 示例对话数据
    sample_conversations = [
        {
            "instruction": f"你好,请问你是{name}吗?",
            "input": "",
            "output": f"是的,我就是{name}!很高兴认识你~我是一个比较开朗的人,平时喜欢交朋友。"
        },
        {
            "instruction": "能介绍一下你自己吗?",
            "input": "",
            "output": f"我是{name},一个对生活充满好奇心的人。我喜欢学习新知识,也乐于分享自己的想法。我的朋友们都说我很可靠,有问题都愿意找我商量。"
        },
        {
            "instruction": "你平时有什么爱好?",
            "input": "",
            "output": "我有很多爱好呢!比如编程、阅读、听音乐、运动等等。编程让我能够创造新的东西,阅读让我开阔视野,听音乐能让我放松,运动让我保持健康。"
        },
        {
            "instruction": "你的性格是怎样的?",
            "input": "",
            "output": "我觉得我是一个比较乐观开朗的人,遇事喜欢往好的方面想。同时我也很细心,做事认真负责。朋友们都说我很可靠,有问题都愿意找我商量。"
        },
        {
            "instruction": "你最喜欢的食物是什么?",
            "input": "",
            "output": "这个问题好难选择啊!不过如果要选一个的话,我会说妈妈做的红烧肉。那种家的味道,是任何餐厅都比不上的。当然,我也很喜欢各种美食,愿意去尝试不同的菜系。"
        }
    ]

# 保存训练数据with open(f"{data_dir}/train.jsonl", 'w', encoding='utf-8') as f:
        for item in sample_conversations:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')

# 保存验证数据(重复一些,实际应该有不同数据)with open(f"{data_dir}/val.jsonl", 'w', encoding='utf-8') as f:
        for item in sample_conversations[:3]:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')

    print(f"✅ 已为{name}创建示例训练数据")

if __name__ == "__main__":
    main()

🚀 使用方法

# 快速开始(使用示例数据)
python personal_ai_trainer.py --name "小明" --quick_mode

# 完整训练(使用你的真实数据)
python personal_ai_trainer.py --name "你的名字" --model_size medium --training_method lora

# 高级选项
python personal_ai_trainer.py --name "小红" \
    --model_size small \
    --training_method full \
    --data_dir "my_personal_data"

12.9 效果展示

💬 对话示例

用户:你好,请问你是小明吗?
AI:是的,我就是小明!很高兴认识你~我是一个比较开朗的人,平时喜欢交朋友。

用户:能介绍一下你自己吗?
AI:我是小明,一个对生活充满好奇心的人。我喜欢学习新知识,也乐于分享自己的想法。我的朋友们都说我很可靠,有问题都愿意找我商量。

用户:你平时喜欢做什么?
AI:我有很多爱好呢!比如编程、阅读、听音乐、运动等等。编程让我能够创造新的东西,阅读让我开阔视野,听音乐能让我放松,运动让我保持健康。

用户:你的性格是怎样的?
AI:我觉得我是一个比较乐观开朗的人,遇事喜欢往好的方面想。同时我也很细心,做事认真负责。朋友们都说我很可靠,有问题都愿意找我商量。

🎯 特色功能

1. 个性化问候

  • 根据你的习惯调整问候语
  • 记住你的偏好和习惯
  • 使用你常用的表达方式

2. 专业知识回答

  • 在你擅长的领域给出专业回答
  • 使用你习惯的表达方式
  • 体现你的思考方式

3. 情感共鸣

  • 理解你的情感状态
  • 给出符合你性格的回应
  • 提供情感支持

🎉 总结:你的AI训练之旅

恭喜你!通过本教程,你已经完成了:

✅ 基础掌握

  • 理解AI训练的基本概念和原理
  • 学会使用MiniMind进行模型训练
  • 掌握训练参数的调整方法
  • 能够监控和优化训练过程

🚀 实战技能

  • 成功训练了基础对话模型
  • 学会使用LoRA进行高效微调
  • 掌握模型评估和测试方法
  • 能够解决常见训练问题

🎯 个性化进阶

  • 了解如何训练"你自己"的AI助手
  • 学会收集和处理个人数据
  • 掌握个性化训练技巧
  • 能够持续优化模型效果

🔮 未来方向

  • 探索更先进的训练技术
  • 开发实用的AI应用
  • 参与AI社区贡献
  • 创造独特的AI产品

💡 学习建议

  1. 动手实践:理论学习很重要,但动手实践更能加深理解
  2. 记录过程:写训练日志,记录遇到的问题和解决方案
  3. 持续学习:AI技术发展迅速,要保持学习的习惯
  4. 分享交流:和他人分享你的经验,也能从他人那里学习
  5. 创新思考:不要局限于教程,要思考新的可能性

📚 后续学习资源

  • 论文阅读:关注arXiv上的最新论文
  • 开源项目:参与GitHub上的AI项目
  • 在线课程:Coursera、edX上的深度学习课程
  • 技术博客:关注AI领域的技术博客
  • 社区论坛:加入AI相关的技术社区

记住:最好的学习方式就是动手去做!

现在,开始你的AI训练之旅吧!期待看到属于你的独特AI模型!🤖✨


祝你训练顺利!加油! 💪

本教程会持续更新,如有问题或建议,欢迎反馈!