Reactor

Keeping Your Data Warehouse Lean and Insightful

Destinations · Updated July 17, 2025

Introduction

After Reactor efficiently collects, indexes, transforms, and delivers data from your source systems into your data warehouse, it adds this information as fresh, new rows. This approach is a bit different from systems like Fivetran, which often sync or update existing records directly in the warehouse. As a result, you might notice multiple rows accumulating over time, each representing a different update to the same underlying record.

To ensure your analytical efforts are laser-focused on the most current information and to avoid any data duplication headaches, we highly recommend building a deduplication view on top of your landing table. Think of this view as a smart filter, ensuring that only the latest and greatest version of each record shines through, leading to more accurate and efficient data analysis.

Now, while having the latest data is crucial for many analyses, holding onto historical updates can also unlock powerful insights! This capability allows you to build sophisticated data models that track slow-changing dimensions, capture the evolution of your data over time, and generate reports highlighting key changes. It's about having the flexibility to leverage your data in diverse ways.

Here are a couple of example deduplication queries tailored for Google BigQuery and Snowflake to get you started. Remember, these are just starting points. Feel free to adapt them to your specific needs.

Example Deduplication Query

This deduplication query can be used to populate a deduplicated view of a landing table. It excludes "stale" records, as well as records that have been marked for deletion by Reactor.

Several fields in this query represent identifying dimensions that can be configured in Reactor as model fields: 

  • The fields source_id, node, and scope represent the “identity” of a record.
    • source_id is the ID of a source entity.
    • node is the origin of the source data.
    • scope is a parent-level entity used to differentiate entities that share the same source_id but are separate within a higher-level business context.
  • The message_version field is an optional field used to track an entity's version. For example, when mapped to a timestamp-like source field, it can be used to track an entity's Type 2 Slowly Changing Dimension history.
  • The is_deleted field denotes whether a row should be deleted because Reactor has reprocessed it to comply with a customer data deletion request (See Requesting Removal of Consumer Data for more information on how Reactor handles customer data deletion requests).

ℹ️ See Landing Table Best Practices: Setting the Stage for Success for more information on the dimensions above.

Databricks (Spark SQL) Deduplication Query

This query can be executed as-is in Databricks. Simply replace database.schema.landing_table with the actual names of your database, schema, and landing table, respectively.

WITH ranked_data AS (
SELECT
*,
-- This message_history_version field is used to identify the most recent version of a record.
-- With this field we can dedupe out historical Reactor Replays, network crashes,
-- checkpointing and mapping changes.
ROW_NUMBER() OVER (
PARTITION BY
node, -- Origin of the source data
scope, -- Account-like thing at the Origin
source_id, -- ID of a source entity
message_version -- A version field of the source record (if available)
ORDER BY
version DESC, loaded_at DESC
) AS message_history_version,
-- The entity_version field is used to identify the most recent version of a record entity.
-- We don't dedupe on this field as a default, but you can use it as a filter in a downstream model.
-- Example: `select * from vw_landed_dedupe where entity_version = 1`
ROW_NUMBER() OVER (
PARTITION BY
node,
scope,
source_id
ORDER BY
message_version DESC NULLS LAST,
version DESC, loaded_at DESC
) AS entity_version
FROM database.schema.landing_table
WHERE NOT IFNULL(reactor_is_deleted, FALSE)
)
SELECT
*
FROM ranked_data
WHERE message_history_version = 1
AND NOT IFNULL(reactor_is_deleted, FALSE);

Google BigQuery Deduplication Query

This query can be executed as-is in BigQuery. Simply replace database.schema.landing_table with the actual names of your database, schema, and landing table, respectively.

with ranked_data as
(
select
*,
-- This message_history_version field is used to identify the most recent version of a record.
-- With this field we can dedupe out historical Reactor Replays, network crashes,
-- checkpointing and mapping changes.
row_number() over (
partition by
node, -- Origin of the source data
scope, -- Account-like thing at the Origin
source_id, -- ID of a source entity
message_version -- A version field of the source record (if available)
order by
version desc, loaded_at desc
) as message_history_version,
-- The entity_version field is used to identify the most recent version of a record entity.
-- We don't dedupe on this field as a default, but you can use it as a filter in a downstream model.
-- Example: `select * from vw_landed_dedupe where entity_version = 1`
row_number() over (
partition by
node,
scope,
source_id
order by
message_version desc nulls last,
version desc, loaded_at desc
) as entity_version
from database.schema.landing_table
where not ifnull(is_deleted,false)
)
select
*
from ranked_data
where message_history_version = 1
and not ifnull(reactor_is_deleted,false)

