Source code for pipecat_flows.manager

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Core conversation flow management system.

This module provides the FlowManager class which orchestrates
conversations across different LLM providers. It supports:

- Flows with runtime-determined transitions
- State management and transitions
- Function registration and execution
- Action handling
- Cross-provider compatibility

The flow manager coordinates all aspects of a conversation, including:

- LLM context management
- Function registration
- State transitions
- Action execution
- Error handling
"""

import asyncio
import inspect
import warnings
from collections.abc import Callable
from typing import Any, cast

from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import (
    FunctionCallResultProperties,
    LLMMessagesAppendFrame,
    LLMMessagesUpdateFrame,
    LLMRunFrame,
    LLMSetToolsFrame,
    LLMUpdateSettingsFrame,
)
from pipecat.pipeline.llm_switcher import LLMSwitcher
from pipecat.pipeline.worker import PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext, NotGiven
from pipecat.services.llm_service import FunctionCallParams, LLMService
from pipecat.services.settings import LLMSettings
from pipecat.transports.base_transport import BaseTransport

from pipecat_flows.actions import ActionError, ActionManager
from pipecat_flows.adapters import LLMAdapter
from pipecat_flows.exceptions import (
    FlowError,
    FlowInitializationError,
    FlowTransitionError,
    InvalidFunctionError,
)
from pipecat_flows.types import (
    ActionConfig,
    ConsolidatedFunctionResult,
    ContextStrategy,
    ContextStrategyConfig,
    FlowArgs,
    FlowFunctionHandler,
    FlowResult,
    FlowsDirectFunction,
    FlowsDirectFunctionWrapper,
    FlowsFunctionSchema,
    FunctionHandler,
    LegacyFunctionHandler,
    NodeConfig,
    ZeroArgFunctionHandler,
    get_or_generate_node_name,
)


[docs] class FlowManager: """Manages conversation flows. The FlowManager orchestrates conversation flows by managing state transitions, function registration, and message handling across different LLM providers, with comprehensive action handling and error management. The manager coordinates all aspects of a conversation including LLM context management, function registration, state transitions, and action execution. """
[docs] def __init__( self, *, llm: LLMService | LLMSwitcher, context_aggregator: Any, worker: PipelineWorker | None = None, task: PipelineWorker | None = None, context_strategy: ContextStrategyConfig | None = None, transport: BaseTransport | None = None, global_functions: list[FlowsFunctionSchema | FlowsDirectFunction] | None = None, ): """Initialize the flow manager. Args: llm: LLM service or LLMSwitcher. context_aggregator: Context aggregator for updating user context. worker: PipelineWorker instance for queueing frames. task: PipelineWorker instance for queueing frames. .. deprecated:: 1.2.0 Use ``worker`` instead. ``task`` will be removed in a future release. context_strategy: Context strategy configuration for managing conversation context during transitions. transport: Transport instance for communication. global_functions: Optional list of FlowsFunctionSchemas or FlowsDirectFunctions that will be available at every node. These functions are registered once during initialization and automatically included alongside node-specific functions. """ if worker is not None and task is not None: raise ValueError("Pass either 'worker' or 'task' (deprecated), not both.") if task is not None: warnings.warn( "'task' is deprecated and will be removed in a future release; " "use 'worker' instead.", DeprecationWarning, stacklevel=2, ) worker = task if worker is None: raise ValueError("FlowManager requires a 'worker' (PipelineWorker).") self._worker = worker self._llm = llm self._action_manager = ActionManager(worker, flow_manager=self) self._adapter = LLMAdapter() self._initialized = False self._context_aggregator = context_aggregator self._pending_transition: dict[str, Any] | None = None self._context_strategy = context_strategy or ContextStrategyConfig( strategy=ContextStrategy.APPEND ) self._transport = transport self._global_functions = global_functions or [] self._state: dict[str, Any] = {} # Internal state storage self._current_functions: set[str] = set() # Track registered functions self._current_node: str | None = None self._showed_deprecation_warning_for_role_messages = False self._showed_deprecation_warning_for_reset_with_summary = False self._showed_deprecation_warning_for_zero_arg_handler = False self._showed_deprecation_warning_for_legacy_handler = False self._showed_deprecation_warning_for_task = False
@property def state(self) -> dict[str, Any]: """Access the shared state dictionary across nodes. This property provides access to a persistent dictionary that maintains data across node transitions. It can be used to store and retrieve conversation state, user preferences, or any other data that needs to persist throughout the flow. Returns: Dict[str, Any]: The shared state dictionary that can be used for reading and writing state data. Examples: Setting state:: flow_manager.state["user_name"] = "Alice" flow_manager.state["age"] = 25 Getting state:: name = flow_manager.state.get("user_name", "Unknown") age = flow_manager.state["age"] Checking for state:: if "user_preferences" in flow_manager.state: preferences = flow_manager.state["user_preferences"] """ return self._state @property def transport(self) -> BaseTransport | None: """Access the transport instance used for communication. This property provides access to the transport instance that handles communication with the client (e.g., DailyTransport for Daily rooms). The transport can be used to interact with participants, manage audio/video settings, or access platform-specific features. Returns: Optional[BaseTransport]: The transport instance if provided during initialization, None otherwise. Examples: Accessing transport in action handlers:: async def mute_participant(action: dict, flow_manager: FlowManager): transport = flow_manager.transport if transport and hasattr(transport, 'update_participant'): await transport.update_participant(participant_id, {"canSnd": False}) Working with Daily transport features:: async def get_room_info(action: dict, flow_manager: FlowManager): transport = flow_manager.transport if isinstance(transport, DailyTransport): participants = transport.participants() return {"participant_count": len(participants)} """ return self._transport @property def current_node(self) -> str | None: """Access the identifier of the currently active conversation node. This property provides access to the current node name/identifier in the conversation flow. It can be used to make decisions based on the current state of the conversation, implement conditional logic, or for debugging and logging purposes. Returns: Optional[str]: The identifier of the current node if a node is active, None if no node has been set or before initialization. Examples: Conditional logic based on current node:: async def participant_joined(action: dict, flow_manager: FlowManager): current = flow_manager.current_node if current == "transferring_to_human_agent": await start_human_agent_interaction(flow_manager) elif current == "collecting_payment": await setup_secure_session(flow_manager) Logging and debugging:: async def log_conversation_state(action: dict, flow_manager: FlowManager): node = flow_manager.current_node logger.info(f"Current conversation node: {node}") return {"current_node": node} """ return self._current_node @property def worker(self) -> PipelineWorker: """Access the pipeline worker instance for frame queueing. This property provides access to the PipelineWorker instance used by the FlowManager. The worker can be used to queue custom frames directly into the pipeline, enabling advanced flow control and custom frame injection. Returns: PipelineWorker: The pipeline worker instance used for frame processing and queueing operations. Examples: Queueing frames in handlers:: async def send_custom_notification(action: dict, flow_manager: FlowManager): from pipecat.frames.frames import TTSUpdateSettingsFrame # Queue a TTS settings update frame await flow_manager.worker.queue_frame( TTSUpdateSettingsFrame(settings={"voice": "your-new-voice-id"}) ) """ return self._worker @property def task(self) -> PipelineWorker: """Access the pipeline worker instance for frame queueing. .. deprecated:: 1.2.0 Use :attr:`worker` instead. This property will be removed in a future release. Returns: PipelineWorker: The pipeline worker instance used for frame processing and queueing operations. """ if not self._showed_deprecation_warning_for_task: self._showed_deprecation_warning_for_task = True warnings.warn( "'FlowManager.task' is deprecated and will be removed in a future release; " "use 'FlowManager.worker' instead.", DeprecationWarning, stacklevel=2, ) return self._worker
[docs] async def initialize(self, initial_node: NodeConfig | None = None) -> None: """Initialize the flow manager. Args: initial_node: Optional initial node configuration. If provided, the flow will start at this node immediately. Raises: FlowInitializationError: If initialization fails. Examples: Initialize with an initial node:: flow_manager = FlowManager( ... # Initialization parameters ) await flow_manager.initialize(create_initial_node()) Initialize without an initial node (set later via set_node_from_config):: flow_manager = FlowManager( ... # Initialization parameters ) await flow_manager.initialize() """ if self._initialized: logger.warning(f"{self.__class__.__name__} already initialized") return try: self._initialized = True logger.debug(f"Initialized {self.__class__.__name__}") # Set initial node if provided (otherwise initial node # will be set later via set_node_from_config()) if initial_node: node_name = get_or_generate_node_name(initial_node) logger.debug(f"Setting initial node: {node_name}") await self._set_node(node_name, initial_node) except Exception as e: self._initialized = False raise FlowInitializationError(f"Failed to initialize flow: {str(e)}") from e
[docs] def get_current_context(self) -> list[dict]: """Get the current conversation context. Returns: List of messages in the current context, including system messages, user messages, and assistant responses. Raises: FlowError: If context aggregator is not available. """ if not self._context_aggregator: raise FlowError("No context aggregator available") context = self._context_aggregator.user()._context return context.get_messages()
[docs] def register_action(self, action_type: str, handler: Callable) -> None: """Register a handler for a specific action type. Args: action_type: String identifier for the action (e.g., "tts_say"). handler: Async or sync function that handles the action. Example:: async def custom_notification(action: dict): text = action.get("text", "") await notify_user(text) flow_manager.register_action("notify", custom_notification) """ self._action_manager._register_action(action_type, handler)
def _register_action_from_config(self, action: ActionConfig) -> None: """Register an action handler from action configuration. Args: action: Action configuration dictionary containing type and optional handler. Raises: ActionError: If action type is not registered and no valid handler provided. """ action_type = action.get("type") handler = action.get("handler") # Register action if not already registered if action_type and action_type not in self._action_manager._action_handlers: # Register handler if provided if handler and callable(handler): self.register_action(action_type, handler) logger.debug(f"Registered action handler from config: {action_type}") else: raise ActionError( f"Action '{action_type}' not registered. " "Provide handler in action config or register manually." ) async def _call_handler( self, handler: FunctionHandler, args: FlowArgs ) -> FlowResult | ConsolidatedFunctionResult: """Call handler with appropriate parameters based on its signature. Detects whether the handler can accept a flow_manager parameter and calls it accordingly to maintain backward compatibility with legacy handlers. Args: handler: The function handler to call (either legacy or modern format). args: Arguments dictionary from the function call. Returns: The result returned by the handler. """ # Get the function signature sig = inspect.signature(handler) # Calculate effective parameter count effective_param_count = len(sig.parameters) # Handle different function signatures. inspect.signature has already # proven the shape, so each cast narrows the union to the branch we know # we're in. if effective_param_count == 0: if not self._showed_deprecation_warning_for_zero_arg_handler: self._showed_deprecation_warning_for_zero_arg_handler = True warnings.warn( "Zero-argument function handlers are deprecated and will be " "removed in 2.0.0. Update handlers to accept " "(args: FlowArgs, flow_manager: FlowManager) instead.", DeprecationWarning, stacklevel=2, ) return await cast(ZeroArgFunctionHandler, handler)() elif effective_param_count == 1: if not self._showed_deprecation_warning_for_legacy_handler: self._showed_deprecation_warning_for_legacy_handler = True warnings.warn( "Single-argument (legacy) function handlers are deprecated " "and will be removed in 2.0.0. Update handlers to accept " "(args: FlowArgs, flow_manager: FlowManager) instead.", DeprecationWarning, stacklevel=2, ) return await cast(LegacyFunctionHandler, handler)(args) else: return await cast(FlowFunctionHandler, handler)(args, self) async def _create_transition_func( self, name: str, handler: Callable | FlowsDirectFunctionWrapper | None, ) -> Callable: """Create a transition function for the given name and handler. Args: name: Name of the function being registered. handler: Optional function to process data. Returns: Async function that handles the tool invocation. """ async def transition_func(params: FunctionCallParams) -> None: """Inner function that handles the actual tool invocation.""" try: logger.debug(f"Function called: {name}") # Execute handler if present is_transition_only_function = False acknowledged_result = {"status": "acknowledged"} if handler: # Invoke the handler with the provided arguments if isinstance(handler, FlowsDirectFunctionWrapper): handler_response = await handler.invoke(params.arguments, self) else: # Convert Pipecat's Mapping to a fresh dict so handlers may # mutate without touching Pipecat's internal state. (In 2.0.0 # FlowArgs is planned to widen to Mapping; this conversion # can go away then.) handler_response = await self._call_handler(handler, dict(params.arguments)) # Support both "consolidated" handlers that return (result, next_node) and handlers # that return just the result. if isinstance(handler_response, tuple): result, next_node = handler_response if result is None: result = acknowledged_result is_transition_only_function = True else: result = handler_response next_node = None # FlowsDirectFunctions should always be "consolidated" functions that return a tuple if isinstance(handler, FlowsDirectFunctionWrapper): raise InvalidFunctionError( f"Direct function {name} expected to return a tuple (result, next_node) but got {type(result)}" ) else: result = acknowledged_result next_node = None is_transition_only_function = True logger.debug( f"{'Transition-only function called for' if is_transition_only_function else 'Function handler completed for'} {name}" ) # Determine if this is an edge function is_edge_function = bool(next_node) if is_edge_function: # Store transition info for coordinated execution transition_info = { "next_node": next_node, "function_name": name, "arguments": params.arguments, "result": result, } self._pending_transition = transition_info properties = FunctionCallResultProperties( run_llm=False, # Don't run LLM until transition completes on_context_updated=self._check_and_execute_transition, ) else: # Node function - run LLM immediately properties = FunctionCallResultProperties( run_llm=True, on_context_updated=None, ) await params.result_callback(result, properties=properties) except Exception as e: logger.error(f"Error in transition function {name}: {str(e)}") error_result = {"status": "error", "error": str(e)} await params.result_callback(error_result) return transition_func async def _check_and_execute_transition(self) -> None: """Check if all functions are complete and execute transition if so.""" if not self._pending_transition: return # Check if all function calls are complete using Pipecat's state assistant_aggregator = self._context_aggregator.assistant() if not assistant_aggregator.has_function_calls_in_progress: # All functions complete, execute transition transition_info = self._pending_transition self._pending_transition = None await self._execute_transition(transition_info) async def _execute_transition(self, transition_info: dict[str, Any]) -> None: """Execute the stored transition.""" next_node = transition_info.get("next_node") try: if next_node: node_name = get_or_generate_node_name(next_node) logger.debug(f"Transition to function-returned node: {node_name}") await self._set_node(node_name, next_node) except Exception as e: logger.error(f"Error executing transition: {str(e)}") raise async def _register_function( self, name: str, new_functions: set[str], handler: Callable | FlowsDirectFunctionWrapper | None, *, cancel_on_interruption: bool = True, timeout_secs: float | None = None, ) -> None: """Register a function with the LLM. Always re-registers so a node redeclaring a function with a different handler replaces the previous binding. ``LLMService.register_function`` is a dict upsert by name, so re-registration is idempotent. Args: name: Name of the function to register handler: A callable function handler or a FlowsDirectFunction. new_functions: Set to track newly registered functions for this node cancel_on_interruption: Whether to cancel this function call when an interruption occurs. Defaults to True. timeout_secs: Optional per-tool timeout in seconds, overriding the global ``function_call_timeout_secs``. Defaults to None (use global timeout). Raises: FlowError: If function registration fails """ try: # Create transition function transition_func = await self._create_transition_func(name, handler) # Register function with LLM (or LLMSwitcher) kwargs = {} if timeout_secs is not None: kwargs["timeout_secs"] = timeout_secs self._llm.register_function( name, transition_func, cancel_on_interruption=cancel_on_interruption, **kwargs, ) new_functions.add(name) logger.debug(f"Registered function: {name}") except Exception as e: logger.error(f"Failed to register function {name}: {str(e)}") raise FlowError(f"Function registration failed: {str(e)}") from e
[docs] async def set_node_from_config(self, node_config: NodeConfig) -> None: """Set up a new conversation node and transition to it. Used to manually transition between nodes in a flow. Args: node_config: Configuration for the new node. Raises: FlowTransitionError: If manager not initialized. FlowError: If node setup fails. """ await self._set_node(get_or_generate_node_name(node_config), node_config)
async def _set_node(self, node_id: str, node_config: NodeConfig) -> None: """Set up a new conversation node and transition to it. Handles the complete node transition process in the following order: 1. Execute pre-actions (if any) 2. Set up messages (role and task) 3. Register node functions 4. Update LLM context with messages and tools 5. Update state (current node and functions) 6. Trigger LLM completion with new context 7. Execute post-actions (if any) Args: node_id: Identifier for the new node. node_config: Complete configuration for the node. Raises: FlowTransitionError: If manager not initialized. FlowError: If node setup fails. """ if not self._initialized: raise FlowTransitionError(f"{self.__class__.__name__} must be initialized first") try: # Clear any pending transition state when starting a new node # This ensures clean state regardless of how we arrived here: # - Normal transition flow (already cleared in _check_and_execute_transition) # - Direct calls to set_node/set_node_from_config self._pending_transition = None self._validate_node_config(node_id, node_config) logger.debug(f"Setting node: {node_id}") # Clear any deferred post-actions from previous node self._action_manager.clear_deferred_post_actions() # Register action handlers from config for action_list in [ node_config.get("pre_actions", []), node_config.get("post_actions", []), ]: for action in action_list: self._register_action_from_config(action) # Execute pre-actions if any if pre_actions := node_config.get("pre_actions"): await self._execute_actions(pre_actions=pre_actions) # Register functions and prepare tools tools: list[FlowsFunctionSchema | FlowsDirectFunctionWrapper] = [] new_functions: set[str] = set() # Get functions list with default empty list if not provided functions_list = node_config.get("functions", []) # Mix in global functions that should be available at every node functions_list = self._global_functions + functions_list async def register_function_schema(schema: FlowsFunctionSchema): """Helper to register a single FlowsFunctionSchema.""" tools.append(schema) await self._register_function( name=schema.name, new_functions=new_functions, handler=schema.handler, cancel_on_interruption=schema.cancel_on_interruption, timeout_secs=schema.timeout_secs, ) async def register_direct_function(func): """Helper to register a single direct function.""" direct_function = FlowsDirectFunctionWrapper(function=func) tools.append(direct_function) await self._register_function( name=direct_function.name, new_functions=new_functions, handler=direct_function, cancel_on_interruption=direct_function.cancel_on_interruption, timeout_secs=direct_function.timeout_secs, ) for func_config in functions_list: if callable(func_config): await register_direct_function(func_config) elif isinstance(func_config, FlowsFunctionSchema): await register_function_schema(func_config) else: raise InvalidFunctionError( f"Invalid function format in node '{node_id}'. " "Use FlowsFunctionSchema or direct functions." ) # Create ToolsSchema with standard function schemas standard_functions = [] for tool in tools: # Convert FlowsFunctionSchema to standard FunctionSchema for the LLM standard_functions.append(tool.to_function_schema()) formatted_tools = self._adapter.format_functions(standard_functions) role_message = node_config.get("role_message") role_messages = node_config.get("role_messages") if role_message and role_messages: logger.warning( "Both 'role_message' and 'role_messages' specified; using 'role_message'" ) if role_messages and not role_message: if not self._showed_deprecation_warning_for_role_messages: self._showed_deprecation_warning_for_role_messages = True warnings.warn( "'role_messages' is deprecated and will be removed in 2.0.0. " "Use 'role_message' (singular, str) instead.", DeprecationWarning, stacklevel=2, ) # Update LLM context await self._update_llm_context( role_message=role_message, role_messages=role_messages if not role_message else None, task_messages=node_config["task_messages"], functions=formatted_tools, strategy=node_config.get("context_strategy"), ) logger.debug("Updated LLM context") # Update state self._current_node = node_id self._current_functions = new_functions # Trigger completion with new context respond_immediately = node_config.get("respond_immediately", True) if respond_immediately: await self._worker.queue_frames([LLMRunFrame()]) # Execute post-actions if any if post_actions := node_config.get("post_actions"): if respond_immediately: await self._execute_actions(post_actions=post_actions) else: # Schedule post-actions for execution after first LLM response in this node self._schedule_deferred_post_actions(post_actions=post_actions) logger.debug(f"Successfully set node: {node_id}") except Exception as e: logger.error(f"Error setting node {node_id}: {str(e)}") raise FlowError(f"Failed to set node {node_id}: {str(e)}") from e def _schedule_deferred_post_actions(self, post_actions: list[ActionConfig]) -> None: self._action_manager.schedule_deferred_post_actions(post_actions=post_actions) async def _create_conversation_summary( self, summary_prompt: str, context: LLMContext ) -> str | None: """Generate a conversation summary from a given context.""" return await self._adapter.generate_summary(self._llm, summary_prompt, context) async def _update_llm_context( self, role_message: str | None, role_messages: list[dict] | None, task_messages: list[dict], functions: ToolsSchema | NotGiven, strategy: ContextStrategyConfig | None = None, ) -> None: """Update LLM context with new messages and functions. If ``role_message`` is provided, it is sent as an ``LLMUpdateSettingsFrame`` (system instruction on the LLM itself). If ``role_messages`` (deprecated) is provided, the messages are prepended to the conversation context alongside ``task_messages``. Args: role_message: Optional role/personality string sent as the LLM system instruction via ``LLMUpdateSettingsFrame``. role_messages: Deprecated list-of-dicts prepended to context messages for backward compatibility. task_messages: Task messages to add to context. functions: New functions to make available. strategy: Optional context update configuration. Raises: FlowError: If context update fails. """ try: frames = [] # New path: role_message as LLM system instruction (persists until changed) if role_message: frames.append( LLMUpdateSettingsFrame(delta=LLMSettings(system_instruction=role_message)) ) messages = [] # Legacy path: role_messages prepended to context messages if role_messages: messages.extend(role_messages) update_config = strategy or self._context_strategy if update_config.strategy == ContextStrategy.RESET_WITH_SUMMARY: if not self._showed_deprecation_warning_for_reset_with_summary: self._showed_deprecation_warning_for_reset_with_summary = True warnings.warn( "RESET_WITH_SUMMARY is deprecated and will be removed in 2.0.0. " "Use Pipecat's native context summarization instead. To trigger " "on-demand summarization during a node transition, push an " "LLMSummarizeContextFrame in a pre-action. See " "https://docs.pipecat.ai/guides/fundamentals/context-summarization", DeprecationWarning, stacklevel=2, ) if ( update_config.strategy == ContextStrategy.RESET_WITH_SUMMARY and self._context_aggregator and self._context_aggregator.user()._context ): # We know summary_prompt exists because of __post_init__ validation in ContextStrategyConfig summary_prompt = cast(str, update_config.summary_prompt) try: # Try to get summary with 5 second timeout summary = await asyncio.wait_for( self._create_conversation_summary( summary_prompt, self._context_aggregator.user()._context, ), timeout=5.0, ) if summary: summary_message = self._adapter.format_summary_message(summary) messages.append(summary_message) logger.debug(f"Added conversation summary to context: {summary_message}") else: # Fall back to APPEND strategy if summary fails logger.warning( "Failed to generate summary, falling back to APPEND strategy" ) update_config.strategy = ContextStrategy.APPEND except TimeoutError: logger.warning("Summary generation timed out, falling back to APPEND strategy") update_config.strategy = ContextStrategy.APPEND # Add task messages messages.extend(task_messages) # For first node or RESET/RESET_WITH_SUMMARY strategy, use update frame frame_type = ( LLMMessagesUpdateFrame if self._current_node is None or update_config.strategy in [ContextStrategy.RESET, ContextStrategy.RESET_WITH_SUMMARY] else LLMMessagesAppendFrame ) frames.append(frame_type(messages=messages)) frames.append(LLMSetToolsFrame(tools=functions)) await self._worker.queue_frames(frames) logger.debug( f"Updated LLM context using {frame_type.__name__} with strategy {update_config.strategy}" ) except Exception as e: logger.error(f"Failed to update LLM context: {str(e)}") raise FlowError(f"Context update failed: {str(e)}") from e async def _execute_actions( self, pre_actions: list[ActionConfig] | None = None, post_actions: list[ActionConfig] | None = None, ) -> None: """Execute pre and post actions. Args: pre_actions: Actions to execute before context update. post_actions: Actions to execute after context update. """ if pre_actions: await self._action_manager.execute_actions(pre_actions) if post_actions: await self._action_manager.execute_actions(post_actions) def _validate_node_config(self, node_id: str, config: NodeConfig) -> None: """Validate the configuration of a conversation node. This method ensures that: 1. Required fields (task_messages) are present 2. Functions have valid configurations based on their type: - FlowsFunctionSchema objects have proper handler/transition fields - Direct functions are valid according to the FlowsDirectFunctions validation 3. Edge functions (matching node names) are allowed without handlers/transitions Args: node_id: Identifier for the node being validated. config: Complete node configuration to validate. Raises: FlowError: If required fields are missing. InvalidFunctionError: If function format is invalid. """ # Check required fields if "task_messages" not in config: raise FlowError(f"Node '{node_id}' missing required 'task_messages' field") # Get functions list with default empty list if not provided functions_list = config.get("functions", []) # Validate each function configuration if there are any for func in functions_list: if callable(func): FlowsDirectFunctionWrapper.validate_function(func) elif isinstance(func, FlowsFunctionSchema): if not func.handler: logger.warning(f"Function '{func.name}' in node '{node_id}' has no handler") else: raise InvalidFunctionError( f"Invalid function format in node '{node_id}'. " "Use FlowsFunctionSchema or direct functions." )