• 프로필사진
    Home
  • Portfolio
  • github [#ffffff] Created with Sketch.
    Github
  • linkedin
    Linkedin
  • Setting
  • Posting
    • 분류 전체보기 (105)
      • Aws (96)
      • Backend (3)
      • Extension (1)
  • 방문자 수
    • 전체:
    • 오늘:
    • 어제:
  • 최근 댓글
      등록된 댓글이 없습니다.
    • 최근 공지
        등록된 공지가 없습니다.
      # Home
      # 공지사항
      #
      # 태그
      # 검색결과
      # 방명록
      • Kinesis Firehose & Lambda를 이용한 레코드 변환
        2024년 12월 24일
        • wngnl05
        • 작성자
        • 2024.12.24.:43

        Stream에 데이터 전송

        json_data='{"name": "kayaya", "age": 17}'
        aws kinesis put-record \
          --stream-name <스트림 이름> \
          --data "$(echo -n $json_data | base64)" \
          --partition-key "<내용>"

         

        Lambda 코드

        더보기
        import base64
        import json
        from datetime import datetime, timedelta
        
        
        
        def lambda_handler(event, context):
            # 현재 날짜&시간
            now = datetime.today()
            now = now + timedelta(hours=9)
            year = now.year
            month = now.month
            day = now.day
            hour = now.hour
            minute = now.minute
            date = f"{year} {month}/{day} {hour}:{minute}"
            # 현재 날짜&시간
            
            output = []
            for record in event['records']:
                message = json.loads(base64.b64decode(record['data']))
                # 반환할 데이터
                return_data = {
                        'date': date,
                        'my_name': message['name'],
                        'my_age': message['age']
                    }
                # JSON 데이터를 bytes로 변환하여 Base64 인코딩
                return_data = base64.b64encode(json.dumps(return_data).encode('utf-8')).decode('utf-8')
                output_record = {
                        'recordId': record['recordId'],
                        'result': 'Ok',
                        'data': return_data
                }
                output.append(output_record)
        
            return {'records': output}

         

        return_data에서 설정한 JSON 데이터를 Firehose로 반환합니다.

        message['name']은 Stream으로 보낸 JSON 데이터에서 name의 값을 의미합니다.

         

        위의 코드에서 result_data에 "date"라는 값을 추가해서 현재날짜를 작성했습니다

         

        Kinesis Firehose

        Firehose를 생성하고

        "구성" - "변환 및 레코드 변환" - "AWS Lambda를 사용하여 소스 레코드 변형"을 활성화 합니다.

        활성화 하고 Lambda를 설정해줍니다.

         

        Lambda를 연결할때 Lambda의 설정도 변경해줍니다.

        람다의 "구성" - "일반구성"에서 제한시간은 넉넉하게 설정해주세요 [ 약 1분 ]

         

         

        이제 KinesisStream에 Json데이터를 Base64로 전송하면

        Firehose를 통해서 Lambda로 이동하고 Lambda에서 데이터를 수정하고 

        수정된 데이터를 S3로 저장됩니다.

         

         

        2023.09.28 - [Aws] - Kinesis Firehose를 이용해서 S3에 Parquet확장자로 파일 저장하기

        만약 S3에 저장된 파일의 확장자를 설정하고 싶다면 위의 포스트를 참고하세요.

         

         

         

         

        저작자표시 비영리 변경금지 (새창열림)

        'Aws' 카테고리의 다른 글

        EC2에서 RDS 접속  (0) 2024.12.24
        Terraform 설치하는 방법 및 기본 코드  (0) 2024.12.24
        Codebuild로 ECR 업로드 하기  (1) 2024.12.24
        CodeCommit  (0) 2024.12.24
        Kinesis Firehose를 이용해서 S3에 Parquet확장자로 파일 저장하기  (0) 2024.12.24
        다음글
        다음 글이 없습니다.
        이전글
        이전 글이 없습니다.
        댓글
      조회된 결과가 없습니다.
      스킨 업데이트 안내
      현재 이용하고 계신 스킨의 버전보다 더 높은 최신 버전이 감지 되었습니다. 최신버전 스킨 파일을 다운로드 받을 수 있는 페이지로 이동하시겠습니까?
      ("아니오" 를 선택할 시 30일 동안 최신 버전이 감지되어도 모달 창이 표시되지 않습니다.)
      목차
      표시할 목차가 없습니다.
        • 안녕하세요
        • 감사해요
        • 잘있어요

        티스토리툴바