Coverage for ovos_core/intent_services/converse_service.py: 75%

189 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-17 13:44 +0000

1import time 

2from threading import Event 

3from typing import Optional, Dict, List, Union 

4 

5from ovos_bus_client.client import MessageBusClient 

6from ovos_bus_client.message import Message 

7from ovos_bus_client.session import SessionManager, UtteranceState, Session 

8from ovos_config.config import Configuration 

9from ovos_utils import flatten_list 

10from ovos_utils.fakebus import FakeBus 

11from ovos_utils.lang import standardize_lang_tag 

12from ovos_utils.log import LOG 

13 

14from ovos_plugin_manager.templates.pipeline import PipelinePlugin, IntentHandlerMatch 

15from ovos_workshop.permissions import ConverseMode, ConverseActivationMode 

16 

17 

18class ConverseService(PipelinePlugin): 

19 """Intent Service handling conversational skills.""" 

20 

21 def __init__(self, bus: Optional[Union[MessageBusClient, FakeBus]] = None, 

22 config: Optional[Dict] = None): 

23 config = config or Configuration().get("skills", {}).get("converse", {}) 

24 super().__init__(bus, config) 

25 self._consecutive_activations = {} 

26 self.bus.on('intent.service.skills.deactivate', self.handle_deactivate_skill_request) 

27 self.bus.on('intent.service.skills.activate', self.handle_activate_skill_request) 

28 self.bus.on('intent.service.active_skills.get', self.handle_get_active_skills) 

29 self.bus.on("skill.converse.get_response.enable", self.handle_get_response_enable) 

30 self.bus.on("skill.converse.get_response.disable", self.handle_get_response_disable) 

31 self.bus.on("converse:skill", self.handle_converse) 

32 

33 def handle_converse(self, message: Message): 

34 skill_id = message.data["skill_id"] 

35 self.bus.emit(message.reply(f"{skill_id}.converse.request", message.data)) 

36 

37 @property 

38 def active_skills(self): 

39 session = SessionManager.get() 

40 return session.active_skills 

41 

42 @active_skills.setter 

43 def active_skills(self, val): 

44 session = SessionManager.get() 

45 session.active_skills = [] 

46 for skill_id, ts in val: 

47 session.activate_skill(skill_id) 

48 

49 @staticmethod 

50 def get_active_skills(message: Optional[Message] = None) -> List[str]: 

51 """Active skill ids ordered by converse priority 

52 this represents the order in which converse will be called 

53 

54 Returns: 

55 active_skills (list): ordered list of skill_ids 

56 """ 

57 session = SessionManager.get(message) 

58 return [skill[0] for skill in session.active_skills] 

59 

60 def deactivate_skill(self, skill_id: str, source_skill: Optional[str] = None, 

61 message: Optional[Message] = None): 

62 """Remove a skill from being targetable by converse. 

63 

64 Args: 

65 skill_id (str): skill to remove 

66 source_skill (str): skill requesting the removal 

67 message (Message): the bus message that requested deactivation 

68 """ 

69 source_skill = source_skill or skill_id 

70 if self._deactivate_allowed(skill_id, source_skill): 

71 session = SessionManager.get(message) 

72 if session.is_active(skill_id): 

73 # update converse session 

74 session.deactivate_skill(skill_id) 

75 

76 # keep message.context 

77 message = message or Message("") 

78 message.context["session"] = session.serialize() # update session active skills 

79 # send bus event 

80 self.bus.emit( 

81 message.forward("intent.service.skills.deactivated", 

82 data={"skill_id": skill_id})) 

83 if skill_id in self._consecutive_activations: 

84 self._consecutive_activations[skill_id] = 0 

85 

86 def activate_skill(self, skill_id: str, source_skill: Optional[str] = None, 

87 message: Optional[Message] = None) -> Optional[Session]: 

88 """Add a skill or update the position of an active skill. 

89 

90 The skill is added to the front of the list, if it's already in the 

91 list it's removed so there is only a single entry of it. 

92 

93 Args: 

94 skill_id (str): identifier of skill to be added. 

95 source_skill (str): skill requesting the removal 

96 message (Message): the bus message that requested activation 

97 """ 

