In this blog post we will discuss plans for an ambitious endeavour to deliver Data Warehouse capabilities with a Hadoop cluster, leveraging its strengths and working around its weaknesses.
This is not going to be an easy task, as we are dealing with the highly dynamic and distributed world of mobile games, but that certainly makes things interesting.
What do we need?
We need a system that is flexible, scalable, extensible and cheap. That’s at least our dream. Hadoop is well known for its scalability and cost effectiveness in processing large volumes of unstructured data, so we can almost imagine our dreams coming true. If it’s good for unstructured data, let’s just impose the structure and we have the warehouse at home.
This sounds too simple. Let’s be more specific. We need to continuously gather and organise large volumes of data in a way that enables us to ask questions and get true answers fast. If we don’t meet these requirements, not much business value is going to be delivered, because we will either get wrong information or not get it on time.
Now that we are more specific about our needs, let’s see why some people say that a Data Lake is not a Data Warehouse.
Everyone should already know that Big Data is about Vs. Variety is one of them. Hadoop can store any data and answer any question through its various interfaces, be it MapReduce, Spark or Hive. However, to ask a question we need to impose some structure, process the data and hopefully get a structured answer. This can be done when reading the data, and it’s called schema on read. This is an important property of Big Data systems and is not the case for traditional data warehouses, which impose the schema when the data is written.
So are traditional data warehouses incapable of answering various questions? They are limited for a reason, and that reason is not only performance. They also impose schemas so that analysts understand the data and the sort of questions they can ask. Without a schema, analysts would end up writing complex processing pipelines just to answer the simple questions they are used to asking in SQL.
It turns out that Hadoop has a way to tell the analysts about the structure of the data. It does it through HCatalog where data is grouped into databases, tables, partitions and columns just like in a traditional database. Hive is used to manage all this metadata and access the data through a familiar SQL interface. Under the hood the queries are translated into a series of operations performed on a Hadoop cluster. This is a managed version of a schema on read approach. The schema definition may change but the change should only affect the metadata, not the data.
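To make the schema on read idea concrete, here is a minimal sketch in plain Python. It does not use Hive or HCatalog; the names `RAW_EVENTS`, `SCHEMA_V1`, `SCHEMA_V2` and `read_with_schema` are hypothetical and exist only for illustration. The raw records are stored once and never rewritten, while the schema is just metadata applied when reading.

```python
import json

# Raw data is stored once, as untyped JSON lines (hypothetical sample events).
RAW_EVENTS = [
    '{"player": "A", "level": "3", "ts": "2017-01-01 00:00:01"}',
    '{"player": "B", "level": "7", "ts": "2017-01-01 00:00:02", "device": "ios"}',
]

# The schema is only metadata: column name -> type applied when reading.
SCHEMA_V1 = {"player": str, "level": int}
# A later schema change adds a column without touching the stored data.
SCHEMA_V2 = {"player": str, "level": int, "device": str}


def read_with_schema(lines, schema):
    """Project each raw record onto the schema; missing columns become None."""
    rows = []
    for line in lines:
        record = json.loads(line)
        row = {}
        for column, cast in schema.items():
            value = record.get(column)
            row[column] = cast(value) if value is not None else None
        rows.append(row)
    return rows
```

Reading the same `RAW_EVENTS` with `SCHEMA_V1` and then with `SCHEMA_V2` yields differently shaped rows, which mirrors the point that a schema change should affect only the metadata.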
The picture below illustrates how many data analysts are used to working with Hadoop through the Hive SQL interface.
As mentioned, we need to gather and organise large volumes of data, so we need to scale storage and computing resources efficiently. It may seem trivial, as Hadoop promises linear scalability with the size of the cluster, but this does not hold if our questions are not of linear complexity. Even if our storage capacity and processing power increase linearly, the resources actually required may grow much faster. This is especially true in cases where multiple datasets need to be joined together.
This problem is not specific to Hadoop; it is evident in any system storing and processing large volumes of data. To overcome it we need to design our system so that the questions being asked target only a constant subset of the data and the data is organised in a way that reduces the processing complexity. To make it happen we need both indexing and partitioning. Fortunately, Hive enables us to define indexed partitions. We hope they will be sufficient to solve our problem. The only thing we need to do is to determine the indexing and partitioning approaches. Before doing this, we first need to take a closer look at the domain we are dealing with.
The picture below illustrates how the introduction of time partitions helps us decrease the computational complexity. Introducing efficient columnar storage formats like ORC or Parquet and indexing further helps to improve efficiency within the partitions.
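The effect of time partitions can be sketched with a toy in-memory store. This is an illustration in plain Python, not Hive; `partitions`, `append_fact` and `scan` are hypothetical names. A query over a fixed date range touches the same number of partitions no matter how much history has accumulated.

```python
from collections import defaultdict
from datetime import date, timedelta

# Hypothetical fact store: partition key (event date) -> list of fact rows.
partitions = defaultdict(list)


def append_fact(event_date, row):
    """Append a fact to the partition of its event date."""
    partitions[event_date].append(row)


def scan(start, end):
    """Read only the partitions in [start, end]; return (rows, partitions_read)."""
    rows, partitions_read = [], 0
    day = start
    while day <= end:
        if day in partitions:
            rows.extend(partitions[day])
            partitions_read += 1
        day += timedelta(days=1)
    return rows, partitions_read


# Load 30 days of data with 100 level up events per day.
first_day = date(2017, 1, 1)
for offset in range(30):
    day = first_day + timedelta(days=offset)
    for player in range(100):
        append_fact(day, {"player": player, "event": "level_up"})

# A query for the last two days reads 2 partitions, not all 30.
recent_rows, partitions_read = scan(date(2017, 1, 29), date(2017, 1, 30))
```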
Dimensions and Veracity
A well-known approach to structuring data in a relational data warehouse is to organise it in a star schema. In such a schema we can distinguish two types of objects: facts and dimensions. Facts don’t have identities, they have a very short life cycle and they usually reference some dimension objects. Facts are events which happen and immediately become a part of history. They are neither tracked nor changed. An example fact in the game domain is a level up event. There are many such events happening throughout a game, but once they happen they are already irrelevant from the game’s perspective. What matters to the game is the effect they have on the state. The state is maintained within the dimensions. They have identities and very long and often complex life cycles. An example dimension in the game domain is the player dimension. It may have various attributes which may change throughout the game. One such attribute is the level.
The distinction between facts and dimensions is key to designing an efficient approach to organising the data in the warehouse. Facts are easy to manage. They need to be appended to the right tables within time-based partitions, and all their foreign keys could be indexed to increase join performance. New partitions could be created on an hourly or daily basis according to data volume and business requirements. Dimension management is much more complicated and is an important reason why a Data Lake may fail to become a Data Warehouse. The complexity lies in the dynamic nature of dimensions, which introduces uncertainty about the correct state of a dimension at a given point in time. Traditional data warehouses handle this uncertainty by using the concept of slowly changing dimensions (SCD). Every new state of a dimension is a new instance of it in the data warehouse, and every fact references the instance which was effective at the time the event occurred. In a Hadoop environment, which is mostly suited to appending and sequential scanning, this process is hard to implement. We need to find a way to overcome these limitations.
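The idea that a fact references the dimension instance effective at event time can be sketched as follows. This is a simplified, in-memory illustration of a Type 2 style dimension; `PlayerVersion`, `player_dim`, `version_at` and the far-future end date are assumptions made for the example, not part of any particular warehouse product.

```python
from dataclasses import dataclass

# Assumed end date marking the version that is still current.
FAR_FUTURE = "9999-12-31 00:00:00"


@dataclass
class PlayerVersion:
    player_id: str
    level: int
    valid_from: str  # inclusive, fixed-width timestamp so strings sort by time
    valid_to: str    # exclusive


# Hypothetical history of player A: every state change creates a new instance.
player_dim = [
    PlayerVersion("A", 1, "2017-01-01 00:00:00", "2017-01-01 00:00:05"),
    PlayerVersion("A", 2, "2017-01-01 00:00:05", FAR_FUTURE),
]


def version_at(dim, player_id, event_time):
    """Return the dimension instance that was effective at event_time."""
    for version in dim:
        if (version.player_id == player_id
                and version.valid_from <= event_time < version.valid_to):
            return version
    return None
```

A fact that happened at `2017-01-01 00:00:03` resolves to the level 1 instance, while a fact at `2017-01-01 00:00:05` or later resolves to the level 2 instance. The linear scan is only for readability; a real system would need indexed or random access, which is exactly the requirement discussed next.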
Dimensions and Velocity
In order to maintain an acceptable level of veracity for historical analysis, we usually need to implement an SCD mechanism of Type 2, 4 or 6. To manage the dynamic nature of such processes we need a system that can efficiently perform random access to exact instances of dimensions, update their effective dates and create new instances. It would also be good to have a way to generate unique surrogate keys for new entries to make analytics easier, but for the sake of simplicity let’s forget about this requirement for the time being.
HBase, a storage system integrated with Hadoop, is the first candidate on which to base our SCD implementation. How exactly could we use it to solve the problem? Let’s start with a naive approach and discuss the reasons why it’s flawed.
In this approach HBase maintains two types of tables: one with the current state and one with a history of states of a dimension. The current table’s rowkey is the identifier, while the history table’s rowkey is based on the update timestamp (including a salt to eliminate RegionServer hotspotting). Once a change is captured, the system queries the current table for a previous version. If the table doesn’t contain any record, a new record is inserted with an end date far in the future. If a record exists, it is moved to the history table with both the rowkey and the end date reflecting the timestamp of the recent update. The newly updated record is put into the current table.
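The naive algorithm described above can be sketched with two Python dictionaries standing in for the HBase tables. This is not HBase client code: `current`, `history` and `apply_change` are hypothetical names, the rowkey salt is omitted for brevity, and the history rowkey is modelled as a `(timestamp, entity id)` tuple.

```python
# Assumed end date for records that are still current.
FAR_FUTURE = "9999-12-31 00:00:00"

current = {}  # rowkey: entity id -> current record
history = {}  # rowkey: (update timestamp, entity id) -> closed record


def apply_change(entity_id, attributes, ts):
    """Naive upsert: move the previous version to history, then replace current."""
    previous = current.get(entity_id)
    if previous is not None:
        # Close the previous version; rowkey and end date both use the update time.
        history[(ts, entity_id)] = dict(previous, end=ts)
    current[entity_id] = {"attributes": attributes, "start": ts, "end": FAR_FUTURE}


# Player B reaches level 1 and then level 2.
apply_change("B", {"level": 1}, "2017-01-01 00:00:01")
apply_change("B", {"level": 2}, "2017-01-01 00:00:04")
```

After the two calls, `current["B"]` holds the level 2 record and `history` holds the level 1 record closed at `2017-01-01 00:00:04`. Note that the read of `current` and the two writes are not atomic here, just as in the naive design.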
The tables below illustrate an example state of current and history tables in HBase at a selected point in time.
HBase Current @ 2017-01-01 00:00:04
HBase History @ 2017-01-01 00:00:04
This approach would work fine under two conditions: the events are captured in the order of their actual event time, and the entire operation is transactional within the scope of each unique instance of a dimension. It’s not trivial to meet either of these conditions in distributed and highly dynamic environments. Why is that? First of all, events may be produced at one time and captured at any other. Parallel processing of the captured events may further reorder the sequence. Finally, when the current state is queried, we need to be certain that it’s still current. If the replacement is not performed within a transaction, two events changing the state of one instance may both refer to the same previous state, which is not the right reference for at least one of them. The problems become even worse if the source systems are distributed, which is often the case with modern architectures based on microservices.
The tables below illustrate the problem after player B is changed twice within a short period of time.
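The ordering condition can be demonstrated with a small simulation. The sketch below replays the same three changes to a hypothetical player B twice, once in event time order and once with the last two captured in the opposite order. The function `replay` is an illustrative in-memory stand-in for the naive algorithm, not HBase code.

```python
# Assumed end date for records that are still current.
FAR_FUTURE = "9999-12-31 00:00:00"


def replay(events):
    """Apply (entity, level, event_time) tuples in the given capture order."""
    current, history = {}, []
    for entity, level, ts in events:
        previous = current.get(entity)
        if previous is not None:
            # Close the previous version at the time of the incoming event.
            history.append(dict(previous, end=ts))
        current[entity] = {"entity": entity, "level": level,
                           "start": ts, "end": FAR_FUTURE}
    return current, history


events = [
    ("B", 1, "2017-01-01 00:00:01"),
    ("B", 2, "2017-01-01 00:00:05"),
    ("B", 3, "2017-01-01 00:00:06"),
]

in_order = replay(events)
# Same events, but the last two are captured in the opposite order.
out_of_order = replay([events[0], events[2], events[1]])
```

In the out-of-order run the final current record claims level 2 although the latest event set level 3, and the history contains a version whose start is later than its end. Both are symptoms of the first condition being violated.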
HBase Current @ 2017-01-01 00:00:08
HBase History @ 2017-01-01 00:00:08
Microservices are deployed in order to physically separate responsibilities between various components of a software system. They maintain their own state and communicate with other services through the network. Although their state is local, they often refer to entities which exist globally. In a game domain one service may be responsible for managing player inventory and some other may track player levels. As these two services operate, they may be concurrently changing the state of a global instance of a dimension. The changes should be captured by the data platform and applied to the right version of the referenced object.
Velocity is our last V, while SCD explicitly refers to slow changes. What if the changes are highly dynamic and distributed instead? We need a complete redesign of the dimension management solution. There are alternative approaches, but they need careful design and may not be practical in dynamic Big Data environments.
The problem of event and processing time discrepancies is known and described in a paper published by Google, ‘The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data’. An example implementation of the Dataflow Model is Apache Beam. A complex example illustrates the problem within the game domain. This already sounds complicated, and keep in mind that we are dealing with stateless facts in the example. What would be a solution for maintaining the truth across multiple dimensions of such events? Let’s try to proceed, if our minds are not yet blown.
Rapidly Changing Dimensions
Because of the complex nature of time in the context of data analytics, batch processing will probably never go away. We will always need to fit the unbounded streams into fixed and hopefully closed time windows and perform some sort of batching. We can only hope that the windows are closed, so we need an approach to recalculate all the time partitions that depend on changed ones.
Let’s try again with HBase and work around its transactional limitations. Fortunately, it keeps rowkeys ordered, so the event time ordering problem is solved by using the update timestamp in the rowkey. We can append new changes to selected attributes of an identified entity to an HBase change log table. Apart from the key, each row needs to contain a cell storing the entity identifier and a cell, named after an attribute, holding its new value. After some time, when we are almost certain the window is closed, we can do a sequential rowkey scan of the hopefully closed window and apply the changes according to our naive algorithm with current and history tables in HBase. We can run this process in parallel but need to maintain the sequence of operations within each entity. Each version appended to a history table in HBase is already considered an immutable part of history. This observation is key to overcoming the problem of nonlinear complexity, as all the ineffective versions can be materialised in partitions based on the time window in which they stopped being current. We still need to maintain one more partition though: the partition with all the instances which are not yet finalised and are still considered current. Let’s give this partition a timestamp far in the future, just like the end date of its contents. Every time the HBase batch process finishes, we overwrite the active partition in Hive and insert a new historical partition.
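The batch process described above can be sketched in plain Python. Everything here is an assumption made for illustration: dictionaries stand in for the HBase change log and the Hive partitions, the rowkey is a `(event timestamp, entity id)` tuple without a salt, windows are one hour long, and partial changes are merged into the previous attributes. The names `change_log`, `hive_partitions` and `process_window` are hypothetical.

```python
from collections import defaultdict

# Assumed key of the active partition and end date of current versions.
FAR_FUTURE = "9999-12-31 00:00:00"

# Change log rows: rowkey (event timestamp, entity id) -> changed attributes only.
# Rows are listed in capture order, which differs from event time order.
change_log = {
    ("2017-01-01 00:00:03", "B"): {"level": 2},
    ("2017-01-01 00:00:01", "A"): {"level": 1},
    ("2017-01-01 00:00:02", "B"): {"level": 1, "coins": 10},
    ("2017-01-01 01:00:07", "A"): {"level": 2},
}

current = {}                         # entity id -> current version
hive_partitions = defaultdict(list)  # partition key -> materialised versions


def process_window(window_start, window_end):
    """Replay one closed window [window_start, window_end) in rowkey order."""
    keys = sorted(k for k in change_log if window_start <= k[0] < window_end)
    for ts, entity in keys:
        previous = current.get(entity)
        attributes = dict(previous["attributes"]) if previous else {}
        attributes.update(change_log[(ts, entity)])
        if previous:
            # The closed version lands in the partition of the window it ended in.
            hive_partitions[window_start].append(dict(previous, end=ts))
        current[entity] = {"entity": entity, "attributes": attributes,
                           "start": ts, "end": FAR_FUTURE}
    # Overwrite the active partition with everything that is still current.
    hive_partitions[FAR_FUTURE] = list(current.values())


process_window("2017-01-01 00:00:00", "2017-01-01 01:00:00")
process_window("2017-01-01 01:00:00", "2017-01-01 02:00:00")
```

Sorting the rowkeys restores event time order inside the window, so the naive algorithm is applied sequentially per entity. Each historical partition is written once and never touched again, while only the far-future partition is overwritten after every batch.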
The table below illustrates the way an HBase change log table could be organized. With dynamic schemas of Bigtable implementations the changes can refer to any subset of the attributes and can be integrated on read.
HBase Change Log @ 2017-01-01 00:00:03
Designing the solution is just the first step. We already see that the implementation is not going to be easy, yet we have no evidence that our Data Lake cannot be transformed into an efficient Data Warehouse. We should expect other problems to appear, but we hope to find workarounds for them.
Making the Dream Come True
Only after the batch process has finished can we do any analytical processing of the facts together with the dimension. When performing joins in Hive we will only need to reference the dimension partitions corresponding to the analysed fact partitions plus the active partition, which is fairly constant in size. In this way we should be capable of ensuring scalability and correctness at the cost of the latency imposed by the batch process and its partitioning resolution.
In the next posts we will get our hands dirty with the actual PoC implementation for a selected stream of facts and the player dimension.
by Robert Wiśniewski, Big Data Analyst at Reality Games.