Coverage for ovos_core/intent_services/stop_service.py: 92%

152 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-17 13:44 +0000

1import os 

2import re 

3from os.path import dirname 

4from threading import Event 

5from typing import Optional, Dict, List, Union 

6 

7from langcodes import closest_match 

8from ovos_bus_client.client import MessageBusClient 

9from ovos_bus_client.message import Message 

10from ovos_bus_client.session import SessionManager, UtteranceState 

11 

12from ovos_config.config import Configuration 

13from ovos_plugin_manager.templates.pipeline import ConfidenceMatcherPipeline, IntentHandlerMatch 

14from ovos_utils import flatten_list 

15from ovos_utils.fakebus import FakeBus 

16from ovos_utils.bracket_expansion import expand_template 

17from ovos_utils.lang import standardize_lang_tag 

18from ovos_utils.log import LOG 

19from ovos_utils.parse import match_one 

20 

21 

22class StopService(ConfidenceMatcherPipeline): 

23 """Intent Service thats handles stopping skills.""" 

24 

25 def __init__(self, bus: Optional[Union[MessageBusClient, FakeBus]] = None, 

26 config: Optional[Dict] = None): 

27 config = config or Configuration().get("skills", {}).get("stop") or {} 

28 super().__init__(config=config, bus=bus) 

29 self._voc_cache = {} 

30 self.load_resource_files() 

31 self.bus.on("stop:global", self.handle_global_stop) 

32 self.bus.on("stop:skill", self.handle_skill_stop) 

33 

34 def handle_global_stop(self, message: Message): 

35 self.bus.emit(message.forward("mycroft.stop")) 

36 # TODO - this needs a confirmation dialog if nothing was stopped 

37 self.bus.emit(message.forward("ovos.utterance.handled")) 

38 

39 def handle_skill_stop(self, message: Message): 

40 skill_id = message.data["skill_id"] 

41 self.bus.emit(message.reply(f"{skill_id}.stop")) 

42 

43 def load_resource_files(self): 

44 base = f"{dirname(__file__)}/locale" 

45 for lang in os.listdir(base): 

46 lang2 = standardize_lang_tag(lang) 

47 self._voc_cache[lang2] = {} 

48 for f in os.listdir(f"{base}/{lang}"): 

49 with open(f"{base}/{lang}/{f}", encoding="utf-8") as fi: 

50 lines = [expand_template(l) for l in fi.read().split("\n") 

51 if l.strip() and not l.startswith("#")] 

52 n = f.split(".", 1)[0] 

53 self._voc_cache[lang2][n] = flatten_list(lines) 

54 

55 @staticmethod 

56 def get_active_skills(message: Optional[Message] = None) -> List[str]: 

57 """Active skill ids ordered by converse priority 

58 this represents the order in which stop will be called 

59 

60 Returns: 

61 active_skills (list): ordered list of skill_ids 

62 """ 

63 session = SessionManager.get(message) 

64 return [skill[0] for skill in session.active_skills] 

65 

66 def _collect_stop_skills(self, message: Message) -> List[str]: 

67 """ 

68 Collect skills that can be stopped based on a ping-pong mechanism. 

69 

70 This method determines which active skills can handle a stop request by sending 

71 a stop ping to each active skill and waiting for their acknowledgment. 

72 

73 Individual skills respond to this request via the `can_stop` method 

74 

75 Parameters: 

76 message (Message): The original message triggering the stop request. 

77 

78 Returns: 

79 List[str]: A list of skill IDs that can be stopped. If no skills explicitly 

80 indicate they can stop, returns all active skills. 

81 

82 Notes: 

83 - Excludes skills that are blacklisted in the current session 

84 - Uses a non-blocking event mechanism to collect skill responses 

85 - Waits up to 0.5 seconds for skills to respond 

86 - Falls back to all active skills if no explicit stop confirmation is received 

87 """ 

88 sess = SessionManager.get(message) 

89 

90 want_stop = [] 

91 skill_ids = [] 

92 

93 active_skills = [s for s in self.get_active_skills(message) 

94 if s not in sess.blacklisted_skills] 

95 

96 if not active_skills: 

97 return want_stop 