98 source_skill = source_skill or skill_id 

99 if self._activate_allowed(skill_id, source_skill): 

100 # update converse session 

101 session = SessionManager.get(message) 

102 session.activate_skill(skill_id) 

103 

104 # keep message.context 

105 message = message or Message("") 

106 message.context["session"] = session.serialize() # update session active skills 

107 message = message.forward("intent.service.skills.activated", 

108 {"skill_id": skill_id}) 

109 # send bus event 

110 self.bus.emit(message) 

111 # update activation counter 

112 self._consecutive_activations[skill_id] += 1 

113 return session 

114 

115 def _activate_allowed(self, skill_id: str, source_skill: Optional[str] = None) -> bool: 

116 """Checks if a skill_id is allowed to jump to the front of active skills list 

117 

118 - can a skill activate a different skill 

119 - is the skill blacklisted from conversing 

120 - is converse configured to only allow specific skills 

121 - did the skill activate too many times in a row 

122 

123 Args: 

124 skill_id (str): identifier of skill to be added. 

125 source_skill (str): skill requesting the removal 

126 

127 Returns: 

128 permitted (bool): True if skill can be activated 

129 """ 

130 

131 # cross activation control if skills can activate each other 

132 if not self.config.get("cross_activation"): 

133 source_skill = source_skill or skill_id 

134 if skill_id != source_skill: 

135 # different skill is trying to activate this skill 

136 return False 

137 

138 # mode of activation dictates under what conditions a skill is 

139 # allowed to activate itself 

140 acmode = self.config.get("converse_activation") or \ 

141 ConverseActivationMode.ACCEPT_ALL 

142 if acmode == ConverseActivationMode.PRIORITY: 

143 prio = self.config.get("converse_priorities") or {} 

144 # only allowed to activate if no skill with higher priority is 

145 # active, currently there is no api for skills to 

146 # define their default priority, this is a user/developer setting 

147 priority = prio.get(skill_id, 50) 

148 if any(p > priority for p in 

149 [prio.get(s, 50) for s in self.get_active_skills()]): 

150 return False 

151 elif acmode == ConverseActivationMode.BLACKLIST: 

152 if skill_id in self.config.get("converse_blacklist", []): 

153 return False 

154 elif acmode == ConverseActivationMode.WHITELIST: 

155 if skill_id not in self.config.get("converse_whitelist", []): 

156 return False 

157 

158 # limit of consecutive activations 

159 default_max = self.config.get("max_activations", -1) 

160 # per skill override limit of consecutive activations 

161 skill_max = self.config.get("skill_activations", {}).get(skill_id) 

162 max_activations = skill_max or default_max 

163 if skill_id not in self._consecutive_activations: 

164 self._consecutive_activations[skill_id] = 0 

165 if max_activations < 0: 

166 pass # no limit (mycroft-core default) 

167 elif max_activations == 0: 

168 return False # skill activation disabled 

169 elif self._consecutive_activations.get(skill_id, 0) > max_activations: 

170 return False # skill exceeded authorized consecutive number of activations 

171 return True 

172 

173 def _deactivate_allowed(self, skill_id: str, source_skill: Optional[str] = None) -> bool: 

174 """Checks if a skill_id is allowed to be removed from active skills list 

175 

176 - can a skill deactivate a different skill 

177 

178 Args: 

179 skill_id (str): identifier of skill to be added. 

180 source_skill (str): skill requesting the removal 

181 

182 Returns: 

183 permitted (bool): True if skill can be deactivated 

184 """ 

185 # cross activation control if skills can deactivate each other 

186 if not self.config.get("cross_activation"): 

187 source_skill = source_skill or skill_id 

188 if skill_id != source_skill: 

189 # different skill is trying to deactivate this skill 

190 return False 

191 return True 

192 

193 def _converse_allowed(self, skill_id: str) -> bool: 

194 """Checks if a skill_id is allowed to converse 

195 

196 - is the skill blacklisted from conversing 

197 - is converse configured to only allow specific skills 

198 

199 Args: 

200 skill_id (str): identifier of skill that wants to converse. 

201 

202 Returns: 

203 permitted (bool): True if skill can converse 

204 """ 

