7 min read

Integrating MinIO Object Storage with Dynamic API Generation

현재 시스템 동작

  1. 샌드박스 서버: 특정 디렉토리(routes, constants)를 watchdog이 모니터링하다가 파일 변경이 감지되면 동적으로 FastAPI 라우트에 반영.
  2. 에이전트 서버: 사용자가 API를 등록하면 정해진 디렉토리에 필요한 파일(spec.yaml, api.py 등)을 생성하고 FastAPI 라우트로 등록.

문제점

사용자가 API를 정의하고 파일(예: 이미지, CSV, 문서 등)을 생성했을 때 이 파일들을 저장하고 호스팅할 방법이 현재 시스템에 없음.

요구사항

Robi-G에 이미 배포된 MinIO(AWS S3 호환) 객체 스토리지를 활용하여 파일을 저장하고 호스팅할 수 있는 유틸리티 함수를 개발.

해결 방안

MinIO와 연동하는 유틸리티 함수를 구현

MinIO 연동을 위한 utils/minio_client.py 파일:

# utils/minio_client.py
import os
import logging
from typing import BinaryIO, Optional, Tuple, List
from urllib.parse import urljoin

from minio import Minio
from minio.error import S3Error

from configs import Config

logger = logging.getLogger(__name__)
config = Config()