98 

99 event = Event() 

100 

101 def handle_ack(msg): 

102 """ 

103 Handle acknowledgment from skills during the stop process. 

104 

105 This method is a nested function used in skill stopping negotiation. It validates and tracks skill responses to a stop request. 

106 

107 Parameters: 

108 msg (Message): Message containing skill acknowledgment details. 

109 

110 Side Effects: 

111 - Modifies the `want_stop` list with skills that can handle stopping 

112 - Updates the `skill_ids` list to track which skills have responded 

113 - Sets the threading event when all active skills have responded 

114 

115 Notes: 

116 - Checks if a skill can handle stopping based on multiple conditions 

117 - Ensures all active skills provide a response before proceeding 

118 """ 

119 nonlocal event, skill_ids 

120 skill_id = msg.data["skill_id"] 

121 

122 # validate the stop pong 

123 if all((skill_id not in want_stop, 

124 msg.data.get("can_handle", True), 

125 skill_id in active_skills)): 

126 want_stop.append(skill_id) 

127 

128 if skill_id not in skill_ids: # track which answer we got 

129 skill_ids.append(skill_id) 

130 

131 if all(s in skill_ids for s in active_skills): 

132 # all skills answered the ping! 

133 event.set() 

134 

135 self.bus.on("skill.stop.pong", handle_ack) 

136 

137 # ask skills if they can stop 

138 for skill_id in active_skills: 

139 self.bus.emit(message.forward(f"{skill_id}.stop.ping", 

140 {"skill_id": skill_id})) 

141 

142 # wait for all skills to acknowledge they can stop 

143 event.wait(timeout=0.5) 

144 

145 self.bus.remove("skill.stop.pong", handle_ack) 

146 return want_stop or active_skills 

147 

148 def handle_stop_confirmation(self, message: Message): 

149 skill_id = (message.data.get("skill_id") or 

150 message.context.get("skill_id") or 

151 message.msg_type.split(".stop.response")[0]) 

152 if 'error' in message.data: 

153 error_msg = message.data['error'] 

154 LOG.error(f"{skill_id}: {error_msg}") 

155 elif message.data.get('result', False): 

156 sess = SessionManager.get(message) 

157 utt_state = sess.utterance_states.get(skill_id, UtteranceState.INTENT) 

158 if utt_state == UtteranceState.RESPONSE: 

159 LOG.debug("Forcing get_response timeout") 

160 # force-kill any ongoing get_response - see @killable_event decorator (ovos-workshop) 

161 self.bus.emit(message.reply("mycroft.skills.abort_question", {"skill_id": skill_id})) 

162 if sess.is_active(skill_id): 

163 LOG.debug("Forcing converse timeout") 

164 # force-kill any ongoing converse - see @killable_event decorator (ovos-workshop) 

165 self.bus.emit(message.reply("ovos.skills.converse.force_timeout", {"skill_id": skill_id})) 

166 

167 # TODO - track if speech is coming from this skill! not currently tracked (ovos-audio) 

168 if sess.is_speaking: 

169 # force-kill any ongoing TTS 

170 self.bus.emit(message.forward("mycroft.audio.speech.stop", {"skill_id": skill_id})) 

171 

172 def match_high(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]: 

173 """ 

174 Handles high-confidence stop requests by matching exact stop vocabulary and managing skill stopping. 

175 

176 Attempts to stop skills when an exact "stop" or "global_stop" command is detected. Performs the following actions: 

177 - Identifies the closest language match for vocabulary 

178 - Checks for global stop command when no active skills exist 

179 - Emits a global stop message if applicable 

180 - Attempts to stop individual skills if a stop command is detected 

181 - Disables response mode for stopped skills 

182 

183 Parameters: 

184 utterances (List[str]): List of user utterances to match against stop vocabulary 

185 lang (str): Four-letter ISO language code for language-specific matching 

186 message (Message): Message context for generating appropriate responses 

187 

188 Returns: 

189 Optional[PipelineMatch]: Match result indicating whether stop was handled, with optional skill and session information 

190 - Returns None if no stop action could be performed 

191 - Returns PipelineMatch with handled=True for successful global or skill-specific stop 

192 

193 Raises: 

194 No explicit exceptions raised, but may log debug/info messages during processing 

195 """ 