205 opmode = self.config.get("converse_mode", 

206 ConverseMode.ACCEPT_ALL) 

207 if opmode == ConverseMode.BLACKLIST and skill_id in \ 

208 self.config.get("converse_blacklist", []): 

209 return False 

210 elif opmode == ConverseMode.WHITELIST and skill_id not in \ 

211 self.config.get("converse_whitelist", []): 

212 return False 

213 return True 

214 

215 def _collect_converse_skills(self, message: Message) -> List[str]: 

216 """use the messagebus api to determine which skills want to converse 

217 

218 Individual skills respond to this request via the `can_converse` method""" 

219 skill_ids = [] 

220 want_converse = [] 

221 session = SessionManager.get(message) 

222 

223 # note: this is sorted by priority already 

224 active_skills = [skill_id for skill_id in self.get_active_skills(message) 

225 if session.utterance_states.get(skill_id, UtteranceState.INTENT) == UtteranceState.INTENT] 

226 if not active_skills: 

227 return want_converse 

228 

229 event = Event() 

230 

231 def handle_ack(msg): 

232 nonlocal event 

233 skill_id = msg.data["skill_id"] 

234 

235 # validate the converse pong 

236 if all((skill_id not in want_converse, 

237 msg.data.get("can_handle", True), 

238 skill_id in active_skills)): 

239 want_converse.append(skill_id) 

240 

241 if skill_id not in skill_ids: # track which answer we got 

242 skill_ids.append(skill_id) 

243 

244 if all(s in skill_ids for s in active_skills): 

245 # all skills answered the ping! 

246 event.set() 

247 

248 self.bus.on("skill.converse.pong", handle_ack) 

249 

250 # ask skills if they want to converse 

251 for skill_id in active_skills: 

252 self.bus.emit(message.forward(f"{skill_id}.converse.ping", {**message.data, "skill_id": skill_id})) 

253 

254 # wait for all skills to acknowledge they want to converse 

255 event.wait(timeout=0.5) 

256 

257 self.bus.remove("skill.converse.pong", handle_ack) 

258 return want_converse 

259 

260 def _check_converse_timeout(self, message: Message): 

261 """ filter active skill list based on timestamps """ 

262 timeouts = self.config.get("skill_timeouts") or {} 

263 def_timeout = self.config.get("timeout", 300) 

264 session = SessionManager.get(message) 

265 session.active_skills = [ 

266 skill for skill in session.active_skills 

267 if time.time() - skill[1] <= timeouts.get(skill[0], def_timeout)] 

268 

269 def match(self, utterances: List[str], lang: str, message: Message) -> Optional[IntentHandlerMatch]: 

270 """ 

271 Attempt to converse with active skills for a given set of utterances. 

272 

273 Iterates through active skills to find one that can handle the utterance. Filters skills based on timeout and blacklist status. 

274 

275 Args: 

276 utterances (List[str]): List of utterance strings to process 

277 lang (str): 4-letter ISO language code for the utterances 

278 message (Message): Message context for generating a reply 

279 

280 Returns: 

281 PipelineMatch: Match details if a skill successfully handles the utterance, otherwise None 

282 - handled (bool): Whether the utterance was fully handled 

283 - match_data (dict): Additional match metadata 

284 - skill_id (str): ID of the skill that handled the utterance 

285 - updated_session (Session): Current session state after skill interaction 

286 - utterance (str): The original utterance processed 

287 

288 Notes: 

289 - Standardizes language tag 

290 - Filters out blacklisted skills 

291 - Checks for skill conversation timeouts 

292 - Attempts conversation with each eligible skill 

293 """ 

294 lang = standardize_lang_tag(lang) 

295 session = SessionManager.get(message) 

296 

297 # we call flatten in case someone is sending the old style list of tuples 

298 utterances = flatten_list(utterances) 

299 

300 # note: this is sorted by priority already 

301 gr_skills = [skill_id for skill_id in self.get_active_skills(message) 

302 if session.utterance_states.get(skill_id, UtteranceState.INTENT) == UtteranceState.RESPONSE] 

