Coverage for ovos_core/intent_services/service.py: 70%
322 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-17 13:44 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-17 13:44 +0000
1# Copyright 2017 Mycroft AI Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
16import json
17import re
18import time
19from collections import defaultdict
20from typing import Tuple, Callable, List
22import requests
23from langcodes import closest_match
24from ovos_bus_client.message import Message
25from ovos_bus_client.session import SessionManager
26from ovos_bus_client.util import get_message_lang
27from ovos_config.config import Configuration
28from ovos_config.locale import get_valid_languages
29from ovos_utils.lang import standardize_lang_tag
30from ovos_utils.log import LOG
31from ovos_utils.metrics import Stopwatch
32from ovos_utils.process_utils import ProcessStatus, StatusCallbackMap
33from ovos_utils.thread_utils import create_daemon
35from ovos_core.transformers import MetadataTransformersService, UtteranceTransformersService, IntentTransformersService
36from ovos_plugin_manager.pipeline import OVOSPipelineFactory
37from ovos_plugin_manager.templates.pipeline import IntentHandlerMatch, ConfidenceMatcherPipeline
def on_started():
    """Default `started_hook` of IntentService: log that startup has begun."""
    LOG.info("IntentService is starting up.")
def on_alive():
    """Default `alive_hook` of IntentService: log that the service is alive."""
    LOG.info("IntentService is alive.")
def on_ready():
    """Default `ready_hook` of IntentService: log that the service is ready."""
    LOG.info("IntentService is ready.")
def on_error(e='Unknown'):
    """Default `error_hook` of IntentService: log a launch failure.

    Args:
        e: description of the failure; interpolated into the log line.
    """
    reason = e
    LOG.info(f"IntentService failed to launch ({reason})")
def on_stopping():
    """Default `stopping_hook` of IntentService: log that shutdown has begun."""
    LOG.info("IntentService is shutting down...")
