MENU

【分析環境構築】Amazon SageMakerとS3を連携しました

こんにちは!株式会社エイトハンドレッドの原口です。 本記事では大規模データ用の分析環境を構築する際、何を意識したかご紹介したいと思います。

対象読者

  • クラウドのサービス連携に興味がある人
  • これからクラウド上で分析環境とストレージ環境を連携する必要がある人
  • 分析におけるAWSの活用方法を知りたい人

どのような課題があったか

案件で大規模なデータを処理する必要があり(6,000万行弱)、これまで使用していたストレージ環境ではファイルサイズ上限などが原因で管理・分析がしづらい状況でした。 そのため、分析環境全体をSageMaker Notebook Instancesに移行し、データを格納するストレージとしてS3を選定しました。

しかし、今回分析を担当するメンバーはSageMaker環境を使い慣れていないことから、キャッチアップに時間がかかってしまう懸念がありました。

何をしたか

AWS環境への移行・連携に際して、メンバーが既存環境と使用感を変えずに分析できるよう、以下2つの観点に基づいて汎用関数を実装しました。

  1. 作業者がAWSのシステムを意識せずに分析できる環境を構築する
  2. 大規模データを分割せずに管理できるようにする

どの環境を利用したか

弊社ではAWSのサービスを使用し、分析作業を行っています。主に使用しているのは、Amazon SageMaker(以下SageMaker)とAmazon S3(以下S3)の2つです。 まず、SageMakerとS3がどのようなサービスか簡単にご説明します。

  • SageMakerとは
    • データアナリスト・データサイエンティストを対象とした、Jupyter Notebook/Jupyter Labを使用する機械学習特化型の開発環境
    • その中で、Python環境のみ構築できるAmazon SageMaker Notebook Instances*1を使用(以降、SageMakerはSageMaker Notebook Instancesを指す)
  • S3とは
    • 格納するファイルの形式を問わないデータレイク型のストレージ環境
    • 例)xlsx、mp4、mp3

他にAWS上で分析環境を構築する際、「Amazon SageMaker Studio」と「Amazon EC2」も使用できますが、それぞれ以下の理由で使用していません。

  • SageMaker Studioは使用しないデバッグサービスなど含む統合開発環境で、そこまでの機能は必要ない*2
  • EC2は自由度は高いが、仮想環境にコーデイング環境を構築しなければならないため、過度な負担がかかる*3

なお、以下の環境で動作させています。

SageMakerのインスタンスタイプ:ml.r5.4xlarge
jupyterlab:4.2.4
Python バージョン:3.10.15

# ライブラリ(本ブログで使用するライブラリのみ抜粋)
boto3==1.35.16
pandas==1.5.3
PyYAML==6.0.1
pyarrow==17.0.0
xlsx2csv==0.8.3

スコープ外とした作業

今回の連携では、スコープ外としている実装があります。 興味のある方は文頭の"▶"マークを押して読んでみてください。

  • SageMakerとS3のデータ連携以外の実装
    • 例えば、S3バケットから別階層への移動など、SageMaker以外の1つのサービスで完結する処理は実装しない
  • S3からSageMakerにデータを送信する際の加工
    • 例えば、「parquetファイルからSQLで指定パーティションデータに絞ったうえで、pandasデータフレームとして読み込む」といった処理は実装しない
  • SageMakerと既存のストレージ環境との連携
  • ローカル開発環境(Python/R)からのアクセスに関する処理

4点をスコープ外にした理由は「キャッチアップに必要な時間と知識量を最小限にしたい」という意図があったからです。

連携処理のステップ

連携は下記4つのステップで行います。

  1. S3からデータを取得
  2. Pandasデータフレームを作成
  3. 加工してParquet形式に変換
  4. S3に出力

図1:SageMaker_S3_連携図
※図中の〇はboto3というAWSサービスを連携するAPIを介するという意味を指しています。

実装した連携処理で意識したこと

  • 意識した設計思想
    1. S3をローカル環境のように扱える状態にすること
    2. 他の案件に流用できる分析環境にすること
    3. 顧客から1つのデータが分割されて送付されるため、元データに戻す際の作業者の手間を削減すること
    4. 処理速度向上、Rユーザーを含む共同作業者とのデータ共有の2点を可能にするデータフォーマットを使用すること


  • 設計思想を実現するために、どのような手法を用いたか
    • クラウド環境にアクセスする際の煩雑なパラメーター設定・管理負担を軽減するため、入力元フォルダ、出力先フォルダを含む環境パラメーターをYAMLで管理する(設計思想1、2)

      • ノートブック上で’{◇◇◇}’となっている部分を変換する
      • 「WORKER_NAME」に作業者の名字を英小文字で登録する
      • 「BUCKET」には、S3のトップ画面に表示される対象バケット名を登録し、プロジェクトごとに変更する
      • 「ROOT_DIR_NAME」には、「BUCKET」直下のルートディレクトリ名を登録し、案件ごとに変更する
        図2:settings.yamlファイルで定義した環境パラメーター一覧(一部抜粋)

    • パーティションで分割されたデータ、ファイル拡張子が異なる形式で共有された生データを、マスタファイルの定義に沿って、1度の関数読み出しのみで縦結合する(設計思想3)

      • 「ファイル名」:対象ファイルの拡張子を除いた部分を指定
      • 「拡張子」:対象ファイルの「.」を除いたファイル形式を指定
      • 「シート名」:読み込む対象のシート名
      • 「ヘッダー」:読み込み始める行数のオフセットを指定
