跳到内容

图像分类

开始入门

部署 Argilla 服务器

如果您已经部署了 Argilla,则可以跳过此步骤。否则,您可以按照本指南快速部署 Argilla。

设置环境

要完成本教程,您需要通过 pip 安装 Argilla SDK 和一些第三方库。

!pip install argilla
!pip install "transformers[torch]~=4.0" "accelerate~=0.34"

让我们进行所需的导入

import base64
import io
import re

from IPython.display import display
import numpy as np
import torch
from PIL import Image

from datasets import load_dataset, Dataset, load_metric
from transformers import (
    AutoImageProcessor,
    AutoModelForImageClassification,
    pipeline,
    Trainer,
    TrainingArguments
)

import argilla as rg

您还需要使用 api_urlapi_key 连接到 Argilla 服务器。

# Replace api_url with your url if using Docker
# Replace api_key with your API key under "My Settings" in the UI
# Uncomment the last line and set your HF_TOKEN if your space is private
client = rg.Argilla(
    api_url="https://[your-owner-name]-[your_space_name].hf.space",
    api_key="[your-api-key]",
    # headers={"Authorization": f"Bearer {HF_TOKEN}"}
)

数据集概览

我们将查看 数据集,以了解其结构和包含的数据类型。我们通过使用嵌入式 Hugging Face Dataset Viewer 来做到这一点。

配置和创建 Argilla 数据集

现在,我们需要配置数据集。在设置中,我们可以指定指南、字段和问题。如果需要,您还可以添加元数据和向量。但是,对于我们的用例,我们只需要 image 列的字段和 label 列的标签问题。

注意

查看此操作指南,以了解有关配置和创建数据集的更多信息。

labels = [str(x) for x in range(10)]

settings = rg.Settings(
    guidelines="The goal of this task is to classify a given image of a handwritten digit into one of 10 classes representing integer values from 0 to 9, inclusively.",
    fields=[
        rg.ImageField(
            name="image",
            title="An image of a handwritten digit.",
        ),
    ],
    questions=[
        rg.LabelQuestion(
            name="image_label",
            title="What digit do you see on the image?",
            labels=labels,
        )
    ]
)

让我们使用名称和定义的设置创建数据集

dataset = rg.Dataset(
    name="image_classification_dataset",
    settings=settings,
)
dataset.create()

添加记录

即使我们创建了数据集,它仍然缺少要标注的信息(您可以在 UI 中查看)。我们将使用来自 Hugging Face Hubylecun/mnist 数据集。具体来说,我们将使用 100 个示例。因为我们正在处理可能很大的图像数据集,所以我们将设置 streaming=True 以避免将整个数据集加载到内存中,并迭代数据以延迟加载它。

提示

当使用 Hugging Face 数据集时,您可以设置 Image(decode=False),以便我们可以获取公共图像 URL,但这取决于数据集。

n_rows = 100

hf_dataset = load_dataset("ylecun/mnist", streaming=True)
dataset_rows = [row for _,row in zip(range(n_rows), hf_dataset["train"])]
hf_dataset = Dataset.from_list(dataset_rows)

hf_dataset
Dataset({
    features: ['image', 'label'],
    num_rows: 100
})

让我们看一下数据集中的第一张图像。

hf_dataset[0]
{'image': <PIL.PngImagePlugin.PngImageFile image mode=L size=28x28>,
 'label': 5}

我们将使用 log 轻松地将它们添加到数据集中,而无需映射,因为名称已经与 Argilla 资源匹配。此外,由于图像已经采用 PIL 格式,并在 Hugging Face 数据集的功能中定义为 Image,因此我们可以直接记录它们。我们还将在每个记录中包含一个 id 列,以便我们轻松地追溯到外部数据源。

hf_dataset = hf_dataset.add_column("id", range(len(hf_dataset)))
dataset.records.log(records=hf_dataset)

添加初始模型建议

下一步是将建议添加到数据集。这将使标注团队的工作更轻松快捷。建议将显示为预选选项,因此标注者只需更正它们即可。在我们的例子中,我们将使用 zero-shot CLIP 模型生成它们。但是,您可以使用您选择的框架或技术。

