본문 바로가기
AI 기술

GCP를 활용한 데이터 파이프라인 구축 및 배포: BigQuery에서 머신러닝 모델 배포까지

by Ricolacola 2024. 7. 25.
반응형

현대 데이터 중심 기업에서는 데이터를 효과적으로 수집, 처리, 분석 및 배포하는 능력이 성공의 핵심 요소입니다. 이를 위해 안정적이고 확장 가능한 데이터 파이프라인을 구축하는 것은 매우 중요합니다. 이 블로그에서는 Google Cloud Platform(GCP)을 활용하여 데이터 파이프라인을 설계하고, BigQuery를 통해 데이터를 쿼리하고, Python을 사용해 모델링하며, 최종적으로 필요한 서버에 모델을 배포하는 전체 과정을 다룹니다. 이 과정은 다음과 같은 단계로 구성됩니다:

파이프라인 구축 및 배포

  1. BigQuery를 사용한 데이터 쿼리
  2. Python을 이용한 모델링
  3. 모델 배포를 위한 환경 설정 및 CI/CD
  4. 서버에서 모델 호출 및 활용

아래 글에서 필요한 도구와 방법들을 제시하여 독자가 GCP를 통해 데이터 파이프라인을 구축하는 데 필요한 지식을 제공합니다.

1. BigQuery를 사용한 데이터 쿼리

Google BigQuery는 빠르고 확장 가능한 서버리스 데이터 웨어하우스입니다. 대규모 데이터 세트를 분석하고 쿼리하는 데 최적화되어 있으며, SQL을 사용하여 데이터를 쉽게 쿼리할 수 있습니다. BigQuery를 사용하여 데이터를 쿼리하는 단계는 다음과 같습니다.

데이터 로드 및 쿼리

데이터 로드: 데이터 파일(CSV, JSON 등)을 BigQuery 테이블에 로드합니다. 이를 위해 GCP 콘솔이나 bq 커맨드 라인을 사용할 수 있습니다.

bq load --source_format=CSV my_dataset.my_table gs://my_bucket/my_data.csv

 

 

데이터 쿼리: BigQuery 콘솔 또는 Python 클라이언트를 사용하여 SQL 쿼리를 실행합니다.

from google.cloud import bigquery

client = bigquery.Client()
query = """
SELECT *
FROM `my_dataset.my_table`
WHERE condition = TRUE
"""
query_job = client.query(query)
results = query_job.result()
for row in results:
    print(row)

 

이 단계에서는 데이터를 쿼리하고, 필요한 전처리를 수행하여 모델링 단계로 넘길 데이터를 준비합니다.

스케줄링 및 자동화

반복적으로 실행해야 하는 쿼리의 경우, BigQuery의 스케줄링 기능을 활용할 수 있습니다. 이를 통해 정기적인 데이터 업데이트나 리포트 생성을 자동화할 수 있습니다.

비용 관리

BigQuery는 쿼리당 비용이 발생하므로, 비용 관리에 주의를 기울여야 합니다. 쿼리 실행 전 '쿼리 확인' 기능을 활용하여 예상 처리량을 확인하고, 필요에 따라 쿼리를 최적화하세요.

2. Python을 이용한 모델링

데이터가 준비되면, Python을 사용하여 머신러닝 모델을 개발할 수 있습니다. 여기서는 TensorFlow를 사용한 예를 들어 설명합니다.

파이프라인 구축 및 배포

데이터 전처리

모델링을 위해 데이터를 적절히 전처리합니다. 이는 결측값 처리, 데이터 정규화, 카테고리 변환 등을 포함합니다.

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# 데이터 로드
data = pd.read_csv('path/to/data.csv')

# 결측값 처리
data.fillna(method='ffill', inplace=True)

# 데이터 정규화
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)

# 데이터 분할
X = data_scaled[:, :-1]
y = data_scaled[:, -1]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

모델 선택 및 학습

scikit-learn 라이브러리를 사용하여 다양한 머신러닝 모델을 쉽게 구현하고 학습시킬 수 있습니다.

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

# 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 모델 학습
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# 예측 및 평가
y_pred = model.predict(X_test)
print(accuracy_score(y_test, y_pred))
print(classification_report(y_test, y_pred))

모델 훈련

TensorFlow를 사용하여 간단한 회귀 모델을 훈련합니다.

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# 모델 정의
model = Sequential([
    Dense(64, activation='relu', input_shape=(X_train.shape[1],)),
    Dense(32, activation='relu'),
    Dense(1)
])

# 모델 컴파일
model.compile(optimizer='adam', loss='mse')