Microsoft SQL Deduplication Query

This query can be executed as-is in MS SQL. Simply replace database.schema.landing_table with the actual names of your database, schema, and landing table, respectively.

WITH ranked_data AS (
SELECT
*,
-- This message_history_version field is used to identify the most recent version of a record.
-- With this field we can dedupe out historical Reactor Replays, network crashes,
-- checkpointing and mapping changes.
ROW_NUMBER() OVER (
PARTITION BY
node, -- Origin of the source data
scope, -- Account-like thing at the Origin
source_id, -- ID of a source entity
message_version -- A version field of the source record (if available)
ORDER BY
version DESC, loaded_at DESC
) AS message_history_version,
-- The entity_version field is used to identify the most recent version of a record entity.
-- We don't dedupe on this field as a default, but you can use it as a filter in a downstream model.
-- Example: `select * from vw_landed_dedupe where entity_version = 1`
ROW_NUMBER() OVER (
PARTITION BY
node,
scope,
source_id
ORDER BY
CASE WHEN message_version IS NULL THEN 1 ELSE 0 END, message_version DESC, -- Emulates NULLS LAST
version DESC, loaded_at DESC
) AS entity_version
FROM
database.schema.landing_table
WHERE
NOT ISNULL(reactor_is_deleted, 0) -- ISNULL(expression, replacement)
)
SELECT
*
FROM
ranked_data
WHERE
message_history_version = 1
AND NOT ISNULL(reactor_is_deleted, 0);
Key Changes and Considerations:IFNULL / NVL / ISNULL: The ifnull(reactor_is_deleted, false) expression has been adapted to the respective DWH system's null-handling function (IFNULL for Snowflake/Databricks, NVL for Redshift, ISNULL for MS SQL Server). Note that for ISNULL in MS SQL, FALSE is represented as 0 for boolean/bit columns.NULLS LAST: Snowflake, Redshift, and Databricks directly support NULLS LAST in the ORDER BY clause of window functions. For MS SQL Server, a CASE statement is used to achieve the same effect by assigning a sorting priority to NULL values.Table Name: Remember to replace database.schema.landing_table with the actual path to your table in each query.These queries maintain the exact deduplication logic you provided, tailored for each specified data warehouse system.

Redshift BigQuery Deduplication Query

This query can be executed as-is in Redshift. Simply replace database.schema.landing_table with the actual names of your database, schema, and landing table, respectively.

RedshiftRedshift, being based on PostgreSQL, supports NULLS LAST and NVL (equivalent to IFNULL).-- Redshift Deduplication Query
WITH ranked_data AS (
SELECT
*,
-- This message_history_version field is used to identify the most recent version of a record.
-- With this field we can dedupe out historical Reactor Replays, network crashes,
-- checkpointing and mapping changes.
ROW_NUMBER() OVER (
PARTITION BY
node, -- Origin of the source data
scope, -- Account-like thing at the Origin
source_id, -- ID of a source entity
message_version -- A version field of the source record (if available)
ORDER BY
version DESC, loaded_at DESC
) AS message_history_version,
-- The entity_version field is used to identify the most recent version of a record entity.
-- We don't dedupe on this field as a default, but you can use it as a filter in a downstream model.
-- Example: `select * from vw_landed_dedupe where entity_version = 1`
ROW_NUMBER() OVER (
PARTITION BY
node,
scope,
source_id
ORDER BY
message_version DESC NULLS LAST,
version DESC, loaded_at DESC
) AS entity_version
FROM
database.schema.landing_table
WHERE
NOT NVL(reactor_is_deleted, FALSE)
)
SELECT
*
FROM
ranked_data
WHERE
message_history_version = 1
AND NOT NVL(reactor_is_deleted, FALSE);

Snowflake Deduplication Query

This query can be executed as-is in Snowflake. Simply replace database.schema.landing_table with the actual names of your database, schema, and landing table, respectively.

WITH ranked_data AS (
SELECT
*,
-- This message_history_version field is used to identify the most recent version of a record.
-- With this field we can dedupe out historical Reactor Replays, network crashes,
-- checkpointing and mapping changes.
ROW_NUMBER() OVER (
PARTITION BY
node, -- Origin of the source data
scope, -- Account-like thing at the Origin
source_id, -- ID of a source entity
message_version -- A version field of the source record (if available)
ORDER BY version DESC, loaded_at DESC
) AS message_history_version,
-- The entity_version field is used to identify the most recent version of a record entity.
-- We don't dedupe on this field as a default, but you can use it as a filter in a downstream model.
-- Example: `select * from vw_landed_dedupe where entity_version = 1`
ROW_NUMBER() OVER (
PARTITION BY
node,
scope,
source_id
ORDER BY message_version DESC NULLS LAST, version DESC, loaded_at DESC
) AS entity_version
FROM database.schema.landing_table
WHERE NOT IFNULL(reactor_is_deleted, FALSE)
)
SELECT
*
FROM ranked_data
WHERE message_history_version = 1
AND NOT IFNULL(reactor_is_deleted, FALSE);