我们将首先使用 transformers pipeline 加载模型。

checkpoint = "openai/clip-vit-large-patch14"
detector = pipeline(model=checkpoint, task="zero-shot-image-classification")

现在,让我们尝试进行模型预测,看看它是否有意义。

predictions = detector(hf_dataset[1]["image"], candidate_labels=labels)
predictions, display(hf_dataset[1]["image"])
No description has been provided for this image
([{'score': 0.5236628651618958, 'label': '0'},
  {'score': 0.11496700346469879, 'label': '7'},
  {'score': 0.08030630648136139, 'label': '8'},
  {'score': 0.07141078263521194, 'label': '9'},
  {'score': 0.05868939310312271, 'label': '6'},
  {'score': 0.05507850646972656, 'label': '5'},
  {'score': 0.0341767854988575, 'label': '1'},
  {'score': 0.027202051132917404, 'label': '4'},
  {'score': 0.018533246591687202, 'label': '3'},
  {'score': 0.015973029658198357, 'label': '2'}],
 None)

现在是时候对数据集进行预测了!我们将设置一个使用 zero-shot 模型的函数。该模型将根据图像推断标签。当处理大型数据集时,您可以创建一个 batch_predict 方法来加速该过程。

def predict(input, labels):
    prediction = detector(input, candidate_labels=labels)
    prediction = prediction[0]
    return {"image_label": prediction["label"], "score": prediction["score"]}

要更新记录,我们需要从服务器检索它们并使用新建议更新它们。 id 始终需要提供,因为它是记录的标识符,用于更新记录并避免创建新记录。

data = dataset.records.to_list(flatten=True)
updated_data = [
    {
        "id": sample["id"],
        **predict(sample["image"], labels),
    }
    for sample in data
]
dataset.records.log(records=updated_data, mapping={"score": "image_label.suggestion.score"})

瞧!我们已将建议添加到数据集,它们将显示在 UI 中,并标有 ✨。

使用 Argilla 评估

现在,我们可以开始标注过程。只需在 Argilla UI 中打开数据集并开始标注记录。如果建议正确,您只需单击 Submit。否则,您可以选择正确的标签。

注意

查看此操作指南,以了解有关在 UI 中标注的更多信息。

训练您的模型

标注完成后,我们将拥有一个强大的数据集来训练主模型。在我们的例子中,我们将使用 transformers 进行微调。但是,您可以选择最符合您要求的模型。

格式化数据

因此,让我们首先检索标注的记录并将它们导出为 Dataset,这样图像将采用 PIL 格式。

注意

查看此操作指南,以了解有关在 Argilla 中过滤和查询的更多信息。此外,您可以查看 Hugging Face 文档中关于微调图像分类模型的内容。

dataset = client.datasets("image_classification_dataset")
status_filter = rg.Query(filter=rg.Filter(("response.status", "==", "submitted")))

submitted = dataset.records(status_filter).to_datasets()

我们现在需要确保我们的图像以正确的尺寸转发。由于原始 MNIST 数据集是灰度的,而 VIT 模型期望 RGB,我们需要为图像添加通道维度。我们将通过沿通道轴堆叠图像来实现这一点。

def greyscale_to_rgb(img) -&gt; Image:
    return Image.merge('RGB', (img, img, img))

submitted_image_rgb = [
    {
        "id": sample["id"],
        "image": greyscale_to_rgb(sample["image"]),
        "label": sample["image_label.responses"][0],
    }
    for sample in submitted
]
submitted_image_rgb[0]
{'id': '0', 'image': <PIL.Image.Image image mode=RGB size=28x28>, 'label': '0'}

接下来,我们将加载 ImageProcessor 以微调模型。此处理器将处理图像大小调整和归一化,以便与我们打算使用的模型兼容。

checkpoint = "google/vit-base-patch16-224-in21k"
processor = AutoImageProcessor.from_pretrained(checkpoint)