196 lang = self._get_closest_lang(lang) 

197 if lang is None: # no vocs registered for this lang 

198 return None 

199 

200 sess = SessionManager.get(message) 

201 

202 # we call flatten in case someone is sending the old style list of tuples 

203 utterance = flatten_list(utterances)[0] 

204 

205 is_stop = self.voc_match(utterance, 'stop', exact=True, lang=lang) 

206 is_global_stop = self.voc_match(utterance, 'global_stop', exact=True, lang=lang) or \ 

207 (is_stop and not len(self.get_active_skills(message))) 

208 

209 conf = 1.0 

210 

211 if is_global_stop: 

212 LOG.info(f"Emitting global stop, {len(self.get_active_skills(message))} active skills") 

213 # emit a global stop, full stop anything OVOS is doing 

214 return IntentHandlerMatch( 

215 match_type="stop:global", 

216 match_data={"conf": conf}, 

217 updated_session=sess, 

218 utterance=utterance, 

219 skill_id="stop.openvoiceos" 

220 ) 

221 

222 if is_stop: 

223 # check if any skill can stop 

224 for skill_id in self._collect_stop_skills(message): 

225 LOG.debug(f"Telling skill to stop: {skill_id}") 

226 sess.disable_response_mode(skill_id) 

227 self.bus.once(f"{skill_id}.stop.response", self.handle_stop_confirmation) 

228 return IntentHandlerMatch( 

229 match_type="stop:skill", 

230 match_data={"conf": conf, "skill_id": skill_id}, 

231 updated_session=sess, 

232 utterance=utterance, 

233 skill_id="stop.openvoiceos" 

234 ) 

235 

236 return None 

237 

238 def match_medium(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]: 

239 """ 

240 Handle stop intent with additional context beyond simple stop commands. 

241 

242 This method processes utterances that contain "stop" or global stop vocabulary but may include 

243 additional words not explicitly defined in intent files. It performs a medium-confidence 

244 intent matching for stop requests. 

245 

246 Parameters: 

247 utterances (List[str]): List of input utterances to analyze 

248 lang (str): Four-letter ISO language code for localization 

249 message (Message): Message context for generating appropriate responses 

250 

251 Returns: 

252 Optional[PipelineMatch]: A pipeline match if the stop intent is successfully processed, 

253 otherwise None if no stop intent is detected 

254 

255 Notes: 

256 - Attempts to match stop vocabulary with fuzzy matching 

257 - Falls back to low-confidence matching if medium-confidence match is inconclusive 

258 - Handles global stop scenarios when no active skills are present 

259 """ 

260 lang = self._get_closest_lang(lang) 

261 if lang is None: # no vocs registered for this lang 

262 return None 

263 

264 # we call flatten in case someone is sending the old style list of tuples 

265 utterance = flatten_list(utterances)[0] 

266 

267 is_stop = self.voc_match(utterance, 'stop', exact=False, lang=lang) 

268 if not is_stop: 

269 is_global_stop = self.voc_match(utterance, 'global_stop', exact=False, lang=lang) or \ 

270 (is_stop and not len(self.get_active_skills(message))) 

271 if not is_global_stop: 

272 return None 

273 

274 return self.match_low(utterances, lang, message) 

275 

276 def match_low(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]: 

277 """ 

278 Perform a low-confidence fuzzy match for stop intent before fallback processing. 

279 

280 This method attempts to match stop-related vocabulary with low confidence and handle stopping of active skills. 

281 

282 Parameters: 

283 utterances (List[str]): List of input utterances to match against stop vocabulary 

284 lang (str): Four-letter ISO language code for vocabulary matching 

285 message (Message): Message context used for generating replies and managing session 

286 

287 Returns: 

288 Optional[PipelineMatch]: A pipeline match object if a stop action is handled, otherwise None 

289 

290 Notes: 

291 - Increases confidence if active skills are present 

292 - Attempts to stop individual skills before emitting a global stop signal 

293 - Handles language-specific vocabulary matching 

294 - Configurable minimum confidence threshold for stop intent 

295 """ 

