Skip to content

Utilities

Helper routines for aggregation.

add_skills_to_data(data, skills, on_missing_skill, default_skill)

Parameters:

Name Type Description Default
skills Series

Workers' skills. A pandas.Series indexed by worker and holding each worker's skill.

required
on_missing_skill str

How to handle assignments done by workers with unknown skill. Possible values: * "error" — raise an exception if at least one assignment was done by a worker with unknown skill; * "ignore" — drop assignments with unknown skill values during prediction, and raise an exception if any task is left with no assignments from workers with known skill; * "value" — use the default skill value (default_skill) wherever the skill is missing.

required
Source code in crowdkit/aggregation/utils.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def add_skills_to_data(
    data: pd.DataFrame,
    skills: "pd.Series[Any]",
    on_missing_skill: str,
    default_skill: Optional[float],
) -> pd.DataFrame:
    """Attach a `skill` column to the assignments and resolve missing skills.

    Args:
        data (DataFrame): Workers' labeling results.
            A pandas.DataFrame containing a `worker` column (and a `task` column
            when `on_missing_skill` is "ignore").
        skills (Series): workers' skills.
            A pandas.Series indexed by workers and holding corresponding worker's skill
        on_missing_skill (str): How to handle assignments done by workers with unknown skill.
            Possible values:
                    * "error" — raise an exception if there is at least one assignment done by
                    a worker with unknown skill;
                    * "ignore" — drop assignments with unknown skill values. Raise an exception
                    if any task is left without assignments with known skill;
                    * "value" — `default_skill` will be used if skill is missing.
        default_skill (Optional[float]): Skill assigned to workers with unknown skill.
            Must be given when `on_missing_skill` is "value" and must be None otherwise.

    Returns:
        DataFrame: a new frame with the `skill` column added; the input frame is not modified.
            In "ignore" mode the index is reset and `task` becomes the first column.

    Raises:
        ValueError: if the arguments are inconsistent, if `on_missing_skill` is not one of
            the supported options, or if the selected policy cannot be satisfied.
    """
    data = data.join(skills.rename("skill"), on="worker")

    if on_missing_skill != "value" and default_skill is not None:
        raise ValueError('default_skill is used but on_missing_skill is not "value"')

    skill_is_missing = data["skill"].isna()

    if on_missing_skill == "error":
        missing_skills_count = skill_is_missing.sum()
        if missing_skills_count > 0:
            raise ValueError(
                f"Skill value is missing in {missing_skills_count} assignments. Specify skills for every "
                f"used worker or use different 'on_missing_skill' value."
            )
    elif on_missing_skill == "ignore":
        data = data.set_index("task")
        index_before_drop = data.index
        # Only a missing skill disqualifies an assignment; NaNs in unrelated
        # columns must not cause rows to be dropped.
        data = data.dropna(subset=["skill"])
        dropped_tasks_count = len(index_before_drop.difference(data.index))
        if dropped_tasks_count > 0:
            raise ValueError(
                f"{dropped_tasks_count} tasks has no workers with known skills. Provide at least one worker with known "
                f"skill for every task or use different 'on_missing_skill' value."
            )
        data = data.reset_index()
    elif on_missing_skill == "value":
        if default_skill is None:
            raise ValueError(
                'Default skill value must be specified when using on_missing_skill="value"'
            )
        data.loc[skill_is_missing, "skill"] = default_skill
    else:
        raise ValueError(
            f'Unknown option {on_missing_skill!r} of "on_missing_skill" argument.'
        )
    return data

clone_aggregator(aggregator)

Construct a new unfitted aggregator with the same parameters. Args: aggregator (BaseClassificationAggregator): aggregator instance to be cloned

Returns:

Name Type Description
BaseClassificationAggregator BaseClassificationAggregator

The cloned aggregator instance. Its parameters are the same as the input's, except for the results of a previous fit (private attributes).