ファイル名 拡張子 シート名 ヘッダー
AAA xlsx シート1 1
BBB xls シート2 2
CCC csv シート3 0
DDD csv シート4 0

使用コード(トグルを展開してください)

# データ抽出関数(ローデータ読み込み用) ※型を指定して読み込む処理に変更する予定(実装次第low_memory=Falseは削除)
def raw_data_loading_with_diff_formats(REGION_NAME, BUCKET, PREFIX_DIR_PATH, MASTER_PREFIX_DIR_PATH, MASTER_FILE_NAME):

    df_total = pd.DataFrame()
    
    if check_file_exists(REGION_NAME, BUCKET, MASTER_PREFIX_DIR_PATH, MASTER_FILE_NAME): # 該当ファイルが存在するか確認する自作関数
        
        # excel
        if ('.xlsx' or '.xls') in os.path.splitext(MASTER_FILE_NAME)[1]:
            df_master = pd.read_excel('s3://{BUCKET}/{MASTER_PREFIX_DIR_PATH}/{MASTER_FILE_NAME}'.format(
                BUCKET=BUCKET,
                MASTER_PREFIX_DIR_PATH=MASTER_PREFIX_DIR_PATH,
                MASTER_FILE_NAME=MASTER_FILE_NAME),
                dtype={'ファイル名':str, '拡張子':str, 'シート名':str, 'ヘッダー':int}
                )

        # csv
        if '.csv' in os.path.splitext(MASTER_FILE_NAME)[1]:
            df_master = pd.read_csv('s3://{BUCKET}/{MASTER_PREFIX_DIR_PATH}/{MASTER_FILE_NAME}'.format(
                BUCKET=BUCKET,
                MASTER_PREFIX_DIR_PATH=MASTER_PREFIX_DIR_PATH,
                MASTER_FILE_NAME=MASTER_FILE_NAME))       
  
        display(df_master) 
        
        for row in range(0, len(df_master)):

            FILE_NAME = '{FILE_NAME}.{EXTENSION}'.format(FILE_NAME=df_master['ファイル名'][row], EXTENSION=df_master['拡張子'][row])

            if check_file_exists(REGION_NAME, BUCKET, PREFIX_DIR_PATH, FILE_NAME):

                # excel
                if ('xlsx' or 'xls') in df_master['拡張子'][row]:
                    df = pd.read_excel('s3://{BUCKET}/{PREFIX_DIR_PATH}/{FILE_NAME}'.format(
                        BUCKET=BUCKET,
                        PREFIX_DIR_PATH=PREFIX_DIR_PATH,
                        FILE_NAME=FILE_NAME),
                        sheet_name=df_master['シート名'][row],
                        header=df_master['ヘッダー'][row]
                    )

                # csv
                if 'csv' == df_master['拡張子'][row]:
                    df = pd.read_csv('s3://{BUCKET}/{PREFIX_DIR_PATH}/{FILE_NAME}'.format(
                        BUCKET=BUCKET,
                        PREFIX_DIR_PATH=PREFIX_DIR_PATH,
                        FILE_NAME=FILE_NAME),
                        header=df_master['ヘッダー'][row],
                        low_memory=False
                        )
                        
                 # tsv
                if 'tsv' == df_master['拡張子'][row]:
                    df = pd.read_csv('s3://{BUCKET}/{PREFIX_DIR_PATH}/{FILE_NAME}'.format(
                        BUCKET=BUCKET,
                        PREFIX_DIR_PATH=PREFIX_DIR_PATH,
                        FILE_NAME=FILE_NAME),
                        header=df_master['ヘッダー'][row],
                        delimiter='\t',
                        low_memory=False
                        )
                        
                df_total = pd.concat([df_total, df], axis=0)

    return df_total


  • データの入出力時間が短く、他言語との互換性からParquet形式でデータを出力する(設計思想4)
    • 参考までに比較になります(速度は変動します)
データフォーマット 速度 他言語との互換性
csv 遅い あり
Parquet 速い(csvの1/5程度) あり
pickle 速い(csvの1/5程度) なし


▼カンマ区切り値 (CSV) ファイル形式 www.creativyst.com

▼Apache Parquet とは www.databricks.com

▼pickle --- Python オブジェクトの直列化 docs.python.org


学び

  1. マスタファイルで情報を管理する方が、保守性と可用性に優れ、他に分析案件にも流用しやすい
  2. 通信時の読み書き速度とRユーザーへの連携を考えるとParquetがよい

課題

課題として、SageMakerとS3の連携だけでは、ファイルやデータのバージョン管理が難しいことが挙げられます。

現状の実装だと、クレンジングやデータスキーマの変更が発生するたびに、データのバージョンをを上げることになります。 この場合、最新のデータをチームに知らせる必要がありますが、周知漏れが発生する可能性があります。

漏れが起きた場合、バージョンの違うソースから前提条件が合わないまま各々が分析してしまうため、 手戻りが発生します。

今後はこの課題を解決するべく検討していきたいと考えています。



最後までお読みいただきありがとうございました! この記事が少しでも役に立つと幸いです!