Skip to content

Textual

Routines for textual data.

ROVER

Bases: BaseTextsAggregator

Recognizer Output Voting Error Reduction (ROVER).

This method uses dynamic programming to align sequences. Next, the aligned sequences are used to construct the Word Transition Network (WTN) (see the "ROVER WTN scheme" figure). Finally, the aggregated sequence is the result of majority voting on each edge of the WTN.

J. G. Fiscus, "A post-processing system to yield reduced word error rates: Recognizer Output Voting Error Reduction (ROVER)," 1997 IEEE Workshop on Automatic Speech Recognition and Understanding Proceedings, 1997, pp. 347-354. https://doi.org/10.1109/ASRU.1997.659110

Examples:

>>> from crowdkit.datasets import load_dataset
>>> from crowdkit.aggregation import ROVER
>>> df, gt = load_dataset('crowdspeech-test-clean')
>>> df['text'] = df['text'].str.lower()
>>> tokenizer = lambda s: s.split(' ')
>>> detokenizer = lambda tokens: ' '.join(tokens)
>>> result = ROVER(tokenizer, detokenizer).fit_predict(df)

Attributes:

Name Type Description
texts_ Series

Tasks' texts. A pandas.Series indexed by task such that result.loc[task] is the task's text.

Source code in crowdkit/aggregation/texts/rover.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@attr.s
class ROVER(BaseTextsAggregator):
    """Recognizer Output Voting Error Reduction (ROVER).

    This method uses dynamic programming to align sequences. Next, aligned sequences are used
    to construct the Word Transition Network (WTN):
    ![ROVER WTN scheme](https://tlk.s3.yandex.net/crowd-kit/docs/rover.png)
    Finally, the aggregated sequence is the result of majority voting on each edge of the WTN.

    J. G. Fiscus,
    "A post-processing system to yield reduced word error rates: Recognizer Output Voting Error Reduction (ROVER),"
    *1997 IEEE Workshop on Automatic Speech Recognition and Understanding Proceedings*, 1997, pp. 347-354.
    <https://doi.org/10.1109/ASRU.1997.659110>

    Examples:
        >>> from crowdkit.datasets import load_dataset
        >>> from crowdkit.aggregation import ROVER
        >>> df, gt = load_dataset('crowdspeech-test-clean')
        >>> df['text'] = df['text'].str.lower()
        >>> tokenizer = lambda s: s.split(' ')
        >>> detokenizer = lambda tokens: ' '.join(tokens)
        >>> result = ROVER(tokenizer, detokenizer).fit_predict(df)

    Attributes:
        texts_ (Series): Tasks' texts.
            A pandas.Series named `text` and indexed by `task` such that
            `result.loc[task]` is the task's text.
    """

    tokenizer: Callable[[str], List[str]] = attr.ib()
    """A callable that takes a string and returns a list of tokens."""

    detokenizer: Callable[[List[str]], str] = attr.ib()
    """A callable that takes a list of tokens and returns a string."""

    silent: bool = attr.ib(default=True)
    """If false, show a progress bar."""

    # Available after fit
    # texts_

    def fit(self, data: pd.DataFrame) -> "ROVER":
        """Fits the model. The aggregated results are saved to the `texts_` attribute.

        Args:
            data (DataFrame): Workers' text outputs.
                A pandas.DataFrame containing `task`, `worker` and `text` columns.

        Returns:
            ROVER: self.
        """

        def grouped_tasks() -> Iterator[Tuple[Hashable, pd.DataFrame]]:
            # One group per task; wrapped in tqdm only when progress is requested.
            grouped = data.groupby("task")

            if self.silent:
                yield from grouped
            else:
                yield from tqdm(grouped)

        result = {}

        for task, df in grouped_tasks():
            # Every worker answer for the task becomes one token sequence.
            hypotheses = [self.tokenizer(text) for text in df["text"]]

            edges = self._build_word_transition_network(hypotheses)
            rover_result = self._get_result(edges)

            # "" is the placeholder `_align` inserts for a missing word, so a
            # position won by "" contributes nothing to the final text.
            result[task] = self.detokenizer(
                [value for value in rover_result if value != ""]
            )

        texts = pd.Series(result, name="text")
        texts.index.name = "task"
        self.texts_ = texts

        return self

    def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
        """Fit the model and return the aggregated texts.

        Args:
            data (DataFrame): Workers' text outputs.
                A pandas.DataFrame containing `task`, `worker` and `text` columns.

        Returns:
            Series: Tasks' texts.
                A pandas.Series indexed by `task` such that `result.loc[task]`
                is the task's text.
        """

        self.fit(data)
        return self.texts_

    def _build_word_transition_network(
        self, hypotheses: List[List[str]]
    ) -> List[Dict[str, AlignmentEdge]]:
        """Fold all hypotheses into one word transition network.

        The first hypothesis seeds the network (one single-word edge set per
        token); every following hypothesis is aligned against the network built
        so far.

        Args:
            hypotheses: Tokenized worker answers; must contain at least one item.

        Returns:
            The sequence of edge sets, each mapping a word to its edge.
        """
        edges = [
            {edge.value: edge} for edge in self._get_edges_for_words(hypotheses[0])
        ]

        # `sources_count` is the number of hypotheses already merged into `edges`.
        for sources_count, hyp in enumerate(hypotheses[1:], start=1):
            edges = self._align(edges, self._get_edges_for_words(hyp), sources_count)

        return edges

    @staticmethod
    def _get_edges_for_words(words: List[str]) -> List[AlignmentEdge]:
        """Wrap each word into an edge backed by a single source."""
        return [AlignmentEdge(word, 1) for word in words]

    @staticmethod
    def _align(
        ref_edges_sets: List[Dict[str, AlignmentEdge]],
        hyp_edges: List[AlignmentEdge],
        sources_count: int,
    ) -> List[Dict[str, AlignmentEdge]]:
        """Sequence alignment algorithm implementation.

        Aligns a sequence of sets of tokens (edges) with a sequence of tokens using dynamic programming algorithm. Look
        for section 2.1 in <https://doi.org/10.1109/ASRU.1997.659110> for implementation details. Penalty for
        insertion or mismatch is 1; a deletion costs 1 unless the reference set already
        contains the empty word, in which case it is free.

        Args:
           ref_edges_sets: Sequence of sets formed from previously aligned sequences.
           hyp_edges: Tokens from hypothesis (currently aligned) sequence.
           sources_count: Number of previously aligned sequences.

        Returns:
            The merged sequence of edge sets in left-to-right order.
        """

        distance = np.zeros((len(hyp_edges) + 1, len(ref_edges_sets) + 1))
        distance[:, 0] = np.arange(len(hyp_edges) + 1)
        distance[0, :] = np.arange(len(ref_edges_sets) + 1)

        # memoization[i][j] remembers which step produced distance[i, j]
        # together with the reference set / hypothesis edge that step consumes.
        memoization: List[
            List[
                Optional[
                    Tuple[AlignmentAction, Dict[str, AlignmentEdge], AlignmentEdge]
                ]
            ]
        ] = [[None] * (len(ref_edges_sets) + 1) for _ in range(len(hyp_edges) + 1)]

        # Borders: with no reference left every hypothesis word is an insertion,
        # with no hypothesis left every reference set is a deletion.
        for i, hyp_edge in enumerate(hyp_edges, start=1):
            memoization[i][0] = (
                AlignmentAction.INSERTION,
                {"": AlignmentEdge("", sources_count)},
                hyp_edge,
            )
        for j, ref_edges in enumerate(ref_edges_sets, start=1):
            memoization[0][j] = (
                AlignmentAction.DELETION,
                ref_edges,
                AlignmentEdge("", 1),
            )

        # find alignment minimal cost using dynamic programming algorithm
        for i, hyp_edge in enumerate(hyp_edges, start=1):
            # NOTE(review): the `and` guard is kept from the original; edges are
            # presumably always truthy, in which case this is just `.value`.
            hyp_word = hyp_edge and hyp_edge.value
            for j, ref_edges in enumerate(ref_edges_sets, start=1):
                is_hyp_word_in_ref = hyp_word in ref_edges

                # Order matters: `min` keeps the first of equally cheap options,
                # so the diagonal step wins ties, then deletion, then insertion.
                options = [
                    (
                        distance[i - 1, j - 1] + (0 if is_hyp_word_in_ref else 1),
                        (
                            AlignmentAction.CORRECT
                            if is_hyp_word_in_ref
                            else AlignmentAction.SUBSTITUTION,
                            ref_edges,
                            hyp_edge,
                        ),
                    ),
                    (
                        distance[i, j - 1] + ("" not in ref_edges),
                        (
                            AlignmentAction.DELETION,
                            ref_edges,
                            AlignmentEdge("", 1),
                        ),
                    ),
                    (
                        distance[i - 1, j] + 1,
                        (
                            AlignmentAction.INSERTION,
                            {"": AlignmentEdge("", sources_count)},
                            hyp_edge,
                        ),
                    ),
                ]

                distance[i, j], memoization[i][j] = min(options, key=lambda t: t[0])

        alignment = []
        i = len(hyp_edges)
        j = len(ref_edges_sets)

        # reconstruct answer from dp array
        while i != 0 or j != 0:
            action, ref_edges, hyp_edge = cast(
                Tuple[AlignmentAction, Dict[str, AlignmentEdge], AlignmentEdge],
                memoization[i][j],
            )
            # Copy so that the vote counts of the input network are not mutated.
            joined_edges = deepcopy(ref_edges)
            hyp_edge_word = hyp_edge.value
            if hyp_edge_word not in joined_edges:
                joined_edges[hyp_edge_word] = hyp_edge
            else:
                # if word is already in set increment sources count for future score calculation
                joined_edges[hyp_edge_word].sources_count += 1  # type: ignore
            alignment.append(joined_edges)
            if (
                action == AlignmentAction.CORRECT
                or action == AlignmentAction.SUBSTITUTION
            ):
                i -= 1
                j -= 1
            elif action == AlignmentAction.INSERTION:
                i -= 1
            # action == AlignmentAction.DELETION
            else:
                j -= 1

        # The walk went from the end to the start, so flip it back.
        return alignment[::-1]

    @staticmethod
    def _get_result(edges: List[Dict[str, AlignmentEdge]]) -> List[str]:
        """Pick the winning word of every edge set.

        The word with the largest `sources_count` wins; ties are resolved in
        favour of the longer word and then of the lexicographically greater one.
        """
        result = []
        for edges_set in edges:
            _, _, value = max(
                (x.sources_count, len(x.value), x.value) for x in edges_set.values()
            )
            result.append(value)
        return result

