- Lambda를 이용한 Kinesis firehose 레코드 변환2024년 12월 24일
- wngnl05
- 작성자
- 2024.12.24.오후02:51
Stream에 데이터 전송
json_data='{"name": "wngnl", "age": 18}' aws kinesis put-record \ --stream-name <스트림 이름> \ --data "$(echo -n $json_data | base64)" \ --partition-key "<내용>"
Lambda 코드
더보기import base64 import json from datetime import datetime, timedelta def lambda_handler(event, context): # 현재 날짜&시간 now = datetime.today() now = now + timedelta(hours=9) year = now.year month = now.month day = now.day hour = now.hour minute = now.minute date = f"{year} {month}/{day} {hour}:{minute}" # 현재 날짜&시간 output = [] for record in event['records']: message = json.loads(base64.b64decode(record['data'])) # 반환할 데이터 return_data = { 'date': date, 'my_name': message['name'], 'my_age': message['age'] } # JSON 데이터를 bytes로 변환하여 Base64 인코딩 return_data = base64.b64encode(json.dumps(return_data).encode('utf-8')).decode('utf-8') output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': return_data } output.append(output_record) return {'records': output}
return_data에서 설정한 JSON 데이터를 Firehose로 반환합니다.
message['name']은 Stream으로 보낸 JSON 데이터에서 name의 값을 의미합니다.
위의 코드에서 result_data에 "date"라는 값을 추가해서 현재날짜를 작성했습니다
Kinesis Firehose
Firehose를 생성하고
"구성" - "변환 및 레코드 변환" - "AWS Lambda를 사용하여 소스 레코드 변형"을 활성화 합니다.
활성화 하고 Lambda를 설정해줍니다.
Lambda를 연결할때 Lambda의 설정도 변경해줍니다.
람다의 "구성" - "일반구성"에서 제한시간은 넉넉하게 설정해주세요 [ 약 1분 ]
이제 KinesisStream에 Json데이터를 Base64로 전송하면
Firehose를 통해서 Lambda로 이동하고 Lambda에서 현재시간을 추가한 json데이터를
parquet 형식으로 변형해서 s3에 저장해줍니다.
2023.09.28 - [Aws] - Kinesis Firehose를 이용해서 S3에 Parquet확장자로 파일 저장하기
만약 S3에 저장된 파일의 확장자를 설정하고 싶다면 위의 포스트를 참고하세요.
'Aws' 카테고리의 다른 글
EKS - Grafana (0) 2024.12.24 Lambda Revoke (0) 2024.12.24 Lambda를 대상으로 한 ALB 생성하기 (0) 2024.12.24 EC2에서 SQS에 메세지 전송하는 코드 (1) 2024.12.24 EC2에 로그인 시 특정 명령어 실행하는 방법 (0) 2024.12.24 다음글이전글이전 글이 없습니다.댓글
스킨 업데이트 안내
현재 이용하고 계신 스킨의 버전보다 더 높은 최신 버전이 감지 되었습니다. 최신버전 스킨 파일을 다운로드 받을 수 있는 페이지로 이동하시겠습니까?
("아니오" 를 선택할 시 30일 동안 최신 버전이 감지되어도 모달 창이 표시되지 않습니다.)