
Classification

Routines for classification.

DawidSkene

Bases: BaseClassificationAggregator

The Dawid-Skene aggregation model is a probabilistic model that parametrizes the expertise level of workers with confusion matrices.

Let \(e^w\) be a worker confusion (error) matrix of size \(K \times K\) in case of the \(K\) class classification, \(p\) be a vector of prior class probabilities, \(z_j\) be a true task label, and \(y^w_j\) be a worker response to the task \(j\). The relationship between these parameters is represented by the following latent label model.

Dawid-Skene latent label model

Here the prior true label probability is \(\operatorname{Pr}(z_j = c) = p[c]\), and the probability distribution of the worker responses with the true label \(c\) is represented by the corresponding column of the error matrix: \(\operatorname{Pr}(y_j^w = k | z_j = c) = e^w[k, c]\).

Parameters \(p\), \(e^w\), and latent variables \(z\) are optimized with the Expectation-Maximization algorithm:

1. E-step. Estimates the true task label probabilities using the specified workers' responses, the prior label probabilities, and the workers' error probability matrix.
2. M-step. Estimates the workers' error probability matrix using the specified workers' responses and the true task label probabilities.
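To make the two steps concrete, here is a toy NumPy sketch of the same E-M updates on a hypothetical dataset with 2 classes, 2 workers, and 3 tasks. It mirrors the latent label model above but is independent of the library internals; all names and values are illustrative, not part of the Crowd-Kit API.

```python
import numpy as np

# responses[w, j] = label that worker w gave to task j (toy data)
responses = np.array([[0, 1, 0],
                      [0, 1, 1]])
W, J = responses.shape  # 2 workers, 3 tasks
K = 2                   # 2 classes

# Initialize posteriors with per-task label frequencies (a soft majority vote)
probas = np.stack([(responses == c).mean(axis=0) for c in range(K)], axis=1)  # (J, K)

for _ in range(10):
    priors = np.clip(probas.mean(axis=0), 1e-6, None)       # p[c]
    # M-step: errors[w, k, c] ~ Pr(worker w answers k | true label is c)
    errors = np.zeros((W, K, K))
    for w in range(W):
        for k in range(K):
            errors[w, k] = probas[responses[w] == k].sum(axis=0)  # soft counts
    errors = np.clip(errors, 1e-6, None)
    errors /= errors.sum(axis=1, keepdims=True)              # normalize over k
    # E-step: posterior over true labels from the priors and per-worker likelihoods
    log_post = np.broadcast_to(np.log(priors), (J, K)).copy()
    for w in range(W):
        log_post += np.log(errors[w][responses[w]])          # rows Pr(y_j^w | z_j = c)
    probas = np.exp(log_post - log_post.max(axis=1, keepdims=True))
    probas /= probas.sum(axis=1, keepdims=True)

print(probas.round(3))  # per-task posterior over the two classes
```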

A. Philip Dawid and Allan M. Skene. Maximum Likelihood Estimation of Observer Error-Rates Using the EM Algorithm. Journal of the Royal Statistical Society. Series C (Applied Statistics), Vol. 28, 1 (1979), 20–28.

https://doi.org/10.2307/2346806

Examples:

>>> from crowdkit.aggregation import DawidSkene
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> ds = DawidSkene(100)
>>> result = ds.fit_predict(df)

We can use golden labels to correct the probability distributions of task labels during the iterative process, fixing them to the provided true labels:

Examples:

>>> from crowdkit.aggregation import DawidSkene
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> true_labels = gt[:1000]  # use the first 1000 true labels
>>> ds = DawidSkene(100)
>>> result = ds.fit_predict(df, true_labels)

We can also provide the workers' initial error matrices, which come from historical performance data. There are two strategies to initialize the workers' error matrices: assign and addition. Here we create simple error matrices with two workers:

              0  1
worker label
w851   0      9  1
       1      1  9
w6991  0      9  1
       1      1  9
Note
  1. Make sure the error matrix is indexed by worker and label with a column for every label_id that appears in data. You can use the pandas.MultiIndex to create such an index, see the example below.

  2. When using the addition strategy, the error matrix should contain the historical count (not the probability) of worker producing observed_label, given that the task true label is true_label.

When we use the addition strategy, partial error matrices are acceptable; they are added to the workers' prior error matrices estimated from the given data.

Examples:

>>> import pandas as pd
>>> from crowdkit.aggregation import DawidSkene
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> error_matrix_index = pd.MultiIndex.from_arrays([['w851', 'w851', 'w6991', 'w6991'], [0, 1, 0, 1]], names=['worker', 'label'])
>>> initial_error = pd.DataFrame(
...     data=[[9, 1], [1, 9], [9, 1], [1, 9]],
...     index=error_matrix_index,
...     columns=[0, 1],
... )
>>> ds = DawidSkene(100, initial_error_strategy='addition')
>>> result = ds.fit_predict(df, initial_error=initial_error)

We can also use the assign strategy to initialize the workers' error matrices. In this case, initial_error must contain error matrices for every worker in the data.
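For illustration, here is a hedged sketch of the assign strategy, reusing error_matrix_index and probability-valued matrices. It assumes, for the sake of the example, that 'w851' and 'w6991' are the only workers in df; with real data the matrix must cover every worker.

Examples:

>>> initial_error = pd.DataFrame(
...     data=[[0.9, 0.1], [0.1, 0.9], [0.9, 0.1], [0.1, 0.9]],
...     index=error_matrix_index,
...     columns=[0, 1],
... )
>>> ds = DawidSkene(100, initial_error_strategy='assign')
>>> result = ds.fit_predict(df, initial_error=initial_error)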

Source code in crowdkit/aggregation/classification/dawid_skene.py
@attr.s
class DawidSkene(BaseClassificationAggregator):
    r"""The **Dawid-Skene** aggregation model is a probabilistic model that parametrizes the expertise level of workers with confusion matrices.

    Let $e^w$ be a worker confusion (error) matrix of size $K \times K$ in case of the $K$ class classification,
    $p$ be a vector of prior class probabilities, $z_j$ be a true task label, and $y^w_j$ be a worker
    response to the task $j$. The relationship between these parameters is represented by the following latent
    label model.

    ![Dawid-Skene latent label model](https://tlk.s3.yandex.net/crowd-kit/docs/ds_llm.png)

    Here the prior true label probability is $\operatorname{Pr}(z_j = c) = p[c]$, and the probability distribution
    of the worker responses with the true label $c$ is represented by the corresponding column of the error matrix:
    $\operatorname{Pr}(y_j^w = k | z_j = c) = e^w[k, c]$.

    Parameters $p$, $e^w$, and latent variables $z$ are optimized with the Expectation-Maximization algorithm:
    1. **E-step**. Estimates the true task label probabilities using the specified workers' responses,
        the prior label probabilities, and the workers' error probability matrix.
    2. **M-step**. Estimates the workers' error probability matrix using the specified workers' responses and the true task label probabilities.

    A. Philip Dawid and Allan M. Skene. Maximum Likelihood Estimation of Observer Error-Rates Using the EM Algorithm.
    *Journal of the Royal Statistical Society. Series C (Applied Statistics), Vol. 28*, 1 (1979), 20–28.

    <https://doi.org/10.2307/2346806>

    Examples:
        >>> from crowdkit.aggregation import DawidSkene
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> ds = DawidSkene(100)
        >>> result = ds.fit_predict(df)

    We can use golden labels to correct the probability distributions of task labels
    during the iterative process, fixing them to the provided true labels:

    Examples:
        >>> from crowdkit.aggregation import DawidSkene
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> true_labels = gt[:1000]  # use the first 1000 true labels
        >>> ds = DawidSkene(100)
        >>> result = ds.fit_predict(df, true_labels)

    We can also provide the workers' initial error matrices, which come from historical performance data.
    There are two strategies to initialize the workers' error matrices: `assign` and `addition`.
    Here we create simple error matrices with two workers:

    ```
                  0  1
    worker label
    w851   0      9  1
           1      1  9
    w6991  0      9  1
           1      1  9
    ```

    Note:
        1. Make sure the error matrix is indexed by `worker` and `label`
        with a column for every `label_id` that appears in `data`.
        You can use the `pandas.MultiIndex` to create such an index, see the example below.

        2. When using the `addition` strategy, the error matrix should contain the historical **count** (not the probability)
        of `worker` producing `observed_label`, given that the task true label is `true_label`.

    When we use the `addition` strategy, partial error matrices are acceptable;
    they are added to the workers' prior error matrices estimated from the given data.

    Examples:
        >>> import pandas as pd
        >>> from crowdkit.aggregation import DawidSkene
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> error_matrix_index = pd.MultiIndex.from_arrays([['w851', 'w851', 'w6991', 'w6991'], [0, 1, 0, 1]], names=['worker', 'label'])
        >>> initial_error = pd.DataFrame(
        ...     data=[[9, 1], [1, 9], [9, 1], [1, 9]],
        ...     index=error_matrix_index,
        ...     columns=[0, 1],
        ... )
        >>> ds = DawidSkene(100, initial_error_strategy='addition')
        >>> result = ds.fit_predict(df, initial_error=initial_error)

    We can also use the `assign` strategy to initialize the workers' error matrices.
    In this case, `initial_error` **must** contain error matrices for every worker in the data.
    """

    n_iter: int = attr.ib(default=100)
    """The maximum number of EM iterations."""

    tol: float = attr.ib(default=1e-5)
    """The tolerance stopping criterion for iterative methods with a variable number of steps.
    The algorithm converges when the loss change is less than the `tol` parameter."""

    initial_error_strategy: Optional[Literal["assign", "addition"]] = attr.ib(
        default=None
    )
    """The strategy for initializing the workers' error matrices.
    The `assign` strategy assigns the initial error matrix to the workers' error matrices;
    the `addition` strategy adds the initial error matrix with the workers' priori error matrices estimated
    from the given data. If `None`, the initial error matrix is not used.

    Note:
        - `addition` strategy
            - The initial error matrix can be partial, not all workers' error matrices need to be provided.
            - The initial error matrix values should be the historical **count** of
              `worker` producing `observed_label`, given that the task true label is `true_label`.
              For example (count-valued error matrix):

                                  0  1
                    worker label
                    w851   0      9  1
                           1      1  9
                    w6991  0      9  1
                           1      1  9

        - `assign` strategy
            - The initial error matrix must contain all the workers' error matrices in the data.
            - The initial error matrix values can be either the probability or the count of
              `worker` producing `observed_label`, given that the task true label is `true_label`.
              When given a probability error matrix, the values in each `true_label` column
              should sum up to 1 for each worker. For example (probability-valued error matrix):

                                  0    1
                    worker label
                    w851   0      0.9  0.1
                           1      0.1  0.9
                    w6991  0      0.9  0.1
                           1      0.1  0.9
                    ...
    """

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability
    that the `task` true label is equal to `label`."""

    priors_: Optional["pd.Series[Any]"] = named_series_attrib(name="prior")
    """The prior label distribution.
    The `pandas.Series` data is indexed by `label` so that `priors.loc[label]` is the prior probability
    that a task true label is equal to `label`. Each probability is in the range from 0 to 1,
    and all probabilities must sum up to 1."""

    errors_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The workers' error matrices. The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
    for every `label_id` found in `data` so that `result.loc[worker, observed_label, true_label]` is the probability
    that `worker` produces `observed_label`, given that the task true label is `true_label`."""

    loss_history_: List[float] = attr.ib(init=False)
    """ A list of loss values during training."""

    @staticmethod
    def _m_step(
        data: pd.DataFrame,
        probas: pd.DataFrame,
        initial_error: Optional[pd.DataFrame] = None,
        initial_error_strategy: Optional[Literal["assign", "addition"]] = None,
    ) -> pd.DataFrame:
        """Performs M-step of the Dawid-Skene algorithm.

        Estimates the workers' error probability matrix using the specified workers' responses and the true task label probabilities.
        """
        joined = data.join(probas, on="task")
        joined.drop(columns=["task"], inplace=True)
        errors = joined.groupby(["worker", "label"], sort=False).sum()
        # Apply the initial error matrix
        errors = initial_error_apply(errors, initial_error, initial_error_strategy)
        # Normalize the error matrix
        errors.clip(lower=_EPS, inplace=True)
        errors /= errors.groupby("worker", sort=False).sum()

        return errors

    @staticmethod
    def _e_step(
        data: pd.DataFrame, priors: "pd.Series[Any]", errors: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Performs E-step of the Dawid-Skene algorithm.

        Estimates the true task label probabilities using the specified workers' responses,
        the prior label probabilities, and the workers' error probability matrix.
        """
        # We have to multiply lots of probabilities and such products are known to converge
        # to zero exponentially fast. To avoid floating-point precision problems we work with
        # logs of original values
        joined = data.join(np.log2(errors), on=["worker", "label"])  # type: ignore
        joined.drop(columns=["worker", "label"], inplace=True)

        priors.clip(lower=_EPS, inplace=True)
        log_likelihoods = np.log2(priors) + joined.groupby("task", sort=False).sum()
        log_likelihoods.rename_axis("label", axis=1, inplace=True)

        # Exponentiating log_likelihoods 'as is' may still get us beyond our precision.
        # So we shift every row of log_likelihoods by a constant (which is equivalent to
        # multiplying likelihoods rows by a constant) so that max log_likelihood in each
        # row is equal to 0. This trick ensures proper scaling after exponentiating and
        # does not affect the result of E-step
        scaled_likelihoods = np.exp2(
            log_likelihoods.sub(log_likelihoods.max(axis=1), axis=0)
        )
        scaled_likelihoods = scaled_likelihoods.div(
            scaled_likelihoods.sum(axis=1), axis=0
        )
        # Convert columns types to label type
        scaled_likelihoods.columns = pd.Index(
            scaled_likelihoods.columns, name="label", dtype=data.label.dtype
        )
        return cast(pd.DataFrame, scaled_likelihoods)

    def _evidence_lower_bound(
        self,
        data: pd.DataFrame,
        probas: pd.DataFrame,
        priors: "pd.Series[Any]",
        errors: pd.DataFrame,
    ) -> float:
        # calculate joint probability log-likelihood expectation over probas
        joined = data.join(np.log(errors), on=["worker", "label"])  # type: ignore

        # escape boolean index/column names to prevent confusion between indexing by boolean array and iterable of names
        joined = joined.rename(columns={True: "True", False: "False"}, copy=False)
        priors = priors.rename(index={True: "True", False: "False"}, copy=False)
        priors.clip(lower=_EPS, inplace=True)

        joined.loc[:, priors.index] = joined.loc[:, priors.index].add(np.log(priors))  # type: ignore

        joined.set_index(["task", "worker"], inplace=True)
        joint_expectation = (
            (probas.rename(columns={True: "True", False: "False"}) * joined).sum().sum()
        )
        probas.clip(lower=_EPS, inplace=True)
        entropy = -(np.log(probas) * probas).sum().sum()
        return float(joint_expectation + entropy)

    def fit(
        self,
        data: pd.DataFrame,
        true_labels: Optional["pd.Series[Any]"] = None,
        initial_error: Optional[pd.DataFrame] = None,
    ) -> "DawidSkene":
        """Fits the model to the training data with the EM algorithm.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
            true_labels (Series): The ground truth labels of tasks.
                The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label.
                When provided, the model will correct the probability distributions of task labels by the true labels
                during the iterative process.
            initial_error (DataFrame): The workers' initial error matrices, which come from historical performance data.
                The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
                for every `label_id` found in `data` so that `result.loc[worker, observed_label, true_label]` is the
                historical **count** of `worker` producing `observed_label`, given that the task true label is `true_label`.
                When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities too.
                Check the examples in the class docstring for more details.

        Returns:
            DawidSkene: self.
        """

        data = data[["task", "worker", "label"]]

        # Early exit
        if not data.size:
            self.probas_ = pd.DataFrame()
            self.priors_ = pd.Series(dtype=float)
            self.errors_ = pd.DataFrame()
            self.labels_ = pd.Series(dtype=float)
            return self

        # Initialization
        probas = MajorityVote().fit_predict_proba(data)
        # correct the probas by true_labels
        if true_labels is not None:
            probas = self._correct_probas_with_golden(probas, true_labels)
        priors = probas.mean()
        errors = self._m_step(data, probas, initial_error, self.initial_error_strategy)
        loss = -np.inf
        self.loss_history_ = []

        # Updating proba and errors n_iter times
        for _ in range(self.n_iter):
            probas = self._e_step(data, priors, errors)
            # correct the probas by true_labels
            if true_labels is not None:
                probas = self._correct_probas_with_golden(probas, true_labels)
            priors = probas.mean()
            errors = self._m_step(data, probas)
            new_loss = self._evidence_lower_bound(data, probas, priors, errors) / len(
                data
            )
            self.loss_history_.append(new_loss)

            if new_loss - loss < self.tol:
                break
            loss = new_loss

        probas.columns = pd.Index(
            probas.columns, name="label", dtype=probas.columns.dtype
        )
        # Saving results
        self.probas_ = probas
        self.priors_ = priors
        self.errors_ = errors
        self.labels_ = get_most_probable_labels(probas)

        return self

    def fit_predict_proba(
        self,
        data: pd.DataFrame,
        true_labels: Optional["pd.Series[Any]"] = None,
        initial_error: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
            true_labels (Series): The ground truth labels of tasks.
                The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label.
                When provided, the model will correct the probability distributions of task labels by the true labels
                during the iterative process.
            initial_error (DataFrame): The workers' initial error matrices, which come from historical performance data.
                The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
                for every `label_id` found in `data` so that `result.loc[worker, observed_label, true_label]` is the
                historical **count** of `worker` producing `observed_label`, given that the task true label is `true_label`.
                When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities too.
                Check the examples in the class docstring for more details.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self.fit(data, true_labels, initial_error)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(
        self,
        data: pd.DataFrame,
        true_labels: Optional["pd.Series[Any]"] = None,
        initial_error: Optional[pd.DataFrame] = None,
    ) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
            true_labels (Series): The ground truth labels of tasks.
                The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label.
                When provided, the model will correct the probability distributions of task labels by the true labels
                during the iterative process.
            initial_error (DataFrame): The workers' initial error matrices, which come from historical performance data.
                The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
                for every `label_id` found in `data` so that `result.loc[worker, observed_label, true_label]` is the
                historical **count** of `worker` producing `observed_label`, given that the task true label is `true_label`.
                When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities too.
                Check the examples in the class docstring for more details.

        Returns:
            Series: Task labels. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self.fit(data, true_labels, initial_error)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    @staticmethod
    def _correct_probas_with_golden(
        probas: pd.DataFrame, true_labels: "pd.Series[Any]"
    ) -> pd.DataFrame:
        """
        Correct the probas by `true_labels`
        """
        corrected_probas = probas

        # Iterate over the unique labels present in true_labels
        for label in true_labels.unique():
            # Find the indices in both probas and true_labels where the true label is the current label
            indices = true_labels[true_labels == label].index.intersection(probas.index)
            # Set the corresponding probabilities to 1 for the correct label and 0 for others
            corrected_probas.loc[indices] = (
                0  # Set all columns to 0 for the given indices
            )
            corrected_probas.loc[indices, label] = (
                1  # Set the correct label column to 1
            )

        return corrected_probas

errors_ = attr.ib(init=False) class-attribute instance-attribute

The workers' error matrices. The pandas.DataFrame data is indexed by worker and label with a column for every label_id found in data so that result.loc[worker, observed_label, true_label] is the probability that worker produces observed_label, given that the task true label is true_label.

initial_error_strategy = attr.ib(default=None) class-attribute instance-attribute

The strategy for initializing the workers' error matrices. The assign strategy assigns the initial error matrix to the workers' error matrices; the addition strategy adds the initial error matrix to the workers' prior error matrices estimated from the given data. If None, the initial error matrix is not used.

Note
  • addition strategy

    • The initial error matrix can be partial, not all workers' error matrices need to be provided.
    • The initial error matrix values should be the historical count of worker producing observed_label, given that the task true label is true_label. For example (count-valued error matrix):
                    0  1
      worker label
      w851   0      9  1
             1      1  9
      w6991  0      9  1
             1      1  9
      
  • assign strategy

    • The initial error matrix must contain all the workers' error matrices in the data.
    • The initial error matrix values can be either the probability or the count of worker producing observed_label, given that the task true label is true_label. When given a probability error matrix, the values in each true_label column should sum up to 1 for each worker. For example (probability-valued error matrix):
                    0    1
      worker label
      w851   0      0.9  0.1
             1      0.1  0.9
      w6991  0      0.9  0.1
             1      0.1  0.9
      ...
      

loss_history_ = attr.ib(init=False) class-attribute instance-attribute

A list of loss values during training.

n_iter = attr.ib(default=100) class-attribute instance-attribute

The maximum number of EM iterations.

priors_ = named_series_attrib(name='prior') class-attribute instance-attribute

The prior label distribution. The pandas.Series data is indexed by label so that priors.loc[label] is the prior probability that a task true label is equal to label. Each probability is in the range from 0 to 1, and all probabilities must sum up to 1.

probas_ = attr.ib(init=False) class-attribute instance-attribute

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label.

tol = attr.ib(default=1e-05) class-attribute instance-attribute

The tolerance stopping criterion for iterative methods with a variable number of steps. The algorithm converges when the loss change is less than the tol parameter.

fit(data, true_labels=None, initial_error=None)

Fits the model to the training data with the EM algorithm.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `DataFrame` | The training dataset of workers' labeling results, represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns. | *required* |
| `true_labels` | `Series` | The ground truth labels of tasks. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label. When provided, the model corrects the probability distributions of task labels by the true labels during the iterative process. | `None` |
| `initial_error` | `DataFrame` | The workers' initial error matrices, which come from historical performance data. The `pandas.DataFrame` data is indexed by `worker` and `label` with a column for every `label_id` found in `data` so that `result.loc[worker, observed_label, true_label]` is the historical count of `worker` producing `observed_label`, given that the task true label is `true_label`. When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities too. Check the examples in the class docstring for more details. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `DawidSkene` | `DawidSkene` | self. |
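Since fit returns self, the learned parameters can be inspected after fitting. A brief usage sketch, assuming df from the class docstring examples:

>>> ds = DawidSkene(100).fit(df)
>>> ds.errors_.head()   # workers' error matrices
>>> ds.priors_          # prior label distribution
>>> ds.labels_.head()   # most likely label per task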


fit_predict(data, true_labels=None, initial_error=None)

Fits the model to the training data and returns the aggregated results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `DataFrame` | The training dataset of workers' labeling results, represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns. | *required* |
| `true_labels` | `Series` | The ground truth labels of tasks. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label. When provided, the model corrects the probability distributions of task labels by the true labels during the iterative process. | `None` |
| `initial_error` | `DataFrame` | The workers' initial error matrices, which come from historical performance data. The `pandas.DataFrame` data is indexed by `worker` and `label` with a column for every `label_id` found in `data` so that `result.loc[worker, observed_label, true_label]` is the historical count of `worker` producing `observed_label`, given that the task true label is `true_label`. When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities too. Check the examples in the class docstring for more details. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Series` | `Series[Any]` | Task labels. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks. |


fit_predict_proba(data, true_labels=None, initial_error=None)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `DataFrame` | The training dataset of workers' labeling results, represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns. | *required* |
| `true_labels` | `Series` | The ground truth labels of tasks. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label. When provided, the model corrects the probability distributions of task labels by the true labels during the iterative process. | `None` |
| `initial_error` | `DataFrame` | The workers' initial error matrices, which come from historical performance data. The `pandas.DataFrame` data is indexed by `worker` and `label` with a column for every `label_id` found in `data` so that `result.loc[worker, observed_label, true_label]` is the historical count of `worker` producing `observed_label`, given that the task true label is `true_label`. When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities too. Check the examples in the class docstring for more details. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `DataFrame` | `DataFrame` | Probability distributions of task labels. The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1. |
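A brief usage sketch, assuming df from the class docstring examples, that checks the documented properties of the returned probabilities:

>>> probas = DawidSkene(100).fit_predict_proba(df)
>>> probas.sum(axis=1).head()     # each row sums up to 1
>>> probas.idxmax(axis=1).head()  # most likely label per task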


GLAD

Bases: BaseClassificationAggregator

The GLAD (Generative model of Labels, Abilities, and Difficulties) model is a probabilistic model that parametrizes the abilities of workers and the difficulty of tasks.

Let's consider a case of \(K\) class classification. Let \(p\) be a vector of prior class probabilities, \(\alpha_i \in (-\infty, +\infty)\) be a worker ability parameter, \(\beta_j \in (0, +\infty)\) be an inverse task difficulty, \(z_j\) be a latent variable representing the true task label, and \(y^i_j\) be a worker response that we observe. The relationships between these variables and parameters according to GLAD are represented by the following latent label model:

GLAD latent label model

The prior probability of \(z_j\) being equal to \(c\) is \(\operatorname{Pr}(z_j = c) = p[c]\), and the probability distribution of the worker responses with the true label \(c\) follows the single coin Dawid-Skene model where the true label probability is a sigmoid function of the product of the worker ability and the inverse task difficulty:

\(\operatorname{Pr}(y^i_j = k | z_j = c) = \begin{cases} a(i, j), & k = c \\ \frac{1 - a(i,j)}{K-1}, & k \neq c \end{cases}\),

where \(a(i,j) = \frac{1}{1 + \exp(-\alpha_i\beta_j)}\).
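For instance, a small sketch with assumed toy values (not part of the library API) showing how the response distribution follows from \(a(i,j)\):

```python
from scipy.special import expit  # the sigmoid function

K = 2            # number of classes (toy setting)
alpha_i = 1.5    # assumed worker ability
beta_j = 0.8     # assumed inverse task difficulty

a = expit(alpha_i * beta_j)         # Pr(worker answers correctly)
p_correct = a                       # Pr(y = c | z = c)
p_each_wrong = (1 - a) / (K - 1)    # Pr(y = k | z = c) for each k != c
print(round(p_correct, 3), round(p_each_wrong, 3))
```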

Parameters \(p\), \(\alpha\), \(\beta\), and latent variables \(z\) are optimized with the Expectation-Maximization algorithm:

1. E-step. Estimates the true task label probabilities using the alpha parameters of workers' abilities, the prior label probabilities, and the beta parameters of task difficulty.
2. M-step. Optimizes the alpha and beta parameters using the conjugate gradient method.

J. Whitehill, P. Ruvolo, T. Wu, J. Bergsma, and J. Movellan. Whose Vote Should Count More: Optimal Integration of Labels from Labelers of Unknown Expertise. Proceedings of the 22nd International Conference on Neural Information Processing Systems, 2009.

https://proceedings.neurips.cc/paper/2009/file/f899139df5e1059396431415e770c6dd-Paper.pdf

Examples:

>>> from crowdkit.aggregation import GLAD
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> glad = GLAD()
>>> result = glad.fit_predict(df)
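After fitting, the estimated worker abilities and task difficulties are available on the aggregator. A brief usage sketch, assuming df from the example above:

>>> glad = GLAD()
>>> result = glad.fit_predict(df)
>>> glad.alphas_.sort_values().head()  # workers with the lowest estimated ability
>>> glad.betas_.sort_values().head()   # tasks with the lowest inverse difficulty (hardest)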
Source code in crowdkit/aggregation/classification/glad.py
@attr.s
class GLAD(BaseClassificationAggregator):
    r"""The **GLAD** (Generative model of Labels, Abilities, and Difficulties) model is a probabilistic model that parametrizes the abilities of workers and the difficulty of tasks.

    Let's consider a case of $K$ class classification. Let $p$ be a vector of prior class probabilities,
    $\alpha_i \in (-\infty, +\infty)$ be a worker ability parameter, $\beta_j \in (0, +\infty)$ be
    an inverse task difficulty, $z_j$ be a latent variable representing the true task label, and $y^i_j$
    be a worker response that we observe. The relationships between these variables and parameters according
    to GLAD are represented by the following latent label model:

    ![GLAD latent label model](https://tlk.s3.yandex.net/crowd-kit/docs/glad_llm.png)

    The prior probability of $z_j$ being equal to $c$ is $\operatorname{Pr}(z_j = c) = p[c]$,
    and the probability distribution of the worker responses with the true label $c$ follows the
    single coin Dawid-Skene model where the true label probability is a sigmoid function of the product of the
    worker ability and the inverse task difficulty:

    $\operatorname{Pr}(y^i_j = k | z_j = c) = \begin{cases}
        a(i, j), & k = c \\
        \frac{1 - a(i,j)}{K-1}, & k \neq c
    \end{cases}$,

    where $a(i,j) = \frac{1}{1 + \exp(-\alpha_i\beta_j)}$.

    Parameters $p$, $\alpha$, $\beta$, and latent variables $z$ are optimized with the Expectation-Maximization algorithm:
    1. **E-step**. Estimates the true task label probabilities using the alpha parameters of workers' abilities,
        the prior label probabilities, and the beta parameters of task difficulty.
    2. **M-step**. Optimizes the alpha and beta parameters using the conjugate gradient method.

    J. Whitehill, P. Ruvolo, T. Wu, J. Bergsma, and J. Movellan.
    Whose Vote Should Count More: Optimal Integration of Labels from Labelers of Unknown Expertise.
    *Proceedings of the 22nd International Conference on Neural Information Processing Systems*, 2009.

    <https://proceedings.neurips.cc/paper/2009/file/f899139df5e1059396431415e770c6dd-Paper.pdf>

    Examples:
        >>> from crowdkit.aggregation import GLAD
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> glad = GLAD()
        >>> result = glad.fit_predict(df)
    """

    n_iter: int = attr.ib(default=100)
    """The maximum number of EM iterations."""

    tol: float = attr.ib(default=1e-5)
    """The tolerance stopping criterion for iterative methods with a variable number of steps.
    The algorithm converges when the loss change is less than the `tol` parameter."""

    silent: bool = attr.ib(default=True)
    """Specifies if the progress bar will be shown (false) or not (true)."""

    labels_priors: Optional["pd.Series[Any]"] = attr.ib(default=None)
    """The prior label probabilities."""

    alphas_priors_mean: Optional["pd.Series[Any]"] = attr.ib(default=None)
    """The prior mean value of the alpha parameters."""

    betas_priors_mean: Optional["pd.Series[Any]"] = attr.ib(default=None)
    """The prior mean value of the beta parameters."""

    m_step_max_iter: int = attr.ib(default=25)
    """The maximum number of iterations of the conjugate gradient method in the M-step."""

    m_step_tol: float = attr.ib(default=1e-2)
    """The tolerance stopping criterion of the conjugate gradient method in the M-step."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The data frame is indexed by `task` so that `result.loc[task, label]` is the probability that the `task`
    true label is equal to `label`. Each probability is in the range from 0 to 1, all task probabilities
    must sum up to 1."""

    alphas_: "pd.Series[Any]" = named_series_attrib(name="alpha")
    """The alpha parameters of workers' abilities.
    The `pandas.Series` data is indexed by `worker` that contains the estimated alpha parameters."""

    betas_: "pd.Series[Any]" = named_series_attrib(name="beta")
    """The beta parameters of task difficulty.
    The `pandas.Series` data is indexed by `task` that contains the estimated beta parameters."""

    loss_history_: List[float] = attr.ib(init=False)
    """A list of loss values during training."""

    def _join_all(
        self,
        data: pd.DataFrame,
        alphas: "pd.Series[Any]",
        betas: "pd.Series[Any]",
        priors: "pd.Series[Any]",
    ) -> pd.DataFrame:
        """Makes a data frame with format `(task, worker, label, variable) -> (alpha, beta, posterior, delta)`"""
        labels = list(priors.index)
        data = data.set_index("task")
        data[labels] = 0
        data.reset_index(inplace=True)
        data = data.melt(
            id_vars=["task", "worker", "label"],
            value_vars=labels,
            value_name="posterior",
        )
        data = data.set_index("variable")
        data.reset_index(inplace=True)
        data.set_index("task", inplace=True)
        data["beta"] = betas
        data = data.reset_index().set_index("worker")
        data["alpha"] = alphas
        data.reset_index(inplace=True)
        data["delta"] = data["label"] == data["variable"]
        return data

    def _e_step(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Performs E-step of GLAD algorithm.

        Estimates the true task label probabilities using the alpha parameters of workers' abilities,
        the prior label probabilities, and the beta parameters of task difficulty.
        """
        alpha_beta = data["alpha"] * np.exp(data["beta"])
        log_sigma = -self._softplus(-alpha_beta)
        log_one_minus_sigma = -self._softplus(alpha_beta)
        data["posterior"] = data["delta"] * log_sigma + (1 - data["delta"]) * (
            log_one_minus_sigma - np.log(len(self.prior_labels_) - 1)
        )
        # sum up by workers
        probas = data.groupby(["task", "variable"]).sum(numeric_only=True)["posterior"]
        # add priors to every label
        probas = probas.add(np.log(cast("pd.Series[Any]", self.priors_)), level=1)
        # exponentiate and normalize
        probas = probas.groupby(["task"]).transform(self._softmax)
        # put posterior in data['posterior']
        probas.name = "posterior"
        data = pd.merge(
            data.drop("posterior", axis=1), probas, on=["task", "variable"], copy=False  # type: ignore[call-arg,unused-ignore]
        )

        self.probas_ = probas.unstack()
        return data

    def _gradient_Q(
        self, data: pd.DataFrame
    ) -> Tuple["pd.Series[Any]", "pd.Series[Any]"]:
        """Computes gradient of loss function"""

        sigma = scipy.special.expit(data["alpha"] * np.exp(data["beta"]))
        # multiply by exponent of beta because of beta -> exp(beta) reparameterization
        data["dQb"] = (
            data["posterior"]
            * (data["delta"] - sigma)
            * data["alpha"]
            * np.exp(data["beta"])
        )
        dQbeta = data.groupby("task").sum(numeric_only=True)["dQb"]

        # gradient of priors on betas
        assert self.betas_ is not None, "betas_ is None"
        assert self.betas_priors_mean_ is not None, "betas_priors_mean_ is None"
        dQbeta -= self.betas_ - self.betas_priors_mean_

        data["dQa"] = data["posterior"] * (data["delta"] - sigma) * np.exp(data["beta"])
        dQalpha = data.groupby("worker").sum(numeric_only=True)["dQa"]
        # gradient of priors on alphas
        assert self.alphas_ is not None, "alphas_ is None"
        assert self.alphas_priors_mean_ is not None, "alphas_priors_mean_ is None"
        dQalpha -= self.alphas_ - self.alphas_priors_mean_
        return dQalpha, dQbeta

    def _compute_Q(self, data: pd.DataFrame) -> float:
        """Computes loss function"""

        alpha_beta = data["alpha"] * np.exp(data["beta"])
        log_sigma = -self._softplus(-alpha_beta)
        log_one_minus_sigma = -self._softplus(alpha_beta)
        data["task_expectation"] = data["posterior"] * (
            data["delta"] * log_sigma
            + (1 - data["delta"])
            * (log_one_minus_sigma - np.log(len(self.prior_labels_) - 1))
        )
        Q = data["task_expectation"].sum()

        # priors on alphas and betas
        assert self.alphas_ is not None, "alphas_ is None"
        assert self.alphas_priors_mean_ is not None, "alphas_priors_mean_ is None"
        assert self.betas_ is not None, "betas_ is None"
        assert self.betas_priors_mean_ is not None, "betas_priors_mean_ is None"
        Q += np.log(scipy.stats.norm.pdf(self.alphas_ - self.alphas_priors_mean_)).sum()
        Q += np.log(scipy.stats.norm.pdf(self.betas_ - self.betas_priors_mean_)).sum()
        if np.isnan(Q):
            return -np.inf
        return float(Q)

    def _optimize_f(self, x: npt.NDArray[Any]) -> float:
        """Computes loss by parameters represented by numpy array"""
        alpha, beta = self._get_alphas_betas_by_point(x)
        self._update_alphas_betas(alpha, beta)
        return -self._compute_Q(self._current_data)

    def _optimize_df(self, x: npt.NDArray[Any]) -> npt.NDArray[Any]:
        """Computes loss gradient by parameters represented by numpy array"""
        alpha, beta = self._get_alphas_betas_by_point(x)
        self._update_alphas_betas(alpha, beta)
        dQalpha, dQbeta = self._gradient_Q(self._current_data)

        minus_grad = np.zeros_like(x)
        minus_grad[: len(self.workers_)] = -dQalpha[self.workers_].values  # type: ignore
        minus_grad[len(self.workers_) :] = -dQbeta[self.tasks_].values  # type: ignore
        return minus_grad

    def _update_alphas_betas(
        self, alphas: "pd.Series[Any]", betas: "pd.Series[Any]"
    ) -> None:
        self.alphas_ = alphas
        self.betas_ = betas
        self._current_data.set_index("worker", inplace=True)
        self._current_data["alpha"] = alphas
        self._current_data.reset_index(inplace=True)
        self._current_data.set_index("task", inplace=True)
        self._current_data["beta"] = betas
        self._current_data.reset_index(inplace=True)

    def _get_alphas_betas_by_point(
        self, x: npt.NDArray[Any]
    ) -> Tuple["pd.Series[Any]", "pd.Series[Any]"]:
        alphas = pd.Series(x[: len(self.workers_)], index=self.workers_, name="alpha")  # type: ignore
        alphas.index.name = "worker"
        betas = pd.Series(x[len(self.workers_) :], index=self.tasks_, name="beta")  # type: ignore
        betas.index.name = "task"
        return alphas, betas

    def _m_step(self, data: pd.DataFrame) -> pd.DataFrame:
        """Optimizes the alpha and beta parameters using the conjugate gradient method."""
        x_0 = np.concatenate([self.alphas_.values, self.betas_.values])
        self._current_data = data
        res = minimize(
            self._optimize_f,
            x_0,
            method="CG",
            jac=self._optimize_df,
            tol=self.m_step_tol,
            options={"disp": False, "maxiter": self.m_step_max_iter},
        )
        self.alphas_, self.betas_ = self._get_alphas_betas_by_point(res.x)
        self._update_alphas_betas(self.alphas_, self.betas_)
        return self._current_data

    def _init(self, data: pd.DataFrame) -> None:
        self.alphas_ = pd.Series(1.0, index=pd.unique(data.worker))  # type: ignore
        self.betas_ = pd.Series(1.0, index=pd.unique(data.task))  # type: ignore
        self.tasks_ = pd.unique(data["task"])
        self.workers_ = pd.unique(data["worker"])
        self.priors_ = self.labels_priors
        if self.priors_ is None:
            self.prior_labels_ = pd.unique(data["label"])
            self.priors_ = pd.Series(
                1.0 / len(self.prior_labels_), index=self.prior_labels_  # type: ignore
            )
        else:
            self.prior_labels_ = self.priors_.index  # type: ignore
        self.alphas_priors_mean_ = self.alphas_priors_mean
        if self.alphas_priors_mean_ is None:
            self.alphas_priors_mean_ = pd.Series(1.0, index=self.alphas_.index)
        self.betas_priors_mean_ = self.betas_priors_mean
        if self.betas_priors_mean_ is None:
            self.betas_priors_mean_ = pd.Series(1.0, index=self.betas_.index)

    @staticmethod
    def _softplus(x: "pd.Series[Any]", limit: int = 30) -> npt.NDArray[Any]:
        """log(1 + exp(x)) stable version

        For x > 30 or x < -30 error is less than 1e-13
        """
        positive_mask = x > limit
        negative_mask = x < -limit
        mask = positive_mask | negative_mask
        return cast(
            npt.NDArray[Any],
            np.log1p(np.exp(x * (1 - mask))) * (1 - mask) + x * positive_mask,
        )

    # backport for scipy < 1.12.0
    @staticmethod
    def _softmax(x: npt.NDArray[Any]) -> npt.NDArray[Any]:
        return cast(npt.NDArray[Any], np.exp(x - logsumexp(x, keepdims=True)))

    def fit(self, data: pd.DataFrame) -> "GLAD":
        """Fits the model to the training data with the EM algorithm.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            GLAD: self.
        """

        # Initialization
        data = data.filter(["task", "worker", "label"])
        self._init(data)

        assert self.alphas_ is not None, "no alphas_"
        assert self.betas_ is not None, "no betas_"
        assert self.priors_ is not None, "no priors_"

        data = self._join_all(data, self.alphas_, self.betas_, self.priors_)
        data = self._e_step(data)
        Q = self._compute_Q(data)

        self.loss_history_ = []

        def iteration_progress() -> Tuple[Iterator[int], Optional["tqdm[int]"]]:
            if self.silent:
                return iter(range(self.n_iter)), None
            else:
                trange_ = trange(self.n_iter, desc="Iterations")
                return iter(trange_), trange_

        iterator, pbar = iteration_progress()

        for _ in iterator:
            last_Q = Q
            if not self.silent:
                assert isinstance(pbar, tqdm)
                pbar.set_description(f"Q = {round(Q, 4)}")

            # E-step
            data = self._e_step(data)

            # M-step
            data = self._m_step(data)

            Q = self._compute_Q(data) / len(data)

            self.loss_history_.append(Q)
            if Q - last_Q < self.tol:
                break

        self.labels_ = cast(pd.DataFrame, self.probas_).idxmax(axis=1)
        return self

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self.fit(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: Task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self.fit(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_
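
Putting the class together: a minimal usage sketch, assuming the relevance-2 demo dataset used elsewhere in this document and the default priors (the constructor arguments are the attributes documented below):

>>> from crowdkit.aggregation import GLAD
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> glad = GLAD(n_iter=100, tol=1e-5, silent=False)  # silent=False shows the Q progress bar
>>> result = glad.fit_predict(df)
>>> worker_ability = glad.alphas_  # per-worker alpha parameters, indexed by worker
>>> task_difficulty = glad.betas_  # per-task beta parameters, indexed by task
>>> q_trace = glad.loss_history_   # Q values recorded at each EM iteration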

alphas_ = named_series_attrib(name='alpha') class-attribute instance-attribute

The alpha parameters of workers' abilities. The pandas.Series data is indexed by worker and contains the estimated alpha parameters.

alphas_priors_mean = attr.ib(default=None) class-attribute instance-attribute

The prior mean value of the alpha parameters.

betas_ = named_series_attrib(name='beta') class-attribute instance-attribute

The beta parameters of task difficulty. The pandas.Series data is indexed by task and contains the estimated beta parameters.

betas_priors_mean = attr.ib(default=None) class-attribute instance-attribute

The prior mean value of the beta parameters.

labels_priors = attr.ib(default=None) class-attribute instance-attribute

The prior label probabilities.

loss_history_ = attr.ib(init=False) class-attribute instance-attribute

A list of loss values during training.

m_step_max_iter = attr.ib(default=25) class-attribute instance-attribute

The maximum number of iterations of the conjugate gradient method in the M-step.

m_step_tol = attr.ib(default=0.01) class-attribute instance-attribute

The tolerance stopping criterion of the conjugate gradient method in the M-step.

n_iter = attr.ib(default=100) class-attribute instance-attribute

The maximum number of EM iterations.

probas_ = attr.ib(init=False) class-attribute instance-attribute

The probability distributions of task labels. The data frame is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

silent = attr.ib(default=True) class-attribute instance-attribute

Specifies whether to suppress the progress bar: if true (default), the progress bar is hidden; if false, it is shown.

tol = attr.ib(default=1e-05) class-attribute instance-attribute

The tolerance stopping criterion for iterative methods with a variable number of steps. The algorithm converges when the loss change is less than the tol parameter.
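
If the class frequencies are known in advance, they can be passed via labels_priors instead of relying on the uniform default. A short sketch, assuming a binary task with labels 0 and 1 and a made-up 70/30 prior:

>>> import pandas as pd
>>> from crowdkit.aggregation import GLAD
>>> priors = pd.Series({0: 0.7, 1: 0.3})  # assumed prior label probabilities
>>> glad = GLAD(labels_priors=priors)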

fit(data)

Fits the model to the training data with the EM algorithm.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

    GLAD: self.

Source code in crowdkit/aggregation/classification/glad.py
def fit(self, data: pd.DataFrame) -> "GLAD":
    """Fits the model to the training data with the EM algorithm.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        GLAD: self.
    """

    # Initialization
    data = data.filter(["task", "worker", "label"])
    self._init(data)

    assert self.alphas_ is not None, "no alphas_"
    assert self.betas_ is not None, "no betas_"
    assert self.priors_ is not None, "no priors_"

    data = self._join_all(data, self.alphas_, self.betas_, self.priors_)
    data = self._e_step(data)
    Q = self._compute_Q(data)

    self.loss_history_ = []

    def iteration_progress() -> Tuple[Iterator[int], Optional["tqdm[int]"]]:
        if self.silent:
            return iter(range(self.n_iter)), None
        else:
            trange_ = trange(self.n_iter, desc="Iterations")
            return iter(trange_), trange_

    iterator, pbar = iteration_progress()

    for _ in iterator:
        last_Q = Q
        if not self.silent:
            assert isinstance(pbar, tqdm)
            pbar.set_description(f"Q = {round(Q, 4)}")

        # E-step
        data = self._e_step(data)

        # M-step
        data = self._m_step(data)

        Q = self._compute_Q(data) / len(data)

        self.loss_history_.append(Q)
        if Q - last_Q < self.tol:
            break

    self.labels_ = cast(pd.DataFrame, self.probas_).idxmax(axis=1)
    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

    Series: Task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/glad.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: Task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self.fit(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

    DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/glad.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    self.fit(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

GoldMajorityVote

Bases: BaseClassificationAggregator

The Gold Majority Vote model is used when a golden dataset (ground truth) exists for some tasks. It calculates the probability of a correct label for each worker based on the golden set. After that, the sum of the probabilities of each label is calculated for each task. The correct label is the one with the greatest sum of the probabilities.

For example, suppose you have 10 000 tasks completed by 3 000 different workers, and you already know the ground truth labels for 100 of these tasks. First, call fit to calculate the percentage of correct labels for each worker on the golden set, and then call predict to calculate labels for your 10 000 tasks.

The following rules must be observed:

1. All workers must complete at least one task from the golden dataset.
2. All workers from the dataset that is submitted to predict must be included in the response dataset that is submitted to fit.
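
Both rules can be checked before fitting. A minimal sketch, where train_df, predict_df, and true_labels are hypothetical names for the fit dataset, the predict dataset, and the golden labels:

>>> fit_workers = set(train_df['worker'])
>>> golden_workers = set(train_df.loc[train_df['task'].isin(true_labels.index), 'worker'])
>>> assert fit_workers <= golden_workers             # rule 1: every worker did a golden task
>>> assert set(predict_df['worker']) <= fit_workers  # rule 2: no unseen workers at predict time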

Examples:

>>> import pandas as pd
>>> from crowdkit.aggregation import GoldMajorityVote
>>> df = pd.DataFrame(
...     [
...         ['t1', 'p1', 0],
...         ['t1', 'p2', 0],
...         ['t1', 'p3', 1],
...         ['t2', 'p1', 1],
...         ['t2', 'p2', 0],
...         ['t2', 'p3', 1],
...     ],
...     columns=['task', 'worker', 'label']
... )
>>> true_labels = pd.Series({'t1': 0})
>>> gold_mv = GoldMajorityVote()
>>> result = gold_mv.fit_predict(df, true_labels)
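
Because fit, predict, and predict_proba are also available separately, the same workflow can be split to inspect the learned skills in between. A short sketch reusing df and true_labels from the example above:

>>> gold_mv = GoldMajorityVote()
>>> gold_mv = gold_mv.fit(df, true_labels)  # estimates skills_ from the golden set
>>> skills = gold_mv.skills_                # per-worker accuracy, indexed by worker
>>> labels = gold_mv.predict(df)            # majority vote weighted by those skills
>>> probas = gold_mv.predict_proba(df)      # per-task label distributions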

Attributes:

    labels_ (Optional[Series]): The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

    skills_ (Optional[Series]): The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

    probas_ (Optional[DataFrame]): The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
@attr.s
class GoldMajorityVote(BaseClassificationAggregator):
    r"""The **Gold Majority Vote** model is used when a golden dataset (ground truth) exists for some tasks.
    It calculates the probability of a correct label for each worker based on the golden set.
    After that, the sum of the probabilities of each label is calculated for each task.
    The correct label is the one with the greatest sum of the probabilities.

    For example, you have 10 000 tasks completed by 3 000 different workers. And you have 100 tasks where you already
    know the ground truth labels. First, you can call `fit` to calculate the percentage of correct labels for each worker.
    And then call `predict` to calculate labels for your 10 000 tasks.

    The following rules must be observed:
    1. All workers must complete at least one task from the golden dataset.
    2. All workers from the dataset that is submitted to `predict` must be included in the response dataset that is submitted to `fit`.

    Examples:
        >>> import pandas as pd
        >>> from crowdkit.aggregation import GoldMajorityVote
        >>> df = pd.DataFrame(
        ...     [
        ...         ['t1', 'p1', 0],
        ...         ['t1', 'p2', 0],
        ...         ['t1', 'p3', 1],
        ...         ['t2', 'p1', 1],
        ...         ['t2', 'p2', 0],
        ...         ['t2', 'p3', 1],
        ...     ],
        ...     columns=['task', 'worker', 'label']
        ... )
        >>> true_labels = pd.Series({'t1': 0})
        >>> gold_mv = GoldMajorityVote()
        >>> result = gold_mv.fit_predict(df, true_labels)

    Attributes:
        labels_ (typing.Optional[pandas.core.series.Series]): The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.

        skills_ (typing.Optional[pandas.core.series.Series]): The workers' skills. The `pandas.Series` data is indexed by `worker`
            and has the corresponding worker skill.

        probas_ (typing.Optional[pandas.core.frame.DataFrame]): The probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]`
            is the probability that the `task` true label is equal to `label`. Each
            probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    # Available after fit
    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")

    # Available after predict or predict_proba
    # labels_
    probas_: Optional[pd.DataFrame] = attr.ib(init=False)

    def _apply(self, data: pd.DataFrame) -> "GoldMajorityVote":
        check_is_fitted(self, attributes="skills_")
        mv = MajorityVote().fit(data, self.skills_)
        self.labels_ = mv.labels_
        self.probas_ = mv.probas_
        return self

    def fit(self, data: pd.DataFrame, true_labels: "pd.Series[Any]") -> "GoldMajorityVote":  # type: ignore
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the task ground truth label.

        Returns:
            GoldMajorityVote: self.
        """

        data = data[["task", "worker", "label"]]
        self.skills_ = get_accuracy(data, true_labels=true_labels, by="worker")
        return self

    def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Predicts the true labels of tasks when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self._apply(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Returns probability distributions of labels for each task when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self._apply(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame, true_labels: "pd.Series[Any]") -> "pd.Series[Any]":  # type: ignore
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the task ground truth label.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data, true_labels).predict(data)

    def fit_predict_proba(
        self, data: pd.DataFrame, true_labels: "pd.Series[Any]"
    ) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the task ground truth label.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        return self.fit(data, true_labels).predict_proba(data)

fit(data, true_labels)

Fits the model to the training data.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

    true_labels (Series, required): The ground truth labels of tasks. The pandas.Series data is indexed by task so that labels.loc[task] is the task ground truth label.

Returns:

    GoldMajorityVote: self.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def fit(self, data: pd.DataFrame, true_labels: "pd.Series[Any]") -> "GoldMajorityVote":  # type: ignore
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the task ground truth label.

    Returns:
        GoldMajorityVote: self.
    """

    data = data[["task", "worker", "label"]]
    self.skills_ = get_accuracy(data, true_labels=true_labels, by="worker")
    return self

fit_predict(data, true_labels)

Fits the model to the training data and returns the aggregated results.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

    true_labels (Series, required): The ground truth labels of tasks. The pandas.Series data is indexed by task so that labels.loc[task] is the task ground truth label.

Returns:

    Series: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def fit_predict(self, data: pd.DataFrame, true_labels: "pd.Series[Any]") -> "pd.Series[Any]":  # type: ignore
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the task ground truth label.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data, true_labels).predict(data)

fit_predict_proba(data, true_labels)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

    true_labels (Series, required): The ground truth labels of tasks. The pandas.Series data is indexed by task so that labels.loc[task] is the task ground truth label.

Returns:

    DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def fit_predict_proba(
    self, data: pd.DataFrame, true_labels: "pd.Series[Any]"
) -> pd.DataFrame:
    """Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the task ground truth label.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    return self.fit(data, true_labels).predict_proba(data)

predict(data)

Predicts the true labels of tasks when the model is fitted.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

    Series: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Predicts the true labels of tasks when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self._apply(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

predict_proba(data)

Returns probability distributions of labels for each task when the model is fitted.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

    DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Returns probability distributions of labels for each task when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    self._apply(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

KOS

Bases: BaseClassificationAggregator

The KOS (Karger, Oh, and Shah 2011) aggregation model is an iterative algorithm that calculates the log-likelihood of the task being positive while modeling the worker reliability.

Let \(A_{ij}\) be a matrix of the responses of a worker \(j\) on a task \(i\). If the worker \(j\) does not respond to the task \(i\), then \(A_{ij} = 0\). Otherwise, \(|A_{ij}| = 1\). The algorithm operates on real-valued task messages \(x_{i \rightarrow j}\) and worker messages \(y_{j \rightarrow i}\). A task message \(x_{i \rightarrow j}\) represents the log-likelihood of task \(i\) being a positive task, and a worker message \(y_{j \rightarrow i}\) represents how reliable worker \(j\) is.

At the \(k\)-th iteration, the values are updated as follows: \(x_{i \rightarrow j}^{(k)} = \sum_{j^{'} \in \partial i \backslash j} A_{ij^{'}} y_{j^{'} \rightarrow i}^{(k-1)}\) and \(y_{j \rightarrow i}^{(k)} = \sum_{i^{'} \in \partial j \backslash i} A_{i^{'}j} x_{i^{'} \rightarrow j}^{(k-1)}\).

David R. Karger, Sewoong Oh, and Devavrat Shah. Budget-Optimal Task Allocation for Reliable Crowdsourcing Systems. Operations Research 62.1 (2014), 1-38.

https://arxiv.org/abs/1110.3564
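
To make the updates concrete, here is a compact NumPy sketch of one round of message passing on a dense response matrix (a toy illustration with made-up sizes, not the library implementation shown below; it exploits the fact that a leave-one-out sum is the full sum minus the excluded term):

import numpy as np

rng = np.random.default_rng(0)
n_tasks, n_workers = 5, 4
A = rng.choice([-1, 0, 1], size=(n_tasks, n_workers))      # 0 means no response
y = rng.normal(loc=1, scale=1, size=(n_workers, n_tasks))  # worker messages y_{j -> i}

# Task messages: x_{i -> j} = sum over j' != j of A[i, j'] * y_{j' -> i}
task_totals = (A * y.T).sum(axis=1, keepdims=True)  # full per-task sums
x = task_totals - A * y.T                           # subtract worker j's own term

# Worker messages: y_{j -> i} = sum over i' != i of A[i', j] * x_{i' -> j}
worker_totals = (A * x).sum(axis=0, keepdims=True)  # full per-worker sums
y = (worker_totals - A * x).T                       # subtract task i's own term

# After the last round, each task is decoded as sign(sum_j A[i, j] * y_{j -> i})
labels = np.sign((A * y.T).sum(axis=1))

The library code below follows the same leave-one-out idea on the long-format response table.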

Examples:

>>> from crowdkit.aggregation import KOS
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> ds = KOS(10)
>>> result = ds.fit_predict(df)
Source code in crowdkit/aggregation/classification/kos.py
@attr.s
class KOS(BaseClassificationAggregator):
    r"""The **KOS** (Karger, Oh, and Shah 2011) aggregation model is an iterative algorithm that calculates the log-likelihood of the task being positive while modeling
    the worker reliability.

    Let $A_{ij}$ be a matrix of the responses of a worker $j$ on a task $i$.
    If the worker $j$ does not respond to the task $i$, then $A_{ij} = 0$. Otherwise, $|A_{ij}| = 1$.
    The algorithm operates on real-valued task messages $x_{i \rightarrow j}$  and
    worker messages $y_{j \rightarrow i}$. A task message $x_{i \rightarrow j}$ represents
    the log-likelihood of task $i$ being a positive task, and a worker message $y_{j \rightarrow i}$ represents
    how reliable worker $j$ is.

    At the $k$-th iteration, the values are updated as follows:
    $x_{i \rightarrow j}^{(k)} = \sum_{j^{'} \in \partial i \backslash j} A_{ij^{'}} y_{j^{'} \rightarrow i}^{(k-1)}$
    and
    $y_{j \rightarrow i}^{(k)} = \sum_{i^{'} \in \partial j \backslash i} A_{i^{'}j} x_{i^{'} \rightarrow j}^{(k-1)}$.

    David R. Karger, Sewoong Oh, and Devavrat Shah. Budget-Optimal Task Allocation for Reliable Crowdsourcing Systems.
    *Operations Research 62.1 (2014)*, 1-38.

    <https://arxiv.org/abs/1110.3564>

    Examples:
        >>> from crowdkit.aggregation import KOS
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> ds = KOS(10)
        >>> result = ds.fit_predict(df)
    """

    n_iter: int = attr.ib(default=100)
    """The maximum number of iterations."""

    random_state: int = attr.ib(default=0)
    """The state of the random number generator."""

    def fit(self, data: pd.DataFrame) -> "KOS":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            KOS: self.
        """

        np.random.seed(self.random_state)

        # Early exit
        if not data.size:
            self.labels_ = pd.Series([], dtype="O")
            return self

        # Initialization
        kos_data = data.copy()
        labels = kos_data.label.unique()
        if len(labels) != 2:
            raise ValueError(
                "KOS aggregation method is for binary classification only."
            )
        mapping = {labels[0]: 1, labels[1]: -1}
        kos_data.label = kos_data.label.apply(lambda x: mapping[x])
        kos_data["reliabilities"] = np.random.normal(loc=1, scale=1, size=len(kos_data))

        # Updating reliabilities
        for _ in range(self.n_iter):
            # Update inferred labels for (task, worker)
            kos_data["multiplied"] = kos_data.label * kos_data.reliabilities
            kos_data["summed"] = list(
                kos_data.groupby("task")["multiplied"].sum()[kos_data.task]
            )
            # Early exit to prevent NaN
            if (np.abs(kos_data["summed"]) > _MAX).any():
                break
            kos_data["inferred"] = (kos_data["summed"] - kos_data["multiplied"]).astype(
                float
            )

            # Update reliabilities for (task, worker)
            kos_data["multiplied"] = kos_data.label * kos_data.inferred
            kos_data["summed"] = list(
                kos_data.groupby("worker")["multiplied"].sum()[kos_data.worker]
            )
            # Early exit to prevent NaN
            if (np.abs(kos_data["summed"]) > _MAX).any():
                break
            kos_data["reliabilities"] = (kos_data.summed - kos_data.multiplied).astype(
                "float"
            )

        kos_data["inferred"] = kos_data.label * kos_data.reliabilities
        inferred_labels = np.sign(kos_data.groupby("task")["inferred"].sum())
        back_mapping = {v: k for k, v in mapping.items()}
        self.labels_ = cast(
            "pd.Series[Any]", inferred_labels.apply(lambda x: back_mapping[x])
        )
        return self

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self.fit(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

n_iter = attr.ib(default=100) class-attribute instance-attribute

The maximum number of iterations.

random_state = attr.ib(default=0) class-attribute instance-attribute

The state of the random number generator.

fit(data)

Fits the model to the training data.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

    KOS: self.

Source code in crowdkit/aggregation/classification/kos.py
def fit(self, data: pd.DataFrame) -> "KOS":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        KOS: self.
    """

    np.random.seed(self.random_state)

    # Early exit
    if not data.size:
        self.labels_ = pd.Series([], dtype="O")
        return self

    # Initialization
    kos_data = data.copy()
    labels = kos_data.label.unique()
    if len(labels) != 2:
        raise ValueError(
            "KOS aggregation method is for binary classification only."
        )
    mapping = {labels[0]: 1, labels[1]: -1}
    kos_data.label = kos_data.label.apply(lambda x: mapping[x])
    kos_data["reliabilities"] = np.random.normal(loc=1, scale=1, size=len(kos_data))

    # Updating reliabilities
    for _ in range(self.n_iter):
        # Update inferred labels for (task, worker)
        kos_data["multiplied"] = kos_data.label * kos_data.reliabilities
        kos_data["summed"] = list(
            kos_data.groupby("task")["multiplied"].sum()[kos_data.task]
        )
        # Early exit to prevent NaN
        if (np.abs(kos_data["summed"]) > _MAX).any():
            break
        kos_data["inferred"] = (kos_data["summed"] - kos_data["multiplied"]).astype(
            float
        )

        # Update reliabilities for (task, worker)
        kos_data["multiplied"] = kos_data.label * kos_data.inferred
        kos_data["summed"] = list(
            kos_data.groupby("worker")["multiplied"].sum()[kos_data.worker]
        )
        # Early exit to prevent NaN
        if (np.abs(kos_data["summed"]) > _MAX).any():
            break
        kos_data["reliabilities"] = (kos_data.summed - kos_data.multiplied).astype(
            "float"
        )

    kos_data["inferred"] = kos_data.label * kos_data.reliabilities
    inferred_labels = np.sign(kos_data.groupby("task")["inferred"].sum())
    back_mapping = {v: k for k, v in mapping.items()}
    self.labels_ = cast(
        "pd.Series[Any]", inferred_labels.apply(lambda x: back_mapping[x])
    )
    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

    data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

    Series: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/kos.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self.fit(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

MACE

Bases: BaseClassificationAggregator

The Multi-Annotator Competence Estimation (MACE) model is a probabilistic model that associates each worker with a label probability distribution. A worker can be spamming on each task. If the worker is not spamming, they label a task correctly. If the worker is spamming, they answer according to their probability distribution.

We assume that the correct label \(T_i\) comes from a discrete uniform distribution. When a worker annotates a task, they are spamming with probability \(1 - \theta_j\): the indicator \(S_{ij} \sim \operatorname{Bernoulli}(1 - \theta_j)\) specifies whether or not worker \(j\) is spamming on instance \(i\). Thus, if the worker is not spamming on the task, i.e. \(S_{ij} = 0\), their response is the true label, i.e. \(A_{ij} = T_i\). Otherwise, their response \(A_{ij}\) is drawn from a multinomial distribution with parameter vector \(\xi_j\).

MACE latent label model

The model can be enhanced by adding the Beta prior on \(\theta_j\) and the Dirichlet prior on \(\xi_j\).

The marginal data likelihood is maximized with the Expectation-Maximization algorithm:

1. E-step. Performs n_restarts random restarts, and keeps the model with the best marginal data likelihood.
2. M-step. Smooths parameters by adding a fixed value smoothing to the fractional counts before normalizing.
3. Variational M-step. Employs Variational-Bayes (VB) training with symmetric Beta priors on \(\theta_j\) and symmetric Dirichlet priors on the strategy parameters \(\xi_j\).

D. Hovy, T. Berg-Kirkpatrick, A. Vaswani and E. Hovy. Learning Whom to Trust with MACE. In Proceedings of NAACL-HLT, Atlanta, GA, USA (2013), 1120–1130.

https://aclanthology.org/N13-1132.pdf
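
To make the generative story concrete, here is a small simulation sketch under the model's assumptions (all sizes and parameter values are made up for illustration):

import numpy as np

rng = np.random.default_rng(0)
n_tasks, n_workers, n_labels = 100, 5, 3

T = rng.integers(n_labels, size=n_tasks)               # true labels: discrete uniform
theta = rng.uniform(0.4, 0.95, size=n_workers)         # P(worker j is not spamming)
xi = rng.dirichlet(np.ones(n_labels), size=n_workers)  # each worker's spamming strategy

A = np.empty((n_tasks, n_workers), dtype=int)
for i in range(n_tasks):
    for j in range(n_workers):
        spamming = rng.random() < 1 - theta[j]         # S_ij ~ Bernoulli(1 - theta_j)
        if spamming:
            A[i, j] = rng.choice(n_labels, p=xi[j])    # A_ij ~ Multinomial(xi_j)
        else:
            A[i, j] = T[i]                             # A_ij = T_i

Inference runs this story in reverse: given only the responses A, the EM procedure below recovers estimates of theta and xi.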

Examples:

>>> from crowdkit.aggregation import MACE
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> mace = MACE()
>>> result = mace.fit_predict(df)
Source code in crowdkit/aggregation/classification/mace.py
@attr.s
class MACE(BaseClassificationAggregator):
    r"""The **Multi-Annotator Competence Estimation** (MACE) model is a probabilistic model that associates each worker with a label probability distribution.
    A worker can be spamming on each task. If the worker is not spamming, they label a task correctly. If the worker is spamming, they answer according
    to their probability distribution.

    We assume that the correct label $T_i$ comes from a discrete uniform distribution. When a worker
    annotates a task, they are spamming with probability
    $\operatorname{Bernoulli}(1 - \theta_j)$. $S_{ij}$ specifies whether or not worker $j$ is spamming on instance $i$.
    Thus, if the worker is not spamming on the task, i.e. $S_{ij} = 0$, their response is the true label, i.e. $A_{ij} = T_i$.
    Otherwise, their response $A_{ij}$ is drawn from a multinomial distribution with parameter vector $\xi_j$.

    ![MACE latent label model](https://tlk.s3.yandex.net/crowd-kit/docs/mace_llm.png =500x630)

    The model can be enhanced by adding the Beta prior on $\theta_j$ and the Dirichlet
    prior on $\xi_j$.

    The marginal data likelihood is maximized with the Expectation-Maximization algorithm:
    1. **E-step**. Performs `n_restarts` random restarts, and keeps the model with the best marginal data likelihood.
    2. **M-step**. Smooths parameters by adding a fixed value `smoothing` to the fractional counts before normalizing.
    3. **Variational M-step**. Employs Variational-Bayes (VB) training with symmetric Beta priors on $\theta_j$ and symmetric Dirichlet priors on the strategy parameters $\xi_j$.

    D. Hovy, T. Berg-Kirkpatrick, A. Vaswani and E. Hovy. Learning Whom to Trust with MACE.
    In *Proceedings of NAACL-HLT*, Atlanta, GA, USA (2013), 1120–1130.

    <https://aclanthology.org/N13-1132.pdf>

    Examples:
        >>> from crowdkit.aggregation import MACE
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> mace = MACE()
        >>> result = mace.fit_predict(df)
    """

    n_restarts: int = attr.ib(default=10)
    """The number of optimization runs of the algorithms.
    The final parameters are those that gave the best log likelihood.
    If one run takes too long, this parameter can be set to 1."""

    n_iter: int = attr.ib(default=50)
    """The maximum number of EM iterations for each optimization run."""

    method: str = attr.ib(default="vb")
    """The method which is used for the M-step. Either 'vb' or 'em'.
    'vb' means optimization with Variational Bayes using priors.
    'em' means standard Expectation-Maximization algorithm."""

    smoothing: float = attr.ib(default=0.1)
    """The smoothing parameter for the normalization."""

    default_noise: float = attr.ib(default=0.5)
    """The default noise parameter for the initialization."""

    alpha: float = attr.ib(default=0.5)
    r"""The prior parameter for the Beta distribution on $\theta_j$."""

    beta: float = attr.ib(default=0.5)
    r"""The prior parameter for the Beta distribution on $\theta_j$."""

    random_state: int = attr.ib(default=0)
    """The state of the random number generator."""

    verbose: int = attr.ib(default=0)
    """The state of progress bar: 0 — no progress bar, 1 — only for restarts, 2 — for both restarts and optimization."""

    spamming_: NDArray[np.float64] = attr.ib(init=False)
    """The posterior distribution of workers' spamming states."""

    thetas_: NDArray[np.float64] = attr.ib(init=False)
    """The posterior distribution of workers' spamming labels."""

    theta_priors_: Optional[NDArray[np.float64]] = attr.ib(init=False)
    r"""The prior parameters for the Beta distribution on $\theta_j$."""

    strategy_priors_: Optional[NDArray[np.float64]] = attr.ib(init=False)
    r"""The prior parameters for the Diriclet distribution on $\xi_j$."""

    smoothing_: float = attr.ib(init=False)
    """The smoothing parameter."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that
    the `task` true label is equal to `label`. Each probability is in the range from 0 to 1,
    all task probabilities must sum up to 1."""

    def fit(self, data: pd.DataFrame) -> "MACE":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            MACE: The fitted MACE model.
        """

        workers, worker_names = pd.factorize(data["worker"])
        labels, label_names = pd.factorize(data["label"])
        tasks, task_names = pd.factorize(data["task"])

        n_workers = len(worker_names)
        n_labels = len(label_names)

        self.smoothing_ = 0.01 / n_labels

        annotation = data.copy(deep=True)

        best_log_marginal_likelihood = -np.inf

        def restarts_progress() -> Iterator[int]:
            if self.verbose > 0:
                yield from trange(self.n_restarts, desc="Restarts")
            else:
                yield from range(self.n_restarts)

        for _ in restarts_progress():
            self._initialize(n_workers, n_labels)
            (
                log_marginal_likelihood,
                gold_label_marginals,
                strategy_expected_counts,
                knowing_expected_counts,
            ) = self._e_step(
                annotation,
                task_names,
                worker_names,
                label_names,
                tasks,
                workers,
                labels,
            )

            def iteration_progress() -> Tuple[Iterator[int], Optional["tqdm[int]"]]:
                if self.verbose > 1:
                    trange_ = trange(self.n_iter, desc="Iterations")
                    return iter(trange_), trange_
                else:
                    return iter(range(self.n_iter)), None

            iterator, pbar = iteration_progress()

            for _ in iterator:
                if self.method == "vb":
                    self._variational_m_step(
                        knowing_expected_counts, strategy_expected_counts
                    )
                else:
                    self._m_step(knowing_expected_counts, strategy_expected_counts)
                (
                    log_marginal_likelihood,
                    gold_label_marginals,
                    strategy_expected_counts,
                    knowing_expected_counts,
                ) = self._e_step(
                    annotation,
                    task_names,
                    worker_names,
                    label_names,
                    tasks,
                    workers,
                    labels,
                )
                if self.verbose > 1:
                    assert isinstance(pbar, tqdm)
                    pbar.set_postfix(
                        {"log_marginal_likelihood": round(log_marginal_likelihood, 5)}
                    )
            if log_marginal_likelihood > best_log_marginal_likelihood:
                best_log_marginal_likelihood = log_marginal_likelihood
                best_thetas = self.thetas_.copy()
                best_spamming = self.spamming_.copy()

        self.thetas_ = best_thetas
        self.spamming_ = best_spamming
        _, gold_label_marginals, _, _ = self._e_step(
            annotation, task_names, worker_names, label_names, tasks, workers, labels
        )

        self.probas_ = decode_distribution(gold_label_marginals)
        self.labels_ = self.probas_.idxmax(axis="columns")
        self.labels_.index.name = "task"

        return self

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """
        Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: Task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """
        self.fit(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """
        self.fit(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def _initialize(self, n_workers: int, n_labels: int) -> None:
        """Initializes the MACE parameters.

        Args:
            n_workers (int): The number of workers.
            n_labels (int): The number of labels.

        Returns:
            None
        """

        self.spamming_ = sps.uniform(1, 1 + self.default_noise).rvs(
            size=(n_workers, 2),
            random_state=self.random_state,
        )
        self.thetas_ = sps.uniform(1, 1 + self.default_noise).rvs(
            size=(n_workers, n_labels), random_state=self.random_state
        )

        self.spamming_ = self.spamming_ / self.spamming_.sum(axis=1, keepdims=True)
        self.thetas_ = self.thetas_ / self.thetas_.sum(axis=1, keepdims=True)

        if self.method == "vb":
            self.theta_priors_ = np.empty((n_workers, 2))
            self.theta_priors_[:, 0] = self.alpha
            self.theta_priors_[:, 1] = self.beta

            self.strategy_priors_ = np.multiply(
                10.0, np.ones((n_workers, n_labels)), dtype=np.float64
            )

    def _e_step(
        self,
        annotation: pd.DataFrame,
        task_names: Union[List[Any], "pd.Index[Any]"],
        worker_names: Union[List[Any], "pd.Index[Any]"],
        label_names: Union[List[Any], "pd.Index[Any]"],
        tasks: NDArray[np.int64],
        workers: NDArray[np.int64],
        labels: NDArray[np.int64],
    ) -> Tuple[float, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Performs E-step of the MACE algorithm.

        Args:
            annotation (DataFrame): The workers' labeling results. The `pandas.DataFrame` data contains `task`, `worker`, and `label` columns.
            task_names (List[Any]): The task names.
            worker_names (List[Any]): The workers' names.
            label_names (List[Any]): The label names.
            tasks (np.ndarray): The task IDs in the annotation.
            workers (np.ndarray): The workers' IDs in the annotation.
            labels (np.ndarray): The label IDs in the annotation.

        Returns:
            Tuple[float, pd.DataFrame, pd.DataFrame, pd.DataFrame]: The log marginal likelihood, gold label marginals,
                strategy expected counts, and knowing expected counts.
        """
        gold_label_marginals = pd.DataFrame(
            np.zeros((len(task_names), len(label_names))),
            index=task_names,
            columns=label_names,
        )

        knowing_expected_counts = pd.DataFrame(
            np.zeros((len(worker_names), 2)),
            index=worker_names,
            columns=["knowing_expected_count_0", "knowing_expected_count_1"],
        )

        for label_idx, label in enumerate(label_names):
            annotation["gold_marginal"] = self.spamming_[workers, 0] * self.thetas_[
                workers, labels
            ] + self.spamming_[workers, 1] * (label_idx == labels)
            gold_label_marginals[label] = annotation.groupby("task").prod(
                numeric_only=True
            )["gold_marginal"] / len(label_names)

        instance_marginals = gold_label_marginals.sum(axis=1)
        log_marginal_likelihood = np.log(instance_marginals + 1e-8).sum()

        annotation["strategy_marginal"] = 0.0
        for label in range(len(label_names)):
            annotation["strategy_marginal"] += gold_label_marginals.values[
                tasks, label
            ] / (
                self.spamming_[workers, 0] * self.thetas_[workers, labels]
                + self.spamming_[workers, 1] * (labels == label)
            )

        annotation["strategy_marginal"] = (
            annotation["strategy_marginal"]
            * self.spamming_[workers, 0]
            * self.thetas_[workers, labels]
        )

        annotation.set_index("task", inplace=True)
        annotation["instance_marginal"] = instance_marginals
        annotation.reset_index(inplace=True)

        annotation["strategy_marginal"] = (
            annotation["strategy_marginal"] / annotation["instance_marginal"]
        )

        strategy_expected_counts = (
            annotation.groupby(["worker", "label"])
            .sum(numeric_only=True)["strategy_marginal"]
            .unstack()
            .fillna(0.0)
        )

        knowing_expected_counts["knowing_expected_count_0"] = annotation.groupby(
            "worker"
        ).sum(numeric_only=True)["strategy_marginal"]

        annotation["knowing_expected_counts"] = (
            gold_label_marginals.values[tasks, labels].ravel()
            * self.spamming_[workers, 1]
            / (
                self.spamming_[workers, 0] * self.thetas_[workers, labels]
                + self.spamming_[workers, 1]
            )
        ) / instance_marginals.values[tasks]
        knowing_expected_counts["knowing_expected_count_1"] = annotation.groupby(
            "worker"
        ).sum(numeric_only=True)["knowing_expected_counts"]

        return (
            log_marginal_likelihood,
            gold_label_marginals,
            strategy_expected_counts,
            knowing_expected_counts,
        )

    def _m_step(
        self,
        knowing_expected_counts: pd.DataFrame,
        strategy_expected_counts: pd.DataFrame,
    ) -> None:
        """
        Performs M-step of the MACE algorithm.

        Args:
            knowing_expected_counts (DataFrame): The knowing expected counts.
            strategy_expected_counts (DataFrame): The strategy expected counts.

        Returns:
            None
        """
        self.spamming_ = normalize(knowing_expected_counts.values, self.smoothing_)
        self.thetas_ = normalize(strategy_expected_counts.values, self.smoothing_)

    def _variational_m_step(
        self,
        knowing_expected_counts: pd.DataFrame,
        strategy_expected_counts: pd.DataFrame,
    ) -> None:
        """
        Performs variational M-step of the MACE algorithm.

        Args:
            knowing_expected_counts (DataFrame): The knowing expected counts.
            strategy_expected_counts (DataFrame): The strategy expected counts.

        Returns:
            None
        """
        assert self.theta_priors_ is not None
        self.spamming_ = variational_normalize(
            knowing_expected_counts.values, self.theta_priors_
        )
        assert self.strategy_priors_ is not None
        self.thetas_ = variational_normalize(
            strategy_expected_counts.values, self.strategy_priors_
        )

alpha = attr.ib(default=0.5) class-attribute instance-attribute

The prior parameter for the Beta distribution on \(\theta_j\).

beta = attr.ib(default=0.5) class-attribute instance-attribute

The prior parameter for the Beta distribution on \(\theta_j\).

default_noise = attr.ib(default=0.5) class-attribute instance-attribute

The default noise parameter for the initialization.

method = attr.ib(default='vb') class-attribute instance-attribute

The method which is used for the M-step. Either 'vb' or 'em'. 'vb' means optimization with Variational Bayes using priors. 'em' means standard Expectation-Maximization algorithm.
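
For instance, a minimal sketch (the parameter values are illustrative) that switches the M-step from the default Variational Bayes updates to plain EM:

>>> from crowdkit.aggregation import MACE
>>> mace = MACE(n_iter=50, method='em')  # plain EM updates instead of VB
>>> # result = mace.fit_predict(df)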

n_iter = attr.ib(default=50) class-attribute instance-attribute

The maximum number of EM iterations for each optimization run.

n_restarts = attr.ib(default=10) class-attribute instance-attribute

The number of optimization runs of the algorithms. The final parameters are those that gave the best log likelihood. If one run takes too long, this parameter can be set to 1.

probas_ = attr.ib(init=False) class-attribute instance-attribute

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

random_state = attr.ib(default=0) class-attribute instance-attribute

The state of the random number generator.

smoothing = attr.ib(default=0.1) class-attribute instance-attribute

The smoothing parameter for the normalization.

smoothing_ = attr.ib(init=False) class-attribute instance-attribute

The smoothing parameter used for normalization in the M-step; fit sets it to 0.01 / n_labels.

spamming_ = attr.ib(init=False) class-attribute instance-attribute

The posterior distribution of workers' spamming states.

strategy_priors_ = attr.ib(init=False) class-attribute instance-attribute

The prior parameters for the Dirichlet distribution on \(\xi_j\).

theta_priors_ = attr.ib(init=False) class-attribute instance-attribute

The prior parameters for the Beta distribution on \(\theta_j\).

thetas_ = attr.ib(init=False) class-attribute instance-attribute

The posterior distribution of workers' spamming labels.

verbose = attr.ib(default=0) class-attribute instance-attribute

The state of progress bar: 0 — no progress bar, 1 — only for restarts, 2 — for both restarts and optimization.

fit(data)

Fits the model to the training data.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

MACE: The fitted MACE model.

Source code in crowdkit/aggregation/classification/mace.py
def fit(self, data: pd.DataFrame) -> "MACE":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        MACE: The fitted MACE model.
    """

    workers, worker_names = pd.factorize(data["worker"])
    labels, label_names = pd.factorize(data["label"])
    tasks, task_names = pd.factorize(data["task"])

    n_workers = len(worker_names)
    n_labels = len(label_names)

    self.smoothing_ = 0.01 / n_labels

    annotation = data.copy(deep=True)

    best_log_marginal_likelihood = -np.inf

    def restarts_progress() -> Iterator[int]:
        if self.verbose > 0:
            yield from trange(self.n_restarts, desc="Restarts")
        else:
            yield from range(self.n_restarts)

    for _ in restarts_progress():
        self._initialize(n_workers, n_labels)
        (
            log_marginal_likelihood,
            gold_label_marginals,
            strategy_expected_counts,
            knowing_expected_counts,
        ) = self._e_step(
            annotation,
            task_names,
            worker_names,
            label_names,
            tasks,
            workers,
            labels,
        )

        def iteration_progress() -> Tuple[Iterator[int], Optional["tqdm[int]"]]:
            if self.verbose > 1:
                trange_ = trange(self.n_iter, desc="Iterations")
                return iter(trange_), trange_
            else:
                return iter(range(self.n_iter)), None

        iterator, pbar = iteration_progress()

        for _ in iterator:
            if self.method == "vb":
                self._variational_m_step(
                    knowing_expected_counts, strategy_expected_counts
                )
            else:
                self._m_step(knowing_expected_counts, strategy_expected_counts)
            (
                log_marginal_likelihood,
                gold_label_marginals,
                strategy_expected_counts,
                knowing_expected_counts,
            ) = self._e_step(
                annotation,
                task_names,
                worker_names,
                label_names,
                tasks,
                workers,
                labels,
            )
            if self.verbose > 1:
                assert isinstance(pbar, tqdm)
                pbar.set_postfix(
                    {"log_marginal_likelihood": round(log_marginal_likelihood, 5)}
                )
        if log_marginal_likelihood > best_log_marginal_likelihood:
            best_log_marginal_likelihood = log_marginal_likelihood
            best_thetas = self.thetas_.copy()
            best_spamming = self.spamming_.copy()

    self.thetas_ = best_thetas
    self.spamming_ = best_spamming
    _, gold_label_marginals, _, _ = self._e_step(
        annotation, task_names, worker_names, label_names, tasks, workers, labels
    )

    self.probas_ = decode_distribution(gold_label_marginals)
    self.labels_ = self.probas_.idxmax(axis="columns")
    self.labels_.index.name = "task"

    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series[Any]: Task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/mace.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """
    Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: Task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """
    self.fit(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/mace.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """
    Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """
    self.fit(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

MMSR

Bases: BaseClassificationAggregator

The Matrix Mean-Subsequence-Reduced Algorithm (M-MSR) model assumes that workers have different expertise levels, represented as a vector of "skills" \(s\) whose entries \(s_i\) show the probability that the worker \(i\) will answer the given task correctly. Given that, we can estimate each worker's skill by solving a rank-one matrix completion problem as follows:

\(\mathbb{E}\left[\frac{M}{M-1}\widetilde{C}-\frac{1}{M-1}\boldsymbol{1}\boldsymbol{1}^T\right] = \boldsymbol{s}\boldsymbol{s}^T\),

where \(M\) is the total number of classes, \(\widetilde{C}\) is a covariance matrix between workers, and \(\boldsymbol{1}\boldsymbol{1}^T\) is the all-ones matrix which has the same size as \(\widetilde{C}\).

Thus, the problem of estimating the skill level vector \(s\) becomes equivalent to the rank-one matrix completion problem. The M-MSR algorithm is an iterative algorithm for the robust rank-one matrix completion, so its result is an estimator of the vector \(s\). And the aggregation is weighted majority voting with weights equal to \(\log \frac{(M-1)s_i}{1-s_i}\).
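
To make the aggregation step concrete, here is a minimal sketch (the skill values are made up) of the log-odds weights that the weighted majority vote uses:

>>> import numpy as np
>>> import pandas as pd
>>> M = 2  # number of classes
>>> skills = pd.Series({'w1': 0.9, 'w2': 0.6, 'w3': 0.55})  # hypothetical estimates of s_i
>>> weights = np.log((M - 1) * skills / (1 - skills))  # log((M - 1) * s_i / (1 - s_i))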

Q. Ma and Alex Olshevsky. Adversarial Crowdsourcing Through Robust Rank-One Matrix Completion. 34th Conference on Neural Information Processing Systems (NeurIPS 2020)

https://arxiv.org/abs/2010.12181

Examples:

>>> from crowdkit.aggregation import MMSR
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> mmsr = MMSR()
>>> result = mmsr.fit_predict(df)
Source code in crowdkit/aggregation/classification/m_msr.py
@attr.s
class MMSR(BaseClassificationAggregator):
    r"""The **Matrix Mean-Subsequence-Reduced Algorithm** (M-MSR) model assumes that workers have different
    expertise levels and are represented as a vector of "skills" $s$ whose entries $s_i$ show the probability
    that the worker $i$ will answer the given task correctly. Having that, we can estimate the probability of
    each worker via solving a rank-one matrix completion problem as follows:

    $\mathbb{E}\left[\frac{M}{M-1}\widetilde{C}-\frac{1}{M-1}\boldsymbol{1}\boldsymbol{1}^T\right] =
    \boldsymbol{s}\boldsymbol{s}^T$,

    where $M$ is the total number of classes, $\widetilde{C}$ is a covariance matrix between
    workers, and $\boldsymbol{1}\boldsymbol{1}^T$ is the all-ones matrix which has the same
    size as $\widetilde{C}$.

    Thus, the problem of estimating the skill level vector $s$ becomes equivalent to the
    rank-one matrix completion problem. The M-MSR algorithm is an iterative algorithm for the *robust*
    rank-one matrix completion, so its result is an estimator of the vector $s$.
    And the aggregation is weighted majority voting with weights equal to
    $\log \frac{(M-1)s_i}{1-s_i}$.

    Q. Ma and Alex Olshevsky. Adversarial Crowdsourcing Through Robust Rank-One Matrix Completion.
    *34th Conference on Neural Information Processing Systems (NeurIPS 2020)*

    <https://arxiv.org/abs/2010.12181>

    Examples:
        >>> from crowdkit.aggregation import MMSR
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> mmsr = MMSR()
        >>> result = mmsr.fit_predict(df)
    """

    n_iter: int = attr.ib(default=10000)
    """The maximum number of iterations."""

    tol: float = attr.ib(default=1e-10)
    """The tolerance stopping criterion for iterative methods with a variable number of steps. The algorithm converges when the loss change is less than the `tol` parameter."""

    random_state: Optional[int] = attr.ib(default=0)
    """The seed number for the random initialization."""

    _observation_matrix: npt.NDArray[Any] = attr.ib(factory=lambda: np.array([]))
    """The matrix representing which workers give responses to which tasks."""

    _covariation_matrix: npt.NDArray[Any] = attr.ib(factory=lambda: np.array([]))
    """The matrix representing the covariance between workers."""

    _n_common_tasks: npt.NDArray[Any] = attr.ib(factory=lambda: np.array([]))
    """The matrix representing workers with tasks in common."""

    _n_workers: int = attr.ib(default=0)
    """The number of workers."""

    _n_tasks: int = attr.ib(default=0)
    """The number of tasks that are assigned to workers."""

    _n_labels: int = attr.ib(default=0)
    """The number of possible labels for a series of classification tasks."""

    _labels_mapping: Dict[Any, int] = attr.ib(factory=dict)
    """The mapping of labels and integer values."""

    _workers_mapping: Dict[Any, int] = attr.ib(factory=dict)
    """The mapping of workers and integer values."""

    _tasks_mapping: Dict[Any, int] = attr.ib(factory=dict)
    """The mapping of tasks and integer values."""

    # Available after fit
    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")
    """The task labels.
    The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks."""

    scores_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The task label scores.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is a score of `label` for `task`."""

    loss_history_: List[float] = attr.ib(init=False)
    """A list of loss values during training."""

    def _apply(self, data: pd.DataFrame) -> "MMSR":
        mv = MajorityVote().fit(data, skills=self.skills_)
        self.labels_ = mv.labels_
        self.scores_ = mv.probas_
        return self

    def fit(self, data: pd.DataFrame) -> "MMSR":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            MMSR: self.
        """

        data = data[["task", "worker", "label"]]
        self._construnct_covariation_matrix(data)
        self._m_msr()
        return self

    def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Predicts the true labels of tasks when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self._apply(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def predict_score(self, data: pd.DataFrame) -> pd.DataFrame:
        """Returns the total sum of weights for each label when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: The task label scores. The `pandas.DataFrame` data is indexed by `task`
                so that `result.loc[task, label]` is a score of `label` for `task`.
        """

        self._apply(data)
        assert self.scores_ is not None, "no scores_"
        return self.scores_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data).predict(data)

    def fit_predict_score(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fits the model to the training data and returns the total sum of weights for each label.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: The task label scores. The `pandas.DataFrame` data is indexed by `task`
                so that `result.loc[task, label]` is a score of `label` for `task`.
        """

        return self.fit(data).predict_score(data)

    def _m_msr(self) -> None:
        F_param = int(np.floor(self._sparsity / 2)) - 1
        n, m = self._covariation_matrix.shape
        u = sps.uniform.rvs(size=(n, 1), random_state=self.random_state)
        v = sps.uniform.rvs(size=(m, 1), random_state=self.random_state)
        observed_entries = np.abs(np.sign(self._n_common_tasks)) == 1
        X = np.abs(self._covariation_matrix)
        self.loss_history_ = []
        for _ in range(self.n_iter):
            v_prev = np.copy(v)
            u_prev = np.copy(u)
            for j in range(n):
                target_v = X[:, j].reshape(-1, 1)
                target_v = target_v[observed_entries[:, j]] / u[observed_entries[:, j]]

                y = self._remove_largest_and_smallest_F_value(
                    target_v, F_param, v[j][0], self._n_tasks
                )
                if len(y) == 0:
                    v[j] = v[j]
                else:
                    v[j][0] = y.mean()

            for i in range(m):
                target_u = X[i, :].reshape(-1, 1)
                target_u = target_u[observed_entries[i, :]] / v[observed_entries[i, :]]
                y = self._remove_largest_and_smallest_F_value(
                    target_u, F_param, u[i][0], self._n_tasks
                )
                if len(y) == 0:
                    u[i] = u[i]
                else:
                    u[i][0] = y.mean()

            loss = np.linalg.norm(u @ v.T - u_prev @ v_prev.T, ord="fro")
            self.loss_history_.append(float(loss))
            if loss < self.tol:
                break

        k = np.sqrt(np.linalg.norm(u) / np.linalg.norm(v))
        x_track_1 = u / k
        x_track_2 = self._sign_determination_valid(self._covariation_matrix, x_track_1)
        x_track_3 = np.minimum(x_track_2, 1 - 1.0 / np.sqrt(self._n_tasks))
        x_MSR = np.maximum(
            x_track_3, -1 / (self._n_labels - 1) + 1.0 / np.sqrt(self._n_tasks)
        )

        workers_probas = (
            x_MSR * (self._n_labels - 1) / (self._n_labels) + 1 / self._n_labels
        )
        workers_probas = workers_probas.ravel()
        skills = np.log(workers_probas * (self._n_labels - 1) / (1 - workers_probas))

        self.skills_ = self._get_skills_from_array(skills)

    def _get_skills_from_array(self, array: npt.NDArray[Any]) -> "pd.Series[Any]":
        inverse_workers_mapping = {
            ind: worker for worker, ind in self._workers_mapping.items()
        }
        index = [inverse_workers_mapping[i] for i in range(len(array))]
        return pd.Series(array, index=pd.Index(index, name="worker"))

    @staticmethod
    def _sign_determination_valid(
        C: npt.NDArray[Any], s_abs: npt.NDArray[Any]
    ) -> npt.NDArray[Any]:
        S = np.sign(C)
        n = len(s_abs)

        valid_idx = np.where(np.sum(C, axis=1) != 0)[0]
        S_valid = S[valid_idx[:, None], valid_idx]
        k = S_valid.shape[0]
        upper_idx = np.triu(np.ones(shape=(k, k)))
        S_valid_upper = S_valid * upper_idx
        new_node_end_I, new_node_end_J = np.where(S_valid_upper == 1)
        S_valid[S_valid == 1] = 0
        I = np.eye(k)
        S_valid_new = I[new_node_end_I, :] + I[new_node_end_J, :]
        m = S_valid_new.shape[0]
        A = np.vstack(
            (
                np.hstack((np.abs(S_valid), S_valid_new.T)),
                np.hstack((S_valid_new, np.zeros(shape=(m, m)))),
            )
        )
        n_new = A.shape[0]
        W = (1.0 / np.sum(A, axis=1)).reshape(-1, 1) @ np.ones(shape=(1, n_new)) * A
        D, V = sla.eigs(W + np.eye(n_new), 1, which="SM")
        V = V.real
        sign_vector = np.sign(V)
        s_sign = np.zeros(shape=(n, 1))
        s_sign[valid_idx] = (
            np.sign(np.sum(sign_vector[:k])) * s_abs[valid_idx] * sign_vector[:k]
        )
        return s_sign

    @staticmethod
    def _remove_largest_and_smallest_F_value(
        x: npt.NDArray[Any], F: int, a: float, n_tasks: int
    ) -> npt.NDArray[Any]:
        y = np.sort(x, axis=0)
        if np.sum(y < a) < F:
            y = y[y[:, 0] >= a]
        else:
            y = y[F:]

        m = y.shape[0]
        if np.sum(y > a) < F:
            y = y[y[:, 0] <= a]
        else:
            y = np.concatenate((y[: m - F], y[m:]), axis=0)
        if len(y) == 1 and y[0][0] == 0:
            y[0][0] = 1 / np.sqrt(n_tasks)
        return y

    def _construnct_covariation_matrix(self, answers: pd.DataFrame) -> None:
        labels = pd.unique(answers.label)
        self._n_labels = len(labels)
        self._labels_mapping = {labels[idx]: idx + 1 for idx in range(self._n_labels)}

        workers = pd.unique(answers.worker)
        self._n_workers = len(workers)
        self._workers_mapping = {workers[idx]: idx for idx in range(self._n_workers)}

        tasks = pd.unique(answers.task)
        self._n_tasks = len(tasks)
        self._tasks_mapping = {tasks[idx]: idx for idx in range(self._n_tasks)}

        self._observation_matrix = np.zeros(shape=(self._n_workers, self._n_tasks))
        for i, row in answers.iterrows():
            self._observation_matrix[self._workers_mapping[row["worker"]]][
                self._tasks_mapping[row["task"]]
            ] = self._labels_mapping[row["label"]]

        self._n_common_tasks = (
            np.sign(self._observation_matrix) @ np.sign(self._observation_matrix).T
        )
        self._n_common_tasks -= np.diag(np.diag(self._n_common_tasks))
        self._sparsity = np.min(np.sign(self._n_common_tasks).sum(axis=0))

        # Can we rewrite it in matrix operations?
        self._covariation_matrix = np.zeros(shape=(self._n_workers, self._n_workers))
        for i in range(self._n_workers):
            for j in range(self._n_workers):
                if self._n_common_tasks[i][j]:
                    valid_idx = np.sign(self._observation_matrix[i]) * np.sign(
                        self._observation_matrix[j]
                    )
                    self._covariation_matrix[i][j] = (
                        np.sum(
                            (self._observation_matrix[i] == self._observation_matrix[j])
                            * valid_idx
                        )
                        / self._n_common_tasks[i][j]
                    )

        self._covariation_matrix *= self._n_labels / (self._n_labels - 1)
        self._covariation_matrix -= np.ones(
            shape=(self._n_workers, self._n_workers)
        ) / (self._n_labels - 1)

loss_history_ = attr.ib(init=False) class-attribute instance-attribute

A list of loss values during training.

n_iter = attr.ib(default=10000) class-attribute instance-attribute

The maximum number of iterations.

random_state = attr.ib(default=0) class-attribute instance-attribute

The seed number for the random initialization.

scores_ = attr.ib(init=False) class-attribute instance-attribute

The task label scores. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is a score of label for task.

skills_ = named_series_attrib(name='skill') class-attribute instance-attribute

The workers' skills. The pandas.Series data is indexed by worker so that skills_.loc[worker] is the estimated skill of the worker.

tol = attr.ib(default=1e-10) class-attribute instance-attribute

The tolerance stopping criterion for iterative methods with a variable number of steps. The algorithm converges when the loss change is less than the tol parameter.
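
A small usage sketch (the thresholds are illustrative) combining these stopping criteria and inspecting the recorded losses after fitting:

>>> from crowdkit.aggregation import MMSR
>>> mmsr = MMSR(n_iter=10000, tol=1e-10)
>>> # mmsr.fit(df)
>>> # mmsr.loss_history_[-1]  # Frobenius norm of the last rank-one update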

fit(data)

Fits the model to the training data.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

MMSR: self.

Source code in crowdkit/aggregation/classification/m_msr.py
def fit(self, data: pd.DataFrame) -> "MMSR":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        MMSR: self.
    """

    data = data[["task", "worker", "label"]]
    self._construnct_covariation_matrix(data)
    self._m_msr()
    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series[Any]: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/m_msr.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data).predict(data)

fit_predict_score(data)

Fits the model to the training data and returns the total sum of weights for each label.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: The task label scores. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is a score of label for task.

Source code in crowdkit/aggregation/classification/m_msr.py
def fit_predict_score(self, data: pd.DataFrame) -> pd.DataFrame:
    """Fits the model to the training data and returns the total sum of weights for each label.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: The task label scores. The `pandas.DataFrame` data is indexed by `task`
            so that `result.loc[task, label]` is a score of `label` for `task`.
    """

    return self.fit(data).predict_score(data)

predict(data)

Predicts the true labels of tasks when the model is fitted.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series[Any]: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/m_msr.py
def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Predicts the true labels of tasks when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self._apply(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

predict_score(data)

Returns the total sum of weights for each label when the model is fitted.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: The task label scores. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is a score of label for task.

Source code in crowdkit/aggregation/classification/m_msr.py
def predict_score(self, data: pd.DataFrame) -> pd.DataFrame:
    """Returns the total sum of weights for each label when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: The task label scores. The `pandas.DataFrame` data is indexed by `task`
            so that `result.loc[task, label]` is a score of `label` for `task`.
    """

    self._apply(data)
    assert self.scores_ is not None, "no scores_"
    return self.scores_

MajorityVote

Bases: BaseClassificationAggregator

The Majority Vote aggregation algorithm is a straightforward approach for categorical aggregation: for each task, it outputs the label with the largest number of responses. Additionally, Majority Vote can be used when different weights are assigned to workers' votes. In this case, the resulting label will have the largest sum of weights.

Note

If two or more labels have the largest number of votes, the resulting label will be the same for all tasks that have the same set of labels with the same number of votes.
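
For instance, a minimal sketch of the tie case described in the note: both tasks below have a 0/1 tie with the same set of labels, so they are assigned the same resulting label:

>>> import pandas as pd
>>> from crowdkit.aggregation import MajorityVote
>>> ties = pd.DataFrame(
...     [
...         ['t1', 'p1', 0],
...         ['t1', 'p2', 1],
...         ['t2', 'p1', 0],
...         ['t2', 'p2', 1],
...     ],
...     columns=['task', 'worker', 'label'],
... )
>>> result = MajorityVote().fit_predict(ties)  # t1 and t2 receive the same label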

Examples:

Basic Majority Vote:

>>> from crowdkit.aggregation import MajorityVote
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> result = MajorityVote().fit_predict(df)

Weighted Majority Vote:

>>> import pandas as pd
>>> from crowdkit.aggregation import MajorityVote
>>> df = pd.DataFrame(
...     [
...         ['t1', 'p1', 0],
...         ['t1', 'p2', 0],
...         ['t1', 'p3', 1],
...         ['t2', 'p1', 1],
...         ['t2', 'p2', 0],
...         ['t2', 'p3', 1],
...     ],
...     columns=['task', 'worker', 'label']
... )
>>> skills = pd.Series({'p1': 0.5, 'p2': 0.7, 'p3': 0.4})
>>> result = MajorityVote().fit_predict(df, skills)
Source code in crowdkit/aggregation/classification/majority_vote.py
@attr.s
class MajorityVote(BaseClassificationAggregator):
    r"""The **Majority Vote** aggregation algorithm is a straightforward approach for categorical aggregation: for each task,
    it outputs a label with the largest number of responses. Additionally, the Majority Vote
    can be used when different weights are assigned to workers' votes. In this case, the
    resulting label will have the largest sum of weights.


    Note:
        If two or more labels have the largest number of votes, the resulting
        label will be the same for all tasks that have the same set of labels with the same number of votes.

    Examples:
        Basic Majority Vote:
        >>> from crowdkit.aggregation import MajorityVote
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> result = MajorityVote().fit_predict(df)

        Weighted Majority Vote:
        >>> import pandas as pd
        >>> from crowdkit.aggregation import MajorityVote
        >>> df = pd.DataFrame(
        ...     [
        ...         ['t1', 'p1', 0],
        ...         ['t1', 'p2', 0],
        ...         ['t1', 'p3', 1],
        ...         ['t2', 'p1', 1],
        ...         ['t2', 'p2', 0],
        ...         ['t2', 'p3', 1],
        ...     ],
        ...     columns=['task', 'worker', 'label']
        ... )
        >>> skills = pd.Series({'p1': 0.5, 'p2': 0.7, 'p3': 0.4})
        >>> result = MajorityVote().fit_predict(df, skills)
    """

    default_skill: Optional[float] = attr.ib(default=None)
    """Default worker weight value."""

    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")
    """The workers' skills. The `pandas.Series` data is indexed by `worker` and has the corresponding worker skill."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels. The `pandas.DataFrame` data is indexed by `task`
    so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
    Each probability is in the range from 0 to 1, all task probabilities must sum up to 1."""

    on_missing_skill: str = attr.ib(default="error")
    """A value which specifies how to handle assignments performed by workers with an unknown skill.

    Possible values:
    * `error`: raises an exception if there is at least one assignment performed by a worker with an unknown skill;
    * `ignore`: drops assignments performed by workers with an unknown skill during prediction,
    raises an exception if there are no assignments with a known skill for any task;
    * `value`: the default value will be used if a skill is missing."""

    def fit(
        self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
    ) -> "MajorityVote":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
                and has the corresponding worker skill.

        Returns:
            MajorityVote: self.
        """

        data = data[["task", "worker", "label"]]

        if skills is None:
            scores = data[["task", "label"]].value_counts()
        else:
            data = add_skills_to_data(
                data, skills, self.on_missing_skill, self.default_skill
            )
            scores = data.groupby(["task", "label"])["skill"].sum()

        self.probas_ = normalize_rows(scores.unstack("label", fill_value=0))
        self.labels_ = get_most_probable_labels(self.probas_)
        self.skills_ = get_accuracy(data, self.labels_, by="worker")

        return self

    def fit_predict_proba(
        self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
    ) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
                and has the corresponding worker skill.

        Returns:
            DataFrame: The probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """
        self.fit(data, skills)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(
        self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
    ) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
           data (DataFrame): The training dataset of workers' labeling results
               which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

           skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
               and has the corresponding worker skill.

        Returns:
           Series: The task labels. The `pandas.Series` data is indexed by `task`
               so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self.fit(data, skills)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

default_skill = attr.ib(default=None) class-attribute instance-attribute

Default worker weight value.

on_missing_skill = attr.ib(default='error') class-attribute instance-attribute

A value which specifies how to handle assignments performed by workers with an unknown skill.

Possible values:

* error: raises an exception if there is at least one assignment performed by a worker with an unknown skill;
* ignore: drops assignments performed by workers with an unknown skill during prediction, and raises an exception if there are no assignments with a known skill for any task;
* value: the default value will be used if a skill is missing.
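
A minimal sketch (the default value is illustrative) of the value strategy: any worker missing from the skills series falls back to default_skill:

>>> from crowdkit.aggregation import MajorityVote
>>> mv = MajorityVote(on_missing_skill='value', default_skill=0.5)
>>> # result = mv.fit_predict(df, skills)  # workers absent from `skills` get weight 0.5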

probas_ = attr.ib(init=False) class-attribute instance-attribute

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

skills_ = named_series_attrib(name='skill') class-attribute instance-attribute

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

fit(data, skills=None)

Fits the model to the training data.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

skills (Series, default: None): The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

Returns:

MajorityVote: self.

Source code in crowdkit/aggregation/classification/majority_vote.py
def fit(
    self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
) -> "MajorityVote":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
            and has the corresponding worker skill.

    Returns:
        MajorityVote: self.
    """

    data = data[["task", "worker", "label"]]

    if skills is None:
        scores = data[["task", "label"]].value_counts()
    else:
        data = add_skills_to_data(
            data, skills, self.on_missing_skill, self.default_skill
        )
        scores = data.groupby(["task", "label"])["skill"].sum()

    self.probas_ = normalize_rows(scores.unstack("label", fill_value=0))
    self.labels_ = get_most_probable_labels(self.probas_)
    self.skills_ = get_accuracy(data, self.labels_, by="worker")

    return self

fit_predict(data, skills=None)

Fits the model to the training data and returns the aggregated results.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

skills (Series, default: None): The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

Returns:

Series[Any]: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/majority_vote.py
def fit_predict(
    self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
       data (DataFrame): The training dataset of workers' labeling results
           which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

       skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
           and has the corresponding worker skill.

    Returns:
       Series: The task labels. The `pandas.Series` data is indexed by `task`
           so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self.fit(data, skills)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

fit_predict_proba(data, skills=None)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

skills (Series, default: None): The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

Returns:

DataFrame: The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/majority_vote.py
def fit_predict_proba(
    self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
) -> pd.DataFrame:
    """Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
            and has the corresponding worker skill.

    Returns:
        DataFrame: The probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """
    self.fit(data, skills)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

OneCoinDawidSkene

Bases: DawidSkene

The one-coin Dawid-Skene aggregation model works exactly the same as the original Dawid-Skene model based on the EM algorithm, except for calculating the workers' errors at the M-step of the algorithm.

For the one-coin model, a worker confusion (error) matrix is parameterized by a single parameter \(s_w\):

\(e^w_{j,z_j} = \begin{cases} s_{w} & y^w_j = z_j \\ \frac{1 - s_{w}}{K - 1} & y^w_j \neq z_j \end{cases}\),

where \(e^w\) is a worker confusion (error) matrix of size \(K \times K\) in case of the \(K\) class classification, \(z_j\) is a true task label, \(y^w_j\) is a worker response to the task \(j\), and \(s_w\) is a worker skill (accuracy).

In other words, the worker \(w\) uses a single coin flip to decide their assignment. No matter what the true label is, the worker has the \(s_w\) probability to assign the correct label, and has the \(1 − s_w\) probability to randomly assign an incorrect label. For the one-coin model, it suffices to estimate \(s_w\) for every worker \(w\) and estimate \(y^w_j\) for every task \(j\). Because of its simplicity, the one-coin model is easier to estimate and enjoys better convergence properties.

Parameters \(p\), \(e^w\), and latent variables \(z\) are optimized with the Expectation-Maximization algorithm: 1. E-step. Estimates the true task label probabilities using the specified workers' responses, the prior label probabilities, and the workers' error probability matrix. 2. M-step. Calculates a worker skill as their accuracy according to the label probability. Then estimates the workers' error probability matrix by assigning user skills to error matrix row by row.
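
As an illustration, a minimal sketch (with hypothetical values of \(K\) and \(s_w\)) of the confusion matrix that a single skill parameter induces:

>>> import numpy as np
>>> K, s_w = 3, 0.8  # hypothetical number of classes and worker skill
>>> e_w = np.full((K, K), (1 - s_w) / (K - 1))  # off-diagonal entries: (1 - s_w) / (K - 1)
>>> np.fill_diagonal(e_w, s_w)  # diagonal entries: the probability s_w of the correct label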

Y. Zhang, X. Chen, D. Zhou, and M. I. Jordan. Spectral methods meet EM: A provably optimal algorithm for crowdsourcing. Journal of Machine Learning Research. Vol. 17, (2016), 1-44.

https://doi.org/10.48550/arXiv.1406.3824

Examples:

>>> from crowdkit.aggregation import OneCoinDawidSkene
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> hds = OneCoinDawidSkene(100)
>>> result = hds.fit_predict(df)
Source code in crowdkit/aggregation/classification/dawid_skene.py
@attr.s
class OneCoinDawidSkene(DawidSkene):
    r"""The **one-coin Dawid-Skene** aggregation model works exactly the same as the original Dawid-Skene model
    based on the EM algorithm, except for calculating the workers' errors
    at the M-step of the algorithm.

    For the one-coin model, a worker confusion (error) matrix is parameterized by a single parameter $s_w$:

    $e^w_{j,z_j}  = \begin{cases}
        s_{w} & y^w_j = z_j \\
        \frac{1 - s_{w}}{K - 1} & y^w_j \neq z_j
    \end{cases}$,

    where $e^w$ is a worker confusion (error) matrix of size $K \times K$ in case of the $K$ class classification,
    $z_j$ is a true task label, $y^w_j$ is a worker response to the task $j$, and $s_w$ is a worker skill (accuracy).

    In other words, the worker $w$ uses a single coin flip to decide their assignment. No matter what the true label is,
    the worker has the $s_w$ probability to assign the correct label, and
    has the $1 − s_w$ probability to randomly assign an incorrect label. For the one-coin model, it
    suffices to estimate $s_w$ for every worker $w$ and estimate $y^w_j$ for every task $j$. Because of its
    simplicity, the one-coin model is easier to estimate and enjoys better convergence properties.

    Parameters $p$, $e^w$, and latent variables $z$ are optimized with the Expectation-Maximization algorithm:
    1. **E-step**. Estimates the true task label probabilities using the specified workers' responses,
    the prior label probabilities, and the workers' error probability matrix.
    2. **M-step**. Calculates a worker skill as their accuracy according to the label probability.
    Then estimates the workers' error probability matrix by assigning user skills to error matrix row by row.

    Y. Zhang, X. Chen, D. Zhou, and M. I. Jordan. Spectral methods meet EM: A provably optimal algorithm for crowdsourcing.
    *Journal of Machine Learning Research. Vol. 17*, (2016), 1-44.

    <https://doi.org/10.48550/arXiv.1406.3824>

    Examples:
        >>> from crowdkit.aggregation import OneCoinDawidSkene
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> hds = OneCoinDawidSkene(100)
        >>> result = hds.fit_predict(df)
    """

    @staticmethod
    def _assign_skills(row: "pd.Series[Any]", skills: pd.DataFrame) -> pd.DataFrame:
        """
        Assigns user skills to error matrix row by row.
        """
        num_categories = len(row)
        for column_name, _ in row.items():
            if column_name == row.name[1]:  # type: ignore
                row[column_name] = skills[row.name[0]]  # type: ignore
            else:
                row[column_name] = (1 - skills[row.name[0]]) / (num_categories - 1)  # type: ignore
        return row  # type: ignore

    @staticmethod
    def _process_skills_to_errors(
        data: pd.DataFrame, probas: pd.DataFrame, skills: "pd.Series[Any]"
    ) -> pd.DataFrame:
        errors = DawidSkene._m_step(data, probas)

        errors = errors.apply(OneCoinDawidSkene._assign_skills, args=(skills,), axis=1)  # type: ignore
        errors.clip(lower=_EPS, upper=1 - _EPS, inplace=True)

        return errors

    @staticmethod
    def _m_step(data: pd.DataFrame, probas: pd.DataFrame) -> "pd.Series[Any]":  # type: ignore
        """Performs M-step of Homogeneous Dawid-Skene algorithm.

        Calculates a worker skill as their accuracy according to the label probability.
        """
        skilled_data = data.copy()
        idx_cols, cols = pd.factorize(data["label"])
        idx_rows, rows = pd.factorize(data["task"])
        skilled_data["skill"] = (
            probas.reindex(rows, axis=0)
            .reindex(cols, axis=1)
            .to_numpy()[idx_rows, idx_cols]
        )
        skills = skilled_data.groupby(["worker"], sort=False)["skill"].mean()
        return skills

    def fit(self, data: pd.DataFrame) -> "OneCoinDawidSkene":  # type: ignore[override]
        """Fits the model to the training data with the EM algorithm.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            OneCoinDawidSkene: self.
        """

        data = data[["task", "worker", "label"]]

        # Early exit
        if not data.size:
            self.probas_ = pd.DataFrame()
            self.priors_ = pd.Series(dtype=float)
            self.errors_ = pd.DataFrame()
            self.labels_ = pd.Series(dtype=float)
            return self

        # Initialization
        probas = MajorityVote().fit_predict_proba(data)
        priors = probas.mean()
        skills = self._m_step(data, probas)
        errors = self._process_skills_to_errors(data, probas, skills)
        loss = -np.inf
        self.loss_history_ = []

        # Updating proba and errors n_iter times
        for _ in range(self.n_iter):
            probas = self._e_step(data, priors, errors)
            priors = probas.mean()
            skills = self._m_step(data, probas)
            errors = self._process_skills_to_errors(data, probas, skills)
            new_loss = self._evidence_lower_bound(data, probas, priors, errors) / len(
                data
            )
            self.loss_history_.append(new_loss)

            if new_loss - loss < self.tol:
                break
            loss = new_loss

        # Saving results
        self.probas_ = probas
        self.priors_ = priors
        self.skills_ = skills
        self.errors_ = errors
        self.labels_ = get_most_probable_labels(probas)

        return self

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:  # type: ignore[override]
        """Fits the model to the training data and returns probability distributions of labels for each task.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self.fit(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":  # type: ignore[override]
        """Fits the model to the training data and returns the aggregated results.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            Series: Task labels. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self.fit(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

fit(data)

Fits the model to the training data with the EM algorithm.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

OneCoinDawidSkene: self.

Source code in crowdkit/aggregation/classification/dawid_skene.py
def fit(self, data: pd.DataFrame) -> "OneCoinDawidSkene":  # type: ignore[override]
    """Fits the model to the training data with the EM algorithm.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        OneCoinDawidSkene: self.
    """

    data = data[["task", "worker", "label"]]

    # Early exit
    if not data.size:
        self.probas_ = pd.DataFrame()
        self.priors_ = pd.Series(dtype=float)
        self.errors_ = pd.DataFrame()
        self.labels_ = pd.Series(dtype=float)
        return self

    # Initialization
    probas = MajorityVote().fit_predict_proba(data)
    priors = probas.mean()
    skills = self._m_step(data, probas)
    errors = self._process_skills_to_errors(data, probas, skills)
    loss = -np.inf
    self.loss_history_ = []

    # Updating proba and errors n_iter times
    for _ in range(self.n_iter):
        probas = self._e_step(data, priors, errors)
        priors = probas.mean()
        skills = self._m_step(data, probas)
        errors = self._process_skills_to_errors(data, probas, skills)
        new_loss = self._evidence_lower_bound(data, probas, priors, errors) / len(
            data
        )
        self.loss_history_.append(new_loss)

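        # Stop early once the gain in the normalized evidence lower bound
        # falls below `tol`; in EM this bound is non-decreasing, so a small
        # gain signals convergence.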
        if new_loss - loss < self.tol:
            break
        loss = new_loss

    # Saving results
    self.probas_ = probas
    self.priors_ = priors
    self.skills_ = skills
    self.errors_ = errors
    self.labels_ = get_most_probable_labels(probas)

    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series: Task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/dawid_skene.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":  # type: ignore[override]
    """Fits the model to the training data and returns the aggregated results.
    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        Series: Task labels. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self.fit(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/dawid_skene.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:  # type: ignore[override]
    """Fits the model to the training data and returns probability distributions of labels for each task.
    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    self.fit(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

Wawa

Bases: BaseClassificationAggregator

The Worker Agreement with Aggregate (Wawa) algorithm consists of three steps:

1. calculates the majority vote label;
2. estimates workers' skills as a fraction of responses that are equal to the majority vote;
3. calculates the weighted majority vote based on skills from the previous step.

Examples:

>>> from crowdkit.aggregation import Wawa
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> result = Wawa().fit_predict(df)
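
To make the three steps concrete, here is a minimal pandas sketch on toy data. It illustrates the algorithm rather than reproducing the library's implementation; ties in step 1 are broken arbitrarily:

import pandas as pd

# Toy responses with the standard `task`, `worker`, and `label` columns.
df = pd.DataFrame(
    [["t1", "w1", 0], ["t1", "w2", 0], ["t1", "w3", 1],
     ["t2", "w1", 1], ["t2", "w2", 1], ["t2", "w3", 1]],
    columns=["task", "worker", "label"],
)

# 1. Majority vote label per task.
mv = df.groupby("task")["label"].agg(lambda s: s.mode().iloc[0])

# 2. Worker skill: the fraction of responses that match the majority vote.
skills = df["label"].eq(df["task"].map(mv)).groupby(df["worker"]).mean()

# 3. Weighted majority vote: each response contributes its worker's skill.
scores = df.assign(skill=df["worker"].map(skills)).groupby(["task", "label"])["skill"].sum()
labels = scores.unstack("label").idxmax(axis=1)
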
Source code in crowdkit/aggregation/classification/wawa.py
@attr.s
class Wawa(BaseClassificationAggregator):
    r"""The **Worker Agreement with Aggregate** (Wawa) algorithm consists of three steps:
    1. calculates the majority vote label;
    2. estimates workers' skills as a fraction of responses that are equal to the majority vote;
    3. calculates the weighted majority vote based on skills from the previous step.

    Examples:
        >>> from crowdkit.aggregation import Wawa
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> result = Wawa().fit_predict(df)
    """

    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")
    """The workers' skills.
    The `pandas.Series` data is indexed by `worker` and has the corresponding worker skill."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that
    the `task` true label is equal to `label`. Each probability is in the range from 0 to 1,
    all task probabilities must sum up to 1."""

    def _apply(self, data: pd.DataFrame) -> "Wawa":
        check_is_fitted(self, attributes="skills_")
        mv = MajorityVote().fit(data, skills=self.skills_)
        self.probas_ = mv.probas_
        self.labels_ = mv.labels_
        return self

    def fit(self, data: pd.DataFrame) -> "Wawa":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Wawa: self.
        """

        # TODO: support weights?
        data = data[["task", "worker", "label"]]
        mv = MajorityVote().fit(data)
        assert mv.labels_ is not None, "no labels_"
        self.skills_ = get_accuracy(data, true_labels=mv.labels_, by="worker")
        return self

    def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Predicts the true labels of tasks when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self._apply(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Returns probability distributions of labels for each task when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self._apply(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data).predict(data)

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        return self.fit(data).predict_proba(data)

probas_ = attr.ib(init=False) class-attribute instance-attribute

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

skills_ = named_series_attrib(name='skill') class-attribute instance-attribute

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.
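
To make these shapes concrete, a short sketch of reading the fitted attributes ('t1' and 0 below are placeholders for an actual task id and label from the data):

wawa = Wawa().fit(df)
proba = wawa.predict_proba(df)  # DataFrame: rows are tasks, columns are labels
p = proba.loc['t1', 0]          # probability that task 't1' has true label 0
top = proba.idxmax(axis=1)      # most probable label per task
skills = wawa.skills_           # Series indexed by worker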

fit(data)

Fits the model to the training data.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Wawa: self.

Source code in crowdkit/aggregation/classification/wawa.py
def fit(self, data: pd.DataFrame) -> "Wawa":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Wawa: self.
    """

    # TODO: support weights?
    data = data[["task", "worker", "label"]]
    mv = MajorityVote().fit(data)
    assert mv.labels_ is not None, "no labels_"
    self.skills_ = get_accuracy(data, true_labels=mv.labels_, by="worker")
    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/wawa.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data).predict(data)

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/wawa.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    return self.fit(data).predict_proba(data)

predict(data)

Predicts the true labels of tasks when the model is fitted.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/wawa.py
def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Predicts the true labels of tasks when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self._apply(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

predict_proba(data)

Returns probability distributions of labels for each task when the model is fitted.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/wawa.py
def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Returns probability distributions of labels for each task when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    self._apply(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

ZeroBasedSkill

Bases: BaseClassificationAggregator

The Zero-Based Skill (ZBS) aggregation model performs weighted majority voting on tasks. After processing a pool of tasks, it re-estimates the workers' skills with a gradient descent step that minimizes the mean squared error between the current skills and the fraction of responses that are equal to the aggregated labels.

This process is repeated until the labels stop changing or the number of iterations is exceeded.

Note

All workers in the dataset that is passed to predict must also have responses in the dataset that was passed to fit.

Examples:

>>> from crowdkit.aggregation import ZeroBasedSkill
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> result = ZeroBasedSkill().fit_predict(df)
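
The skill re-estimation is a plain gradient step on the squared difference between each worker's skill and their agreement with the aggregate. A minimal sketch (zbs_skill_step is a hypothetical helper with illustrative values, not part of the library):

import numpy as np

def zbs_skill_step(skills: np.ndarray, accuracy: np.ndarray, learning_rate: float) -> np.ndarray:
    # Gradient step on 0.5 * (skills - accuracy) ** 2 per worker:
    # skills - lr * (skills - accuracy) == skills + lr * (accuracy - skills).
    return skills + learning_rate * (accuracy - skills)

skills = np.full(3, 1 / 2 + 1e-5)     # near-uniform initialization for two labels
accuracy = np.array([0.9, 0.6, 0.7])  # each worker's agreement with the aggregate
skills = zbs_skill_step(skills, accuracy, learning_rate=1.0)
# With learning_rate=1.0, the updated skills equal the observed accuracies.
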
Source code in crowdkit/aggregation/classification/zero_based_skill.py
@attr.attrs(auto_attribs=True)
class ZeroBasedSkill(BaseClassificationAggregator):
    r"""The **Zero-Based Skill** (ZBS) aggregation model performs weighted majority voting on tasks. After processing a pool of tasks,
    it re-estimates the workers' skills with a gradient descend step to optimize
    the mean squared error of the current skills and the fraction of responses that
    are equal to the aggregated labels.

    This process is repeated until the labels change or exceed the number of iterations.

    Note:
        All workers in the dataset that is passed to `predict` must also have responses
        in the dataset that was passed to `fit`.

    Examples:
        >>> from crowdkit.aggregation import ZeroBasedSkill
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> result = ZeroBasedSkill().fit_predict(df)
    """

    n_iter: int = 100
    """The maximum number of iterations."""

    lr_init: float = 1.0
    """The initial learning rate."""

    lr_steps_to_reduce: int = 20
    """The number of steps required to reduce the learning rate."""

    lr_reduce_factor: float = 0.5
    """The factor by which the learning rate will be multiplied every `lr_steps_to_reduce` step."""

    eps: float = 1e-5
    """The convergence threshold."""

    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")
    """The workers' skills.
    The `pandas.Series` data is indexed by `worker` and has the corresponding worker skill."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that
    the `task` true label is equal to `label`. Each probability is in the range from 0 to 1,
    all task probabilities must sum up to 1."""

    def _init_skills(self, data: pd.DataFrame) -> "pd.Series[Any]":
        skill_value = 1 / data.label.unique().size + self.eps
        skill_index = pd.Index(data.worker.unique(), name="worker")
        return pd.Series(skill_value, index=skill_index)

    def _apply(self, data: pd.DataFrame) -> "ZeroBasedSkill":
        check_is_fitted(self, attributes="skills_")
        mv = MajorityVote().fit(data, self.skills_)
        self.labels_ = mv.labels_
        self.probas_ = mv.probas_
        return self

    def fit(self, data: pd.DataFrame) -> "ZeroBasedSkill":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            ZeroBasedSkill: self.
        """

        # Initialization
        data = data[["task", "worker", "label"]]
        skills = self._init_skills(data)
        mv = MajorityVote()

        # Updating skills and re-fitting majority vote n_iter times
        learning_rate = self.lr_init
        for iteration in range(1, self.n_iter + 1):
            if iteration % self.lr_steps_to_reduce == 0:
                learning_rate *= self.lr_reduce_factor
            mv.fit(data, skills=skills)
            assert mv.labels_ is not None, "no labels_"
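            # Move each skill toward the worker's agreement with the current
            # aggregate: a gradient step on their squared difference.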
            skills = skills + learning_rate * (
                get_accuracy(data, mv.labels_, by="worker") - skills
            )

        # Saving results
        self.skills_ = skills

        return self

    def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Predicts the true labels of tasks when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self._apply(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Returns probability distributions of labels for each task when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: The probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self._apply(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data).predict(data)

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fits the model to the training data and returns the aggregated results.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data).predict_proba(data)

eps = 1e-05 class-attribute instance-attribute

The convergence threshold.

lr_init = 1.0 class-attribute instance-attribute

The initial learning rate.

lr_reduce_factor = 0.5 class-attribute instance-attribute

The factor by which the learning rate will be multiplied every lr_steps_to_reduce step.
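
For example, with the defaults lr_init=1.0, lr_steps_to_reduce=20, and lr_reduce_factor=0.5, the learning rate is 1.0 for iterations 1-19, 0.5 for iterations 20-39, 0.25 for iterations 40-59, and so on.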

lr_steps_to_reduce = 20 class-attribute instance-attribute

The number of steps required to reduce the learning rate.

n_iter = 100 class-attribute instance-attribute

The maximum number of iterations.

probas_ = attr.ib(init=False) class-attribute instance-attribute

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

skills_ = named_series_attrib(name='skill') class-attribute instance-attribute

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

fit(data)

Fits the model to the training data.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

ZeroBasedSkill: self.

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def fit(self, data: pd.DataFrame) -> "ZeroBasedSkill":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        ZeroBasedSkill: self.
    """

    # Initialization
    data = data[["task", "worker", "label"]]
    skills = self._init_skills(data)
    mv = MajorityVote()

    # Updating skills and re-fitting majority vote n_iter times
    learning_rate = self.lr_init
    for iteration in range(1, self.n_iter + 1):
        if iteration % self.lr_steps_to_reduce == 0:
            learning_rate *= self.lr_reduce_factor
        mv.fit(data, skills=skills)
        assert mv.labels_ is not None, "no labels_"
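        # Move each skill toward the worker's agreement with the current
        # aggregate: a gradient step on their squared difference.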
        skills = skills + learning_rate * (
            get_accuracy(data, mv.labels_, by="worker") - skills
        )

    # Saving results
    self.skills_ = skills

    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data).predict(data)

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Fits the model to the training data and returns the aggregated results.
    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data).predict_proba(data)

predict(data)

Predicts the true labels of tasks when the model is fitted.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series: The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Predicts the true labels of tasks when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self._apply(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

predict_proba(data)

Returns probability distributions of labels for each task when the model is fitted.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Returns probability distributions of labels for each task when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: The probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    self._apply(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_