BullMQ를 사용하는 이유
BullMQ는 Node.js 애플리케이션에서 비동기 작업 처리를 위해 Redis 기반의 큐 시스템을 제공하는 도구입니다. 주로 백그라운드 작업 처리나 시간이 오래 걸리는 작업을 메인 서버와 분리하여 성능을 향상시키기 위해 사용됩니다. 이로 인해 서버는 요청을 처리한 후, 리소스 집약적인 작업은 별도로 처리할 수 있습니다.
BullMQ 장점
- 성능: Redis의 빠른 데이터 처리 속도를 바탕으로 하여 대규모 작업 처리에서도 성능이 우수합니다.
- 작업 실패 관리: BullMQ는 작업 실패 시 자동으로 재시도를 하고, 지정된 최대 재시도 횟수를 넘으면 실패로 기록합니다. 이 과정에서 로그와 디버깅 정보를 제공하여 안정적인 작업 처리를 보장합니다.
- 확장성: 여러 워커를 통해 작업을 병렬로 처리할 수 있으며, 수평적 확장을 지원해 대규모 작업 처리에 적합합니다.
- 유연한 설정: 작업 우선순위, 재시도 횟수, 지연 시간 등의 다양한 설정을 통해 큐 시스템을 유연하게 관리할 수 있습니다.
- 영속성: 작업 및 그 상태가 Redis에 저장되므로, 서버가 재시작되거나 장애가 발생하더라도 작업이 손실되지 않습니다.
- 직관적인 API: Node.js 환경에서 쉽게 사용할 수 있도록 직관적인 API를 제공합니다. 이를 통해 쉽게 큐를 생성하고 작업을 추가할 수 있습니다.
BullMQ 단점
- Redis 의존성: Redis가 필요하기 때문에 Redis를 설치하고 유지 관리해야 하며, Redis가 없으면 BullMQ는 동작하지 않습니다. Redis의 성능이나 안정성에 의존하므로 Redis 서버가 제대로 동작하지 않으면 큐 시스템도 영향을 받습니다.
- 복잡한 설정: 기본적인 작업 처리는 간단하지만, 복잡한 작업 흐름을 구현하거나 다양한 작업 간의 의존성을 설정할 때는 구현이 복잡해질 수 있습니다.
- 서버 비용: Redis와 여러 워커 인스턴스를 운영할 경우, 서버 비용이 증가할 수 있습니다. 특히 Redis 클러스터링이나 확장성을 위해 추가적인 인프라가 필요할 수 있습니다.
- 학습 곡선: BullMQ는 Redis와의 통합 및 큐 시스템에 대한 기본 지식이 필요하므로, 초보자에게는 사용하기 어렵게 느껴질 수 있습니다.
BullMQ를 사용하는 상황
- 대규모 비동기 작업 처리: 많은 수의 이미지/비디오 처리, 대량의 이메일 전송, 데이터 분석 등 리소스가 많이 드는 작업을 처리할 때.
- 지연 작업: 사용자가 트리거한 작업을 특정 시점에 실행해야 하거나, 주기적으로 반복 작업을 처리해야 할 때.
- 작업 재시도와 실패 관리가 필요한 복잡한 백엔드 시스템을 운영할 때.
NestJS에서 이미지 및 영상 처리 작업의 부하를 줄이기 위해 BullMQ 사용
- 폴더 구조
1. BullMQ 설치
npm install @nestjs/bullmq bullmq
2. AppModele에 BullModule을 import하여 root module로 설정
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
3. queue를 등록하기 위해 BullModule.registerQueue()를 import
- 아래는 영상 변환 작업을 하는 VideosModule에서 BullMQ를 사용하기 위해 'video-processing'이라는 이름으로 BullModule.registerQueue() 동적 모듈을 import한 예시입니다.
import { Module } from '@nestjs/common';
import { VideosService } from './videos.service';
import { VideosController } from './videos.controller';
import { BullModule } from '@nestjs/bullmq';
import { VideoProcessor } from './videos.processor';
@Module({
imports: [
BullModule.registerQueue({
name: 'video-processing',
}),
],
controllers: [VideosController],
providers: [VideosService, VideoProcessor],
})
export class VideosModule {}
4. 영상 처리를 위한 POST API와 영상 처리 완료 여부 확인을 위한 GET API를 추가한 controller 및 service 추가
import {
Body,
Controller,
Get,
Param,
Post,
UploadedFile,
UseInterceptors,
} from '@nestjs/common';
import { VideosService } from './videos.service';
import {
ApiBody,
ApiConsumes,
ApiOperation,
ApiParam,
ApiTags,
} from '@nestjs/swagger';
import { MorphVideoDto } from './dto/morph-video.dto';
import { FileInterceptor } from '@nestjs/platform-express';
@ApiTags('videos')
@Controller('videos')
export class VideosController {
constructor(private readonly videosService: VideosService) {}
@ApiOperation({ summary: 'Morph video file' })
@ApiConsumes('multipart/form-data')
@UseInterceptors(FileInterceptor('file'))
@ApiBody({ type: MorphVideoDto })
@Post('morph')
async morphVideoFile(
@UploadedFile() file: Express.Multer.File,
@Body() morphVideoDto: MorphVideoDto,
): Promise<string> {
return this.videosService.addVideoProcessingJob(file, morphVideoDto);
}
@ApiOperation({ summary: 'Check video processing job status' })
@ApiParam({
name: 'jobId',
description: 'Job ID',
required: true,
type: 'string',
})
@Get('status/:jobId')
async checkVideoJobStatus(
@Param('jobId') jobId: string,
): Promise<{ status: string; result?: string }> {
return this.videosService.checkJobStatus(jobId);
}
}
import { BadRequestException, Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { MorphVideoDto } from './dto/morph-video.dto';
@Injectable()
export class VideosService {
constructor(
@InjectQueue('video-processing') private readonly videoQueue: Queue,
) {}
// 작업을 큐에 추가
async addVideoProcessingJob(
file: Express.Multer.File,
morphVideoDto: MorphVideoDto,
): Promise<string> {
const job = await this.videoQueue.add('process-video', {
fileBuffer: file.buffer,
fileMimetype: file.mimetype,
morphVideoDto,
});
return job.id; // 작업 ID 반환
}
// 작업 상태 확인
async checkJobStatus(
jobId: string,
): Promise<{ status: string; result?: string }> {
const job = await this.videoQueue.getJob(jobId);
if (!job) {
throw new BadRequestException('Job not found');
}
if (job.isCompleted()) {
const result = await job.returnvalue; // 완료된 경우 결과 반환
return { status: 'completed', result };
} else if (job.isFailed()) {
return { status: 'failed' };
} else {
return { status: 'pending' };
}
}
}
5. queue에 들어온 작업 처리를 위한 processor 파일 추가
import {
Injectable,
OnModuleInit,
Logger,
BadRequestException,
} from '@nestjs/common';
import { Worker, QueueEvents } from 'bullmq';
import { spawn } from 'child_process';
import * as path from 'path';
import * as fs from 'fs';
import * as os from 'os';
import { promisify } from 'util';
import { ConfigService } from '@nestjs/config';
const writeFileAsync = promisify(fs.writeFile);
const unlinkAsync = promisify(fs.unlink);
@Injectable()
export class VideoProcessor implements OnModuleInit {
private readonly logger = new Logger(VideoProcessor.name);
private redisConnection: any;
constructor(private readonly configService: ConfigService) {
// Redis 연결 정보를 ConfigService에서 가져옴
this.redisConnection = {
host: this.configService.get<string>('REDIS_HOST', 'redis'),
port: this.configService.get<number>('REDIS_PORT', 6379),
};
}
onModuleInit() {
const queueEvents = new QueueEvents('video-processing', {
connection: this.redisConnection,
});
queueEvents.on('completed', (jobId) => {
this.logger.log(`Job ${jobId} has been completed`);
});
// Worker 설정 (큐에 등록된 작업 처리)
const worker = new Worker(
'video-processing',
async (job) => {
const { fileBuffer, morphVideoDto } = job.data;
const { quality, ext, width } = morphVideoDto;
// Buffer로 변환
const buffer = Buffer.from(fileBuffer.data); // fileBuffer를 실제 Buffer로 변환
// 임시 디렉토리에 파일 저장
const tempDir = os.tmpdir();
const inputPath = path.join(tempDir, `${Date.now()}.tmp`);
await writeFileAsync(inputPath, buffer);
// FFmpeg 옵션 배열 초기화
const ffmpegOption: string[] = ['-i', inputPath];
// 비트레이트 및 품질 설정
if (quality) {
// Certified Rate Factor - H.264 및 H.265 비디오 인코딩에서 사용되는 품질 조절 방식, 파일 크기와 비디오 품질 간의 균형을 맞추는 역할 (0~51, 0이 최고 품질)
let crf: number;
// 비트레이트 설정 - 초당 전송되는 비트의 양, 높을수록 더 높은 품질의 비디오 생성
let bitrate: string;
switch (quality) {
case 1:
crf = 30; // 저화질
bitrate = '300k';
break;
case 2:
crf = 23; // 보통 품질
bitrate = '500k';
break;
case 3:
crf = 18; // 고화질
bitrate = '1000k';
break;
default:
throw new BadRequestException(
'Invalid quality option. Choose between 1(low), 2(medium), or 3(high).',
);
}
ffmpegOption.push('-b:v', bitrate);
ffmpegOption.push('-crf', crf.toString());
}
if (width) ffmpegOption.push('-vf', `scale=${width}:-2`);
// ext가 webm일 경우 WebM 포맷에 맞는 옵션 적용
if (ext === 'webm') {
ffmpegOption.push('-c:v', 'libvpx-vp9'); // VP9 비디오 코덱 사용
ffmpegOption.push('-b:v', '1M'); // 비디오 비트레이트 설정
ffmpegOption.push('-c:a', 'libopus'); // Opus 오디오 코덱 사용
ffmpegOption.push('-b:a', '128k'); // 오디오 비트레이트 설정
} else {
// WebM이 아닌 경우 기존 H.264 코덱 적용
ffmpegOption.push('-vcodec', 'libx264');
ffmpegOption.push('-preset', 'medium');
ffmpegOption.push('-movflags', 'faststart');
}
// 출력 형식 지정 (ext가 존재할 경우)
const outputDir = path.resolve('morph_output_videos');
const outputExt = ext ?? 'mp4'; // 기본 확장자를 'mp4'로 설정
const fileName = Date.now();
const outputPath = path.join(outputDir, `${fileName}.${outputExt}`);
// video_output 폴더가 없으면 생성
if (!fs.existsSync(outputDir))
fs.mkdirSync(outputDir, { recursive: true });
ffmpegOption.push('-f', outputExt);
ffmpegOption.push('-loglevel', 'error', outputPath);
// FFmpeg 프로세스 실행
const ffmpegProcess = spawn('ffmpeg', ffmpegOption);
// 프로세스 완료 여부 확인
return new Promise((resolve, reject) => {
let ffmpegErrorOccurred = false;
// 에러 발생 시 로그 출력
ffmpegProcess.stderr.on('data', (data) => {
this.logger.error(`FFmpeg stderr: ${data}`);
ffmpegErrorOccurred = true;
});
ffmpegProcess.on('close', async (code) => {
if (code === 0 && !ffmpegErrorOccurred) {
const baseUrl = this.configService.get<string>(
'BASE_URL',
'<http://localhost:3000>',
);
const resultPath = `${baseUrl}/output-videos/${fileName}.${ext}`;
this.logger.log(`FFmpeg completed successfully: ${resultPath}`);
resolve(resultPath);
} else {
if (fs.existsSync(outputPath)) await unlinkAsync(outputPath);
reject(new Error('Video encoding failed.'));
}
});
});
},
{
connection: this.redisConnection, // Redis 연결 정보 전달
},
);
worker.on('completed', (job) => {
this.logger.log(`Job ${job.id} completed successfully`);
});
worker.on('failed', (job, err) => {
this.logger.error(`Job ${job.id} failed with error: ${err.message}`);
});
}
}
전체 코드는 아래 github repository에서 확인 가능합니다.
https://github.com/jangjiyu/media-morph-with-bullmq
[참고] MQ(Message Queue)란?
- 메시지 큐는 프로듀서가 작업 요청(메시지)을 큐에 넣으면, 큐에서 이 메시지를 처리할 준비가 된 컨슈머가 이를 꺼내 실행하는 방식으로 동작합니다. 메시지 큐는 주로 작업을 비동기로 처리하고, 시스템 간의 의존성을 줄이며 더 큰 확장성을 제공하기 위해 사용됩니다.
메시지 큐의 두 주요 구성 요소는:
- 프로듀서(Producer): 큐에 작업(메시지)을 넣는 역할을 합니다.
- 컨슈머(Consumer): 큐에서 작업을 꺼내 처리하는 역할을 합니다.
프로듀서(Producer)
- 역할: 프로듀서는 작업을 생성하고 이를 큐에 넣는 주체입니다. 예를 들어, 사용자가 이미지를 업로드하면 프로듀서가 이 이미지를 변환하는 작업을 큐에 추가하는 방식입니다.
- 작업 흐름:
- 프로듀서가 작업을 생성합니다.
- 생성된 작업(메시지)을 메시지 큐에 넣습니다.
- 큐에 작업이 들어가면, 메시지 큐가 이 작업을 대기열에 저장합니다.
- 예시: 이미지 처리 요청, 이메일 전송 요청, 비디오 변환 요청 등을 생성하는 것이 프로듀서의 역할입니다.
컨슈머(Consumer)
- 역할: 컨슈머는 큐에서 작업을 꺼내서 처리하는 주체입니다. 컨슈머는 큐에서 대기 중인 작업을 가져와 이를 실행하고, 작업이 완료되면 큐에서 이 작업을 제거하거나 상태를 업데이트합니다.
- 작업 흐름:
- 큐에 있는 작업을 컨슈머가 꺼냅니다.
- 작업을 처리합니다.
- 작업이 성공적으로 완료되면, 큐에서 해당 작업이 처리 완료로 표시됩니다.
- 만약 작업이 실패하면, 재시도하거나 실패 상태로 기록됩니다.
- 예시: 이미지 변환, 데이터 분석, 이메일 발송 등의 작업을 수행하는 것이 컨슈머의 역할입니다.
프로듀서와 컨슈머의 관계
- 비동기적 통신: 프로듀서와 컨슈머는 서로 직접적으로 통신하지 않고, 메시지 큐를 통해 간접적으로 연결됩니다. 이는 프로듀서가 작업을 큐에 넣고, 컨슈머는 준비가 되었을 때 작업을 처리하는 비동기 방식입니다.
- 분리된 처리: 프로듀서는 작업을 큐에 넣기만 하고 작업 처리 상태나 속도에 관여하지 않으며, 컨슈머는 큐에 있는 작업을 독립적으로 처리합니다. 이로 인해 두 시스템 간의 의존성이 줄어들고 더 높은 확장성과 유연성을 제공합니다.
메시지 큐 예시
- 이미지 변환:
- 프로듀서: 사용자가 이미지를 업로드하면, 이 작업을 큐에 넣습니다. (이미지 변환 요청)
- 컨슈머: 큐에서 작업을 꺼내어 이미지를 변환한 후, 결과물을 저장하고 상태를 업데이트합니다.
- 이메일 발송:
- 프로듀서: 사용자가 이메일을 발송하라는 요청을 보냅니다. 이 요청을 큐에 넣습니다.
- 컨슈머: 큐에서 이메일 발송 작업을 꺼내 실제로 이메일을 발송하는 작업을 수행합니다.
[참고]
https://www.npmjs.com/package/bullmq
https://www.npmjs.com/package/@nestjs/bullmq
https://docs.nestjs.com/techniques/queues
https://medium.com/@vitor.rafaeldeveloper/nestjs-background-processing-with-bull-cfd162774d30
'나의 개발일지 > node.js' 카테고리의 다른 글
Node.js(NestJS)에서 FFmpeg 사용하여 동영상 파일 처리하기 (+ AWS S3 업로드) (3) | 2024.10.07 |
---|---|
nodejs(expressjs) 환경변수 validation (0) | 2024.02.14 |
NestJS 환경변수 관리 - process.env보단 ConfigModule을 (0) | 2024.02.07 |
[nodejs | 노드js] package.json (0) | 2022.07.31 |
[nodejs | 노드js] 모듈(module), 내장 객체 (0) | 2022.07.27 |