Integrating MinIO Object Storage with Dynamic API Generation
현재 시스템 동작
- 샌드박스 서버: 특정 디렉토리(routes, constants)를 watchdog이 모니터링하다가 파일 변경이 감지되면 동적으로 FastAPI 라우트에 반영.
- 에이전트 서버: 사용자가 API를 등록하면 정해진 디렉토리에 필요한 파일(spec.yaml, api.py 등)을 생성하고 FastAPI 라우트로 등록.
문제점
사용자가 API를 정의하고 파일(예: 이미지, CSV, 문서 등)을 생성했을 때 이 파일들을 저장하고 호스팅할 방법이 현재 시스템에 없음.
요구사항
Robi-G에 이미 배포된 MinIO(AWS S3 호환) 객체 스토리지를 활용하여 파일을 저장하고 호스팅할 수 있는 유틸리티 함수를 개발.
해결 방안
MinIO와 연동하는 유틸리티 함수를 구현
MinIO 연동을 위한 utils/minio_client.py
파일:
# utils/minio_client.py
import os
import logging
from typing import BinaryIO, Optional, Tuple, List
from urllib.parse import urljoin
from minio import Minio
from minio.error import S3Error
from configs import Config
logger = logging.getLogger(__name__)
config = Config()
class MinioClient:
"""
MinIO 객체 스토리지와 상호작용하는 클라이언트 클래스.
Robi-G 시스템의 파일 저장 및 호스팅에 사용됩니다.
"""
def __init__(self):
"""MinIO 클라이언트 초기화"""
self.endpoint = config.minio_endpoint
self.access_key = config.minio_access_key
self.secret_key = config.minio_secret_key
self.secure = config.minio_secure.lower() == 'true'
self.client = Minio(
endpoint=self.endpoint,
access_key=self.access_key,
secret_key=self.secret_key,
secure=self.secure
)
self.default_bucket = config.minio_default_bucket
# 버킷이 존재하지 않으면 생성
self._ensure_bucket_exists(self.default_bucket)
def _ensure_bucket_exists(self, bucket_name: str) -> None:
"""
지정된 버킷이 존재하는지 확인하고, 없으면 생성합니다.
Args:
bucket_name: 확인 또는 생성할 버킷 이름
"""
try:
if not self.client.bucket_exists(bucket_name):
region = config.minio_region or 'us-east-1'
self.client.make_bucket(bucket_name, location=region)
logger.info(f"버킷 '{bucket_name}'이 생성되었습니다.")
else:
logger.debug(f"버킷 '{bucket_name}'이 이미 존재합니다.")
except S3Error as e:
logger.error(f"버킷 존재 확인 또는 생성 중 오류 발생: {str(e)}")
raise
def upload_file(self, file_path: str, object_name: Optional[str] = None,
bucket_name: Optional[str] = None) -> str:
"""
로컬 파일을 MinIO에 업로드합니다.
Args:
file_path: 업로드할 로컬 파일 경로
object_name: MinIO에 저장될 객체 이름 (None이면 파일 이름 사용)
bucket_name: 대상 버킷 이름 (None이면 기본 버킷 사용)
Returns:
str: 업로드된 파일의 객체 이름
"""
bucket = bucket_name or self.default_bucket
if not object_name:
object_name = os.path.basename(file_path)
try:
# 파일 메타데이터 및 MIME 타입 자동 감지
content_type = self._get_content_type(file_path)
# 파일 업로드
self.client.fput_object(
bucket_name=bucket,
object_name=object_name,
file_path=file_path,
content_type=content_type
)
logger.info(f"파일 '{file_path}'가 '{bucket}/{object_name}'에 업로드되었습니다.")
return object_name
except S3Error as e:
logger.error(f"파일 업로드 중 오류 발생: {str(e)}")
raise
def upload_fileobj(self, file_obj: BinaryIO, object_name: str,
content_type: str, bucket_name: Optional[str] = None) -> str:
"""
파일 객체를 MinIO에 업로드합니다.
Args:
file_obj: 업로드할 파일 객체(파일 라이크 객체)
object_name: MinIO에 저장될 객체 이름
content_type: 업로드할 파일의 MIME 타입
bucket_name: 대상 버킷 이름 (None이면 기본 버킷 사용)
Returns:
str: 업로드된 파일의 객체 이름
"""
bucket = bucket_name or self.default_bucket
try:
file_size = file_obj.seek(0, 2)
file_obj.seek(0)
self.client.put_object(
bucket_name=bucket,
object_name=object_name,
data=file_obj,
length=file_size,
content_type=content_type
)
logger.info(f"파일 객체가 '{bucket}/{object_name}'에 업로드되었습니다.")
return object_name
except S3Error as e:
logger.error(f"파일 객체 업로드 중 오류 발생: {str(e)}")
raise
def download_file(self, object_name: str, file_path: str,
bucket_name: Optional[str] = None) -> str:
"""
MinIO에서 파일을 다운로드합니다.
Args:
object_name: 다운로드할 객체 이름
file_path: 저장할 로컬 파일 경로
bucket_name: 소스 버킷 이름 (None이면 기본 버킷 사용)
Returns:
str: 다운로드된 파일 경로
"""
bucket = bucket_name or self.default_bucket
try:
self.client.fget_object(
bucket_name=bucket,
object_name=object_name,
file_path=file_path
)
logger.info(f"'{bucket}/{object_name}'에서 '{file_path}'로 파일이 다운로드되었습니다.")
return file_path
except S3Error as e:
logger.error(f"파일 다운로드 중 오류 발생: {str(e)}")
raise
def delete_file(self, object_name: str, bucket_name: Optional[str] = None) -> bool:
"""
MinIO에서 파일을 삭제합니다.
Args:
object_name: 삭제할 객체 이름
bucket_name: 버킷 이름 (None이면 기본 버킷 사용)
Returns:
bool: 삭제 성공 여부
"""
bucket = bucket_name or self.default_bucket
try:
self.client.remove_object(bucket_name=bucket, object_name=object_name)
logger.info(f"'{bucket}/{object_name}' 객체가 삭제되었습니다.")
return True
except S3Error as e:
logger.error(f"파일 삭제 중 오류 발생: {str(e)}")
return False
def get_file_url(self, object_name: str, bucket_name: Optional[str] = None,
expires: int = 3600) -> str:
"""
MinIO 객체에 대한 임시 URL을 생성합니다.
Args:
object_name: URL을 생성할 객체 이름
bucket_name: 버킷 이름 (None이면 기본 버킷 사용)
expires: URL 만료 시간(초), 기본값: 1시간
Returns:
str: 생성된 임시 URL
"""
bucket = bucket_name or self.default_bucket
try:
url = self.client.presigned_get_object(
bucket_name=bucket,
object_name=object_name,
expires=expires
)
logger.debug(f"'{bucket}/{object_name}'에 대한 임시 URL이 생성되었습니다.")
return url
except S3Error as e:
logger.error(f"URL 생성 중 오류 발생: {str(e)}")
raise
def get_permanent_url(self, object_name: str, bucket_name: Optional[str] = None) -> str:
"""
MinIO 객체에 대한 영구 URL을 생성합니다 (공개 버킷인 경우에만 작동).
Args:
object_name: URL을 생성할 객체 이름
bucket_name: 버킷 이름 (None이면 기본 버킷 사용)
Returns:
str: 생성된 영구 URL
"""
bucket = bucket_name or self.default_bucket
if not self.endpoint.startswith(('http://', 'https://')):
protocol = 'https://' if self.secure else 'http://'
base_url = f"{protocol}{self.endpoint}"
else:
base_url = self.endpoint
url = f"{base_url}/{bucket}/{object_name}"
return url
def list_files(self, prefix: str = "", bucket_name: Optional[str] = None) -> List[str]:
"""
지정된 접두사로 시작하는 MinIO 객체 목록을 반환합니다.
Args:
prefix: 객체 이름 접두사 필터
bucket_name: 버킷 이름 (None이면 기본 버킷 사용)
Returns:
List[str]: 객체 이름 목록
"""
bucket = bucket_name or self.default_bucket
try:
objects = self.client.list_objects(bucket_name=bucket, prefix=prefix, recursive=True)
return [obj.object_name for obj in objects]
except S3Error as e:
logger.error(f"객체 목록 조회 중 오류 발생: {str(e)}")
return []
def _get_content_type(self, file_path: str) -> str:
"""
파일 확장자를 기반으로 MIME 타입을 결정합니다.
Args:
file_path: 파일 경로
Returns:
str: MIME 타입
"""
ext = os.path.splitext(file_path)[1].lower()
mime_types = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.pdf': 'application/pdf',
'.txt': 'text/plain',
'.csv': 'text/csv',
'.json': 'application/json',
'.yaml': 'application/x-yaml',
'.yml': 'application/x-yaml',
'.py': 'text/x-python',
'.js': 'application/javascript',
'.html': 'text/html',
'.css': 'text/css',
'.xml': 'application/xml',
'.doc': 'application/msword',
'.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.xls': 'application/vnd.ms-excel',
'.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'.ppt': 'application/vnd.ms-powerpoint',
'.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
}
return mime_types.get(ext, 'application/octet-stream')
# 싱글톤 인스턴스 생성
minio_client = MinioClient()
.env
파일에 MinIO 관련 설정을 추가:
# MinIO 설정
MINIO_ENDPOINT=minio-service.robi-g.svc.cluster.local:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_SECURE=false
MINIO_DEFAULT_BUCKET=py-runner
MINIO_REGION=us-east-1
configs/config.py
파일에 MinIO 관련 설정 추가:
import os
from dotenv import load_dotenv
load_dotenv(".env")
class Config:
def __init__(self):
self.swagger_agent_server_url = os.getenv("SWAGGER_AGENT_SERVER_URL")
self.swagger_sandbox_server_url = os.getenv("SWAGGER_SANDBOX_SERVER_URL")
self.swagger_dev_server_url = os.getenv("SWAGGER_DEV_SERVER_URL")
self.otlp_endpoint = os.getenv("TRACING_AGENT_URL")
self.app_name_prefix = os.getenv("APP_NAME_PREFIX")
self.agent_app_name = os.getenv("AGENT_APP_NAME")
self.sandbox_prod_app_name = os.getenv("SANDBOX_PROD_APP_NAME")
self.sandbox_dev_app_name = os.getenv("SANDBOX_DEV_APP_NAME")
self.athena_url = os.getenv("ATHENA_URL")
self.lexihub_url = os.getenv("LEXIHUB_URL")
self.athena_timeout = int(os.getenv("ATHENA_TIMEOUT", 600))
self.lexi_hub_timeout = int(os.getenv("LEXI_HUB_TIMEOUT", 600))
self.lexi_hub_max_tokens = int(os.getenv("LEXI_HUB_MAX_TOKENS", 4000))
self.lexi_hub_system_prompt = os.getenv("LEXI_HUB_SYSTEM_PROMPT")
self.lexi_hub_user_prompt = os.getenv("LEXI_HUB_USER_PROMPT")
self.lexi_hub_model_name = os.getenv("LEXI_HUB_MODEL_NAME")
# MinIO 설정 추가
self.minio_endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9000")
self.minio_access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
self.minio_secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")
self.minio_secure = os.getenv("MINIO_SECURE", "false")
self.minio_default_bucket = os.getenv("MINIO_DEFAULT_BUCKET", "py-runner")
self.minio_region = os.getenv("MINIO_REGION", "us-east-1")
utils/__init__.py
파일을 수정:
from .request import post_one, post_all, get_one, get_all
from .redis import r, set_data, get_data, delete_data
from .debugger import sse_log
from .memory_profiling import start_memory_profiling, stop_memory_profiling, profile_memory, log_memory_snapshot
from .agent_helper import (
generate_model_from_schema,
generate_model_code,
)
from .logging_middleware import APILoggingMiddleware, configure_api_logging
from .minio_client import minio_client
기능 요약:
- MinIO 클라이언트 초기화 및 버킷 관리
- 로컬 파일 및 파일 객체 업로드
- 파일 다운로드
- 파일 삭제
- 임시 및 영구 URL 생성 (파일 호스팅)
- 파일 목록 조회
- MIME 타입 자동 감지
Member discussion