Source code in crowdkit/aggregation/utils.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def clone_aggregator(
    aggregator: "base.BaseClassificationAggregator",
) -> "base.BaseClassificationAggregator":
    """Build a fresh aggregator of the same class from the public attributes of `aggregator`.

    Every instance attribute whose name neither starts nor ends with an
    underscore is passed as a keyword argument to the class constructor;
    all other attributes are left out.

    Args:
        aggregator (BaseClassificationAggregator): aggregator instance to be cloned

    Returns:
        BaseClassificationAggregator: a new instance of the same class constructed
            from the public attributes of the input.
    """
    assert isinstance(
        aggregator, base.BaseClassificationAggregator
    ), "Can't clone object that is not inherit BaseClassificationAggregator"

    def _is_public(attr_name: str) -> bool:
        # Leading or trailing underscores mark attributes that are not constructor parameters.
        return not attr_name.startswith("_") and not attr_name.endswith("_")

    constructor_kwargs = {
        attr_name: getattr(aggregator, attr_name)
        for attr_name in aggregator.__dict__
        if _is_public(attr_name)
    }
    return aggregator.__class__(**constructor_kwargs)

get_accuracy(data, true_labels, by=None)

Parameters:

Name Type Description Default
data DataFrame

Workers' labeling results. A pandas.DataFrame containing task, worker and label columns.

required
true_labels Series

Tasks' ground truth labels. A pandas.Series indexed by task such that labels.loc[task] is the task's ground truth label.

required

Returns:

Name Type Description
Series Series[Any]

Workers' skills. A pandas.Series indexed by worker and holding each worker's skill.

Source code in crowdkit/aggregation/utils.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def get_accuracy(
    data: pd.DataFrame, true_labels: "pd.Series[Any]", by: Optional[str] = None
) -> "pd.Series[Any]":
    """Compute the (optionally weighted) share of assignments matching the ground truth.

    Assignments for tasks without a ground truth label are ignored. If `data`
    has a `weight` column it is used as the per-assignment weight, otherwise
    every assignment weighs 1.

    Args:
        data (DataFrame): Workers' labeling results.
            A pandas.DataFrame containing `task`, `worker` and `label` columns.
        true_labels (Series): Tasks' ground truth labels.
            A pandas.Series indexed by `task` such that `labels.loc[task]`
            is the tasks's ground truth label.
        by (Optional[str]): Name of the column to group the accuracy by
            (passed to `DataFrame.groupby`). If None, a single overall value is computed.

    Returns:
        Series: accuracy per group when `by` is given, indexed by the values of that column
            (e.g. workers' skills for `by="worker"`). When `by` is None, a single scalar
            with the overall accuracy is returned instead.
    """
    columns = ["task", "worker", "label"]
    if "weight" in data.columns:
        columns.append("weight")
    data = data[columns]

    # `assign` returns a new frame, so no column is ever written into a
    # frame derived by indexing (which triggers SettingWithCopyWarning).
    if data.empty:
        data = data.assign(true_label=[])
    else:
        data = data.join(pd.Series(true_labels, name="true_label"), on="task")

    data = data[data["true_label"].notna()]

    if "weight" not in data.columns:
        data = data.assign(weight=1)
    data = data.assign(score=data["weight"] * (data["label"] == data["true_label"]))

    # Collapse repeated (task, worker, label) rows, keeping the one with the highest score.
    data = data.sort_values("score").drop_duplicates(
        ["task", "worker", "label"], keep="last"
    )

    if by is not None:
        group = data.groupby(by)
        return group.score.sum() / group.weight.sum()
    else:
        return data.score.sum() / data.weight.sum()  # type: ignore

get_most_probable_labels(proba)

Returns most probable labels

Parameters:

Name Type Description Default
proba DataFrame

Tasks' label probability distributions. A pandas.DataFrame indexed by task such that result.loc[task, label] is the probability of task's true label to be equal to label. Each probability is between 0 and 1, all task's probabilities should sum up to 1

