feat: pipes and datasources for CDP aggs (CDP-804) #3714
Conversation
Your PR title doesn't contain a Jira issue key. Consider adding it for better traceability. Please add a Jira issue key to your PR title.
services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe
Outdated
services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe
Outdated
services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe
Outdated
```sql
    from cdp_member_segment_aggregates_ds
    where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
GROUP BY segmentId, memberId, updatedAt
```
GROUP BY includes updatedAt causing incorrect aggregation
High Severity
The GROUP BY clause includes updatedAt from the table column, which will produce multiple rows per (segmentId, memberId/organizationId) pair when there are multiple distinct updatedAt values. The parent and grandparent segment pipes correctly use GROUP BY parentId, memberId without updatedAt. The updatedAt in GROUP BY references the table column, not the now() alias, causing partial instead of full aggregations.
Additional Locations (1)
services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe
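A sketch of the suggested fix, grouping only by the entity keys as the parent and grandparent pipes do. The aggregate column shown is illustrative; the actual pipe selects more merge-state columns than are quoted here.

```sql
-- Sketch: drop the source-column updatedAt from GROUP BY so each
-- (segmentId, memberId) pair merges into a single row, and emit the
-- export time via now() instead. Column list is abbreviated.
SELECT
    segmentId,
    memberId,
    countMerge(activityCountState) AS activityCount,
    now() AS updatedAt
FROM cdp_member_segment_aggregates_ds
WHERE updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
GROUP BY segmentId, memberId
```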
```sql
groupArrayDistinctMerge(activeOnState) AS activeOn,
countMerge(activityCountState) AS activityCount,
countDistinctMerge(memberCountState) as memberCount,
round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement,
```
Inconsistent round() usage causes data precision mismatch
Medium Severity
The backfiller pipe uses round(avgMerge(avgContributorEngagement)) while the changed segments sink pipes use avgMerge(avgContributorEngagement) without round(). Both export to the same Kafka topic (organizationSegmentsAgg_sink), causing data from backfills to have different precision than incremental updates. This inconsistency affects data integrity downstream.
Additional Locations (2)
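One way to resolve the mismatch, assuming rounding is the intended behaviour for this field: apply the same expression in both the backfiller and the changed-segments sinks (or drop `round()` from the backfiller instead, if full precision is wanted downstream).

```sql
-- Use one expression everywhere that writes to organizationSegmentsAgg_sink:
round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement
```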
```sql
        required=False,
    )
}}
{% end %}
```
Backfillers include NULL parent/grandparent IDs without bucket_id
Medium Severity
When bucket_id is not defined, the member backfiller pipes have no WHERE clause and will include segments with NULL parentId or grandparentId, producing records with NULL segmentId. The daily changed segments sinks filter these out via their IN subqueries (since NULL doesn't match IN clauses), creating inconsistent behavior between backfill and incremental exports.
Additional Locations (1)
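A sketch of how the backfiller could match the incremental sinks, assuming Tinybird template syntax as used elsewhere in these pipes; the bucket condition is a placeholder for whatever filter the pipe already applies when `bucket_id` is defined.

```sql
-- Sketch: always exclude rows with no parent segment so the backfiller
-- never emits NULL segmentId, regardless of whether bucket_id is set.
WHERE parentId IS NOT NULL
{% if defined(bucket_id) %}
  AND <existing bucket_id condition>  -- unchanged, illustrative placeholder
{% end %}
```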
```shell
    Format files containing "cdp_organization" sequentially

EOF
exit 0
```
Script exits with success code on argument errors
Low Severity
The show_help function always exits with code 0, but it's also called from error conditions (missing --match argument, unknown option). This causes the script to report success when it actually failed due to invalid arguments, which could mislead automated pipelines or CI/CD systems that rely on exit codes.
Additional Locations (2)
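A minimal sketch of the fix, assuming a `show_help` function like the one in the script (the usage text here is illustrative): let the caller pass an exit status so error paths fail loudly while `--help` still exits 0.

```shell
# Sketch: show_help takes an optional exit status, defaulting to 0.
# Error paths (missing --match argument, unknown option) call it with
# a non-zero status so CI/CD systems see the failure.
show_help() {
  cat <<'EOF'
Usage: format.sh [--help] [--match PATTERN] [--sequential]
EOF
  exit "${1:-0}"
}

# Demonstration: error path vs help path (subshells so the script continues).
( show_help 1 > /dev/null ); echo "error exit: $?"
( show_help > /dev/null ); echo "help exit: $?"
```

Call sites in the error branches then become `show_help 1` (and should write the usage text to stderr there, if desired), while `--help` keeps calling `show_help` with no argument.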
…wdDotDev/crowd.dev into feat/cdp-aggs-through-tinybird-sinks
```sql
    from cdp_member_segment_aggregates_ds
    where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
GROUP BY segmentId, memberId, updatedAt
```
GROUP BY includes updatedAt causing incorrect aggregation
High Severity
The GROUP BY clause includes updatedAt, which prevents proper merging of aggregate states. Unlike the parent and grandparent segment pipes which group only by segmentId, memberId, this causes each distinct updatedAt value in the source table to produce a separate row with partial aggregation results. The query outputs now() as updatedAt, but the GROUP BY references the source column, resulting in multiple incorrectly aggregated rows per entity being sent to Kafka.
Additional Locations (1)
```sql
countMerge(activityCountState) AS activityCount,
countDistinctMerge(memberCountState) as memberCount,
round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement,
max(updatedAt) AS updatedAt
```
Organization backfiller uses different updatedAt semantics than other pipes
Medium Severity
The organization backfiller uses max(updatedAt) to preserve source timestamps, while all member backfillers and all changed segment sinks use now() for the export timestamp. This inconsistency means the updatedAt field has different semantics depending on how data reaches Kafka, which could confuse downstream systems expecting uniform timestamp behavior.
Additional Locations (2)
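A sketch of the alignment, assuming the uniform `now()` semantics of the other pipes are what downstream consumers expect; the surrounding columns are abbreviated.

```sql
-- Sketch: emit the export time, matching the member backfillers and the
-- changed-segment sinks, instead of preserving the source timestamp.
SELECT
    segmentId,
    countMerge(activityCountState) AS activityCount,
    now() AS updatedAt  -- was: max(updatedAt) AS updatedAt
FROM cdp_organization_segment_aggregates_ds
GROUP BY segmentId
```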
```sql
    from cdp_member_segment_aggregates_ds
    where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
)
```
Missing filter for empty parentId/grandparentId causes incorrect aggregation
High Severity
The segments datasource defaults parentId and grandparentId to empty string for top-level segments. The parent and grandparent aggregation queries don't filter out empty values, causing all segments without parents/grandparents to be aggregated together into a single row with segmentId = ''. This combines unrelated data and produces invalid segment identifiers for Kafka export.
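A sketch of the missing filter, given that the segments datasource defaults `parentId` and `grandparentId` to the empty string for top-level segments; the grandparent query would add the analogous `grandparentId != ''` condition.

```sql
-- Sketch: skip top-level segments before aggregating by parent, so no
-- merged row is exported to Kafka with segmentId = ''.
WHERE parentId != ''
  AND updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
```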
Note
Introduces Tinybird infrastructure for CDP aggregates across members and organizations.
- `cdp_member_segment_aggregates_ds` and `cdp_organization_segment_aggregates_ds` (AggregatingMergeTree) to store per-segment aggregate states
- `@on-demand` backfiller and changed-segment sink pipes
- `scripts/format.sh` adding `--help`, `--match`, and `--sequential` options

Written by Cursor Bugbot for commit 76c850c. This will update automatically on new commits.