실전 아키텍처 사례
고객 지원, 코드 생성, 리서치, 데이터 파이프라인 — 4가지 사례를 패턴 조합으로 분석
핵심 요약
- 고객 지원, 코딩 에이전트, 리서치 파이프라인, 데이터 파이프라인 4가지 사례를 패턴 조합과 통제 지점 관점으로 분석합니다.
- 고객 지원은 action 기준 라우팅이 핵심이며, write action은 confidence 0.90 이상, 0.75 미만은 무조건 사람 검토로 보내야 오분류 환불 사고를 막습니다.
- 코딩 에이전트는 lint/type/test fix loop에 MAX_RETRIES 제한이 없으면 무한 수정 루프로 비용이 폭증하므로 동일 오류 2회 반복 시 human handoff합니다.
- 리서치 파이프라인은 수치 claim에
min_sources=2교차검증을 적용하고 단일 출처는 "미검증" 라벨을 붙여야 환각 수치 유입을 막습니다. - 데이터 파이프라인은 모든 스테이지에 task_id 기반 멱등성 키와 load의 upsert가 없으면 재실행 시 중복 적재가 발생합니다.
앞선 패턴을 실제 시스템으로 어떻게 조합하는지 예시로 살펴봅니다. 관건은 "어떤 프레임워크를 쓸까"가 아니라, 문제 구조에 맞춰 패턴 조합과 통제 지점을 어떻게 고를까입니다.
사례 요약
| 사례 | 주 패턴 | 핵심 위험 | 사람 개입 |
|---|---|---|---|
| 고객 지원 자동화 | routing + retrieval + approval | 잘못된 정책 안내 | 환불/예외 처리 |
| 코딩 에이전트 | pipeline + evaluator | 잘못된 코드 수정 | merge 전 승인 |
| 리서치 파이프라인 | parallelization + aggregator | 근거 누락, 환각 | 최종 보고서 검토 |
| 데이터 파이프라인 운영 | DAG + recovery | 부분 실패, 중복 실행 | 장애 대응 승인 |
사례 1. 고객 지원 자동화
문제 구조
- 유입량은 많지만 질문 유형은 반복적이다.
- 정책/계정 상태를 함께 봐야 한다.
- 잘못된 write나 약속은 곧 금전 손실로 이어진다.
권장 구조
설계 포인트
- 라우팅 기준은 주제보다 action 기준으로 둡니다.
- retrieval 결과에는 정책 버전과 근거 링크를 포함합니다.
- 환불, 계정 변경, 보상 약속은 자동화 범위에서 분리합니다.
구현 상세
라우팅 규칙 예시:
def route_ticket(ticket: Ticket) -> str:
"""action 기준 라우팅 — 주제가 아닌 요청 행위로 분류"""
intent = classify_intent(ticket.message)
if intent.action in ("refund", "cancel", "account_modify"):
return "human_review" # write 행위는 자동화 제외
if intent.confidence < 0.75:
return "needs_triage" # 낮은 확신은 사람에게
if intent.action == "status_inquiry":
return "auto_status" # 조회는 바로 처리
return "policy_retrieval" # 나머지는 정책 검색 후 응답confidence threshold 설정:
THRESHOLDS = {
"auto_respond": 0.90, # 자동 응답 — 매우 높은 확신만
"policy_lookup": 0.75, # 정책 검색 경유 — 중간 확신
"human_triage": 0.0, # 그 외 전부 사람에게
}에스컬레이션 조건:
def needs_escalation(session: Session) -> bool:
return (
session.sentiment_score < -0.6 # 고객 불만 감지
or session.retry_count >= 3 # 반복 문의
or session.involves_write_action # 데이터 변경 행위
or session.policy_version_mismatch # 정책 버전 불일치
)실패 사례: 라우팅 오분류로 잘못된 환불 안내
한 이커머스 팀이 intent 분류 confidence threshold를 0.5로 잡았습니다. "배송 어디까지 왔나요?"를 "환불 문의"로 오분류해 환불 절차를 자동 안내했고, 고객이 실제로 환불을 진행해버렸습니다.
원인: threshold가 지나치게 낮아 조회성 문의를 write action으로 분류. 대응: write action 라우팅은 confidence 0.90 이상으로 상향, 0.75 미만은 무조건 사람 검토 큐로 전환.
| 운영 포인트 | 내용 |
|---|---|
| 입력 | 고객 문의, 주문/계정 상태, 정책 버전 |
| 상태 | routed -> researching -> needs_review -> responded |
| 승인 | 환불, 보상, 계정 수정은 사람 승인 |
| fallback | needs_triage 또는 human review queue |
사례 2. 코딩 에이전트
문제 구조
- 계획, 구현, 테스트, 리뷰가 서로 다른 책임을 가진다.
- repo-aware context와 deterministic gate가 중요하다.
- 잘못된 수정이 누적되면 복구 비용이 크다.
권장 구조
| 단계 | 패턴 | 핵심 통제 |
|---|---|---|
| 이슈 해석 | planner | acceptance criteria |
| 코드 수정 | implementation worker | sandbox, branch isolation |
| 검증 | evaluator | lint, typecheck, tests |
| 결과 배포 | approval | draft PR, human gate |
이 사례에서는 "코드를 잘 쓰는 모델"보다 검증 루프를 시스템으로 강제하는 설계가 더 중요합니다.
구현 상세
lint/test 통합 검증 스크립트:
#!/bin/bash
# evaluate.sh — Evaluator 단계에서 실행
set -euo pipefail
BRANCH="$1"
MAX_RETRIES=3
ATTEMPT=0
git checkout "$BRANCH"
while [ $ATTEMPT -lt $MAX_RETRIES ]; do
echo "=== 검증 시도 $((ATTEMPT + 1))/$MAX_RETRIES ==="
# 1. lint
if ! yarn lint 2> lint_errors.log; then
echo "LINT_FAIL: fix 요청"
cat lint_errors.log | agent fix-lint
ATTEMPT=$((ATTEMPT + 1))
continue
fi
# 2. typecheck
if ! yarn typecheck 2> type_errors.log; then
echo "TYPE_FAIL: fix 요청"
cat type_errors.log | agent fix-types
ATTEMPT=$((ATTEMPT + 1))
continue
fi
# 3. test
if ! yarn test 2> test_errors.log; then
echo "TEST_FAIL: fix 요청"
cat test_errors.log | agent fix-tests
ATTEMPT=$((ATTEMPT + 1))
continue
fi
echo "ALL_PASS"
exit 0
done
echo "MAX_RETRIES_EXCEEDED: human handoff"
exit 1파일 변경 감지 -> 리뷰 에이전트 트리거:
# watcher.py — git diff 기반 리뷰 트리거
def trigger_review(pr_number: int):
changed_files = get_changed_files(pr_number)
risk_score = 0
for f in changed_files:
if f.path.startswith("src/api/"):
risk_score += 3 # API 변경은 고위험
if f.additions > 200:
risk_score += 2 # 대규모 변경
if "migration" in f.path:
risk_score += 5 # DB 마이그레이션
if risk_score >= 5:
assign_reviewer("senior-review-agent", pr_number)
request_human_approval(pr_number)
else:
assign_reviewer("standard-review-agent", pr_number)실패 사례: 무한 수정 루프
코딩 에이전트가 lint 오류를 고치면서 새 타입 오류를 만들고, 타입 오류를 고치면서 다시 lint 오류를 내는 순환에 빠졌습니다. 48회를 돌고 CI 비용이 $120 쌓인 뒤에야 사람이 발견했습니다.
원인: fix loop에 최대 반복 횟수 제한이 없었고, 각 fix 단계가 다른 단계의 결과를 덮어쓰는 구조.
대응: MAX_RETRIES=3으로 제한, 동일 파일에서 동일 유형 오류가 2회 반복되면 즉시 human handoff.
| 운영 포인트 | 내용 |
|---|---|
| 입력 | issue, repo context, acceptance criteria |
| 상태 | planned -> implementing -> validating -> approved |
| 승인 | PR 전, merge 전 human approval |
| fallback | 테스트 실패 시 fix loop, 반복 실패 시 handoff |
사례 3. 리서치 파이프라인
문제 구조
- 여러 출처를 병렬 조사해야 한다.
- 근거 품질이 최종 보고서 신뢰도를 좌우한다.
- 각 조사 결과를 하나의 통합 구조로 묶어야 한다.
권장 구조
- parallel research worker를 fan-out한다.
- 각 worker는 요약이 아니라 evidence bundle을 반환한다.
- aggregator가 출처 충돌과 빈 구간을 정리한다.
- evaluator가 주장과 근거의 정합성을 확인한다.
구현 상세
source aggregation 로직:
def aggregate_evidence(worker_results: list[EvidenceBundle]) -> Report:
"""worker 결과를 claim 단위로 병합하고 충돌을 표시"""
claim_map: dict[str, list[Evidence]] = {}
for bundle in worker_results:
for evidence in bundle.items:
claim_key = normalize_claim(evidence.claim)
claim_map.setdefault(claim_key, []).append(evidence)
report = Report()
for claim, evidences in claim_map.items():
if has_conflict(evidences):
report.add_conflict(claim, evidences) # 충돌은 명시적으로 표시
else:
merged = merge_evidences(evidences)
report.add_claim(claim, merged)
report.mark_gaps(find_uncovered_claims(claim_map)) # 빈 구간 표시
return report충돌 해결 전략:
def resolve_conflict(claim: str, evidences: list[Evidence]) -> Evidence:
"""출처 신뢰도 가중치 기반 충돌 해결"""
SOURCE_WEIGHTS = {
"official_doc": 1.0, # 공식 문서 — 최고 신뢰
"peer_reviewed": 0.9, # 학술 논문
"industry_report": 0.7, # 업계 리포트
"blog_post": 0.4, # 블로그
"forum": 0.2, # 포럼/커뮤니티
}
scored = []
for ev in evidences:
weight = SOURCE_WEIGHTS.get(ev.source_type, 0.3)
recency_bonus = 0.1 if ev.published_within_months(6) else 0.0
scored.append((ev, weight + recency_bonus))
scored.sort(key=lambda x: x[1], reverse=True)
# 최고 점수 근거를 채택하되, 충돌 사실은 보고서에 명시
winner = scored[0][0]
winner.conflict_note = f"{len(evidences)}개 출처 중 채택 (신뢰도 {scored[0][1]:.1f})"
return winner메모
- 병렬 worker마다 같은 질문을 조금씩 비틀어 던지기보다 source domain을 나누는 편이 안정적입니다.
- 최종 문서는 단일 요약보다 claim-by-claim evidence map이 있을 때 검토하기 쉽습니다.
실패 사례: 근거 없는 수치가 최종 보고서에 포함
리서치 에이전트가 "시장 규모 $50B"이라는 수치를 블로그 1개에서 가져왔는데, 그 블로그가 출처 없이 어림한 숫자였습니다. evaluator가 원출처를 확인하지 않았고, 이 수치가 경영진 보고서에 그대로 들어갔습니다.
원인: aggregator가 수치 claim에 대해 최소 2개 독립 출처 교차검증을 요구하지 않았음.
대응: 수치/통계가 포함된 claim은 min_sources=2 정책을 적용하고, 단일 출처 수치는 보고서에 "미검증" 라벨을 자동 부착.
| 운영 포인트 | 내용 |
|---|---|
| 입력 | 조사 질문, 소스 목록, 보고서 템플릿 |
| 상태 | fan_out -> aggregating -> validating -> delivered |
| 승인 | 대외 발행 전 최종 human review |
| fallback | 근거 부족 claim 제거, source gap 재탐색 |
사례 4. 데이터 파이프라인 운영
문제 구조
- ETL, 검증, 알림, 재처리 같은 단계가 존재한다.
- 일부 단계 실패를 전체 실패로 볼지 판단해야 한다.
- 외부 API와 배치 스케줄이 함께 얽힌다.
권장 구조
| 요소 | 설계 |
|---|---|
| 실행 모델 | DAG workflow |
| 상태 저장 | checkpoint + idempotent task id |
| 장애 대응 | retry, circuit breaker, partial replay |
| 사람 개입 | 데이터 이상치 승인, 재처리 승인 |
이 사례에서는 "agent"보다 workflow 엔진이 더 큰 몫을 합니다. 에이전트는 이상 원인 요약이나 대응 제안처럼 판단이 필요한 지점에만 두는 편이 좋습니다.
구현 상세
DAG 정의 예시:
# dag_definition.py — 스테이지별 의존성과 정책 선언
pipeline = DAG("daily_user_etl", schedule="0 2 * * *")
extract = pipeline.add_stage(
"extract",
handler=extract_from_sources,
retry=RetryPolicy(max_attempts=3, backoff="exponential", base_delay=30),
timeout=600,
)
transform = pipeline.add_stage(
"transform",
handler=transform_records,
depends_on=[extract],
retry=RetryPolicy(max_attempts=2, backoff="fixed", base_delay=10),
)
enrich = pipeline.add_stage(
"enrich",
handler=enrich_with_external_api,
depends_on=[extract],
retry=RetryPolicy(max_attempts=3, backoff="exponential", base_delay=60),
circuit_breaker=CircuitBreaker(fail_threshold=5, reset_timeout=300),
)
validate = pipeline.add_stage(
"validate",
handler=validate_quality,
depends_on=[transform, enrich],
retry=RetryPolicy(max_attempts=1), # 검증은 재시도 불필요
)
load = pipeline.add_stage(
"load",
handler=load_to_warehouse,
depends_on=[validate],
retry=RetryPolicy(max_attempts=2, backoff="fixed", base_delay=5),
idempotent=True, # 중복 실행 안전
)스테이지별 재시도 정책:
| 스테이지 | 재시도 | 전략 | 이유 |
|---|---|---|---|
| extract | 3회 | exponential backoff | 외부 API 일시 장애 대응 |
| transform | 2회 | fixed delay | 내부 로직, 빠른 재시도 |
| enrich | 3회 + circuit breaker | exponential + 차단 | 외부 의존성 보호 |
| validate | 1회 | 재시도 없음 | 검증 실패는 데이터 문제 |
| load | 2회 | fixed delay, idempotent | 멱등성 보장으로 안전한 재시도 |
부분 실패 복구:
def handle_partial_failure(run: PipelineRun):
"""실패 스테이지만 재실행 — 성공 스테이지는 checkpoint에서 복원"""
failed_stages = [s for s in run.stages if s.status == "failed"]
succeeded_stages = [s for s in run.stages if s.status == "completed"]
for stage in failed_stages:
if stage.name == "enrich" and stage.circuit_breaker.is_open:
# 외부 API 차단 상태 → enrich 건너뛰고 부분 데이터로 진행
run.mark_degraded(stage, reason="circuit_breaker_open")
continue
if stage.retry_exhausted:
# 재시도 소진 → dead-letter queue로 이동, 사람 알림
dead_letter.push(stage.failed_records)
notify_oncall(f"{stage.name} 재시도 소진, {len(stage.failed_records)}건 DLQ 이동")
continue
# checkpoint에서 마지막 성공 지점 복원 후 재실행
stage.restore_checkpoint()
stage.execute()실패 사례: 멱등성 미보장으로 중복 적재
enrich 스테이지가 타임아웃되어 파이프라인이 재실행됐는데, extract와 load가 멱등하지 않아 같은 데이터가 두 번 적재됐습니다. 중복 레코드 12만 건이 리포트에 반영되면서 매출 수치가 2배로 보고됐습니다.
원인: load 스테이지에 idempotent 설정이 없었고, 각 레코드에 고유한 task_id가 부여되지 않았음.
대응: 모든 스테이지에 task_id 기반 멱등성 키 부여, load에 upsert 전략 적용, 재실행 전 checkpoint 확인 필수화.
| 운영 포인트 | 내용 |
|---|---|
| 입력 | 스케줄 이벤트, 검증 메트릭, 외부 API 상태 |
| 상태 | `queued -> running -> degraded |
| 승인 | 재처리, 데이터 보정, incident close 승인 |
| fallback | partial replay, dead-letter, human escalation |
사례별 선택 기준
| 질문 | 고객 지원 | 코딩 | 리서치 | 데이터 파이프라인 |
|---|---|---|---|---|
| 라우팅이 중요한가 | 매우 중요 | 중간 | 낮음 | 중간 |
| 병렬화 가치가 큰가 | 중간 | 낮음 | 매우 큼 | 중간 |
| 평가 루프가 필요한가 | 높음 | 매우 높음 | 높음 | 중간 |
| 인간 승인 비중이 큰가 | 높음 | 매우 높음 | 중간 | 높음 |
ADR 스타일 결론
Decision
사례마다 최적 패턴은 다르지만 공통 원칙은 같습니다. action 기준 라우팅, 근거 중심 handoff, side effect 앞 승인, checkpoint 기반 복구를 기본으로 깔고, 병렬화와 하위 agent는 실제로 병목이 생기는 지점에만 도입합니다.
실무 체크리스트
- 문제 구조를 패턴 언어로 설명할 수 있는가
- 가장 위험한 side effect 앞에 승인 또는 검증 루프가 있는가
- evidence가 최종 결과까지 이어지는가
- 실패 시 부분 복구가 가능한가
- 사람 개입이 단순 예외 처리인지, 핵심 품질 게이트인지 구분되어 있는가