본문 바로가기
나의 개발일지/node.js

NestJS에서 이미지 및 영상 처리 작업의 부하를 줄이기 위해 BullMQ 사용

by stella_gu 2024. 10. 7.
반응형

BullMQ를 사용하는 이유

BullMQ는 Node.js 애플리케이션에서 비동기 작업 처리를 위해 Redis 기반의 큐 시스템을 제공하는 도구입니다. 주로 백그라운드 작업 처리시간이 오래 걸리는 작업을 메인 서버와 분리하여 성능을 향상시키기 위해 사용됩니다. 이로 인해 서버는 요청을 처리한 후, 리소스 집약적인 작업은 별도로 처리할 수 있습니다.

BullMQ 장점

  1. 성능: Redis의 빠른 데이터 처리 속도를 바탕으로 하여 대규모 작업 처리에서도 성능이 우수합니다.
  2. 작업 실패 관리: BullMQ는 작업 실패 시 자동으로 재시도를 하고, 지정된 최대 재시도 횟수를 넘으면 실패로 기록합니다. 이 과정에서 로그와 디버깅 정보를 제공하여 안정적인 작업 처리를 보장합니다.
  3. 확장성: 여러 워커를 통해 작업을 병렬로 처리할 수 있으며, 수평적 확장을 지원해 대규모 작업 처리에 적합합니다.
  4. 유연한 설정: 작업 우선순위, 재시도 횟수, 지연 시간 등의 다양한 설정을 통해 큐 시스템을 유연하게 관리할 수 있습니다.
  5. 영속성: 작업 및 그 상태가 Redis에 저장되므로, 서버가 재시작되거나 장애가 발생하더라도 작업이 손실되지 않습니다.
  6. 직관적인 API: Node.js 환경에서 쉽게 사용할 수 있도록 직관적인 API를 제공합니다. 이를 통해 쉽게 큐를 생성하고 작업을 추가할 수 있습니다.

BullMQ 단점

  1. Redis 의존성: Redis가 필요하기 때문에 Redis를 설치하고 유지 관리해야 하며, Redis가 없으면 BullMQ는 동작하지 않습니다. Redis의 성능이나 안정성에 의존하므로 Redis 서버가 제대로 동작하지 않으면 큐 시스템도 영향을 받습니다.
  2. 복잡한 설정: 기본적인 작업 처리는 간단하지만, 복잡한 작업 흐름을 구현하거나 다양한 작업 간의 의존성을 설정할 때는 구현이 복잡해질 수 있습니다.
  3. 서버 비용: Redis와 여러 워커 인스턴스를 운영할 경우, 서버 비용이 증가할 수 있습니다. 특히 Redis 클러스터링이나 확장성을 위해 추가적인 인프라가 필요할 수 있습니다.
  4. 학습 곡선: BullMQ는 Redis와의 통합 및 큐 시스템에 대한 기본 지식이 필요하므로, 초보자에게는 사용하기 어렵게 느껴질 수 있습니다.

BullMQ를 사용하는 상황

  • 대규모 비동기 작업 처리: 많은 수의 이미지/비디오 처리, 대량의 이메일 전송, 데이터 분석 등 리소스가 많이 드는 작업을 처리할 때.
  • 지연 작업: 사용자가 트리거한 작업을 특정 시점에 실행해야 하거나, 주기적으로 반복 작업을 처리해야 할 때.
  • 작업 재시도와 실패 관리가 필요한 복잡한 백엔드 시스템을 운영할 때.

NestJS에서 이미지 및 영상 처리 작업의 부하를 줄이기 위해 BullMQ 사용

  • 폴더 구조

 

1. BullMQ 설치

npm install @nestjs/bullmq bullmq

 

2. AppModele에 BullModule을 import하여 root module로 설정

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}

 

3. queue를 등록하기 위해 BullModule.registerQueue()를 import

  • 아래는 영상 변환 작업을 하는 VideosModule에서 BullMQ를 사용하기 위해 'video-processing'이라는 이름으로 BullModule.registerQueue() 동적 모듈을 import한 예시입니다.
import { Module } from '@nestjs/common';
import { VideosService } from './videos.service';
import { VideosController } from './videos.controller';
import { BullModule } from '@nestjs/bullmq';
import { VideoProcessor } from './videos.processor';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'video-processing',
    }),
  ],
  controllers: [VideosController],
  providers: [VideosService, VideoProcessor],
})
export class VideosModule {}

 

