PaddleFluid与Kaggle 猫狗大战

摘要

前面已经介绍了如何在PaddlePaddle下做图像分类工作,以及搭配VisualDL做metric可视化。今天,我来尝试使用PaddleFluid做图像分类工作。这里需要说明的是,PaddleFluid更新的频率太高了,我这里的代码是在0.13.0的基础上写的,很多更方便的API,我在官方github上的branch看到了,但是暂时无法使用,不过我这里也会告诉大家这些新的写法。Fluid文档太少,需要看代码研究python接口。

PaddleFluid 简介

前面已经介绍了如何在 PaddlePaddle 下做图像分类工作,以及搭配 VisualDL 做 metric 可视化。今天,我来尝试使用 PaddleFluid 做图像分类工作。这里需要说明的是,PaddleFluid 更新的频率太高了,我这里的代码是在 0.13.0 的基础上写的,很多更方便的 API,我在官方 github 上的 branch 看到了,但是暂时无法使用,不过我这里也会告诉大家这些新的写法。Fluid 文档太少,需要看代码研究 python 接口。

模型介绍

resnet 搞过图像的应该都知道,kaiminghe 的 resnet,Deep Residual Learning for Image Recognition (15 年年底的文章,竟然有 9496 个 citations),就是那个最开始搞过 1000 多层的网络的,原理不说了,有好多好多的文章有介绍,随便一搜就好啦。这里,不需要把 resnet 的每一层用 fluid 都写出来,PaddlePaddle 的 repo 里面有这块的工作,可供直接复用。resnet.py

Dog vs Cat

Kaggle 网站上找到 Dog vs Cat 数据集,Dogs vs. Cats, 安装好 kaggle-api 后 kaggle competitions download -c dogs-vs-cats 即可下载数据集,后面实验我在训练集中用 80% 做训练数据,20% 做验证集。

Image Reader

defdefault_mapper(sample):

    img, label = sample

    img = image.simple_transform(

        img, 256, 224, True, mean=[103.94, 116.78, 123.68])

return img.flatten().astype('float32'), label

defdataset_reader(data_dir, train_val_ratio=0.8):

    img_list = []

    img2label =dict()

    label2id =dict()

    sub_dirs = [i for i in os.listdir(data_dir) if os.path.isdir(i)]

for index, sub_dir inenumerate(sub_dirs):

        label2id[sub_dir] = index

        sub_files = []

for root, dir, files in os.walk(os.path.join(data_dir, sub_dir)):

            sub_files = [os.path.join(root, file) forfilein files  iffile.split(".")[-1] in ["jpg, jpeg"]]

        img_list += sub_files

forfilein sub_files:

            img2label[file] =sub_dir

    random.shuffle(img_list)

    train_len =int(train_val_ratio*len(img_list))

    train_img_list = img_list[:train_len]

    val_img_list = img_list[train_len:]

deftrain_reader():

for idx, imgfile inenumerate(train_img_list):

try:

                data = image.load_image(imgfile)

                label = [label2id[img2label[imgfile]], ]

yield [data, label]

exceptExceptionas e:

print"error infor: {0}".format(e.message)

continue

deftest_reader():

for idx, imgfile inenumerate(val_img_list):

try:

                data = image.load_image(imgfile)

                label = [label2id[img2label[imgfile]], ]

yield [data, label]

exceptExceptionas e:

print"error infor: {0}".format(e.message)

continue

return paddle.reader.map_readers(default_mapper, train_reader), paddle.reader.map_readers(default_mapper, test_reader)

data_reader 函数主要有两个部分:

遍历所有图像;

读取图像,生成 train,test 的生成器;

模型构建

def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'):

    conv1 = fluid.layers.conv2d(

        input=input,

        filter_size=filter_size,

        num_filters=ch_out,

        stride=stride,

        padding=padding,

        act=None,

        bias_attr=False)

    return fluid.layers.batch_norm(input=conv1, act=act)

def shortcut(input, ch_out, stride):

    ch_in = input.shape[1]  # if args.data_format == 'NCHW' else input.shape[-1]

    if ch_in != ch_out:

        return conv_bn_layer(input, ch_out, 1, stride, 0, None)

    else:

        return input