required
Source code in crowdkit/aggregation/utils.py
86
87
88
89
90
91
92
93
94
95
96
97
98
def get_most_probable_labels(proba: pd.DataFrame) -> "pd.Series[Any]":
    """Pick, for every task, the label with the highest probability.

    Args:
        proba (DataFrame): Tasks' label probability distributions.
            A pandas.DataFrame indexed by `task` such that `result.loc[task, label]`
            is the probability of `task`'s true label to be equal to `label`. Each
            probability is between 0 and 1, all task's probabilities should sum up to 1

    Returns:
        Series: for each row of `proba`, the column name holding the row maximum.
            An empty object-dtype Series is returned when `proba` has no elements.
    """
    if proba.size == 0:
        # patch for pandas<=1.1.5
        return pd.Series([], dtype="O")
    return proba.idxmax(axis=1)

manage_data(data, weights=None, skills=None)

Parameters:

Name Type Description Default
data DataFrame

Workers' labeling results. A pandas.DataFrame containing task, worker and label columns.

required
skills Series

Workers' skills. A pandas.Series indexed by worker and holding each worker's skill.

None
Source code in crowdkit/aggregation/utils.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def manage_data(
    data: pd.DataFrame,
    weights: Optional["pd.Series[Any]"] = None,
    skills: Optional["pd.Series[Any]"] = None,
) -> pd.DataFrame:
    """Return the assignments with `weight` and `skill` columns attached.

    Args:
        data (DataFrame): Workers' labeling results.
            A pandas.DataFrame containing `task`, `worker` and `label` columns.
        weights (Series): weights joined on the `task` column, so the Series is
            expected to be indexed by task. If None, every assignment gets weight 1.
        skills (Series): workers' skills.
            A pandas.Series index by workers and holding corresponding worker's skill.
            If None, every assignment gets skill 1.

    Returns:
        DataFrame: a new frame with columns `task`, `worker`, `label`, `weight` and `skill`;
            the input frame is not modified.
    """
    data = data[["task", "worker", "label"]]

    # `assign` builds a new frame instead of writing into the column slice above.
    if weights is None:
        data = data.assign(weight=1)
    else:
        data = data.join(weights.rename("weight"), on="task")

    if skills is None:
        data = data.assign(skill=1)
    else:
        # Skills are indexed by worker, so they must be matched on the worker column.
        data = data.join(skills.rename("skill"), on="worker")

    return data

named_series_attrib(name)

Attrs attribute with converter and setter which preserves specified attribute name

Source code in crowdkit/aggregation/utils.py
187
188
189
190
191
192
193
194
def named_series_attrib(name: str) -> "pd.Series[Any]":
    """Create an attrs attribute whose stored Series always carries the name `name`.

    The attribute is excluded from `__init__` (`init=False`), and the naming
    converter is also run on every later assignment via
    `on_setattr=attr.setters.convert`.
    """

    def _apply_name(series: "pd.Series[Any]") -> "pd.Series[Any]":
        # Renames the passed Series in place and hands the same object back.
        series.name = name
        return series

    return attr.ib(
        init=False,
        converter=_apply_name,
        on_setattr=attr.setters.convert,
    )

normalize_rows(scores)

Scales values so that every row sums to 1

Parameters:

Name Type Description Default
scores DataFrame

Tasks' label scores. A pandas.DataFrame indexed by task such that result.loc[task, label] is the score of label for task.

required

Returns:

Name Type Description
DataFrame DataFrame

Tasks' label probability distributions. A pandas.DataFrame indexed by task such that result.loc[task, label] is the probability of task's true label to be equal to label. Each probability is between 0 and 1, all task's probabilities should sum up to 1

Source code in crowdkit/aggregation/utils.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def normalize_rows(scores: pd.DataFrame) -> pd.DataFrame:
    """Divide every row by its own sum so that each row sums to 1.

    Args:
        scores (DataFrame): Tasks' label scores.
            A pandas.DataFrame indexed by `task` such that `result.loc[task, label]`
            is the score of `label` for `task`.

    Returns:
        DataFrame: Tasks' label probability distributions.
            A pandas.DataFrame indexed by `task` such that `result.loc[task, label]`
            is the probability of `task`'s true label to be equal to `label`. Each
            probability is between 0 and 1, all task's probabilities should sum up to 1
    """
    row_totals = scores.sum(axis="columns")
    return scores.div(row_totals, axis="index")

Routines for post-processing.