현대 데이터 중심 기업에서는 데이터를 효과적으로 수집, 처리, 분석 및 배포하는 능력이 성공의 핵심 요소입니다. 이를 위해 안정적이고 확장 가능한 데이터 파이프라인을 구축하는 것은 매우 중요합니다. 이 블로그에서는 Google Cloud Platform(GCP)을 활용하여 데이터 파이프라인을 설계하고, BigQuery를 통해 데이터를 쿼리하고, Python을 사용해 모델링하며, 최종적으로 필요한 서버에 모델을 배포하는 전체 과정을 다룹니다. 이 과정은 다음과 같은 단계로 구성됩니다:
- BigQuery를 사용한 데이터 쿼리
- Python을 이용한 모델링
- 모델 배포를 위한 환경 설정 및 CI/CD
- 서버에서 모델 호출 및 활용
아래 글에서 필요한 도구와 방법들을 제시하여 독자가 GCP를 통해 데이터 파이프라인을 구축하는 데 필요한 지식을 제공합니다.
1. BigQuery를 사용한 데이터 쿼리
Google BigQuery는 빠르고 확장 가능한 서버리스 데이터 웨어하우스입니다. 대규모 데이터 세트를 분석하고 쿼리하는 데 최적화되어 있으며, SQL을 사용하여 데이터를 쉽게 쿼리할 수 있습니다. BigQuery를 사용하여 데이터를 쿼리하는 단계는 다음과 같습니다.
데이터 로드 및 쿼리
데이터 로드: 데이터 파일(CSV, JSON 등)을 BigQuery 테이블에 로드합니다. 이를 위해 GCP 콘솔이나 bq 커맨드 라인을 사용할 수 있습니다.
bq load --source_format=CSV my_dataset.my_table gs://my_bucket/my_data.csv
데이터 쿼리: BigQuery 콘솔 또는 Python 클라이언트를 사용하여 SQL 쿼리를 실행합니다.
from google.cloud import bigquery
client = bigquery.Client()
query = """
SELECT *
FROM `my_dataset.my_table`
WHERE condition = TRUE
"""
query_job = client.query(query)
results = query_job.result()
for row in results:
print(row)
이 단계에서는 데이터를 쿼리하고, 필요한 전처리를 수행하여 모델링 단계로 넘길 데이터를 준비합니다.
스케줄링 및 자동화
반복적으로 실행해야 하는 쿼리의 경우, BigQuery의 스케줄링 기능을 활용할 수 있습니다. 이를 통해 정기적인 데이터 업데이트나 리포트 생성을 자동화할 수 있습니다.
비용 관리
BigQuery는 쿼리당 비용이 발생하므로, 비용 관리에 주의를 기울여야 합니다. 쿼리 실행 전 '쿼리 확인' 기능을 활용하여 예상 처리량을 확인하고, 필요에 따라 쿼리를 최적화하세요.
2. Python을 이용한 모델링
데이터가 준비되면, Python을 사용하여 머신러닝 모델을 개발할 수 있습니다. 여기서는 TensorFlow를 사용한 예를 들어 설명합니다.
데이터 전처리
모델링을 위해 데이터를 적절히 전처리합니다. 이는 결측값 처리, 데이터 정규화, 카테고리 변환 등을 포함합니다.
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# 데이터 로드
data = pd.read_csv('path/to/data.csv')
# 결측값 처리
data.fillna(method='ffill', inplace=True)
# 데이터 정규화
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)
# 데이터 분할
X = data_scaled[:, :-1]
y = data_scaled[:, -1]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
모델 선택 및 학습
scikit-learn 라이브러리를 사용하여 다양한 머신러닝 모델을 쉽게 구현하고 학습시킬 수 있습니다.
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
# 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 모델 학습
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 예측 및 평가
y_pred = model.predict(X_test)
print(accuracy_score(y_test, y_pred))
print(classification_report(y_test, y_pred))
모델 훈련
TensorFlow를 사용하여 간단한 회귀 모델을 훈련합니다.
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
# 모델 정의
model = Sequential([
Dense(64, activation='relu', input_shape=(X_train.shape[1],)),
Dense(32, activation='relu'),
Dense(1)
])
# 모델 컴파일
model.compile(optimizer='adam', loss='mse')
# 모델 훈련
history = model.fit(X_train, y_train, epochs=50, validation_split=0.2)
3. 모델 배포를 위한 환경 설정 및 CI/CD
모델을 배포하려면 CI/CD 파이프라인을 설정하여 모델을 자동으로 빌드하고 배포할 수 있습니다. 여기서는 Google Cloud Build와 Cloud Run을 사용한 예를 들어 설명합니다.
Docker를 사용한 모델 서빙
모델을 서빙하기 위해 Docker 이미지를 생성합니다.
# Dockerfile
FROM python:3.8-slim
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["python", "serve_model.py"]
Cloud Build 설정
cloudbuild.yaml 파일을 생성하여 Cloud Build 설정을 정의합니다.
steps:
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', 'gcr.io/$PROJECT_ID/my_model', '.']
- name: 'gcr.io/cloud-builders/docker'
args: ['push', 'gcr.io/$PROJECT_ID/my_model']
images:
- 'gcr.io/$PROJECT_ID/my_model'
Cloud Run에 배포
Cloud Build를 트리거하여 이미지를 빌드하고, Cloud Run에 배포합니다.
gcloud builds submit --config cloudbuild.yaml
gcloud run deploy my-model-service --image gcr.io/$PROJECT_ID/my_model --platform managed
클라우드 서비스 활용
Google Cloud Platform의 AI Platform이나 AWS의 SageMaker와 같은 클라우드 서비스를 활용하면 손쉽게 모델을 배포하고 관리할 수 있습니다.
API 개발
Flask나 FastAPI를 이용하여 모델을 API로 개발하면, 다양한 클라이언트에서 쉽게 모델을 호출할 수 있습니다.
from flask import Flask, request, jsonify
import joblib
app = Flask(__name__)
model = joblib.load('model.joblib')
@app.route('/predict', methods=['POST'])
def predict():
data = request.json
prediction = model.predict(data['features'])
return jsonify({'prediction': prediction.tolist()})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
배포된 모델을 서버에서 호출하여 예측을 수행할 수 있습니다.
import requests
url = 'https://my-model-service-<region>.a.run.app/predict'
data = {'input': [1.2, 3.4, 5.6]} # 예시 입력 데이터
response = requests.post(url, json=data)
print(response.json())
5. 파이프라인 통합 및 자동화
지금까지 살펴본 각 단계를 하나의 통합된 파이프라인으로 구성하고 자동화하는 방법을 알아보겠습니다.
Apache Airflow 활용
Apache Airflow는 복잡한 데이터 파이프라인을 쉽게 구성하고 스케줄링할 수 있게 해주는 강력한 도구입니다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'data_pipeline',
default_args=default_args,
description='A data pipeline DAG',
schedule_interval=timedelta(days=1),
)
def extract_data():
# BigQuery에서 데이터 추출
pass
def process_data():
# Python을 이용한 데이터 처리 및 모델링
pass
def deploy_model():
# 모델 배포
pass
t1 = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
dag=dag,
)
t2 = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=dag,
)
t3 = PythonOperator(
task_id='deploy_model',
python_callable=deploy_model,
dag=dag,
)
t1 >> t2 >> t3
또한, Jenkins, GitLab CI, 또는 GitHub Actions와 같은 CI/CD 도구를 활용하여 코드 변경사항이 있을 때마다 자동으로 테스트, 빌드, 배포가 이루어지도록 설정할 수 있습니다.
이 블로그를 통해 GCP를 활용하여 데이터 파이프라인을 구축하고 배포하는 방법을 이해하고, 실제 프로젝트에 적용할 수 있기를 바랍니다. 데이터 쿼리부터 모델 배포까지의 전체 과정을 잘 이해하면, 데이터 중심 애플리케이션을 효과적으로 개발하고 운영할 수 있을 것입니다.
생성형AI 관련 글들은 아래 포스팅을 확인해주세요
AI 모델 파이프라인 구축은 아래에서 확인하실 수 있습니다.
'AI 기술' 카테고리의 다른 글
Flux 이해하기: Stable Diffusion의 이미지 생성 기술 깊이 알아보기 (2) | 2024.11.07 |
---|---|
확산 모델(Diffusion Models) 이해하기: Hugging Face 강의를 통한 noising 가이드 (7) | 2024.09.20 |
LangChain: 혁신적인 언어 모델 프레임워크 (0) | 2024.07.19 |
Stable Diffusion XL 활용해서 고품질 이미지 제작하기 (0) | 2024.07.13 |
텍스트-투-이미지 변환: Hugging Face Diffusers 라이브러리를 사용한 실습 (0) | 2024.07.03 |