
Classification

Routines for classification.

DawidSkene

Bases: BaseClassificationAggregator

The Dawid-Skene aggregation model is a probabilistic model that parametrizes the expertise level of workers with confusion matrices.

Let \(e^w\) be a worker confusion (error) matrix of size \(K \times K\) in the case of \(K\)-class classification, \(p\) be a vector of prior class probabilities, \(z_j\) be the true label of task \(j\), and \(y^w_j\) be the response of worker \(w\) to task \(j\). The relationship between these parameters is represented by the following latent label model.

[Figure: Dawid-Skene latent label model]

Here the prior true label probability is

$$
\operatorname{Pr}(z_j = c) = p[c],
$$

and the probability distribution of the worker responses with the true label \(c\) is represented by the corresponding column of the error matrix:

$$
\operatorname{Pr}(y_j^w = k | z_j = c) = e^w[k, c].
$$

Parameters \(p\), \(e^w\), and latent variables \(z\) are optimized with the Expectation-Maximization algorithm; a minimal sketch of one iteration is shown after this list.

1. E-step. Estimates the true task label probabilities using the specified workers' responses, the prior label probabilities, and the workers' error probability matrix.
2. M-step. Estimates the workers' error probability matrix using the specified workers' responses and the true task label probabilities.
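To make the two steps concrete, here is a minimal NumPy sketch of one EM iteration under simplifying assumptions: votes are dense (every worker labels every task), `votes[t, w]` holds the label index that worker `w` assigned to task `t`, `errors[w]` is the \(K \times K\) error matrix of worker `w` (strictly positive entries), and `priors` is \(p\). All names are illustrative; the library itself operates on a sparse `(task, worker, label)` DataFrame and works in log space.

import numpy as np

def em_iteration(votes, errors, priors):
    """One EM iteration of the Dawid-Skene model (dense-votes sketch)."""
    n_tasks, n_workers = votes.shape
    K = len(priors)

    # E-step: Pr(z_t = c) is proportional to p[c] * prod_w e^w[votes[t, w], c].
    # Accumulate in log space to avoid floating-point underflow.
    log_probas = np.tile(np.log(priors), (n_tasks, 1))
    for w in range(n_workers):
        log_probas += np.log(errors[w][votes[:, w], :])
    log_probas -= log_probas.max(axis=1, keepdims=True)  # shift rows for stability
    probas = np.exp(log_probas)
    probas /= probas.sum(axis=1, keepdims=True)

    # M-step: re-estimate every worker's error matrix from the soft labels,
    # then renormalize each column so that sum_k e^w[k, c] = 1.
    new_errors = np.zeros((n_workers, K, K))
    for w in range(n_workers):
        for k in range(K):
            new_errors[w, k, :] = probas[votes[:, w] == k].sum(axis=0)
        new_errors[w] = new_errors[w].clip(min=1e-9)  # avoid zero columns, as the library does
        new_errors[w] /= new_errors[w].sum(axis=0, keepdims=True)

    new_priors = probas.mean(axis=0)
    return probas, new_errors, new_priors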

A. Philip Dawid and Allan M. Skene. Maximum Likelihood Estimation of Observer Error-Rates Using the EM Algorithm. Journal of the Royal Statistical Society. Series C (Applied Statistics), Vol. 28, 1 (1979), 20–28.

https://doi.org/10.2307/2346806

Examples:

>>> from crowdkit.aggregation import DawidSkene
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> ds = DawidSkene(100)
>>> result = ds.fit_predict(df)
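After fitting, the estimated model parameters are available as attributes, which are documented below. Continuing the example above:

>>> ds.labels_   # the most likely label per task
>>> ds.probas_   # posterior label distributions, one row per task
>>> ds.errors_   # per-worker confusion matrices
>>> ds.priors_   # estimated prior label distribution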
Source code in crowdkit/aggregation/classification/dawid_skene.py
@attr.s
class DawidSkene(BaseClassificationAggregator):
    r"""The **Dawid-Skene** aggregation model is a probabilistic model that parametrizes the expertise level of workers with confusion matrices.

    Let $e^w$ be a worker confusion (error) matrix of size $K \times K$ in case of the $K$ class classification,
    $p$ be a vector of prior class probabilities, $z_j$ be a true task label, and $y^w_j$ be a worker
    response to the task $j$. The relationship between these parameters is represented by the following latent
    label model.

    ![Dawid-Skene latent label model](https://tlk.s3.yandex.net/crowd-kit/docs/ds_llm.png)

    Here the prior true label probability is
    $$
    \operatorname{Pr}(z_j = c) = p[c],
    $$
    and the probability distribution of the worker responses with the true label $c$ is represented by the
    corresponding column of the error matrix:
    $$
    \operatorname{Pr}(y_j^w = k | z_j = c) = e^w[k, c].
    $$

    Parameters $p$, $e^w$, and latent variables $z$ are optimized with the Expectation-Maximization algorithm:
    1. **E-step**. Estimates the true task label probabilities using the specified workers' responses,
        the prior label probabilities, and the workers' error probability matrix.
    2. **M-step**. Estimates the workers' error probability matrix using the specified workers' responses and the true task label probabilities.

    A. Philip Dawid and Allan M. Skene. Maximum Likelihood Estimation of Observer Error-Rates Using the EM Algorithm.
    *Journal of the Royal Statistical Society. Series C (Applied Statistics), Vol. 28*, 1 (1979), 20–28.

    <https://doi.org/10.2307/2346806>

    Examples:
        >>> from crowdkit.aggregation import DawidSkene
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> ds = DawidSkene(100)
        >>> result = ds.fit_predict(df)
    """

    n_iter: int = attr.ib(default=100)
    """The maximum number of EM iterations."""

    tol: float = attr.ib(default=1e-5)
    """The tolerance stopping criterion for iterative methods with a variable number of steps.
    The algorithm converges when the loss change is less than the `tol` parameter."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks."""

    priors_: Optional["pd.Series[Any]"] = named_series_attrib(name="prior")
    """The prior label distribution.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that
    the `task` true label is equal to `label`. Each probability is in the range from 0 to 1, all task probabilities
    must sum up to 1."""

    errors_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The workers' error matrices. The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
    for every `label_id` found in `data` so that `result.loc[worker, observed_label, true_label]` is the probability
    that `worker` produces `observed_label`, given that the task true label is `true_label`."""

    loss_history_: List[float] = attr.ib(init=False)
    """ A list of loss values during training."""

    @staticmethod
    def _m_step(data: pd.DataFrame, probas: pd.DataFrame) -> pd.DataFrame:
        """Performs M-step of the Dawid-Skene algorithm.

        Estimates the workers' error probability matrix using the specified workers' responses and the true task label probabilities.
        """
        joined = data.join(probas, on="task")
        joined.drop(columns=["task"], inplace=True)

        errors = joined.groupby(["worker", "label"], sort=False).sum()
        errors.clip(lower=_EPS, inplace=True)
        errors /= errors.groupby("worker", sort=False).sum()

        return errors

    @staticmethod
    def _e_step(
        data: pd.DataFrame, priors: "pd.Series[Any]", errors: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Performs E-step of the Dawid-Skene algorithm.

        Estimates the true task label probabilities using the specified workers' responses,
        the prior label probabilities, and the workers' error probability matrix.
        """

        # We have to multiply lots of probabilities and such products are known to converge
        # to zero exponentially fast. To avoid floating-point precision problems we work with
        # logs of original values
        joined = data.join(np.log2(errors), on=["worker", "label"])  # type: ignore
        joined.drop(columns=["worker", "label"], inplace=True)
        log_likelihoods = np.log2(priors) + joined.groupby("task", sort=False).sum()
        log_likelihoods.rename_axis("label", axis=1, inplace=True)

        # Exponentiating log_likelihoods 'as is' may still get us beyond our precision.
        # So we shift every row of log_likelihoods by a constant (which is equivalent to
        # multiplying likelihoods rows by a constant) so that max log_likelihood in each
        # row is equal to 0. This trick ensures proper scaling after exponentiating and
        # does not affect the result of E-step
        scaled_likelihoods = np.exp2(
            log_likelihoods.sub(log_likelihoods.max(axis=1), axis=0)
        )
        scaled_likelihoods = scaled_likelihoods.div(
            scaled_likelihoods.sum(axis=1), axis=0
        )
        # Convert columns types to label type
        scaled_likelihoods.columns = pd.Index(
            scaled_likelihoods.columns, name="label", dtype=data.label.dtype
        )
        return cast(pd.DataFrame, scaled_likelihoods)

    def _evidence_lower_bound(
        self,
        data: pd.DataFrame,
        probas: pd.DataFrame,
        priors: "pd.Series[Any]",
        errors: pd.DataFrame,
    ) -> float:
        # calculate joint probability log-likelihood expectation over probas
        joined = data.join(np.log(errors), on=["worker", "label"])  # type: ignore

        # escape boolean index/column names to prevent confusion between indexing by boolean array and iterable of names
        joined = joined.rename(columns={True: "True", False: "False"}, copy=False)
        priors = priors.rename(index={True: "True", False: "False"}, copy=False)

        joined.loc[:, priors.index] = joined.loc[:, priors.index].add(np.log(priors))  # type: ignore

        joined.set_index(["task", "worker"], inplace=True)
        joint_expectation = (
            (probas.rename(columns={True: "True", False: "False"}) * joined).sum().sum()
        )

        entropy = -(np.log(probas) * probas).sum().sum()
        return float(joint_expectation + entropy)

    def fit(self, data: pd.DataFrame) -> "DawidSkene":
        """Fits the model to the training data with the EM algorithm.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            DawidSkene: self.
        """

        data = data[["task", "worker", "label"]]

        # Early exit
        if not data.size:
            self.probas_ = pd.DataFrame()
            self.priors_ = pd.Series(dtype=float)
            self.errors_ = pd.DataFrame()
            self.labels_ = pd.Series(dtype=float)
            return self

        # Initialization
        probas = MajorityVote().fit_predict_proba(data)
        priors = probas.mean()
        errors = self._m_step(data, probas)
        loss = -np.inf
        self.loss_history_ = []

        # Updating proba and errors n_iter times
        for _ in range(self.n_iter):
            probas = self._e_step(data, priors, errors)
            priors = probas.mean()
            errors = self._m_step(data, probas)
            new_loss = self._evidence_lower_bound(data, probas, priors, errors) / len(
                data
            )
            self.loss_history_.append(new_loss)

            if new_loss - loss < self.tol:
                break
            loss = new_loss

        probas.columns = pd.Index(
            probas.columns, name="label", dtype=probas.columns.dtype
        )
        # Saving results
        self.probas_ = probas
        self.priors_ = priors
        self.errors_ = errors
        self.labels_ = get_most_probable_labels(probas)

        return self

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self.fit(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            Series: Task labels. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self.fit(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

errors_: Optional[pd.DataFrame] = attr.ib(init=False)

The workers' error matrices. The pandas.DataFrame data is indexed by worker and label with a column for every label_id found in data so that result.loc[worker, observed_label, true_label] is the probability that worker produces observed_label, given that the task true label is true_label.

loss_history_: List[float] = attr.ib(init=False)

A list of loss values during training.

n_iter: int = attr.ib(default=100)

The maximum number of EM iterations.

priors_: Optional[pd.Series[Any]] = named_series_attrib(name='prior')

The prior label distribution. The pandas.Series data is indexed by label and contains the prior probability of the corresponding label.

probas_: Optional[pd.DataFrame] = attr.ib(init=False)

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

tol: float = attr.ib(default=1e-05)

The tolerance stopping criterion for iterative methods with a variable number of steps. The algorithm converges when the loss change is less than the tol parameter.
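Both parameters can be set at construction time. For example, the following call (values are illustrative) runs at most 50 EM iterations and stops early once the loss improvement drops below 1e-6:

>>> ds = DawidSkene(n_iter=50, tol=1e-6)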

fit(data)

Fits the model to the training data with the EM algorithm.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results, which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DawidSkene: self.

Source code in crowdkit/aggregation/classification/dawid_skene.py
def fit(self, data: pd.DataFrame) -> "DawidSkene":
    """Fits the model to the training data with the EM algorithm.
    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        DawidSkene: self.
    """

    data = data[["task", "worker", "label"]]

    # Early exit
    if not data.size:
        self.probas_ = pd.DataFrame()
        self.priors_ = pd.Series(dtype=float)
        self.errors_ = pd.DataFrame()
        self.labels_ = pd.Series(dtype=float)
        return self

    # Initialization
    probas = MajorityVote().fit_predict_proba(data)
    priors = probas.mean()
    errors = self._m_step(data, probas)
    loss = -np.inf
    self.loss_history_ = []

    # Updating proba and errors n_iter times
    for _ in range(self.n_iter):
        probas = self._e_step(data, priors, errors)
        priors = probas.mean()
        errors = self._m_step(data, probas)
        new_loss = self._evidence_lower_bound(data, probas, priors, errors) / len(
            data
        )
        self.loss_history_.append(new_loss)

        if new_loss - loss < self.tol:
            break
        loss = new_loss

    probas.columns = pd.Index(
        probas.columns, name="label", dtype=probas.columns.dtype
    )
    # Saving results
    self.probas_ = probas
    self.priors_ = priors
    self.errors_ = errors
    self.labels_ = get_most_probable_labels(probas)

    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results, which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series: Task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/dawid_skene.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.
    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        Series: Task labels. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self.fit(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results, which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/dawid_skene.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Fits the model to the training data and returns probability distributions of labels for each task.
    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    self.fit(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

GLAD

Bases: BaseClassificationAggregator

The GLAD (Generative model of Labels, Abilities, and Difficulties) model is a probabilistic model that parametrizes the abilities of workers and the difficulty of tasks.

Let's consider the case of \(K\)-class classification. Let \(p\) be a vector of prior class probabilities, \(\alpha_i \in (-\infty, +\infty)\) be a worker ability parameter, \(\beta_j \in (0, +\infty)\) be an inverse task difficulty, \(z_j\) be a latent variable representing the true task label, and \(y^i_j\) be a worker response that we observe. The relationships between these variables and parameters according to GLAD are represented by the following latent label model:

[Figure: GLAD latent label model]

The prior probability of \(z_j\) being equal to \(c\) is

$$
\operatorname{Pr}(z_j = c) = p[c],
$$

and the probability distribution of the worker responses with the true label \(c\) follows the single-coin Dawid-Skene model, where the probability of a correct response is a sigmoid function of the product of the worker ability and the inverse task difficulty:

$$
\operatorname{Pr}(y^i_j = k | z_j = c) = \begin{cases}a(i, j), & k = c \\ \frac{1 - a(i,j)}{K-1}, & k \neq c,\end{cases}
$$

where

$$
a(i,j) = \frac{1}{1 + \exp(-\alpha_i\beta_j)}.
$$

Parameters \(p\), \(\alpha\), \(\beta\), and latent variables \(z\) are optimized with the Expectation-Maximization algorithm; a sketch of the response model is shown after this list.

1. E-step. Estimates the true task label probabilities using the alpha parameters of workers' abilities, the prior label probabilities, and the beta parameters of task difficulty.
2. M-step. Optimizes the alpha and beta parameters using the conjugate gradient method.
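To make the response model concrete, here is a minimal sketch of the GLAD response distribution; the function name and arguments are illustrative:

import numpy as np

def response_distribution(alpha: float, beta: float, true_label: int, K: int) -> np.ndarray:
    """Pr(y = k | z = true_label) for k = 0..K-1 under the GLAD model."""
    a = 1.0 / (1.0 + np.exp(-alpha * beta))   # probability of a correct response
    probs = np.full(K, (1.0 - a) / (K - 1))   # errors are spread uniformly over wrong labels
    probs[true_label] = a
    return probs

# A capable worker (alpha = 3) on an easy task (beta = 2) is almost always correct:
print(response_distribution(alpha=3.0, beta=2.0, true_label=0, K=2))  # approx. [0.9975, 0.0025]

A larger product \(\alpha_i\beta_j\) pushes \(a(i,j)\) toward 1 (almost always correct), while a negative ability \(\alpha_i\) pushes it toward 0 (systematically wrong).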

J. Whitehill, P. Ruvolo, T. Wu, J. Bergsma, and J. Movellan. Whose Vote Should Count More: Optimal Integration of Labels from Labelers of Unknown Expertise. Proceedings of the 22nd International Conference on Neural Information Processing Systems, 2009.

https://proceedings.neurips.cc/paper/2009/file/f899139df5e1059396431415e770c6dd-Paper.pdf

Examples:

>>> from crowdkit.aggregation import GLAD
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> glad = GLAD()
>>> result = glad.fit_predict(df)
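After fitting, the estimated per-worker abilities and per-task difficulties are available as attributes, documented below. Continuing the example above:

>>> glad.alphas_  # worker ability parameters, indexed by worker
>>> glad.betas_   # inverse task difficulty parameters, indexed by task
>>> glad.probas_  # posterior label distributions per task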
Source code in crowdkit/aggregation/classification/glad.py
@attr.s
class GLAD(BaseClassificationAggregator):
    r"""The **GLAD** (Generative model of Labels, Abilities, and Difficulties) model is a probabilistic model that parametrizes the abilities of workers and the difficulty of tasks.

    Let's consider a case of $K$ class classification. Let $p$ be a vector of prior class probabilities,
    $\alpha_i \in (-\infty, +\infty)$ be a worker ability parameter, $\beta_j \in (0, +\infty)$ be
    an inverse task difficulty, $z_j$ be a latent variable representing the true task label, and $y^i_j$
    be a worker response that we observe. The relationships between these variables and parameters according
    to GLAD are represented by the following latent label model:

    ![GLAD latent label model](https://tlk.s3.yandex.net/crowd-kit/docs/glad_llm.png)

    The prior probability of $z_j$ being equal to $c$ is
    $$
    \operatorname{Pr}(z_j = c) = p[c],
    $$
    and the probability distribution of the worker responses with the true label $c$ follows the
    single coin Dawid-Skene model where the true label probability is a sigmoid function of the product of the
    worker ability and the inverse task difficulty:
    $$
    \operatorname{Pr}(y^i_j = k | z_j = c) = \begin{cases}a(i, j), & k = c \\ \frac{1 - a(i,j)}{K-1}, & k \neq c\end{cases},
    $$
    where
    $$
    a(i,j) = \frac{1}{1 + \exp(-\alpha_i\beta_j)}.
    $$

    Parameters $p$, $\alpha$, $\beta$, and latent variables $z$ are optimized with the Expectation-Maximization algorithm:
    1. **E-step**. Estimates the true task label probabilities using the alpha parameters of workers' abilities,
        the prior label probabilities, and the beta parameters of task difficulty.
    2. **M-step**. Optimizes the alpha and beta parameters using the conjugate gradient method.

    J. Whitehill, P. Ruvolo, T. Wu, J. Bergsma, and J. Movellan.
    Whose Vote Should Count More: Optimal Integration of Labels from Labelers of Unknown Expertise.
    *Proceedings of the 22nd International Conference on Neural Information Processing Systems*, 2009

    <https://proceedings.neurips.cc/paper/2009/file/f899139df5e1059396431415e770c6dd-Paper.pdf>

    Examples:
        >>> from crowdkit.aggregation import GLAD
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> glad = GLAD()
        >>> result = glad.fit_predict(df)
    """

    n_iter: int = attr.ib(default=100)
    """The maximum number of EM iterations."""

    tol: float = attr.ib(default=1e-5)
    """The tolerance stopping criterion for iterative methods with a variable number of steps.
    The algorithm converges when the loss change is less than the `tol` parameter."""

    silent: bool = attr.ib(default=True)
    """Specifies if the progress bar will be shown (false) or not (true)."""

    labels_priors: Optional["pd.Series[Any]"] = attr.ib(default=None)
    """The prior label probabilities."""

    alphas_priors_mean: Optional["pd.Series[Any]"] = attr.ib(default=None)
    """The prior mean value of the alpha parameters."""

    betas_priors_mean: Optional["pd.Series[Any]"] = attr.ib(default=None)
    """The prior mean value of the beta parameters."""

    m_step_max_iter: int = attr.ib(default=25)
    """The maximum number of iterations of the conjugate gradient method in the M-step."""

    m_step_tol: float = attr.ib(default=1e-2)
    """The tolerance stopping criterion of the conjugate gradient method in the M-step."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The data frame is indexed by `task` so that `result.loc[task, label]` is the probability that the `task`
    true label is equal to `label`. Each probability is in the range from 0 to 1, all task probabilities
    must sum up to 1."""

    alphas_: "pd.Series[Any]" = named_series_attrib(name="alpha")
    """The alpha parameters of workers' abilities.
    The `pandas.Series` data is indexed by `worker` and contains the estimated alpha parameters."""

    betas_: "pd.Series[Any]" = named_series_attrib(name="beta")
    """The beta parameters of task difficulty.
    The `pandas.Series` data is indexed by `task` and contains the estimated beta parameters."""

    loss_history_: List[float] = attr.ib(init=False)
    """A list of loss values during training."""

    def _join_all(
        self,
        data: pd.DataFrame,
        alphas: "pd.Series[Any]",
        betas: "pd.Series[Any]",
        priors: "pd.Series[Any]",
    ) -> pd.DataFrame:
        """Makes a data frame with format `(task, worker, label, variable) -> (alpha, beta, posterior, delta)`"""
        labels = list(priors.index)
        data = data.set_index("task")
        data[labels] = 0
        data.reset_index(inplace=True)
        data = data.melt(
            id_vars=["task", "worker", "label"],
            value_vars=labels,
            value_name="posterior",
        )
        data = data.set_index("variable")
        data.reset_index(inplace=True)
        data.set_index("task", inplace=True)
        data["beta"] = betas
        data = data.reset_index().set_index("worker")
        data["alpha"] = alphas
        data.reset_index(inplace=True)
        data["delta"] = data["label"] == data["variable"]
        return data

    def _e_step(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Performs E-step of GLAD algorithm.

        Estimates the true task label probabilities using the alpha parameters of workers' abilities,
        the prior label probabilities, and the beta parameters of task difficulty.
        """
        alpha_beta = data["alpha"] * np.exp(data["beta"])
        log_sigma = -self._softplus(-alpha_beta)
        log_one_minus_sigma = -self._softplus(alpha_beta)
        data["posterior"] = data["delta"] * log_sigma + (1 - data["delta"]) * (
            log_one_minus_sigma - np.log(len(self.prior_labels_) - 1)
        )
        # sum up by workers
        probas = data.groupby(["task", "variable"]).sum(numeric_only=True)["posterior"]
        # add priors to every label
        probas = probas.add(np.log(cast("pd.Series[Any]", self.priors_)), level=1)
        # exponentiate and normalize
        probas = probas.groupby(["task"]).transform(self._softmax)
        # put posterior in data['posterior']
        probas.name = "posterior"
        data = pd.merge(
            data.drop("posterior", axis=1), probas, on=["task", "variable"], copy=False
        )

        self.probas_ = probas.unstack()
        return data

    def _gradient_Q(
        self, data: pd.DataFrame
    ) -> Tuple["pd.Series[Any]", "pd.Series[Any]"]:
        """Computes gradient of loss function"""

        sigma = scipy.special.expit(data["alpha"] * np.exp(data["beta"]))
        # multiply by exponent of beta because of beta -> exp(beta) reparameterization
        data["dQb"] = (
            data["posterior"]
            * (data["delta"] - sigma)
            * data["alpha"]
            * np.exp(data["beta"])
        )
        dQbeta = data.groupby("task").sum(numeric_only=True)["dQb"]

        # gradient of priors on betas
        assert self.betas_ is not None, "betas_ is None"
        assert self.betas_priors_mean_ is not None, "betas_priors_mean_ is None"
        dQbeta -= self.betas_ - self.betas_priors_mean_

        data["dQa"] = data["posterior"] * (data["delta"] - sigma) * np.exp(data["beta"])
        dQalpha = data.groupby("worker").sum(numeric_only=True)["dQa"]
        # gradient of priors on alphas
        assert self.alphas_ is not None, "alphas_ is None"
        assert self.alphas_priors_mean_ is not None, "alphas_priors_mean_ is None"
        dQalpha -= self.alphas_ - self.alphas_priors_mean_
        return dQalpha, dQbeta

    def _compute_Q(self, data: pd.DataFrame) -> float:
        """Computes loss function"""

        alpha_beta = data["alpha"] * np.exp(data["beta"])
        log_sigma = -self._softplus(-alpha_beta)
        log_one_minus_sigma = -self._softplus(alpha_beta)
        data["task_expectation"] = data["posterior"] * (
            data["delta"] * log_sigma
            + (1 - data["delta"])
            * (log_one_minus_sigma - np.log(len(self.prior_labels_) - 1))
        )
        Q = data["task_expectation"].sum()

        # priors on alphas and betas
        assert self.alphas_ is not None, "alphas_ is None"
        assert self.alphas_priors_mean_ is not None, "alphas_priors_mean_ is None"
        assert self.betas_ is not None, "betas_ is None"
        assert self.betas_priors_mean_ is not None, "betas_priors_mean_ is None"
        Q += np.log(scipy.stats.norm.pdf(self.alphas_ - self.alphas_priors_mean_)).sum()
        Q += np.log(scipy.stats.norm.pdf(self.betas_ - self.betas_priors_mean_)).sum()
        if np.isnan(Q):
            return -np.inf
        return float(Q)

    def _optimize_f(self, x: npt.NDArray[Any]) -> float:
        """Computes loss by parameters represented by numpy array"""
        alpha, beta = self._get_alphas_betas_by_point(x)
        self._update_alphas_betas(alpha, beta)
        return -self._compute_Q(self._current_data)

    def _optimize_df(self, x: npt.NDArray[Any]) -> npt.NDArray[Any]:
        """Computes loss gradient by parameters represented by numpy array"""
        alpha, beta = self._get_alphas_betas_by_point(x)
        self._update_alphas_betas(alpha, beta)
        dQalpha, dQbeta = self._gradient_Q(self._current_data)

        minus_grad = np.zeros_like(x)
        minus_grad[: len(self.workers_)] = -dQalpha[self.workers_].values  # type: ignore
        minus_grad[len(self.workers_) :] = -dQbeta[self.tasks_].values  # type: ignore
        return minus_grad

    def _update_alphas_betas(
        self, alphas: "pd.Series[Any]", betas: "pd.Series[Any]"
    ) -> None:
        self.alphas_ = alphas
        self.betas_ = betas
        self._current_data.set_index("worker", inplace=True)
        self._current_data["alpha"] = alphas
        self._current_data.reset_index(inplace=True)
        self._current_data.set_index("task", inplace=True)
        self._current_data["beta"] = betas
        self._current_data.reset_index(inplace=True)

    def _get_alphas_betas_by_point(
        self, x: npt.NDArray[Any]
    ) -> Tuple["pd.Series[Any]", "pd.Series[Any]"]:
        alphas = pd.Series(x[: len(self.workers_)], index=self.workers_, name="alpha")  # type: ignore
        alphas.index.name = "worker"
        betas = pd.Series(x[len(self.workers_) :], index=self.tasks_, name="beta")  # type: ignore
        betas.index.name = "task"
        return alphas, betas

    def _m_step(self, data: pd.DataFrame) -> pd.DataFrame:
        """Optimizes the alpha and beta parameters using the conjugate gradient method."""
        x_0 = np.concatenate([self.alphas_.values, self.betas_.values])
        self._current_data = data
        res = minimize(
            self._optimize_f,
            x_0,
            method="CG",
            jac=self._optimize_df,
            tol=self.m_step_tol,
            options={"disp": False, "maxiter": self.m_step_max_iter},
        )
        self.alphas_, self.betas_ = self._get_alphas_betas_by_point(res.x)
        self._update_alphas_betas(self.alphas_, self.betas_)
        return self._current_data

    def _init(self, data: pd.DataFrame) -> None:
        self.alphas_ = pd.Series(1.0, index=pd.unique(data.worker))  # type: ignore
        self.betas_ = pd.Series(1.0, index=pd.unique(data.task))  # type: ignore
        self.tasks_ = pd.unique(data["task"])
        self.workers_ = pd.unique(data["worker"])
        self.priors_ = self.labels_priors
        if self.priors_ is None:
            self.prior_labels_ = pd.unique(data["label"])
            self.priors_ = pd.Series(
                1.0 / len(self.prior_labels_), index=self.prior_labels_  # type: ignore
            )
        else:
            self.prior_labels_ = self.priors_.index  # type: ignore
        self.alphas_priors_mean_ = self.alphas_priors_mean
        if self.alphas_priors_mean_ is None:
            self.alphas_priors_mean_ = pd.Series(1.0, index=self.alphas_.index)
        self.betas_priors_mean_ = self.betas_priors_mean
        if self.betas_priors_mean_ is None:
            self.betas_priors_mean_ = pd.Series(1.0, index=self.betas_.index)

    @staticmethod
    def _softplus(x: "pd.Series[Any]", limit: int = 30) -> npt.NDArray[Any]:
        """log(1 + exp(x)) stable version

        For x > 30 or x < -30 error is less than 1e-13
        """
        positive_mask = x > limit
        negative_mask = x < -limit
        mask = positive_mask | negative_mask
        return cast(
            npt.NDArray[Any],
            np.log1p(np.exp(x * (1 - mask))) * (1 - mask) + x * positive_mask,
        )

    # backport for scipy < 1.12.0
    @staticmethod
    def _softmax(x: npt.NDArray[Any]) -> npt.NDArray[Any]:
        return cast(npt.NDArray[Any], np.exp(x - logsumexp(x, keepdims=True)))

    def fit(self, data: pd.DataFrame) -> "GLAD":
        """Fits the model to the training data with the EM algorithm.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            GLAD: self.
        """

        # Initialization
        data = data.filter(["task", "worker", "label"])
        self._init(data)

        assert self.alphas_ is not None, "no alphas_"
        assert self.betas_ is not None, "no betas_"
        assert self.priors_ is not None, "no priors_"

        data = self._join_all(data, self.alphas_, self.betas_, self.priors_)
        data = self._e_step(data)
        Q = self._compute_Q(data)

        self.loss_history_ = []

        def iteration_progress() -> Tuple[Iterator[int], Optional["tqdm[int]"]]:
            if self.silent:
                return iter(range(self.n_iter)), None
            else:
                trange_ = trange(self.n_iter, desc="Iterations")
                return iter(trange_), trange_

        iterator, pbar = iteration_progress()

        for _ in iterator:
            last_Q = Q
            if not self.silent:
                assert isinstance(pbar, tqdm)
                pbar.set_description(f"Q = {round(Q, 4)}")

            # E-step
            data = self._e_step(data)

            # M-step
            data = self._m_step(data)

            Q = self._compute_Q(data) / len(data)

            self.loss_history_.append(Q)
            if Q - last_Q < self.tol:
                break

        self.labels_ = cast(pd.DataFrame, self.probas_).idxmax(axis=1)
        return self

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self.fit(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: Task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self.fit(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

alphas_: pd.Series[Any] = named_series_attrib(name='alpha')

The alpha parameters of workers' abilities. The pandas.Series data is indexed by worker and contains the estimated alpha parameters.

alphas_priors_mean: Optional[pd.Series[Any]] = attr.ib(default=None)

The prior mean value of the alpha parameters.

betas_: pd.Series[Any] = named_series_attrib(name='beta')

The beta parameters of task difficulty. The pandas.Series data is indexed by task and contains the estimated beta parameters.

betas_priors_mean: Optional[pd.Series[Any]] = attr.ib(default=None)

The prior mean value of the beta parameters.

labels_priors: Optional[pd.Series[Any]] = attr.ib(default=None)

The prior label probabilities.

loss_history_: List[float] = attr.ib(init=False)

A list of loss values during training.

m_step_max_iter: int = attr.ib(default=25)

The maximum number of iterations of the conjugate gradient method in the M-step.

m_step_tol: float = attr.ib(default=0.01)

The tolerance stopping criterion of the conjugate gradient method in the M-step.

n_iter: int = attr.ib(default=100)

The maximum number of EM iterations.

probas_: Optional[pd.DataFrame] = attr.ib(init=False)

The probability distributions of task labels. The data frame is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

silent: bool = attr.ib(default=True)

If True (default), the progress bar is not shown during fitting; if False, it is displayed.

tol: float = attr.ib(default=1e-05)

The tolerance stopping criterion for iterative methods with a variable number of steps. The algorithm converges when the loss change is less than the tol parameter.
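As with DawidSkene, the EM loop and the inner conjugate gradient optimization can be tuned at construction time. The values below are illustrative:

>>> glad = GLAD(n_iter=100, tol=1e-5, m_step_max_iter=25, m_step_tol=1e-2, silent=False)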

fit(data)

Fits the model to the training data with the EM algorithm.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results, which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

GLAD: self.

Source code in crowdkit/aggregation/classification/glad.py
def fit(self, data: pd.DataFrame) -> "GLAD":
    """Fits the model to the training data with the EM algorithm.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        GLAD: self.
    """

    # Initialization
    data = data.filter(["task", "worker", "label"])
    self._init(data)

    assert self.alphas_ is not None, "no alphas_"
    assert self.betas_ is not None, "no betas_"
    assert self.priors_ is not None, "no priors_"

    data = self._join_all(data, self.alphas_, self.betas_, self.priors_)
    data = self._e_step(data)
    Q = self._compute_Q(data)

    self.loss_history_ = []

    def iteration_progress() -> Tuple[Iterator[int], Optional["tqdm[int]"]]:
        if self.silent:
            return iter(range(self.n_iter)), None
        else:
            trange_ = trange(self.n_iter, desc="Iterations")
            return iter(trange_), trange_

    iterator, pbar = iteration_progress()

    for _ in iterator:
        last_Q = Q
        if not self.silent:
            assert isinstance(pbar, tqdm)
            pbar.set_description(f"Q = {round(Q, 4)}")

        # E-step
        data = self._e_step(data)

        # M-step
        data = self._m_step(data)

        Q = self._compute_Q(data) / len(data)

        self.loss_history_.append(Q)
        if Q - last_Q < self.tol:
            break

    self.labels_ = cast(pd.DataFrame, self.probas_).idxmax(axis=1)
    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results, which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

Series: Task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/glad.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: Task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self.fit(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results, which is represented as the pandas.DataFrame data containing task, worker, and label columns.

Returns:

DataFrame: Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/glad.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    self.fit(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

GoldMajorityVote

Bases: BaseClassificationAggregator

The Gold Majority Vote model is used when a golden dataset (ground truth) exists for some tasks. It calculates the probability of a correct label for each worker based on the golden set. After that, the sum of the probabilities of each label is calculated for each task. The correct label is the one with the greatest sum of the probabilities.

For example, you have 10 000 tasks completed by 3 000 different workers. And you have 100 tasks where you already know the ground truth labels. First, you can call fit to calculate the percentage of correct labels for each worker. And then call predict to calculate labels for your 10 000 tasks.

The following rules must be observed:

1. All workers must complete at least one task from the golden dataset.
2. All workers from the dataset that is submitted to predict must be included in the response dataset that is submitted to fit.

Examples:

>>> import pandas as pd
>>> from crowdkit.aggregation import GoldMajorityVote
>>> df = pd.DataFrame(
>>>     [
>>>         ['t1', 'p1', 0],
>>>         ['t1', 'p2', 0],
>>>         ['t1', 'p3', 1],
>>>         ['t2', 'p1', 1],
>>>         ['t2', 'p2', 0],
>>>         ['t2', 'p3', 1],
>>>     ],
>>>     columns=['task', 'worker', 'label']
>>> )
>>> true_labels = pd.Series({'t1': 0})
>>> gold_mv = GoldMajorityVote()
>>> result = gold_mv.fit_predict(df, true_labels)
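Once fitted on the tasks with known ground truth, the model exposes the estimated worker skills and can score any tasks completed by the same workers. Continuing the example above:

>>> gold_mv.skills_             # per-worker accuracy on the golden tasks
>>> gold_mv.predict(df)         # labels for tasks without ground truth
>>> gold_mv.predict_proba(df)   # label distributions for those tasks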

Attributes:

labels_ (Optional[Series]): The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

skills_ (Optional[Series]): The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

probas_ (Optional[DataFrame]): The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
@attr.s
class GoldMajorityVote(BaseClassificationAggregator):
    r"""The **Gold Majority Vote** model is used when a golden dataset (ground truth) exists for some tasks.
    It calculates the probability of a correct label for each worker based on the golden set.
    After that, the sum of the probabilities of each label is calculated for each task.
    The correct label is the one with the greatest sum of the probabilities.

    For example, you have 10 000 tasks completed by 3 000 different workers. And you have 100 tasks where you already
    know the ground truth labels. First, you can call `fit` to calculate the percentage of correct labels for each worker.
    And then call `predict` to calculate labels for your 10 000 tasks.

    The following rules must be observed:
    1. All workers must complete at least one task from the golden dataset.
    2. All workers from the dataset that is submitted to `predict` must be included in the response dataset that is submitted to `fit`.

    Examples:
        >>> import pandas as pd
        >>> from crowdkit.aggregation import GoldMajorityVote
        >>> df = pd.DataFrame(
        >>>     [
        >>>         ['t1', 'p1', 0],
        >>>         ['t1', 'p2', 0],
        >>>         ['t1', 'p3', 1],
        >>>         ['t2', 'p1', 1],
        >>>         ['t2', 'p2', 0],
        >>>         ['t2', 'p3', 1],
        >>>     ],
        >>>     columns=['task', 'worker', 'label']
        >>> )
        >>> true_labels = pd.Series({'t1': 0})
        >>> gold_mv = GoldMajorityVote()
        >>> result = gold_mv.fit_predict(df, true_labels)

    Attributes:
        labels_ (typing.Optional[pandas.core.series.Series]): The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.

        skills_ (typing.Optional[pandas.core.series.Series]): The workers' skills. The `pandas.Series` data is indexed by `worker`
            and has the corresponding worker skill.

        probas_ (typing.Optional[pandas.core.frame.DataFrame]): The probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]`
            is the probability that the `task` true label is equal to `label`. Each
            probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    # Available after fit
    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")

    # Available after predict or predict_proba
    # labels_
    probas_: Optional[pd.DataFrame] = attr.ib(init=False)

    def _apply(self, data: pd.DataFrame) -> "GoldMajorityVote":
        check_is_fitted(self, attributes="skills_")
        mv = MajorityVote().fit(data, self.skills_)
        self.labels_ = mv.labels_
        self.probas_ = mv.probas_
        return self

    def fit(self, data: pd.DataFrame, true_labels: "pd.Series[Any]") -> "GoldMajorityVote":  # type: ignore
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the task ground truth label.

        Returns:
            GoldMajorityVote: self.
        """

        data = data[["task", "worker", "label"]]
        self.skills_ = get_accuracy(data, true_labels=true_labels, by="worker")
        return self

    def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Predicts the true labels of tasks when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self._apply(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Returns probability distributions of labels for each task when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self._apply(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame, true_labels: "pd.Series[Any]") -> "pd.Series[Any]":  # type: ignore
        """Fits the model to the training data and returns the aggregated results.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the task ground truth label.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data, true_labels).predict(data)

    def fit_predict_proba(
        self, data: pd.DataFrame, true_labels: "pd.Series[Any]"
    ) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the task ground truth label.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        return self.fit(data, true_labels).predict_proba(data)

fit(data, true_labels)

Fits the model to the training data.

Parameters:

data (DataFrame, required): The training dataset of workers' labeling results, which is represented as the pandas.DataFrame data containing task, worker, and label columns.

true_labels (Series, required): The ground truth labels of tasks. The pandas.Series data is indexed by task so that labels.loc[task] is the task ground truth label.

Returns:

GoldMajorityVote: self.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def fit(self, data: pd.DataFrame, true_labels: "pd.Series[Any]") -> "GoldMajorityVote":  # type: ignore
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the task ground truth label.

    Returns:
        GoldMajorityVote: self.
    """

    data = data[["task", "worker", "label"]]
    self.skills_ = get_accuracy(data, true_labels=true_labels, by="worker")
    return self

fit_predict(data, true_labels)

Fits the model to the training data and returns the aggregated results.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required
true_labels Series

The ground truth labels of tasks. The pandas.Series data is indexed by task so that labels.loc[task] is the task ground truth label.

required

Returns:

Name Type Description
Series Series[Any]

The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def fit_predict(self, data: pd.DataFrame, true_labels: "pd.Series[Any]") -> "pd.Series[Any]":  # type: ignore
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the task ground truth label.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data, true_labels).predict(data)

fit_predict_proba(data, true_labels)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required
true_labels Series

The ground truth labels of tasks. The pandas.Series data is indexed by task so that labels.loc[task] is the task ground truth label.

required

Returns:

Name Type Description
DataFrame DataFrame

Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def fit_predict_proba(
    self, data: pd.DataFrame, true_labels: "pd.Series[Any]"
) -> pd.DataFrame:
    """Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        true_labels (Series): The ground truth labels of tasks. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the task ground truth label.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    return self.fit(data, true_labels).predict_proba(data)

predict(data)

Predicts the true labels of tasks when the model is fitted.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
Series Series[Any]

The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Predicts the true labels of tasks when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self._apply(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

predict_proba(data)

Returns probability distributions of labels for each task when the model is fitted.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
DataFrame DataFrame

Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/gold_majority_vote.py
def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Returns probability distributions of labels for each task when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    self._apply(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

KOS

Bases: BaseClassificationAggregator

The KOS (Karger, Oh, and Shah 2011) aggregation model is an iterative algorithm that calculates the log-likelihood of the task being positive while modeling the worker reliability.

Let \(A_{ij}\) be a matrix of the responses of a worker \(j\) on a task \(i\). If the worker \(j\) does not respond to the task \(i\), then \(A_{ij} = 0\). Otherwise, \(|A_{ij}| = 1\). The algorithm operates on real-valued task messages \(x_{i \rightarrow j}\) and worker messages \(y_{j \rightarrow i}\). A task message \(x_{i \rightarrow j}\) represents the log-likelihood of task \(i\) being a positive task, and a worker message \(y_{j \rightarrow i}\) represents how reliable worker \(j\) is.

At the \(k\)-th iteration, the values are updated as follows:
$$
x_{i \rightarrow j}^{(k)} = \sum_{j' \in \partial i \backslash j} A_{ij'} y_{j' \rightarrow i}^{(k-1)}, \qquad
y_{j \rightarrow i}^{(k)} = \sum_{i' \in \partial j \backslash i} A_{i'j} x_{i' \rightarrow j}^{(k-1)}.
$$
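
As an illustration, here is a toy NumPy sketch (not library code; the array names are invented) of one leave-one-out update round on a dense response matrix:

import numpy as np

rng = np.random.default_rng(0)
A = rng.choice([-1, 0, 1], size=(5, 4))           # toy responses: tasks x workers
y = rng.normal(loc=1.0, scale=1.0, size=A.shape)  # worker messages y_{j -> i}

# Task messages x_{i -> j}: sum over the *other* workers of A[i, j'] * y[i, j']
x = (A * y).sum(axis=1, keepdims=True) - A * y
# Worker messages y_{j -> i}: sum over the *other* tasks of A[i', j] * x[i', j]
y = (A * x).sum(axis=0, keepdims=True) - A * x

# After convergence, task i is labeled by sign(sum_j A[i, j] * y[i, j]).
estimated = np.sign((A * y).sum(axis=1))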

David R. Karger, Sewoong Oh, and Devavrat Shah. Budget-Optimal Task Allocation for Reliable Crowdsourcing Systems. Operations Research 62.1 (2014), 1-38.

https://arxiv.org/abs/1110.3564

Examples:

>>> from crowdkit.aggregation import KOS
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> ds = KOS(10)
>>> result = ds.fit_predict(df)
Source code in crowdkit/aggregation/classification/kos.py
@attr.s
class KOS(BaseClassificationAggregator):
    r"""The **KOS** (Karger, Oh, and Shah 2011) aggregation model is an iterative algorithm that calculates the log-likelihood of the task being positive while modeling
    the worker reliability.

    Let $A_{ij}$ be a matrix of the responses of a worker $j$ on a task $i$.
    If the worker $j$ does not respond to the task $i$, then $A_{ij} = 0$. Otherwise, $|A_{ij}| = 1$.
    The algorithm operates on real-valued task messages $x_{i \rightarrow j}$  and
    worker messages $y_{j \rightarrow i}$. A task message $x_{i \rightarrow j}$ represents
    the log-likelihood of task $i$ being a positive task, and a worker message $y_{j \rightarrow i}$ represents
    how reliable worker $j$ is.

    At the $k$-th iteration, the values are updated as follows:
    $$
    x_{i \rightarrow j}^{(k)} = \sum_{j^{'} \in \partial i \backslash j} A_{ij^{'}} y_{j^{'} \rightarrow i}^{(k-1)} \\
    y_{j \rightarrow i}^{(k)} = \sum_{i^{'} \in \partial j \backslash i} A_{i^{'}j} x_{i^{'} \rightarrow j}^{(k-1)}
    $$

    David R. Karger, Sewoong Oh, and Devavrat Shah. Budget-Optimal Task Allocation for Reliable Crowdsourcing Systems.
    *Operations Research 62.1 (2014)*, 1-38.

    <https://arxiv.org/abs/1110.3564>

    Examples:
        >>> from crowdkit.aggregation import KOS
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> ds = KOS(10)
        >>> result = ds.fit_predict(df)
    """

    n_iter: int = attr.ib(default=100)
    """The maximum number of iterations."""

    random_state: int = attr.ib(default=0)
    """The state of the random number generator."""

    def fit(self, data: pd.DataFrame) -> "KOS":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            KOS: self.
        """

        np.random.seed(self.random_state)

        # Early exit
        if not data.size:
            self.labels_ = pd.Series([], dtype="O")
            return self

        # Initialization
        kos_data = data.copy()
        labels = kos_data.label.unique()
        if len(labels) != 2:
            raise ValueError(
                "KOS aggregation method is for binary classification only."
            )
        mapping = {labels[0]: 1, labels[1]: -1}
        kos_data.label = kos_data.label.apply(lambda x: mapping[x])
        kos_data["reliabilities"] = np.random.normal(loc=1, scale=1, size=len(kos_data))

        # Updating reliabilities
        for _ in range(self.n_iter):
            # Update inferred labels for (task, worker)
            kos_data["multiplied"] = kos_data.label * kos_data.reliabilities
            kos_data["summed"] = list(
                kos_data.groupby("task")["multiplied"].sum()[kos_data.task]
            )
            # Early exit to prevent NaN
            if (np.abs(kos_data["summed"]) > _MAX).any():
                break
            kos_data["inferred"] = (kos_data["summed"] - kos_data["multiplied"]).astype(
                float
            )

            # Update reliabilities for (task, worker)
            kos_data["multiplied"] = kos_data.label * kos_data.inferred
            kos_data["summed"] = list(
                kos_data.groupby("worker")["multiplied"].sum()[kos_data.worker]
            )
            # Early exit to prevent NaN
            if (np.abs(kos_data["summed"]) > _MAX).any():
                break
            kos_data["reliabilities"] = (kos_data.summed - kos_data.multiplied).astype(
                "float"
            )

        kos_data["inferred"] = kos_data.label * kos_data.reliabilities
        inferred_labels = np.sign(kos_data.groupby("task")["inferred"].sum())
        back_mapping = {v: k for k, v in mapping.items()}
        self.labels_ = cast(
            "pd.Series[Any]", inferred_labels.apply(lambda x: back_mapping[x])
        )
        return self

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self.fit(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

n_iter: int = attr.ib(default=100) class-attribute instance-attribute

The maximum number of iterations.

random_state: int = attr.ib(default=0) class-attribute instance-attribute

The state of the random number generator.

fit(data)

Fits the model to the training data.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
KOS KOS

self.

Source code in crowdkit/aggregation/classification/kos.py
def fit(self, data: pd.DataFrame) -> "KOS":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        KOS: self.
    """

    np.random.seed(self.random_state)

    # Early exit
    if not data.size:
        self.labels_ = pd.Series([], dtype="O")
        return self

    # Initialization
    kos_data = data.copy()
    labels = kos_data.label.unique()
    if len(labels) != 2:
        raise ValueError(
            "KOS aggregation method is for binary classification only."
        )
    mapping = {labels[0]: 1, labels[1]: -1}
    kos_data.label = kos_data.label.apply(lambda x: mapping[x])
    kos_data["reliabilities"] = np.random.normal(loc=1, scale=1, size=len(kos_data))

    # Updating reliabilities
    for _ in range(self.n_iter):
        # Update inferred labels for (task, worker)
        kos_data["multiplied"] = kos_data.label * kos_data.reliabilities
        kos_data["summed"] = list(
            kos_data.groupby("task")["multiplied"].sum()[kos_data.task]
        )
        # Early exit to prevent NaN
        if (np.abs(kos_data["summed"]) > _MAX).any():
            break
        kos_data["inferred"] = (kos_data["summed"] - kos_data["multiplied"]).astype(
            float
        )

        # Update reliabilities for (task, worker)
        kos_data["multiplied"] = kos_data.label * kos_data.inferred
        kos_data["summed"] = list(
            kos_data.groupby("worker")["multiplied"].sum()[kos_data.worker]
        )
        # Early exit to prevent NaN
        if (np.abs(kos_data["summed"]) > _MAX).any():
            break
        kos_data["reliabilities"] = (kos_data.summed - kos_data.multiplied).astype(
            "float"
        )

    kos_data["inferred"] = kos_data.label * kos_data.reliabilities
    inferred_labels = np.sign(kos_data.groupby("task")["inferred"].sum())
    back_mapping = {v: k for k, v in mapping.items()}
    self.labels_ = cast(
        "pd.Series[Any]", inferred_labels.apply(lambda x: back_mapping[x])
    )
    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
Series Series[Any]

The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/kos.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self.fit(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

MACE

Bases: BaseClassificationAggregator

The Multi-Annotator Competence Estimation (MACE) model is a probabilistic model that associates each worker with a label probability distribution. A worker can be spamming on each task. If the worker is not spamming, they label a task correctly. If the worker is spamming, they answer according to their probability distribution.

We assume that the correct label \(T_i\) comes from a discrete uniform distribution. When a worker annotates a task, their spamming indicator is drawn from \(\operatorname{Bernoulli}(1 - \theta_j)\); \(S_{ij}\) specifies whether or not worker \(j\) is spamming on instance \(i\). Thus, if the worker is not spamming on the task, i.e. \(S_{ij} = 0\), their response is the true label, i.e. \(A_{ij} = T_i\). Otherwise, their response \(A_{ij}\) is drawn from a multinomial distribution with parameter vector \(\xi_j\).

MACE latent label model

The model can be enhanced by adding the Beta prior on \(\theta_j\) and the Dirichlet prior on \(\xi_j\).

The marginal data likelihood is maximized with the Expectation-Maximization algorithm:

1. E-step. Performs n_restarts random restarts, and keeps the model with the best marginal data likelihood.
2. M-step. Smooths parameters by adding a fixed value smoothing to the fractional counts before normalizing.
3. Variational M-step. Employs Variational-Bayes (VB) training with symmetric Beta priors on \(\theta_j\) and symmetric Dirichlet priors on the strategy parameters \(\xi_j\).
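
To make the generative step concrete, here is a toy sketch (purely illustrative; the values of theta_j and xi_j are made up) of the per-response likelihood obtained by marginalizing out the spamming indicator, \(\operatorname{Pr}(A_{ij} = a \mid T_i = t) = \theta_j [a = t] + (1 - \theta_j)\,\xi_j[a]\):

import numpy as np

theta_j = 0.8                     # probability that worker j is *not* spamming
xi_j = np.array([0.5, 0.3, 0.2])  # worker j's spamming strategy over 3 labels

def response_prob(a: int, t: int) -> float:
    """P(A_ij = a | T_i = t) with the spamming indicator S_ij marginalized out."""
    return theta_j * (a == t) + (1 - theta_j) * xi_j[a]

# The probabilities of all possible responses sum to one:
assert abs(sum(response_prob(a, t=1) for a in range(3)) - 1.0) < 1e-9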

D. Hovy, T. Berg-Kirkpatrick, A. Vaswani and E. Hovy. Learning Whom to Trust with MACE. In Proceedings of NAACL-HLT, Atlanta, GA, USA (2013), 1120–1130.

https://aclanthology.org/N13-1132.pdf

Examples:

>>> from crowdkit.aggregation import MACE
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> mace = MACE()
>>> result = mace.fit_predict(df)
Source code in crowdkit/aggregation/classification/mace.py
@attr.s
class MACE(BaseClassificationAggregator):
    r"""The **Multi-Annotator Competence Estimation** (MACE) model is a probabilistic model that associates each worker with a label probability distribution.
    A worker can be spamming on each task. If the worker is not spamming, they label a task correctly. If the worker is spamming, they answer according
    to their probability distribution.

    We assume that the correct label $T_i$ comes from a discrete uniform distribution. When a worker
    annotates a task, their spamming indicator is drawn from
    $\operatorname{Bernoulli}(1 - \theta_j)$. $S_{ij}$ specifies whether or not worker $j$ is spamming on instance $i$.
    Thus, if the worker is not spamming on the task, i.e. $S_{ij} = 0$, their response is the true label, i.e. $A_{ij} = T_i$.
    Otherwise, their response $A_{ij}$ is drawn from a multinomial distribution with parameter vector $\xi_j$.

    ![MACE latent label model](https://tlk.s3.yandex.net/crowd-kit/docs/mace_llm.png =500x630)

    The model can be enhanced by adding the Beta prior on $\theta_j$ and the Dirichlet
    prior on $\xi_j$.

    The marginal data likelihood is maximized with the Expectation-Maximization algorithm:
    1. **E-step**. Performs `n_restarts` random restarts, and keeps the model with the best marginal data likelihood.
    2. **M-step**. Smooths parameters by adding a fixed value `smoothing` to the fractional counts before normalizing.
    3. **Variational M-step**. Employs Variational-Bayes (VB) training with symmetric Beta priors on $\theta_j$ and symmetric Dirichlet priors on the strategy parameters $\xi_j$.

    D. Hovy, T. Berg-Kirkpatrick, A. Vaswani and E. Hovy. Learning Whom to Trust with MACE.
    In *Proceedings of NAACL-HLT*, Atlanta, GA, USA (2013), 1120–1130.

    <https://aclanthology.org/N13-1132.pdf>

    Examples:
        >>> from crowdkit.aggregation import MACE
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> mace = MACE()
        >>> result = mace.fit_predict(df)
    """

    n_restarts: int = attr.ib(default=10)
    """The number of optimization runs of the algorithms.
    The final parameters are those that gave the best log likelihood.
    If one run takes too long, this parameter can be set to 1."""

    n_iter: int = attr.ib(default=50)
    """The maximum number of EM iterations for each optimization run."""

    method: str = attr.ib(default="vb")
    """The method which is used for the M-step. Either 'vb' or 'em'.
    'vb' means optimization with Variational Bayes using priors.
    'em' means standard Expectation-Maximization algorithm."""

    smoothing: float = attr.ib(default=0.1)
    """The smoothing parameter for the normalization."""

    default_noise: float = attr.ib(default=0.5)
    """The default noise parameter for the initialization."""

    alpha: float = attr.ib(default=0.5)
    """The prior parameter for the Beta distribution on $\theta_j$."""

    beta: float = attr.ib(default=0.5)
    """The prior parameter for the Beta distribution on $\theta_j$."""

    random_state: int = attr.ib(default=0)
    """The state of the random number generator."""

    verbose: int = attr.ib(default=0)
    """Specifies if the progress will be printed or not:
    0 — no progress bar, 1 — only for restarts, 2 — for both restarts and optimization."""

    spamming_: NDArray[np.float64] = attr.ib(init=False)
    """The posterior distribution of workers' spamming states."""

    thetas_: NDArray[np.float64] = attr.ib(init=False)
    """The posterior distribution of workers' spamming labels."""

    theta_priors_: Optional[NDArray[np.float64]] = attr.ib(init=False)
    r"""The prior parameters for the Beta distribution on $\theta_j$."""

    strategy_priors_: Optional[NDArray[np.float64]] = attr.ib(init=False)
    r"""The prior parameters for the Diriclet distribution on $\xi_j$."""

    smoothing_: float = attr.ib(init=False)
    """The smoothing parameter."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that
    the `task` true label is equal to `label`. Each probability is in the range from 0 to 1,
    all task probabilities must sum up to 1."""

    def fit(self, data: pd.DataFrame) -> "MACE":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            MACE: The fitted MACE model.
        """

        workers, worker_names = pd.factorize(data["worker"])
        labels, label_names = pd.factorize(data["label"])
        tasks, task_names = pd.factorize(data["task"])

        n_workers = len(worker_names)
        n_labels = len(label_names)

        self.smoothing_ = 0.01 / n_labels

        annotation = data.copy(deep=True)

        best_log_marginal_likelihood = -np.inf

        def restarts_progress() -> Iterator[int]:
            if self.verbose > 0:
                yield from trange(self.n_restarts, desc="Restarts")
            else:
                yield from range(self.n_restarts)

        for _ in restarts_progress():
            self._initialize(n_workers, n_labels)
            (
                log_marginal_likelihood,
                gold_label_marginals,
                strategy_expected_counts,
                knowing_expected_counts,
            ) = self._e_step(
                annotation,
                task_names,
                worker_names,
                label_names,
                tasks,
                workers,
                labels,
            )

            def iteration_progress() -> Tuple[Iterator[int], Optional["tqdm[int]"]]:
                if self.verbose > 1:
                    trange_ = trange(self.n_iter, desc="Iterations")
                    return iter(trange_), trange_
                else:
                    return iter(range(self.n_iter)), None

            iterator, pbar = iteration_progress()

            for _ in iterator:
                if self.method == "vb":
                    self._variational_m_step(
                        knowing_expected_counts, strategy_expected_counts
                    )
                else:
                    self._m_step(knowing_expected_counts, strategy_expected_counts)
                (
                    log_marginal_likelihood,
                    gold_label_marginals,
                    strategy_expected_counts,
                    knowing_expected_counts,
                ) = self._e_step(
                    annotation,
                    task_names,
                    worker_names,
                    label_names,
                    tasks,
                    workers,
                    labels,
                )
                if self.verbose > 1:
                    assert isinstance(pbar, tqdm)
                    pbar.set_postfix(
                        {"log_marginal_likelihood": round(log_marginal_likelihood, 5)}
                    )
            if log_marginal_likelihood > best_log_marginal_likelihood:
                best_log_marginal_likelihood = log_marginal_likelihood
                best_thetas = self.thetas_.copy()
                best_spamming = self.spamming_.copy()

        self.thetas_ = best_thetas
        self.spamming_ = best_spamming
        _, gold_label_marginals, _, _ = self._e_step(
            annotation, task_names, worker_names, label_names, tasks, workers, labels
        )

        self.probas_ = decode_distribution(gold_label_marginals)
        self.labels_ = self.probas_.idxmax(axis="columns")
        self.labels_.index.name = "task"

        return self

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """
        Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: Task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """
        self.fit(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """
        self.fit(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def _initialize(self, n_workers: int, n_labels: int) -> None:
        """Initializes the MACE parameters.

        Args:
            n_workers (int): The number of workers.
            n_labels (int): The number of labels.

        Returns:
            None
        """

        self.spamming_ = sps.uniform(1, 1 + self.default_noise).rvs(
            size=(n_workers, 2),
            random_state=self.random_state,
        )
        self.thetas_ = sps.uniform(1, 1 + self.default_noise).rvs(
            size=(n_workers, n_labels), random_state=self.random_state
        )

        self.spamming_ = self.spamming_ / self.spamming_.sum(axis=1, keepdims=True)
        self.thetas_ = self.thetas_ / self.thetas_.sum(axis=1, keepdims=True)

        if self.method == "vb":
            self.theta_priors_ = np.empty((n_workers, 2))
            self.theta_priors_[:, 0] = self.alpha
            self.theta_priors_[:, 1] = self.beta

            self.strategy_priors_ = np.ones((n_workers, n_labels)) * 10.0

    def _e_step(
        self,
        annotation: pd.DataFrame,
        task_names: Union[List[Any], "pd.Index[Any]"],
        worker_names: Union[List[Any], "pd.Index[Any]"],
        label_names: Union[List[Any], "pd.Index[Any]"],
        tasks: NDArray[np.int64],
        workers: NDArray[np.int64],
        labels: NDArray[np.int64],
    ) -> Tuple[float, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Performs E-step of the MACE algorithm.

        Args:
            annotation (DataFrame): The workers' labeling results. The `pandas.DataFrame` data contains `task`, `worker`, and `label` columns.
            task_names (List[Any]): The task names.
            worker_names (List[Any]): The workers' names.
            label_names (List[Any]): The label names.
            tasks (np.ndarray): The task IDs in the annotation.
            workers (np.ndarray): The workers' IDs in the annotation.
            labels (np.ndarray): The label IDs in the annotation.

        Returns:
            Tuple[float, pd.DataFrame, pd.DataFrame, pd.DataFrame]: The log marginal likelihood, gold label marginals,
                strategy expected counts, and knowing expected counts.
        """
        gold_label_marginals = pd.DataFrame(
            np.zeros((len(task_names), len(label_names))),
            index=task_names,
            columns=label_names,
        )

        knowing_expected_counts = pd.DataFrame(
            np.zeros((len(worker_names), 2)),
            index=worker_names,
            columns=["knowing_expected_count_0", "knowing_expected_count_1"],
        )

        for label_idx, label in enumerate(label_names):
            annotation["gold_marginal"] = self.spamming_[workers, 0] * self.thetas_[
                workers, labels
            ] + self.spamming_[workers, 1] * (label_idx == labels)
            gold_label_marginals[label] = annotation.groupby("task").prod(
                numeric_only=True
            )["gold_marginal"] / len(label_names)

        instance_marginals = gold_label_marginals.sum(axis=1)
        log_marginal_likelihood = np.log(instance_marginals + 1e-8).sum()

        annotation["strategy_marginal"] = 0.0
        for label in range(len(label_names)):
            annotation["strategy_marginal"] += gold_label_marginals.values[
                tasks, label
            ] / (
                self.spamming_[workers, 0] * self.thetas_[workers, labels]
                + self.spamming_[workers, 1] * (labels == label)
            )

        annotation["strategy_marginal"] = (
            annotation["strategy_marginal"]
            * self.spamming_[workers, 0]
            * self.thetas_[workers, labels]
        )

        annotation.set_index("task", inplace=True)
        annotation["instance_marginal"] = instance_marginals
        annotation.reset_index(inplace=True)

        annotation["strategy_marginal"] = (
            annotation["strategy_marginal"] / annotation["instance_marginal"]
        )

        strategy_expected_counts = (
            annotation.groupby(["worker", "label"])
            .sum(numeric_only=True)["strategy_marginal"]
            .unstack()
            .fillna(0.0)
        )

        knowing_expected_counts["knowing_expected_count_0"] = annotation.groupby(
            "worker"
        ).sum(numeric_only=True)["strategy_marginal"]

        annotation["knowing_expected_counts"] = (
            gold_label_marginals.values[tasks, labels].ravel()
            * self.spamming_[workers, 1]
            / (
                self.spamming_[workers, 0] * self.thetas_[workers, labels]
                + self.spamming_[workers, 1]
            )
        ) / instance_marginals.values[tasks]
        knowing_expected_counts["knowing_expected_count_1"] = annotation.groupby(
            "worker"
        ).sum(numeric_only=True)["knowing_expected_counts"]

        return (
            log_marginal_likelihood,
            gold_label_marginals,
            strategy_expected_counts,
            knowing_expected_counts,
        )

    def _m_step(
        self,
        knowing_expected_counts: pd.DataFrame,
        strategy_expected_counts: pd.DataFrame,
    ) -> None:
        """
        Performs M-step of the MACE algorithm.

        Args:
            knowing_expected_counts (DataFrame): The knowing expected counts.
            strategy_expected_counts (DataFrame): The strategy expected counts.

        Returns:
            None
        """
        self.spamming_ = normalize(knowing_expected_counts.values, self.smoothing_)
        self.thetas_ = normalize(strategy_expected_counts.values, self.smoothing_)

    def _variational_m_step(
        self,
        knowing_expected_counts: pd.DataFrame,
        strategy_expected_counts: pd.DataFrame,
    ) -> None:
        """
        Performs variational M-step of the MACE algorithm.

        Args:
            knowing_expected_counts (DataFrame): The knowing expected counts.
            strategy_expected_counts (DataFrame): The strategy expected counts.

        Returns:
            None
        """
        assert self.theta_priors_ is not None
        self.spamming_ = variational_normalize(
            knowing_expected_counts.values, self.theta_priors_
        )
        assert self.strategy_priors_ is not None
        self.thetas_ = variational_normalize(
            strategy_expected_counts.values, self.strategy_priors_
        )

alpha: float = attr.ib(default=0.5) class-attribute instance-attribute

The first shape parameter of the Beta prior on \(\theta_j\).

beta: float = attr.ib(default=0.5) class-attribute instance-attribute

The second shape parameter of the Beta prior on \(\theta_j\).

default_noise: float = attr.ib(default=0.5) class-attribute instance-attribute

The default noise parameter for the initialization.

method: str = attr.ib(default='vb') class-attribute instance-attribute

The method which is used for the M-step. Either 'vb' or 'em'. 'vb' means optimization with Variational Bayes using priors. 'em' means standard Expectation-Maximization algorithm.

n_iter: int = attr.ib(default=50) class-attribute instance-attribute

The maximum number of EM iterations for each optimization run.

n_restarts: int = attr.ib(default=10) class-attribute instance-attribute

The number of optimization runs of the algorithms. The final parameters are those that gave the best log likelihood. If one run takes too long, this parameter can be set to 1.

probas_: Optional[pd.DataFrame] = attr.ib(init=False) class-attribute instance-attribute

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

random_state: int = attr.ib(default=0) class-attribute instance-attribute

The state of the random number generator.

smoothing: float = attr.ib(default=0.1) class-attribute instance-attribute

The smoothing parameter for the normalization.

smoothing_: float = attr.ib(init=False) class-attribute instance-attribute

The smoothing parameter.

spamming_: NDArray[np.float64] = attr.ib(init=False) class-attribute instance-attribute

The posterior distribution of workers' spamming states.

strategy_priors_: Optional[NDArray[np.float64]] = attr.ib(init=False) class-attribute instance-attribute

The prior parameters for the Dirichlet distribution on \(\xi_j\).

theta_priors_: Optional[NDArray[np.float64]] = attr.ib(init=False) class-attribute instance-attribute

The prior parameters for the Beta distribution on \(\theta_j\).

thetas_: NDArray[np.float64] = attr.ib(init=False) class-attribute instance-attribute

The posterior distribution of workers' spamming labels.

verbose: int = attr.ib(default=0) class-attribute instance-attribute

Specifies if the progress will be printed or not: 0 — no progress bar, 1 — only for restarts, 2 — for both restarts and optimization.
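
A small configuration sketch using the attributes listed above (the particular values are arbitrary): switching the M-step from Variational Bayes to plain EM and reducing the number of restarts for speed:

from crowdkit.aggregation import MACE

mace = MACE(method='em', n_restarts=1, n_iter=25, verbose=1)
# result = mace.fit_predict(df)  # df: task/worker/label rows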

fit(data)

Fits the model to the training data.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
MACE MACE

The fitted MACE model.

Source code in crowdkit/aggregation/classification/mace.py
def fit(self, data: pd.DataFrame) -> "MACE":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        MACE: The fitted MACE model.
    """

    workers, worker_names = pd.factorize(data["worker"])
    labels, label_names = pd.factorize(data["label"])
    tasks, task_names = pd.factorize(data["task"])

    n_workers = len(worker_names)
    n_labels = len(label_names)

    self.smoothing_ = 0.01 / n_labels

    annotation = data.copy(deep=True)

    best_log_marginal_likelihood = -np.inf

    def restarts_progress() -> Iterator[int]:
        if self.verbose > 0:
            yield from trange(self.n_restarts, desc="Restarts")
        else:
            yield from range(self.n_restarts)

    for _ in restarts_progress():
        self._initialize(n_workers, n_labels)
        (
            log_marginal_likelihood,
            gold_label_marginals,
            strategy_expected_counts,
            knowing_expected_counts,
        ) = self._e_step(
            annotation,
            task_names,
            worker_names,
            label_names,
            tasks,
            workers,
            labels,
        )

        def iteration_progress() -> Tuple[Iterator[int], Optional["tqdm[int]"]]:
            if self.verbose > 1:
                trange_ = trange(self.n_iter, desc="Iterations")
                return iter(trange_), trange_
            else:
                return iter(range(self.n_iter)), None

        iterator, pbar = iteration_progress()

        for _ in iterator:
            if self.method == "vb":
                self._variational_m_step(
                    knowing_expected_counts, strategy_expected_counts
                )
            else:
                self._m_step(knowing_expected_counts, strategy_expected_counts)
            (
                log_marginal_likelihood,
                gold_label_marginals,
                strategy_expected_counts,
                knowing_expected_counts,
            ) = self._e_step(
                annotation,
                task_names,
                worker_names,
                label_names,
                tasks,
                workers,
                labels,
            )
            if self.verbose > 1:
                assert isinstance(pbar, tqdm)
                pbar.set_postfix(
                    {"log_marginal_likelihood": round(log_marginal_likelihood, 5)}
                )
        if log_marginal_likelihood > best_log_marginal_likelihood:
            best_log_marginal_likelihood = log_marginal_likelihood
            best_thetas = self.thetas_.copy()
            best_spamming = self.spamming_.copy()

    self.thetas_ = best_thetas
    self.spamming_ = best_spamming
    _, gold_label_marginals, _, _ = self._e_step(
        annotation, task_names, worker_names, label_names, tasks, workers, labels
    )

    self.probas_ = decode_distribution(gold_label_marginals)
    self.labels_ = self.probas_.idxmax(axis="columns")
    self.labels_.index.name = "task"

    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
Series Series[Any]

Task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/mace.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """
    Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: Task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """
    self.fit(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
DataFrame DataFrame

Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/mace.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """
    Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """
    self.fit(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

MMSR

Bases: BaseClassificationAggregator

The Matrix Mean-Subsequence-Reduced Algorithm (M-MSR) model assumes that workers have different expertise levels and are represented as a vector of "skills" \(s\) whose entries \(s_i\) show the probability that worker \(i\) answers the given task correctly. With that, we can estimate the skills of the workers by solving a rank-one matrix completion problem as follows:
$$
\mathbb{E}\left[\frac{M}{M-1}\widetilde{C}-\frac{1}{M-1}\boldsymbol{1}\boldsymbol{1}^T\right] = \boldsymbol{s}\boldsymbol{s}^T,
$$
where \(M\) is the total number of classes, \(\widetilde{C}\) is a covariance matrix between workers, and \(\boldsymbol{1}\boldsymbol{1}^T\) is the all-ones matrix of the same size as \(\widetilde{C}\).

Thus, the problem of estimating the skill level vector \(s\) becomes equivalent to the rank-one matrix completion problem. The M-MSR algorithm is an iterative algorithm for robust rank-one matrix completion, so its result is an estimator of the vector \(s\). The aggregation is then weighted majority voting with weights equal to \(\log \frac{(M-1)s_i}{1-s_i}\).
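
For intuition, a short sketch (illustrative skill values, not the library's internals) of turning the estimated skills into M-MSR voting weights:

import numpy as np

M = 2                                    # number of classes
s = np.array([0.90, 0.70, 0.55])         # estimated per-worker skills s_i
weights = np.log((M - 1) * s / (1 - s))  # weight of each worker's vote

# A worker at chance level (s_i = 1/M) gets weight 0; better workers get
# positive weight, and adversarial workers (s_i < 1/M) get negative weight.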

Q. Ma and Alex Olshevsky. Adversarial Crowdsourcing Through Robust Rank-One Matrix Completion. 34th Conference on Neural Information Processing Systems (NeurIPS 2020)

https://arxiv.org/abs/2010.12181

Examples:

>>> from crowdkit.aggregation import MMSR
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> mmsr = MMSR()
>>> result = mmsr.fit_predict(df)
Source code in crowdkit/aggregation/classification/m_msr.py
@attr.s
class MMSR(BaseClassificationAggregator):
    r"""The **Matrix Mean-Subsequence-Reduced Algorithm** (M-MSR) model assumes that workers have different expertise levels and are represented
    as a vector of "skills" $s$ whose entries $s_i$ show the probability
    that worker $i$ answers the given task correctly. With that, we can estimate the skills of the workers by solving a rank-one matrix completion problem as follows:
    $$
    \mathbb{E}\left[\frac{M}{M-1}\widetilde{C}-\frac{1}{M-1}\boldsymbol{1}\boldsymbol{1}^T\right]
     = \boldsymbol{s}\boldsymbol{s}^T,
    $$
    where $M$ is the total number of classes, $\widetilde{C}$ is a covariance matrix between
    workers, and $\boldsymbol{1}\boldsymbol{1}^T$ is the all-ones matrix which has the same
    size as $\widetilde{C}$.

    Thus, the problem of estimating the skill level vector $s$ becomes equivalent to the
    rank-one matrix completion problem. The M-MSR algorithm is an iterative algorithm for the *robust*
    rank-one matrix completion, so its result is an estimator of the vector $s$.
    The aggregation is then weighted majority voting with weights equal to
    $\log \frac{(M-1)s_i}{1-s_i}$.

    Q. Ma and Alex Olshevsky. Adversarial Crowdsourcing Through Robust Rank-One Matrix Completion.
    *34th Conference on Neural Information Processing Systems (NeurIPS 2020)*

    <https://arxiv.org/abs/2010.12181>

    Examples:
        >>> from crowdkit.aggregation import MMSR
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> mmsr = MMSR()
        >>> result = mmsr.fit_predict(df)
    """

    n_iter: int = attr.ib(default=10000)
    """The maximum number of iterations."""

    tol: float = attr.ib(default=1e-10)
    """The tolerance stopping criterion for iterative methods with a variable number of steps. The algorithm converges when the loss change is less than the `tol` parameter."""

    random_state: Optional[int] = attr.ib(default=0)
    """The seed number for the random initialization."""

    _observation_matrix: npt.NDArray[Any] = attr.ib(factory=lambda: np.array([]))
    """The matrix representing which workers give responses to which tasks."""

    _covariation_matrix: npt.NDArray[Any] = attr.ib(factory=lambda: np.array([]))
    """The matrix representing the covariance between workers."""

    _n_common_tasks: npt.NDArray[Any] = attr.ib(factory=lambda: np.array([]))
    """The matrix representing workers with tasks in common."""

    _n_workers: int = attr.ib(default=0)
    """The number of workers."""

    _n_tasks: int = attr.ib(default=0)
    """The number of tasks that are assigned to workers."""

    _n_labels: int = attr.ib(default=0)
    """The number of possible labels for a series of classification tasks."""

    _labels_mapping: Dict[Any, int] = attr.ib(factory=dict)
    """The mapping of labels and integer values."""

    _workers_mapping: Dict[Any, int] = attr.ib(factory=dict)
    """The mapping of workers and integer values."""

    _tasks_mapping: Dict[Any, int] = attr.ib(factory=dict)
    """The mapping of tasks and integer values."""

    # Available after fit
    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")
    """The task labels.
    The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks."""

    scores_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The task label scores.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is a score of `label` for `task`."""

    loss_history_: List[float] = attr.ib(init=False)
    """A list of loss values during training."""

    def _apply(self, data: pd.DataFrame) -> "MMSR":
        mv = MajorityVote().fit(data, skills=self.skills_)
        self.labels_ = mv.labels_
        self.scores_ = mv.probas_
        return self

    def fit(self, data: pd.DataFrame) -> "MMSR":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            MMSR: self.
        """

        data = data[["task", "worker", "label"]]
        self._construnct_covariation_matrix(data)
        self._m_msr()
        return self

    def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Predicts the true labels of tasks when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self._apply(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def predict_score(self, data: pd.DataFrame) -> pd.DataFrame:
        """Returns the total sum of weights for each label when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: The task label scores. The `pandas.DataFrame` data is indexed by `task`
                so that `result.loc[task, label]` is a score of `label` for `task`.
        """

        self._apply(data)
        assert self.scores_ is not None, "no scores_"
        return self.scores_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data).predict(data)

    def fit_predict_score(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fits the model to the training data and returns the total sum of weights for each label.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: The task label scores. The `pandas.DataFrame` data is indexed by `task`
                so that `result.loc[task, label]` is a score of `label` for `task`.
        """

        return self.fit(data).predict_score(data)

    def _m_msr(self) -> None:
        F_param = int(np.floor(self._sparsity / 2)) - 1
        n, m = self._covariation_matrix.shape
        u = sps.uniform.rvs(size=(n, 1), random_state=self.random_state)
        v = sps.uniform.rvs(size=(m, 1), random_state=self.random_state)
        observed_entries = np.abs(np.sign(self._n_common_tasks)) == 1
        X = np.abs(self._covariation_matrix)
        self.loss_history_ = []
        for _ in range(self.n_iter):
            v_prev = np.copy(v)
            u_prev = np.copy(u)
            for j in range(n):
                target_v = X[:, j].reshape(-1, 1)
                target_v = target_v[observed_entries[:, j]] / u[observed_entries[:, j]]

                y = self._remove_largest_and_smallest_F_value(
                    target_v, F_param, v[j][0], self._n_tasks
                )
                if len(y) == 0:
                    v[j] = v[j]
                else:
                    v[j][0] = y.mean()

            for i in range(m):
                target_u = X[i, :].reshape(-1, 1)
                target_u = target_u[observed_entries[i, :]] / v[observed_entries[i, :]]
                y = self._remove_largest_and_smallest_F_value(
                    target_u, F_param, u[i][0], self._n_tasks
                )
                # Keep the previous value when trimming removed every entry.
                if len(y) != 0:
                    u[i][0] = y.mean()

            loss = np.linalg.norm(u @ v.T - u_prev @ v_prev.T, ord="fro")
            self.loss_history_.append(float(loss))
            if loss < self.tol:
                break

        k = np.sqrt(np.linalg.norm(u) / np.linalg.norm(v))
        x_track_1 = u / k
        x_track_2 = self._sign_determination_valid(self._covariation_matrix, x_track_1)
        x_track_3 = np.minimum(x_track_2, 1 - 1.0 / np.sqrt(self._n_tasks))
        x_MSR = np.maximum(
            x_track_3, -1 / (self._n_labels - 1) + 1.0 / np.sqrt(self._n_tasks)
        )

        workers_probas = (
            x_MSR * (self._n_labels - 1) / (self._n_labels) + 1 / self._n_labels
        )
        workers_probas = workers_probas.ravel()
        skills = np.log(workers_probas * (self._n_labels - 1) / (1 - workers_probas))

        self.skills_ = self._get_skills_from_array(skills)

    def _get_skills_from_array(self, array: npt.NDArray[Any]) -> "pd.Series[Any]":
        inverse_workers_mapping = {
            ind: worker for worker, ind in self._workers_mapping.items()
        }
        index = [inverse_workers_mapping[i] for i in range(len(array))]
        return pd.Series(array, index=pd.Index(index, name="worker"))

    @staticmethod
    def _sign_determination_valid(
        C: npt.NDArray[Any], s_abs: npt.NDArray[Any]
    ) -> npt.NDArray[Any]:
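        # Recovers the signs of the skill vector from the sign pattern of the
        # covariance matrix, via an eigenvector of an augmented agreement graph.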
        S = np.sign(C)
        n = len(s_abs)

        valid_idx = np.where(np.sum(C, axis=1) != 0)[0]
        S_valid = S[valid_idx[:, None], valid_idx]
        k = S_valid.shape[0]
        upper_idx = np.triu(np.ones(shape=(k, k)))
        S_valid_upper = S_valid * upper_idx
        new_node_end_I, new_node_end_J = np.where(S_valid_upper == 1)
        S_valid[S_valid == 1] = 0
        I = np.eye(k)
        S_valid_new = I[new_node_end_I, :] + I[new_node_end_J, :]
        m = S_valid_new.shape[0]
        A = np.vstack(
            (
                np.hstack((np.abs(S_valid), S_valid_new.T)),
                np.hstack((S_valid_new, np.zeros(shape=(m, m)))),
            )
        )
        n_new = A.shape[0]
        W = (1.0 / np.sum(A, axis=1)).reshape(-1, 1) @ np.ones(shape=(1, n_new)) * A
        D, V = sla.eigs(W + np.eye(n_new), 1, which="SM")
        V = V.real
        sign_vector = np.sign(V)
        s_sign = np.zeros(shape=(n, 1))
        s_sign[valid_idx] = (
            np.sign(np.sum(sign_vector[:k])) * s_abs[valid_idx] * sign_vector[:k]
        )
        return s_sign

    @staticmethod
    def _remove_largest_and_smallest_F_value(
        x: npt.NDArray[Any], F: int, a: float, n_tasks: int
    ) -> npt.NDArray[Any]:
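        # Robust trimming around `a`: drops the F smallest and F largest entries
        # (or every entry below/above `a` if fewer than F exist on that side),
        # falling back to a small positive value when only a zero remains.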
        y = np.sort(x, axis=0)
        if np.sum(y < a) < F:
            y = y[y[:, 0] >= a]
        else:
            y = y[F:]

        m = y.shape[0]
        if np.sum(y > a) < F:
            y = y[y[:, 0] <= a]
        else:
            y = np.concatenate((y[: m - F], y[m:]), axis=0)
        if len(y) == 1 and y[0][0] == 0:
            y[0][0] = 1 / np.sqrt(n_tasks)
        return y

    def _construnct_covariation_matrix(self, answers: pd.DataFrame) -> None:
        labels = pd.unique(answers.label)
        self._n_labels = len(labels)
        self._labels_mapping = {labels[idx]: idx + 1 for idx in range(self._n_labels)}

        workers = pd.unique(answers.worker)
        self._n_workers = len(workers)
        self._workers_mapping = {workers[idx]: idx for idx in range(self._n_workers)}

        tasks = pd.unique(answers.task)
        self._n_tasks = len(tasks)
        self._tasks_mapping = {tasks[idx]: idx for idx in range(self._n_tasks)}

        self._observation_matrix = np.zeros(shape=(self._n_workers, self._n_tasks))
        for i, row in answers.iterrows():
            self._observation_matrix[self._workers_mapping[row["worker"]]][
                self._tasks_mapping[row["task"]]
            ] = self._labels_mapping[row["label"]]

        self._n_common_tasks = (
            np.sign(self._observation_matrix) @ np.sign(self._observation_matrix).T
        )
        self._n_common_tasks -= np.diag(np.diag(self._n_common_tasks))
        self._sparsity = np.min(np.sign(self._n_common_tasks).sum(axis=0))

        # Can we rewrite it in matrix operations?
        self._covariation_matrix = np.zeros(shape=(self._n_workers, self._n_workers))
        for i in range(self._n_workers):
            for j in range(self._n_workers):
                if self._n_common_tasks[i][j]:
                    valid_idx = np.sign(self._observation_matrix[i]) * np.sign(
                        self._observation_matrix[j]
                    )
                    self._covariation_matrix[i][j] = (
                        np.sum(
                            (self._observation_matrix[i] == self._observation_matrix[j])
                            * valid_idx
                        )
                        / self._n_common_tasks[i][j]
                    )

        self._covariation_matrix *= self._n_labels / (self._n_labels - 1)
        self._covariation_matrix -= np.ones(
            shape=(self._n_workers, self._n_workers)
        ) / (self._n_labels - 1)
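
A note on the nested loop above: the inline question ("Can we rewrite it in matrix operations?") can be answered affirmatively. A minimal vectorized sketch, with illustrative names and a toy observation matrix rather than the library's internals (it materializes a workers × workers × tasks array, so it trades memory for speed):

import numpy as np

# O: (workers x tasks) observation matrix with 0 where a task was not answered,
# matching the observation matrix built above; toy values for illustration.
O = np.array([[1, 2, 0],
              [1, 2, 2],
              [2, 2, 1]])

answered = np.sign(O)                 # 1 where a worker answered a task
common = answered @ answered.T        # pairwise counts of common tasks
np.fill_diagonal(common, 0)

both = answered[:, None, :] * answered[None, :, :]       # both workers answered
agree = ((O[:, None, :] == O[None, :, :]) * both).sum(axis=-1)

# Fraction of agreement over common tasks, 0 where there are none
# (the affine rescaling by K/(K-1) is applied afterwards, as above).
C = np.divide(agree, common, out=np.zeros_like(common, dtype=float), where=common != 0)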

loss_history_: List[float] = attr.ib(init=False)

A list of loss values during training.

n_iter: int = attr.ib(default=10000)

The maximum number of iterations.

random_state: Optional[int] = attr.ib(default=0)

The seed number for the random initialization.

scores_: Optional[pd.DataFrame] = attr.ib(init=False)

The task label scores. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is a score of label for task.

skills_: Optional[pd.Series[Any]] = named_series_attrib(name='skill')

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

tol: float = attr.ib(default=1e-10)

The tolerance stopping criterion for iterative methods with a variable number of steps. The algorithm converges when the loss change is less than the tol parameter.
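Putting these attributes together, a minimal usage sketch in the style of the class example above (the parameter values shown are just the defaults listed here):

>>> from crowdkit.aggregation import MMSR
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> mmsr = MMSR(n_iter=10000, tol=1e-10, random_state=0)
>>> labels = mmsr.fit_predict(df)
>>> skills = mmsr.skills_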

fit(data)

Fits the model to the training data.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
MMSR MMSR

self.

Source code in crowdkit/aggregation/classification/m_msr.py
def fit(self, data: pd.DataFrame) -> "MMSR":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        MMSR: self.
    """

    data = data[["task", "worker", "label"]]
    self._construnct_covariation_matrix(data)
    self._m_msr()
    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
Series Series[Any]

The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/m_msr.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data).predict(data)

fit_predict_score(data)

Fits the model to the training data and returns the total sum of weights for each label.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
DataFrame DataFrame

The task label scores. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is a score of label for task.

Source code in crowdkit/aggregation/classification/m_msr.py
def fit_predict_score(self, data: pd.DataFrame) -> pd.DataFrame:
    """Fits the model to the training data and returns the total sum of weights for each label.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: The task label scores. The `pandas.DataFrame` data is indexed by `task`
            so that `result.loc[task, label]` is a score of `label` for `task`.
    """

    return self.fit(data).predict_score(data)

predict(data)

Predicts the true labels of tasks when the model is fitted.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
Series Series[Any]

The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/m_msr.py
def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Predicts the true labels of tasks when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self._apply(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

predict_score(data)

Returns the total sum of weights for each label when the model is fitted.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
DataFrame DataFrame

The task label scores. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is a score of label for task.

Source code in crowdkit/aggregation/classification/m_msr.py
def predict_score(self, data: pd.DataFrame) -> pd.DataFrame:
    """Returns the total sum of weights for each label when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: The task label scores. The `pandas.DataFrame` data is indexed by `task`
            so that `result.loc[task, label]` is a score of `label` for `task`.
    """

    self._apply(data)
    assert self.scores_ is not None, "no scores_"
    return self.scores_

MajorityVote

Bases: BaseClassificationAggregator

The Majority Vote aggregation algorithm is a straightforward approach for categorical aggregation: for each task, it outputs a label with the largest number of responses. Additionally, the Majority Vote can be used when different weights are assigned to workers' votes. In this case, the resulting label will have the largest sum of weights.

{% note info %}

If two or more labels have the largest number of votes, the resulting label will be the same for all tasks that have the same set of labels with the same number of votes.

{% endnote %}

Examples:

Basic Majority Vote:

>>> from crowdkit.aggregation import MajorityVote
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> result = MajorityVote().fit_predict(df)

Weighted Majority Vote:

>>> import pandas as pd
>>> from crowdkit.aggregation import MajorityVote
>>> df = pd.DataFrame(
>>>     [
>>>         ['t1', 'p1', 0],
>>>         ['t1', 'p2', 0],
>>>         ['t1', 'p3', 1],
>>>         ['t2', 'p1', 1],
>>>         ['t2', 'p2', 0],
>>>         ['t2', 'p3', 1],
>>>     ],
>>>     columns=['task', 'worker', 'label']
>>> )
>>> skills = pd.Series({'p1': 0.5, 'p2': 0.7, 'p3': 0.4})
>>> result = MajorityVote().fit_predict(df, skills)
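
To make the weighted vote concrete: for task t2, label 1 collects weight 0.5 + 0.4 = 0.9 (workers p1 and p3) against 0.7 for label 0 (worker p2), so the aggregated label is 1. The same per-label sums can be checked with plain pandas (an illustration, not part of the library API):

>>> df['skill'] = df['worker'].map(skills)
>>> df.groupby(['task', 'label'])['skill'].sum()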
Source code in crowdkit/aggregation/classification/majority_vote.py
@attr.s
class MajorityVote(BaseClassificationAggregator):
    r"""The **Majority Vote** aggregation algorithm is a straightforward approach for categorical aggregation: for each task,
    it outputs a label with the largest number of responses. Additionally, the Majority Vote
    can be used when different weights are assigned to workers' votes. In this case, the
    resulting label will have the largest sum of weights.


    {% note info %}

     If two or more labels have the largest number of votes, the resulting
     label will be the same for all tasks that have the same set of labels with the same number of votes.

     {% endnote %}

    Examples:
        Basic Majority Vote:
        >>> from crowdkit.aggregation import MajorityVote
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> result = MajorityVote().fit_predict(df)

        Weighted Majority Vote:
        >>> import pandas as pd
        >>> from crowdkit.aggregation import MajorityVote
        >>> df = pd.DataFrame(
        >>>     [
        >>>         ['t1', 'p1', 0],
        >>>         ['t1', 'p2', 0],
        >>>         ['t1', 'p3', 1],
        >>>         ['t2', 'p1', 1],
        >>>         ['t2', 'p2', 0],
        >>>         ['t2', 'p3', 1],
        >>>     ],
        >>>     columns=['task', 'worker', 'label']
        >>> )
        >>> skills = pd.Series({'p1': 0.5, 'p2': 0.7, 'p3': 0.4})
        >>> result = MajorityVote().fit_predict(df, skills)
    """

    default_skill: Optional[float] = attr.ib(default=None)
    """Default worker weight value."""

    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")
    """The workers' skills. The `pandas.Series` data is indexed by `worker` and has the corresponding worker skill."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels. The `pandas.DataFrame` data is indexed by `task`
    so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
    Each probability is in the range from 0 to 1, all task probabilities must sum up to 1."""

    on_missing_skill: str = attr.ib(default="error")
    """A value which specifies how to handle assignments performed by workers with an unknown skill.

    Possible values:
    * `error`: raises an exception if there is at least one assignment performed by a worker with an unknown skill;
    * `ignore`: drops assignments performed by workers with an unknown skill during prediction,
    raises an exception if there are no assignments with a known skill for any task;
    * `value`: the default value will be used if a skill is missing."""

    def fit(
        self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
    ) -> "MajorityVote":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
                and has the corresponding worker skill.

        Returns:
            MajorityVote: self.
        """

        data = data[["task", "worker", "label"]]

        if skills is None:
            scores = data[["task", "label"]].value_counts()
        else:
            data = add_skills_to_data(
                data, skills, self.on_missing_skill, self.default_skill
            )
            scores = data.groupby(["task", "label"])["skill"].sum()

        self.probas_ = normalize_rows(scores.unstack("label", fill_value=0))
        self.labels_ = get_most_probable_labels(self.probas_)
        self.skills_ = get_accuracy(data, self.labels_, by="worker")

        return self

    def fit_predict_proba(
        self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
    ) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

            skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
                and has the corresponding worker skill.

        Returns:
            DataFrame: The probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """
        self.fit(data, skills)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(
        self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
    ) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
           data (DataFrame): The training dataset of workers' labeling results
               which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

           skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
               and has the corresponding worker skill.

        Returns:
           Series: The task labels. The `pandas.Series` data is indexed by `task`
               so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self.fit(data, skills)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

default_skill: Optional[float] = attr.ib(default=None)

Default worker weight value.

on_missing_skill: str = attr.ib(default='error')

A value which specifies how to handle assignments performed by workers with an unknown skill.

Possible values:

* error: raises an exception if there is at least one assignment performed by a worker with an unknown skill;
* ignore: drops assignments performed by workers with an unknown skill during prediction, raises an exception if there are no assignments with a known skill for any task;
* value: the default value will be used if a skill is missing.

probas_: Optional[pd.DataFrame] = attr.ib(init=False)

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

skills_: Optional[pd.Series[Any]] = named_series_attrib(name='skill')

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

fit(data, skills=None)

Fits the model to the training data.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required
skills Series

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

None

Returns:

Name Type Description
MajorityVote MajorityVote

self.

Source code in crowdkit/aggregation/classification/majority_vote.py
def fit(
    self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
) -> "MajorityVote":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
            and has the corresponding worker skill.

    Returns:
        MajorityVote: self.
    """

    data = data[["task", "worker", "label"]]

    if skills is None:
        scores = data[["task", "label"]].value_counts()
    else:
        data = add_skills_to_data(
            data, skills, self.on_missing_skill, self.default_skill
        )
        scores = data.groupby(["task", "label"])["skill"].sum()

    self.probas_ = normalize_rows(scores.unstack("label", fill_value=0))
    self.labels_ = get_most_probable_labels(self.probas_)
    self.skills_ = get_accuracy(data, self.labels_, by="worker")

    return self

fit_predict(data, skills=None)

Fits the model to the training data and returns the aggregated results.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required
skills Series

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

None

Returns:

Name Type Description
Series Series[Any]

The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/majority_vote.py
def fit_predict(
    self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
       data (DataFrame): The training dataset of workers' labeling results
           which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

       skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
           and has the corresponding worker skill.

    Returns:
       Series: The task labels. The `pandas.Series` data is indexed by `task`
           so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self.fit(data, skills)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

fit_predict_proba(data, skills=None)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required
skills Series

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

None

Returns:

Name Type Description
DataFrame DataFrame

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/majority_vote.py
def fit_predict_proba(
    self, data: pd.DataFrame, skills: Optional["pd.Series[Any]"] = None
) -> pd.DataFrame:
    """Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        skills (Series): The workers' skills. The `pandas.Series` data is indexed by `worker`
            and has the corresponding worker skill.

    Returns:
        DataFrame: The probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """
    self.fit(data, skills)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

OneCoinDawidSkene

Bases: DawidSkene

The one-coin Dawid-Skene aggregation model works exactly the same as the original Dawid-Skene model based on the EM algorithm, except for calculating the workers' errors at the M-step of the algorithm.

For the one-coin model, a worker confusion (error) matrix is parameterized by a single parameter \(s_w\): $$ e^w_{j,z_j} = \begin{cases} s_{w} & y^w_j = z_j \\ \frac{1 - s_{w}}{K - 1} & y^w_j \neq z_j \end{cases} $$ where \(e^w\) is a worker confusion (error) matrix of size \(K \times K\) in case of the \(K\) class classification, \(z_j\) is a true task label, \(y^w_j\) is a worker response to the task \(j\), and \(s_w\) is a worker skill (accuracy).

In other words, the worker \(w\) uses a single coin flip to decide their assignment. No matter what the true label is, the worker has the \(s_w\) probability to assign the correct label, and has the \(1 − s_w\) probability to randomly assign an incorrect label. For the one-coin model, it suffices to estimate \(s_w\) for every worker \(w\) and estimate \(y^w_j\) for every task \(j\). Because of its simplicity, the one-coin model is easier to estimate and enjoys better convergence properties.
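
For example, with \(K = 3\) classes and a worker skill \(s_w = 0.8\), each row of that worker's error matrix puts 0.8 on the true label and \((1 - 0.8)/2 = 0.1\) on each of the two remaining labels. A one-line sketch with illustrative values (not library API):

import numpy as np

K, s_w, true_label = 3, 0.8, 1          # illustrative values
row = np.full(K, (1 - s_w) / (K - 1))   # off-diagonal mass, split evenly
row[true_label] = s_w                   # -> array([0.1, 0.8, 0.1])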

Parameters \(p\), \(e^w\), and latent variables \(z\) are optimized with the Expectation-Maximization algorithm:

1. E-step. Estimates the true task label probabilities using the specified workers' responses, the prior label probabilities, and the workers' error probability matrix.
2. M-step. Calculates a worker skill as their accuracy according to the label probability. Then estimates the workers' error probability matrix by assigning user skills to error matrix row by row.

Y. Zhang, X. Chen, D. Zhou, and M. I. Jordan. Spectral methods meet EM: A provably optimal algorithm for crowdsourcing. Journal of Machine Learning Research. Vol. 17, (2016), 1-44.

https://doi.org/10.48550/arXiv.1406.3824

Examples:

>>> from crowdkit.aggregation import OneCoinDawidSkene
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> hds = OneCoinDawidSkene(100)
>>> result = hds.fit_predict(df)
Source code in crowdkit/aggregation/classification/dawid_skene.py
@attr.s
class OneCoinDawidSkene(DawidSkene):
    r"""The **one-coin Dawid-Skene** aggregation model works exactly the same as the original Dawid-Skene model
    based on the EM algorithm, except for calculating the workers' errors
    at the M-step of the algorithm.

    For the one-coin model, a worker confusion (error) matrix is parameterized by a single parameter $s_w$:
    $$
    e^w_{j,z_j}  = \begin{cases}
        s_{w} & y^w_j = z_j \\
        \frac{1 - s_{w}}{K - 1} & y^w_j \neq z_j
    \end{cases}
    $$
    where $e^w$ is a worker confusion (error) matrix of size $K \times K$ in case of the $K$ class classification,
    $z_j$ is a true task label, $y^w_j$ is a worker
    response to the task $j$, and $s_w$ is a worker skill (accuracy).

    In other words, the worker $w$ uses a single coin flip to decide their assignment. No matter what the true label is,
    the worker has the $s_w$ probability to assign the correct label, and
    has the $1 − s_w$ probability to randomly assign an incorrect label. For the one-coin model, it
    suffices to estimate $s_w$ for every worker $w$ and estimate $y^w_j$ for every task $j$. Because of its
    simplicity, the one-coin model is easier to estimate and enjoys better convergence properties.

    Parameters $p$, $e^w$, and latent variables $z$ are optimized with the Expectation-Maximization algorithm:
    1. **E-step**. Estimates the true task label probabilities using the specified workers' responses,
    the prior label probabilities, and the workers' error probability matrix.
    2. **M-step**. Calculates a worker skill as their accuracy according to the label probability.
    Then estimates the workers' error probability matrix by assigning user skills to error matrix row by row.

    Y. Zhang, X. Chen, D. Zhou, and M. I. Jordan. Spectral methods meet EM: A provably optimal algorithm for crowdsourcing.
    *Journal of Machine Learning Research. Vol. 17*, (2016), 1-44.

    <https://doi.org/10.48550/arXiv.1406.3824>

    Examples:
        >>> from crowdkit.aggregation import OneCoinDawidSkene
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> hds = OneCoinDawidSkene(100)
        >>> result = hds.fit_predict(df)
    """

    @staticmethod
    def _assign_skills(row: "pd.Series[Any]", skills: pd.DataFrame) -> pd.DataFrame:
        """
        Assigns user skills to error matrix row by row.
        """
        num_categories = len(row)
        for column_name, _ in row.items():
            if column_name == row.name[1]:  # type: ignore
                row[column_name] = skills[row.name[0]]  # type: ignore
            else:
                row[column_name] = (1 - skills[row.name[0]]) / (num_categories - 1)  # type: ignore
        return row  # type: ignore

    @staticmethod
    def _process_skills_to_errors(
        data: pd.DataFrame, probas: pd.DataFrame, skills: "pd.Series[Any]"
    ) -> pd.DataFrame:
        errors = DawidSkene._m_step(data, probas)

        errors = errors.apply(OneCoinDawidSkene._assign_skills, args=(skills,), axis=1)  # type: ignore
        errors.clip(lower=_EPS, upper=1 - _EPS, inplace=True)

        return errors

    @staticmethod
    def _m_step(data: pd.DataFrame, probas: pd.DataFrame) -> "pd.Series[Any]":  # type: ignore
        """Performs M-step of Homogeneous Dawid-Skene algorithm.

        Calculates a worker skill as their accuracy according to the label probability.
        """
        skilled_data = data.copy()
        idx_cols, cols = pd.factorize(data["label"])
        idx_rows, rows = pd.factorize(data["task"])
        skilled_data["skill"] = (
            probas.reindex(rows, axis=0)
            .reindex(cols, axis=1)
            .to_numpy()[idx_rows, idx_cols]
        )
        skills = skilled_data.groupby(["worker"], sort=False)["skill"].mean()
        return skills

    def fit(self, data: pd.DataFrame) -> "OneCoinDawidSkene":
        """Fits the model to the training data with the EM algorithm.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            DawidSkene: self.
        """

        data = data[["task", "worker", "label"]]

        # Early exit
        if not data.size:
            self.probas_ = pd.DataFrame()
            self.priors_ = pd.Series(dtype=float)
            self.errors_ = pd.DataFrame()
            self.labels_ = pd.Series(dtype=float)
            return self

        # Initialization
        probas = MajorityVote().fit_predict_proba(data)
        priors = probas.mean()
        skills = self._m_step(data, probas)
        errors = self._process_skills_to_errors(data, probas, skills)
        loss = -np.inf
        self.loss_history_ = []

        # Updating proba and errors n_iter times
        for _ in range(self.n_iter):
            probas = self._e_step(data, priors, errors)
            priors = probas.mean()
            skills = self._m_step(data, probas)
            errors = self._process_skills_to_errors(data, probas, skills)
            new_loss = self._evidence_lower_bound(data, probas, priors, errors) / len(
                data
            )
            self.loss_history_.append(new_loss)

            if new_loss - loss < self.tol:
                break
            loss = new_loss

        # Saving results
        self.probas_ = probas
        self.priors_ = priors
        self.skills_ = skills
        self.errors_ = errors
        self.labels_ = get_most_probable_labels(probas)

        return self

fit(data)

Fits the model to the training data with the EM algorithm.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
OneCoinDawidSkene OneCoinDawidSkene

self.

Source code in crowdkit/aggregation/classification/dawid_skene.py
def fit(self, data: pd.DataFrame) -> "OneCoinDawidSkene":
    """Fits the model to the training data with the EM algorithm.
    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        DawidSkene: self.
    """

    data = data[["task", "worker", "label"]]

    # Early exit
    if not data.size:
        self.probas_ = pd.DataFrame()
        self.priors_ = pd.Series(dtype=float)
        self.errors_ = pd.DataFrame()
        self.labels_ = pd.Series(dtype=float)
        return self

    # Initialization
    probas = MajorityVote().fit_predict_proba(data)
    priors = probas.mean()
    skills = self._m_step(data, probas)
    errors = self._process_skills_to_errors(data, probas, skills)
    loss = -np.inf
    self.loss_history_ = []

    # Updating proba and errors n_iter times
    for _ in range(self.n_iter):
        probas = self._e_step(data, priors, errors)
        priors = probas.mean()
        skills = self._m_step(data, probas)
        errors = self._process_skills_to_errors(data, probas, skills)
        new_loss = self._evidence_lower_bound(data, probas, priors, errors) / len(
            data
        )
        self.loss_history_.append(new_loss)

        if new_loss - loss < self.tol:
            break
        loss = new_loss

    # Saving results
    self.probas_ = probas
    self.priors_ = priors
    self.skills_ = skills
    self.errors_ = errors
    self.labels_ = get_most_probable_labels(probas)

    return self

Wawa

Bases: BaseClassificationAggregator

The Worker Agreement with Aggregate (Wawa) algorithm consists of three steps:

1. calculates the majority vote label;
2. estimates workers' skills as the fraction of responses that are equal to the majority vote;
3. calculates the weighted majority vote based on the skills from the previous step.
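A minimal sketch of the first two steps in plain pandas, with a toy df (the library itself composes MajorityVote and get_accuracy for this, as the source below shows):

import pandas as pd

df = pd.DataFrame(
    [['t1', 'p1', 0], ['t1', 'p2', 0], ['t1', 'p3', 1],
     ['t2', 'p1', 1], ['t2', 'p2', 1], ['t2', 'p3', 1]],
    columns=['task', 'worker', 'label'],
)
mv = df.groupby('task')['label'].agg(lambda s: s.mode()[0])               # step 1
skills = df['label'].eq(df['task'].map(mv)).groupby(df['worker']).mean()  # step 2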

Examples:

>>> from crowdkit.aggregation import Wawa
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> result = Wawa().fit_predict(df)
Source code in crowdkit/aggregation/classification/wawa.py
@attr.s
class Wawa(BaseClassificationAggregator):
    r"""The **Worker Agreement with Aggregate** (Wawa) algorithm consists of three steps:
    1. calculates the majority vote label;
    2. estimates workers' skills as a fraction of responses that are equal to the majority vote;
    3. calculates the weighted majority vote based on skills from the previous step.

    Examples:
        >>> from crowdkit.aggregation import Wawa
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> result = Wawa().fit_predict(df)
    """

    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")
    """The workers' skills.
    The `pandas.Series` data is indexed by `worker` and has the corresponding worker skill."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that
    the `task` true label is equal to `label`. Each probability is in the range from 0 to 1,
    all task probabilities must sum up to 1."""

    def _apply(self, data: pd.DataFrame) -> "Wawa":
        check_is_fitted(self, attributes="skills_")
        mv = MajorityVote().fit(data, skills=self.skills_)
        self.probas_ = mv.probas_
        self.labels_ = mv.labels_
        return self

    def fit(self, data: pd.DataFrame) -> "Wawa":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Wawa: self.
        """

        # TODO: support weights?
        data = data[["task", "worker", "label"]]
        mv = MajorityVote().fit(data)
        assert mv.labels_ is not None, "no labels_"
        self.skills_ = get_accuracy(data, true_labels=mv.labels_, by="worker")
        return self

    def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Predicts the true labels of tasks when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self._apply(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Returns probability distributions of labels for each task when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self._apply(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data).predict(data)

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fits the model to the training data and returns probability distributions of labels for each task.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: Probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        return self.fit(data).predict_proba(data)

probas_: Optional[pd.DataFrame] = attr.ib(init=False)

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

skills_: Optional[pd.Series[Any]] = named_series_attrib(name='skill')

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.

fit(data)

Fits the model to the training data.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
Wawa Wawa

self.

Source code in crowdkit/aggregation/classification/wawa.py
def fit(self, data: pd.DataFrame) -> "Wawa":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Wawa: self.
    """

    # TODO: support weights?
    data = data[["task", "worker", "label"]]
    mv = MajorityVote().fit(data)
    assert mv.labels_ is not None, "no labels_"
    self.skills_ = get_accuracy(data, true_labels=mv.labels_, by="worker")
    return self

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
Series Series[Any]

The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/wawa.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data).predict(data)

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
DataFrame DataFrame

Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/wawa.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Fits the model to the training data and returns probability distributions of labels for each task.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    return self.fit(data).predict_proba(data)

predict(data)

Predicts the true labels of tasks when the model is fitted.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
Series Series[Any]

The task labels. The pandas.Series data is indexed by task so that labels.loc[task] is the most likely true label of tasks.

Source code in crowdkit/aggregation/classification/wawa.py
def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Predicts the true labels of tasks when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self._apply(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_

predict_proba(data)

Returns probability distributions of labels for each task when the model is fitted.

Parameters:

Name Type Description Default
data DataFrame

The training dataset of workers' labeling results which is represented as the pandas.DataFrame data containing task, worker, and label columns.

required

Returns:

Name Type Description
DataFrame DataFrame

Probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.

Source code in crowdkit/aggregation/classification/wawa.py
def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Returns probability distributions of labels for each task when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: Probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
    """

    self._apply(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_

ZeroBasedSkill

Bases: BaseClassificationAggregator

The Zero-Based Skill (ZBS) aggregation model performs weighted majority voting on tasks. After processing a pool of tasks, it re-estimates the workers' skills with a gradient descent step to minimize the mean squared error between the current skills and the fraction of responses that are equal to the aggregated labels.

This process is repeated until the labels converge or the number of iterations is exceeded.

{% note info %}

All workers in the dataset passed to predict must also appear in the dataset that was passed to fit.

{% endnote %}
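
The skill update in fit below amounts to a single gradient step that moves every skill toward the worker's observed accuracy against the current weighted-majority labels; schematically, with toy values rather than the library's loop:

import pandas as pd

skills = pd.Series({'p1': 0.5, 'p2': 0.5})    # current skills (toy values)
accuracy = pd.Series({'p1': 0.9, 'p2': 0.4})  # agreement with aggregated labels
learning_rate = 1.0

# One step on the squared error (skill - accuracy)^2:
skills = skills + learning_rate * (accuracy - skills)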

Examples:

>>> from crowdkit.aggregation import ZeroBasedSkill
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> result = ZeroBasedSkill().fit_predict(df)
Source code in crowdkit/aggregation/classification/zero_based_skill.py
@attr.attrs(auto_attribs=True)
class ZeroBasedSkill(BaseClassificationAggregator):
    r"""The **Zero-Based Skill** (ZBS) aggregation model performs weighted majority voting on tasks. After processing a pool of tasks,
    it re-estimates the workers' skills with a gradient descent step to minimize
    the mean squared error between the current skills and the fraction of responses that
    are equal to the aggregated labels.

    This process is repeated until the labels converge or the number of iterations is exceeded.

    {% note info %}

    All workers in the dataset passed to `predict` must also appear in the dataset
    that was passed to `fit`.

    {% endnote %}

    Examples:
        >>> from crowdkit.aggregation import ZeroBasedSkill
        >>> from crowdkit.datasets import load_dataset
        >>> df, gt = load_dataset('relevance-2')
        >>> result = ZeroBasedSkill().fit_predict(df)
    """

    n_iter: int = 100
    """The maximum number of iterations."""

    lr_init: float = 1.0
    """The initial learning rate."""

    lr_steps_to_reduce: int = 20
    """The number of steps required to reduce the learning rate."""

    lr_reduce_factor: float = 0.5
    """The factor by which the learning rate will be multiplied every `lr_steps_to_reduce` step."""

    eps: float = 1e-5
    """The convergence threshold."""

    skills_: Optional["pd.Series[Any]"] = named_series_attrib(name="skill")
    """The workers' skills.
    The `pandas.Series` data is indexed by `worker` and has the corresponding worker skill."""

    probas_: Optional[pd.DataFrame] = attr.ib(init=False)
    """The probability distributions of task labels.
    The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that
    the `task` true label is equal to `label`. Each probability is in the range from 0 to 1,
    all task probabilities must sum up to 1."""

    def _init_skills(self, data: pd.DataFrame) -> "pd.Series[Any]":
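        # Start every worker slightly above chance accuracy (1 / number of labels).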
        skill_value = 1 / data.label.unique().size + self.eps
        skill_index = pd.Index(data.worker.unique(), name="worker")
        return pd.Series(skill_value, index=skill_index)

    def _apply(self, data: pd.DataFrame) -> "ZeroBasedSkill":
        check_is_fitted(self, attributes="skills_")
        mv = MajorityVote().fit(data, self.skills_)
        self.labels_ = mv.labels_
        self.probas_ = mv.probas_
        return self

    def fit(self, data: pd.DataFrame) -> "ZeroBasedSkill":
        """Fits the model to the training data.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            ZeroBasedSkill: self.
        """

        # Initialization
        data = data[["task", "worker", "label"]]
        skills = self._init_skills(data)
        mv = MajorityVote()

        # Updating skills and re-fitting majority vote n_iter times
        learning_rate = self.lr_init
        for iteration in range(1, self.n_iter + 1):
            if iteration % self.lr_steps_to_reduce == 0:
                learning_rate *= self.lr_reduce_factor
            mv.fit(data, skills=skills)
            assert mv.labels_ is not None, "no labels_"
            skills = skills + learning_rate * (
                get_accuracy(data, mv.labels_, by="worker") - skills
            )

        # Saving results
        self.skills_ = skills

        return self

    def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Predicts the true labels of tasks when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        self._apply(data)
        assert self.labels_ is not None, "no labels_"
        return self.labels_

    def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Returns probability distributions of labels for each task when the model is fitted.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

        Returns:
            DataFrame: The probability distributions of task labels.
                The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
                Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
        """

        self._apply(data)
        assert self.probas_ is not None, "no probas_"
        return self.probas_

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data).predict(data)

    def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fits the model to the training data and returns the aggregated results.
        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
        Returns:
            Series: The task labels. The `pandas.Series` data is indexed by `task`
                so that `labels.loc[task]` is the most likely true label of tasks.
        """

        return self.fit(data).predict_proba(data)
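
The skill update inside `fit` is an exponential moving average: each iteration pulls a worker's skill toward that worker's accuracy against the current weighted majority vote. A minimal sketch of one update step with made-up numbers (not crowd-kit code):

```python
# One skill update step from the loop in `fit`, illustrative values only.
skill = 0.50          # current skill estimate for some worker
accuracy = 0.80       # agreement with the current majority-vote labels
learning_rate = 0.5   # e.g. after one reduction under the default schedule

skill = skill + learning_rate * (accuracy - skill)
print(skill)  # 0.65 -- moved halfway from 0.50 toward 0.80
```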

eps: float = 1e-05 class-attribute instance-attribute

A small value added to the initial uniform skill estimate (`1 / K + eps`, where `K` is the number of distinct labels).

lr_init: float = 1.0 class-attribute instance-attribute

The initial learning rate.

lr_reduce_factor: float = 0.5 class-attribute instance-attribute

The factor by which the learning rate is multiplied every `lr_steps_to_reduce` iterations (see the schedule sketch after this attribute list).

lr_steps_to_reduce: int = 20 class-attribute instance-attribute

The number of iterations between consecutive learning-rate reductions.

n_iter: int = 100 class-attribute instance-attribute

The maximum number of iterations.

probas_: Optional[pd.DataFrame] = attr.ib(init=False) class-attribute instance-attribute

The probability distributions of task labels. The pandas.DataFrame data is indexed by task so that result.loc[task, label] is the probability that the task true label is equal to label. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.

skills_: Optional[pd.Series[Any]] = named_series_attrib(name='skill') class-attribute instance-attribute

The workers' skills. The pandas.Series data is indexed by worker and has the corresponding worker skill.
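
For intuition, here is a minimal sketch (not crowd-kit code) of the step-decay schedule that `lr_init`, `lr_reduce_factor`, and `lr_steps_to_reduce` define under the default values:

```python
# Step decay: the learning rate is multiplied by lr_reduce_factor every
# lr_steps_to_reduce iterations, mirroring the loop in `fit` below.
lr_init, lr_reduce_factor, lr_steps_to_reduce, n_iter = 1.0, 0.5, 20, 100

learning_rate = lr_init
for iteration in range(1, n_iter + 1):
    if iteration % lr_steps_to_reduce == 0:
        learning_rate *= lr_reduce_factor
    # iterations 1-19 use 1.0, 20-39 use 0.5, 40-59 use 0.25, and so on
```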

fit(data)

Fits the model to the training data.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `data` | `DataFrame` | The training dataset of workers' labeling results which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns. | required |

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| `ZeroBasedSkill` | `ZeroBasedSkill` | self. |

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def fit(self, data: pd.DataFrame) -> "ZeroBasedSkill":
    """Fits the model to the training data.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        ZeroBasedSkill: self.
    """

    # Initialization
    data = data[["task", "worker", "label"]]
    skills = self._init_skills(data)
    mv = MajorityVote()

    # Updating skills and re-fitting majority vote n_iter times
    learning_rate = self.lr_init
    for iteration in range(1, self.n_iter + 1):
        if iteration % self.lr_steps_to_reduce == 0:
            learning_rate *= self.lr_reduce_factor
        mv.fit(data, skills=skills)
        assert mv.labels_ is not None, "no labels_"
        skills = skills + learning_rate * (
            get_accuracy(data, mv.labels_, by="worker") - skills
        )

    # Saving results
    self.skills_ = skills

    return self
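
A hedged usage sketch (the `relevance-2` dataset and the default hyperparameters are illustrative choices, not requirements):

>>> from crowdkit.aggregation import ZeroBasedSkill
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> zbs = ZeroBasedSkill().fit(df)
>>> zbs.skills_.head()  # per-worker skills learned from agreement with the vote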

fit_predict(data)

Fits the model to the training data and returns the aggregated results.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `data` | `DataFrame` | The training dataset of workers' labeling results which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns. | required |

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| `Series` | `Series[Any]` | The task labels. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks. |

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Fits the model to the training data and returns the aggregated results.
    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data).predict(data)

fit_predict_proba(data)

Fits the model to the training data and returns probability distributions of labels for each task.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `data` | `DataFrame` | The training dataset of workers' labeling results which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns. | required |

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| `DataFrame` | `DataFrame` | The probability distributions of task labels. The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1. |

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def fit_predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Fits the model to the training data and returns the aggregated results.
    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    return self.fit(data).predict_proba(data)

predict(data)

Predicts the true labels of tasks when the model is fitted.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `data` | `DataFrame` | The training dataset of workers' labeling results which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns. | required |

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| `Series` | `Series[Any]` | The task labels. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks. |

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Predicts the true labels of tasks when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        Series: The task labels. The `pandas.Series` data is indexed by `task`
            so that `labels.loc[task]` is the most likely true label of tasks.
    """

    self._apply(data)
    assert self.labels_ is not None, "no labels_"
    return self.labels_
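
Continuing the hypothetical sketch from the `fit` section above (a fitted `zbs` is assumed):

>>> labels = zbs.predict(df)
>>> labels.head()  # the most likely label per task, indexed by task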

predict_proba(data)

Returns probability distributions of labels for each task when the model is fitted.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `data` | `DataFrame` | The training dataset of workers' labeling results which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns. | required |

Returns:

| Name | Type | Description |
| ---- | ---- | ----------- |
| `DataFrame` | `DataFrame` | The probability distributions of task labels. The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`. Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1. |

Source code in crowdkit/aggregation/classification/zero_based_skill.py
def predict_proba(self, data: pd.DataFrame) -> pd.DataFrame:
    """Returns probability distributions of labels for each task when the model is fitted.

    Args:
        data (DataFrame): The training dataset of workers' labeling results
            which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.

    Returns:
        DataFrame: The probability distributions of task labels.
            The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
            Each probability is in the range from 0 to 1, and all task probabilities must sum up to 1.
    """

    self._apply(data)
    assert self.probas_ is not None, "no probas_"
    return self.probas_
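
Continuing the same hypothetical sketch, the returned distributions can be sanity-checked row by row:

>>> probas = zbs.predict_proba(df)
>>> probas.sum(axis=1).head()  # each row should sum to (approximately) 1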