DEVELOPER’s BLOG
技術ブログ
「コスパならBigQuery」?実験で比較してみた【Redshift vs. BigQuery】
目次
「費用対効果」で選ぶには?
こんにちは。アクセルユニバース株式会社、データサイエンティストの世古大樹です。
今回は、以前のデータウェアハウス比較記事の続編で、BigQuery と Redshift Serverless のコストパフォーマンスを比較する実験を行いました。
詳しくは後述しますが、同じデータセットに対して同じクエリを実行し、かかった料金(コスト)と応答時間(パフォーマンス)を計測しています。
データセットは、80GiB程度の構造化データと360GiB程度の半構造化データの2種類を利用します。それぞれのデータセットに対し、性質の異なる3つのクエリ(合計6つのクエリ)を書きました。クエリ1つにつき5回実行して、特有のオーバーヘッドがある1回目を除外し、2〜5回目の計測結果の平均値を取っています。
「BigQueryの方が安価で性能が良い」とよく言われているようですが、実際に比較した結果を公開しているものは見当たりません。本記事を読めば、コストとパフォーマンスに根拠を持ってデータウェアハウスの選定ができるようになります。
結果:BigQueryは「安定」している
実験の詳細は後回しにして、いきなり結果から発表したいと思います。詳細は .xlsx ファイル(こちらからダウンロード)でご確認いただけますが、ここでは概要の解説を行います。
まず、「コスト」にあたる平均課金額は以下のようになりました。(データセットやクエリの内容は後述します)
「差」「比」はともに「BigQuery 容量コンピューティング」と「Redshift Serverless」を比較したもので、BigQueryが有利な場合は青色、Redshiftが有利な場合は赤色で表現しています(以降の表も同様)。
「比」を見ると、思いのほか傾向が一定していないのが驚きです。BigQueryの方が安いクエリも、Redshiftの方が安いクエリもあります。クエリ1-1はBigQueryの方が約13倍高い一方、2-2はRedshiftの方が15倍高くなり、ばらつきが大きいようです。
次に、「パフォーマンス」にあたる平均応答時間は以下のようになりました。
BigQueryの応答時間は比較的安定している(425〜27,212ms、約64倍)のに対し、Redshiftはかなり開きがある(68〜158,411ms、約2300倍)のが目を惹きます。RedshiftのRPU(仮想的な演算リソース)は、BigQueryと近い応答時間になるように設定したのですが、実際に比を1付近にできたのは1-2と2-3だけです。1-1と2-2ではRPUを最低にしてもBigQueryより大幅に速くなり、逆に1-3と2-1ではRPUを非常に大きくしてもあまり速度が上がらず、BigQueryには遠く及びませんでした。
「BigQueryの方がパフォーマンスが高い」とよく言われていますが、むしろ正確には、「BigQueryの方がパフォーマンスが安定している(Redshift Serverless ほど極端に長い応答時間を必要としない)」傾向にありそうだ、ということが読み取れます。
また、平均課金額の表で垣間見えていた、各DWHにとって「どのクエリが得意/不得意か」の傾向は、応答時間にも共通しており、課金額の比が大きい順に平均応答時間の比も大きくなりました。つまり、どちらのDWHにも、「得意なクエリは安いし速い」「苦手なクエリは高いし遅い」という相対的な傾向があるようです。
そこで、各クエリがそれぞれのDWHにとってどの程度「得意」かを表す指標を作ります。平均応答時間の逆数を、クエリに対するDWHの「パフォーマンス」、平均課金額を「コスト」と捉え、パフォーマンスをコストで割ります。この「コストパフォーマンス指数」が高いほど、そのDWHがそのクエリを「得意」としていることを示します。
まず行ごとに「比」を見ると、BigQueryがRedshiftの何倍のコストパフォーマンスを示したかがわかります。全体的にはBigQueryが上回っており、特に2-1では210倍もの差になっていますが、6個中2つのクエリではRedshiftが優勢で、1-1では83倍の差です。必ずBigQueryの方が優れているとは言い切れず、「どちらが良いかは場合による」ことになります。
実験した限りでは、Redshiftが上回った場合の共通点は、「RPUが最低の8でも非常に速いこと(数秒以内)」でした。「CTE(サブクエリ)があるとRedshiftが不利になる傾向にある」ということも言えそうですが、サブクエリはメモリサイズをボトルネックとする処理なので、「メモリサイズと比例するRPUの数を増やしても、サブクエリを含むクエリがあまり速くならないこと」の説明がつきません。事前には、BigQueryではインデックスを張れるのに対しRedshiftではソートキーの指定しか行えないため、JOINが含まれるとBigQueryが大幅に速くなると予想していたのですが、そういった傾向もありませんでした。
困ったことに、BigQueryの測定から得られる情報(スロット数・応答時間・処理データ容量)からは、Redshiftの方が良いのかどうか予想できません。言えるのは、「Redshift Serverless を使っている場合、RPU数が小さくなければBigQueryに移行した方が良くなる見込みが大きい」ということです。
次に列ごとに(縦に)表を見ると、こちらははっきりした傾向が読み取れます。値が大きいほどそのDWHにとって簡単な処理で、逆に小さいほど難しい処理だということになり、最大値と最小値の幅が狭いほど、常に安定したコストパフォーマンスを発揮できることになります。BigQueryの方が、平均課金額・平均応答時間ともに幅の開きが小さかったため、コストパフォーマンス指数も安定しました。BigQueryは 0.00012〜2.4 の20,000倍、Redshiftは 0.0000023〜200 の約90,000,000倍です。「BigQueryのコストパフォーマンスは、Redshift Serverless に比べて、(優れているというより)安定している」というのが、この実験における最も重要な結論です。
平均前の測定値、RPU数、課金額の計算方法などの詳細、および注意事項などは、 .xlsx ファイル(こちらからダウンロード)をご覧ください。
実験条件
以下では、どのような条件の下に実験を行ったのかを、「データセット」「データウェアハウス設定・計測条件」「クエリ」に分けて述べます。ただし、実際に利用したクエリ文等は非常に長いので、末尾の「コード集」にまとめて掲載しました。ここでは日本語ベースで、概要や意図だけを説明します。
データセット
利用したデータセットの1つ目は、Chicago Data Portal の Taxi Trips (2013-2023) です。シカゴ市が公開している、市内のタクシーの賃走についてのデータセットです。11年分の運行について、タクシーID、開始・終了時刻、走行距離、運賃などが記録されています。CSV形式で23列×約2億行、解凍時サイズは約80GiBの、単一テーブルのデータです。表では「小規模構造化データ」と呼んでいます。
2つ目は、Hugging Face にある McAuley-Lab の Amazon-Reviews-2023 です。1996年から2023年までに amazon.com へ投稿された商品レビューが収集されています。レビューデータの集まりである User Reviews、レビューされる商品の情報である Item Metadata の2種類のデータがあり、さらにそれぞれ商品カテゴリでデータセットを分割されています。実験では、商品カテゴリによって分割されたデータを1つにまとめ、種類に応じて2つのテーブルとしました。いずれもJSON形式で、User Reviews は約260GiB、Item Metadata は約90GiB、計360GiBほどです。表では「中規模半構造化データ」と呼んでいます。
いずれも GCS/S3 バケットに保存し、そこから BigQuery/Redshift へロードしました。
データウェアハウス設定・計測条件
今回の実験では、「実際にDWHの利用を開始した時のパフォーマンスを再現すること」を目指し、現実の利用開始時に近い設定としました。
パフォーマンス最適化の基本としては、BigQueryではインデックスの作成、Redshiftではソートキーの指定ができます。DWHの利用が進んでいけば、メトリクスに基づいて最適なインデックスを構築したり、ソートキーが自動的に最適化されたりするところですが、普通、利用開始時にはそこまで行わないと思います。とはいえ、最適化を何も行わないのもリアリティがありません。そこで今回は、「インデックスは主キーにのみ張り、ソートキーにはタイムスタンプ列を指定する」というシンプルな設定にしました。また、RedshiftのVACUUMは、各データベースでデータロード完了後の1度のみ実施しました。
計測に際しては、信頼度の高いデータとするため、リザルトキャッシュの取得を無効化した上で5回連続で同じクエリを行い、2〜5回目の計測結果の平均値を取りました。1回目の結果を除くのは、Redshiftで初回のみコードコンパイルが行われるなど、例外的なオーバーヘッドが発生しうるからです。
リージョン(ロケーション)については、AWS・Google Cloud ともに全て東京単一リージョンで実行し、料金も東京の金額で計算しました。
BigQueryの課金モデルは、予約したスロット(仮想的な演算リソース)の利用量に基づく「容量コンピューティング」と、スキャンしたデータ量に基づく「オンデマンド」の2種類があり、さらに「容量コンピューティング」には3つのエディションがあります。BigQueryの利用を開始する場合、まず「オンデマンド」で自社の利用量等を把握し、「容量コンピューティング」の予約に移行することが想定されていますが、初めから「容量コンピューティング」を使うことも十分あり得ます。そこで今回は、「オンデマンド」を利用してその課金額と処理データ量を記載しつつ、「容量コンピューティング・Standardエディション」の場合のスロット数と課金額も計算し、記録しました(こちらの数値を元に、他のエディションの金額も計算可能です)。
Redshift Serverless の課金額は、RPU(仮想的な演算リソース)の利用量に基づきます。このRPUのベース値は8〜512の間で指定することができ、大きくすると応答時間は短くなりますが、ある程度を超えると短縮効果がコストの増加に見合わなくなります。今回は、原則的にBigQueryの応答時間と近くなるように設定しつつも、コストに見合わないほど大きな値にはしないようにしました。例えばクエリ1-3の応答時間は、RPUが128のとき約160秒でしたが、最大の512に設定しても約100秒と、40%の時短に対して2.5倍のコストになります(コストパフォーマンス指数は0.64倍)。BigQueryは約20秒でしたが、それにできるだけ近づけるのではなく、現実的なラインと考えられた128RPUに設定しました。
なお、最適なRPU数を探索するためのクエリは、測定に影響を及ぼさないように、別のデータベースで行いました。また、Redshift Serverless には1分間の最低課金時間があり、1分未満のクエリであっても1分間分の料金が発生します。しかし実際の利用においては、1分未満のクエリも、他のクエリと同時に実行することで合計1分以上にできると考えられるため、測定されたままのミリ秒数に基づいて料金を計算しました。
以上の料金は、演算(クエリ)に対する費用であり、実際にはストレージ(データ保存)に対する費用も発生しますが、そちらは(通常の利用では)演算に対する費用に比べてかなり安価になることと、デモを行わなくても簡単に見積が取れることから、計測を行いませんでした。また、本記事に掲載されている料金についての情報は公開時点のものであることにご注意ください。料金の詳細について、BigQueryはこちら、Redshiftはこちらをご覧ください。
計測結果の取得には、BigQueryでは後に掲載するクエリを用いました。Redshiftでは、クエリエディタv2のリザルトに表示される Elapsed time で応答時間を計測し、応答時間にRPU数とRPU時間あたりの料金を乗じて課金額を計算しました。RPU数が指定したベースRPU数の通りであることはコンソール上で確認しました。
クエリ
DWHパフォーマンスのボトルネックとして代表的な処理は、ディスクI/OとCPUに負荷のかかるソート(結合・ORDER BY句・ウィンドウ関数)と、メインメモリを消費するサブクエリです。そのため、DWHの主なワークロードである集計(集約関数)を全クエリに含めながら、これらの句や関数の様々な組み合わせを試しました。
できるだけ標準SQLに準拠するようにしました。方言の違いにより、BigQueryとRedshiftで全く同じクエリ文にはなりませんでしたが、同じ処理を実現するものになっています。
また、クエリは集計・分析として意味を持つように作成し、コード集にはその説明も付けておきました。
コード集
前処理
Amazon-Reviews-2023 の Item Metadata は、そのままだと BigQuery の仕様でエラーが出るので、以下のPythonスクリプトで前処理しました。Redshiftの場合は前処理なしで読み込めますが、同一条件で比較するために、同じく前処理したデータをロードしています。
import json
import re
def clean_columns(obj):
if isinstance(obj, dict):
keys = list(obj.keys())
for key in keys:
# テーブル定義に合わないデータ型を修正する。
if key == 'rating_number' and not isinstance(obj[key], int):
del obj[key]
elif key in ('average_rating', 'price') and not isinstance(obj[key], float):
del obj[key]
elif key == 'details' and not isinstance(obj[key], dict):
del obj[key]
elif key in ('videos', 'images', 'features', 'description', 'categories', 'bought_together') and not isinstance(obj[key], list):
obj[key] = []
elif key == 'images':
images = obj[key]
for image in images:
for image_size in list(image):
if not image_size in ('thumb', 'large', 'variant', 'hi_res'):
del image[image_size]
print(str(image_size) + 'を削除しました。')
# BigQueryの仕様に合わせたキーの前処理:使用できない特殊文字を消去し、大文字・小文字の違いを無視する。衝突が発生するか空文字列になる場合は削除する。
cleaned_key = re.sub(r'[!"$()*,./;?@[\]^`{}~]', '', key.lower())
if key != cleaned_key or cleaned_key == '':
if cleaned_key in obj or cleaned_key == '':
del obj[key]
else:
obj[cleaned_key] = obj.pop(key)
for value in obj.values(): # ネストされたオブジェクトに対して再帰的に処理を行う。
clean_columns(value)
return obj
def main():
with open('item_metadata_before_cleaned.jsonl', 'r') as input, open('item_metadata.jsonl', 'w') as output:
for line in input:
obj = json.loads(line)
obj = clean_columns(obj)
cleaned_line = json.dumps(obj)
output.write(cleaned_line + '\n')
if __name__ == '__main__':
main()
データロード等
Taxi Trips (2013-2023), BigQuery
このデータセットのタイムスタンプ形式はBigQueryにサポートされていないので、一度文字列型で読み込んでから変換しています。 また、各データセット共通ですが、BigQueryでは4GB以上のファイルを圧縮形式でロードできないので、解凍したデータを読み込んでいます(Redshiftでは圧縮したまま利用しています)。
CREATE SCHEMA taxi_dataset
OPTIONS(
location='asia-northeast1',
storage_billing_model='LOGICAL'
);
CREATE TABLE taxi_dataset.TaxiTrip (
trip_id STRING,
taxi_id STRING,
trip_start_timestamp STRING,
trip_end_timestamp STRING,
trip_seconds INT,
trip_miles FLOAT64,
pickup_census_tract STRING,
dropoff_census_tract STRING,
pickup_community_area INT,
dropoff_community_area INT,
fare FLOAT64,
tips FLOAT64,
tolls FLOAT64,
extras FLOAT64,
trip_total FLOAT64,
payment_type STRING,
company STRING,
pickup_centroid_latitude FLOAT64,
pickup_centroid_longitude FLOAT64,
pickup_centroid_location GEOGRAPHY,
dropoff_centroid_latitude FLOAT64,
dropoff_centroid_longitude FLOAT64,
dropoff_centroid_location GEOGRAPHY,
PRIMARY KEY(trip_id) NOT ENFORCED
);
LOAD DATA INTO taxi_dataset.TaxiTrip
FROM FILES (
allow_jagged_rows=true,
field_delimiter=',',
format='CSV',
null_marker='',
skip_leading_rows=1,
uris=['gs://my-bucket-name/TaxiTrip.csv']
);
ALTER TABLE taxi_dataset.TaxiTrip
ADD COLUMN new_trip_start_timestamp TIMESTAMP,
ADD COLUMN new_trip_end_timestamp TIMESTAMP;
UPDATE taxi_dataset.TaxiTrip
SET
new_trip_start_timestamp = PARSE_TIMESTAMP('%m/%d/%Y %I:%M:%S %p', trip_start_timestamp),
new_trip_end_timestamp = PARSE_TIMESTAMP('%m/%d/%Y %I:%M:%S %p', trip_end_timestamp)
WHERE true;
ALTER TABLE taxi_dataset.TaxiTrip
DROP COLUMN trip_start_timestamp,
DROP COLUMN trip_end_timestamp;
ALTER TABLE taxi_dataset.TaxiTrip
RENAME COLUMN new_trip_start_timestamp TO trip_start_timestamp,
RENAME COLUMN new_trip_end_timestamp TO trip_end_timestamp;
CREATE SEARCH INDEX tt_pk_index -- 主キーにインデックスを張る。
ON taxi_dataset.TaxiTrip(trip_id);
Taxi Trips (2013-2023), Redshift Serverless
CREATE DATABASE taxi_database;
CREATE TABLE IF NOT EXISTS
taxi_database.public.TaxiTrip (
trip_id CHAR(40),
taxi_id CHAR(128),
trip_start_timestamp TIMESTAMP,
trip_end_timestamp TIMESTAMP,
trip_seconds INT,
trip_miles FLOAT,
pickup_census_tract VARCHAR(255),
dropoff_census_tract VARCHAR(255),
pickup_community_area INT,
dropoff_community_area INT,
fare FLOAT,
tips FLOAT,
tolls FLOAT,
extras FLOAT,
trip_total FLOAT,
payment_type VARCHAR(64),
company VARCHAR(255),
pickup_centroid_latitude FLOAT,
pickup_centroid_longitude FLOAT,
pickup_centroid_location GEOMETRY,
dropoff_centroid_latitude FLOAT,
dropoff_centroid_longitude FLOAT,
dropoff_centroid_location GEOMETRY,
PRIMARY KEY(trip_id)
)
BACKUP NO
COMPOUND SORTKEY(trip_start_timestamp, trip_end_timestamp); -- タイムスタンプ列をソートキーにする。
COPY taxi_database.public.TaxiTrip (
trip_id, taxi_id, trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles,
pickup_census_tract, dropoff_census_tract, pickup_community_area, dropoff_community_area,
fare, tips, tolls, extras, trip_total, payment_type, company,
pickup_centroid_latitude, pickup_centroid_longitude, pickup_centroid_location,
dropoff_centroid_latitude, dropoff_centroid_longitude, dropoff_centroid_location
)
FROM 's3://my-bucket-name/TaxiTrip.csv.gz'
IAM_ROLE 'my-role-arn'
REGION 'ap-northeast-1'
CSV
DELIMITER ','
NULL AS ''
IGNOREHEADER 1
TIMEFORMAT 'MM/DD/YYYY HH:MI:SS AM'
GZIP;
VACUUM;
Amazon-Reviews-2023, BigQuery
CREATE SCHEMA amazon_reviews_dataset
OPTIONS(
location='asia-northeast1',
storage_billing_model='LOGICAL'
);
CREATE TABLE amazon_reviews_dataset.ItemMetadata (
main_category STRING
, title STRING
, average_rating FLOAT64
, rating_number INT
, features ARRAY
, description ARRAY
, price FLOAT64
, images ARRAY>
, videos ARRAY>
, store STRING
, categories ARRAY
, details JSON
, parent_asin STRING
, bought_together ARRAY
, PRIMARY KEY(parent_asin) NOT ENFORCED
);
/*
ItemMetadataのみ、スキーマ自動検出でエラーが出るので、スキーマ自動検出をオフにするため、SQLではなく以下のbqコマンドでロードする。
$ bq --location=asia-northeast1 load \
--noautodetect --source_format=NEWLINE_DELIMITED_JSON \
amazon_reviews_dataset.ItemMetadata \
gs://my-bucket-name/item_metadata.jsonl
*/
CREATE SEARCH INDEX im_pk_index -- 主キーにインデックスを張る。
ON amazon_reviews_dataset.ItemMetadata(parent_asin);
CREATE TABLE amazon_reviews_dataset.UserReviews (
rating FLOAT64
, title STRING
, text STRING
, images ARRAY>
, asin STRING
, parent_asin STRING
, user_id STRING
, timestamp INT -- UNIX時刻。
, verified_purchase BOOLEAN
, helpful_vote INT
, PRIMARY KEY(parent_asin, user_id) NOT ENFORCED
, FOREIGN KEY(parent_asin) REFERENCES amazon_reviews_dataset.ItemMetadata(parent_asin) NOT ENFORCED
);
LOAD DATA INTO amazon_reviews_dataset.UserReviews
FROM FILES (
format='NEWLINE_DELIMITED_JSON',
ignore_unknown_values=true,
uris=['gs://my-bucket-name/user_reviews.jsonl']
);
CREATE SEARCH INDEX ur_pk_index -- 主キーにインデックスを張る。
ON amazon_reviews_dataset.UserReviews(parent_asin, user_id);
Amazon-Reviews-2023, Redshift Serverless
Item Metadata の description は本来SUPER型ですが、サイズが大き過ぎてCOPY時にエラーが出るデータがいくつもあります。サイズの限度で打ち切るため、COPY時に TRUNCATECOLUMNS オプションを付け、SUPER型は TRUNCATECOLUMNS の対象にならないため VARCHAR(65535) 型で定義しました。この処理を施してもデータサイズはほぼ変化せず、クエリに description を使うこともないため、測定に問題はありません。
CREATE DATABASE amazon_reviews_database;
CREATE TABLE IF NOT EXISTS
amazon_reviews_database.public.ItemMetadata (
main_category VARCHAR(64)
, title VARCHAR(4096)
, average_rating FLOAT
, rating_number INT
, features SUPER
, description VARCHAR(65535)
, price VARCHAR(64)
, images SUPER
, videos SUPER
, store VARCHAR(4096)
, categories SUPER
, details SUPER
, parent_asin VARCHAR(16)
, bought_together SUPER
, PRIMARY KEY(parent_asin)
)
BACKUP NO;
COPY amazon_reviews_database.public.ItemMetadata (
main_category, title, average_rating, rating_number, features, description, price,
images, videos, store, categories, details, parent_asin, bought_together
)
FROM 's3://my-bucket-name/item_metadata.jsonl.gz'
IAM_ROLE 'my-role-arn'
REGION 'ap-northeast-1'
JSON 'auto'
NULL AS ''
TRUNCATECOLUMNS
GZIP;
CREATE TABLE IF NOT EXISTS
amazon_reviews_database.public.UserReviews (
rating FLOAT
, title VARCHAR(4096)
, text VARCHAR(65535)
, images SUPER
, asin VARCHAR(16)
, parent_asin VARCHAR(16)
, user_id VARCHAR(128)
, timestamp BIGINT
, verified_purchase BOOLEAN
, helpful_vote INT
, FOREIGN KEY(parent_asin) REFERENCES ItemMetadata(parent_asin)
)
BACKUP NO
COMPOUND SORTKEY(timestamp);
COPY amazon_reviews_database.public.UserReviews (
rating, title, text, images, asin,
parent_asin, user_id, timestamp, verified_purchase, helpful_vote
)
FROM 's3://my-bucket-name/user_reviews.jsonl.gz'
IAM_ROLE 'my-role-arn'
REGION 'ap-northeast-1'
JSON 'auto'
NULL AS ''
TRUNCATECOLUMNS
GZIP;
VACUUM;
計測記録の取得(BigQuery)
Redshiftではコンソール上で記録を閲覧するため、コードがありません。
SELECT
job_id -- クエリの一意ID。
, TIMESTAMP_DIFF(end_time, start_time, MILLISECOND) AS elapsed_time_milliseconds -- 応答時間(ミリ秒)。
, total_slot_ms / TIMESTAMP_DIFF(end_time, start_time, MILLISECOND) AS num_slot -- 「容量コンピューティング」の場合のスロット数。
, 0.051 * total_slot_ms / (1000 * 60 * 60) AS cost_capacity -- 「容量コンピューティング」の場合の課金額(USD)。
, total_bytes_billed * POW(2, -40) AS total_terabytes_billed -- スキャンされたデータ容量(TiB)。
, 7.5 * total_bytes_billed * POW(2, -40) AS cost_on_demand -- 「オンデマンド」の場合の課金額(USD)。
, start_time -- クエリの開始時刻。
, state -- クエリの状態(実行中でないか確認するために出力させる)
, LEFT(query, 50) -- クエリ文の冒頭50文字。
FROM `my-project-name`.`region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
ORDER BY start_time DESC;
リザルトキャッシュの無効化(Redshift Serverless)
BigQueryではコンソール上で設定を行うため、コードがありません。
SET enable_result_cache_for_session TO off; -- 無効化。
SHOW enable_result_cache_for_session; -- 無効化されていることの確認。
クエリ1-1:小規模構造化データ、単純集計
データセットに含まれる運行の総数と、運賃の平均値を求めます。
BigQuery
SELECT
COUNT(*) AS num_trips,
AVG(fare) AS avg_fare
FROM taxi_dataset.TaxiTrip;
Redshift
SELECT
COUNT(*) AS num_trips,
AVG(fare) AS avg_fare
FROM TaxiTrip;
クエリ1-2:小規模構造化データ、集計+ウィンドウ関数
1台のタクシーが1日に乗車された回数のTOP10と、その日付を調べます。
BigQuery
SELECT
RANK() OVER (ORDER BY COUNT(trip_id) DESC) AS ranking,
LEFT(taxi_id, 10) AS short_taxi_id,
CAST(trip_start_timestamp AS DATE) AS date,
COUNT(trip_id) AS trip_count
FROM taxi_dataset.TaxiTrip
GROUP BY
CAST(trip_start_timestamp AS DATE),
taxi_id
ORDER BY ranking ASC
LIMIT 10;
Redshift
SELECT
RANK() OVER (ORDER BY COUNT(trip_id) DESC) AS ranking,
LEFT(taxi_id, 10) AS short_taxi_id,
CAST(trip_start_timestamp AS DATE) AS date,
COUNT(trip_id) AS trip_count
FROM TaxiTrip
GROUP BY
CAST(trip_start_timestamp AS DATE),
taxi_id
ORDER BY ranking ASC
LIMIT 10;
クエリ1-3:小規模構造化データ、ウィンドウ関数+CTE+集計
「タクシーは、どの程度スムーズに乗客を見つけられるのか?」を知りたいとします。一つの指標として、「前の乗客を降ろした場所の近くで、すぐに次の乗客を拾うという事象」について調べます。1人目の客を降ろした直後にその近くで2人目の客を拾い、2人目の客を降ろした直後にその近くで3人目の客を拾い......と、この事象は連続することがあります。n回連続した事象を series n trip、n回以上連続した事象を series n or more trip と呼び、n = 1, 2, 3 について相対度数を算出します。
BigQuery
-- タクシーごとに、各運行とその直前の運行の、運行ID、開始/終了座標、開始/終了時刻を取得するCTE。
WITH CurrAndPrevTrips AS (
SELECT
trip_id AS curr_trip_id,
LAG(trip_id, 1) OVER W AS prev_trip_id,
pickup_centroid_location AS curr_start_point,
ST_GeogFromText(
LAG(ST_AsText(dropoff_centroid_location), 1) OVER W -- 地理データに直接ウィンドウ関数を適用できないRedshiftに合わせた処理(本来BigQueryでは不要)。
) AS prev_end_point,
trip_start_timestamp AS curr_start_time,
LAG(trip_end_timestamp, 1) OVER W AS prev_end_time
FROM taxi_dataset.TaxiTrip
WHERE
ST_Distance(pickup_centroid_location, dropoff_centroid_location)
< trip_miles * 1609 -- 誤った位置情報を持っている可能性が高いレコードが15%程度あるので、除外する。
WINDOW W AS (
PARTITION BY taxi_id
ORDER BY trip_start_timestamp, trip_end_timestamp, trip_id
)
),
-- 客を降ろしてから、一定の時間・距離内で、新しい客を拾った運行(series 1 trip)の組を取得するCTE。
Series1Trips AS (
SELECT
curr_trip_id,
prev_trip_id
FROM CurrAndPrevTrips
WHERE
TIMESTAMP_DIFF(curr_start_time, prev_end_time, MINUTE) <= 15
AND ST_Distance(curr_start_point, prev_end_point) < 500
),
Series2Trips AS (
SELECT
Tbl1.curr_trip_id AS curr_trip_id,
Tbl1.prev_trip_id AS prev1_trip_id,
Tbl2.prev_trip_id AS prev2_trip_id
FROM
Series1Trips AS Tbl1
INNER JOIN Series1Trips AS Tbl2
ON Tbl1.prev_trip_id = Tbl2.curr_trip_id -- 連続している2つのうち前者が、別の連続の後者として存在している場合を捉えて結合する。以下同様。
),
Series3Trips AS (
SELECT
S2.curr_trip_id AS curr_trip_id,
S2.prev1_trip_id AS prev1_trip_id,
S2.prev2_trip_id AS prev2_trip_id,
S1.prev_trip_id AS prev3_trip_id
FROM
Series2Trips AS S2
INNER JOIN Series1Trips AS S1
ON S2.prev2_trip_id = S1.curr_trip_id
),
Series4Trips AS (
SELECT S3.curr_trip_id
FROM
Series3Trips AS S3
INNER JOIN Series1Trips AS S1
ON S3.prev3_trip_id = S1.curr_trip_id
),
-- 総運行数とn連続運行数を集計するCTE。
DuplicatedCounts AS (
SELECT
(SELECT COUNT(*) FROM CurrAndPrevTrips) AS num_all_trips,
(SELECT COUNT(*) FROM Series1Trips) AS s1, -- この数は、series 2 trip を重複して2回数えている(一般に series n trip を重複してn回数えている)。
(SELECT COUNT(*) FROM Series2Trips) AS s2, -- この数は、series 3 trip を重複して2回数えている(一般に series n trip を重複してn-1回数えている)。以下同様。
(SELECT COUNT(*) FROM Series3Trips) AS s3,
(SELECT COUNT(*) FROM Series4Trips) AS s4
)
SELECT
num_all_trips,
s1 - s2 AS num_series_1_or_more_trips, -- この数は、series n trip を重複なく1回として数えている。以下同様。
s2 - s3 AS num_series_2_or_more_trips,
s3 - s4 AS num_series_3_or_more_trips,
FORMAT('%.2f %%', (s1 - s2) / (num_all_trips * 1.0) * 100) AS rate_series_1_or_more_trips, -- 1.0 を掛けるのは、実数除算を行うための措置。以下同様。
FORMAT('%.2f %%', (s2 - s3) / (num_all_trips * 1.0) * 100) AS rate_series_2_or_more_trips,
FORMAT('%.2f %%', (s3 - s4) / (num_all_trips * 1.0) * 100) AS rate_series_3_or_more_trips
FROM DuplicatedCounts;
Redshift
-- タクシーごとに、各運行とその直前の運行の、運行ID、開始/終了座標、開始/終了時刻を取得するCTE。
WITH CurrAndPrevTrips AS (
SELECT
trip_id AS curr_trip_id,
LAG(trip_id, 1) OVER (
PARTITION BY taxi_id
ORDER BY trip_start_timestamp, trip_end_timestamp, trip_id
) AS prev_trip_id,
pickup_centroid_location AS curr_start_point,
ST_GeomFromText(LAG(ST_AsText(dropoff_centroid_location), 1) OVER ( -- Redshift では、地理データに直接ウィンドウ関数を適用できない。
PARTITION BY taxi_id
ORDER BY trip_start_timestamp, trip_end_timestamp, trip_id
)) AS prev_end_point,
trip_start_timestamp AS curr_start_time,
LAG(trip_end_timestamp, 1) OVER (
PARTITION BY taxi_id
ORDER BY trip_start_timestamp, trip_end_timestamp, trip_id
) AS prev_end_time
FROM TaxiTrip
WHERE
ST_Distance(pickup_centroid_location, dropoff_centroid_location)
< trip_miles * 1609 -- 誤った位置情報を持っている可能性が高いレコードが15%程度あるので、除外する。
),
-- 客を降ろしてから、一定の時間・距離内で、新しい客を拾った運行(series 1 trip)の組を取得するCTE。
Series1Trips AS (
SELECT
curr_trip_id,
prev_trip_id
FROM CurrAndPrevTrips
WHERE
(EXTRACT(EPOCH FROM curr_start_time) - EXTRACT(EPOCH FROM prev_end_time)) / 60 <= 15
AND ST_Distance(curr_start_point, prev_end_point) < 500
),
Series2Trips AS (
SELECT
Tbl1.curr_trip_id AS curr_trip_id,
Tbl1.prev_trip_id AS prev1_trip_id,
Tbl2.prev_trip_id AS prev2_trip_id
FROM
Series1Trips AS Tbl1
INNER JOIN Series1Trips AS Tbl2
ON Tbl1.prev_trip_id = Tbl2.curr_trip_id -- 連続している2つのうち前者が、別の連続の後者として存在している場合を捉えて結合する。以下同様。
),
Series3Trips AS (
SELECT
S2.curr_trip_id AS curr_trip_id,
S2.prev1_trip_id AS prev1_trip_id,
S2.prev2_trip_id AS prev2_trip_id,
S1.prev_trip_id AS prev3_trip_id
FROM
Series2Trips AS S2
INNER JOIN Series1Trips AS S1
ON S2.prev2_trip_id = S1.curr_trip_id
),
Series4Trips AS (
SELECT S3.curr_trip_id
FROM
Series3Trips AS S3
INNER JOIN Series1Trips AS S1
ON S3.prev3_trip_id = S1.curr_trip_id
),
-- 総運行数とn連続運行数を集計するCTE。
DuplicatedCounts AS (
SELECT
(SELECT COUNT(*) FROM CurrAndPrevTrips) AS num_all_trips,
(SELECT COUNT(*) FROM Series1Trips) AS s1, -- この数は、series 2 trip を重複して2回数えている(一般に series n trip を重複してn回数えている)。
(SELECT COUNT(*) FROM Series2Trips) AS s2, -- この数は、series 3 trip を重複して2回数えている(一般に series n trip を重複してn-1回数えている)。以下同様。
(SELECT COUNT(*) FROM Series3Trips) AS s3,
(SELECT COUNT(*) FROM Series4Trips) AS s4
)
SELECT
num_all_trips,
s1 - s2 AS num_series_1_or_more_trips, -- この数は、series n trip を重複なく1回として数えている。以下同様。
s2 - s3 AS num_series_2_or_more_trips,
s3 - s4 AS num_series_3_or_more_trips,
TO_CHAR((s1 - s2) / (num_all_trips * 1.0) * 100, 'FM999.000') || ' %' AS rate_series_1_or_more_trips, -- 1.0 を掛けるのは、実数除算を行うための措置。以下同様。
TO_CHAR((s2 - s3) / (num_all_trips * 1.0) * 100, 'FM999.000') || ' %' AS rate_series_2_or_more_trips,
TO_CHAR((s3 - s4) / (num_all_trips * 1.0) * 100, 'FM999.000') || ' %' AS rate_series_3_or_more_trips
FROM DuplicatedCounts;
クエリ2-1:中規模半構造化データ、CTE+集計
Amazonレビューが「参考になった」を獲得する数に関して分析したいとします。一般に、投稿された直後のレビューは「参考になった」を獲得しておらず、時間の経過とともに獲得数を増やし、十分時間が経過することでそのレビューに相応しい獲得数に漸近すると考えられます。そこで分析の際、「十分時間」が経過していないレビューをWHERE句で排除することで、獲得数が安定したデータのみを対象にできます。そのためにここでは、「十分時間」が何年であるか特定するため、レビューを経過年数の階級に分類し、階級ごとに獲得数の平均値を求めます。そして階級の平均値と全体の平均値の比を取り、その比が一定の閾値に達しているかどうかのフラグを出力させます。
このデータセットのタイムスタンプはミリ秒単位のUNIX時刻です。このデータが収集されたのは2023年9月23日で、UNIX時刻にすると1695427200000になります。
BigQuery
WITH VoteNumWithClass AS (
SELECT
helpful_vote
, CASE
WHEN timestamp > 1679529600000 THEN '0.0-0.5y' -- 投稿から経過した時間が0年以上半年未満である階級。以下同様。
WHEN timestamp > 1663891200000 THEN '0.5-1.0y'
WHEN timestamp > 1647993600000 THEN '1.0-1.5y'
WHEN timestamp > 1632355200000 THEN '1.5-2.0y'
WHEN timestamp > 1616457600000 THEN '2.0-2.5y'
WHEN timestamp > 1600819200000 THEN '2.5-3.0y'
WHEN timestamp > 1584921600000 THEN '3.0-3.5y'
WHEN timestamp > 1569196800000 THEN '3.5-4.0y'
WHEN timestamp > 1553299200000 THEN '4.0-4.5y'
WHEN timestamp > 1537660800000 THEN '4.5-5.0y'
WHEN timestamp <= 1537660800000 THEN '5.0y-'
ELSE NULL
END AS class
, AVG(helpful_vote) OVER () AS overall_avg
FROM amazon_reviews_dataset.UserReviews
)
SELECT
class
, AVG(helpful_vote) / MAX(overall_avg) AS votes_ratio_to_overall
, CASE
WHEN AVG(helpful_vote) / MAX(overall_avg) < 0.7
THEN 'Too recent to rely on.'
ELSE 'Old enough.'
END AS eval_usable_or_not
FROM VoteNumWithClass
GROUP BY class
ORDER BY class;
Redshift
WITH VoteNumWithClass AS (
SELECT
helpful_vote
, CASE
WHEN timestamp > 1679529600000 THEN '0.0-0.5y' -- 投稿から経過した時間が0年以上半年未満である階級。以下同様。
WHEN timestamp > 1663891200000 THEN '0.5-1.0y'
WHEN timestamp > 1647993600000 THEN '1.0-1.5y'
WHEN timestamp > 1632355200000 THEN '1.5-2.0y'
WHEN timestamp > 1616457600000 THEN '2.0-2.5y'
WHEN timestamp > 1600819200000 THEN '2.5-3.0y'
WHEN timestamp > 1584921600000 THEN '3.0-3.5y'
WHEN timestamp > 1569196800000 THEN '3.5-4.0y'
WHEN timestamp > 1553299200000 THEN '4.0-4.5y'
WHEN timestamp > 1537660800000 THEN '4.5-5.0y'
WHEN timestamp <= 1537660800000 THEN '5.0y-'
ELSE NULL
END AS class
, AVG(helpful_vote) OVER () AS overall_avg
FROM UserReviews
)
SELECT
class
, AVG(helpful_vote) / MAX(overall_avg) AS votes_ratio_to_overall
, CASE
WHEN AVG(helpful_vote) / MAX(overall_avg) < 0.7
THEN 'Too recent to rely on.'
ELSE 'Old enough.'
END AS eval_usable_or_not
FROM VoteNumWithClass
GROUP BY class
ORDER BY class;
クエリ2-2:中規模半構造化データ、JOIN+集計
商品のレビュー数が多くなると、「自分がレビューを書く意味がない」と感じる人が増えて、レビュー数の増加が鈍化する傾向がある、という推測を立てました。これについて調査したいと思います。しかし、単にレビュー数の時間変化率だけを見ると、その商品が人気を失ってレビューされなくなった場合と区別できません。また、サクラを利用していたり、クーポンなどの報酬を用意して購入者にレビューを書くよう促したりしている商品であれば、通常のレビュー数の鈍化パターンに当てはまらないだろうと思われます。
そこでこのクエリでは、「商品ID、商品のレビュー数、商品のレビューについた『参考になった』の総数」の組を取得します。十分なレビュー数がある商品のうち、レビュー数に対して「参考になった」総数が多いものは、鈍化傾向にある可能性が高く、逆に少ないものはサクラなどを利用している可能性が高いと考えられます。
結果セットのサイズが大きいので、Redshiftでは一時テーブルに出力して表示させることができません。UNLOADコマンドによってS3バケットに結果を出力します。
BigQuery
SELECT
UR.parent_asin
, MAX(IM.rating_number) AS rating_number -- 1つの parent_asin に対応する rating_number は一意。このMAX関数は形式的なもので、実際は唯一の rating_number が返される。
, SUM(UR.helpful_vote) AS sum_vote
FROM
amazon_reviews_dataset.UserReviews UR
INNER JOIN amazon_reviews_dataset.ItemMetadata IM
ON UR.parent_asin = IM.parent_asin
GROUP BY UR.parent_asin;
Redshift
UNLOAD('
SELECT
UR.parent_asin
, MAX(IM.rating_number) AS rating_number -- 1つの parent_asin に対応する rating_number は一意。このMAX関数は形式的なもので、実際は唯一の rating_number が返される。
, SUM(UR.helpful_vote) AS sum_vote
FROM
UserReviews UR
INNER JOIN ItemMetadata IM
ON UR.parent_asin = IM.parent_asin
GROUP BY UR.parent_asin;
')
TO 's3://my-bucket-name/unload2-2/'
IAM_ROLE 'my-role-arn'
クエリ2-3:中規模半構造化データ、ウィンドウ関数集計+CTE+JOIN
「参考になった」を獲得しやすい、優れたレビューの特徴を分析したいとします。そのためのデータを、2-1で求めた期間の条件でフィルタリングし、適切に加工して取り出す必要があります。しかし、単純に「参考になった」の獲得数を基準とするのは不適切だと考えられます。100件以上の「参考になった」を獲得しているレビューが複数ある商品で10件の「参考になった」を獲得しているレビューと、他に「参考になった」を獲得しているレビューが存在しない商品で10件獲得しているレビューでは、後者をより「優れている」と判定すべきだからです。
そこでこの分析では、パレート分析のような方法を用いることにしました。具体的には、商品ごとに、「参考になった」獲得数の順にレビューを並べ、累積相対度数に応じてグループA、B、Cに分けます(総度数が0の場合のグループはNULLとします)。そしてレビューごとに、そのレビューがどのグループに属するかと、以下の情報をセットにして出力します(後工程で、レビューがどのグループになるかを説明できる変数の組み合わせを探るという想定):「商品ID」、「評価値(星の数)」、「商品全体の平均評価値と評価値の差」、「添付画像の有無」、「レビュータイトルの原文」、「レビュータイトルの単語数」、「レビュー本文の原文」、「レビュー本文の単語数」。
結果セットのサイズが大きいので、一時テーブルに出力して表示させることができません。BigQueryでは別のテーブルを作成した上でそこに結果を保存する設定を行い、RedshiftではUNLOADコマンドによってS3バケットに結果を出力します。
BigQuery
WITH UserReviewWithWindow AS (
SELECT
*
, 1.0 * SUM(UR.helpful_vote) OVER (
PARTITION BY UR.parent_asin
ORDER BY UR.helpful_vote DESC
) -- 商品ごとに、「参考になった」数の多い順にレビューを並べたときの、「参考になった」の累積度数。1.0を掛けるのは商を浮動小数点値にするため。
/ NULLIF(SUM(UR.helpful_vote) OVER (PARTITION BY UR.parent_asin), 0) -- 商品ごとの「参考になった」合計数。これで割ることで、「参考になった」の累積相対度数になる。
AS crf
, RANK() OVER (
PARTITION BY UR.parent_asin
ORDER BY UR.helpful_vote DESC
) AS rank -- 商品ごとに、「参考になった」数の多い順にレビューを並べたときの順位。
FROM amazon_reviews_dataset.UserReviews UR
WHERE UR.timestamp < 1584921600000 -- 2−1で決まったUNIX時刻(3年以上経過)。
)
SELECT
UR.parent_asin -- 商品ID。
, CASE -- パレート分析のグループを割り当てる。70%までをAグループ、90%までをBグループ、以降をCグループとする。
WHEN UR.rank = 1 AND UR.crf IS NOT NULL THEN 'A' -- 相対度数が0.7以上のレビューをAグループに割り振るための処理。
WHEN UR.crf < 0.7 THEN 'A'
WHEN UR.crf < 0.9 THEN 'B'
WHEN UR.crf <= 1.0 THEN 'C'
ELSE NULL
END AS pareto_group
, UR.rating
, IM.average_rating - UR.rating AS diff_avg_rating
, CASE
WHEN ARRAY_LENGTH(UR.images) = 0
THEN false
ELSE true
END AS has_images
, UR.title AS raw_title
, LENGTH(UR.title) - LENGTH(REPLACE(UR.title, ' ', '')) + 1 AS num_words_title
, UR.text AS raw_text
, LENGTH(UR.text) - LENGTH(REPLACE(UR.text, ' ', '')) + 1 AS num_words_text
FROM
UserReviewWithWindow UR
INNER JOIN amazon_reviews_dataset.ItemMetadata IM
ON UR.parent_asin = IM.parent_asin;
Redshift
UNLOAD('
WITH UserReviewWithWindow AS (
SELECT
*
, 1.0 * SUM(UR.helpful_vote) OVER (
PARTITION BY UR.parent_asin
ORDER BY UR.helpful_vote DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING -- Redshift では集約ウィンドウ関数に ORDER BY 句を含める場合、フレーム句が必須。
) -- 商品ごとに、「参考になった」数の多い順にレビューを並べたときの、「参考になった」の累積度数。1.0を掛けるのは商を浮動小数点値にするため。
/ NULLIF(SUM(UR.helpful_vote) OVER (PARTITION BY UR.parent_asin), 0) -- 商品ごとの「参考になった」合計数。これで割ることで、「参考になった」の累積相対度数になる。
AS crf
, RANK() OVER (
PARTITION BY UR.parent_asin
ORDER BY UR.helpful_vote desc
) AS rank -- 商品ごとに、「参考になった」数の多い順にレビューを並べたときの順位。
FROM UserReviews UR
WHERE timestamp < 1584921600000 -- 2-1で決まったUNIX時刻(3年以上経過)。
)
SELECT
UR.parent_asin -- 商品ID。
, CASE -- パレート分析のグループを割り当てる。70%までをAグループ、90%までをBグループ、以降をCグループとする。
WHEN UR.rank = 1 AND UR.crf IS NOT NULL THEN ''A'' -- 相対度数が0.7以上のレビューをAグループに割り振るための処理。
WHEN UR.crf < 0.7 THEN ''A''
WHEN UR.crf < 0.9 THEN ''B''
ELSE ''C''
END AS pareto_group
, UR.rating
, IM.average_rating - UR.rating AS diff_avg_rating
, CASE
WHEN GET_ARRAY_LENGTH(UR.images) = 0
THEN false
ELSE true
END AS has_images
, UR.title AS raw_title
, LENGTH(UR.title) - LENGTH(REPLACE(UR.title, '' '', '''')) + 1 AS num_words_title
, UR.text AS raw_text
, LENGTH(UR.text) - LENGTH(REPLACE(UR.text, '' '', '''')) + 1 AS num_words_text
FROM
UserReviewWithWindow UR
INNER JOIN ItemMetadata IM
ON UR.parent_asin = IM.parent_asin;
')
TO 's3://my-bucket-name/unload2-3/'
IAM_ROLE 'my-role-arn'
X(旧Twitter)・Facebookで定期的に情報発信しています!
Follow @acceluniverse