Aws
Athena 쿼리를 Lambda로 가져와서 S3버킷에 csv파일로 저장하기
wngnl05
2024. 12. 24. 13:34
Glue에서 데이터베이스를 생성해줍니다.
log_sample.json
0.09MB
sample.parquet
0.00MB
sample.csv
0.00MB
Glue를 이용해서 테이블을 생성합니다.
Glue로 테이블을 생성할때 "Data Format"은 읽어올 데이터의 유형을 의미합니다.
직접 쿼리 읽어오기 < Athena >
select * from "<데이터베이스>"."<테이블>";
-- Lambda 코드 --
import boto3
# Athena 클라이언트 생성
athena_client = boto3.client(
'athena',
region_name='<리전>',
aws_access_key_id="<액세스 키>",
aws_secret_access_key="<시크릿 키>"
)
DATABASE_NAME = "<데이터베이스>"
TABLE_NAME = "<테이블>"
s3_output_location = "<s3 폴더 경로>" # 쿼리 결과 파일 저장 위치
# 쿼리 명령어 작성
query = f'SELECT * FROM "{DATABASE_NAME}"."{TABLE_NAME}"'
response = athena_client.start_query_execution(
QueryString=query,
ResultConfiguration={"OutputLocation": s3_output_location}
)
s3_output_location에 넣은 S3의 경로로 쿼리 결과가 파일로 저장됩니다.
S3에 저장되는 파일의 이름을 정하고 싶다면
아래의 코드를 사용합니다.
// FOLDER_PATH의 뒤에 /를 붙이지 않아야 오류가 발생하지 않습니다.
import datetime
import json
import boto3
import time
def lambda_handler(event, context):
print(f"event = {event}")
BUCKET_NAME = "<버킷 이름>"
FOLDER_PATH = "<폴더 경로>"
NEW_FILE_NAME = "<파일 이름>.csv"
athena_client = boto3.client(
'athena',
region_name='ap-northeast-2'
)
DATABASE_NAME = "my_db"
TABLE_NAME = "my_table"
s3_output_location = f"s3://{BUCKET_NAME}/{FOLDER_PATH}/" # 쿼리 결과 파일 저장 위치
# 쿼리 명령어 작성
query = f'SELECT * FROM "{DATABASE_NAME}"."{TABLE_NAME}"'
# 쿼리 실행
response = athena_client.start_query_execution(
QueryString=query,
ResultConfiguration={"OutputLocation": s3_output_location}
)
time.sleep(2)
# Athena 쿼리 ID를 가져옴
query_id = response['QueryExecutionId']
s3_client = boto3.client('s3')
# CSV파일 복사해서 이름 변경하기
s3_client.copy_object(
CopySource={'Bucket': f"{BUCKET_NAME}", 'Key': f"{FOLDER_PATH}/{query_id}.csv"},
Bucket=f"{BUCKET_NAME}",
Key=f"{FOLDER_PATH}/{NEW_FILE_NAME}"
)
# 기존 csv 파일 삭제
s3_client.delete_object(
Bucket=f"{BUCKET_NAME}",
Key=f"{FOLDER_PATH}/{query_id}.csv"
)
# 기존 metadata 파일 삭제
s3_client.delete_object(
Bucket=f"{BUCKET_NAME}",
Key=f"{FOLDER_PATH}/{query_id}.csv.metadata"
)