
rg.Dataset.records

Usage Examples

In most cases, you will not need to create a DatasetRecords object directly. Instead, you can access it via the Dataset object:

dataset.records
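The examples on this page assume that dataset is an existing rg.Dataset object. A minimal sketch of how it might be obtained is shown below; the API URL, API key, and dataset name are placeholders, not values from this page.

import argilla as rg

# Connect to an Argilla server (placeholder URL and API key).
client = rg.Argilla(api_url="http://localhost:6900", api_key="argilla.apikey")

# Retrieve an existing dataset by name; "my_dataset" is a placeholder.
dataset = client.datasets(name="my_dataset")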

For users familiar with the legacy approach:

  1. The Dataset.records object is used to interact with the records in a dataset. It fetches records from the server interactively in batches, without using a local copy of the records.
  2. The Dataset.records.log method is used to both add and update records in a dataset. If the record includes a known id field, the record will be updated. If the record does not include a known id field, the record will be added.

Adding records to a dataset

To add records to a dataset, use the log method. Records can be added as dictionaries or as Record objects. Single records can also be added as a dictionary or a Record.

You can also add records to a dataset by initializing Record objects directly.

records = [
    rg.Record(
        fields={
            "question": "Do you need oxygen to breathe?",
            "answer": "Yes"
        },
    ),
    rg.Record(
        fields={
            "question": "What is the boiling point of water?",
            "answer": "100 degrees Celsius"
        },
    ),
] # (1)

dataset.records.log(records)
  1. This is an illustrative definition. In a real-world scenario, you would iterate over a data structure and create a Record object for each iteration.
data = [
    {
        "question": "Do you need oxygen to breathe?",
        "answer": "Yes",
    },
    {
        "question": "What is the boiling point of water?",
        "answer": "100 degrees Celsius",
    },
] # (1)

dataset.records.log(data)
  1. The keys in the data structure must match the fields or questions in the Argilla dataset. In this case, there are fields named question and answer.
data = [
    {
        "query": "Do you need oxygen to breathe?",
        "response": "Yes",
    },
    {
        "query": "What is the boiling point of water?",
        "response": "100 degrees Celsius",
    },
] # (1)
dataset.records.log(
    records=data,
    mapping={"query": "question", "response": "answer"} # (2)
)
  1. The keys in the data structure must match the fields or questions in the Argilla dataset. In this case, there are fields named question and answer.
  2. The data structure has the keys query and response, whereas the Argilla dataset has question and answer. You can use the mapping parameter to map the keys in the data structure to the fields in the Argilla dataset.

You can also add records to a dataset using a Hugging Face dataset. This is useful when you want to take a dataset from the Hugging Face Hub and add it to your Argilla dataset.

You can add the dataset directly when its column names correspond to the names of fields, questions, metadata, or vectors in the Argilla dataset.

If the dataset's schema does not correspond to your Argilla dataset names, you can use a mapping to indicate which columns in the dataset correspond to the Argilla dataset fields.

from datasets import load_dataset

hf_dataset = load_dataset("imdb", split="train[:100]") # (1)

dataset.records.log(records=hf_dataset)
  1. In this example, the Hugging Face dataset matches the Argilla dataset schema. If that is not the case, you could use the .map method of the datasets library to prepare the data before adding it to the Argilla dataset.

Here we use the mapping parameter to specify the relationship between the Hugging Face dataset and the Argilla dataset.

dataset.records.log(records=hf_dataset, mapping={"txt": "text", "y": "label"}) # (1)
  1. In this case, the txt key in the Hugging Face dataset corresponds to the text field in the Argilla dataset, and the y key in the Hugging Face dataset corresponds to the label field in the Argilla dataset.

Updating records in a dataset

Records can also be updated using the log method, by passing records that contain an id to identify the records to be updated. As above, records can be passed as dictionaries or as Record objects.

You can update records in a dataset by initializing Record objects directly and providing the id field.

records = [
    rg.Record(
        metadata={"department": "toys"},
        id="2" # (1)
    ),
]

dataset.records.log(records)
  1. The id field is required to identify the record to be updated. The id field must be unique for each record in the dataset. If the id field is not provided, the record will be added as a new record.

You can also update records in a dataset by providing the id field in the data structure.

data = [
    {
        "metadata": {"department": "toys"},
        "id": "2" # (1)
    },
]

dataset.records.log(data)
  1. The id field is required to identify the record to be updated. The id field must be unique for each record in the dataset. If the id field is not provided, the record will be added as a new record.

You can also update records in a dataset by providing the id field in the data structure and using a mapping to map the keys in the data structure to the fields in the dataset.

data = [
    {
        "metadata": {"department": "toys"},
        "my_id": "2" # (1)
    },
]

dataset.records.log(
    records=data,
    mapping={"my_id": "id"} # (2)
)
  1. The id field is required to identify the record to be updated. The id field must be unique for each record in the dataset. If the id field is not provided, the record will be added as a new record.
  2. Let's say your data structure has the key my_id instead of id. You can use the mapping parameter to map the keys in the data structure to the fields in the dataset.

You can also update records in an Argilla dataset using a Hugging Face dataset. To update records, the Hugging Face dataset must contain an id field to identify the records to be updated, or you can use a mapping to map a key in the Hugging Face dataset to the id field in the Argilla dataset.

from datasets import load_dataset

hf_dataset = load_dataset("imdb", split="train[:100]") # (1)

dataset.records.log(records=hf_dataset, mapping={"uuid": "id"}) # (2)
  1. In this example, the Hugging Face dataset matches the Argilla dataset schema.
  2. The uuid key in the Hugging Face dataset corresponds to the id field in the Argilla dataset.

Adding and updating records with images

Argilla datasets can contain image fields. You can add images to a dataset by passing them to the record object as a remote URL, a local image file path, or a PIL object. The field name must be defined as an rg.ImageField in the dataset's Settings object for it to be accepted. Images are stored in the Argilla database and returned using the data URI schema.
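For context, such a field could be declared in the dataset settings roughly as in the sketch below; the question name and dataset name are illustrative and not taken from this page.

import argilla as rg

# Illustrative settings: an image field plus a text question.
settings = rg.Settings(
    fields=[rg.ImageField(name="image")],
    questions=[rg.TextQuestion(name="caption")],
)
dataset = rg.Dataset(name="image_dataset", settings=settings)
dataset.create()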

As PIL objects

To retrieve the images as rescaled PIL objects, you can use the to_datasets method when exporting the records, as shown in this how-to guide.

data = [
    {
        "image": "https://example.com/image1.jpg",
    },
    {
        "image": "https://example.com/image2.jpg",
    },
]

dataset.records.log(data)
import os
from PIL import Image

image_dir = "path/to/images"

data = [
    {
        "image": os.path.join(image_dir, "image1.jpg"), # (1)
    },
    {
        "image": Image.open(os.path.join(image_dir, "image2.jpg")), # (2)
    },
]

dataset.records.log(data)
  1. The image is a local file path.
  2. The image is a PIL object.

Hugging Face datasets can be passed directly to the log method. The image field must be defined as an Image feature in the dataset's features.

hf_dataset = load_dataset("ylecun/mnist", split="train[:100]")
dataset.records.log(records=hf_dataset)

If the image field is not defined as an Image feature in the dataset's features, you can cast the dataset to the correct schema before adding it to the Argilla dataset. This is only needed if the image field is not defined as an Image feature and is not one of the image types supported by Argilla (URL, local path, or PIL object).

hf_dataset = load_dataset("<my_custom_dataset>") # (1)
hf_dataset = hf_dataset.cast(
    features=Features({"image": Image(), "label": Value("string")}),
)
dataset.records.log(records=hf_dataset)
  1. In this example, the Hugging Face dataset matches the Argilla dataset schema, but the image field is not defined as an Image feature in the dataset's features.

Iterating over records in a dataset

Dataset.records can be used to iterate over the records in a dataset on the server. The records will be fetched from the server in batches:

for record in dataset.records:
    print(record)

# Fetch records with suggestions and responses
for record in dataset.records(with_suggestions=True, with_responses=True):
    print(record.suggestions)
    print(record.responses)

# Filter records by a query and fetch records with vectors
for record in dataset.records(query="capital", with_vectors=True):
    print(record.vectors)

See the rg.Record class reference for more information on the attributes and methods available on a record, and the rg.Query class reference for more information on the query syntax.


DatasetRecords

Bases: Iterable[Record], LoggingMixin

This class is used to work with records from a dataset and is accessed via Dataset.records. Its responsibility is to provide an interface to interact with records in a dataset, by adding, updating, fetching, querying, deleting, and exporting records.

Attributes

client (Argilla): The Argilla client object.
dataset (Dataset): The dataset object.

Source code in src/argilla/records/_dataset_records.py
class DatasetRecords(Iterable[Record], LoggingMixin):
    """This class is used to work with records from a dataset and is accessed via `Dataset.records`.
    The responsibility of this class is to provide an interface to interact with records in a dataset,
    by adding, updating, fetching, querying, deleting, and exporting records.

    Attributes:
        client (Argilla): The Argilla client object.
        dataset (Dataset): The dataset object.
    """

    _api: RecordsAPI

    DEFAULT_BATCH_SIZE = 256
    DEFAULT_DELETE_BATCH_SIZE = 64

    def __init__(
        self, client: "Argilla", dataset: "Dataset", mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None
    ):
        """Initializes a DatasetRecords object with a client and a dataset.
        Args:
            client: An Argilla client object.
            dataset: A Dataset object.
        """
        self.__client = client
        self.__dataset = dataset
        self._mapping = mapping or {}
        self._api = self.__client.api.records

    def __iter__(self):
        return DatasetRecordsIterator(self.__dataset, self.__client, with_suggestions=True, with_responses=True)

    def __call__(
        self,
        query: Optional[Union[str, Query]] = None,
        batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
        start_offset: int = 0,
        with_suggestions: bool = True,
        with_responses: bool = True,
        with_vectors: Optional[Union[List, bool, str]] = None,
        limit: Optional[int] = None,
    ) -> DatasetRecordsIterator:
        """Returns an iterator over the records in the dataset on the server.

        Parameters:
            query: A string or a Query object to filter the records.
            batch_size: The number of records to fetch in each batch. The default is 256.
            start_offset: The offset from which to start fetching records. The default is 0.
            with_suggestions: Whether to include suggestions in the records. The default is True.
            with_responses: Whether to include responses in the records. The default is True.
            with_vectors: A list of vector names to include in the records. The default is None.
                If a list is provided, only the specified vectors will be included.
                If True is provided, all vectors will be included.
            limit: The maximum number of records to fetch. The default is None.

        Returns:
            An iterator over the records in the dataset on the server.

        """
        if query and isinstance(query, str):
            query = Query(query=query)

        if with_vectors:
            self._validate_vector_names(vector_names=with_vectors)

        return DatasetRecordsIterator(
            dataset=self.__dataset,
            client=self.__client,
            query=query,
            batch_size=batch_size,
            start_offset=start_offset,
            with_suggestions=with_suggestions,
            with_responses=with_responses,
            with_vectors=with_vectors,
            limit=limit,
        )

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.__dataset})"

    ############################
    # Public methods
    ############################

    def log(
        self,
        records: Union[List[dict], List[Record], HFDataset],
        mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None,
        user_id: Optional[UUID] = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
        on_error: RecordErrorHandling = RecordErrorHandling.RAISE,
    ) -> "DatasetRecords":
        """Add or update records in a dataset on the server using the provided records.
        If the record includes a known `id` field, the record will be updated.
        If the record does not include a known `id` field, the record will be added as a new record.
        See `rg.Record` for more information on the record definition.

        Parameters:
            records: A list of `Record` objects, a Hugging Face Dataset, or a list of dictionaries representing the records.
                     If records are defined as a dictionaries or a dataset, the keys/ column names should correspond to the
                     fields in the Argilla dataset's fields and questions. `id` should be provided to identify the records when updating.
            mapping: A dictionary that maps the keys/ column names in the records to the fields or questions in the Argilla dataset.
                     To assign an incoming key or column to multiple fields or questions, provide a list or tuple of field or question names.
            user_id: The user id to be associated with the records' response. If not provided, the current user id is used.
            batch_size: The number of records to send in each batch. The default is 256.

        Returns:
            A list of Record objects representing the updated records.
        """
        record_models = self._ingest_records(
            records=records, mapping=mapping, user_id=user_id or self.__client.me.id, on_error=on_error
        )
        batch_size = self._normalize_batch_size(
            batch_size=batch_size,
            records_length=len(record_models),
            max_value=self._api.MAX_RECORDS_PER_UPSERT_BULK,
        )

        created_or_updated = []
        records_updated = 0

        for batch in tqdm(
            iterable=range(0, len(records), batch_size),
            desc="Sending records...",
            total=len(records) // batch_size,
            unit="batch",
        ):
            self._log_message(message=f"Sending records from {batch} to {batch + batch_size}.")
            batch_records = record_models[batch : batch + batch_size]
            models, updated = self._api.bulk_upsert(dataset_id=self.__dataset.id, records=batch_records)
            created_or_updated.extend([Record.from_model(model=model, dataset=self.__dataset) for model in models])
            records_updated += updated

        records_created = len(created_or_updated) - records_updated
        self._log_message(
            message=f"Updated {records_updated} records and added {records_created} records to dataset {self.__dataset.name}",
            level="info",
        )

        return self

    def delete(
        self,
        records: List[Record],
        batch_size: int = DEFAULT_DELETE_BATCH_SIZE,
    ) -> List[Record]:
        """Delete records in a dataset on the server using the provided records
            and matching based on the id.

        Parameters:
            records: A list of `Record` objects representing the records to be deleted.
            batch_size: The number of records to send in each batch. The default is 64.

        Returns:
            A list of Record objects representing the deleted records.

        """
        mapping = None
        user_id = self.__client.me.id
        record_models = self._ingest_records(records=records, mapping=mapping, user_id=user_id)
        batch_size = self._normalize_batch_size(
            batch_size=batch_size,
            records_length=len(record_models),
            max_value=self._api.MAX_RECORDS_PER_DELETE_BULK,
        )

        records_deleted = 0
        for batch in tqdm(
            iterable=range(0, len(records), batch_size),
            desc="Sending records...",
            total=len(records) // batch_size,
            unit="batch",
        ):
            self._log_message(message=f"Sending records from {batch} to {batch + batch_size}.")
            batch_records = record_models[batch : batch + batch_size]
            self._api.delete_many(dataset_id=self.__dataset.id, records=batch_records)
            records_deleted += len(batch_records)

        self._log_message(
            message=f"Deleted {len(record_models)} records from dataset {self.__dataset.name}",
            level="info",
        )

        return records

    def to_dict(self, flatten: bool = False, orient: str = "names") -> Dict[str, Any]:
        """
        Return the records as a dictionary. This is a convenient shortcut for dataset.records(...).to_dict().

        Parameters:
            flatten (bool): The structure of the exported dictionary.
                - True: The record fields, metadata, suggestions and responses will be flattened.
                - False: The record fields, metadata, suggestions and responses will be nested.
            orient (str): The orientation of the exported dictionary.
                - "names": The keys of the dictionary will be the names of the fields, metadata, suggestions and responses.
                - "index": The keys of the dictionary will be the id of the records.
        Returns:
            A dictionary of records.

        """
        return self().to_dict(flatten=flatten, orient=orient)

    def to_list(self, flatten: bool = False) -> List[Dict[str, Any]]:
        """
        Return the records as a list of dictionaries. This is a convenient shortcut for dataset.records(...).to_list().

        Parameters:
            flatten (bool): The structure of the exported dictionaries in the list.
                - True: The record keys are flattened and a dot notation is used to record attributes and their attributes . For example, `label.suggestion` and `label.response`. Records responses are spread across multiple columns for values and users.
                - False: The record fields, metadata, suggestions and responses will be nested dictionary with keys for record attributes.
        Returns:
            A list of dictionaries of records.
        """
        data = self().to_list(flatten=flatten)
        return data

    def to_json(self, path: Union[Path, str]) -> Path:
        """
        Export the records to a file on disk.

        Parameters:
            path (str): The path to the file to save the records.

        Returns:
            The path to the file where the records were saved.

        """
        return self().to_json(path=path)

    def from_json(self, path: Union[Path, str]) -> List[Record]:
        """Creates a DatasetRecords object from a disk path to a JSON file.
            The JSON file should be defined by `DatasetRecords.to_json`.

        Args:
            path (str): The path to the file containing the records.

        Returns:
            DatasetRecords: The DatasetRecords object created from the disk path.

        """
        records = JsonIO._records_from_json(path=path)
        return self.log(records=records)

    def to_datasets(self) -> HFDataset:
        """
        Export the records to a HFDataset.

        Returns:
            The dataset containing the records.

        """

        return self().to_datasets()

    ############################
    # Private methods
    ############################

    def _ingest_records(
        self,
        records: Union[List[Dict[str, Any]], List[Record], HFDataset],
        mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None,
        user_id: Optional[UUID] = None,
        on_error: RecordErrorHandling = RecordErrorHandling.RAISE,
    ) -> List[RecordModel]:
        """Ingests records from a list of dictionaries, a Hugging Face Dataset, or a list of Record objects."""

        mapping = mapping or self._mapping
        if len(records) == 0:
            raise ValueError("No records provided to ingest.")

        record_mapper = IngestedRecordMapper(mapping=mapping, dataset=self.__dataset, user_id=user_id)

        if HFDatasetsIO._is_hf_dataset(dataset=records):
            records = HFDatasetsIO._record_dicts_from_datasets(hf_dataset=records, mapper=record_mapper)

        ingested_records = []
        for record in records:
            try:
                if isinstance(record, dict):
                    record = record_mapper(data=record)
                elif isinstance(record, Record):
                    record.dataset = self.__dataset
                else:
                    raise ValueError(
                        "Records should be a a list Record instances, "
                        "a Hugging Face Dataset, or a list of dictionaries representing the records."
                        f"Found a record of type {type(record)}: {record}."
                    )
            except Exception as e:
                if on_error == RecordErrorHandling.IGNORE:
                    self._log_message(
                        message=f"Failed to ingest record from dict {record}: {e}",
                        level="info",
                    )
                    continue
                elif on_error == RecordErrorHandling.WARN:
                    warnings.warn(f"Failed to ingest record from dict {record}: {e}")
                    continue
                raise RecordsIngestionError(f"Failed to ingest record from dict {record}") from e
            ingested_records.append(record.api_model())
        return ingested_records

    def _normalize_batch_size(self, batch_size: int, records_length, max_value: int):
        norm_batch_size = min(batch_size, records_length, max_value)

        if batch_size != norm_batch_size:
            self._log_message(
                message=f"The provided batch size {batch_size} was normalized. Using value {norm_batch_size}.",
                level="warning",
            )

        return norm_batch_size

    def _validate_vector_names(self, vector_names: Union[List[str], str]) -> None:
        if not isinstance(vector_names, list):
            vector_names = [vector_names]
        for vector_name in vector_names:
            if isinstance(vector_name, bool):
                continue
            if vector_name not in self.__dataset.schema:
                raise ValueError(f"Vector field {vector_name} not found in dataset schema.")

__init__(client, dataset, mapping=None)

Initializes a DatasetRecords object with a client and a dataset. Parameters: client: An Argilla client object. dataset: A Dataset object.

Source code in src/argilla/records/_dataset_records.py
def __init__(
    self, client: "Argilla", dataset: "Dataset", mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None
):
    """Initializes a DatasetRecords object with a client and a dataset.
    Args:
        client: An Argilla client object.
        dataset: A Dataset object.
    """
    self.__client = client
    self.__dataset = dataset
    self._mapping = mapping or {}
    self._api = self.__client.api.records

__call__(query=None, batch_size=DEFAULT_BATCH_SIZE, start_offset=0, with_suggestions=True, with_responses=True, with_vectors=None, limit=None)

Returns an iterator over the records in the dataset on the server.

Parameters

query (Optional[Union[str, Query]], default None): A string or a Query object to filter the records.
batch_size (Optional[int], default DEFAULT_BATCH_SIZE): The number of records to fetch in each batch. The default is 256.
start_offset (int, default 0): The offset from which to start fetching records. The default is 0.
with_suggestions (bool, default True): Whether to include suggestions in the records. The default is True.
with_responses (bool, default True): Whether to include responses in the records. The default is True.
with_vectors (Optional[Union[List, bool, str]], default None): A list of vector names to include in the records. The default is None. If a list is provided, only the specified vectors will be included. If True is provided, all vectors will be included.
limit (Optional[int], default None): The maximum number of records to fetch. The default is None.

Returns

DatasetRecordsIterator: An iterator over the records in the dataset on the server.
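A brief usage sketch, assuming dataset is an existing rg.Dataset; the query term is a placeholder.

# Fetch up to 50 records matching a term, 25 per request, including all vectors.
for record in dataset.records(query="oxygen", limit=50, batch_size=25, with_vectors=True):
    print(record.id)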

Source code in src/argilla/records/_dataset_records.py
def __call__(
    self,
    query: Optional[Union[str, Query]] = None,
    batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
    start_offset: int = 0,
    with_suggestions: bool = True,
    with_responses: bool = True,
    with_vectors: Optional[Union[List, bool, str]] = None,
    limit: Optional[int] = None,
) -> DatasetRecordsIterator:
    """Returns an iterator over the records in the dataset on the server.

    Parameters:
        query: A string or a Query object to filter the records.
        batch_size: The number of records to fetch in each batch. The default is 256.
        start_offset: The offset from which to start fetching records. The default is 0.
        with_suggestions: Whether to include suggestions in the records. The default is True.
        with_responses: Whether to include responses in the records. The default is True.
        with_vectors: A list of vector names to include in the records. The default is None.
            If a list is provided, only the specified vectors will be included.
            If True is provided, all vectors will be included.
        limit: The maximum number of records to fetch. The default is None.

    Returns:
        An iterator over the records in the dataset on the server.

    """
    if query and isinstance(query, str):
        query = Query(query=query)

    if with_vectors:
        self._validate_vector_names(vector_names=with_vectors)

    return DatasetRecordsIterator(
        dataset=self.__dataset,
        client=self.__client,
        query=query,
        batch_size=batch_size,
        start_offset=start_offset,
        with_suggestions=with_suggestions,
        with_responses=with_responses,
        with_vectors=with_vectors,
        limit=limit,
    )

log(records, mapping=None, user_id=None, batch_size=DEFAULT_BATCH_SIZE, on_error=RecordErrorHandling.RAISE)

Add or update records in a dataset on the server using the provided records. If the record includes a known id field, the record will be updated. If the record does not include a known id field, the record will be added as a new record. See rg.Record for more information on the record definition.

Parameters

records (Union[List[dict], List[Record], HFDataset], required): A list of Record objects, a Hugging Face Dataset, or a list of dictionaries representing the records. If records are defined as dictionaries or a dataset, the keys/column names should correspond to the fields and questions in the Argilla dataset. id should be provided to identify the records when updating.
mapping (Optional[Dict[str, Union[str, Sequence[str]]]], default None): A dictionary that maps the keys/column names in the records to the fields or questions in the Argilla dataset. To assign an incoming key or column to multiple fields or questions, provide a list or tuple of field or question names.
user_id (Optional[UUID], default None): The user id to be associated with the records' responses. If not provided, the current user id is used.
batch_size (int, default DEFAULT_BATCH_SIZE): The number of records to send in each batch. The default is 256.

Returns

DatasetRecords: A list of Record objects representing the updated records.
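A compact sketch combining these parameters; the keys and mapping follow the example shown earlier on this page, and the id value is a placeholder.

# Log dictionaries whose keys differ from the dataset schema,
# mapping them to the dataset's "question" and "answer" fields.
dataset.records.log(
    records=[{"query": "Do you need oxygen to breathe?", "response": "Yes", "id": "1"}],
    mapping={"query": "question", "response": "answer"},
    batch_size=100,
)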

Source code in src/argilla/records/_dataset_records.py
def log(
    self,
    records: Union[List[dict], List[Record], HFDataset],
    mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None,
    user_id: Optional[UUID] = None,
    batch_size: int = DEFAULT_BATCH_SIZE,
    on_error: RecordErrorHandling = RecordErrorHandling.RAISE,
) -> "DatasetRecords":
    """Add or update records in a dataset on the server using the provided records.
    If the record includes a known `id` field, the record will be updated.
    If the record does not include a known `id` field, the record will be added as a new record.
    See `rg.Record` for more information on the record definition.

    Parameters:
        records: A list of `Record` objects, a Hugging Face Dataset, or a list of dictionaries representing the records.
                 If records are defined as a dictionaries or a dataset, the keys/ column names should correspond to the
                 fields in the Argilla dataset's fields and questions. `id` should be provided to identify the records when updating.
        mapping: A dictionary that maps the keys/ column names in the records to the fields or questions in the Argilla dataset.
                 To assign an incoming key or column to multiple fields or questions, provide a list or tuple of field or question names.
        user_id: The user id to be associated with the records' response. If not provided, the current user id is used.
        batch_size: The number of records to send in each batch. The default is 256.

    Returns:
        A list of Record objects representing the updated records.
    """
    record_models = self._ingest_records(
        records=records, mapping=mapping, user_id=user_id or self.__client.me.id, on_error=on_error
    )
    batch_size = self._normalize_batch_size(
        batch_size=batch_size,
        records_length=len(record_models),
        max_value=self._api.MAX_RECORDS_PER_UPSERT_BULK,
    )

    created_or_updated = []
    records_updated = 0

    for batch in tqdm(
        iterable=range(0, len(records), batch_size),
        desc="Sending records...",
        total=len(records) // batch_size,
        unit="batch",
    ):
        self._log_message(message=f"Sending records from {batch} to {batch + batch_size}.")
        batch_records = record_models[batch : batch + batch_size]
        models, updated = self._api.bulk_upsert(dataset_id=self.__dataset.id, records=batch_records)
        created_or_updated.extend([Record.from_model(model=model, dataset=self.__dataset) for model in models])
        records_updated += updated

    records_created = len(created_or_updated) - records_updated
    self._log_message(
        message=f"Updated {records_updated} records and added {records_created} records to dataset {self.__dataset.name}",
        level="info",
    )

    return self

delete(records, batch_size=DEFAULT_DELETE_BATCH_SIZE)

Delete records in a dataset on the server using the provided records, matching them based on the id.

Parameters

records (List[Record], required): A list of Record objects representing the records to be deleted.
batch_size (int, default DEFAULT_DELETE_BATCH_SIZE): The number of records to send in each batch. The default is 64.

Returns

List[Record]: A list of Record objects representing the deleted records.
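As a sketch, records to delete are typically fetched first and then passed to delete; the query term here is a placeholder.

# Fetch matching records from the server, then delete them in batches of 32.
records_to_delete = list(dataset.records(query="outdated"))
dataset.records.delete(records=records_to_delete, batch_size=32)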

Source code in src/argilla/records/_dataset_records.py
def delete(
    self,
    records: List[Record],
    batch_size: int = DEFAULT_DELETE_BATCH_SIZE,
) -> List[Record]:
    """Delete records in a dataset on the server using the provided records
        and matching based on the id.

    Parameters:
        records: A list of `Record` objects representing the records to be deleted.
        batch_size: The number of records to send in each batch. The default is 64.

    Returns:
        A list of Record objects representing the deleted records.

    """
    mapping = None
    user_id = self.__client.me.id
    record_models = self._ingest_records(records=records, mapping=mapping, user_id=user_id)
    batch_size = self._normalize_batch_size(
        batch_size=batch_size,
        records_length=len(record_models),
        max_value=self._api.MAX_RECORDS_PER_DELETE_BULK,
    )

    records_deleted = 0
    for batch in tqdm(
        iterable=range(0, len(records), batch_size),
        desc="Sending records...",
        total=len(records) // batch_size,
        unit="batch",
    ):
        self._log_message(message=f"Sending records from {batch} to {batch + batch_size}.")
        batch_records = record_models[batch : batch + batch_size]
        self._api.delete_many(dataset_id=self.__dataset.id, records=batch_records)
        records_deleted += len(batch_records)

    self._log_message(
        message=f"Deleted {len(record_models)} records from dataset {self.__dataset.name}",
        level="info",
    )

    return records

to_dict(flatten=False, orient='names')

Return the records as a dictionary. This is a convenient shortcut for dataset.records(...).to_dict().

Parameters

flatten (bool, default False): The structure of the exported dictionary. True: the record fields, metadata, suggestions, and responses are flattened. False: they are nested.
orient (str, default 'names'): The orientation of the exported dictionary. "names": the keys of the dictionary are the names of the fields, metadata, suggestions, and responses. "index": the keys of the dictionary are the ids of the records.

Returns: A dictionary of records.
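For example, assuming dataset is an existing rg.Dataset:

# Export all records as a nested dictionary keyed by record id.
records_by_id = dataset.records.to_dict(flatten=False, orient="index")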

Source code in src/argilla/records/_dataset_records.py
def to_dict(self, flatten: bool = False, orient: str = "names") -> Dict[str, Any]:
    """
    Return the records as a dictionary. This is a convenient shortcut for dataset.records(...).to_dict().

    Parameters:
        flatten (bool): The structure of the exported dictionary.
            - True: The record fields, metadata, suggestions and responses will be flattened.
            - False: The record fields, metadata, suggestions and responses will be nested.
        orient (str): The orientation of the exported dictionary.
            - "names": The keys of the dictionary will be the names of the fields, metadata, suggestions and responses.
            - "index": The keys of the dictionary will be the id of the records.
    Returns:
        A dictionary of records.

    """
    return self().to_dict(flatten=flatten, orient=orient)

to_list(flatten=False)

Return the records as a list of dictionaries. This is a convenient shortcut for dataset.records(...).to_list().

Parameters

flatten (bool, default False): The structure of the exported dictionaries in the list. True: the record keys are flattened and dot notation is used for record attributes and their sub-attributes, for example label.suggestion and label.response; record responses are spread across multiple columns for values and users. False: the record fields, metadata, suggestions, and responses are nested dictionaries with keys for record attributes.

Returns: A list of dictionaries of records.
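For example, assuming dataset is an existing rg.Dataset:

# Export all records as a flat list of dictionaries; nested attributes use
# dot notation such as "label.suggestion" and "label.response".
rows = dataset.records.to_list(flatten=True)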

Source code in src/argilla/records/_dataset_records.py
def to_list(self, flatten: bool = False) -> List[Dict[str, Any]]:
    """
    Return the records as a list of dictionaries. This is a convenient shortcut for dataset.records(...).to_list().

    Parameters:
        flatten (bool): The structure of the exported dictionaries in the list.
            - True: The record keys are flattened and a dot notation is used to record attributes and their attributes . For example, `label.suggestion` and `label.response`. Records responses are spread across multiple columns for values and users.
            - False: The record fields, metadata, suggestions and responses will be nested dictionary with keys for record attributes.
    Returns:
        A list of dictionaries of records.
    """
    data = self().to_list(flatten=flatten)
    return data

to_json(path)

Export the records to a file on disk.

Parameters

path (str, required): The path to the file to save the records.

Returns

Path: The path to the file where the records were saved.
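For example, assuming dataset is an existing rg.Dataset; the file name is a placeholder.

# Save all records in the dataset to a JSON file on disk.
saved_path = dataset.records.to_json("records.json")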

Source code in src/argilla/records/_dataset_records.py
def to_json(self, path: Union[Path, str]) -> Path:
    """
    Export the records to a file on disk.

    Parameters:
        path (str): The path to the file to save the records.

    Returns:
        The path to the file where the records were saved.

    """
    return self().to_json(path=path)

from_json(path)

Creates a DatasetRecords object from a disk path to a JSON file. The JSON file should be defined by DatasetRecords.to_json.

Parameters

path (str, required): The path to the file containing the records.

Returns

DatasetRecords (List[Record]): The DatasetRecords object created from the disk path.
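For example, reloading a file previously produced by to_json; the file name is a placeholder.

# Read the records back from disk and log them into the dataset;
# records with known ids are updated, others are added.
dataset.records.from_json("records.json")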

Source code in src/argilla/records/_dataset_records.py
def from_json(self, path: Union[Path, str]) -> List[Record]:
    """Creates a DatasetRecords object from a disk path to a JSON file.
        The JSON file should be defined by `DatasetRecords.to_json`.

    Args:
        path (str): The path to the file containing the records.

    Returns:
        DatasetRecords: The DatasetRecords object created from the disk path.

    """
    records = JsonIO._records_from_json(path=path)
    return self.log(records=records)

to_datasets()

Export the records to an HFDataset.

Returns

HFDataset: The dataset containing the records.
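For example, assuming dataset is an existing rg.Dataset:

# Export the records to a Hugging Face Dataset object.
hf_dataset = dataset.records.to_datasets()
print(hf_dataset)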

Source code in src/argilla/records/_dataset_records.py
def to_datasets(self) -> HFDataset:
    """
    Export the records to a HFDataset.

    Returns:
        The dataset containing the records.

    """

    return self().to_datasets()