Coverage for ovos_core/intent_services/converse_service.py: 75%
189 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-17 13:44 +0000
1import time
2from threading import Event
3from typing import Optional, Dict, List, Union
5from ovos_bus_client.client import MessageBusClient
6from ovos_bus_client.message import Message
7from ovos_bus_client.session import SessionManager, UtteranceState, Session
8from ovos_config.config import Configuration
9from ovos_utils import flatten_list
10from ovos_utils.fakebus import FakeBus
11from ovos_utils.lang import standardize_lang_tag
12from ovos_utils.log import LOG
14from ovos_plugin_manager.templates.pipeline import PipelinePlugin, IntentHandlerMatch
15from ovos_workshop.permissions import ConverseMode, ConverseActivationMode
class ConverseService(PipelinePlugin):
    """Intent Service handling conversational skills.

    Tracks which skills are "active" in the current session, decides whether
    a skill may activate/deactivate/converse according to the ``converse``
    section of the skills configuration, and offers utterances to active
    skills before the rest of the intent pipeline runs (see ``match``).
    """

    def __init__(self, bus: Optional[Union[MessageBusClient, FakeBus]] = None,
                 config: Optional[Dict] = None):
        """Initialise the service and register its bus handlers.

        Args:
            bus: messagebus connection, forwarded to ``PipelinePlugin``.
            config: converse configuration; when falsy, the
                ``skills.converse`` section of the global Configuration
                is used instead.
        """
        config = config or Configuration().get("skills", {}).get("converse", {})
        super().__init__(bus, config)
        # skill_id -> number of activations since the skill was last deactivated
        self._consecutive_activations = {}
        self.bus.on('intent.service.skills.deactivate', self.handle_deactivate_skill_request)
        self.bus.on('intent.service.skills.activate', self.handle_activate_skill_request)
        self.bus.on('intent.service.active_skills.get', self.handle_get_active_skills)
        self.bus.on("skill.converse.get_response.enable", self.handle_get_response_enable)
        self.bus.on("skill.converse.get_response.disable", self.handle_get_response_disable)
        self.bus.on("converse:skill", self.handle_converse)

    def handle_converse(self, message: Message):
        """Relay a ``converse:skill`` match to the selected skill.

        Emits ``<skill_id>.converse.request`` as a reply to ``message``,
        carrying the same data.

        Args:
            message (Message): bus message with ``skill_id`` in its data
        """
        skill_id = message.data["skill_id"]
        self.bus.emit(message.reply(f"{skill_id}.converse.request", message.data))

    @property
    def active_skills(self):
        """Active skills of the session returned by ``SessionManager.get()``."""
        session = SessionManager.get()
        return session.active_skills

    @active_skills.setter
    def active_skills(self, val):
        """Replace the session's active skills.

        Args:
            val: iterable of ``(skill_id, timestamp)`` pairs
        """
        session = SessionManager.get()
        session.active_skills = []
        # NOTE(review): the incoming timestamps are discarded; each skill is
        # re-activated through the session - confirm this is intended
        for skill_id, _ts in val:
            session.activate_skill(skill_id)

    @staticmethod
    def get_active_skills(message: Optional[Message] = None) -> List[str]:
        """Active skill ids ordered by converse priority
        this represents the order in which converse will be called

        Args:
            message (Message): optional message used to resolve the session

        Returns:
            active_skills (list): ordered list of skill_ids
        """
        session = SessionManager.get(message)
        return [skill[0] for skill in session.active_skills]

    def deactivate_skill(self, skill_id: str, source_skill: Optional[str] = None,
                         message: Optional[Message] = None):
        """Remove a skill from being targetable by converse.

        Nothing happens when deactivation is not permitted or the skill is
        not active in the session.

        Args:
            skill_id (str): skill to remove
            source_skill (str): skill requesting the removal
            message (Message): the bus message that requested deactivation
        """
        source_skill = source_skill or skill_id
        if self._deactivate_allowed(skill_id, source_skill):
            session = SessionManager.get(message)
            if session.is_active(skill_id):
                # update converse session
                session.deactivate_skill(skill_id)

                # keep message.context
                message = message or Message("")
                message.context["session"] = session.serialize()  # update session active skills
                # send bus event
                self.bus.emit(
                    message.forward("intent.service.skills.deactivated",
                                    data={"skill_id": skill_id}))
                # a deactivation resets the consecutive activation counter
                if skill_id in self._consecutive_activations:
                    self._consecutive_activations[skill_id] = 0

    def activate_skill(self, skill_id: str, source_skill: Optional[str] = None,
                       message: Optional[Message] = None) -> Optional[Session]:
        """Add a skill or update the position of an active skill.

        The skill is added to the front of the list, if it's already in the
        list it's removed so there is only a single entry of it.

        Args:
            skill_id (str): identifier of skill to be added.
            source_skill (str): skill requesting the activation
            message (Message): the bus message that requested activation

        Returns:
            Session: the updated session, or None if activation was not allowed
        """
        source_skill = source_skill or skill_id
        if self._activate_allowed(skill_id, source_skill):
            # update converse session
            session = SessionManager.get(message)
            session.activate_skill(skill_id)

            # keep message.context
            message = message or Message("")
            message.context["session"] = session.serialize()  # update session active skills
            message = message.forward("intent.service.skills.activated",
                                      {"skill_id": skill_id})
            # send bus event
            self.bus.emit(message)
            # update activation counter; .get() avoids depending on
            # _activate_allowed having seeded the key as a side effect
            self._consecutive_activations[skill_id] = \
                self._consecutive_activations.get(skill_id, 0) + 1
            return session
        return None

    def _activate_allowed(self, skill_id: str, source_skill: Optional[str] = None) -> bool:
        """Checks if a skill_id is allowed to jump to the front of active skills list

        - can a skill activate a different skill
        - is the skill blacklisted from conversing
        - is converse configured to only allow specific skills
        - did the skill activate too many times in a row

        Args:
            skill_id (str): identifier of skill to be added.
            source_skill (str): skill requesting the activation

        Returns:
            permitted (bool): True if skill can be activated
        """

        # cross activation control if skills can activate each other
        if not self.config.get("cross_activation"):
            source_skill = source_skill or skill_id
            if skill_id != source_skill:
                # different skill is trying to activate this skill
                return False

        # mode of activation dictates under what conditions a skill is
        # allowed to activate itself
        acmode = self.config.get("converse_activation") or \
                 ConverseActivationMode.ACCEPT_ALL
        if acmode == ConverseActivationMode.PRIORITY:
            prio = self.config.get("converse_priorities") or {}
            # only allowed to activate if no skill with higher priority is
            # active, currently there is no api for skills to
            # define their default priority, this is a user/developer setting
            priority = prio.get(skill_id, 50)
            if any(prio.get(s, 50) > priority
                   for s in self.get_active_skills()):
                return False
        elif acmode == ConverseActivationMode.BLACKLIST:
            if skill_id in self.config.get("converse_blacklist", []):
                return False
        elif acmode == ConverseActivationMode.WHITELIST:
            if skill_id not in self.config.get("converse_whitelist", []):
                return False

        # limit of consecutive activations
        default_max = self.config.get("max_activations", -1)
        # per skill override limit of consecutive activations
        skill_max = (self.config.get("skill_activations") or {}).get(skill_id)
        # explicit None check: a per-skill override of 0 ("disabled") is
        # falsy and must not fall back to the default limit
        max_activations = default_max if skill_max is None else skill_max
        if skill_id not in self._consecutive_activations:
            self._consecutive_activations[skill_id] = 0
        if max_activations < 0:
            pass  # no limit (mycroft-core default)
        elif max_activations == 0:
            return False  # skill activation disabled
        # NOTE(review): the counter holds past activations and is compared
        # with '>', so max_activations + 1 consecutive activations are
        # permitted - confirm whether '>=' was intended
        elif self._consecutive_activations.get(skill_id, 0) > max_activations:
            return False  # skill exceeded authorized consecutive number of activations
        return True

    def _deactivate_allowed(self, skill_id: str, source_skill: Optional[str] = None) -> bool:
        """Checks if a skill_id is allowed to be removed from active skills list

        - can a skill deactivate a different skill

        Args:
            skill_id (str): identifier of skill to be removed.
            source_skill (str): skill requesting the removal

        Returns:
            permitted (bool): True if skill can be deactivated
        """
        # cross activation control if skills can deactivate each other
        if not self.config.get("cross_activation"):
            source_skill = source_skill or skill_id
            if skill_id != source_skill:
                # different skill is trying to deactivate this skill
                return False
        return True

    def _converse_allowed(self, skill_id: str) -> bool:
        """Checks if a skill_id is allowed to converse

        - is the skill blacklisted from conversing
        - is converse configured to only allow specific skills

        Args:
            skill_id (str): identifier of skill that wants to converse.

        Returns:
            permitted (bool): True if skill can converse
        """
        opmode = self.config.get("converse_mode",
                                 ConverseMode.ACCEPT_ALL)
        if opmode == ConverseMode.BLACKLIST and skill_id in \
                self.config.get("converse_blacklist", []):
            return False
        elif opmode == ConverseMode.WHITELIST and skill_id not in \
                self.config.get("converse_whitelist", []):
            return False
        return True

    def _collect_converse_skills(self, message: Message) -> List[str]:
        """use the messagebus api to determine which skills want to converse

        Individual skills respond to this request via the `can_converse` method

        Each active skill in INTENT state is sent ``<skill_id>.converse.ping``;
        ``skill.converse.pong`` answers are collected until every pinged skill
        has answered or 0.5 seconds have elapsed.

        Args:
            message (Message): message being processed, forwarded in the ping

        Returns:
            list of skill_ids that answered and want to converse
        """
        skill_ids = []  # every skill that answered the ping
        want_converse = []  # subset that answered it can handle the utterance
        session = SessionManager.get(message)

        # note: this is sorted by priority already
        active_skills = [skill_id for skill_id in self.get_active_skills(message)
                         if session.utterance_states.get(skill_id, UtteranceState.INTENT) == UtteranceState.INTENT]
        if not active_skills:
            return want_converse

        event = Event()

        def handle_ack(msg):
            skill_id = msg.data["skill_id"]

            # validate the converse pong
            if all((skill_id not in want_converse,
                    msg.data.get("can_handle", True),
                    skill_id in active_skills)):
                want_converse.append(skill_id)

            if skill_id not in skill_ids:  # track which answer we got
                skill_ids.append(skill_id)

            if all(s in skill_ids for s in active_skills):
                # all skills answered the ping!
                event.set()

        self.bus.on("skill.converse.pong", handle_ack)
        try:
            # ask skills if they want to converse
            for skill_id in active_skills:
                self.bus.emit(message.forward(f"{skill_id}.converse.ping", {**message.data, "skill_id": skill_id}))

            # wait for all skills to acknowledge they want to converse
            event.wait(timeout=0.5)
        finally:
            # always unregister the temporary handler, even if emit failed
            self.bus.remove("skill.converse.pong", handle_ack)
        return want_converse

    def _check_converse_timeout(self, message: Message):
        """ filter active skill list based on timestamps

        A skill stays active while the time since its activation timestamp
        does not exceed its entry in ``skill_timeouts`` (or ``timeout``,
        default 300, when it has none).

        Args:
            message (Message): message used to resolve the session
        """
        timeouts = self.config.get("skill_timeouts") or {}
        def_timeout = self.config.get("timeout", 300)
        session = SessionManager.get(message)
        now = time.time()  # single reference time for the whole list
        session.active_skills = [
            skill for skill in session.active_skills
            if now - skill[1] <= timeouts.get(skill[0], def_timeout)]

    def match(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]:
        """
        Attempt to converse with active skills for a given set of utterances.

        Skills in RESPONSE state (get_response) are checked first, then
        active skills are filtered by timeout and asked whether they want
        to converse. Skills blacklisted by the session are skipped.

        Args:
            utterances (List[str]): List of utterance strings to process
            lang (str): language tag for the utterances, standardized before use
            message (Message): Message context used to resolve the session

        Returns:
            IntentHandlerMatch: match for the first eligible skill, otherwise None
                - match_type (str): "<skill_id>.converse.get_response" or "converse:skill"
                - match_data (dict): utterances and lang (plus skill_id for converse)
                - skill_id (str): ID of the skill that will handle the utterance
                - updated_session (Session): Current session state
                - utterance (str): The first utterance of the list

        Notes:
            - Standardizes language tag
            - Filters out blacklisted skills
            - Checks for skill conversation timeouts
            - Attempts conversation with each eligible skill
        """
        lang = standardize_lang_tag(lang)
        session = SessionManager.get(message)

        # we call flatten in case someone is sending the old style list of tuples
        utterances = flatten_list(utterances)

        # note: this is sorted by priority already
        gr_skills = [skill_id for skill_id in self.get_active_skills(message)
                     if session.utterance_states.get(skill_id, UtteranceState.INTENT) == UtteranceState.RESPONSE]

        # check if any skill wants to capture utterance for self.get_response method
        for skill_id in gr_skills:
            if skill_id in session.blacklisted_skills:
                LOG.debug(f"ignoring match, skill_id '{skill_id}' blacklisted by Session '{session.session_id}'")
                continue
            LOG.debug(f"utterance captured by skill.get_response method: {skill_id}")
            return IntentHandlerMatch(
                match_type=f"{skill_id}.converse.get_response",
                match_data={"utterances": utterances, "lang": lang},
                skill_id=skill_id,
                utterance=utterances[0],
                updated_session=session
            )

        # filter allowed skills
        self._check_converse_timeout(message)

        # check if any skill wants to converse
        for skill_id in self._collect_converse_skills(message):
            if skill_id in session.blacklisted_skills:
                LOG.debug(f"ignoring match, skill_id '{skill_id}' blacklisted by Session '{session.session_id}'")
                continue
            LOG.debug(f"Attempting to converse with skill: {skill_id}")
            if self._converse_allowed(skill_id):
                return IntentHandlerMatch(
                    match_type="converse:skill",
                    match_data={"utterances": utterances, "lang": lang, "skill_id": skill_id},
                    skill_id=skill_id,
                    utterance=utterances[0],
                    updated_session=session
                )

        return None

    @staticmethod
    def handle_get_response_enable(message: Message):
        """Enable response mode for a skill in the message's session.

        Args:
            message (Message): bus message with ``skill_id`` in its data
        """
        skill_id = message.data["skill_id"]
        session = SessionManager.get(message)
        session.enable_response_mode(skill_id)
        if session.session_id == "default":
            SessionManager.sync(message)

    @staticmethod
    def handle_get_response_disable(message: Message):
        """Disable response mode for a skill in the message's session.

        Args:
            message (Message): bus message with ``skill_id`` in its data
        """
        skill_id = message.data["skill_id"]
        session = SessionManager.get(message)
        session.disable_response_mode(skill_id)
        if session.session_id == "default":
            SessionManager.sync(message)

    def handle_activate_skill_request(self, message: Message):
        """Handle a bus request to activate a skill.

        Args:
            message (Message): bus message with the target ``skill_id`` in
                its data; the requesting skill is read from the context
        """
        # TODO imperfect solution - only a skill can activate itself
        # someone can forge this message and emit it raw, but in OpenVoiceOS all
        # skill messages should have skill_id in context, so let's make sure
        # this doesnt happen accidentally at very least
        skill_id = message.data['skill_id']
        source_skill = message.context.get("skill_id")
        self.activate_skill(skill_id, source_skill, message)
        sess = SessionManager.get(message)
        if sess.session_id == "default":
            SessionManager.sync(message)

    def handle_deactivate_skill_request(self, message: Message):
        """Handle a bus request to deactivate a skill.

        Args:
            message (Message): bus message with the target ``skill_id`` in
                its data; the requesting skill is read from the context
        """
        # TODO imperfect solution - only a skill can deactivate itself
        # someone can forge this message and emit it raw, but in ovos-core all
        # skill message should have skill_id in context, so let's make sure
        # this doesnt happen accidentally
        skill_id = message.data['skill_id']
        source_skill = message.context.get("skill_id") or skill_id
        self.deactivate_skill(skill_id, source_skill, message)
        sess = SessionManager.get(message)
        if sess.session_id == "default":
            SessionManager.sync(message)

    def handle_get_active_skills(self, message: Message):
        """Send active skills to caller.

        Argument:
            message: query message to reply to.
        """
        self.bus.emit(message.reply("intent.service.active_skills.reply",
                                    {"skills": self.get_active_skills(message)}))

    def shutdown(self):
        """Unregister every bus handler registered in ``__init__``."""
        self.bus.remove("converse:skill", self.handle_converse)
        self.bus.remove('intent.service.skills.deactivate', self.handle_deactivate_skill_request)
        self.bus.remove('intent.service.skills.activate', self.handle_activate_skill_request)
        self.bus.remove('intent.service.active_skills.get', self.handle_get_active_skills)
        self.bus.remove("skill.converse.get_response.enable", self.handle_get_response_enable)
        self.bus.remove("skill.converse.get_response.disable", self.handle_get_response_disable)