def basicblock(input, ch_out, stride):

    short = shortcut(input, ch_out, stride)

    conv1 = conv_bn_layer(input, ch_out, 3, stride, 1)

    conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, act=None)

    return fluid.layers.elementwise_add(x=short, y=conv2, act='relu')

def bottleneck(input, ch_out, stride):

    short = shortcut(input, ch_out * 4, stride)

    conv1 = conv_bn_layer(input, ch_out, 1, stride, 0)

    conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1)

    conv3 = conv_bn_layer(conv2, ch_out * 4, 1, 1, 0, act=None)

    return fluid.layers.elementwise_add(x=short, y=conv3, act='relu')

def layer_warp(block_func, input, ch_out, count, stride):

    res_out = block_func(input, ch_out, stride)

    for i in range(1, count):

        res_out = block_func(res_out, ch_out, 1)

    return res_out

def resnet(input, class_dim, depth=18, data_format='NCHW'):

    cfg = {

        18: ([2, 2, 2, 1], basicblock),

        34: ([3, 4, 6, 3], basicblock),

        50: ([3, 4, 6, 3], bottleneck),

        101: ([3, 4, 23, 3], bottleneck),

        152: ([3, 8, 36, 3], bottleneck)

    }

    stages, block_func = cfg[depth]

    conv1 = conv_bn_layer(input, ch_out=64, filter_size=7, stride=2, padding=3)

    pool1 = fluid.layers.pool2d(

        input=conv1, pool_type='avg', pool_size=3, pool_stride=2)

    res1 = layer_warp(block_func, pool1, 64, stages[0], 1)

    res2 = layer_warp(block_func, res1, 128, stages[1], 2)

    res3 = layer_warp(block_func, res2, 256, stages[2], 2)

    res4 = layer_warp(block_func, res3, 512, stages[3], 2)

    pool2 = fluid.layers.pool2d(

        input=res4,

        pool_size=7,

        pool_type='avg',

        pool_stride=1,

        global_pooling=True)

    out = fluid.layers.fc(input=pool2, size=class_dim, act='softmax')

    return out

resnet() 配置不同层数的 resnet 网络,如 resnet50,resnet34, resnet101 等,这里主要是 fluid 的 api,主要是和模型结构相关的,一般来说,经典的模型都会有重现,想使用的同学 google 一下会有相应的实现,当然也要理解下怎么做的,这里我就不深究了,对比着论文应该不难。

训练

def train(args):

    # logger = LogWriter(args.logdir, sync_cycle=10000)

    model = resnet

    class_dim = args.class_dim

    if args.data_format == 'NCHW':

        dshape = [3, 224, 224]

    else:

        dshape = [224, 224, 3]

    if not args.data_path:

        raise Exception(

            "Must specify --data_path when training with imagenet")

    train_reader, test_reader = dataset_reader(args.data_path)

    print(train_reader)

    def train_network():

        input = fluid.layers.data(name='image', shape=dshape, dtype='float32')

        predict = model(input, class_dim)

        label = fluid.layers.data(name='label', shape=[1], dtype='int64')

        cost = fluid.layers.cross_entropy(input=predict, label=label)

        avg_cost = fluid.layers.mean(x=cost)

        batch_acc = fluid.layers.accuracy(input=predict, label=label)

        return [avg_cost, batch_acc]

    optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)

    batched_train_reader = paddle.batch(

        paddle.reader.shuffle(

            train_reader, buf_size=5120),

        batch_size=args.batch_size

        )

    batched_test_reader = paddle.batch(

        test_reader, batch_size=args.batch_size)

    def event_handler(event):

        if isinstance(event, fluid.EndStepEvent):

            print('Pass:{0},Step: {1},Metric: {2}'.format(event.epoch, event.step, event.metrics))

        if isinstance(event, fluid.EndEpochEvent):

            # save model to dir

            #trainer.save_params(".")

            avg_cost, acc = trainer.test(reader=batched_test_reader, feed_order=["image", "label"])

            print('Pass:{0},val avg_cost: {1}, acc: {2}'.format(event.epoch, avg_cost, acc))

            trainer.save_params("./ckpt") 

            # write the loss, acc to visualdl file

            pass

    # place = fluid.CUDAPlace(0) if args.use_cuda else fluid.CPUPlace()

    place = fluid.CUDAPlace(0)

    trainer = fluid.Trainer(

        train_func=train_network, optimizer=optimizer, place=place)

    print("Begin to Train")

    trainer.train(

        reader=batched_train_reader,

        num_epochs=args.pass_num,

        event_handler=event_handler,

        feed_order=['image', 'label'])