submitted_image_rgb_processed = [
    {
        "pixel_values": processor(sample["image"], return_tensors='pt')["pixel_values"],
        "label": sample["label"],
    }
    for sample in submitted_image_rgb
]
submitted_image_rgb_processed[0]

我们现在可以将图像转换为 Hugging Face Dataset,以便进行微调。

prepared_ds = Dataset.from_list(submitted_image_rgb_processed)
prepared_ds = prepared_ds.train_test_split(test_size=0.2)
prepared_ds
DatasetDict({
    train: Dataset({
        features: ['pixel_values', 'label'],
        num_rows: 80
    })
    test: Dataset({
        features: ['pixel_values', 'label'],
        num_rows: 20
    })
})

实际训练

然后我们需要定义我们的数据整理器,这将确保数据被正确解包和堆叠以供模型使用。

def collate_fn(batch):
    return {
        'pixel_values': torch.stack([torch.tensor(x['pixel_values'][0]) for x in batch]),
        'labels': torch.tensor([int(x['label']) for x in batch])
    }

接下来,我们可以定义我们的训练指标。我们将使用准确率指标来评估模型的性能。

metric = load_metric("accuracy", trust_remote_code=True)
def compute_metrics(p):
    return metric.compute(predictions=np.argmax(p.predictions, axis=1), references=p.label_ids)

然后我们加载我们的模型并配置我们将用于训练的标签。

model = AutoModelForImageClassification.from_pretrained(
    checkpoint,
    num_labels=len(labels),
    id2label={int(i): int(c) for i, c in enumerate(labels)},
    label2id={int(c): int(i) for i, c in enumerate(labels)}
)
model.config

最后,我们定义训练参数并开始训练过程。

training_args = TrainingArguments(
  output_dir="./image-classifier",
  per_device_train_batch_size=16,
  eval_strategy="steps",
  num_train_epochs=1,
  fp16=False, # True if you have a GPU with mixed precision support
  save_steps=100,
  eval_steps=100,
  logging_steps=10,
  learning_rate=2e-4,
  save_total_limit=2,
  remove_unused_columns=True,
  push_to_hub=False,
  load_best_model_at_end=True,
)

trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=collate_fn,
    compute_metrics=compute_metrics,
    train_dataset=prepared_ds["train"],
    eval_dataset=prepared_ds["test"],
    tokenizer=processor,
)

train_results = trainer.train()
trainer.save_model()
trainer.log_metrics("train", train_results.metrics)
trainer.save_metrics("train", train_results.metrics)
trainer.save_state()
{'train_runtime': 12.5374, 'train_samples_per_second': 6.381, 'train_steps_per_second': 0.399, 'train_loss': 2.0533515930175783, 'epoch': 1.0}
***** train metrics *****
  epoch                    =        1.0
  total_flos               =  5774017GF
  train_loss               =     2.0534
  train_runtime            = 0:00:12.53
  train_samples_per_second =      6.381
  train_steps_per_second   =      0.399

由于训练数据质量更高,我们可以期望获得更好的模型。因此,我们可以使用新模型的建议更新原始数据集的其余部分。

pipe = pipeline("image-classification", model=model, image_processor=processor)

def run_inference(batch):
    predictions = pipe(batch["image"])
    batch["image_label"] = [prediction[0]["label"] for prediction in predictions]
    batch["score"] = [prediction[0]["score"] for prediction in predictions]
    return batch

hf_dataset = hf_dataset.map(run_inference, batched=True)
data = dataset.records.to_list(flatten=True)
updated_data = [
    {
        "image_label": str(sample["image_label"]),
        "id": sample["id"],
        "score": sample["score"],
    }
    for sample in hf_dataset
]
dataset.records.log(records=updated_data, mapping={"score": "image_label.suggestion.score"})

结论

在本教程中,我们展示了一个图像分类任务的端到端示例。这可以作为基础,但可以迭代执行并无缝集成到您的工作流程中,以确保高质量的数据管理和改进的结果。

我们首先配置数据集,并从 zero-shot 模型添加记录和建议。在标注过程之后,我们使用标注的数据训练了一个新模型,并使用新建议更新了剩余的记录。