こんにちは!株式会社エイトハンドレッドの原口です。 本記事では大規模データ用の分析環境を構築する際、何を意識したかご紹介したいと思います。
対象読者
- クラウドのサービス連携に興味がある人
- これからクラウド上で分析環境とストレージ環境を連携する必要がある人
- 分析におけるAWSの活用方法を知りたい人
どのような課題があったか
案件で大規模なデータを処理する必要があり(6,000万行弱)、これまで使用していたストレージ環境ではファイルサイズ上限などが原因で管理・分析がしづらい状況でした。 そのため、分析環境全体をSageMaker Notebook Instancesに移行し、データを格納するストレージとしてS3を選定しました。
しかし、今回分析を担当するメンバーはSageMaker環境を使い慣れていないことから、キャッチアップに時間がかかってしまう懸念がありました。
何をしたか
AWS環境への移行・連携に際して、メンバーが既存環境と使用感を変えずに分析できるよう、以下2つの観点に基づいて汎用関数を実装しました。
- 作業者がAWSのシステムを意識せずに分析できる環境を構築する
- 大規模データを分割せずに管理できるようにする
どの環境を利用したか
弊社ではAWSのサービスを使用し、分析作業を行っています。主に使用しているのは、Amazon SageMaker(以下SageMaker)とAmazon S3(以下S3)の2つです。 まず、SageMakerとS3がどのようなサービスか簡単にご説明します。
- SageMakerとは
- データアナリスト・データサイエンティストを対象とした、Jupyter Notebook/Jupyter Labを使用する機械学習特化型の開発環境
- その中で、Python環境のみ構築できるAmazon SageMaker Notebook Instances*1を使用(以降、SageMakerはSageMaker Notebook Instancesを指す)
- S3とは
- 格納するファイルの形式を問わないデータレイク型のストレージ環境
- 例)xlsx、mp4、mp3
他にAWS上で分析環境を構築する際、「Amazon SageMaker Studio」と「Amazon EC2」も使用できますが、それぞれ以下の理由で使用していません。
- SageMaker Studioは使用しないデバッグサービスなど含む統合開発環境で、そこまでの機能は必要ない*2
- EC2は自由度は高いが、仮想環境にコーデイング環境を構築しなければならないため、過度な負担がかかる*3
なお、以下の環境で動作させています。
SageMakerのインスタンスタイプ:ml.r5.4xlarge jupyterlab:4.2.4 Python バージョン:3.10.15 # ライブラリ(本ブログで使用するライブラリのみ抜粋) boto3==1.35.16 pandas==1.5.3 PyYAML==6.0.1 pyarrow==17.0.0 xlsx2csv==0.8.3
スコープ外とした作業
今回の連携では、スコープ外としている実装があります。 興味のある方は文頭の"▶"マークを押して読んでみてください。
- SageMakerとS3のデータ連携以外の実装
- 例えば、S3バケットから別階層への移動など、SageMaker以外の1つのサービスで完結する処理は実装しない
- S3からSageMakerにデータを送信する際の加工
- 例えば、「parquetファイルからSQLで指定パーティションデータに絞ったうえで、pandasデータフレームとして読み込む」といった処理は実装しない
- SageMakerと既存のストレージ環境との連携
- ローカル開発環境(Python/R)からのアクセスに関する処理
4点をスコープ外にした理由は「キャッチアップに必要な時間と知識量を最小限にしたい」という意図があったからです。
連携処理のステップ
連携は下記4つのステップで行います。
- S3からデータを取得
- Pandasデータフレームを作成
- 加工してParquet形式に変換
- S3に出力
※図中の〇はboto3というAWSサービスを連携するAPIを介するという意味を指しています。
実装した連携処理で意識したこと
- 意識した設計思想
- S3をローカル環境のように扱える状態にすること
- 他の案件に流用できる分析環境にすること
- 顧客から1つのデータが分割されて送付されるため、元データに戻す際の作業者の手間を削減すること
- 処理速度向上、Rユーザーを含む共同作業者とのデータ共有の2点を可能にするデータフォーマットを使用すること
- 設計思想を実現するために、どのような手法を用いたか
クラウド環境にアクセスする際の煩雑なパラメーター設定・管理負担を軽減するため、入力元フォルダ、出力先フォルダを含む環境パラメーターをYAMLで管理する(設計思想1、2)
- ノートブック上で’{◇◇◇}’となっている部分を変換する
- 「WORKER_NAME」に作業者の名字を英小文字で登録する
- 「BUCKET」には、S3のトップ画面に表示される対象バケット名を登録し、プロジェクトごとに変更する
- 「ROOT_DIR_NAME」には、「BUCKET」直下のルートディレクトリ名を登録し、案件ごとに変更する
パーティションで分割されたデータ、ファイル拡張子が異なる形式で共有された生データを、マスタファイルの定義に沿って、1度の関数読み出しのみで縦結合する(設計思想3)
- 「ファイル名」:対象ファイルの拡張子を除いた部分を指定
- 「拡張子」:対象ファイルの「.」を除いたファイル形式を指定
- 「シート名」:読み込む対象のシート名
- 「ヘッダー」:読み込み始める行数のオフセットを指定
ファイル名 | 拡張子 | シート名 | ヘッダー |
---|---|---|---|
AAA | xlsx | シート1 | 1 |
BBB | xls | シート2 | 2 |
CCC | csv | シート3 | 0 |
DDD | csv | シート4 | 0 |
使用コード(トグルを展開してください)
# データ抽出関数(ローデータ読み込み用) ※型を指定して読み込む処理に変更する予定(実装次第low_memory=Falseは削除) def raw_data_loading_with_diff_formats(REGION_NAME, BUCKET, PREFIX_DIR_PATH, MASTER_PREFIX_DIR_PATH, MASTER_FILE_NAME): df_total = pd.DataFrame() if check_file_exists(REGION_NAME, BUCKET, MASTER_PREFIX_DIR_PATH, MASTER_FILE_NAME): # 該当ファイルが存在するか確認する自作関数 # excel if ('.xlsx' or '.xls') in os.path.splitext(MASTER_FILE_NAME)[1]: df_master = pd.read_excel('s3://{BUCKET}/{MASTER_PREFIX_DIR_PATH}/{MASTER_FILE_NAME}'.format( BUCKET=BUCKET, MASTER_PREFIX_DIR_PATH=MASTER_PREFIX_DIR_PATH, MASTER_FILE_NAME=MASTER_FILE_NAME), dtype={'ファイル名':str, '拡張子':str, 'シート名':str, 'ヘッダー':int} ) # csv if '.csv' in os.path.splitext(MASTER_FILE_NAME)[1]: df_master = pd.read_csv('s3://{BUCKET}/{MASTER_PREFIX_DIR_PATH}/{MASTER_FILE_NAME}'.format( BUCKET=BUCKET, MASTER_PREFIX_DIR_PATH=MASTER_PREFIX_DIR_PATH, MASTER_FILE_NAME=MASTER_FILE_NAME)) display(df_master) for row in range(0, len(df_master)): FILE_NAME = '{FILE_NAME}.{EXTENSION}'.format(FILE_NAME=df_master['ファイル名'][row], EXTENSION=df_master['拡張子'][row]) if check_file_exists(REGION_NAME, BUCKET, PREFIX_DIR_PATH, FILE_NAME): # excel if ('xlsx' or 'xls') in df_master['拡張子'][row]: df = pd.read_excel('s3://{BUCKET}/{PREFIX_DIR_PATH}/{FILE_NAME}'.format( BUCKET=BUCKET, PREFIX_DIR_PATH=PREFIX_DIR_PATH, FILE_NAME=FILE_NAME), sheet_name=df_master['シート名'][row], header=df_master['ヘッダー'][row] ) # csv if 'csv' == df_master['拡張子'][row]: df = pd.read_csv('s3://{BUCKET}/{PREFIX_DIR_PATH}/{FILE_NAME}'.format( BUCKET=BUCKET, PREFIX_DIR_PATH=PREFIX_DIR_PATH, FILE_NAME=FILE_NAME), header=df_master['ヘッダー'][row], low_memory=False ) # tsv if 'tsv' == df_master['拡張子'][row]: df = pd.read_csv('s3://{BUCKET}/{PREFIX_DIR_PATH}/{FILE_NAME}'.format( BUCKET=BUCKET, PREFIX_DIR_PATH=PREFIX_DIR_PATH, FILE_NAME=FILE_NAME), header=df_master['ヘッダー'][row], delimiter='\t', low_memory=False ) df_total = pd.concat([df_total, df], axis=0) return df_total
- データの入出力時間が短く、他言語との互換性からParquet形式でデータを出力する(設計思想4)
- 参考までに比較になります(速度は変動します)
データフォーマット | 速度 | 他言語との互換性 |
---|---|---|
csv | 遅い | あり |
Parquet | 速い(csvの1/5程度) | あり |
pickle | 速い(csvの1/5程度) | なし |
▼カンマ区切り値 (CSV) ファイル形式 www.creativyst.com
▼Apache Parquet とは www.databricks.com
▼pickle --- Python オブジェクトの直列化 docs.python.org
学び
- マスタファイルで情報を管理する方が、保守性と可用性に優れ、他に分析案件にも流用しやすい
- 通信時の読み書き速度とRユーザーへの連携を考えるとParquetがよい
課題
課題として、SageMakerとS3の連携だけでは、ファイルやデータのバージョン管理が難しいことが挙げられます。
現状の実装だと、クレンジングやデータスキーマの変更が発生するたびに、データのバージョンをを上げることになります。
この場合、最新のデータをチームに知らせる必要がありますが、周知漏れが発生する可能性があります。
漏れが起きた場合、バージョンの違うソースから前提条件が合わないまま各々が分析してしまうため、
手戻りが発生します。
今後はこの課題を解決するべく検討していきたいと考えています。
最後までお読みいただきありがとうございました! この記事が少しでも役に立つと幸いです!