Manager

Core conversation flow management system.

This module provides the FlowManager class which orchestrates conversations across different LLM providers. It supports:

  • Flows with runtime-determined transitions

  • State management and transitions

  • Function registration and execution

  • Action handling

  • Cross-provider compatibility

The flow manager coordinates all aspects of a conversation, including:

  • LLM context management

  • Function registration

  • State transitions

  • Action execution

  • Error handling

class pipecat_flows.manager.FlowManager(*, llm: LLMService | LLMSwitcher, context_aggregator: Any, worker: PipelineWorker | None = None, task: PipelineWorker | None = None, context_strategy: ContextStrategyConfig | None = None, transport: BaseTransport | None = None, global_functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]] | None = None)[source]

Bases: object

Manages conversation flows.

The FlowManager orchestrates conversation flows by managing state transitions, function registration, and message handling across different LLM providers, with comprehensive action handling and error management.

The manager coordinates all aspects of a conversation including LLM context management, function registration, state transitions, and action execution.

__init__(*, llm: LLMService | LLMSwitcher, context_aggregator: Any, worker: PipelineWorker | None = None, task: PipelineWorker | None = None, context_strategy: ContextStrategyConfig | None = None, transport: BaseTransport | None = None, global_functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]] | None = None)[source]

Initialize the flow manager.

Parameters:
  • llm – LLM service or LLMSwitcher.

  • context_aggregator – Context aggregator for updating user context.

  • worker – PipelineWorker instance for queueing frames.

  • task

    PipelineWorker instance for queueing frames.

    Deprecated since version 1.2.0: Use worker instead. task will be removed in a future release.

  • context_strategy – Context strategy configuration for managing conversation context during transitions.

  • transport – Transport instance for communication.

  • global_functions – Optional list of FlowsFunctionSchemas or FlowsDirectFunctions that will be available at every node. These functions are registered once during initialization and automatically included alongside node-specific functions.

property state: dict[str, Any]

Access the shared state dictionary across nodes.

This property provides access to a persistent dictionary that maintains data across node transitions. It can be used to store and retrieve conversation state, user preferences, or any other data that needs to persist throughout the flow.

Returns:

The shared state dictionary that can be used for

reading and writing state data.

Return type:

Dict[str, Any]

Examples

Setting state:

flow_manager.state["user_name"] = "Alice"
flow_manager.state["age"] = 25

Getting state:

name = flow_manager.state.get("user_name", "Unknown")
age = flow_manager.state["age"]

Checking for state:

if "user_preferences" in flow_manager.state:
    preferences = flow_manager.state["user_preferences"]
property transport: BaseTransport | None

Access the transport instance used for communication.

This property provides access to the transport instance that handles communication with the client (e.g., DailyTransport for Daily rooms). The transport can be used to interact with participants, manage audio/video settings, or access platform-specific features.

Returns:

The transport instance if provided during

initialization, None otherwise.

Return type:

Optional[BaseTransport]

Examples

Accessing transport in action handlers:

async def mute_participant(action: dict, flow_manager: FlowManager):
    transport = flow_manager.transport
    if transport and hasattr(transport, 'update_participant'):
        await transport.update_participant(participant_id, {"canSnd": False})

Working with Daily transport features:

async def get_room_info(action: dict, flow_manager: FlowManager):
    transport = flow_manager.transport
    if isinstance(transport, DailyTransport):
        participants = transport.participants()
        return {"participant_count": len(participants)}
property current_node: str | None

Access the identifier of the currently active conversation node.

This property provides access to the current node name/identifier in the conversation flow. It can be used to make decisions based on the current state of the conversation, implement conditional logic, or for debugging and logging purposes.

Returns:

The identifier of the current node if a node is active,

None if no node has been set or before initialization.

Return type:

Optional[str]

Examples

Conditional logic based on current node:

async def participant_joined(action: dict, flow_manager: FlowManager):
    current = flow_manager.current_node
    if current == "transferring_to_human_agent":
        await start_human_agent_interaction(flow_manager)
    elif current == "collecting_payment":
        await setup_secure_session(flow_manager)

Logging and debugging:

async def log_conversation_state(action: dict, flow_manager: FlowManager):
    node = flow_manager.current_node
    logger.info(f"Current conversation node: {node}")
    return {"current_node": node}
property worker: PipelineWorker

Access the pipeline worker instance for frame queueing.

This property provides access to the PipelineWorker instance used by the FlowManager. The worker can be used to queue custom frames directly into the pipeline, enabling advanced flow control and custom frame injection.

Returns:

The pipeline worker instance used for frame processing

and queueing operations.

Return type:

PipelineWorker

Examples

Queueing frames in handlers:

async def send_custom_notification(action: dict, flow_manager: FlowManager):
    from pipecat.frames.frames import TTSUpdateSettingsFrame

    # Queue a TTS settings update frame
    await flow_manager.worker.queue_frame(
        TTSUpdateSettingsFrame(settings={"voice": "your-new-voice-id"})
    )
property task: PipelineWorker

Access the pipeline worker instance for frame queueing.

Deprecated since version 1.2.0: Use worker instead. This property will be removed in a future release.

Returns:

The pipeline worker instance used for frame processing

and queueing operations.

Return type:

PipelineWorker

async initialize(initial_node: NodeConfig | None = None) None[source]

Initialize the flow manager.

Parameters:

initial_node – Optional initial node configuration. If provided, the flow will start at this node immediately.

Raises:

FlowInitializationError – If initialization fails.

Examples

Initialize with an initial node:

flow_manager = FlowManager(
    ... # Initialization parameters
)
await flow_manager.initialize(create_initial_node())

Initialize without an initial node (set later via set_node_from_config):

flow_manager = FlowManager(
    ... # Initialization parameters
)
await flow_manager.initialize()
get_current_context() list[dict][source]

Get the current conversation context.

Returns:

List of messages in the current context, including system messages, user messages, and assistant responses.

Raises:

FlowError – If context aggregator is not available.

register_action(action_type: str, handler: Callable) None[source]

Register a handler for a specific action type.

Parameters:
  • action_type – String identifier for the action (e.g., “tts_say”).

  • handler – Async or sync function that handles the action.

Example:

async def custom_notification(action: dict):
    text = action.get("text", "")
    await notify_user(text)

flow_manager.register_action("notify", custom_notification)
async set_node_from_config(node_config: NodeConfig) None[source]

Set up a new conversation node and transition to it.

Used to manually transition between nodes in a flow.

Parameters:

node_config – Configuration for the new node.

Raises:
  • FlowTransitionError – If manager not initialized.

  • FlowError – If node setup fails.