#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Action management system for conversation flows.
This module provides the ActionManager class which handles execution of actions
during conversation state transitions. It supports:
- Built-in actions (TTS, conversation ending)
- Custom action registration
- Synchronous and asynchronous handlers
- Pre and post-transition actions
- Error handling and validation
Actions are used to perform side effects during conversations, such as:
- Text-to-speech output
- Database updates
- External API calls
- Custom integrations
"""
import asyncio
import inspect
import warnings
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING
from loguru import logger
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
ControlFrame,
EndFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.worker import PipelineWorker
from pipecat_flows.exceptions import ActionError
from pipecat_flows.types import ActionConfig, FlowActionHandler
if TYPE_CHECKING:
from pipecat_flows.manager import FlowManager
[docs]
@dataclass
class FunctionActionFrame(ControlFrame):
"""Frame containing a function action to be executed.
Parameters:
action: Action configuration dictionary.
function: Function handler to execute.
"""
action: dict
function: FlowActionHandler
[docs]
@dataclass
class ActionFinishedFrame(ControlFrame):
"""Frame indicating that an action has completed execution."""
pass
[docs]
class ActionManager:
"""Manages the registration and execution of flow actions.
Actions are executed during state transitions and can include:
- Text-to-speech output
- Database updates
- External API calls
- Custom user-defined actions
Built-in actions:
- tts_say: Speak text using TTS
- end_conversation: End the current conversation
- function: Execute inline functions in the pipeline
Custom actions can be registered using register_action().
"""
[docs]
def __init__(self, worker: PipelineWorker, flow_manager: "FlowManager"):
"""Initialize the action manager.
Args:
worker: PipelineWorker instance used to queue frames.
flow_manager: FlowManager instance that this ActionManager is part of.
"""
self._action_handlers: dict[str, Callable] = {}
self._worker = worker
self._flow_manager = flow_manager
self._ongoing_actions_count = 0
self._ongoing_actions_finished_event = asyncio.Event()
self._deferred_post_actions: list[ActionConfig] = []
self._showed_deprecation_warning_for_legacy_action_handler = False
# Register built-in actions
self._register_action("tts_say", self._handle_tts_action)
self._register_action("end_conversation", self._handle_end_action)
self._register_action("function", self._handle_function_action)
# Add pipeline observation
worker.set_reached_downstream_filter(
(ActionFinishedFrame, FunctionActionFrame, BotStoppedSpeakingFrame)
)
@worker.event_handler("on_frame_reached_downstream")
async def on_frame_reached_downstream(worker, frame):
if isinstance(frame, FunctionActionFrame):
# Run function action
await frame.function(frame.action, flow_manager)
self._decrement_ongoing_actions_count()
elif isinstance(frame, BotStoppedSpeakingFrame):
# Execute deferred post-actions if the bot's turn is over.
# A BotStoppedSpeakingFrame only indicates that the bot's turn is over if there are
# no ongoing actions (otherwise one of those actions may have been responsible for it).
if self._ongoing_actions_count == 0:
await self._execute_deferred_post_actions()
elif isinstance(frame, ActionFinishedFrame):
# Handle action finished
self._decrement_ongoing_actions_count()
def _register_action(self, action_type: str, handler: Callable) -> None:
"""Register a handler for a specific action type.
Args:
action_type: String identifier for the action (e.g., "tts_say").
handler: Async or sync function that handles the action.
Raises:
ValueError: If handler is not callable.
"""
if not callable(handler):
raise ValueError("Action handler must be callable")
self._action_handlers[action_type] = handler
logger.debug(f"Registered handler for action type: {action_type}")
[docs]
async def execute_actions(self, actions: list[ActionConfig] | None) -> None:
"""Execute a list of actions.
Args:
actions: List of action configurations to execute.
Raises:
ActionError: If action execution fails.
Note:
Each action must have a 'type' field matching a registered handler.
"""
if not actions:
return
previous_action_type = None
for action in actions:
action_type = action.get("type")
if not action_type:
raise ActionError("Action missing required 'type' field")
handler = self._action_handlers.get(action_type)
if not handler:
raise ActionError(f"No handler registered for action type: {action_type}")
ongoing_actions_count = self._ongoing_actions_count
try:
# Based on the type of the previous action and the one coming up, we can determine
# if we need to wait for ongoing actions to finish before proceeding with this next
# one
await self._maybe_wait_for_ongoing_actions_to_finish(
previous_action_type, action_type
)
# Determine if handler can accept flow_manager argument by inspecting its signature
# Handlers can either take (action) or (action, flow_manager)
try:
sig = inspect.signature(handler)
can_handle_flow_manager_arg = len(sig.parameters) > 1
except (ValueError, TypeError):
logger.warning(
f"Unable to determine handler signature for action type '{action_type}', "
"falling back to legacy single-parameter call"
)
can_handle_flow_manager_arg = False
# Invoke handler appropriately, with async and flow_manager arg as needed
if can_handle_flow_manager_arg:
if asyncio.iscoroutinefunction(handler):
await handler(action, self._flow_manager)
else:
handler(action, self._flow_manager)
else:
if not self._showed_deprecation_warning_for_legacy_action_handler:
self._showed_deprecation_warning_for_legacy_action_handler = True
warnings.warn(
"Single-argument (legacy) action handlers are deprecated "
"and will be removed in 2.0.0. Update handlers to accept "
"(action: dict, flow_manager: FlowManager) instead.",
DeprecationWarning,
stacklevel=2,
)
if asyncio.iscoroutinefunction(handler):
await handler(action)
else:
handler(action)
# Record the type of the action we just executed
previous_action_type = action_type
logger.debug(f"Successfully executed action: {action_type}")
# If action was end_conversation, break
# (If we didn't, we could end up waiting for the next actions to finish, and...they
# never would)
if action_type == "end_conversation":
break
except Exception as e:
# Undo any increment of ongoing actions count that happened during this action
if self._ongoing_actions_count > ongoing_actions_count:
self._decrement_ongoing_actions_count() # Assumption: on increment per action
raise ActionError(f"Failed to execute action {action_type}: {str(e)}") from e
# Based on the type of the last action, we may need to wait for ongoing actions to finish
# before considering this set of actions complete.
await self._maybe_wait_for_ongoing_actions_to_finish(previous_action_type, None)
[docs]
def schedule_deferred_post_actions(self, post_actions: list[ActionConfig]) -> None:
"""Schedule "deferred" post-actions to be executed after next LLM completion.
Args:
post_actions: List of actions to execute after LLM response.
"""
self._deferred_post_actions = post_actions
[docs]
def clear_deferred_post_actions(self) -> None:
"""Clear any scheduled deferred post-actions."""
self._deferred_post_actions = []
async def _execute_deferred_post_actions(self) -> None:
"""Execute deferred post-actions."""
actions = self._deferred_post_actions
self._deferred_post_actions = []
if actions:
await self.execute_actions(actions)
async def _maybe_wait_for_ongoing_actions_to_finish(
self, previous_action_type: str | None, upcoming_action_type: str | None
) -> None:
"""Wait for ongoing actions to finish before executing the next action if needed.
This method determines whether to wait based on the types of the previous
and upcoming actions to avoid the upcoming action having an effect before
the previous one is done.
Args:
previous_action_type: Type of the previously executed action, or None
if this is the start of the action sequence.
upcoming_action_type: Type of the next action to execute, or None if
this is the end of the action sequence.
"""
needs_wait = False
if previous_action_type == "tts_say":
# "tts_say" enqueues a TTSSpeakFrame, which has an effect when it hits the TTS node in
# the pipeline.
# As long as the upcoming action enqueues a frame with an effect at the same point or
# later in the pipeline, we don't need to wait.
# If the upcoming action is:
# - "tts_say": no need to wait (effect happens at the same point)
# - "end_conversation": no need to wait (effect happens at the end of the pipeline)
# - "function": no need to wait (effect happens at the end of the pipeline)
# - None: wait (we're done with this set of actions; the next thing to occur may be a
# node change/LLM context update, which has an effect earlier in the pipeline)
# - custom action: wait (we don't know what it will do)
if upcoming_action_type not in ["tts_say", "end_conversation", "function"]:
needs_wait = True # None or custom action
elif previous_action_type == "function":
# "function" enqueues a FunctionActionFrame, which has an effect at the end of the
# pipeline.
# Functions can take some time to execute (and don't hold up the pipeline as they're
# doing so), so we need to wait for them to finish before proceeding with the next
# action or moving on from the current set of actions.
needs_wait = True
else:
# Either previous action was:
# - None (the upcoming action is the first one), so there's nothing to wait for.
# - A fully custom action, where we don't wait, like we've always done. Note that we
# could, in the future, add new API affordances for users to tell us to wait for the
# the action to finish before moving on to the next one along with a way for them to
# tell us when the action is done. But let's hold off on doing that since we're
# de-emphasizing custom actions in favor of "function" actions, which should meet most
# needs.
# Note that it should not be possible for the previous action to be "end_conversation",
# since we stop processing actions after that one.
pass
if needs_wait:
await self._ongoing_actions_finished_event.wait()
async def _handle_tts_action(self, action: dict) -> None:
"""Built-in handler for TTS actions.
Args:
action: Action configuration containing 'text' to speak.
"""
text = action.get("text")
if not text:
logger.error("TTS action missing 'text' field")
return
try:
# Mark that we're starting the action
self._increment_ongoing_actions_count()
# Queue the action frame
await self._worker.queue_frame(TTSSpeakFrame(text=text))
# Queue a frame marking the end of the action
await self._worker.queue_frame(ActionFinishedFrame())
except Exception as e:
self._decrement_ongoing_actions_count()
logger.error(f"TTS error: {e}")
async def _handle_end_action(self, action: dict) -> None:
"""Built-in handler for ending the conversation.
This handler queues an EndFrame to terminate the conversation. If the action
includes a 'text' key, it will queue that text to be spoken before ending.
Args:
action: Action configuration dictionary. Optional 'text' key for a
goodbye message.
"""
# Mark that we're starting the action
self._increment_ongoing_actions_count()
# Queue the action frames
if action.get("text"): # Optional goodbye message
await self._worker.queue_frame(TTSSpeakFrame(text=action["text"]))
await self._worker.queue_frame(EndFrame())
# NOTE: there's no point queueing an ActionFinishedFrame here, since the previously-queued
# EndFrame ensures that it'll never get delivered to our observer
async def _handle_function_action(self, action: dict) -> None:
"""Built-in handler for queuing functions to run inline in the pipeline.
This handler queues a FunctionActionFrame to be executed when the pipeline
is done with all the work queued before it. It expects a 'handler' key in
the action containing the function to execute.
Args:
action: Action configuration dictionary. Required 'handler' key
containing the function to execute.
"""
handler = action.get("handler")
if not handler:
logger.error("Function action missing 'handler' field")
return
# Mark that we're starting the action
self._increment_ongoing_actions_count()
# Queue the action frame (we're queueing rather than running it here to ensure it happens
# at the appropriate time in the pipeline, like when the bot's turn is over, for example).
await self._worker.queue_frame(FunctionActionFrame(action=action, function=handler))
# NOTE: we do NOT queue an ActionFinishedFrame here; instead, we will decrement the ongoing
# actions count when the function has finished executing (the function may take some time)
def _increment_ongoing_actions_count(self) -> None:
"""Increment the count of ongoing actions and reset the finished event if this is the first action."""
self._ongoing_actions_count += 1
if self._ongoing_actions_count == 1:
self._ongoing_actions_finished_event.clear()
def _decrement_ongoing_actions_count(self) -> None:
"""Decrement the count of ongoing actions and set the finished event if this was the last action."""
self._ongoing_actions_count = max(0, self._ongoing_actions_count - 1)
if self._ongoing_actions_count == 0:
self._ongoing_actions_finished_event.set()