Aws

Athena 쿼리를 Lambda로 가져와서 S3버킷에 csv파일로 저장하기

wngnl05 2024. 12. 24. 13:34

기초 요약

명령어 모음

 

 

Glue에서 데이터베이스를 생성해줍니다.

log_sample.json
0.09MB
sample.parquet
0.00MB
sample.csv
0.00MB

 

Glue를 이용해서 테이블을 생성합니다.

Glue로 테이블을 생성할때 "Data Format"은 읽어올 데이터의 유형을 의미합니다.

 

직접 쿼리 읽어오기 < Athena >

select * from "<데이터베이스>"."<테이블>";

 

-- Lambda 코드 --

import boto3

# Athena 클라이언트 생성
athena_client = boto3.client(
    'athena',
    region_name='<리전>',
    aws_access_key_id="<액세스 키>",
    aws_secret_access_key="<시크릿 키>"
)

DATABASE_NAME = "<데이터베이스>"
TABLE_NAME = "<테이블>"
s3_output_location = "<s3 폴더 경로>" # 쿼리 결과 파일 저장 위치

# 쿼리 명령어 작성
query = f'SELECT * FROM "{DATABASE_NAME}"."{TABLE_NAME}"'
response = athena_client.start_query_execution(
    QueryString=query,
    ResultConfiguration={"OutputLocation": s3_output_location}
)

 

s3_output_location에 넣은 S3의 경로로 쿼리 결과가 파일로 저장됩니다.

 

 

 

 

S3에 저장되는 파일의 이름을 정하고 싶다면

아래의 코드를 사용합니다.

// FOLDER_PATH의 뒤에 /를 붙이지 않아야 오류가 발생하지 않습니다.

import datetime
import json
import boto3
import time

def lambda_handler(event, context):
    print(f"event = {event}")
    BUCKET_NAME = "<버킷 이름>"
    FOLDER_PATH = "<폴더 경로>"
    NEW_FILE_NAME = "<파일 이름>.csv"
    
    
    athena_client = boto3.client(
        'athena',
        region_name='ap-northeast-2'
    )

    DATABASE_NAME = "my_db"
    TABLE_NAME = "my_table"
    s3_output_location = f"s3://{BUCKET_NAME}/{FOLDER_PATH}/"  # 쿼리 결과 파일 저장 위치
    
    # 쿼리 명령어 작성
    query = f'SELECT * FROM "{DATABASE_NAME}"."{TABLE_NAME}"'
    # 쿼리 실행
    response = athena_client.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": s3_output_location}
    )
    
    time.sleep(2)
    # Athena 쿼리 ID를 가져옴
    query_id = response['QueryExecutionId']
    s3_client = boto3.client('s3')
    # CSV파일 복사해서 이름 변경하기
    s3_client.copy_object(
        CopySource={'Bucket': f"{BUCKET_NAME}", 'Key': f"{FOLDER_PATH}/{query_id}.csv"},
        Bucket=f"{BUCKET_NAME}",
        Key=f"{FOLDER_PATH}/{NEW_FILE_NAME}"
    )
    # 기존 csv 파일 삭제
    s3_client.delete_object(
        Bucket=f"{BUCKET_NAME}",
        Key=f"{FOLDER_PATH}/{query_id}.csv"
    )
    # 기존 metadata 파일 삭제
    s3_client.delete_object(
        Bucket=f"{BUCKET_NAME}",
        Key=f"{FOLDER_PATH}/{query_id}.csv.metadata"
    )