开源
Colossal-AI:可编写分布式深度学习的模型
来源:元经纪     阅读:1504
网站管理员
发布于 2023-02-01 07:35
查看主页

概述

如何更好、更快和更便宜地实现训练、微调AIGC模型,已成为AIGC商业化和应用爆发的最大痛点。Colossal-AI基于在大模型民主化的专业技术积累,开源完整Stable Diffusion预训练和个性化微调方案,预训练时间加速和经济成本降低6.5倍,个性化微调硬件成本降低7倍!在个人电脑的RTX 2070/3050上即可快速完成微调任务流程,让Stable Diffusion等AIGC模型触手可及。

关于 Colossal-AI

随着深度学习模型规模的发展,转向新的训练范式非常重要。没有并行性和优化的传统训练方法已成为过去,新的训练方法是使大规模模型训练高效且具有成本效益的关键。 Colossal-AI 旨在成为一个统一的系统,为用户提供一套集成的培训技能和实用程序。您可以找到常见的训练实用程序,例如混合精度训练和梯度累积。此外,我们提供了一系列并行性,包括数据、张量和管道并行性。我们使用不同的多维分布式矩阵矩阵乘法算法优化张量并行性。我们还提供了不同的管道并行方法,以允许用户有效地跨节点扩展他们的模型。也可以在本教程文档中详细找到卸载等更多高级功能。

一般用法

我们的目标是使 Colossal-AI 易于使用且对用户代码无干扰。如果您想使用 Colossal-AI,有一个简单的通用工作流程。

[hidecontent type="logged" desc="隐藏内容:登录后可查看"]

工作流程
  1. 准备一个配置文件,其中指定您要使用的功能和您的参数。
  2. 初始化分布式后端colossalai.launch
  3. 使用 将训练特征注入你的训练组件(例如模型、优化器)colossalai.initialize
  4. 运行训练和测试

示例:使用混合并行训练 GPT

作者:刘洪新、李永斌 示例代码 相关论文

简介

在之前的教程中,我们介绍了如何使用流水线训练 ViT。在本教程中,您将学习一个更复杂的场景——使用混合并行训练 GPT。在这种情况下,GPT-3 太大以至于 CPU 内存也无法容纳它。因此,您必须自己拆分模型。

目录

在本教程中,我们将介绍:
  1. GPT模型的定义,基于colossalai/model_zoo
  2. 处理数据集
  3. 使用混合并行训练 GPT

导入库

import json
import os
from typing import Callable

import colossalai
import colossalai.utils as utils
import model_zoo.gpt.gpt as col_gpt
import torch
import torch.nn as nn
from colossalai import nn as col_nn
from colossalai.amp import AMP_TYPE
from colossalai.builder.pipeline import partition_uniform
from colossalai.context.parallel_mode import ParallelMode
from colossalai.core import global_context as gpc
from colossalai.engine.schedule import (InterleavedPipelineSchedule,
                                        PipelineSchedule)
from colossalai.logging import disable_existing_loggers, get_dist_logger
from colossalai.nn.layer.wrapper import PipelineSharedModuleWrapper
from colossalai.trainer import Trainer, hooks
from colossalai.utils.timer import MultiTimer
from model_zoo.gpt import GPTLMLoss
from torch.nn import functional as F
from torch.utils.data import Dataset
from transformers import GPT2Tokenizer

定义 GPT模型

