Aws
Lambda를 이용한 Kinesis firehose 레코드 변환
wngnl05
2024. 12. 24. 14:51
Stream에 데이터 전송
json_data='{"name": "wngnl", "age": 18}'
aws kinesis put-record \
--stream-name <스트림 이름> \
--data "$(echo -n $json_data | base64)" \
--partition-key "<내용>"
Lambda 코드
더보기
import base64
import json
from datetime import datetime, timedelta
def lambda_handler(event, context):
# 현재 날짜&시간
now = datetime.today()
now = now + timedelta(hours=9)
year = now.year
month = now.month
day = now.day
hour = now.hour
minute = now.minute
date = f"{year} {month}/{day} {hour}:{minute}"
# 현재 날짜&시간
output = []
for record in event['records']:
message = json.loads(base64.b64decode(record['data']))
# 반환할 데이터
return_data = {
'date': date,
'my_name': message['name'],
'my_age': message['age']
}
# JSON 데이터를 bytes로 변환하여 Base64 인코딩
return_data = base64.b64encode(json.dumps(return_data).encode('utf-8')).decode('utf-8')
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': return_data
}
output.append(output_record)
return {'records': output}
return_data에서 설정한 JSON 데이터를 Firehose로 반환합니다.
message['name']은 Stream으로 보낸 JSON 데이터에서 name의 값을 의미합니다.
위의 코드에서 result_data에 "date"라는 값을 추가해서 현재날짜를 작성했습니다
Kinesis Firehose
Firehose를 생성하고
"구성" - "변환 및 레코드 변환" - "AWS Lambda를 사용하여 소스 레코드 변형"을 활성화 합니다.
활성화 하고 Lambda를 설정해줍니다.
Lambda를 연결할때 Lambda의 설정도 변경해줍니다.
람다의 "구성" - "일반구성"에서 제한시간은 넉넉하게 설정해주세요 [ 약 1분 ]
이제 KinesisStream에 Json데이터를 Base64로 전송하면
Firehose를 통해서 Lambda로 이동하고 Lambda에서 현재시간을 추가한 json데이터를
parquet 형식으로 변형해서 s3에 저장해줍니다.
2023.09.28 - [Aws] - Kinesis Firehose를 이용해서 S3에 Parquet확장자로 파일 저장하기
만약 S3에 저장된 파일의 확장자를 설정하고 싶다면 위의 포스트를 참고하세요.