303 

304 # check if any skill wants to capture utterance for self.get_response method 

305 for skill_id in gr_skills: 

306 if skill_id in session.blacklisted_skills: 

307 LOG.debug(f"ignoring match, skill_id '{skill_id}' blacklisted by Session '{session.session_id}'") 

308 continue 

309 LOG.debug(f"utterance captured by skill.get_response method: {skill_id}") 

310 return IntentHandlerMatch( 

311 match_type=f"{skill_id}.converse.get_response", 

312 match_data={"utterances": utterances, "lang": lang}, 

313 skill_id=skill_id, 

314 utterance=utterances[0], 

315 updated_session=session 

316 ) 

317 

318 # filter allowed skills 

319 self._check_converse_timeout(message) 

320 

321 # check if any skill wants to converse 

322 for skill_id in self._collect_converse_skills(message): 

323 if skill_id in session.blacklisted_skills: 

324 LOG.debug(f"ignoring match, skill_id '{skill_id}' blacklisted by Session '{session.session_id}'") 

325 continue 

326 LOG.debug(f"Attempting to converse with skill: {skill_id}") 

327 if self._converse_allowed(skill_id): 

328 return IntentHandlerMatch( 

329 match_type="converse:skill", 

330 match_data={"utterances": utterances, "lang": lang, "skill_id": skill_id}, 

331 skill_id=skill_id, 

332 utterance=utterances[0], 

333 updated_session=session 

334 ) 

335 

336 return None 

337 

338 @staticmethod 

339 def handle_get_response_enable(message: Message): 

340 skill_id = message.data["skill_id"] 

341 session = SessionManager.get(message) 

342 session.enable_response_mode(skill_id) 

343 if session.session_id == "default": 

344 SessionManager.sync(message) 

345 

346 @staticmethod 

347 def handle_get_response_disable(message: Message): 

348 skill_id = message.data["skill_id"] 

349 session = SessionManager.get(message) 

350 session.disable_response_mode(skill_id) 

351 if session.session_id == "default": 

352 SessionManager.sync(message) 

353 

354 def handle_activate_skill_request(self, message: Message): 

355 # TODO imperfect solution - only a skill can activate itself 

356 # someone can forge this message and emit it raw, but in OpenVoiceOS all 

357 # skill messages should have skill_id in context, so let's make sure 

358 # this doesnt happen accidentally at very least 

359 skill_id = message.data['skill_id'] 

360 source_skill = message.context.get("skill_id") 

361 self.activate_skill(skill_id, source_skill, message) 

362 sess = SessionManager.get(message) 

363 if sess.session_id == "default": 

364 SessionManager.sync(message) 

365 

366 def handle_deactivate_skill_request(self, message: Message): 

367 # TODO imperfect solution - only a skill can deactivate itself 

368 # someone can forge this message and emit it raw, but in ovos-core all 

369 # skill message should have skill_id in context, so let's make sure 

370 # this doesnt happen accidentally 

371 skill_id = message.data['skill_id'] 

372 source_skill = message.context.get("skill_id") or skill_id 

373 self.deactivate_skill(skill_id, source_skill, message) 

374 sess = SessionManager.get(message) 

375 if sess.session_id == "default": 

376 SessionManager.sync(message) 

377 

378 def handle_get_active_skills(self, message: Message): 

379 """Send active skills to caller. 

380 

381 Argument: 

382 message: query message to reply to. 

383 """ 

384 self.bus.emit(message.reply("intent.service.active_skills.reply", 

385 {"skills": self.get_active_skills(message)})) 

386 

387 def shutdown(self): 

388 self.bus.remove("converse:skill", self.handle_converse) 

389 self.bus.remove('intent.service.skills.deactivate', self.handle_deactivate_skill_request) 

390 self.bus.remove('intent.service.skills.activate', self.handle_activate_skill_request) 

391 self.bus.remove('intent.service.active_skills.get', self.handle_get_active_skills) 

392 self.bus.remove("skill.converse.get_response.enable", self.handle_get_response_enable) 

393 self.bus.remove("skill.converse.get_response.disable", self.handle_get_response_disable)