Manager
Core conversation flow management system.
This module provides the FlowManager class which orchestrates conversations across different LLM providers. It supports:
Flows with runtime-determined transitions
State management and transitions
Function registration and execution
Action handling
Cross-provider compatibility
The flow manager coordinates all aspects of a conversation, including:
LLM context management
Function registration
State transitions
Action execution
Error handling
- class pipecat_flows.manager.FlowManager(*, llm: LLMService | LLMSwitcher, context_aggregator: Any, worker: PipelineWorker | None = None, task: PipelineWorker | None = None, context_strategy: ContextStrategyConfig | None = None, transport: BaseTransport | None = None, global_functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]] | None = None)[source]
Bases:
objectManages conversation flows.
The FlowManager orchestrates conversation flows by managing state transitions, function registration, and message handling across different LLM providers, with comprehensive action handling and error management.
The manager coordinates all aspects of a conversation including LLM context management, function registration, state transitions, and action execution.
- __init__(*, llm: LLMService | LLMSwitcher, context_aggregator: Any, worker: PipelineWorker | None = None, task: PipelineWorker | None = None, context_strategy: ContextStrategyConfig | None = None, transport: BaseTransport | None = None, global_functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]] | None = None)[source]
Initialize the flow manager.
- Parameters:
llm – LLM service or LLMSwitcher.
context_aggregator – Context aggregator for updating user context.
worker – PipelineWorker instance for queueing frames.
task –
PipelineWorker instance for queueing frames.
Deprecated since version 1.2.0: Use
workerinstead.taskwill be removed in a future release.context_strategy – Context strategy configuration for managing conversation context during transitions.
transport – Transport instance for communication.
global_functions – Optional list of FlowsFunctionSchemas or FlowsDirectFunctions that will be available at every node. These functions are registered once during initialization and automatically included alongside node-specific functions.
- property state: dict[str, Any]
Access the shared state dictionary across nodes.
This property provides access to a persistent dictionary that maintains data across node transitions. It can be used to store and retrieve conversation state, user preferences, or any other data that needs to persist throughout the flow.
- Returns:
- The shared state dictionary that can be used for
reading and writing state data.
- Return type:
Dict[str, Any]
Examples
Setting state:
flow_manager.state["user_name"] = "Alice" flow_manager.state["age"] = 25
Getting state:
name = flow_manager.state.get("user_name", "Unknown") age = flow_manager.state["age"]
Checking for state:
if "user_preferences" in flow_manager.state: preferences = flow_manager.state["user_preferences"]
- property transport: BaseTransport | None
Access the transport instance used for communication.
This property provides access to the transport instance that handles communication with the client (e.g., DailyTransport for Daily rooms). The transport can be used to interact with participants, manage audio/video settings, or access platform-specific features.
- Returns:
- The transport instance if provided during
initialization, None otherwise.
- Return type:
Optional[BaseTransport]
Examples
Accessing transport in action handlers:
async def mute_participant(action: dict, flow_manager: FlowManager): transport = flow_manager.transport if transport and hasattr(transport, 'update_participant'): await transport.update_participant(participant_id, {"canSnd": False})
Working with Daily transport features:
async def get_room_info(action: dict, flow_manager: FlowManager): transport = flow_manager.transport if isinstance(transport, DailyTransport): participants = transport.participants() return {"participant_count": len(participants)}
- property current_node: str | None
Access the identifier of the currently active conversation node.
This property provides access to the current node name/identifier in the conversation flow. It can be used to make decisions based on the current state of the conversation, implement conditional logic, or for debugging and logging purposes.
- Returns:
- The identifier of the current node if a node is active,
None if no node has been set or before initialization.
- Return type:
Optional[str]
Examples
Conditional logic based on current node:
async def participant_joined(action: dict, flow_manager: FlowManager): current = flow_manager.current_node if current == "transferring_to_human_agent": await start_human_agent_interaction(flow_manager) elif current == "collecting_payment": await setup_secure_session(flow_manager)
Logging and debugging:
async def log_conversation_state(action: dict, flow_manager: FlowManager): node = flow_manager.current_node logger.info(f"Current conversation node: {node}") return {"current_node": node}
- property worker: PipelineWorker
Access the pipeline worker instance for frame queueing.
This property provides access to the PipelineWorker instance used by the FlowManager. The worker can be used to queue custom frames directly into the pipeline, enabling advanced flow control and custom frame injection.
- Returns:
- The pipeline worker instance used for frame processing
and queueing operations.
- Return type:
PipelineWorker
Examples
Queueing frames in handlers:
async def send_custom_notification(action: dict, flow_manager: FlowManager): from pipecat.frames.frames import TTSUpdateSettingsFrame # Queue a TTS settings update frame await flow_manager.worker.queue_frame( TTSUpdateSettingsFrame(settings={"voice": "your-new-voice-id"}) )
- property task: PipelineWorker
Access the pipeline worker instance for frame queueing.
Deprecated since version 1.2.0: Use
workerinstead. This property will be removed in a future release.- Returns:
- The pipeline worker instance used for frame processing
and queueing operations.
- Return type:
PipelineWorker
- async initialize(initial_node: NodeConfig | None = None) None[source]
Initialize the flow manager.
- Parameters:
initial_node – Optional initial node configuration. If provided, the flow will start at this node immediately.
- Raises:
FlowInitializationError – If initialization fails.
Examples
Initialize with an initial node:
flow_manager = FlowManager( ... # Initialization parameters ) await flow_manager.initialize(create_initial_node())
Initialize without an initial node (set later via set_node_from_config):
flow_manager = FlowManager( ... # Initialization parameters ) await flow_manager.initialize()
- get_current_context() list[dict][source]
Get the current conversation context.
- Returns:
List of messages in the current context, including system messages, user messages, and assistant responses.
- Raises:
FlowError – If context aggregator is not available.
- register_action(action_type: str, handler: Callable) None[source]
Register a handler for a specific action type.
- Parameters:
action_type – String identifier for the action (e.g., “tts_say”).
handler – Async or sync function that handles the action.
Example:
async def custom_notification(action: dict): text = action.get("text", "") await notify_user(text) flow_manager.register_action("notify", custom_notification)
- async set_node_from_config(node_config: NodeConfig) None[source]
Set up a new conversation node and transition to it.
Used to manually transition between nodes in a flow.
- Parameters:
node_config – Configuration for the new node.
- Raises:
FlowTransitionError – If manager not initialized.
FlowError – If node setup fails.