在之前的教程中,我们介绍了 3 种构建流水线模型的方法。但对于像 GPT-3 这样的大型模型,你甚至无法在 CPU 中构建模型。在这种情况下,您必须自己拆分模型。 GPT 数据加载器返回input_idsattention_mask,因此我们在中使用两个关键字参数forward()来获取它们。请注意,对于除第一阶段以外的阶段, 的第一个位置参数forward()是前一阶段的输出张量。所以hidden_states是前一阶段的,第一阶段是None。 对于 GPT,词嵌入层输出头共享权重。我们提供PipelineSharedModuleWrapper在流水线阶段之间共享参数。它采用listofint作为参数,这意味着这些等级共享参数。您可以使用register_module()orregister_parameter()将模块或参数注册为共享模块或参数。如果你有多组共享模块/参数,你应该有多个PipelineSharedModuleWrapper实例。如果参数在一个阶段内共享,则不应使用PipelineSharedModuleWrapper,而应使用相同的模块/参数实例。在这个例子中,词嵌入层在第一阶段,输出头处于最后阶段。因此,它们在 ranks 之间共享[0, pipeline_size - 1]。 对于第一阶段,它维护嵌入层和一些转换器块。对于最后一个阶段,它维护了一些变压器块和输出头层。对于其他阶段,他们只是维护一些变压器块。partition_uniform(num_layers, pipeline_size, num_chunks)返回所有行列的部分,部分为tupleof (start, end)(不包括end)。start == 0表示是第一阶段,end == num_layers表示是最后阶段。
class PipelineGPTHybrid(nn.Module):
    def __init__(self,
                 num_layers: int = 12,
                 hidden_size: int = 768,
                 num_attention_heads: int = 12,
                 vocab_size: int = 50304,
                 embed_drop_rate: float = 0.,
                 act_func: Callable = F.gelu,
                 mlp_ratio: int = 4,
                 attn_drop_rate: float = 0.,
                 drop_rate: float = 0.,
                 dtype: torch.dtype = torch.float,
                 checkpoint: bool = False,
                 max_position_embeddings: int = 1024,
                 layer_norm_epsilon: float = 1e-5,
                 first: bool = False,
                 last: bool = False):
        super().__init__()
        self.embedding = None
        self.norm = None
        self.head = None
        if first:
            self.embedding = col_gpt.GPTEmbedding(
                hidden_size, vocab_size, max_position_embeddings, dropout=embed_drop_rate, dtype=dtype)
        self.blocks = nn.ModuleList([
            col_gpt.GPTBlock(hidden_size, num_attention_heads, mlp_ratio=mlp_ratio, attention_dropout=attn_drop_rate,
                             dropout=drop_rate, dtype=dtype, checkpoint=checkpoint, activation=act_func)
            for _ in range(num_layers)
        ])
        if last:
            self.norm = col_nn.LayerNorm(hidden_size, eps=layer_norm_epsilon)
            self.head = col_gpt.GPTLMHead(vocab_size=vocab_size,
                                          dim=hidden_size,
                                          dtype=dtype,
                                          bias=False)

    def forward(self, hidden_states=None, input_ids=None, attention_mask=None):
        if self.embedding is not None:
            hidden_states = self.embedding(input_ids=input_ids)
        batch_size = hidden_states.shape[0]
        attention_mask = attention_mask.view(batch_size, -1)
        attention_mask = attention_mask[:, None, None, :]
        attention_mask = attention_mask.to(dtype=hidden_states.dtype)  # fp16 compatibility
        attention_mask = (1.0 - attention_mask) * -10000.0
        for block in self.blocks:
            hidden_states, attention_mask = block(hidden_states, attention_mask)
        if self.norm is not None:
            hidden_states = self.head(self.norm(hidden_states))
        return hidden_states


def build_gpt_pipeline(num_layers, num_chunks, device=torch.device('cuda'), **kwargs):
    logger = get_dist_logger()
    pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE)
    pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE)
    rank = gpc.get_global_rank()
    wrapper = PipelineSharedModuleWrapper([0, pipeline_size - 1])
    parts = partition_uniform(num_layers, pipeline_size, num_chunks)[pipeline_rank]
    models = []
    for start, end in parts:
        kwargs['num_layers'] = end - start
        kwargs['first'] = start == 0
        kwargs['last'] = end == num_layers
        logger.info(f'Rank{rank} build layer {start}-{end}, {end-start}/{num_layers} layers')
        chunk = PipelineGPTHybrid(**kwargs).to(device)
        if start == 0:
            wrapper.register_module(chunk.embedding.word_embeddings)
        elif end == num_layers:
            wrapper.register_module(chunk.head)
        models.append(chunk)
    if len(models) == 1:
        model = models[0]
    else:
        model = nn.ModuleList(models)
    return model


def GPT2_exlarge_pipeline_hybrid(num_chunks=1, checkpoint=False, dtype=torch.float):
    cfg = dict(hidden_size=1600, num_attention_heads=32, checkpoint=checkpoint, dtype=dtype)
    return build_gpt_pipeline(48, num_chunks, **cfg)


def GPT3_pipeline_hybrid(num_chunks=1, checkpoint=False, dtype=torch.float):
    cfg = dict(hidden_size=12288, num_attention_heads=96,
               checkpoint=checkpoint, max_position_embeddings=2048, dtype=dtype)
    return build_gpt_pipeline(96, num_chunks, **cfg)

处理数据集

我们在这里提供了一个小型 GPT 网络文本数据集。原始格式是松散的 JSON,我们将保存处理后的数据集。
class WebtextDataset(Dataset):
    def __init__(self, path, seq_len=1024) -> None:
        super().__init__()
        root = os.path.dirname(path)
        encoded_data_cache_path = os.path.join(root, f'gpt_webtext_{seq_len}.pt')
        if os.path.isfile(encoded_data_cache_path):
            seq_len_, data, attention_mask = torch.load(
                encoded_data_cache_path)
            if seq_len_ == seq_len:
                self.data = data
                self.attention_mask = attention_mask
                return
        raw_data = []
        with open(path) as f:
            for line in f.readlines():
                raw_data.append(json.loads(line)['text'])
        tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        tokenizer.pad_token = tokenizer.unk_token
        encoded_data = tokenizer(
            raw_data, padding=True, truncation=True, max_length=seq_len, return_tensors='pt')
        self.data = encoded_data['input_ids']
        self.attention_mask = encoded_data['attention_mask']
        torch.save((seq_len, self.data, self.attention_mask),
                   encoded_data_cache_path)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return {
            'input_ids': self.data[index],
            'attention_mask': self.attention_mask[index]
        }, self.data[index]

