Queues & 비동기 작업 관리
Vercel Queues를 활용해 AI 작업의 fan-out, 배치 처리, 재시도·poison message 운영을 설계하는 방법을 정리합니다.
Workflow가 상태 머신과 장기 실행을 다룬다면, Queues는 짧은 비동기 작업의 분산과 속도 조절을 담당합니다. 둘을 혼동하면 간단한 fan-out에 Workflow를 쓰거나, 승인 대기 작업을 Queue에 넣는 실수가 생깁니다.
Workflow vs Queues 선택 기준
| 질문 | Workflow | Queues |
|---|---|---|
| 사람 승인이나 외부 이벤트 대기가 있나 | 강점 | 별도 구현 |
| 단순 fan-out 비동기 처리인가 | 가능 | 더 단순 |
| 실행 경로를 단계별로 감사해야 하나 | 강점 | 제한적 |
| 작업 하나가 10초 이내인가 | 가능 | 적합 |
| concurrency 제한이 필요한가 | 수동 | 내장 |
| 재시도/만료가 주된 관심사인가 | 가능 | 내장 |
실행 구조
AI 작업에서 Queues를 쓰는 패턴
| 패턴 | 설명 | 예시 |
|---|---|---|
| Fan-out embedding | 대량 문서를 청크 단위로 분산 임베딩 | RAG 인덱스 구축 |
| Batch classification | 다수 항목을 분류/태깅 | 티켓 대량 분류 |
| Notification dispatch | AI 결과를 다채널로 전달 | Slack + Email + CRM 동시 발송 |
| Rate-limited API call | 외부 API 호출을 속도 제한 내에서 처리 | CRM 대량 업데이트 |
| Scheduled chunk work | cron 결과를 작은 단위로 분할 처리 | 주간 보고서 데이터 수집 |
최소 구현 스켈레톤
import { handleCallback, send } from '@vercel/queue'
export async function enqueueDocument(docId: string, chunks: string[]) {
await Promise.all(
chunks.map((chunk, i) =>
send(
'embedding-jobs',
{ chunk, docId: `${docId}#${i}` },
{ idempotencyKey: `${docId}#${i}` }
)
)
)
}
export const POST = handleCallback(
async (job) => {
const embedding = await generateEmbedding(job.chunk)
await upsertVector({ docId: job.docId, chunk: job.chunk, embedding })
},
{
visibilityTimeoutSeconds: 600,
retry: (_error, metadata) => {
if (metadata.deliveryCount > 5) return { acknowledge: true }
return { afterSeconds: Math.min(300, 2 ** metadata.deliveryCount * 5) }
},
}
){
"functions": {
"app/api/queues/embed/route.ts": {
"experimentalTriggers": [{ "type": "queue/v2beta", "topic": "embedding-jobs" }]
}
}
}Queues + Workflow 조합 패턴
Workflow가 전체 오케스트레이션을 관리하고, 특정 step에서 Queues로 fan-out하는 조합이 가장 효과적입니다.
import { send } from '@vercel/queue'
import { sleep } from 'workflow'
export async function batchAnalysisWorkflow(batchId: string) {
'use workflow'
const data = await collectData(batchId)
const chunks = await splitIntoChunks(data)
// Queues로 fan-out
await enqueueProcessingChunks(batchId, chunks)
// 완료 대기 방식은 별도 status store나 webhook/hook으로 구현합니다.
await sleep('1h')
return aggregateResults(batchId)
}
async function enqueueProcessingChunks(batchId: string, chunks: string[]) {
'use step'
await Promise.all(
chunks.map((chunk, index) =>
send(
'processing-jobs',
{ batchId, chunk, index },
{ idempotencyKey: `${batchId}:${index}` }
)
)
)
}Poison message 운영 전략
| 항목 | 권장 기본값 | 이유 |
|---|---|---|
| 재시도 횟수 | 3회 | AI 호출 비용과 안정성 균형 |
| 재시도 간격 | exponential backoff | provider rate limit 대응 |
| 메시지 보존 | 60초~24시간 범위 | Vercel Queues retention 제한 |
| 실패 알림 | Slack 또는 대시보드 | 누적 방지 |
| 재처리 | 수동 트리거 기본 | 자동 재처리는 비용 폭발 가능 |
concurrency 설계
| 작업 유형 | 권장 concurrency | 이유 |
|---|---|---|
| 임베딩 생성 | 10~50 | API rate limit에 맞춤 |
| 분류/태깅 | 20~100 | 가벼운 모델 호출 |
| 외부 API 쓰기 | 5~10 | 외부 시스템 부하 제한 |
| Sandbox 실행 | 3~5 | 리소스 집약적 |
실무 해석
Queues는 "빠르게 많이" 처리하는 도구가 아니라 "안전하게 분산" 처리하는 도구입니다. concurrency를 높이면 throughput이 올라가지만, 외부 API rate limit이나 비용 폭발을 먼저 고려해야 합니다.
비용 태깅
Queue 작업에도 Gateway 호출 시 user/tag 기반 reporting metadata를 반드시 붙여야 합니다.
import { embed } from 'ai'
export const POST = handleCallback(async (job) => {
const { embedding } = await embed({
model: 'openai/text-embedding-3-small',
value: job.chunk,
providerOptions: {
gateway: {
tags: ['queue:embedding', `batch:${job.batchId}`],
},
},
})
// ...
})ADR 스타일 결론
Decision
짧은 비동기 작업의 분산, 속도 제한, 재시도는 Queues로 처리하고, 상태 머신과 장기 실행은 Workflow로 분리합니다. Workflow step 안에서 Queues로 fan-out하는 조합이 대규모 AI 파이프라인의 기본 패턴입니다.
실무 체크리스트
- 단순 fan-out을 Workflow로 과설계하지 않았는가
- concurrency가 외부 API rate limit 이내인가
- poison message 알림과 재처리 기준이 정해져 있는가
- Queue 작업에 비용 태그(feature, batch id, tenant)가 붙어 있는가