
feat: pipes and datasources for CDP aggs (CDP-804) #3714

Merged

epipav merged 9 commits into main from feat/cdp-aggs-through-tinybird-sinks on Jan 30, 2026
Conversation

@epipav (Collaborator) commented on Dec 19, 2025

Note

Introduces Tinybird infrastructure for CDP aggregates across members and organizations.

  • Adds cdp_member_segment_aggregates_ds and cdp_organization_segment_aggregates_ds (AggregatingMergeTree) to store per-segment aggregate states
  • Adds materialized-view and initial-snapshot pipes that populate both datasources from activity relations
  • Adds Kafka sink pipes exporting aggregates for leaf/parent/grandparent segments, with daily "changed in previous day" exports and on-demand bucketed backfill variants
  • Schedules: member sinks at 01:00/01:30; organization sinks at 02:00/02:30; backfill sinks are @on-demand
  • Replaces the formatter script with scripts/format.sh, adding --help, --match, and --sequential options
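For illustration, a minimal sketch of what one of these AggregatingMergeTree datasources might look like in Tinybird's .datasource format. The column set here is hypothetical, inferred from the aggregate-state names (activityCountState, activeOnState) quoted in the review comments, not taken from the actual file:

```
# cdp_member_segment_aggregates_ds -- hypothetical sketch, not the PR's actual file
SCHEMA >
    `segmentId` String,
    `memberId` String,
    `updatedAt` DateTime,
    `activeOnState` AggregateFunction(groupArrayDistinct, String),
    `activityCountState` AggregateFunction(count)

ENGINE "AggregatingMergeTree"
ENGINE_SORTING_KEY "segmentId, memberId"
```

Sink pipes then read these states back with the matching -Merge combinators, as the quoted review snippets show.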

Written by Cursor Bugbot for commit 76c850c.

@github-actions (Contributor)

⚠️ Jira Issue Key Missing

Your PR title doesn't contain a Jira issue key. Consider adding it for better traceability.

Example:

  • feat: add user authentication (CM-123)
  • feat: add user authentication (IN-123)

Projects:

  • CM: Community Data Platform
  • IN: Insights

Please add a Jira issue key to your PR title.

@github-actions bot left a comment

Conventional Commits FTW!

@epipav changed the title from "Tinybird resources for CDP aggs" to "Tinybird pipes and datasources for CDP aggs" on Dec 19, 2025
@epipav changed the title from "Tinybird pipes and datasources for CDP aggs" to "Pipes and datasources for CDP aggs" on Dec 19, 2025
@epipav changed the title from "Pipes and datasources for CDP aggs" to "feat: pipes and datasources for CDP aggs (CDP-804)" on Dec 19, 2025
@epipav epipav self-assigned this Jan 27, 2026
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
GROUP BY segmentId, memberId, updatedAt

GROUP BY includes updatedAt causing incorrect aggregation

High Severity

The GROUP BY clause includes updatedAt from the table column, which will produce multiple rows per (segmentId, memberId/organizationId) pair when there are multiple distinct updatedAt values. The parent and grandparent segment pipes correctly use GROUP BY parentId, memberId without updatedAt. The updatedAt in GROUP BY references the table column, not the now() alias, causing partial instead of full aggregations.
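A sketch of the corrected shape, reusing the identifiers from the quoted snippet: group only on the entity keys and project the export timestamp, so each pair collapses into one fully merged row. (This is an illustrative shape, not the PR's exact pipe.)

```sql
-- Hypothetical fix: updatedAt is selected, not grouped on, so the
-- aggregate states merge completely per (segmentId, memberId) pair.
SELECT
    segmentId,
    memberId,
    countMerge(activityCountState) AS activityCount,
    now() AS updatedAt
FROM cdp_member_segment_aggregates_ds
WHERE updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
GROUP BY segmentId, memberId
```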

Additional Locations (1)


groupArrayDistinctMerge(activeOnState) AS activeOn,
countMerge(activityCountState) AS activityCount,
countDistinctMerge(memberCountState) as memberCount,
round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement,

Inconsistent round() usage causes data precision mismatch

Medium Severity

The backfiller pipe uses round(avgMerge(avgContributorEngagement)) while the changed segments sink pipes use avgMerge(avgContributorEngagement) without round(). Both export to the same Kafka topic (organizationSegmentsAgg_sink), causing data from backfills to have different precision than incremental updates. This inconsistency affects data integrity downstream.
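One way to resolve this, sketched with the expression from the snippet, is to standardize on a single form in every pipe that writes to the topic (whether to round at all is a product decision):

```sql
-- Use the identical expression in the backfiller and the changed-segments
-- sinks so all rows on organizationSegmentsAgg_sink share one precision.
round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement
```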

Additional Locations (2)


required=False,
)
}}
{% end %}

Backfillers include NULL parent/grandparent IDs without bucket_id

Medium Severity

When bucket_id is not defined, the member backfiller pipes have no WHERE clause and will include segments with NULL parentId or grandparentId, producing records with NULL segmentId. The daily changed segments sinks filter these out via their IN subqueries (since NULL doesn't match IN clauses), creating inconsistent behavior between backfill and incremental exports.
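A sketch of the guard the backfillers could add so their output matches the daily sinks. The column name is taken from the finding; whether a missing parent is NULL or an empty string depends on the actual schema:

```sql
-- Hypothetical filter: drop rows with no parent linkage, mirroring what
-- the daily sinks' IN subqueries do implicitly (NULL never matches IN).
WHERE parentId IS NOT NULL
```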

Additional Locations (1)


Format files containing "cdp_organization" sequentially

EOF
exit 0

Script exits with success code on argument errors

Low Severity

The show_help function always exits with code 0, but it's also called from error conditions (missing --match argument, unknown option). This causes the script to report success when it actually failed due to invalid arguments, which could mislead automated pipelines or CI/CD systems that rely on exit codes.
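A sketch of one fix, assuming a structure like the quoted show_help (the option names come from the PR description; the parse wrapper is hypothetical): let callers pass the exit code, so help requests succeed while argument errors fail.

```shell
#!/usr/bin/env bash
# Hypothetical sketch: show_help takes the exit code from its caller,
# so --help exits 0 while argument errors exit 1.
show_help() {
  local code="${1:-0}"
  cat <<'EOF'
Usage: format.sh [--help] [--match PATTERN] [--sequential]
EOF
  exit "$code"
}

parse() {
  case "${1:-}" in
    --help) show_help 0 ;;                      # explicit help request: success
    --match) [ -n "${2:-}" ] || show_help 1 ;;  # missing argument: failure
    -*) show_help 1 ;;                          # unknown option: failure
  esac
}

# Demonstrate in subshells so exit does not terminate this script.
( parse --help >/dev/null );  echo "help: $?"
( parse --match >/dev/null ); echo "missing-arg: $?"
( parse --bogus >/dev/null ); echo "unknown: $?"
```

Run standalone, this prints help: 0, missing-arg: 1, unknown: 1, so a CI pipeline now sees a nonzero status for invalid arguments.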

Additional Locations (2)


@cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 3 potential issues.

from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
GROUP BY segmentId, memberId, updatedAt

GROUP BY includes updatedAt causing incorrect aggregation

High Severity

The GROUP BY clause includes updatedAt, which prevents proper merging of aggregate states. Unlike the parent and grandparent segment pipes which group only by segmentId, memberId, this causes each distinct updatedAt value in the source table to produce a separate row with partial aggregation results. The query outputs now() as updatedAt, but the GROUP BY references the source column, resulting in multiple incorrectly aggregated rows per entity being sent to Kafka.

Additional Locations (1)


countMerge(activityCountState) AS activityCount,
countDistinctMerge(memberCountState) as memberCount,
round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement,
max(updatedAt) AS updatedAt

Organization backfiller uses different updatedAt semantics than other pipes

Medium Severity

The organization backfiller uses max(updatedAt) to preserve source timestamps, while all member backfillers and all changed segment sinks use now() for the export timestamp. This inconsistency means the updatedAt field has different semantics depending on how data reaches Kafka, which could confuse downstream systems expecting uniform timestamp behavior.

Additional Locations (2)


from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
)

Missing filter for empty parentId/grandparentId causes incorrect aggregation

High Severity

The segments datasource defaults parentId and grandparentId to empty string for top-level segments. The parent and grandparent aggregation queries don't filter out empty values, causing all segments without parents/grandparents to be aggregated together into a single row with segmentId = ''. This combines unrelated data and produces invalid segment identifiers for Kafka export.
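A sketch of the missing guard, assuming the empty-string default described in the finding:

```sql
-- Hypothetical fix for the parent rollup: exclude top-level segments so
-- '' never becomes a segmentId in the export (grandparent pipe analogous).
WHERE parentId != ''
```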

Additional Locations (2)


@epipav epipav merged commit b54ee37 into main Jan 30, 2026
16 checks passed
@epipav epipav deleted the feat/cdp-aggs-through-tinybird-sinks branch January 30, 2026 14:00