detokenizer: Callable[[List[str]], str] = attr.ib() class-attribute instance-attribute

A callable that takes a list of tokens and returns a string.

silent: bool = attr.ib(default=True) class-attribute instance-attribute

If false, show a progress bar.

tokenizer: Callable[[str], List[str]] = attr.ib() class-attribute instance-attribute

A callable that takes a string and returns a list of tokens.

fit(data)

Fits the model. The aggregated results are saved to the texts_ attribute.

Parameters:

Name Type Description Default
data DataFrame

Workers' text outputs. A pandas.DataFrame containing task, worker and text columns.

required

Returns:

Name Type Description
ROVER ROVER

self.

Source code in crowdkit/aggregation/texts/rover.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def fit(self, data: pd.DataFrame) -> "ROVER":
    """Fits the model. The aggregated results are saved to the `texts_` attribute.

    Args:
        data (DataFrame): Workers' text outputs.
            A pandas.DataFrame containing `task`, `worker` and `text` columns.

    Returns:
        ROVER: self. After the call `texts_` holds a pandas.Series named `text`
            and indexed by `task`.
    """

    def grouped_tasks() -> Iterator[Tuple[Hashable, pd.DataFrame]]:
        # One group per task; wrapped in tqdm only when progress is requested.
        grouped = data.groupby("task")

        if self.silent:
            yield from grouped
        else:
            yield from tqdm(grouped)

    result = {}

    for task, df in grouped_tasks():
        # Every worker answer for the task becomes one token sequence.
        hypotheses = [self.tokenizer(text) for text in df["text"]]

        edges = self._build_word_transition_network(hypotheses)
        rover_result = self._get_result(edges)

        # Empty strings in the voting result are dropped before detokenizing.
        result[task] = self.detokenizer(
            [value for value in rover_result if value != ""]
        )

    texts = pd.Series(result, name="text")
    texts.index.name = "task"
    self.texts_ = texts

    return self