# 모델 훈련
history = model.fit(X_train, y_train, epochs=50, validation_split=0.2)

3. 모델 배포를 위한 환경 설정 및 CI/CD

모델을 배포하려면 CI/CD 파이프라인을 설정하여 모델을 자동으로 빌드하고 배포할 수 있습니다. 여기서는 Google Cloud Build와 Cloud Run을 사용한 예를 들어 설명합니다.

Docker를 사용한 모델 서빙

모델을 서빙하기 위해 Docker 이미지를 생성합니다.

# Dockerfile
FROM python:3.8-slim

WORKDIR /app

COPY . /app

RUN pip install -r requirements.txt

CMD ["python", "serve_model.py"]

Cloud Build 설정

cloudbuild.yaml 파일을 생성하여 Cloud Build 설정을 정의합니다.

steps:
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', 'gcr.io/$PROJECT_ID/my_model', '.']
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'gcr.io/$PROJECT_ID/my_model']

images:
  - 'gcr.io/$PROJECT_ID/my_model'

Cloud Run에 배포

Cloud Build를 트리거하여 이미지를 빌드하고, Cloud Run에 배포합니다.

gcloud builds submit --config cloudbuild.yaml
gcloud run deploy my-model-service --image gcr.io/$PROJECT_ID/my_model --platform managed

클라우드 서비스 활용

Google Cloud Platform의 AI Platform이나 AWS의 SageMaker와 같은 클라우드 서비스를 활용하면 손쉽게 모델을 배포하고 관리할 수 있습니다.

API 개발

Flask나 FastAPI를 이용하여 모델을 API로 개발하면, 다양한 클라이언트에서 쉽게 모델을 호출할 수 있습니다.

from flask import Flask, request, jsonify
import joblib

app = Flask(__name__)
model = joblib.load('model.joblib')

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    prediction = model.predict(data['features'])
    return jsonify({'prediction': prediction.tolist()})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
4. 서버에서 모델 호출 및 활용

배포된 모델을 서버에서 호출하여 예측을 수행할 수 있습니다.

import requests

url = 'https://my-model-service-<region>.a.run.app/predict'
data = {'input': [1.2, 3.4, 5.6]}  # 예시 입력 데이터
response = requests.post(url, json=data)
print(response.json())

5. 파이프라인 통합 및 자동화

지금까지 살펴본 각 단계를 하나의 통합된 파이프라인으로 구성하고 자동화하는 방법을 알아보겠습니다.

Apache Airflow 활용

Apache Airflow는 복잡한 데이터 파이프라인을 쉽게 구성하고 스케줄링할 수 있게 해주는 강력한 도구입니다.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'data_pipeline',
    default_args=default_args,
    description='A data pipeline DAG',
    schedule_interval=timedelta(days=1),
)

def extract_data():
    # BigQuery에서 데이터 추출
    pass

def process_data():
    # Python을 이용한 데이터 처리 및 모델링
    pass

def deploy_model():
    # 모델 배포
    pass

t1 = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

t2 = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag,
)

t3 = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag,
)

t1 >> t2 >> t3

 

또한, Jenkins, GitLab CI, 또는 GitHub Actions와 같은 CI/CD 도구를 활용하여 코드 변경사항이 있을 때마다 자동으로 테스트, 빌드, 배포가 이루어지도록 설정할 수 있습니다.

 

이 블로그를 통해 GCP를 활용하여 데이터 파이프라인을 구축하고 배포하는 방법을 이해하고, 실제 프로젝트에 적용할 수 있기를 바랍니다. 데이터 쿼리부터 모델 배포까지의 전체 과정을 잘 이해하면, 데이터 중심 애플리케이션을 효과적으로 개발하고 운영할 수 있을 것입니다.

 

생성형AI 관련 글들은 아래 포스팅을 확인해주세요

 

BEiT: 이미지 변환기를 위한 BERT 사전 학습

BEiT란 무엇인가?BEiT는 Bidirectional Encoder Representation from Image Transformers의 약자로, 이미지 변환기를 위한 자가 지도 학습 비전 표현 모델입니다. 이 모델은 자연어 처리에서 사용되는 BERT 모델에서

contentstailor.com

 

 

파인튜닝을 위한 IP-Adapter 활용: Stable Diffusion 개선하기

Stable Diffusion 모델은 텍스트를 이미지로 변환하는 강력한 도구입니다. 그러나 모델의 성능을 극대화하기 위해서는 추가적인 이미지 프롬프트와 같은 방법들이 필요할 수 있습니다. 이때 IP-Adapter

contentstailor.com