멀티에이전트 라우팅
전체 통신 흐름
A2A 메시지 전송 구현
Agent Card 검색 및 연결
Rate Limiting 및 Exponential Backoff
BedrockAgentCoreApp 엔트리포인트
마지막 업데이트
마지막 업데이트
async def _send_message_impl(self, agent_name: str, task: str):
"""A2A 프로토콜로 원격 에이전트에 메시지 전송"""
# 중복 요청 방지
request_key = f"{agent_name}:{hash(task)}"
if request_key in self.completed_requests:
return "Request already processed"
# A2A 메시지 페이로드 구성
payload = {
"message": {
"role": "user",
"parts": [{"type": "text", "text": task}],
"messageId": str(uuid.uuid4()),
"contextId": str(uuid.uuid4()),
},
}
# SendMessageRequest 생성 및 전송
message_request = SendMessageRequest(
id=message_id,
params=MessageSendParams.model_validate(payload)
)
send_response = await client.send_message(message_request)
# 응답 파싱
if isinstance(send_response.root, SendMessageSuccessResponse):
json_content = json.loads(send_response.root.model_dump_json())
resp = []
for artifact in json_content.get("result", {}).get("artifacts", []):
if artifact.get("parts"):
resp.extend(artifact["parts"])
return json.dumps(resp, indent=2)async def _async_init_components(self, remote_agent_addresses):
"""원격 에이전트 연결 초기화"""
async with httpx.AsyncClient() as client:
for address in remote_agent_addresses:
# /.well-known/agent-card.json 엔드포인트에서 카드 조회
card_resolver = A2ACardResolver(client, address)
card = await card_resolver.get_agent_card()
# 원격 연결 등록
remote_connection = RemoteAgentConnections(
agent_card=card, agent_url=address
)
self.remote_agent_connections[card.name] = remote_connectionasync def stream(self, user_query: str):
async with self._bedrock_semaphore: # 동시 2개 요청 제한
# Exponential Backoff 재시도 로직
retry_count = 0
max_retries = 3
base_delay = 2.0
while retry_count <= max_retries:
try:
async for event in self.agent.stream_async(user_query):
if "data" in event:
yield event["data"]
break
except Exception as e:
if "throttling" in str(e).lower():
backoff_delay = base_delay * (2 ** retry_count) # 2s, 4s, 8s
await asyncio.sleep(backoff_delay)
retry_count += 1app = BedrockAgentCoreApp()
@app.entrypoint
async def invoke(payload, context):
user_message = payload["prompt"]
actor_id = payload["actor_id"]
session_id = context.session_id
# 비동기 태스크 생성
task = asyncio.create_task(
host_agent_task(user_message, session_id, actor_id)
)
# 스트리밍 응답 반환
async def stream_output():
response_queue = HostAgentContext.get_response_queue_ctx()
async for item in response_queue.stream():
yield item
await task
return stream_output()