Coverage for ovos_core/intent_services/fallback_service.py: 88%

94 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-17 13:44 +0000

1# Copyright 2020 Mycroft AI Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15import operator 

16import time 

17from collections import namedtuple 

18from typing import Optional, Dict, List, Union 

19 

20from ovos_bus_client.client import MessageBusClient 

21from ovos_bus_client.message import Message 

22from ovos_bus_client.session import SessionManager 

23from ovos_config import Configuration 

24from ovos_plugin_manager.templates.pipeline import ConfidenceMatcherPipeline, IntentHandlerMatch 

25from ovos_utils import flatten_list 

26from ovos_utils.fakebus import FakeBus 

27from ovos_utils.lang import standardize_lang_tag 

28from ovos_utils.log import LOG 

29from ovos_workshop.permissions import FallbackMode 

30 

# Priority window used for one fallback pass. A skill with priority ``p`` is
# inside the window when ``start < p <= stop`` (start exclusive, stop inclusive).
FallbackRange = namedtuple("FallbackRange", "start stop")

32 

33 

class FallbackService(ConfidenceMatcherPipeline):
    """Intent pipeline stage that offers unmatched utterances to fallback skills.

    Skills register/deregister themselves over the bus with a numeric
    priority. For each confidence tier (high / medium / low) the service
    pings the skills whose priority falls in that tier's range and returns a
    match for the first skill (lowest priority value first) that answered it
    can handle the utterance and is permitted by the fallback configuration.
    """

    def __init__(self, bus: Optional[Union[MessageBusClient, FakeBus]] = None,
                 config: Optional[Dict] = None):
        """Set up the registry and subscribe to (de)registration events.

        Args:
            bus: messagebus connection handed to the base pipeline.
            config: fallback settings; when empty or None the
                ``skills.fallbacks`` section of the global Configuration is
                used instead.
        """
        config = config or Configuration().get("skills", {}).get("fallbacks", {})
        super().__init__(bus, config)
        self.registered_fallbacks = {}  # skill_id: priority
        self.bus.on("ovos.skills.fallback.register", self.handle_register_fallback)
        self.bus.on("ovos.skills.fallback.deregister", self.handle_deregister_fallback)

    def handle_register_fallback(self, message: Message):
        """Record a skill as a fallback handler.

        Reads ``skill_id`` and ``priority`` from ``message.data``. A missing
        or falsy priority becomes 101. A priority configured for the skill
        under ``fallback_priorities`` overrides the one the skill announced.

        Args:
            message: the ``ovos.skills.fallback.register`` bus message.
        """
        skill_id = message.data.get("skill_id")
        if not skill_id:
            # An entry without an id could never acknowledge a ping, so it
            # would only make later candidate collection wait for the timeout.
            LOG.warning("ignoring fallback registration without a skill_id")
            return

        priority = message.data.get("priority") or 101

        # check if .conf is overriding the priority for this skill
        priority_overrides = self.config.get("fallback_priorities", {})
        if skill_id in priority_overrides:
            new_priority = priority_overrides.get(skill_id)
            LOG.info(f"forcing {skill_id} fallback priority from {priority} to {new_priority}")
            priority = new_priority

        self.registered_fallbacks[skill_id] = priority

    def handle_deregister_fallback(self, message: Message):
        """Forget a previously registered fallback skill, if it is known.

        Args:
            message: the ``ovos.skills.fallback.deregister`` bus message;
                ``message.data["skill_id"]`` names the skill to drop.
        """
        self.registered_fallbacks.pop(message.data.get("skill_id"), None)

    def _fallback_allowed(self, skill_id: str) -> bool:
        """Check whether the configuration permits a skill to handle fallbacks.

        - in BLACKLIST mode the skill must not be in ``fallback_blacklist``
        - in WHITELIST mode the skill must be in ``fallback_whitelist``
        - in any other mode every skill is accepted

        Args:
            skill_id (str): identifier of skill that wants to fallback.

        Returns:
            permitted (bool): True if skill can fallback
        """
        opmode = self.config.get("fallback_mode", FallbackMode.ACCEPT_ALL)
        if opmode == FallbackMode.BLACKLIST:
            return skill_id not in self.config.get("fallback_blacklist", [])
        if opmode == FallbackMode.WHITELIST:
            return skill_id in self.config.get("fallback_whitelist", [])
        return True

    def _collect_fallback_skills(self, message: Message,
                                 fb_range: Optional[FallbackRange] = None) -> List[str]:
        """Ask registered fallback skills whether they want this utterance.

        Emits ``ovos.skills.fallback.ping`` and listens for
        ``ovos.skills.fallback.pong`` answers for at most 0.5 seconds, or
        until every registered skill has answered.

        Args:
            message: message being matched; its data gains a ``range`` entry
                and it is forwarded as the ping.
            fb_range: priority window to consider (start exclusive, stop
                inclusive). Defaults to ``FallbackRange(0, 100)``.

        Returns:
            skill_ids inside the window, not blacklisted by the session, that
            answered they can handle the utterance, in order of arrival.
        """
        if fb_range is None:
            fb_range = FallbackRange(0, 100)
        fallback_skills = []  # skill_ids that want to handle fallback

        sess = SessionManager.get(message)
        # Work on a snapshot: bus handlers may register/deregister skills
        # while we are waiting, and iterating a dict that changes size raises.
        registered = dict(self.registered_fallbacks)
        # filter skills outside the fallback_range
        in_range = {s for s, p in registered.items()
                    if fb_range.start < p <= fb_range.stop
                    and s not in sess.blacklisted_skills}
        if not in_range:  # no need to search if no skills available
            return fallback_skills

        # skill_ids we no longer wait for; out-of-range skills count as answered
        answered = {s for s in registered if s not in in_range}

        def handle_ack(msg):
            skill_id = msg.data.get("skill_id")
            if skill_id is None:
                LOG.debug("ignoring fallback pong without a skill_id")
                return
            if msg.data.get("can_handle", True):
                if skill_id in in_range:
                    if skill_id not in fallback_skills:
                        fallback_skills.append(skill_id)
                    LOG.info(f"{skill_id} will try to handle fallback")
                else:
                    LOG.debug(f"{skill_id} is out of range, skipping")
            else:
                LOG.debug(f"{skill_id} does NOT WANT to try to handle fallback")
            answered.add(skill_id)

        self.bus.on("ovos.skills.fallback.pong", handle_ack)
        try:
            LOG.info("checking for FallbackSkill candidates")
            message.data["range"] = (fb_range.start, fb_range.stop)
            # wait for all skills to acknowledge they want to answer fallback queries
            self.bus.emit(message.forward("ovos.skills.fallback.ping",
                                          message.data))
            # monotonic clock: the timeout must not be affected by wall-clock jumps
            deadline = time.monotonic() + 0.5
            while time.monotonic() <= deadline \
                    and not all(s in answered for s in registered):
                time.sleep(0.02)
        finally:
            # always detach the temporary listener, even if emit/wait raised
            self.bus.remove("ovos.skills.fallback.pong", handle_ack)
        return fallback_skills

    def _fallback_range(self, utterances: List[str], lang: str,
                        message: Message, fb_range: FallbackRange) -> Optional[IntentHandlerMatch]:
        """Send fallback request for a specified priority range.

        Args:
            utterances (list): transcripts of the utterance; an old-style
                list of (utterance, normalized) tuples is flattened.
            lang (str): Language code
            message: Message for session context; its data gains
                ``utterances`` and ``lang`` entries.
            fb_range (FallbackRange): fallback order start and stop.

        Returns:
            IntentHandlerMatch for the best-priority permitted skill, or
            None when no skill can handle the utterance.
        """
        lang = standardize_lang_tag(lang)
        # we call flatten in case someone is sending the old style list of tuples
        utterances = flatten_list(utterances)
        message.data["utterances"] = utterances  # all transcripts
        message.data["lang"] = lang
        if not utterances:
            # nothing to match; also avoids indexing utterances[0] below
            return None

        sess = SessionManager.get(message)
        # new style bus api
        available_skills = set(self._collect_fallback_skills(message, fb_range))
        # iterate a copy, the registry can change from bus handlers
        fallbacks = [(k, v) for k, v in list(self.registered_fallbacks.items())
                     if k in available_skills]
        sorted_handlers = sorted(fallbacks, key=operator.itemgetter(1))

        for skill_id, prio in sorted_handlers:
            # candidate collection already drops session-blacklisted skills;
            # this check is kept as a safeguard
            if skill_id in sess.blacklisted_skills:
                LOG.debug(f"ignoring match, skill_id '{skill_id}' blacklisted by Session '{sess.session_id}'")
                continue

            if self._fallback_allowed(skill_id):
                return IntentHandlerMatch(
                    match_type=f"ovos.skills.fallback.{skill_id}.request",
                    match_data={"skill_id": skill_id,
                                "utterances": utterances,
                                "lang": lang},
                    utterance=utterances[0],
                    updated_session=sess
                )

        return None

    def match_high(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
        """High confidence/quality matchers (priority above 0, up to 5)."""
        return self._fallback_range(utterances, lang, message,
                                    FallbackRange(0, 5))

    def match_medium(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
        """General fallbacks (priority above 5, up to 90)."""
        return self._fallback_range(utterances, lang, message,
                                    FallbackRange(5, 90))

    def match_low(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
        """Low prio fallbacks with general matching such as chat-bot (priority above 90, up to 101)."""
        return self._fallback_range(utterances, lang, message,
                                    FallbackRange(90, 101))

    def shutdown(self):
        """Unsubscribe the (de)registration handlers from the bus."""
        self.bus.remove("ovos.skills.fallback.register", self.handle_register_fallback)
        self.bus.remove("ovos.skills.fallback.deregister", self.handle_deregister_fallback)