class MinioClient:
    """
    MinIO 객체 스토리지와 상호작용하는 클라이언트 클래스.
    Robi-G 시스템의 파일 저장 및 호스팅에 사용됩니다.
    """
    def __init__(self):
        """MinIO 클라이언트 초기화"""
        self.endpoint = config.minio_endpoint
        self.access_key = config.minio_access_key
        self.secret_key = config.minio_secret_key
        self.secure = config.minio_secure.lower() == 'true'
        self.client = Minio(
            endpoint=self.endpoint,
            access_key=self.access_key,
            secret_key=self.secret_key,
            secure=self.secure
        )
        self.default_bucket = config.minio_default_bucket
        
        # 버킷이 존재하지 않으면 생성
        self._ensure_bucket_exists(self.default_bucket)
        
    def _ensure_bucket_exists(self, bucket_name: str) -> None:
        """
        지정된 버킷이 존재하는지 확인하고, 없으면 생성합니다.
        
        Args:
            bucket_name: 확인 또는 생성할 버킷 이름
        """
        try:
            if not self.client.bucket_exists(bucket_name):
                region = config.minio_region or 'us-east-1'
                self.client.make_bucket(bucket_name, location=region)
                logger.info(f"버킷 '{bucket_name}'이 생성되었습니다.")
            else:
                logger.debug(f"버킷 '{bucket_name}'이 이미 존재합니다.")
        except S3Error as e:
            logger.error(f"버킷 존재 확인 또는 생성 중 오류 발생: {str(e)}")
            raise

    def upload_file(self, file_path: str, object_name: Optional[str] = None, 
                    bucket_name: Optional[str] = None) -> str:
        """
        로컬 파일을 MinIO에 업로드합니다.
        
        Args:
            file_path: 업로드할 로컬 파일 경로
            object_name: MinIO에 저장될 객체 이름 (None이면 파일 이름 사용)
            bucket_name: 대상 버킷 이름 (None이면 기본 버킷 사용)
            
        Returns:
            str: 업로드된 파일의 객체 이름
        """
        bucket = bucket_name or self.default_bucket
        if not object_name:
            object_name = os.path.basename(file_path)
            
        try:
            # 파일 메타데이터 및 MIME 타입 자동 감지
            content_type = self._get_content_type(file_path)
            
            # 파일 업로드
            self.client.fput_object(
                bucket_name=bucket,
                object_name=object_name,
                file_path=file_path,
                content_type=content_type
            )
            logger.info(f"파일 '{file_path}'가 '{bucket}/{object_name}'에 업로드되었습니다.")
            return object_name
        except S3Error as e:
            logger.error(f"파일 업로드 중 오류 발생: {str(e)}")
            raise

    def upload_fileobj(self, file_obj: BinaryIO, object_name: str, 
                      content_type: str, bucket_name: Optional[str] = None) -> str:
        """
        파일 객체를 MinIO에 업로드합니다.
        
        Args:
            file_obj: 업로드할 파일 객체(파일 라이크 객체)
            object_name: MinIO에 저장될 객체 이름
            content_type: 업로드할 파일의 MIME 타입
            bucket_name: 대상 버킷 이름 (None이면 기본 버킷 사용)
            
        Returns:
            str: 업로드된 파일의 객체 이름
        """
        bucket = bucket_name or self.default_bucket
        try:
            file_size = file_obj.seek(0, 2)
            file_obj.seek(0)
            
            self.client.put_object(
                bucket_name=bucket,
                object_name=object_name,
                data=file_obj,
                length=file_size,
                content_type=content_type
            )
            logger.info(f"파일 객체가 '{bucket}/{object_name}'에 업로드되었습니다.")
            return object_name
        except S3Error as e:
            logger.error(f"파일 객체 업로드 중 오류 발생: {str(e)}")
            raise

    def download_file(self, object_name: str, file_path: str, 
                     bucket_name: Optional[str] = None) -> str:
        """
        MinIO에서 파일을 다운로드합니다.
        
        Args:
            object_name: 다운로드할 객체 이름
            file_path: 저장할 로컬 파일 경로
            bucket_name: 소스 버킷 이름 (None이면 기본 버킷 사용)
            
        Returns:
            str: 다운로드된 파일 경로
        """
        bucket = bucket_name or self.default_bucket
        try:
            self.client.fget_object(
                bucket_name=bucket,
                object_name=object_name,
                file_path=file_path
            )
            logger.info(f"'{bucket}/{object_name}'에서 '{file_path}'로 파일이 다운로드되었습니다.")
            return file_path
        except S3Error as e:
            logger.error(f"파일 다운로드 중 오류 발생: {str(e)}")
            raise

    def delete_file(self, object_name: str, bucket_name: Optional[str] = None) -> bool:
        """
        MinIO에서 파일을 삭제합니다.
        
        Args:
            object_name: 삭제할 객체 이름
            bucket_name: 버킷 이름 (None이면 기본 버킷 사용)
            
        Returns:
            bool: 삭제 성공 여부
        """
        bucket = bucket_name or self.default_bucket
        try:
            self.client.remove_object(bucket_name=bucket, object_name=object_name)
            logger.info(f"'{bucket}/{object_name}' 객체가 삭제되었습니다.")
            return True
        except S3Error as e:
            logger.error(f"파일 삭제 중 오류 발생: {str(e)}")
            return False

    def get_file_url(self, object_name: str, bucket_name: Optional[str] = None, 
                    expires: int = 3600) -> str:
        """
        MinIO 객체에 대한 임시 URL을 생성합니다.
        
        Args:
            object_name: URL을 생성할 객체 이름
            bucket_name: 버킷 이름 (None이면 기본 버킷 사용)
            expires: URL 만료 시간(초), 기본값: 1시간
            
        Returns:
            str: 생성된 임시 URL
        """
        bucket = bucket_name or self.default_bucket
        try:
            url = self.client.presigned_get_object(
                bucket_name=bucket,
                object_name=object_name,
                expires=expires
            )
            logger.debug(f"'{bucket}/{object_name}'에 대한 임시 URL이 생성되었습니다.")
            return url
        except S3Error as e:
            logger.error(f"URL 생성 중 오류 발생: {str(e)}")
            raise

    def get_permanent_url(self, object_name: str, bucket_name: Optional[str] = None) -> str:
        """
        MinIO 객체에 대한 영구 URL을 생성합니다 (공개 버킷인 경우에만 작동).
        
        Args:
            object_name: URL을 생성할 객체 이름
            bucket_name: 버킷 이름 (None이면 기본 버킷 사용)
            
        Returns:
            str: 생성된 영구 URL
        """
        bucket = bucket_name or self.default_bucket
        if not self.endpoint.startswith(('http://', 'https://')):
            protocol = 'https://' if self.secure else 'http://'
            base_url = f"{protocol}{self.endpoint}"
        else:
            base_url = self.endpoint
            
        url = f"{base_url}/{bucket}/{object_name}"
        return url

    def list_files(self, prefix: str = "", bucket_name: Optional[str] = None) -> List[str]:
        """
        지정된 접두사로 시작하는 MinIO 객체 목록을 반환합니다.
        
        Args:
            prefix: 객체 이름 접두사 필터
            bucket_name: 버킷 이름 (None이면 기본 버킷 사용)
            
        Returns:
            List[str]: 객체 이름 목록
        """
        bucket = bucket_name or self.default_bucket
        try:
            objects = self.client.list_objects(bucket_name=bucket, prefix=prefix, recursive=True)
            return [obj.object_name for obj in objects]
        except S3Error as e:
            logger.error(f"객체 목록 조회 중 오류 발생: {str(e)}")
            return []

    def _get_content_type(self, file_path: str) -> str:
        """
        파일 확장자를 기반으로 MIME 타입을 결정합니다.
        
        Args:
            file_path: 파일 경로
            
        Returns:
            str: MIME 타입
        """
        ext = os.path.splitext(file_path)[1].lower()
        mime_types = {
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.pdf': 'application/pdf',
            '.txt': 'text/plain',
            '.csv': 'text/csv',
            '.json': 'application/json',
            '.yaml': 'application/x-yaml',
            '.yml': 'application/x-yaml',
            '.py': 'text/x-python',
            '.js': 'application/javascript',
            '.html': 'text/html',
            '.css': 'text/css',
            '.xml': 'application/xml',
            '.doc': 'application/msword',
            '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            '.xls': 'application/vnd.ms-excel',
            '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            '.ppt': 'application/vnd.ms-powerpoint',
            '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        }
        return mime_types.get(ext, 'application/octet-stream')


# 싱글톤 인스턴스 생성
minio_client = MinioClient()

.env 파일에 MinIO 관련 설정을 추가:

# MinIO 설정
MINIO_ENDPOINT=minio-service.robi-g.svc.cluster.local:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_SECURE=false
MINIO_DEFAULT_BUCKET=py-runner
MINIO_REGION=us-east-1

configs/config.py 파일에 MinIO 관련 설정 추가:

import os

from dotenv import load_dotenv

load_dotenv(".env")

class Config:
    def __init__(self):
        self.swagger_agent_server_url = os.getenv("SWAGGER_AGENT_SERVER_URL")
        self.swagger_sandbox_server_url = os.getenv("SWAGGER_SANDBOX_SERVER_URL")
        self.swagger_dev_server_url = os.getenv("SWAGGER_DEV_SERVER_URL")
        self.otlp_endpoint = os.getenv("TRACING_AGENT_URL")

        self.app_name_prefix = os.getenv("APP_NAME_PREFIX")
        self.agent_app_name = os.getenv("AGENT_APP_NAME")
        self.sandbox_prod_app_name = os.getenv("SANDBOX_PROD_APP_NAME")
        self.sandbox_dev_app_name = os.getenv("SANDBOX_DEV_APP_NAME")

        self.athena_url = os.getenv("ATHENA_URL")
        self.lexihub_url = os.getenv("LEXIHUB_URL")
        self.athena_timeout = int(os.getenv("ATHENA_TIMEOUT", 600))
        self.lexi_hub_timeout = int(os.getenv("LEXI_HUB_TIMEOUT", 600))
        self.lexi_hub_max_tokens = int(os.getenv("LEXI_HUB_MAX_TOKENS", 4000))
        self.lexi_hub_system_prompt = os.getenv("LEXI_HUB_SYSTEM_PROMPT")
        self.lexi_hub_user_prompt = os.getenv("LEXI_HUB_USER_PROMPT")
        self.lexi_hub_model_name = os.getenv("LEXI_HUB_MODEL_NAME")
        
        # MinIO 설정 추가
        self.minio_endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9000")
        self.minio_access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
        self.minio_secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")
        self.minio_secure = os.getenv("MINIO_SECURE", "false")
        self.minio_default_bucket = os.getenv("MINIO_DEFAULT_BUCKET", "py-runner")
        self.minio_region = os.getenv("MINIO_REGION", "us-east-1")

utils/__init__.py 파일을 수정:

from .request import post_one, post_all, get_one, get_all
from .redis import r, set_data, get_data, delete_data
from .debugger import sse_log
from .memory_profiling import start_memory_profiling, stop_memory_profiling, profile_memory, log_memory_snapshot
from .agent_helper import (
    generate_model_from_schema,
    generate_model_code,
)
from .logging_middleware import APILoggingMiddleware, configure_api_logging
from .minio_client import minio_client

기능 요약:

  1. MinIO 클라이언트 초기화 및 버킷 관리
  2. 로컬 파일 및 파일 객체 업로드
  3. 파일 다운로드
  4. 파일 삭제
  5. 임시 및 영구 URL 생성 (파일 호스팅)
  6. 파일 목록 조회
  7. MIME 타입 자동 감지