fit_predict(data)

Fit the model and return the aggregated texts.

Parameters:

Name Type Description Default
data DataFrame

Workers' text outputs. A pandas.DataFrame containing task, worker and text columns.

required

Returns:

Name Type Description
Series Series[Any]

Tasks' texts. A pandas.Series indexed by task such that result.loc[task] is the task's text.

Source code in crowdkit/aggregation/texts/rover.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def fit_predict(self, data: pd.DataFrame) -> "pd.Series[Any]":
    """Run `fit` on the data and hand back the aggregated texts.

    Args:
        data (DataFrame): Workers' text outputs.
            A pandas.DataFrame containing `task`, `worker` and `text` columns.

    Returns:
        Series: Tasks' texts, i.e. the `texts_` attribute as it is after fitting.
    """

    # Fitting is done purely for its side effect of populating `texts_`.
    self.fit(data)
    aggregated = self.texts_
    return aggregated

TextHRRASA

Bases: BaseTextsAggregator

HRRASA on text embeddings.

Given a sentence encoder, encodes texts provided by workers and runs the HRRASA algorithm for embedding aggregation.

Parameters:

Name Type Description Default
encoder Callable[[str], ArrayLike]

A callable that takes a text and returns a NumPy array containing the corresponding embedding.

required
n_iter int