class IntentService:
    """OVOS intent service. Parses utterances using a variety of systems.

    The intent service also provides the internal API for registering and
    querying the intent service.
    """

    # Maps short matcher ids (e.g. `adapt_high`) to plugin-style matcher ids.
    # Built once at class level instead of on every get_pipeline_matcher() call.
    _MIGRATION_MAP = {
        "converse": "ovos-converse-pipeline-plugin",
        "common_qa": "ovos-common-query-pipeline-plugin",
        "fallback_high": "ovos-fallback-pipeline-plugin-high",
        "fallback_medium": "ovos-fallback-pipeline-plugin-medium",
        "fallback_low": "ovos-fallback-pipeline-plugin-low",
        "stop_high": "ovos-stop-pipeline-plugin-high",
        "stop_medium": "ovos-stop-pipeline-plugin-medium",
        "stop_low": "ovos-stop-pipeline-plugin-low",
        "adapt_high": "ovos-adapt-pipeline-plugin-high",
        "adapt_medium": "ovos-adapt-pipeline-plugin-medium",
        "adapt_low": "ovos-adapt-pipeline-plugin-low",
        "padacioso_high": "ovos-padacioso-pipeline-plugin-high",
        "padacioso_medium": "ovos-padacioso-pipeline-plugin-medium",
        "padacioso_low": "ovos-padacioso-pipeline-plugin-low",
        "padatious_high": "ovos-padatious-pipeline-plugin-high",
        "padatious_medium": "ovos-padatious-pipeline-plugin-medium",
        "padatious_low": "ovos-padatious-pipeline-plugin-low",
        "ocp_high": "ovos-ocp-pipeline-plugin-high",
        "ocp_medium": "ovos-ocp-pipeline-plugin-medium",
        "ocp_low": "ovos-ocp-pipeline-plugin-low",
        "ocp_legacy": "ovos-ocp-pipeline-plugin-legacy",
    }
    # Trailing confidence suffix that is stripped to obtain the plugin id.
    _CONFIDENCE_SUFFIX = re.compile(r'-(high|medium|low)$')

    def __init__(self, bus, config=None, preload_pipelines=True,
                 alive_hook=on_alive, started_hook=on_started,
                 ready_hook=on_ready,
                 error_hook=on_error, stopping_hook=on_stopping):
        """
        Set up transformer services, session sync and messagebus handlers.

        Pipeline plugins are not loaded directly here: they are loaded by
        `handle_reload_pipelines`, which is registered for the
        'intent.service.pipelines.reload' event. That event is emitted at the
        end of initialization when `preload_pipelines` is truthy.

        Args:
            bus: The messagebus connection used for event-driven communication.
            config: Optional configuration dictionary for intent services.
                When falsy, the "intents" section of `Configuration()` is used.
            preload_pipelines: When truthy, emit
                'intent.service.pipelines.reload' once handlers are registered.
            alive_hook: callback passed to StatusCallbackMap as `on_alive`.
            started_hook: callback passed to StatusCallbackMap as `on_started`.
            ready_hook: callback passed to StatusCallbackMap as `on_ready`.
            error_hook: callback passed to StatusCallbackMap as `on_error`.
            stopping_hook: callback passed to StatusCallbackMap as `on_stopping`.
        """
        callbacks = StatusCallbackMap(on_started=started_hook,
                                      on_alive=alive_hook,
                                      on_ready=ready_hook,
                                      on_error=error_hook,
                                      on_stopping=stopping_hook)
        self.bus = bus
        self.status = ProcessStatus('intents', bus=self.bus, callback_map=callbacks)
        self.status.set_started()
        self.config = config or Configuration().get("intents", {})

        # cache of loaded pipeline plugins, keyed by plugin id;
        # filled by handle_reload_pipelines
        self.pipeline_plugins = {}

        self.utterance_plugins = UtteranceTransformersService(bus)
        self.metadata_plugins = MetadataTransformersService(bus)
        self.intent_plugins = IntentTransformersService(bus)

        # connection SessionManager to the bus,
        # this will sync default session across all components
        SessionManager.connect_to_bus(self.bus)

        self.bus.on('recognizer_loop:utterance', self.handle_utterance)

        # Context related handlers
        self.bus.on('add_context', self.handle_add_context)
        self.bus.on('remove_context', self.handle_remove_context)
        self.bus.on('clear_context', self.handle_clear_context)

        # Intents API
        self.bus.on('intent.service.intent.get', self.handle_get_intent)

        # internal, track skills that call self.deactivate to avoid reactivating them again
        self._deactivations = defaultdict(list)
        self.bus.on('intent.service.skills.deactivate', self._handle_deactivate)
        self.bus.on('intent.service.pipelines.reload', self.handle_reload_pipelines)

        self.status.set_alive()
        if preload_pipelines:
            self.bus.emit(Message('intent.service.pipelines.reload'))

    def handle_reload_pipelines(self, message: Message):
        """(Re)load every installed pipeline plugin and mark the service ready.

        A plugin that fails to load is logged and skipped; the service is
        marked ready regardless of individual failures.

        Args:
            message (Message): the 'intent.service.pipelines.reload' message
                (its content is not read).
        """
        pipeline_plugins = OVOSPipelineFactory.get_installed_pipeline_ids()
        LOG.debug(f"Installed pipeline plugins: {pipeline_plugins}")
        for p in pipeline_plugins:
            try:
                self.pipeline_plugins[p] = OVOSPipelineFactory.load_plugin(p, bus=self.bus)
                LOG.debug(f"Loaded pipeline plugin: '{p}'")
            except Exception as e:
                LOG.error(f"Failed to load pipeline plugin '{p}': {e}")
        self.status.set_ready()

    def _handle_transformers(self, message):
        """
        Pipe utterance through transformer plugins to get more metadata.
        Utterances may be modified by any parser and context overwritten

        Args:
            message (Message): the utterance message; modified in place.

        Returns:
            The same message, with `data["utterances"]` and `context` updated.
        """
        lang = get_message_lang(message)  # per query lang or default Configuration lang
        original = utterances = message.data.get('utterances', [])
        message.context["lang"] = lang
        utterances, message.context = self.utterance_plugins.transform(utterances, message.context)
        if original != utterances:
            message.data["utterances"] = utterances
            LOG.debug(f"utterances transformed: {original} -> {utterances}")
        message.context = self.metadata_plugins.transform(message.context)
        return message

    @staticmethod
    def disambiguate_lang(message):
        """ disambiguate language of the query via pre-defined context keys
        1 - stt_lang -> tagged in stt stage  (STT used this lang to transcribe speech)
        2 - request_lang -> tagged in source message (wake word/request volunteered lang info)
        3 - detected_lang -> tagged by transformers  (text classification, free form chat)
        4 - config lang (or from message.data)

        Returns:
            The standardized tag from the first context key that closely
            matches an enabled language, else the message/default language.
        """
        default_lang = get_message_lang(message)
        valid_langs = message.context.get("valid_langs") or get_valid_languages()
        valid_langs = [standardize_lang_tag(tag) for tag in valid_langs]
        lang_keys = ["stt_lang",
                     "request_lang",
                     "detected_lang"]
        for k in lang_keys:
            if k in message.context:
                try:
                    v = standardize_lang_tag(message.context[k])
                    best_lang, _ = closest_match(v, valid_langs, max_distance=10)
                except Exception:
                    # malformed tag: treat it as "no match" instead of failing
                    # (narrowed from a bare except so that KeyboardInterrupt /
                    # SystemExit are no longer swallowed)
                    v = message.context[k]
                    best_lang = "und"
                if best_lang == "und":
                    LOG.warning(f"ignoring {k}, {v} is not in enabled languages: {valid_langs}")
                    continue
                LOG.info(f"replaced {default_lang} with {k}: {v}")
                return v

        return default_lang

    def get_pipeline_matcher(self, matcher_id: str):
        """
        Retrieve a matcher function for a given pipeline matcher ID.

        Args:
            matcher_id: The configured matcher ID (e.g. `adapt_high`).

        Returns:
            A callable matcher function, or None (after logging an error)
            when no loaded plugin corresponds to the ID.
        """
        matcher_id = self._MIGRATION_MAP.get(matcher_id, matcher_id)
        pipe_id = self._CONFIDENCE_SUFFIX.sub('', matcher_id)
        plugin = self.pipeline_plugins.get(pipe_id)
        if not plugin:
            LOG.error(f"Unknown pipeline matcher: {matcher_id}")
            return None

        # only confidence-aware plugins expose per-level matchers
        if isinstance(plugin, ConfidenceMatcherPipeline):
            if matcher_id.endswith("-high"):
                return plugin.match_high
            if matcher_id.endswith("-medium"):
                return plugin.match_medium
            if matcher_id.endswith("-low"):
                return plugin.match_low
        return plugin.match

    def get_pipeline(self, session=None) -> List[Tuple[str, Callable]]:
        """return a list of matcher functions ordered by priority
        utterances will be sent to each matcher in order until one can handle the utterance
        the list is taken from `session.pipeline` (the current session when
        `session` is not given); entries without a loaded plugin are dropped

        Returns:
            list of `(matcher_id, matcher_callable)` tuples.
        """
        session = session or SessionManager.get()
        matchers = [(p, self.get_pipeline_matcher(p)) for p in session.pipeline]
        matchers = [m for m in matchers if m[1] is not None]  # filter any that failed to load
        final_pipeline = [k[0] for k in matchers]
        if session.pipeline != final_pipeline:
            LOG.warning(f"Requested some invalid pipeline components! "
                        f"filtered: {[k for k in session.pipeline if k not in final_pipeline]}")
        LOG.debug(f"Session final pipeline: {final_pipeline}")
        return matchers

    @staticmethod
    def _validate_session(message, lang):
        """Fetch the session for `message` and make sure its language is `lang`.

        The default session is reset when expired and is only updated/synced
        when something changed; any other session always gets its language
        set and is updated. The session is touched before being returned.

        Args:
            message (Message): message the session is resolved from.
            lang (str): language tag; standardized before use.

        Returns:
            The (possibly recreated) session object.
        """
        # get session
        lang = standardize_lang_tag(lang)
        sess = SessionManager.get(message)
        if sess.session_id == "default":
            updated = False
            # Default session, check if it needs to be (re)-created
            if sess.expired():
                sess = SessionManager.reset_default_session()
                updated = True
            if lang != sess.lang:
                sess.lang = lang
                updated = True
            if updated:
                SessionManager.update(sess)
                SessionManager.sync(message)
        else:
            sess.lang = lang
            SessionManager.update(sess)
        sess.touch()
        return sess

    def _handle_deactivate(self, message):
        """internal helper, track if a skill asked to be removed from active list during intent match
        in this case we want to avoid reactivating it again
        This only matters in PipelineMatchers, such as fallback and converse
        in those cases the activation is only done AFTER the match, not before unlike intents
        """
        sess = SessionManager.get(message)
        skill_id = message.data.get("skill_id")
        self._deactivations[sess.session_id].append(skill_id)

    def _emit_match_message(self, match: IntentHandlerMatch, message: Message, lang: str):
        """
        Emit a reply message for a matched intent, updating session and skill activation.

        Args:
            match (IntentHandlerMatch): The matched intent object containing
                utterance and matching information.
            message (Message): The original messagebus message that triggered the intent match.
            lang (str): The language of the pipeline plugin match

        Details:
            - Runs the match through the intent transformer plugins
              (a failing transformer is logged and the match is used as-is)
            - Creates a reply message with matched intent data
            - Activates the corresponding skill if not previously deactivated
            - Updates session information
            - Emits the reply message on the messagebus
            - Hands match data to `_upload_match_data` on a daemon thread;
              a match without `match_type` is reported as
              "complete_intent_failure" and no reply is emitted

        Side Effects:
            - Modifies session state
            - Emits a messagebus event
            - Can trigger skill activation events

        Returns:
            None
        """
        try:
            match = self.intent_plugins.transform(match)
        except Exception as e:
            LOG.error(f"Error in IntentTransformers: {e}")

        reply = None
        sess = match.updated_session or SessionManager.get(message)
        sess.lang = lang  # ensure it is updated

        # Launch intent handler
        if match.match_type:
            # keep all original message.data and update with intent match
            data = dict(message.data)
            data.update(match.match_data)
            reply = message.reply(match.match_type, data)

            # upload intent metrics if enabled
            create_daemon(self._upload_match_data, (match.utterance,
                                                    match.match_type,
                                                    lang,
                                                    match.match_data))

        if reply is not None:
            reply.data["utterance"] = match.utterance
            reply.data["lang"] = lang

            # update active skill list
            if match.skill_id:
                # ensure skill_id is present in message.context
                reply.context["skill_id"] = match.skill_id

                # NOTE: do not re-activate if the skill called self.deactivate
                # we could also skip activation if skill is already active,
                # but we still want to update the timestamp
                was_deactivated = match.skill_id in self._deactivations[sess.session_id]
                if not was_deactivated:
                    sess.activate_skill(match.skill_id)
                    # emit event for skills callback -> self.handle_activate
                    self.bus.emit(reply.forward(f"{match.skill_id}.activate"))

            # update Session if modified by pipeline
            reply.context["session"] = sess.serialize()

            # finally emit reply message
            self.bus.emit(reply)

        else:  # upload intent metrics if enabled
            create_daemon(self._upload_match_data, (match.utterance,
                                                    "complete_intent_failure",
                                                    lang,
                                                    match.match_data))

    @staticmethod
    def _upload_match_data(utterance: str, intent: str, lang: str, match_data: dict):
        """if enabled upload the intent match data to a server, allowing users and developers
        to collect metrics/datasets to improve the pipeline plugins and skills.

        There isn't a default server to upload things to, users need to explicitly configure one
        (`open_data.intent_urls`, a single URL or a list of URLs)

        https://github.com/OpenVoiceOS/ovos-opendata-server
        """
        config = Configuration().get("open_data", {})
        endpoints: List[str] = config.get("intent_urls", [])  # eg. "http://localhost:8000/intents"
        if not endpoints:
            return  # user didn't configure any endpoints to upload metrics to
        if isinstance(endpoints, str):
            endpoints = [endpoints]
        headers = {"Content-Type": "application/x-www-form-urlencoded",
                   "User-Agent": config.get("user_agent", "ovos-metrics")}
        data = {
            "utterance": utterance,
            "intent": intent,
            "lang": lang,
            "match_data": json.dumps(match_data, ensure_ascii=False)
        }
        for url in endpoints:
            try:
                # Add a timeout to prevent hanging
                response = requests.post(url, data=data, headers=headers, timeout=3)
                LOG.info(f"Uploaded intent metrics to '{url}' - Response: {response.status_code}")
            except Exception as e:
                LOG.warning(f"Failed to upload metrics: {e}")

    def send_cancel_event(self, message):
        """
        Emit events and play a sound when an utterance is canceled.

        Logs the cancellation with the cancel word found in the message
        context (if any), then emits the events below.

        Parameters:
            message (Message): The original message that triggered the cancellation.

        Events Emitted:
            - 'mycroft.audio.play_sound': Plays a cancel sound from configuration
            - 'ovos.utterance.cancelled': Signals that the utterance was canceled
            - 'ovos.utterance.handled': Indicates the utterance processing is complete

        Notes:
            - Uses the default cancel sound path 'snd/cancel.mp3' if not specified in configuration
            - Ensures events are sent as replies to the original message
        """
        # f-string instead of "+" so a missing/non-string cancel_word does not
        # raise TypeError and prevent the events below from being emitted
        LOG.info(f"utterance canceled, cancel_word:{message.context.get('cancel_word')}")
        # play dedicated cancel sound
        sound = Configuration().get('sounds', {}).get('cancel', "snd/cancel.mp3")
        # NOTE: message.reply to ensure correct message destination
        self.bus.emit(message.reply('mycroft.audio.play_sound', {"uri": sound}))
        self.bus.emit(message.reply("ovos.utterance.cancelled"))
        self.bus.emit(message.reply("ovos.utterance.handled"))

    def handle_utterance(self, message: Message):
        """Main entrypoint for handling user utterances

        Monitor the messagebus for 'recognizer_loop:utterance', typically
        generated by a spoken interaction but potentially also from a CLI
        or other method of injecting a 'user utterance' into the system.

        Utterances then work through this sequence to be handled:
        1) UtteranceTransformers can modify the utterance and metadata in message.context
        2) MetadataTransformers can modify the metadata in message.context
        3) If a transformer flagged the message as canceled, cancel events are sent and handling stops
        4) Language is extracted from message
        5) The session is validated and attached to message.context
        6) Each matcher of the session pipeline is tried in order (optionally
           for every valid language when "multilingual_matching" is enabled)
           until a match is emitted; matches for skills/intents blacklisted
           by the session are ignored

        If all these fail the complete_intent_failure message will be sent
        and a generic error sound played.

        Args:
            message (Message): The messagebus data

        Returns:
            None when the utterance was canceled, otherwise a tuple
            `(match, message.context, stopwatch)` where `match` is the match
            that was emitted, or the last pipeline result (falsy or None)
            when nothing handled the utterance.
        """
        # Get utterance utterance_plugins additional context
        message = self._handle_transformers(message)

        if message.context.get("canceled"):
            self.send_cancel_event(message)
            return

        # tag language of this utterance
        lang = self.disambiguate_lang(message)

        utterances = message.data.get('utterances', [])
        LOG.info(f"Parsing utterance: {utterances}")

        stopwatch = Stopwatch()

        # get session
        sess = self._validate_session(message, lang)
        message.context["session"] = sess.serialize()

        # match
        match = None
        with stopwatch:
            self._deactivations[sess.session_id] = []
            # Loop through the matching functions until a match is found.
            for pipeline, match_func in self.get_pipeline(session=sess):
                langs = [lang]
                if self.config.get("multilingual_matching"):
                    # if multilingual matching is enabled, attempt to match all user languages if main fails
                    langs += [tag for tag in get_valid_languages() if tag != lang]
                for intent_lang in langs:
                    match = match_func(utterances, intent_lang, message)
                    if not match:
                        continue
                    LOG.info(f"{pipeline} match ({intent_lang}): {match}")
                    if match.skill_id and match.skill_id in sess.blacklisted_skills:
                        LOG.debug(
                            f"ignoring match, skill_id '{match.skill_id}' blacklisted by Session '{sess.session_id}'")
                        match = None  # rejected: must not be reported as handled
                        continue
                    if isinstance(match, IntentHandlerMatch) and match.match_type in sess.blacklisted_intents:
                        LOG.debug(
                            f"ignoring match, intent '{match.match_type}' blacklisted by Session '{sess.session_id}'")
                        match = None  # rejected: must not be reported as handled
                        continue
                    try:
                        self._emit_match_message(match, message, intent_lang)
                    except Exception:
                        LOG.exception(f"{match_func} returned an invalid match")
                        match = None  # emit failed: keep looking
                    else:
                        break  # match emitted, stop trying languages
                else:
                    # no language produced an emitted match for this matcher
                    LOG.debug(f"no match from {match_func}")
                    continue
                break  # inner loop was broken out of -> utterance handled
            else:
                # Nothing was able to handle the intent
                # Ask politely for forgiveness for failing in this vital task
                message.data["lang"] = lang
                self.send_complete_intent_failure(message)

        LOG.debug(f"intent matching took: {stopwatch.time}")

        # sync any changes made to the default session, eg by ConverseService
        if sess.session_id == "default":
            SessionManager.sync(message)
        elif sess.session_id in self._deactivations:
            self._deactivations.pop(sess.session_id)
        return match, message.context, stopwatch

    def send_complete_intent_failure(self, message):
        """Send a message that no skill could handle the utterance.

        Emits the configured error sound (default "snd/error.mp3"),
        'complete_intent_failure' and 'ovos.utterance.handled'.

        Args:
            message (Message): original message to forward from
        """
        sound = Configuration().get('sounds', {}).get('error', "snd/error.mp3")
        # NOTE: message.reply to ensure correct message destination
        self.bus.emit(message.reply('mycroft.audio.play_sound', {"uri": sound}))
        self.bus.emit(message.reply('complete_intent_failure', message.data))
        self.bus.emit(message.reply("ovos.utterance.handled"))

    @staticmethod
    def handle_add_context(message: Message):
        """Add context

        Args:
            message: data contains the 'context' item to add
                     optionally can include 'word' to be injected as
                     an alias for the context item.
        """
        entity = {'confidence': 1.0}
        context = message.data.get('context')
        word = message.data.get('word') or ''
        origin = message.data.get('origin') or ''
        # if not a string type try creating a string from it
        if not isinstance(word, str):
            word = str(word)
        entity['data'] = [(word, context)]
        entity['match'] = word
        entity['key'] = word
        entity['origin'] = origin
        sess = SessionManager.get(message)
        sess.context.inject_context(entity)

    @staticmethod
    def handle_remove_context(message: Message):
        """Remove specific context

        Args:
            message: data contains the 'context' item to remove
        """
        context = message.data.get('context')
        if context:
            sess = SessionManager.get(message)
            sess.context.remove_context(context)

    @staticmethod
    def handle_clear_context(message: Message):
        """Clears all keywords from context """
        sess = SessionManager.get(message)
        sess.context.clear_context()

    def handle_get_intent(self, message):
        """Query the session pipeline for an intent without executing it.

        Tries every matcher of the session pipeline in order and replies with
        'intent.service.intent.reply'. The reply carries the intent data of
        the first match that has a `match_type`, or `"intent": None` when no
        matcher produced one.

        Args:
            message (Message): message containing utterance
        """
        utterance = message.data["utterance"]
        lang = get_message_lang(message)
        sess = SessionManager.get(message)
        # Loop through the matching functions until a match is found.
        for pipeline, match_func in self.get_pipeline(session=sess):
            s = time.monotonic()
            match = match_func([utterance], lang, message)
            LOG.debug(f"matching '{pipeline}' took: {time.monotonic() - s} seconds")
            if match:
                if match.match_type:
                    intent_data = dict(match.match_data)
                    intent_data["intent_name"] = match.match_type
                    intent_data["intent_service"] = pipeline
                    intent_data["skill_id"] = match.skill_id
                    intent_data["handler"] = match_func.__name__
                    LOG.debug(f"final intent match: {intent_data}")
                    m = message.reply("intent.service.intent.reply",
                                      {"intent": intent_data, "utterance": utterance})
                    self.bus.emit(m)
                    return
                # a truthy match without match_type cannot be reported
                LOG.error(f"bad pipeline match! {match}")
        # signal intent failure
        self.bus.emit(message.reply("intent.service.intent.reply",
                                    {"intent": None, "utterance": utterance}))

    def shutdown(self):
        """Stop transformer services and pipeline plugins, then detach from the bus.

        Failures while stopping an individual pipeline plugin are logged and
        do not prevent the remaining plugins from being stopped.
        """
        self.utterance_plugins.shutdown()
        self.metadata_plugins.shutdown()
        # NOTE(review): self.intent_plugins is not shut down here - confirm
        # whether IntentTransformersService exposes a shutdown() method
        for pipeline in self.pipeline_plugins.values():
            if hasattr(pipeline, "stop"):
                try:
                    pipeline.stop()
                except Exception as e:
                    LOG.warning(f"Failed to stop pipeline {pipeline}: {e}")
                    continue
            if hasattr(pipeline, "shutdown"):
                try:
                    pipeline.shutdown()
                except Exception as e:
                    LOG.warning(f"Failed to shutdown pipeline {pipeline}: {e}")
                    continue

        # remove every handler registered in __init__
        self.bus.remove('recognizer_loop:utterance', self.handle_utterance)
        self.bus.remove('add_context', self.handle_add_context)
        self.bus.remove('remove_context', self.handle_remove_context)
        self.bus.remove('clear_context', self.handle_clear_context)
        self.bus.remove('intent.service.intent.get', self.handle_get_intent)
        self.bus.remove('intent.service.skills.deactivate', self._handle_deactivate)
        self.bus.remove('intent.service.pipelines.reload', self.handle_reload_pipelines)

        self.status.set_stopping()
def launch_standalone():
    """Run an IntentService on its own bus connection until an exit signal.

    Sets up logging and locale, connects a MessageBusClient in a background
    thread, waits for the connection, starts the service, blocks until an
    exit signal is received and then shuts the service down.
    """
    # these dependencies are imported inside the function, at call time
    from ovos_bus_client import MessageBusClient
    from ovos_utils import wait_for_exit_signal
    from ovos_config.locale import setup_locale
    from ovos_utils.log import init_service_logger

    LOG.info("Launching IntentService in standalone mode")
    init_service_logger("intents")
    setup_locale()

    client = MessageBusClient()
    client.run_in_thread()
    client.connected_event.wait()  # block until the bus connection is up

    service = IntentService(client)

    wait_for_exit_signal()

    service.shutdown()

    LOG.info("IntentService shutdown complete!")
# script entry point: start the standalone service only when run directly
if __name__ == "__main__":
    launch_standalone()