Coverage for ovos_core/intent_services/stop_service.py: 92%
152 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-17 13:44 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-17 13:44 +0000
1import os
2import re
3from os.path import dirname
4from threading import Event
5from typing import Optional, Dict, List, Union
7from langcodes import closest_match
8from ovos_bus_client.client import MessageBusClient
9from ovos_bus_client.message import Message
10from ovos_bus_client.session import SessionManager, UtteranceState
12from ovos_config.config import Configuration
13from ovos_plugin_manager.templates.pipeline import ConfidenceMatcherPipeline, IntentHandlerMatch
14from ovos_utils import flatten_list
15from ovos_utils.fakebus import FakeBus
16from ovos_utils.bracket_expansion import expand_template
17from ovos_utils.lang import standardize_lang_tag
18from ovos_utils.log import LOG
19from ovos_utils.parse import match_one
22class StopService(ConfidenceMatcherPipeline):
23 """Intent Service thats handles stopping skills."""
25 def __init__(self, bus: Optional[Union[MessageBusClient, FakeBus]] = None,
26 config: Optional[Dict] = None):
27 config = config or Configuration().get("skills", {}).get("stop") or {}
28 super().__init__(config=config, bus=bus)
29 self._voc_cache = {}
30 self.load_resource_files()
31 self.bus.on("stop:global", self.handle_global_stop)
32 self.bus.on("stop:skill", self.handle_skill_stop)
34 def handle_global_stop(self, message: Message):
35 self.bus.emit(message.forward("mycroft.stop"))
36 # TODO - this needs a confirmation dialog if nothing was stopped
37 self.bus.emit(message.forward("ovos.utterance.handled"))
39 def handle_skill_stop(self, message: Message):
40 skill_id = message.data["skill_id"]
41 self.bus.emit(message.reply(f"{skill_id}.stop"))
43 def load_resource_files(self):
44 base = f"{dirname(__file__)}/locale"
45 for lang in os.listdir(base):
46 lang2 = standardize_lang_tag(lang)
47 self._voc_cache[lang2] = {}
48 for f in os.listdir(f"{base}/{lang}"):
49 with open(f"{base}/{lang}/{f}", encoding="utf-8") as fi:
50 lines = [expand_template(l) for l in fi.read().split("\n")
51 if l.strip() and not l.startswith("#")]
52 n = f.split(".", 1)[0]
53 self._voc_cache[lang2][n] = flatten_list(lines)
55 @staticmethod
56 def get_active_skills(message: Optional[Message] = None) -> List[str]:
57 """Active skill ids ordered by converse priority
58 this represents the order in which stop will be called
60 Returns:
61 active_skills (list): ordered list of skill_ids
62 """
63 session = SessionManager.get(message)
64 return [skill[0] for skill in session.active_skills]
66 def _collect_stop_skills(self, message: Message) -> List[str]:
67 """
68 Collect skills that can be stopped based on a ping-pong mechanism.
70 This method determines which active skills can handle a stop request by sending
71 a stop ping to each active skill and waiting for their acknowledgment.
73 Individual skills respond to this request via the `can_stop` method
75 Parameters:
76 message (Message): The original message triggering the stop request.
78 Returns:
79 List[str]: A list of skill IDs that can be stopped. If no skills explicitly
80 indicate they can stop, returns all active skills.
82 Notes:
83 - Excludes skills that are blacklisted in the current session
84 - Uses a non-blocking event mechanism to collect skill responses
85 - Waits up to 0.5 seconds for skills to respond
86 - Falls back to all active skills if no explicit stop confirmation is received
87 """
88 sess = SessionManager.get(message)
90 want_stop = []
91 skill_ids = []
93 active_skills = [s for s in self.get_active_skills(message)
94 if s not in sess.blacklisted_skills]
96 if not active_skills:
97 return want_stop
99 event = Event()
101 def handle_ack(msg):
102 """
103 Handle acknowledgment from skills during the stop process.
105 This method is a nested function used in skill stopping negotiation. It validates and tracks skill responses to a stop request.
107 Parameters:
108 msg (Message): Message containing skill acknowledgment details.
110 Side Effects:
111 - Modifies the `want_stop` list with skills that can handle stopping
112 - Updates the `skill_ids` list to track which skills have responded
113 - Sets the threading event when all active skills have responded
115 Notes:
116 - Checks if a skill can handle stopping based on multiple conditions
117 - Ensures all active skills provide a response before proceeding
118 """
119 nonlocal event, skill_ids
120 skill_id = msg.data["skill_id"]
122 # validate the stop pong
123 if all((skill_id not in want_stop,
124 msg.data.get("can_handle", True),
125 skill_id in active_skills)):
126 want_stop.append(skill_id)
128 if skill_id not in skill_ids: # track which answer we got
129 skill_ids.append(skill_id)
131 if all(s in skill_ids for s in active_skills):
132 # all skills answered the ping!
133 event.set()
135 self.bus.on("skill.stop.pong", handle_ack)
137 # ask skills if they can stop
138 for skill_id in active_skills:
139 self.bus.emit(message.forward(f"{skill_id}.stop.ping",
140 {"skill_id": skill_id}))
142 # wait for all skills to acknowledge they can stop
143 event.wait(timeout=0.5)
145 self.bus.remove("skill.stop.pong", handle_ack)
146 return want_stop or active_skills
148 def handle_stop_confirmation(self, message: Message):
149 skill_id = (message.data.get("skill_id") or
150 message.context.get("skill_id") or
151 message.msg_type.split(".stop.response")[0])
152 if 'error' in message.data:
153 error_msg = message.data['error']
154 LOG.error(f"{skill_id}: {error_msg}")
155 elif message.data.get('result', False):
156 sess = SessionManager.get(message)
157 utt_state = sess.utterance_states.get(skill_id, UtteranceState.INTENT)
158 if utt_state == UtteranceState.RESPONSE:
159 LOG.debug("Forcing get_response timeout")
160 # force-kill any ongoing get_response - see @killable_event decorator (ovos-workshop)
161 self.bus.emit(message.reply("mycroft.skills.abort_question", {"skill_id": skill_id}))
162 if sess.is_active(skill_id):
163 LOG.debug("Forcing converse timeout")
164 # force-kill any ongoing converse - see @killable_event decorator (ovos-workshop)
165 self.bus.emit(message.reply("ovos.skills.converse.force_timeout", {"skill_id": skill_id}))
167 # TODO - track if speech is coming from this skill! not currently tracked (ovos-audio)
168 if sess.is_speaking:
169 # force-kill any ongoing TTS
170 self.bus.emit(message.forward("mycroft.audio.speech.stop", {"skill_id": skill_id}))
172 def match_high(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
173 """
174 Handles high-confidence stop requests by matching exact stop vocabulary and managing skill stopping.
176 Attempts to stop skills when an exact "stop" or "global_stop" command is detected. Performs the following actions:
177 - Identifies the closest language match for vocabulary
178 - Checks for global stop command when no active skills exist
179 - Emits a global stop message if applicable
180 - Attempts to stop individual skills if a stop command is detected
181 - Disables response mode for stopped skills
183 Parameters:
184 utterances (List[str]): List of user utterances to match against stop vocabulary
185 lang (str): Four-letter ISO language code for language-specific matching
186 message (Message): Message context for generating appropriate responses
188 Returns:
189 Optional[PipelineMatch]: Match result indicating whether stop was handled, with optional skill and session information
190 - Returns None if no stop action could be performed
191 - Returns PipelineMatch with handled=True for successful global or skill-specific stop
193 Raises:
194 No explicit exceptions raised, but may log debug/info messages during processing
195 """
196 lang = self._get_closest_lang(lang)
197 if lang is None: # no vocs registered for this lang
198 return None
200 sess = SessionManager.get(message)
202 # we call flatten in case someone is sending the old style list of tuples
203 utterance = flatten_list(utterances)[0]
205 is_stop = self.voc_match(utterance, 'stop', exact=True, lang=lang)
206 is_global_stop = self.voc_match(utterance, 'global_stop', exact=True, lang=lang) or \
207 (is_stop and not len(self.get_active_skills(message)))
209 conf = 1.0
211 if is_global_stop:
212 LOG.info(f"Emitting global stop, {len(self.get_active_skills(message))} active skills")
213 # emit a global stop, full stop anything OVOS is doing
214 return IntentHandlerMatch(
215 match_type="stop:global",
216 match_data={"conf": conf},
217 updated_session=sess,
218 utterance=utterance,
219 skill_id="stop.openvoiceos"
220 )
222 if is_stop:
223 # check if any skill can stop
224 for skill_id in self._collect_stop_skills(message):
225 LOG.debug(f"Telling skill to stop: {skill_id}")
226 sess.disable_response_mode(skill_id)
227 self.bus.once(f"{skill_id}.stop.response", self.handle_stop_confirmation)
228 return IntentHandlerMatch(
229 match_type="stop:skill",
230 match_data={"conf": conf, "skill_id": skill_id},
231 updated_session=sess,
232 utterance=utterance,
233 skill_id="stop.openvoiceos"
234 )
236 return None
238 def match_medium(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
239 """
240 Handle stop intent with additional context beyond simple stop commands.
242 This method processes utterances that contain "stop" or global stop vocabulary but may include
243 additional words not explicitly defined in intent files. It performs a medium-confidence
244 intent matching for stop requests.
246 Parameters:
247 utterances (List[str]): List of input utterances to analyze
248 lang (str): Four-letter ISO language code for localization
249 message (Message): Message context for generating appropriate responses
251 Returns:
252 Optional[PipelineMatch]: A pipeline match if the stop intent is successfully processed,
253 otherwise None if no stop intent is detected
255 Notes:
256 - Attempts to match stop vocabulary with fuzzy matching
257 - Falls back to low-confidence matching if medium-confidence match is inconclusive
258 - Handles global stop scenarios when no active skills are present
259 """
260 lang = self._get_closest_lang(lang)
261 if lang is None: # no vocs registered for this lang
262 return None
264 # we call flatten in case someone is sending the old style list of tuples
265 utterance = flatten_list(utterances)[0]
267 is_stop = self.voc_match(utterance, 'stop', exact=False, lang=lang)
268 if not is_stop:
269 is_global_stop = self.voc_match(utterance, 'global_stop', exact=False, lang=lang) or \
270 (is_stop and not len(self.get_active_skills(message)))
271 if not is_global_stop:
272 return None
274 return self.match_low(utterances, lang, message)
276 def match_low(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
277 """
278 Perform a low-confidence fuzzy match for stop intent before fallback processing.
280 This method attempts to match stop-related vocabulary with low confidence and handle stopping of active skills.
282 Parameters:
283 utterances (List[str]): List of input utterances to match against stop vocabulary
284 lang (str): Four-letter ISO language code for vocabulary matching
285 message (Message): Message context used for generating replies and managing session
287 Returns:
288 Optional[PipelineMatch]: A pipeline match object if a stop action is handled, otherwise None
290 Notes:
291 - Increases confidence if active skills are present
292 - Attempts to stop individual skills before emitting a global stop signal
293 - Handles language-specific vocabulary matching
294 - Configurable minimum confidence threshold for stop intent
295 """
296 lang = self._get_closest_lang(lang)
297 if lang is None: # no vocs registered for this lang
298 return None
299 sess = SessionManager.get(message)
300 # we call flatten in case someone is sending the old style list of tuples
301 utterance = flatten_list(utterances)[0]
303 conf = match_one(utterance, self._voc_cache[lang]['stop'])[1]
304 if len(self.get_active_skills(message)) > 0:
305 conf += 0.1
306 conf = round(min(conf, 1.0), 3)
308 if conf < self.config.get("min_conf", 0.5):
309 return None
311 # check if any skill can stop
312 for skill_id in self._collect_stop_skills(message):
313 LOG.debug(f"Telling skill to stop: {skill_id}")
314 sess.disable_response_mode(skill_id)
315 self.bus.once(f"{skill_id}.stop.response", self.handle_stop_confirmation)
316 return IntentHandlerMatch(
317 match_type="stop:skill",
318 match_data={"conf": conf, "skill_id": skill_id},
319 updated_session=sess,
320 utterance=utterance,
321 skill_id="stop.openvoiceos"
322 )
324 # emit a global stop, full stop anything OVOS is doing
325 LOG.debug(f"Emitting global stop signal, {len(self.get_active_skills(message))} active skills")
326 return IntentHandlerMatch(
327 match_type="stop:global",
328 match_data={"conf": conf},
329 updated_session=sess,
330 utterance=utterance,
331 skill_id="stop.openvoiceos"
332 )
334 def _get_closest_lang(self, lang: str) -> Optional[str]:
335 if self._voc_cache:
336 lang = standardize_lang_tag(lang)
337 closest, score = closest_match(lang, list(self._voc_cache.keys()))
338 # https://langcodes-hickford.readthedocs.io/en/sphinx/index.html#distance-values
339 # 0 -> These codes represent the same language, possibly after filling in values and normalizing.
340 # 1- 3 -> These codes indicate a minor regional difference.
341 # 4 - 10 -> These codes indicate a significant but unproblematic regional difference.
342 if score < 10:
343 return closest
344 return None
346 def voc_match(self, utt: str, voc_filename: str, lang: str,
347 exact: bool = False):
348 """
349 TODO - should use ovos_workshop method instead of reimplementing here
350 look into subclassing from OVOSAbstractApp
352 Determine if the given utterance contains the vocabulary provided.
354 By default the method checks if the utterance contains the given vocab
355 thereby allowing the user to say things like "yes, please" and still
356 match against "Yes.voc" containing only "yes". An exact match can be
357 requested.
359 The method first checks in the current Skill's .voc files and secondly
360 in the "res/text" folder of mycroft-core. The result is cached to
361 avoid hitting the disk each time the method is called.
363 Args:
364 utt (str): Utterance to be tested
365 voc_filename (str): Name of vocabulary file (e.g. 'yes' for
366 'res/text/en-us/yes.voc')
367 lang (str): Language code, defaults to self.lang
368 exact (bool): Whether the vocab must exactly match the utterance
370 Returns:
371 bool: True if the utterance has the given vocabulary it
372 """
373 lang = self._get_closest_lang(lang)
374 if lang is None: # no vocs registered for this lang
375 return False
377 _vocs = self._voc_cache[lang].get(voc_filename) or []
379 if utt and _vocs:
380 if exact:
381 # Check for exact match
382 return any(i.strip().lower() == utt.lower()
383 for i in _vocs)
384 else:
385 # Check for matches against complete words
386 return any([re.match(r'.*\b' + i + r'\b.*', utt, re.IGNORECASE)
387 for i in _vocs])
388 return False
390 def shutdown(self):
391 self.bus.remove("stop:global", self.handle_global_stop)
392 self.bus.remove("stop:skill", self.handle_skill_stop)