使用混合并行

在前面的教程中,我们解释了一些管道参数的含义。在这种情况下,我们可以确定在流水线阶段之间交换的每个输出张量的形状。对于 GPT,形状是(MICRO BATCH SIZE, SEQUENCE LEN, HIDDEN SIZE). 通过设置这个,我们可以避免交换每个阶段的张量形状。当您不确定张量形状时,您可以保留它None,并自动推断出形状。确保dtype您的模型正确。当你使用fp16时,dtype你的模型的必须是torch.half。否则,dtype必须是torch.float. 对于管道并行性,仅AMP_TYPE.NAIVE支持。 parallel您可以通过在 中设置来轻松使用张量并行CONFIG。数据并行度大小是根据 GPU 的数量自动设置的。
NUM_EPOCHS = 60
SEQ_LEN = 1024
BATCH_SIZE = 192
NUM_CHUNKS = None
TENSOR_SHAPE = (1, 1024, 1600)
# only pipeline parallel
# CONFIG = dict(parallel=dict(pipeline=2), fp16=dict(mode=AMP_TYPE.NAIVE))
# pipeline + 1D model parallel
CONFIG = dict(NUM_MICRO_BATCHES = 192, parallel=dict(pipeline=2, tensor=dict(mode='1d', size=2)), fp16=dict(mode=AMP_TYPE.NAIVE))


def train():
    disable_existing_loggers()
    parser = colossalai.get_default_parser()
    args = parser.parse_args()
    colossalai.launch_from_torch(config=CONFIG, backend=args.backend)
    logger = get_dist_logger()

    train_ds = WebtextDataset(os.environ['DATA'], seq_len=SEQ_LEN)
    train_dataloader = utils.get_dataloader(train_ds,
                                            seed=42,
                                            batch_size=BATCH_SIZE,
                                            pin_memory=True,
                                            shuffle=True,
                                            drop_last=True)

    use_interleaved = NUM_CHUNKS is not None
    num_chunks = 1 if not use_interleaved else NUM_CHUNKS
    model = GPT2_exlarge_pipeline_hybrid(num_chunks=num_chunks, checkpoint=True, dtype=torch.half)
    # model = GPT3_pipeline_hybrid(num_chunks=num_chunks, checkpoint=True, dtype=torch.half)
    if use_interleaved and not isinstance(model, nn.ModuleList):
        model = nn.ModuleList([model])

    criterion = GPTLMLoss()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.00015, weight_decay=1e-2,)

    engine, train_dataloader, _, _ = colossalai.initialize(model,
                                                           optimizer,
                                                           criterion,
                                                           train_dataloader=train_dataloader)
    global_batch_size = BATCH_SIZE * \
        gpc.get_world_size(ParallelMode.DATA) * getattr(gpc.config, "gradient_accumulation", 1)
    logger.info(f'Init done, global batch size = {global_batch_size}', ranks=[0])

    timer = MultiTimer()

    trainer = Trainer(
        engine=engine,
        logger=logger,
        timer=timer
    )

    hook_list = [
        hooks.LossHook(),
        hooks.LogMetricByEpochHook(logger),
        hooks.ThroughputHook(),
        hooks.LogMetricByStepHook(),
    ]

    trainer.fit(
        train_dataloader=train_dataloader,
        epochs=NUM_EPOCHS,
        test_interval=1,
        hooks=hook_list,
        display_progress=True,
        return_output_label=False,
    )

未来发展

Colossal-AI 系统将被扩展以包含更多的训练技能,这些新的发展可能包括但不限于:
  1. 分布式操作的优化
  2. 异构系统训练优化
  3. 实施培训实用程序以减少模型大小并加快培训速度,同时保持模型性能
  4. 现有并行方法的扩展
注:以上仅为Colossal-AI 的官方示例教程,详情请参阅以下链接: 官方网站:https://colossalai.org/ 详细使用教程:https://colossalai.org/docs/basics/launch_colossalai 详细解读:https://www.sohu.com/a/603960592_121124362

[/hidecontent]

 
 
免责声明:本文为用户发表,不代表网站立场,仅供参考,不构成引导等用途。 开源
gpdmini2024掌机3月6日开启预售,亮度500尼特
日产拼了 中大型纯电轿车11.99万起售!日产N7上市一小时订单破万
奇瑞风云a9车身配色官图发布,定位于中大型车
线上业务超九成 太原住房公积金数字化建设纵深推进
专家喊话引热议:别老骂国足 中国足球人口极少

首页

分类

定制方案

消息

我的