296 lang = self._get_closest_lang(lang) 

297 if lang is None: # no vocs registered for this lang 

298 return None 

299 sess = SessionManager.get(message) 

300 # we call flatten in case someone is sending the old style list of tuples 

301 utterance = flatten_list(utterances)[0] 

302 

303 conf = match_one(utterance, self._voc_cache[lang]['stop'])[1] 

304 if len(self.get_active_skills(message)) > 0: 

305 conf += 0.1 

306 conf = round(min(conf, 1.0), 3) 

307 

308 if conf < self.config.get("min_conf", 0.5): 

309 return None 

310 

311 # check if any skill can stop 

312 for skill_id in self._collect_stop_skills(message): 

313 LOG.debug(f"Telling skill to stop: {skill_id}") 

314 sess.disable_response_mode(skill_id) 

315 self.bus.once(f"{skill_id}.stop.response", self.handle_stop_confirmation) 

316 return IntentHandlerMatch( 

317 match_type="stop:skill", 

318 match_data={"conf": conf, "skill_id": skill_id}, 

319 updated_session=sess, 

320 utterance=utterance, 

321 skill_id="stop.openvoiceos" 

322 ) 

323 

324 # emit a global stop, full stop anything OVOS is doing 

325 LOG.debug(f"Emitting global stop signal, {len(self.get_active_skills(message))} active skills") 

326 return IntentHandlerMatch( 

327 match_type="stop:global", 

328 match_data={"conf": conf}, 

329 updated_session=sess, 

330 utterance=utterance, 

331 skill_id="stop.openvoiceos" 

332 ) 

333 

334 def _get_closest_lang(self, lang: str) -> Optional[str]: 

335 if self._voc_cache: 

336 lang = standardize_lang_tag(lang) 

337 closest, score = closest_match(lang, list(self._voc_cache.keys())) 

338 # https://langcodes-hickford.readthedocs.io/en/sphinx/index.html#distance-values 

339 # 0 -> These codes represent the same language, possibly after filling in values and normalizing. 

340 # 1- 3 -> These codes indicate a minor regional difference. 

341 # 4 - 10 -> These codes indicate a significant but unproblematic regional difference. 

342 if score < 10: 

343 return closest 

344 return None 

345 

346 def voc_match(self, utt: str, voc_filename: str, lang: str, 

347 exact: bool = False): 

348 """ 

349 TODO - should use ovos_workshop method instead of reimplementing here 

350 look into subclassing from OVOSAbstractApp 

351 

352 Determine if the given utterance contains the vocabulary provided. 

353 

354 By default the method checks if the utterance contains the given vocab 

355 thereby allowing the user to say things like "yes, please" and still 

356 match against "Yes.voc" containing only "yes". An exact match can be 

357 requested. 

358 

359 The method first checks in the current Skill's .voc files and secondly 

360 in the "res/text" folder of mycroft-core. The result is cached to 

361 avoid hitting the disk each time the method is called. 

362 

363 Args: 

364 utt (str): Utterance to be tested 

365 voc_filename (str): Name of vocabulary file (e.g. 'yes' for 

366 'res/text/en-us/yes.voc') 

367 lang (str): Language code, defaults to self.lang 

368 exact (bool): Whether the vocab must exactly match the utterance 

369 

370 Returns: 

371 bool: True if the utterance has the given vocabulary it 

372 """ 

373 lang = self._get_closest_lang(lang) 

374 if lang is None: # no vocs registered for this lang 

375 return False 

376 

377 _vocs = self._voc_cache[lang].get(voc_filename) or [] 

378 

379 if utt and _vocs: 

380 if exact: 

381 # Check for exact match 

382 return any(i.strip().lower() == utt.lower() 

383 for i in _vocs) 

384 else: 

385 # Check for matches against complete words 

386 return any([re.match(r'.*\b' + i + r'\b.*', utt, re.IGNORECASE) 

387 for i in _vocs]) 

388 return False 

389 

390 def shutdown(self): 

391 self.bus.remove("stop:global", self.handle_global_stop) 

392 self.bus.remove("stop:skill", self.handle_skill_stop)