• 프로필사진
    Home
  • Portfolio
  • github [#ffffff] Created with Sketch.
    Github
  • linkedin
    Linkedin
  • Setting
  • Posting
    • 분류 전체보기 (105)
      • Aws (96)
      • Backend (3)
      • Extension (1)
  • 방문자 수
    • 전체:
    • 오늘:
    • 어제:
  • 최근 댓글
      등록된 댓글이 없습니다.
    • 최근 공지
        등록된 공지가 없습니다.
      # Home
      # 공지사항
      #
      # 태그
      # 검색결과
      # 방명록
      • Lambda를 이용한 Kinesis firehose 레코드 변환
        2024년 12월 24일
        • wngnl05
        • 작성자
        • 2024.12.24.오후02:51

        Stream에 데이터 전송

        json_data='{"name": "wngnl", "age": 18}'
        aws kinesis put-record \
          --stream-name <스트림 이름> \
          --data "$(echo -n $json_data | base64)" \
          --partition-key "<내용>"

         

        Lambda 코드

        더보기
        import base64
        import json
        from datetime import datetime, timedelta
        
        
        
        def lambda_handler(event, context):
            # 현재 날짜&시간
            now = datetime.today()
            now = now + timedelta(hours=9)
            year = now.year
            month = now.month
            day = now.day
            hour = now.hour
            minute = now.minute
            date = f"{year} {month}/{day} {hour}:{minute}"
            # 현재 날짜&시간
            
            output = []
            for record in event['records']:
                message = json.loads(base64.b64decode(record['data']))
                # 반환할 데이터
                return_data = {
                        'date': date,
                        'my_name': message['name'],
                        'my_age': message['age']
                    }
                # JSON 데이터를 bytes로 변환하여 Base64 인코딩
                return_data = base64.b64encode(json.dumps(return_data).encode('utf-8')).decode('utf-8')
                output_record = {
                        'recordId': record['recordId'],
                        'result': 'Ok',
                        'data': return_data
                }
                output.append(output_record)
        
            return {'records': output}

         

        return_data에서 설정한 JSON 데이터를 Firehose로 반환합니다.

        message['name']은 Stream으로 보낸 JSON 데이터에서 name의 값을 의미합니다.

         

        위의 코드에서 result_data에 "date"라는 값을 추가해서 현재날짜를 작성했습니다

         

        Kinesis Firehose

        Firehose를 생성하고

        "구성" - "변환 및 레코드 변환" - "AWS Lambda를 사용하여 소스 레코드 변형"을 활성화 합니다.

        활성화 하고 Lambda를 설정해줍니다.

         

        Lambda를 연결할때 Lambda의 설정도 변경해줍니다.

        람다의 "구성" - "일반구성"에서 제한시간은 넉넉하게 설정해주세요 [ 약 1분 ]

         

        이제 KinesisStream에 Json데이터를 Base64로 전송하면

        Firehose를 통해서 Lambda로 이동하고 Lambda에서 현재시간을 추가한 json데이터를

        parquet 형식으로 변형해서 s3에 저장해줍니다.

         

        2023.09.28 - [Aws] - Kinesis Firehose를 이용해서 S3에 Parquet확장자로 파일 저장하기

        만약 S3에 저장된 파일의 확장자를 설정하고 싶다면 위의 포스트를 참고하세요.

         

         

         

         

        저작자표시 비영리 변경금지 (새창열림)

        'Aws' 카테고리의 다른 글

        EKS - Grafana  (0) 2024.12.24
        Lambda Revoke  (0) 2024.12.24
        Lambda를 대상으로 한 ALB 생성하기  (0) 2024.12.24
        EC2에서 SQS에 메세지 전송하는 코드  (1) 2024.12.24
        EC2에 로그인 시 특정 명령어 실행하는 방법  (0) 2024.12.24
        다음글
        다음 글이 없습니다.
        이전글
        이전 글이 없습니다.
        댓글
      조회된 결과가 없습니다.
      스킨 업데이트 안내
      현재 이용하고 계신 스킨의 버전보다 더 높은 최신 버전이 감지 되었습니다. 최신버전 스킨 파일을 다운로드 받을 수 있는 페이지로 이동하시겠습니까?
      ("아니오" 를 선택할 시 30일 동안 최신 버전이 감지되어도 모달 창이 표시되지 않습니다.)
      목차
      표시할 목차가 없습니다.
      • Stream에 데이터 전송
      • Lambda 코드
      • Kinesis Firehose
      • 안녕하세요
      • 감사해요
      • 잘있어요

      티스토리툴바

      단축키

      내 블로그

      내 블로그 - 관리자 홈 전환
      Q
      Q
      새 글 쓰기
      W
      W

      블로그 게시글

      글 수정 (권한 있는 경우)
      E
      E
      댓글 영역으로 이동
      C
      C

      모든 영역

      이 페이지의 URL 복사
      S
      S
      맨 위로 이동
      T
      T
      티스토리 홈 이동
      H
      H
      단축키 안내
      Shift + /
      ⇧ + /

      * 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.