Data Vault PIT Table Design Pattern for Hadoop, Optimized for Virtualization
This post is a migration of a blog post I wrote three years ago.
In this post, I will take you through some of the typical arguments and questions that arise when building a data vault on Hadoop. I will also walk through a practical example using an insert-only architecture. Since the original post I have witnessed many successful data warehouses built on Hadoop. It is important to note that some of what I show are merely patterns and should not be seen as a full solution (my aim is to keep it simple for demonstration purposes).
Hadoop in the data warehouse landscape
The use of Hadoop in the data warehouse and BI landscape is becoming increasingly prevalent as companies begin to understand the benefits it brings. It is widely used as a data lake in many technology stacks to store large volumes of corporate information. Anybody who has worked with Hadoop will know the options are endless and the learning curve can be steep. The mature, reliable tooling we are used to for leveraging that information is not quite there yet. Many data integration tools can use Hadoop as a source or target, but building a data warehouse on Hadoop and leveraging its native distributed power is just not enterprise-ready out of the box.
But what about the insert-only debate?
It is true that HDFS does not allow updates to records (stored as text files on the distributed file system), but some of the more modern Hadoop-based storage engines, like Apache Kudu, are starting to address the issues that have caused organizations to reject Hadoop as a candidate for warehousing. We also have a natural tendency to reuse the same design concepts we relied on in the past when adopting these new technologies. There is no debate that we are still a long way from the bulletproof ACID transactional features of the traditional RDBMS counterparts, but the question remains: is that always vital?
But then, how do I end date my satellite tables?
End dating satellite tables is not a requirement; in fact, an insert-only architecture improves scalability and flexibility. We can allow concurrent writes from multiple sources and reduce the need for traditional centralized ETL pipelines, which may have to perform expensive table scans for delta processing. We can also gracefully cater for data that arrives late at the warehouse. We can even start to take advantage of a NoETL approach, and perhaps this could allow more room for automation.
Why don't we try to do this virtually using PIT tables? This could work for most use cases. We may have some issues in a streaming architecture, but for now let's focus on a batch processing use case. So let's land the point: no end dates are stored in any tables.
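To make the insert-only argument concrete, below is a minimal sketch of an insert-only satellite load. It is not from the original example: the staging table stg_customer_details, the target sat_customer_details and the meta_hashdiff column are all hypothetical names used for illustration. New versions are simply appended; nothing is ever updated or end dated, and late-arriving rows land with their own load date.
-- Insert-only satellite load (illustrative names): append only the rows whose
-- attribute hash differs from the latest version already stored.
INSERT INTO sat_customer_details
SELECT
  stg.meta_hashkey,
  stg.meta_loaddatetime,
  stg.meta_hashdiff,
  stg.first_name,
  stg.last_name
FROM stg_customer_details AS stg
LEFT OUTER JOIN (
  SELECT
    meta_hashkey,
    meta_hashdiff,
    ROW_NUMBER() OVER (PARTITION BY meta_hashkey
                       ORDER BY meta_loaddatetime DESC) AS rn
  FROM sat_customer_details
) AS cur
  ON cur.meta_hashkey = stg.meta_hashkey
 AND cur.rn = 1
WHERE cur.meta_hashkey IS NULL                   -- brand new hub key
   OR cur.meta_hashdiff <> stg.meta_hashdiff;    -- attributes have changed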
Virtualization, where are you going with this?
What good is this data if we can't use it effectively? There are some further points to consider as I take you through my use case:
1. Let Hadoop do what Hadoop is good at
- Hadoop is good at storing and processing large amounts of data and provides a cheaper alternative for doing so. Great, let's hand the data vault off to Hadoop, especially one updated in batch. For the back-end data engineers, Hadoop is a great place to play.
2. Let Hadoop not do what it is not good at
- Natively, Hadoop does not provide rich modelling tools for our less technically savvy analysts who will be using the information the data vault holds to derive critical business insights (data scientists, visualization experts, business analysts, business users). "Ok, but my users are currently using <insert tool here>; it provides a great graphical UI for data modelling and rich tools sitting on top of a high-performing analytical in-memory database. It would be great if we could use that, especially since it already integrates with my identity and access management system." Go for it, Hans :)
3. Let's find a balance
Good solution architecture is about finding the right balance: the sweet spot between cost, skills, maintainability, fit and requirements.
Let's use the power of virtualization to leverage the data vault asset in Hadoop. We may even negate the need to persist another data store for our users.
I will unpack the patterns in later posts, but for now I just want to land the concepts. A lot of the work right up to the business vault can be completely automated using a metadata-driven approach when investing in the insert-only way of life (I will show you how soon enough).
A Practical Example
What is the problem statement here?
- If I do not have end dates in my satellites, how do I select the correct record as at a certain date in history? (Remember, I can't do a select where loaddate <= input_date <= loadenddate, because there are no end dates.)
- How do we create a full history set to load a dimension table? (Remember, each satellite table can have a different number of historic records, or no record at all for a specific date, depending on the rate of change of the attributes it holds.)
I will start with a simple example where I have one hub table and three satellite tables, each with a different history of attributes.
Data preview:
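For reference, here is a minimal sketch of the example table structures as implied by the queries further down. Column types are assumptions; the actual DDL scripts and CSV files are in the GitHub repository linked at the end of this post.
-- Hub: one row per customer business key
CREATE TABLE hub (
  meta_hashkey       STRING,
  meta_loaddatetime  TIMESTAMP,
  customer_id        STRING
);

-- Satellite 1: personal attributes
CREATE TABLE sat1 (
  meta_hashkey       STRING,
  meta_loaddatetime  TIMESTAMP,
  first_name         STRING,
  last_name          STRING,
  gender             STRING,
  marritalstatus     STRING
);

-- Satellite 2: contact attributes
CREATE TABLE sat2 (
  meta_hashkey       STRING,
  meta_loaddatetime  TIMESTAMP,
  address            STRING,
  email              STRING,
  phone              STRING
);

-- Satellite 3: miscellaneous attributes
CREATE TABLE sat3 (
  meta_hashkey       STRING,
  meta_loaddatetime  TIMESTAMP,
  car_make           STRING,
  department         STRING,
  has_dependents     STRING
);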
If we did a plain left join from the hub to each satellite, we would get back around 200 records per client id, with many duplicate records and no end dates to filter on.
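For illustration, this is the naive join that causes the explosion: joining on the hash key alone multiplies out each satellite's full history.
SELECT
  h.customer_id,
  s1.meta_loaddatetime AS sat1_loaddatetime,
  s2.meta_loaddatetime AS sat2_loaddatetime,
  s3.meta_loaddatetime AS sat3_loaddatetime
FROM hub AS h
LEFT OUTER JOIN sat1 AS s1 ON s1.meta_hashkey = h.meta_hashkey
LEFT OUTER JOIN sat2 AS s2 ON s2.meta_hashkey = h.meta_hashkey
LEFT OUTER JOIN sat3 AS s3 ON s3.meta_hashkey = h.meta_hashkey;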
Creating the PIT Table
The main purpose of the PIT table is to combine the various satellite keys and load dates at the lowest possible grain and represent a version of the hub at every possible point in history. This table is then used in joins to improve query performance (avoiding non-equi date joins).
Another important factor is that using a PIT table makes virtualization easier, as we can leave all the data in our vault in Hadoop and allow users to decide which satellite tables they require based on their analytic requirements.
- Union all the loaddatetime columns for each satellite and derive a unique set of dates (these are effectively distinct versions of your hub entity in history), then calculate the end dates for the unique set.
- Calculate the end dates virtually for each satellite table
- Calculate the PIT table with non-equi date joins
End Dating your Satellites
This can be done using the LEAD function, which is native to almost all databases. The example below uses Apache Impala SQL:
SELECT
  meta_hashkey,
  meta_loaddatetime,
  LEAD(
    seconds_sub(meta_loaddatetime, 1), 1, to_timestamp('9999-12-31', 'yyyy-MM-dd')
  ) OVER (PARTITION BY meta_hashkey ORDER BY meta_loaddatetime ASC) AS meta_loaddendatetime
FROM sat2
ORDER BY
  meta_hashkey,
  meta_loaddatetime;
Create distinct versions of the hub entity
The query below produces every possible version of the hub entity per customer. Notice that we add the zero record '1900-01-01' to create a full history set; this is not strictly necessary and should only be done if required. It is very important that we use 'distinct' and 'union', not 'union all', as we want a distinct list with no overlaps.
WITH hub_versions AS (
  SELECT DISTINCT meta_hashkey, to_timestamp('1900-01-01', 'yyyy-MM-dd') AS meta_loaddatetime FROM hub
  UNION
  SELECT DISTINCT meta_hashkey, meta_loaddatetime FROM sat1
  UNION
  SELECT DISTINCT meta_hashkey, meta_loaddatetime FROM sat2
  UNION
  SELECT DISTINCT meta_hashkey, meta_loaddatetime FROM sat3
),
hub_versions_ended AS (
  SELECT
    meta_hashkey,
    meta_loaddatetime,
    LEAD(
      seconds_sub(meta_loaddatetime, 1), 1, to_timestamp('9999-12-31', 'yyyy-MM-dd')
    ) OVER (PARTITION BY meta_hashkey ORDER BY meta_loaddatetime ASC) AS meta_loaddendatetime
  FROM hub_versions
)
SELECT *
FROM hub_versions_ended
ORDER BY
  meta_hashkey,
  meta_loaddatetime;
Create the PIT table
Using the above, we will now create the PIT table, which will have the following structure:
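As a reference point, a minimal DDL sketch of that structure is shown below. The table name pit_customer matches the query used later in this post; the column types and the Parquet storage format are assumptions.
-- One row per hub hash key and point-in-time version
CREATE TABLE pit_customer (
  meta_hashkey        STRING,
  effectivedate       TIMESTAMP,
  effectiveenddate    TIMESTAMP,
  sat1_loaddatetime   TIMESTAMP,
  sat2_loaddatetime   TIMESTAMP,
  sat3_loaddatetime   TIMESTAMP
)
STORED AS PARQUET;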
WITH
-- end date each satellite virtually
sat1_ended AS (
  SELECT
    meta_hashkey,
    meta_loaddatetime,
    LEAD(
      seconds_sub(meta_loaddatetime, 1), 1, to_timestamp('9999-12-31', 'yyyy-MM-dd')
    ) OVER (PARTITION BY meta_hashkey ORDER BY meta_loaddatetime ASC) AS meta_loaddendatetime
  FROM sat1
),
sat2_ended AS (
  SELECT
    meta_hashkey,
    meta_loaddatetime,
    LEAD(
      seconds_sub(meta_loaddatetime, 1), 1, to_timestamp('9999-12-31', 'yyyy-MM-dd')
    ) OVER (PARTITION BY meta_hashkey ORDER BY meta_loaddatetime ASC) AS meta_loaddendatetime
  FROM sat2
),
sat3_ended AS (
  SELECT
    meta_hashkey,
    meta_loaddatetime,
    LEAD(
      seconds_sub(meta_loaddatetime, 1), 1, to_timestamp('9999-12-31', 'yyyy-MM-dd')
    ) OVER (PARTITION BY meta_hashkey ORDER BY meta_loaddatetime ASC) AS meta_loaddendatetime
  FROM sat3
),
-- distinct versions of the hub entity across all satellites
hub_versions AS (
  SELECT DISTINCT meta_hashkey, to_timestamp('1900-01-01', 'yyyy-MM-dd') AS meta_loaddatetime FROM hub
  UNION
  SELECT DISTINCT meta_hashkey, meta_loaddatetime FROM sat1
  UNION
  SELECT DISTINCT meta_hashkey, meta_loaddatetime FROM sat2
  UNION
  SELECT DISTINCT meta_hashkey, meta_loaddatetime FROM sat3
),
hub_versions_ended AS (
  SELECT
    meta_hashkey,
    meta_loaddatetime,
    LEAD(
      seconds_sub(meta_loaddatetime, 1), 1, to_timestamp('9999-12-31', 'yyyy-MM-dd')
    ) OVER (PARTITION BY meta_hashkey ORDER BY meta_loaddatetime ASC) AS meta_loaddendatetime
  FROM hub_versions
)
-- non-equi date joins: pick the satellite version that was valid for each hub version
SELECT
  pit.meta_hashkey,
  pit.meta_loaddatetime    AS effectivedate,
  pit.meta_loaddendatetime AS effectiveenddate,
  sat1.meta_loaddatetime   AS sat1_loaddatetime,
  sat2.meta_loaddatetime   AS sat2_loaddatetime,
  sat3.meta_loaddatetime   AS sat3_loaddatetime
FROM hub_versions_ended AS pit
LEFT OUTER JOIN sat1_ended AS sat1
  ON sat1.meta_hashkey = pit.meta_hashkey
 AND sat1.meta_loaddatetime <= pit.meta_loaddatetime
 AND sat1.meta_loaddendatetime >= pit.meta_loaddendatetime
LEFT OUTER JOIN sat2_ended AS sat2
  ON sat2.meta_hashkey = pit.meta_hashkey
 AND sat2.meta_loaddatetime <= pit.meta_loaddatetime
 AND sat2.meta_loaddendatetime >= pit.meta_loaddendatetime
LEFT OUTER JOIN sat3_ended AS sat3
  ON sat3.meta_hashkey = pit.meta_hashkey
 AND sat3.meta_loaddatetime <= pit.meta_loaddatetime
 AND sat3.meta_loaddendatetime >= pit.meta_loaddendatetime;
For comparison, if we did not use the non-equi date joins we would see gaps in the result, as the satellite tables have different histories.
For a batch-operated data vault we would save an updated, persisted version of this PIT table with each and every batch window.
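A minimal sketch of what that batch refresh could look like, assuming the full PIT query above has been saved as a view (the view name pit_customer_vw is hypothetical) and pit_customer is an HDFS/Parquet-backed table as sketched earlier.
-- Replace the persisted PIT snapshot once per batch window
INSERT OVERWRITE pit_customer
SELECT
  meta_hashkey,
  effectivedate,
  effectiveenddate,
  sat1_loaddatetime,
  sat2_loaddatetime,
  sat3_loaddatetime
FROM pit_customer_vw;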
Join the PIT and Satellites
SELECT
  h.customer_id,
  pit.effectivedate,
  pit.effectiveenddate,
  s1.first_name,
  s1.last_name,
  s1.gender,
  s1.marritalstatus,
  s2.address,
  s2.email,
  s2.phone,
  s3.car_make,
  s3.department,
  s3.has_dependents
FROM pit_customer AS pit
INNER JOIN hub AS h
  ON pit.meta_hashkey = h.meta_hashkey
LEFT OUTER JOIN sat1 AS s1
  ON pit.meta_hashkey = s1.meta_hashkey
 AND pit.sat1_loaddatetime = s1.meta_loaddatetime
LEFT OUTER JOIN sat2 AS s2
  ON pit.meta_hashkey = s2.meta_hashkey
 AND pit.sat2_loaddatetime = s2.meta_loaddatetime
LEFT OUTER JOIN sat3 AS s3
  ON pit.meta_hashkey = s3.meta_hashkey
 AND pit.sat3_loaddatetime = s3.meta_loaddatetime
ORDER BY
  h.customer_id,
  pit.effectivedate;
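To deliver this virtually, the same join can be wrapped in a view so users query the dimension without persisting another copy of the data. A minimal sketch follows; the view name dim_customer is illustrative and only a few of the satellite columns are shown.
CREATE VIEW dim_customer AS
SELECT
  h.customer_id,
  pit.effectivedate,
  pit.effectiveenddate,
  s1.first_name,
  s1.last_name,
  s1.marritalstatus
  -- remaining satellite columns omitted for brevity
FROM pit_customer AS pit
INNER JOIN hub AS h
  ON pit.meta_hashkey = h.meta_hashkey
LEFT OUTER JOIN sat1 AS s1
  ON pit.meta_hashkey = s1.meta_hashkey
 AND pit.sat1_loaddatetime = s1.meta_loaddatetime;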
It is important to note that users may need to condense rows if they only select attributes from a slowly changing satellite table (there may be duplicate entries).
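One way to condense those duplicates is a basic gaps-and-islands pattern, sketched below against the dim_customer view from the previous sketch: adjacent rows with the same attribute value are collapsed into a single effective date range.
WITH marked AS (
  SELECT
    customer_id,
    marritalstatus,
    effectivedate,
    effectiveenddate,
    -- flag rows whose attribute value differs from the previous version
    CASE WHEN marritalstatus = LAG(marritalstatus)
           OVER (PARTITION BY customer_id ORDER BY effectivedate)
         THEN 0 ELSE 1 END AS changed
  FROM dim_customer
),
grouped AS (
  SELECT
    customer_id,
    marritalstatus,
    effectivedate,
    effectiveenddate,
    -- running total of changes = one group per contiguous value range
    SUM(changed) OVER (PARTITION BY customer_id
                       ORDER BY effectivedate
                       ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS grp
  FROM marked
)
SELECT
  customer_id,
  marritalstatus,
  MIN(effectivedate)    AS effectivedate,
  MAX(effectiveenddate) AS effectiveenddate
FROM grouped
GROUP BY
  customer_id,
  marritalstatus,
  grp
ORDER BY
  customer_id,
  effectivedate;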
Try it yourself
A copy of the scripts and CSV files used in the above demo can be found at the following GitHub repository:
https://github.com/KeithLeo-Smith/example_dv_hadoop_pit_table