The Importance of Landing Table Pruning

Over time, not all historical data retains its value. Rows in your landing table that were once critical for analysis, operations, or even AI models might become stale and no longer contribute meaningful insights, especially if you have executed Mapper Replays as your Reactor mapping configurations evolve over time. Holding onto this outdated information can lead to increased storage costs and potentially impact query performance.

This is where the concept of landing table pruning comes in. Pruning is the process of systematically deleting these "stale" rows from your landing table. By identifying and removing data that is no longer actively used, you can keep your data warehouse lean, efficient, and focused on the information that truly matters. This practice not only optimizes storage but also helps to maintain a clean and performant environment for all your data-driven initiatives.

Below are some example garbage collection queries that can be executed against a deduplication view shared above. This query selects any rows in the landing table that are not included in a deduplicated view of the landing table (using the deduplication query above).

ℹ️ The queries below reference several fields referenced in the Example Deduplication Query above, as well as the loaded_at field, which is a timestamp representing when a record was exported from Reactor to your data warehouse.

Databricks (SparkSQL) Pruning Query

This query can be executed as-is in Databricks. Simply replace database.schema.vw_table_deduping_view with the actual names of your database, schema, and deduping view, respectively.

DELETE
FROM database.landing_schema.table
WHERE IFNULL(CONCAT(node, ':', scope, ':', source_id, ':', version, ':', loaded_at), '') NOT IN
(
SELECT IFNULL(CONCAT(node, ':', scope, ':', source_id, ':', version, ':', loaded_at), '')
FROM database.landing_schema.vw_table_deduping_view
)
AND loaded_at < DATE_ADD(CURRENT_TIMESTAMP(), -1);

Google BigQuery Pruning Query

This query can be executed as-is in BigQuery. Simply replace database.schema.vw_table_deduping_view with the actual names of your database, schema, and deduping view, respectively.

delete
from database.landing_schema.table
where ifnull(concat(node,':',scope,':',source_id,':',version,':',loaded_at),'') not in
(
select ifnull(concat(node,':',scope,':',source_id,':',version,':',loaded_at),'')
from database.landing_schema.vw_table_deduping_view
)
and loaded_at < timestamp_add(current_timestamp(), interval -1 day)

Microsoft SQL Pruning Query

This query can be executed as-is in MS SQL. Simply replace database.schema.vw_table_deduping_view with the actual names of your database, schema, and deduping view, respectively.

DELETE
FROM database.landing_schema.table
WHERE ISNULL(CONCAT(node, ':', scope, ':', source_id, ':', version, ':', CONVERT(VARCHAR, loaded_at, 120)), '') NOT IN
(
SELECT ISNULL(CONCAT(node, ':', scope, ':', source_id, ':', version, ':', CONVERT(VARCHAR, loaded_at, 120)), '')
FROM database.landing_schema.vw_table_deduping_view
)
AND loaded_at < DATEADD(day, -1, GETDATE());

Redshift Pruning Query

This query can be executed as-is in Redshift. Simply replace database.schema.vw_table_deduping_view with the actual names of your database, schema, and deduping view, respectively.

DELETE
FROM database.landing_schema.table
WHERE NVL(node || ':' || scope || ':' || source_id || ':' || version || ':' || loaded_at::VARCHAR, '') NOT IN
(
SELECT NVL(node || ':' || scope || ':' || source_id || ':' || version || ':' || loaded_at::VARCHAR, '')
FROM database.landing_schema.vw_table_deduping_view
)
AND loaded_at < DATEADD(day, -1, GETDATE());

Snowflake Pruning Query

This query can be executed as-is in Snowflake. Simply replace database.schema.vw_table_deduping_view with the actual names of your database, schema, and deduping view, respectively.

DELETE
from database.landing_schema.table
WHERE IFNULL(CONCAT(node,':',scope,':',source_id,':',version,':',loaded_at),'') NOT IN
(
SELECT IFNULL (CONCAT(node,':',scope,':',source_id,':',version,':',loaded_at),'')
FROM database.landing_schema.vw_table_deduping_view
)
AND loaded_at < TIMESTAMPADD(DAY, -1, SYSDATE())