4. 영상 처리를 위한 POST API와 영상 처리 완료 여부 확인을 위한 GET API를 추가한 controller 및 service 추가

import {
  Body,
  Controller,
  Get,
  Param,
  Post,
  UploadedFile,
  UseInterceptors,
} from '@nestjs/common';
import { VideosService } from './videos.service';
import {
  ApiBody,
  ApiConsumes,
  ApiOperation,
  ApiParam,
  ApiTags,
} from '@nestjs/swagger';
import { MorphVideoDto } from './dto/morph-video.dto';
import { FileInterceptor } from '@nestjs/platform-express';

@ApiTags('videos')
@Controller('videos')
export class VideosController {
  constructor(private readonly videosService: VideosService) {}

  @ApiOperation({ summary: 'Morph video file' })
  @ApiConsumes('multipart/form-data')
  @UseInterceptors(FileInterceptor('file'))
  @ApiBody({ type: MorphVideoDto })
  @Post('morph')
  async morphVideoFile(
    @UploadedFile() file: Express.Multer.File,
    @Body() morphVideoDto: MorphVideoDto,
  ): Promise<string> {
    return this.videosService.addVideoProcessingJob(file, morphVideoDto);
  }

  @ApiOperation({ summary: 'Check video processing job status' })
  @ApiParam({
    name: 'jobId',
    description: 'Job ID',
    required: true,
    type: 'string',
  })
  @Get('status/:jobId')
  async checkVideoJobStatus(
    @Param('jobId') jobId: string,
  ): Promise<{ status: string; result?: string }> {
    return this.videosService.checkJobStatus(jobId);
  }
}
import { BadRequestException, Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { MorphVideoDto } from './dto/morph-video.dto';

@Injectable()
export class VideosService {
  constructor(
    @InjectQueue('video-processing') private readonly videoQueue: Queue,
  ) {}

  // 작업을 큐에 추가
  async addVideoProcessingJob(
    file: Express.Multer.File,
    morphVideoDto: MorphVideoDto,
  ): Promise<string> {
    const job = await this.videoQueue.add('process-video', {
      fileBuffer: file.buffer,
      fileMimetype: file.mimetype,
      morphVideoDto,
    });

    return job.id; // 작업 ID 반환
  }

  // 작업 상태 확인
  async checkJobStatus(
    jobId: string,
  ): Promise<{ status: string; result?: string }> {
    const job = await this.videoQueue.getJob(jobId);

    if (!job) {
      throw new BadRequestException('Job not found');
    }

    if (job.isCompleted()) {
      const result = await job.returnvalue; // 완료된 경우 결과 반환
      return { status: 'completed', result };
    } else if (job.isFailed()) {
      return { status: 'failed' };
    } else {
      return { status: 'pending' };
    }
  }
}

 

5. queue에 들어온 작업 처리를 위한 processor 파일 추가

import {
  Injectable,
  OnModuleInit,
  Logger,
  BadRequestException,
} from '@nestjs/common';
import { Worker, QueueEvents } from 'bullmq';
import { spawn } from 'child_process';
import * as path from 'path';
import * as fs from 'fs';
import * as os from 'os';
import { promisify } from 'util';
import { ConfigService } from '@nestjs/config';

const writeFileAsync = promisify(fs.writeFile);
const unlinkAsync = promisify(fs.unlink);

@Injectable()
export class VideoProcessor implements OnModuleInit {
  private readonly logger = new Logger(VideoProcessor.name);
  private redisConnection: any;

  constructor(private readonly configService: ConfigService) {
    // Redis 연결 정보를 ConfigService에서 가져옴
    this.redisConnection = {
      host: this.configService.get<string>('REDIS_HOST', 'redis'),
      port: this.configService.get<number>('REDIS_PORT', 6379),
    };
  }

  onModuleInit() {
    const queueEvents = new QueueEvents('video-processing', {
      connection: this.redisConnection,
    });

    queueEvents.on('completed', (jobId) => {
      this.logger.log(`Job ${jobId} has been completed`);
    });

    // Worker 설정 (큐에 등록된 작업 처리)
    const worker = new Worker(
      'video-processing',
      async (job) => {
        const { fileBuffer, morphVideoDto } = job.data;
        const { quality, ext, width } = morphVideoDto;

        // Buffer로 변환
        const buffer = Buffer.from(fileBuffer.data); // fileBuffer를 실제 Buffer로 변환

        // 임시 디렉토리에 파일 저장
        const tempDir = os.tmpdir();
        const inputPath = path.join(tempDir, `${Date.now()}.tmp`);

        await writeFileAsync(inputPath, buffer);

        // FFmpeg 옵션 배열 초기화
        const ffmpegOption: string[] = ['-i', inputPath];

        // 비트레이트 및 품질 설정
        if (quality) {
          // Certified Rate Factor - H.264 및 H.265 비디오 인코딩에서 사용되는 품질 조절 방식, 파일 크기와 비디오 품질 간의 균형을 맞추는 역할 (0~51, 0이 최고 품질)
          let crf: number;
          // 비트레이트 설정 - 초당 전송되는 비트의 양, 높을수록 더 높은 품질의 비디오 생성
          let bitrate: string;

          switch (quality) {
            case 1:
              crf = 30; // 저화질
              bitrate = '300k';
              break;
            case 2:
              crf = 23; // 보통 품질
              bitrate = '500k';
              break;
            case 3:
              crf = 18; // 고화질
              bitrate = '1000k';
              break;
            default:
              throw new BadRequestException(
                'Invalid quality option. Choose between 1(low), 2(medium), or 3(high).',
              );
          }

          ffmpegOption.push('-b:v', bitrate);
          ffmpegOption.push('-crf', crf.toString());
        }

        if (width) ffmpegOption.push('-vf', `scale=${width}:-2`);

        // ext가 webm일 경우 WebM 포맷에 맞는 옵션 적용
        if (ext === 'webm') {
          ffmpegOption.push('-c:v', 'libvpx-vp9'); // VP9 비디오 코덱 사용
          ffmpegOption.push('-b:v', '1M'); // 비디오 비트레이트 설정
          ffmpegOption.push('-c:a', 'libopus'); // Opus 오디오 코덱 사용
          ffmpegOption.push('-b:a', '128k'); // 오디오 비트레이트 설정
        } else {
          // WebM이 아닌 경우 기존 H.264 코덱 적용
          ffmpegOption.push('-vcodec', 'libx264');
          ffmpegOption.push('-preset', 'medium');
          ffmpegOption.push('-movflags', 'faststart');
        }

        // 출력 형식 지정 (ext가 존재할 경우)
        const outputDir = path.resolve('morph_output_videos');
        const outputExt = ext ?? 'mp4'; // 기본 확장자를 'mp4'로 설정
        const fileName = Date.now();
        const outputPath = path.join(outputDir, `${fileName}.${outputExt}`);

        // video_output 폴더가 없으면 생성
        if (!fs.existsSync(outputDir))
          fs.mkdirSync(outputDir, { recursive: true });

        ffmpegOption.push('-f', outputExt);
        ffmpegOption.push('-loglevel', 'error', outputPath);

        // FFmpeg 프로세스 실행
        const ffmpegProcess = spawn('ffmpeg', ffmpegOption);

        // 프로세스 완료 여부 확인
        return new Promise((resolve, reject) => {
          let ffmpegErrorOccurred = false;

          // 에러 발생 시 로그 출력
          ffmpegProcess.stderr.on('data', (data) => {
            this.logger.error(`FFmpeg stderr: ${data}`);
            ffmpegErrorOccurred = true;
          });

          ffmpegProcess.on('close', async (code) => {
            if (code === 0 && !ffmpegErrorOccurred) {
              const baseUrl = this.configService.get<string>(
                'BASE_URL',
                '<http://localhost:3000>',
              );
              const resultPath = `${baseUrl}/output-videos/${fileName}.${ext}`;
              this.logger.log(`FFmpeg completed successfully: ${resultPath}`);

              resolve(resultPath);
            } else {
              if (fs.existsSync(outputPath)) await unlinkAsync(outputPath);
              reject(new Error('Video encoding failed.'));
            }
          });
        });
      },
      {
        connection: this.redisConnection, // Redis 연결 정보 전달
      },
    );

    worker.on('completed', (job) => {
      this.logger.log(`Job ${job.id} completed successfully`);
    });

    worker.on('failed', (job, err) => {
      this.logger.error(`Job ${job.id} failed with error: ${err.message}`);
    });
  }
}

 

 

전체 코드는 아래 github repository에서 확인 가능합니다.

https://github.com/jangjiyu/media-morph-with-bullmq

 

 

[참고] MQ(Message Queue)란?

  • 메시지 큐는 프로듀서가 작업 요청(메시지)을 큐에 넣으면, 큐에서 이 메시지를 처리할 준비가 된 컨슈머가 이를 꺼내 실행하는 방식으로 동작합니다. 메시지 큐는 주로 작업을 비동기로 처리하고, 시스템 간의 의존성을 줄이며 더 큰 확장성을 제공하기 위해 사용됩니다.

메시지 큐의 두 주요 구성 요소는:

  1. 프로듀서(Producer): 큐에 작업(메시지)을 넣는 역할을 합니다.
  2. 컨슈머(Consumer): 큐에서 작업을 꺼내 처리하는 역할을 합니다.

프로듀서(Producer)

  • 역할: 프로듀서는 작업을 생성하고 이를 큐에 넣는 주체입니다. 예를 들어, 사용자가 이미지를 업로드하면 프로듀서가 이 이미지를 변환하는 작업을 큐에 추가하는 방식입니다.
  • 작업 흐름:
    1. 프로듀서가 작업을 생성합니다.
    2. 생성된 작업(메시지)을 메시지 큐에 넣습니다.
    3. 큐에 작업이 들어가면, 메시지 큐가 이 작업을 대기열에 저장합니다.
  • 예시: 이미지 처리 요청, 이메일 전송 요청, 비디오 변환 요청 등을 생성하는 것이 프로듀서의 역할입니다.

컨슈머(Consumer)

  • 역할: 컨슈머는 큐에서 작업을 꺼내서 처리하는 주체입니다. 컨슈머는 큐에서 대기 중인 작업을 가져와 이를 실행하고, 작업이 완료되면 큐에서 이 작업을 제거하거나 상태를 업데이트합니다.
  • 작업 흐름:
    1. 큐에 있는 작업을 컨슈머가 꺼냅니다.
    2. 작업을 처리합니다.
    3. 작업이 성공적으로 완료되면, 큐에서 해당 작업이 처리 완료로 표시됩니다.
    4. 만약 작업이 실패하면, 재시도하거나 실패 상태로 기록됩니다.
  • 예시: 이미지 변환, 데이터 분석, 이메일 발송 등의 작업을 수행하는 것이 컨슈머의 역할입니다.

프로듀서와 컨슈머의 관계

  • 비동기적 통신: 프로듀서와 컨슈머는 서로 직접적으로 통신하지 않고, 메시지 큐를 통해 간접적으로 연결됩니다. 이는 프로듀서가 작업을 큐에 넣고, 컨슈머는 준비가 되었을 때 작업을 처리하는 비동기 방식입니다.
  • 분리된 처리: 프로듀서는 작업을 큐에 넣기만 하고 작업 처리 상태나 속도에 관여하지 않으며, 컨슈머는 큐에 있는 작업을 독립적으로 처리합니다. 이로 인해 두 시스템 간의 의존성이 줄어들고 더 높은 확장성유연성을 제공합니다.

메시지 큐 예시

  • 이미지 변환:
    • 프로듀서: 사용자가 이미지를 업로드하면, 이 작업을 큐에 넣습니다. (이미지 변환 요청)
    • 컨슈머: 큐에서 작업을 꺼내어 이미지를 변환한 후, 결과물을 저장하고 상태를 업데이트합니다.
  • 이메일 발송:
    • 프로듀서: 사용자가 이메일을 발송하라는 요청을 보냅니다. 이 요청을 큐에 넣습니다.
    • 컨슈머: 큐에서 이메일 발송 작업을 꺼내 실제로 이메일을 발송하는 작업을 수행합니다.
 

 

 

 

[참고]

https://www.npmjs.com/package/bullmq

https://www.npmjs.com/package/@nestjs/bullmq

https://docs.nestjs.com/techniques/queues

https://medium.com/@vitor.rafaeldeveloper/nestjs-background-processing-with-bull-cfd162774d30

반응형