train() 主要包括:

构建模型,主要是 resnet 的部分,构建各种不同的 layer, 直接使用上节的模型构造即可;

构建训练相关的部分,配置输入输出(input,label), 构建 cost,acc 这类 op;

将 train_reader, test_reader 包装为 batch_reader;

配置设备信息,新建 Trainer 开始训练;

epoch 结束后保存模型的部分还是使用 v2 的风格,github 中 Fluid 已经支持 CheckpointConfig 来完成相应的配置,传给 Trainer,但是我这边应该从 pip 安装的是 0.13.0 的版本,我进系统的文件看了下这部分更改没有更新,所以就先使用 v2 的风格,save_params 来保存模型参数,个人从技术角度来说更偏爱 CheckpointConfig 这种 config 的模式。

训练日志

训练过程中发现一点问题:GPU 占用率跳动比较频繁, 占用率经常跳到 0,怀疑是等待问题,看代码部分发现 paddle.reader.map_readers(default_mapper, train_reader) 没有配置多个线程, 应该是由于单个线程在读 image,包括预处理的部分时间过长,造成了 gpu 计算时间的等待, 修改为 paddle.reader.xmap_readers(default_mapper, train_reader, cpu_count(), 51200) 之后,运行快了很多,不过还是有比较明显的 GPU 占用率跳的比较明显,看了下源码,读取数据的部分是 python 实现的,并不是很高效,现在只有一张卡,还好,要是多张卡,等待会更明显,这部分应该有一个更好的替代方案,可以从底层 cpp 来实现相应的读取逻辑,效率会很高。

def xmap_readers(mapper, reader, process_num, buffer_size, order=False):

    end = XmapEndSignal()

    # define a worker to read samples from reader to in_queue

    def read_worker(reader, in_queue):

        for i in reader():

            in_queue.put(i)

        in_queue.put(end)

    # define a worker to read samples from reader to in_queue with order flag

    def order_read_worker(reader, in_queue):

        in_order = 0

        for i in reader():

            in_queue.put((in_order, i))

            in_order += 1

        in_queue.put(end)

    # define a worker to handle samples from in_queue by mapper

    # and put mapped samples into out_queue

    def handle_worker(in_queue, out_queue, mapper):

        sample = in_queue.get()

        while not isinstance(sample, XmapEndSignal):

            r = mapper(sample)

            out_queue.put(r)

            sample = in_queue.get()

        in_queue.put(end)

        out_queue.put(end)

    # define a worker to handle samples from in_queue by mapper

    # and put mapped samples into out_queue by order

    def order_handle_worker(in_queue, out_queue, mapper, out_order):

        ins = in_queue.get()

        while not isinstance(ins, XmapEndSignal):

            order, sample = ins

            r = mapper(sample)

            while order != out_order[0]:

                pass

            out_queue.put(r)

            out_order[0] += 1

            ins = in_queue.get()

        in_queue.put(end)

        out_queue.put(end)

    def xreader():

        in_queue = Queue(buffer_size)

        out_queue = Queue(buffer_size)

        out_order = [0]

        # start a read worker in a thread

        target = order_read_worker if order else read_worker

        t = Thread(target=target, args=(reader, in_queue))

        t.daemon = True

        t.start()

        # start several handle_workers

        target = order_handle_worker if order else handle_worker

        args = (in_queue, out_queue, mapper, out_order) if order else (

            in_queue, out_queue, mapper)

        workers = []

        for i in xrange(process_num):

            worker = Thread(target=target, args=args)

            worker.daemon = True

            workers.append(worker)

        for w in workers:

            w.start()

        sample = out_queue.get()

        while not isinstance(sample, XmapEndSignal):

            yield sample

            sample = out_queue.get()

        finish = 1

        while finish < process_num:

            sample = out_queue.get()

            if isinstance(sample, XmapEndSignal):

                finish += 1

            else:

                yield sample

    return xreader

Image Augmentation

前面,我简单地跑起了流程,没有做基本的处理,比如 Image Augmentation,如果做了 Image Augmentation, 效果应该会更好一些,这里测试一下 Image Augmentation。读下上面的代码, Image Augmentation 的部分可以在 default_maper 的部分实现,这里可以尝试下:

