Aws

Lambda를 이용한 Kinesis firehose 레코드 변환

wngnl05 2024. 12. 24. 14:51

Stream에 데이터 전송

json_data='{"name": "wngnl", "age": 18}'
aws kinesis put-record \
  --stream-name <스트림 이름> \
  --data "$(echo -n $json_data | base64)" \
  --partition-key "<내용>"

 

Lambda 코드

더보기
import base64
import json
from datetime import datetime, timedelta



def lambda_handler(event, context):
    # 현재 날짜&시간
    now = datetime.today()
    now = now + timedelta(hours=9)
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    date = f"{year} {month}/{day} {hour}:{minute}"
    # 현재 날짜&시간
    
    output = []
    for record in event['records']:
        message = json.loads(base64.b64decode(record['data']))
        # 반환할 데이터
        return_data = {
                'date': date,
                'my_name': message['name'],
                'my_age': message['age']
            }
        # JSON 데이터를 bytes로 변환하여 Base64 인코딩
        return_data = base64.b64encode(json.dumps(return_data).encode('utf-8')).decode('utf-8')
        output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': return_data
        }
        output.append(output_record)

    return {'records': output}

 

return_data에서 설정한 JSON 데이터를 Firehose로 반환합니다.

message['name']은 Stream으로 보낸 JSON 데이터에서 name의 값을 의미합니다.

 

위의 코드에서 result_data에 "date"라는 값을 추가해서 현재날짜를 작성했습니다

 

Kinesis Firehose

Firehose를 생성하고

"구성" - "변환 및 레코드 변환" - "AWS Lambda를 사용하여 소스 레코드 변형"을 활성화 합니다.

활성화 하고 Lambda를 설정해줍니다.

 

Lambda를 연결할때 Lambda의 설정도 변경해줍니다.

람다의 "구성" - "일반구성"에서 제한시간은 넉넉하게 설정해주세요 [ 약 1분 ]

 

이제 KinesisStream에 Json데이터를 Base64로 전송하면

Firehose를 통해서 Lambda로 이동하고 Lambda에서 현재시간을 추가한 json데이터를

parquet 형식으로 변형해서 s3에 저장해줍니다.

 

2023.09.28 - [Aws] - Kinesis Firehose를 이용해서 S3에 Parquet확장자로 파일 저장하기

만약 S3에 저장된 파일의 확장자를 설정하고 싶다면 위의 포스트를 참고하세요.