Coverage for ovos_core/intent_services/fallback_service.py: 88%
94 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-17 13:44 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-17 13:44 +0000
1# Copyright 2020 Mycroft AI Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15import operator
16import time
17from collections import namedtuple
18from typing import Optional, Dict, List, Union
20from ovos_bus_client.client import MessageBusClient
21from ovos_bus_client.message import Message
22from ovos_bus_client.session import SessionManager
23from ovos_config import Configuration
24from ovos_plugin_manager.templates.pipeline import ConfidenceMatcherPipeline, IntentHandlerMatch
25from ovos_utils import flatten_list
26from ovos_utils.fakebus import FakeBus
27from ovos_utils.lang import standardize_lang_tag
28from ovos_utils.log import LOG
29from ovos_workshop.permissions import FallbackMode
31FallbackRange = namedtuple('FallbackRange', ['start', 'stop'])
34class FallbackService(ConfidenceMatcherPipeline):
35 """Intent Service handling fallback skills."""
37 def __init__(self, bus: Optional[Union[MessageBusClient, FakeBus]] = None,
38 config: Optional[Dict] = None):
39 config = config or Configuration().get("skills", {}).get("fallbacks", {})
40 super().__init__(bus, config)
41 self.registered_fallbacks = {} # skill_id: priority
42 self.bus.on("ovos.skills.fallback.register", self.handle_register_fallback)
43 self.bus.on("ovos.skills.fallback.deregister", self.handle_deregister_fallback)
45 def handle_register_fallback(self, message: Message):
46 skill_id = message.data.get("skill_id")
47 priority = message.data.get("priority") or 101
49 # check if .conf is overriding the priority for this skill
50 priority_overrides = self.config.get("fallback_priorities", {})
51 if skill_id in priority_overrides:
52 new_priority = priority_overrides.get(skill_id)
53 LOG.info(f"forcing {skill_id} fallback priority from {priority} to {new_priority}")
54 self.registered_fallbacks[skill_id] = new_priority
55 else:
56 self.registered_fallbacks[skill_id] = priority
58 def handle_deregister_fallback(self, message: Message):
59 skill_id = message.data.get("skill_id")
60 if skill_id in self.registered_fallbacks:
61 self.registered_fallbacks.pop(skill_id)
63 def _fallback_allowed(self, skill_id: str) -> bool:
64 """Checks if a skill_id is allowed to fallback
66 - is the skill blacklisted from fallback
67 - is fallback configured to only allow specific skills
69 Args:
70 skill_id (str): identifier of skill that wants to fallback.
72 Returns:
73 permitted (bool): True if skill can fallback
74 """
75 opmode = self.config.get("fallback_mode", FallbackMode.ACCEPT_ALL)
76 if opmode == FallbackMode.BLACKLIST and skill_id in \
77 self.config.get("fallback_blacklist", []):
78 return False
79 elif opmode == FallbackMode.WHITELIST and skill_id not in \
80 self.config.get("fallback_whitelist", []):
81 return False
82 return True
84 def _collect_fallback_skills(self, message: Message,
85 fb_range: Optional[FallbackRange] = None) -> List[str]:
86 """use the messagebus api to determine which skills have registered fallback handlers
88 Individual skills respond to this request via the `can_answer` method
89 """
90 if fb_range is None:
91 fb_range = FallbackRange(0, 100)
92 skill_ids = [] # skill_ids that already answered to ping
93 fallback_skills = [] # skill_ids that want to handle fallback
95 sess = SessionManager.get(message)
96 # filter skills outside the fallback_range
97 in_range = [s for s, p in self.registered_fallbacks.items()
98 if fb_range.start < p <= fb_range.stop
99 and s not in sess.blacklisted_skills]
100 skill_ids += [s for s in self.registered_fallbacks if s not in in_range]
102 def handle_ack(msg):
103 skill_id = msg.data["skill_id"]
104 if msg.data.get("can_handle", True):
105 if skill_id in in_range:
106 fallback_skills.append(skill_id)
107 LOG.info(f"{skill_id} will try to handle fallback")
108 else:
109 LOG.debug(f"{skill_id} is out of range, skipping")
110 else:
111 LOG.debug(f"{skill_id} does NOT WANT to try to handle fallback")
112 skill_ids.append(skill_id)
114 if in_range: # no need to search if no skills available
115 self.bus.on("ovos.skills.fallback.pong", handle_ack)
117 LOG.info("checking for FallbackSkill candidates")
118 message.data["range"] = (fb_range.start, fb_range.stop)
119 # wait for all skills to acknowledge they want to answer fallback queries
120 self.bus.emit(message.forward("ovos.skills.fallback.ping",
121 message.data))
122 start = time.time()
123 while not all(s in skill_ids for s in self.registered_fallbacks) \
124 and time.time() - start <= 0.5:
125 time.sleep(0.02)
127 self.bus.remove("ovos.skills.fallback.pong", handle_ack)
128 return fallback_skills
130 def _fallback_range(self, utterances: List[str], lang: str,
131 message: Message, fb_range: FallbackRange) -> Optional[IntentHandlerMatch]:
132 """Send fallback request for a specified priority range.
134 Args:
135 utterances (list): List of tuples,
136 utterances and normalized version
137 lang (str): Langauge code
138 message: Message for session context
139 fb_range (FallbackRange): fallback order start and stop.
141 Returns:
142 PipelineMatch or None
143 """
144 lang = standardize_lang_tag(lang)
145 # we call flatten in case someone is sending the old style list of tuples
146 utterances = flatten_list(utterances)
147 message.data["utterances"] = utterances # all transcripts
148 message.data["lang"] = lang
150 sess = SessionManager.get(message)
151 # new style bus api
152 available_skills = self._collect_fallback_skills(message, fb_range)
153 fallbacks = [(k, v) for k, v in self.registered_fallbacks.items()
154 if k in available_skills]
155 sorted_handlers = sorted(fallbacks, key=operator.itemgetter(1))
157 for skill_id, prio in sorted_handlers:
158 if skill_id in sess.blacklisted_skills:
159 LOG.debug(f"ignoring match, skill_id '{skill_id}' blacklisted by Session '{sess.session_id}'")
160 continue
162 if self._fallback_allowed(skill_id):
163 return IntentHandlerMatch(
164 match_type=f"ovos.skills.fallback.{skill_id}.request",
165 match_data={"skill_id": skill_id,
166 "utterances": utterances,
167 "lang": lang},
168 utterance=utterances[0],
169 updated_session=sess
170 )
172 return None
174 def match_high(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
175 """High confidence/quality matchers."""
176 return self._fallback_range(utterances, lang, message,
177 FallbackRange(0, 5))
179 def match_medium(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
180 """General fallbacks."""
181 return self._fallback_range(utterances, lang, message,
182 FallbackRange(5, 90))
184 def match_low(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
185 """Low prio fallbacks with general matching such as chat-bot."""
186 return self._fallback_range(utterances, lang, message,
187 FallbackRange(90, 101))
189 def shutdown(self):
190 self.bus.remove("ovos.skills.fallback.register", self.handle_register_fallback)
191 self.bus.remove("ovos.skills.fallback.deregister", self.handle_deregister_fallback)