DATA_DIM=224

img_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1))

img_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1))

def resize_short(img, target_size):

    percent = float(target_size) / min(img.size[0], img.size[1])

    resized_width = int(round(img.size[0] * percent))

    resized_height = int(round(img.size[1] * percent))

    img = img.resize((resized_width, resized_height), Image.LANCZOS)

    return img

def crop_image(img, target_size, center):

    width, height = img.size

    size = target_size

    if center == True:

        w_start = (width - size) / 2

        h_start = (height - size) / 2

    else:

        w_start = random.randint(0, width - size)

        h_start = random.randint(0, height - size)

    w_end = w_start + size

    h_end = h_start + size

    img = img.crop((w_start, h_start, w_end, h_end))

    return img

def random_crop(img, size, scale=[0.08, 1.0], ratio=[3. / 4., 4. / 3.]):

    aspect_ratio = math.sqrt(random.uniform(*ratio))

    w = 1. * aspect_ratio

    h = 1. / aspect_ratio

    bound = min((float(img.size[0]) / img.size[1]) / (w**2),

                (float(img.size[1]) / img.size[0]) / (h**2))

    scale_max = min(scale[1], bound)

    scale_min = min(scale[0], bound)

    target_area = img.size[0] * img.size[1] * random.uniform(scale_min,

                                                            scale_max)

    target_size = math.sqrt(target_area)

    w = int(target_size * w)

    h = int(target_size * h)

    i = random.randint(0, img.size[0] - w)

    j = random.randint(0, img.size[1] - h)

    img = img.crop((i, j, i + w, j + h))

    img = img.resize((size, size), Image.LANCZOS)

    return img

def rotate_image(img):

    angle = random.randint(-10, 10)

    img = img.rotate(angle)

    return img

def distort_color(img):

    def random_brightness(img, lower=0.5, upper=1.5):

        e = random.uniform(lower, upper)

        return ImageEnhance.Brightness(img).enhance(e)

    def random_contrast(img, lower=0.5, upper=1.5):

        e = random.uniform(lower, upper)

        return ImageEnhance.Contrast(img).enhance(e)

    def random_color(img, lower=0.5, upper=1.5):

        e = random.uniform(lower, upper)

        return ImageEnhance.Color(img).enhance(e)

    ops = [random_brightness, random_contrast, random_color]

    random.shuffle(ops)

    img = ops[0](img)

    img = ops[1](img)

    img = ops[2](img)

    return img

def process_image(sample, mode, color_jitter, rotate):

    img_path = sample[0]

    img = Image.open(img_path)

    #img = sample[0]

    if mode == 'train':

        if rotate: img = rotate_image(img)

        img = random_crop(img, DATA_DIM)

    else:

        img = resize_short(img, target_size=256)

        img = crop_image(img, target_size=DATA_DIM, center=True)

    if mode == 'train':

        if color_jitter:

            img = distort_color(img)

        if random.randint(0, 1) == 1:

            img = img.transpose(Image.FLIP_LEFT_RIGHT)

    if img.mode != 'RGB':

        img = img.convert('RGB')

    img = np.array(img).astype('float32').transpose((2, 0, 1)) / 255

    img -= img_mean

    img /= img_std

    if mode == 'train' or mode == 'val':

        return img, sample[1]

    elif mode == 'test':

        return [img]

然后修改 mapper 的部分:

train_mapper = functools.partial(process_image, mode="train", color_jitter=False, rotate=False)

test_mapper = functools.partial(process_image, mode="test")

return paddle.reader.xmap_readers(train_mapper, train_reader, cpu_count(), 51200), paddle.reader.xmap_readers(test_mapper, test_reader, cpu_count(), 5120)

这里可以对比一下 Image Augmentation 前后的、在验证集上的结果:

很明显,在完成 Image Aug 之后,结果有了进一步提升。所有的源码都更新在 paddle-101, 这里只做基本的 demo,成绩这块未做进一步工作,大家可以尝试用 Fluid 刷下榜看看, 有兴趣的小伙伴可以玩一玩。


最新文章

极客公园

用极客视角,追踪你不可错过的科技圈.

极客之选

新鲜、有趣的硬件产品,第一时间为你呈现。

张鹏科技商业观察

聊科技,谈商业。