Use a foreach node to create a sub agentic workflow that repeats nodes for each item in the input schema.To configure a foreach node in an agentic workflow, call the foreach() method to define a nested agentic workflow. In this method, define the following parameters:
The example below shows how to configure a foreach node:
Python
"""In this example, the user is retrieving a set of email addresses from a contact list, and for each email address, sending out an invitation."""from typing import Listfrom pydantic import BaseModel, Fieldfrom ibm_watsonx_orchestrate.agent_builder.tools import tool, ToolPermissionfrom ibm_watsonx_orchestrate.flow_builder.flows import Flow, flow, START, ENDfrom .send_invitation_email import send_invitation_emailfrom .get_emails_from_customer import get_emails_from_customer, CustomerRecordclass CustomerName(BaseModel): name: str = Field(description="The name of the customer")class Invitations(BaseModel): invitations: List[str]@flow( name="send_invitation_to_customer", input_schema=CustomerName, output_schema=None)def build_send_invitation_to_customer_flow(aflow: Flow) -> Flow: """ Given a list of customers, we will iterate through the list and send email to each """ get_customer_list_node = aflow.tool(get_emails_from_customer) # calling add_foreach will create a subflow, and we can add more node to the subflow foreach_flow: Flow = aflow.foreach(item_schema = CustomerRecord, output_schema=Invitations) node2 = foreach_flow.tool(send_invitation_email) foreach_flow.sequence(START, node2, END) # add the foreach flow to the main flow aflow.edge(START, get_customer_list_node) aflow.edge(get_customer_list_node, foreach_flow) aflow.edge(foreach_flow, END) return aflow
You can configure how your foreach node processes input items. There are two processing methods:
SequentialA sequential foreach processes items one after another. The next item starts only after the previous one finishes. Use this method when the order of item processing affects the result.
ParallelA parallel foreach processes one or more items at the same time. In most cases, this method runs faster than the sequential approach.
To set the processing method, call the policy() method in your foreach node. This method accepts the following parameter:
Defines the processing method. Use ForeachPolicy.SEQUENTIAL for sequential processing or ForeachPolicy.PARALLEL for parallel processing.
Python
"""In this example, the user is retrieving a set of email addresses from a contact list, and for each email address, sending out an invitation."""from typing import Listfrom pydantic import BaseModel, Fieldfrom ibm_watsonx_orchestrate.agent_builder.tools import tool, ToolPermissionfrom ibm_watsonx_orchestrate.flow_builder.flows import Flow, flow, START, ENDfrom ibm_watsonx_orchestrate.flow_builder.types import ForeachPolicyfrom .send_invitation_email import send_invitation_emailfrom .get_emails_from_customer import get_emails_from_customer, CustomerRecordclass CustomerName(BaseModel): name: str = Field(description="The name of the customer")class Invitations(BaseModel): invitations: List[str]@flow( name="send_invitation_to_customer", input_schema=CustomerName, output_schema=None)def build_send_invitation_to_customer_flow(aflow: Flow) -> Flow: """ Given a list of customers, we will iterate through the list and send email to each """ get_customer_list_node = aflow.tool(get_emails_from_customer) # calling add_foreach will create a subflow, and we can add more node to the subflow foreach_flow: Flow = aflow.foreach(item_schema = CustomerRecord, output_schema=Invitations) \ .policy(kind=ForeachPolicy.SEQUENTIAL) # .policy(kind=ForeachPolicy.PARALLEL) # replace with 'ForeachPolicy.PARALLEL' if need to run .foreach in parallel node2 = foreach_flow.tool(send_invitation_email) foreach_flow.sequence(START, node2, END) # add the foreach flow to the main flow aflow.edge(START, get_customer_list_node) aflow.edge(get_customer_list_node, foreach_flow) aflow.edge(foreach_flow, END) return aflow