A Coding Implementation Showcasing ClawTeam’s Multi-Agent Swarm Orchestration with OpenAI Function Calling


SWARM_TOOLS = [
   {
       "type": "function",
       "function": {
           "name": "task_update",
           "description": "Update the status of a task. Use 'in_progress' when starting, 'completed' when done.",
           "parameters": {
               "type": "object",
               "properties": {
                   "task_id": {"type": "string", "description": "The task ID"},
                   "status": {"type": "string", "enum": ["in_progress", "completed", "failed"]},
                   "result": {"type": "string", "description": "Result or output of the task"},
               },
               "required": ["task_id", "status"],
           },
       },
   },
   {
       "type": "function",
       "function": {
           "name": "inbox_send",
           "description": "Send a message to another agent (e.g., 'leader' or a worker name).",
           "parameters": {
               "type": "object",
               "properties": {
                   "to": {"type": "string", "description": "Recipient agent name"},
                   "message": {"type": "string", "description": "Message content"},
               },
               "required": ["to", "message"],
           },
       },
   },
   {
       "type": "function",
       "function": {
           "name": "inbox_receive",
           "description": "Check and consume all messages in your inbox.",
           "parameters": {
               "type": "object",
               "properties": {},
           },
       },
   },
   {
       "type": "function",
       "function": {
           "name": "task_list",
           "description": "List tasks assigned to you or all team tasks.",
           "parameters": {
               "type": "object",
               "properties": {
                   "owner": {"type": "string", "description": "Filter by owner name (optional)"},
               },
           },
       },
   },
]




class SwarmAgent:


   def __init__(
       self,
       name: str,
       role: str,
       system_prompt: str,
       task_board: TaskBoard,
       inbox: InboxSystem,
       registry: TeamRegistry,
   ):
       self.name = name
       self.role = role
       self.system_prompt = system_prompt
       self.task_board = task_board
       self.inbox = inbox
       self.registry = registry
       self.conversation_history: list[dict] = []
       self.inbox.register(name)
       self.registry.register(name, role)


   def _build_system_prompt(self) -> str:
       coord_protocol = f"""
## Coordination Protocol (auto-injected — you are agent '{self.name}')


You are part of an AI agent swarm. Your role: {self.role}
Your name: {self.name}


Available tools (equivalent to ClawTeam CLI):
- task_list: Check your assigned tasks (like `clawteam task list`)
- task_update: Update task status to in_progress/completed/failed (like `clawteam task update`)
- inbox_send: Send messages to other agents (like `clawteam inbox send`)
- inbox_receive: Check your inbox for messages (like `clawteam inbox receive`)


WORKFLOW:
1. Check your tasks with task_list
2. Mark a task as in_progress when you start
3. Do the work (think, analyze, produce output)
4. Mark the task as completed with your result
5. Send a summary message to 'leader' when done
"""
       return self.system_prompt + "\n" + coord_protocol


   def _handle_tool_call(self, tool_name: str, args: dict) -> str:
       if tool_name == "task_update":
           status = TaskStatus(args["status"])
           result = args.get("result", "")
           self.task_board.update_status(args["task_id"], status, result)
           if status == TaskStatus.COMPLETED:
               self.registry.increment_completed(self.name)
           return json.dumps({"ok": True, "task_id": args["task_id"], "new_status": args["status"]})


       elif tool_name == "inbox_send":
           self.inbox.send(self.name, args["to"], args["message"])
           return json.dumps({"ok": True, "sent_to": args["to"]})


       elif tool_name == "inbox_receive":
           msgs = self.inbox.receive(self.name)
           if not msgs:
               return json.dumps({"messages": [], "note": "No new messages"})
           return json.dumps({
               "messages": [
                   {"from": m.sender, "content": m.content, "time": m.timestamp}
                   for m in msgs
               ]
           })


       elif tool_name == "task_list":
           owner = args.get("owner", self.name)
           tasks = self.task_board.get_tasks(owner=owner)
           return json.dumps({"tasks": [t.to_dict() for t in tasks]})


       return json.dumps({"error": f"Unknown tool: {tool_name}"})


   def run(self, user_message: str, max_iterations: int = 6) -> str:
       self.conversation_history.append({"role": "user", "content": user_message})


       for iteration in range(max_iterations):
           try:
               response = client.chat.completions.create(
                   model=MODEL,
                   messages=[
                       {"role": "system", "content": self._build_system_prompt()},
                       *self.conversation_history,
                   ],
                   tools=SWARM_TOOLS,
                   tool_choice="auto",
                   temperature=0.4,
               )
           except Exception as e:
               return f"[API Error] {e}"


           choice = response.choices[0]
           msg = choice.message


           assistant_msg = {"role": "assistant", "content": msg.content or ""}
           if msg.tool_calls:
               assistant_msg["tool_calls"] = [
                   {
                       "id": tc.id,
                       "type": "function",
                       "function": {"name": tc.function.name, "arguments": tc.function.arguments},
                   }
                   for tc in msg.tool_calls
               ]
           self.conversation_history.append(assistant_msg)


           if not msg.tool_calls:
               return msg.content or "(No response)"


           for tc in msg.tool_calls:
               fn_name = tc.function.name
               fn_args = json.loads(tc.function.arguments)
               result = self._handle_tool_call(fn_name, fn_args)
               self.conversation_history.append({
                   "role": "tool",
                   "tool_call_id": tc.id,
                   "content": result,
               })


       return "(Agent reached max iterations)"



Source link

  • Related Posts

    LlamaIndex Releases LiteParse: A CLI and TypeScript-Native Library for Spatial PDF Parsing in AI Agent Workflows

    In the current landscape of Retrieval-Augmented Generation (RAG), the primary bottleneck for developers is no longer the large language model (LLM) itself, but the data ingestion pipeline. For software developers,…

    Google Colab Now Has an Open-Source MCP (Model Context Protocol) Server: Use Colab Runtimes with GPUs from Any Local AI Agent

    Google has officially released the Colab MCP Server, an implementation of the Model Context Protocol (MCP) that enables AI agents to interact directly with the Google Colab environment. This integration…

    Leave a Reply

    Your email address will not be published. Required fields are marked *