
こんにちは。CREチームの西山 (
id:tukaelu) です。
kickflowでは、顧客の利用状況などを分析するデータ分析基盤をBigQuery + Metabaseで構築していて、社内のメンバーに利用されています。
契約テナントごとの利用状況に加え、テナントごとのエラーレートやレスポンスタイムと言ったユーザー体験の計測を強化するため、データ分析基盤のデータソースにアプリケーションログを追加してテナントごとの利用状況とあわせてより詳しい分析ができるようにしました。
データ分析の課題
kickflowのデータ分析基盤は、クラウドETLのTROCCOによる本番環境のデータベースの転送と、dbtによるデータのクレンジングとモデリングを行うパイプラインを構築しています。
データベース上にあるデータでは、例えば「ライセンス数の契約数」「作成されたワークフローの件数」「申請されたチケットの件数」「オプション機能の設定有無」といった『ユーザーがどのようにkickflowを利用しているか』については分析できるのですが、『ユーザーがどのような体験をしたか』が分析できません。
私たちは普段のサービスの監視をDatadogで行っているので、メトリクスやログによるモニタリングを目的としたサービス全体の体験についてはDatadogで把握できますが、「複雑なワークフローを組んでいる」「チケットの申請数が多い」と言ったテナント固有の状態を条件とした計測ができませんでした。
これに対応するためDatadogで監視しているアプリケーションログのうち、アクセスログをBigQueryで分析できるようして定量評価につなげたいという要望が開発チームから上がり、CREチームで整備をしました。
ログの分析環境の整備
分析基盤にログを転送する
ログをBigQueryで分析するために、まずはGoogle Cloud Platform(GCP)にログデータを転送する必要があります。 実はすでにDatadogのログアーカイブ機能を利用してGoogle Cloud Storage(GCS)にログファイルがgzipでアーカイブされており、このGCSバケットをデータソースとした外部テーブルも用意されていたのですが、私を含めあまり認知されていませんでした。
このテーブルの存在に気づいたとき「そのまま使えるのでは?」と思ったのですが、gzipでアーカイブされたデータがデータソースになるため、クエリを実行してレスポンスされるまでに約12分かかり、探索的クエリやダッシュボードのような実運用に耐えるものではありませんでした。 そのため、新たにBigQueryテーブルを追加して、GCSにアーカイブされているログファイルをロードすることにしました。
BigQueryテーブルにログデータをロードする
前述の通り、TROCCOを利用してデータパイプラインを組んでいるので、当初はGCSバケットからBigQueryテーブルへのログデータの転送にもTROCCOを利用しようかと検討をしました。 しかし、GCSを転送元としたTROCCOの差分転送に 『ファイル名を昇順でソートしたときに「最後に転送されたパス」より後になるファイルを増分とする』 という仕様1に気づき、これによりデータをロストする可能性があったため、テーブルへのデータ転送にはBigQuery Data Transfer Service(DTS)を選択しました。
ちなみにkickflowのログアーカイブのGCSバケットには以下のようなパスに、Datadogからアーカイブされたログが複数のファイルに分割されて転送されます。
{GCS Bucket名}/dt={YYYYMMDD形式の日付}/hour={00-23の時刻}/archive_xxxxxxxxxxxxxxx.json.gz
DTSの転送設定にあたり特定の1時間ぶんのファイル転送にかかる時間を調べたところ、GCS上のアーカイブファイルの作成時刻から各時間帯のアーカイブがGCSに転送し終えるまでに1.5時間ほどかかることが確認できたので、DTSでは以下のようにCloud Storage URIのパスを構成する日時にrun_time-2hと指定することで、DTSによる転送が実行された時間の2時間前の時間帯のログを対象にして、余裕を持ちつつ安全に転送されるように指定しました。
{GCS Bucket名}/dt={run_time-2h\|"%Y%m%d"}/hour={run_time-2h\|"%H"}/archive_*.json.gz
GCSにアーカイブされたログデータはNDJSON(改行区切りのJSON)形式で出力されていて、Datadogのログパイプライン機能でログをパースした任意の属性値がattributesプロパティにJSONオブジェクトとして付与されています。
このデータを吸収するため、ログデータのBigQueryテーブルにはJSON型のattributesカラムを持つ以下のようなスキーマを定義しました。
CREATE TABLE IF NOT EXISTS `dataset.table` ( `date` TIMESTAMP, `service` STRING, `host` STRING, `attributes` JSON, `_id` STRING, `source` STRING, `message` STRING, `status` STRING, `tags` ARRAY<STRING> ) PARTITION BY DATE(`date`);
ロードした未加工のログデータをdbtで加工する
ログの出力日時(date)はUTC時刻のため、上記で追加したテーブルを直接集計をするには都度JSTへの時刻変換を意識してクエリを書く必要があり少し扱いづらいです。
これを解消するため、ログ出力日時のJST変換と重複ログや不要データの排除などを行い、JST日付でのパーティショニングをするdbtモデルを作成します。
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = '_id',
on_schema_change = 'sync_all_columns',
partition_by = {
"field": "log_date_on",
"data_type": "date",
"granularity": "day"
},
cluster_by = ['_id'],
incremental_predicates = [
"..."
]
) }}
WITH
`source_table` AS (
SELECT * FROM {{ source('dataset', 'table') }}
{% if is_incremental() %}
: -- (省略)
{% endif %}
),
`transform` AS (
SELECT
-- UTCからJSTに変換したログ出力日時と日付のカラムを追加
date AS `log_date_utc`,
DATETIME(date, 'Asia/Tokyo') AS `log_date_jst`,
DATE(date, 'Asia/Tokyo') AS `log_date_on`,
`_id`,
`service`,
`host`,
`source`,
`message`,
`status`,
`attributes`,
`tags`
FROM `source_app_logs`
: -- (省略)
),
`final` AS (
-- ログID`_id`が重複するログレコードを除去
SELECT
* EXCEPT(`rn`)
FROM (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY `_id`
ORDER BY `log_date_utc` DESC NULLS LAST
) AS `rn`
FROM `transform`
)
WHERE
`rn` = 1
)
SELECT * FROM `final`
さらに加工してアクセスログに限定したモデルを作る
kickflowは2026年1月時点ではHerokuにホストされていて、上記のテーブルにはアクセスログ以外にHerokuのエッジルーターやWorkerプロセスのログなども含まれます。
このログデータからWeb Dynoへのアクセスログだけに絞りつつ、attributesカラムからメトリクスやテナント情報などを扱いやすく加工したモデルを追加します。
: -- (省略) `filtered` AS ( SELECT `log_date_utc`, `log_date_jst`, `log_date_on`, `_id`, `attributes` FROM `source` WHERE -- `attributes`に`load_balancer`を含まず、`method`と`path`があるログレコードをアクセスログとして抽出 JSON_QUERY(`attributes`, '$.load_balancer') IS NULL AND JSON_VALUE(`attributes`, '$.method') IS NOT NULL AND JSON_VALUE(`attributes`, '$.path') IS NOT NULL {% if is_incremental() %} : -- (省略) {% endif %} ), -- `attributes`から必要なカラムを抽出して適切に型変換する `final` AS ( SELECT `log_date_jst`, JSON_VALUE(`attributes`, '$.method') AS `method`, JSON_VALUE(`attributes`, '$.path') AS `path`, SAFE_CAST(JSON_VALUE(`attributes`, '$.duration') AS FLOAT64) AS `duration`, SAFE_CAST(JSON_VALUE(`attributes`, '$.db') AS FLOAT64) AS `db`, SAFE_CAST(JSON_VALUE(`attributes`, '$.view') AS FLOAT64) AS `view`, SAFE_CAST(JSON_VALUE(`attributes`, '$.allocations') AS INT64) AS `allocations`, SAFE_CAST(JSON_VALUE(`attributes`, '$.status') AS INT64) AS `status`, JSON_VALUE(`attributes`, '$.action') AS `action`, JSON_VALUE(`attributes`, '$.tenant_id') AS `tenant_id`, JSON_VALUE(`attributes`, '$.user_id') AS `user_id`, `_id`, `log_date_utc`, `log_date_on` FROM `filtered` ) : -- (省略)
このようなモデルを定義することで、契約テナントの主キーであるテナントID(tenant_id)をキーに契約や設定、ワークフローの申請状況などと、ログから集計したエラーレートや処理時間などとかけ合わせた分析ができるようになりました(以下はサンプルダッシュボードです)。

おわりに
GCSに蓄積されたアプリケーションログをBigQueryで分析できるようにしたことで、上記のようにテナントごとの利用状況とユーザー体験を組み合わせた分析が可能になりました。
特にデータベース上のデータだけでは把握できなかった「複雑なワークフローを持つテナントでのレスポンスタイム」や「申請数が多いテナントでのエラーレート」といった、テナント固有の状態を条件とした定量評価ができるようになったことは大きな成果です。
また、副次的な効果として、Datadogのログ保持期限を超えた過去のログもBigQuery上に保持されるため、長期的なトレンド分析やトラブルシューティング時の調査範囲が広がりました。
We are hiring!
kickflowでは一緒に働いてくれる仲間を募集しています!ご興味のある方は、ぜひ採用サイトをご覧ください。
- TROCCO公式ドキュメントの「パスプレフィックスを利用して差分転送を行うコネクタ」を参照(2026年1月時点)↩