A number of HRRASA iterations.

100
lambda_emb float

A weight of reliability calculated on embeddings.

0.5
lambda_out float

A weight of reliability calculated on outputs.

0.5
alpha float

Confidence level of chi-squared distribution quantiles in beta parameter formula.

0.05
calculate_ranks bool

If true, calculate additional attribute ranks_.

False

Examples:

We suggest to use sentence encoders provided by Sentence Transformers.

>>> from crowdkit.datasets import load_dataset
>>> from crowdkit.aggregation import TextHRRASA
>>> from sentence_transformers import SentenceTransformer
>>> encoder = SentenceTransformer('all-mpnet-base-v2')
>>> hrrasa = TextHRRASA(encoder=encoder.encode)
>>> df, gt = load_dataset('crowdspeech-test-clean')
>>> df['text'] = df['text'].str.lower()
>>> result = hrrasa.fit_predict(df)
Source code in crowdkit/aggregation/texts/text_hrrasa.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class TextHRRASA(BaseTextsAggregator):
    """
    HRRASA on text embeddings.

    Given a sentence encoder, encodes texts provided by workers and runs the HRRASA algorithm for embedding
    aggregation.

    Args:
        encoder: A callable that takes a text and returns a NumPy array containing the corresponding embedding.
        n_iter: A number of HRRASA iterations.
        tol: Forwarded unchanged to the underlying HRRASA aggregator.
        lambda_emb: A weight of reliability calculated on embeddings.
        lambda_out: A weight of reliability calculated on outputs.
        alpha: Confidence level of chi-squared distribution quantiles in beta parameter formula.
        calculate_ranks: If true, calculate additional attribute `ranks_`.
        output_similarity: Forwarded unchanged to the underlying HRRASA aggregator;
            defaults to `glue_similarity`.

    Examples:
        We suggest using sentence encoders provided by [Sentence Transformers](https://www.sbert.net).
        >>> from crowdkit.datasets import load_dataset
        >>> from crowdkit.aggregation import TextHRRASA
        >>> from sentence_transformers import SentenceTransformer
        >>> encoder = SentenceTransformer('all-mpnet-base-v2')
        >>> hrrasa = TextHRRASA(encoder=encoder.encode)
        >>> df, gt = load_dataset('crowdspeech-test-clean')
        >>> df['text'] = df['text'].str.lower()
        >>> result = hrrasa.fit_predict(df)
    """

    # texts_

    @property
    def loss_history_(self) -> List[float]:
        """Loss history exposed by the wrapped HRRASA aggregator."""
        return self._hrrasa.loss_history_

    def __init__(
        self,
        encoder: Callable[[str], npt.ArrayLike],
        n_iter: int = 100,
        tol: float = 1e-5,
        lambda_emb: float = 0.5,
        lambda_out: float = 0.5,
        alpha: float = 0.05,
        calculate_ranks: bool = False,
        output_similarity: Callable[[str, List[List[str]]], float] = glue_similarity,
    ) -> None:
        super().__init__()
        self.encoder = encoder
        self._hrrasa = HRRASA(
            n_iter,
            tol,
            lambda_emb,
            lambda_out,
            alpha,
            calculate_ranks,
            output_similarity,
        )

    def __getattr__(self, name: str) -> Any:
        """Delegate unknown attributes to the wrapped HRRASA aggregator."""
        # __getattr__ only runs after normal lookup has failed. If `_hrrasa`
        # itself is the missing attribute (an instance created without
        # __init__, as copy/pickle do), reading `self._hrrasa` below would
        # re-enter this method until RecursionError, so fail fast instead.
        if name == "_hrrasa":
            raise AttributeError(name)
        return getattr(self._hrrasa, name)

    def fit_predict_scores(
        self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
    ) -> pd.DataFrame:
        """Fit the model and return scores.

        Args:
            data (DataFrame): Workers' responses.
                A pandas.DataFrame containing `task`, `worker` and `text` columns.
            true_objects (Series, optional): Tasks' ground truth texts.
                A pandas.Series indexed by `task` such that `true_objects.loc[task]`
                is the task's ground truth text. Defaults to None.

        Returns:
            DataFrame: Tasks' label scores.
                A pandas.DataFrame indexed by `task` such that `result.loc[task, label]`
                is the score of `label` for `task`.
        """

        # NOTE(review): None is forwarded to the wrapped HRRASA as-is; confirm
        # it treats None as "no ground truth".
        return self._hrrasa.fit_predict_scores(
            self._encode_data(data), self._encode_true_objects(true_objects)
        )

    def fit_predict(  # type: ignore
        self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
    ) -> "pd.Series[Any]":
        """Fit the model and return aggregated texts.

        Args:
            data (DataFrame): Workers' responses.
                A pandas.DataFrame containing `task`, `worker` and `text` columns.
            true_objects (Series, optional): Tasks' ground truth texts.
                A pandas.Series indexed by `task` such that `true_objects.loc[task]`
                is the task's ground truth text. Defaults to None.

        Returns:
            Series: Tasks' texts.
                Indexed by `task` such that `result.loc[task, text]`
                is the task's text.
        """

        # NOTE(review): None is forwarded to the wrapped HRRASA as-is; confirm
        # it treats None as "no ground truth".
        hrrasa_results = self._hrrasa.fit_predict(
            self._encode_data(data), self._encode_true_objects(true_objects)
        )
        # Keep only the winning output per task and expose it under the
        # text-aggregator column name.
        self.texts_ = (
            hrrasa_results.reset_index()[["task", "output"]]  # type: ignore
            .rename(columns={"output": "text"})
            .set_index("task")
        )
        return self.texts_

    def _encode_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Rename `text` to `output` and add an `embedding` column built with the encoder."""
        data = data[["task", "worker", "text"]].rename(columns={"text": "output"})
        data["embedding"] = data.output.apply(self.encoder)  # type: ignore
        return data

    def _encode_true_objects(
        self, true_objects: "pd.Series[Any] | None"
    ) -> "pd.Series[Any] | None":
        """Encode ground truth texts; None is passed through untouched."""
        # An explicit None check is required: evaluating a pandas Series in a
        # boolean context (`series and ...`) raises ValueError.
        if true_objects is None:
            return None
        return true_objects.apply(self.encoder)

fit_predict(data, true_objects)

Fit the model and return aggregated texts.

Parameters:

Name Type Description Default
data DataFrame

Workers' responses. A pandas.DataFrame containing task, worker and text columns.

required
true_objects Series

Tasks' ground truth texts. A pandas.Series indexed by task such that labels.loc[task] is the task's ground truth text.

required

Returns:

Name Type Description
Series Series[Any]

Tasks' texts. A pandas.Series indexed by task such that result.loc[task, text] is the task's text.

Source code in crowdkit/aggregation/texts/text_hrrasa.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def fit_predict(  # type: ignore
    self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
) -> "pd.Series[Any]":
    """Fit the model and return aggregated texts.

    Args:
        data (DataFrame): Workers' responses.
            A pandas.DataFrame containing `task`, `worker` and `text` columns.
        true_objects (Series, optional): Tasks' ground truth texts.
            A pandas.Series indexed by `task` such that `true_objects.loc[task]`
            is the task's ground truth text. Defaults to None.

    Returns:
        Series: Tasks' texts.
            Indexed by `task` such that `result.loc[task, text]`
            is the task's text.
    """

    # NOTE(review): None is forwarded to the wrapped HRRASA as-is; confirm it
    # treats None as "no ground truth".
    hrrasa_results = self._hrrasa.fit_predict(
        self._encode_data(data), self._encode_true_objects(true_objects)
    )
    # Keep only the winning output per task and expose it under the
    # text-aggregator column name.
    self.texts_ = (
        hrrasa_results.reset_index()[["task", "output"]]  # type: ignore
        .rename(columns={"output": "text"})
        .set_index("task")
    )
    return self.texts_

fit_predict_scores(data, true_objects)

Fit the model and return scores.

Parameters:

Name Type Description Default
data DataFrame

Workers' responses. A pandas.DataFrame containing task, worker and text columns.

required
true_objects Series

Tasks' ground truth texts. A pandas.Series indexed by task such that labels.loc[task] is the task's ground truth text.

required

Returns:

Name Type Description
DataFrame DataFrame

Tasks' label scores. A pandas.DataFrame indexed by task such that result.loc[task, label] is the score of label for task.

Source code in crowdkit/aggregation/texts/text_hrrasa.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def fit_predict_scores(
    self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
) -> pd.DataFrame:
    """Fit the model and return scores.

    Args:
        data (DataFrame): Workers' responses.
            A pandas.DataFrame containing `task`, `worker` and `text` columns.
        true_objects (Series, optional): Tasks' ground truth texts.
            A pandas.Series indexed by `task` such that `true_objects.loc[task]`
            is the task's ground truth text. Defaults to None.

    Returns:
        DataFrame: Tasks' label scores.
            A pandas.DataFrame indexed by `task` such that `result.loc[task, label]`
            is the score of `label` for `task`.
    """

    # NOTE(review): None is forwarded to the wrapped HRRASA as-is; confirm it
    # treats None as "no ground truth".
    return self._hrrasa.fit_predict_scores(
        self._encode_data(data), self._encode_true_objects(true_objects)
    )

TextRASA

Bases: BaseTextsAggregator

RASA on text embeddings.

Given a sentence encoder, encodes texts provided by workers and runs the RASA algorithm for embedding aggregation.

Parameters:

Name Type Description Default
encoder Callable[[str], NDArray[Any]]

A callable that takes a text and returns a NumPy array containing the corresponding embedding.

required
n_iter int

A number of RASA iterations.

100
alpha float

Confidence level of chi-squared distribution quantiles in beta parameter formula.

0.05

Examples:

We suggest to use sentence encoders provided by Sentence Transformers.

>>> from crowdkit.datasets import load_dataset
>>> from crowdkit.aggregation import TextRASA
>>> from sentence_transformers import SentenceTransformer
>>> encoder = SentenceTransformer('all-mpnet-base-v2')
>>> hrrasa = TextRASA(encoder=encoder.encode)
>>> df, gt = load_dataset('crowdspeech-test-clean')
>>> df['text'] = df['text'].str.lower()
>>> result = hrrasa.fit_predict(df)
Source code in crowdkit/aggregation/texts/text_rasa.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class TextRASA(BaseTextsAggregator):
    """RASA on text embeddings.

    Given a sentence encoder, encodes texts provided by workers and runs the RASA algorithm for embedding
    aggregation.

    Args:
        encoder: A callable that takes a text and returns a NumPy array containing the corresponding embedding.
        n_iter: A number of RASA iterations.
        tol: Forwarded unchanged to the underlying RASA aggregator.
        alpha: Confidence level of chi-squared distribution quantiles in beta parameter formula.

    Examples:
        We suggest using sentence encoders provided by [Sentence Transformers](https://www.sbert.net).
        >>> from crowdkit.datasets import load_dataset
        >>> from crowdkit.aggregation import TextRASA
        >>> from sentence_transformers import SentenceTransformer
        >>> encoder = SentenceTransformer('all-mpnet-base-v2')
        >>> rasa = TextRASA(encoder=encoder.encode)
        >>> df, gt = load_dataset('crowdspeech-test-clean')
        >>> df['text'] = df['text'].str.lower()
        >>> result = rasa.fit_predict(df)
    """

    # texts_

    @property
    def loss_history_(self) -> List[float]:
        """Loss history exposed by the wrapped RASA aggregator."""
        return self._rasa.loss_history_

    def __init__(
        self,
        encoder: Callable[[str], npt.NDArray[Any]],
        n_iter: int = 100,
        tol: float = 1e-5,
        alpha: float = 0.05,
    ) -> None:
        super().__init__()
        self.encoder = encoder
        self._rasa = RASA(n_iter, tol, alpha)

    def __getattr__(self, name: str) -> Any:
        """Delegate unknown attributes to the wrapped RASA aggregator."""
        # __getattr__ only runs after normal lookup has failed. If `_rasa`
        # itself is the missing attribute (an instance created without
        # __init__, as copy/pickle do), reading `self._rasa` below would
        # re-enter this method until RecursionError, so fail fast instead.
        if name == "_rasa":
            raise AttributeError(name)
        return getattr(self._rasa, name)

    def fit(  # type: ignore
        self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
    ) -> "TextRASA":
        """Fit the model.

        Args:
            data (DataFrame): Workers' outputs.
                A pandas.DataFrame containing `task`, `worker` and `text` columns.
            true_objects (Series, optional): Tasks' ground truth texts.
                A pandas.Series indexed by `task` such that `true_objects.loc[task]`
                is the task's ground truth text. Defaults to None.

        Returns:
            TextRASA: self.
        """

        # NOTE(review): None is forwarded to the wrapped RASA as-is; confirm it
        # treats None as "no ground truth".
        self._rasa.fit(self._encode_data(data), self._encode_true_objects(true_objects))
        return self

    def fit_predict_scores(
        self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
    ) -> pd.DataFrame:
        """Fit the model and return scores.

        Args:
            data (DataFrame): Workers' responses.
                A pandas.DataFrame containing `task`, `worker` and `text` columns.
            true_objects (Series, optional): Tasks' ground truth texts.
                A pandas.Series indexed by `task` such that `true_objects.loc[task]`
                is the task's ground truth text. Defaults to None.

        Returns:
            DataFrame: Tasks' label scores.
                A pandas.DataFrame indexed by `task` such that `result.loc[task, label]`
                is the score of `label` for `task`.
        """

        return self._rasa.fit_predict_scores(
            self._encode_data(data), self._encode_true_objects(true_objects)
        )

    def fit_predict(  # type: ignore
        self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
    ) -> "pd.Series[Any]":
        """Fit the model and return aggregated texts.

        Args:
            data (DataFrame): Workers' responses.
                A pandas.DataFrame containing `task`, `worker` and `text` columns.
            true_objects (Series, optional): Tasks' ground truth texts.
                A pandas.Series indexed by `task` such that `true_objects.loc[task]`
                is the task's ground truth text. Defaults to None.

        Returns:
            Series: Tasks' texts.
                Indexed by `task` such that `result.loc[task, text]`
                is the task's text.
        """

        rasa_results = self._rasa.fit_predict(
            self._encode_data(data), self._encode_true_objects(true_objects)
        )
        # Keep only the winning output per task and expose it under the
        # text-aggregator column name.
        self.texts_ = (
            rasa_results.reset_index()[["task", "output"]]  # type: ignore
            .rename(columns={"output": "text"})
            .set_index("task")
        )
        return self.texts_

    def _encode_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Rename `text` to `output` and add an `embedding` column built with the encoder."""
        data = data[["task", "worker", "text"]].rename(columns={"text": "output"})
        data["embedding"] = data.output.apply(self.encoder)  # type: ignore
        return data

    def _encode_true_objects(
        self, true_objects: "pd.Series[Any] | None"
    ) -> "pd.Series[Any] | None":
        """Encode ground truth texts; None is passed through untouched."""
        # An explicit None check is required: evaluating a pandas Series in a
        # boolean context (`series and ...`) raises ValueError.
        if true_objects is None:
            return None
        return true_objects.apply(self.encoder)

fit(data, true_objects)

Fit the model.

Parameters: data (DataFrame): Workers' outputs. A pandas.DataFrame containing task, worker and text columns. true_objects (Series): Tasks' ground truth labels. A pandas.Series indexed by task such that labels.loc[task] is the task's ground truth label.

Returns:

Name Type Description
TextRASA TextRASA

self.

Source code in crowdkit/aggregation/texts/text_rasa.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def fit(  # type: ignore
    self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
) -> "TextRASA":
    """Fit the model.

    Args:
        data (DataFrame): Workers' outputs.
            A pandas.DataFrame containing `task`, `worker` and `text` columns.
        true_objects (Series, optional): Tasks' ground truth texts.
            A pandas.Series indexed by `task` such that `true_objects.loc[task]`
            is the task's ground truth text. Defaults to None.

    Returns:
        TextRASA: self.
    """

    # NOTE(review): None is forwarded to the wrapped RASA as-is; confirm it
    # treats None as "no ground truth".
    self._rasa.fit(self._encode_data(data), self._encode_true_objects(true_objects))
    return self

fit_predict(data, true_objects)

Fit the model and return aggregated texts.

Parameters:

Name Type Description Default
data DataFrame

Workers' responses. A pandas.DataFrame containing task, worker and text columns.

required
true_objects Series

Tasks' ground truth texts. A pandas.Series indexed by task such that labels.loc[task] is the task's ground truth text.

required

Returns:

Name Type Description
Series Series[Any]

Tasks' texts. A pandas.Series indexed by task such that result.loc[task, text] is the task's text.

Source code in crowdkit/aggregation/texts/text_rasa.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def fit_predict(  # type: ignore
    self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
) -> "pd.Series[Any]":
    """Fit the model and return aggregated texts.

    Args:
        data (DataFrame): Workers' responses.
            A pandas.DataFrame containing `task`, `worker` and `text` columns.
        true_objects (Series, optional): Tasks' ground truth texts.
            A pandas.Series indexed by `task` such that `true_objects.loc[task]`
            is the task's ground truth text. Defaults to None.

    Returns:
        Series: Tasks' texts.
            Indexed by `task` such that `result.loc[task, text]`
            is the task's text.
    """

    # NOTE(review): None is forwarded to the wrapped RASA as-is; confirm it
    # treats None as "no ground truth".
    rasa_results = self._rasa.fit_predict(
        self._encode_data(data), self._encode_true_objects(true_objects)
    )
    # Keep only the winning output per task and expose it under the
    # text-aggregator column name.
    self.texts_ = (
        rasa_results.reset_index()[["task", "output"]]  # type: ignore
        .rename(columns={"output": "text"})
        .set_index("task")
    )
    return self.texts_

fit_predict_scores(data, true_objects)

Fit the model and return scores.

Parameters:

Name Type Description Default
data DataFrame

Workers' responses. A pandas.DataFrame containing task, worker and text columns.

required
true_objects Series

Tasks' ground truth texts. A pandas.Series indexed by task such that labels.loc[task] is the task's ground truth text.

required

Returns:

Name Type Description
DataFrame DataFrame

Tasks' label scores. A pandas.DataFrame indexed by task such that result.loc[task, label] is the score of label for task.

Source code in crowdkit/aggregation/texts/text_rasa.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def fit_predict_scores(
    self, data: pd.DataFrame, true_objects: "pd.Series[Any] | None" = None
) -> pd.DataFrame:
    """Fit the model and return scores.

    Args:
        data (DataFrame): Workers' responses.
            A pandas.DataFrame containing `task`, `worker` and `text` columns.
        true_objects (Series, optional): Tasks' ground truth texts.
            A pandas.Series indexed by `task` such that `true_objects.loc[task]`
            is the task's ground truth text. Defaults to None.

    Returns:
        DataFrame: Tasks' label scores.
            A pandas.DataFrame indexed by `task` such that `result.loc[task, label]`
            is the score of `label` for `task`.
    """

    # NOTE(review): None is forwarded to the wrapped RASA as-is; confirm it
    # treats None as "no ground truth".
    return self._rasa.fit_predict_scores(
        self._encode_data(data), self._encode_true_objects(true_objects)
    )