Thursday, 7 June 2018

Bigdata Testing – What to do?

“All code is guilty, until proven innocent.” – Anonymous

This post is the first in a series focused on what should be kept in mind while testing Bigdata systems.

Why is testing so important?

Testing is an integral part of the software development life cycle. It ensures that the software works as specified and is of high quality, and it is essential for a software application or product to perform effectively.
Testing in the Bigdata world is even more complicated due to the size and variety of the data.

Why is Bigdata testing complex?

The three Vs that make Bigdata so powerful also make Bigdata testing very complex:
Volume: the volume of data is so large that testing all of it is next to impossible.
Variety: sources can send data in different formats – structured, semi-structured and unstructured.
Velocity: the rate of ingestion varies with the type of source (batch, real-time, near real-time etc.). With the need to ingest data in real time, replicating these scenarios in testing is challenging.

Technology Landscape: there are a number of open source technologies on the market for data ingestion, processing and analytics. This steepens the learning curve for testers and increases the time and effort required for testing.

Bigdata testing Methodology

Bigdata testing can be primarily divided into 3 areas:
  •  Data validation testing (Preprocessing)
  •  Business logic validation testing (Processing)
  •  Output validation testing (Consumption)

This post covers what should be tested in the data validation testing phase.

Data Validation Testing

Data validation testing ensures that the right data has been ingested into the system. It covers testing both the data ingestion pipeline and the data storage.

Data Ingestion

Prominent Bigdata technologies used for data ingestion are Flume, Sqoop, Spark, Kafka, NiFi etc.

Multi-Source Integration – Data from multiple sources needs to be integrated into the ingestion pipeline. These sources generate different volumes of data at different times.

Things to test:
  • What is the mechanism of data ingestion for each source? (Is the data pulled from or pushed by each source?)
  • What is the format in which data is received from each source?
  • Has the input been checked against the expected minimum/maximum ranges?
  • Is the data secure while being transferred from source to target?
  • Who has access to the source systems (RDBMS tables or SFTP folders)?
  • Are the files (in the case of SFTP) archived or purged after copying, at the agreed frequency?
  • Are audit logs getting generated?
  • Are ingestion error logs getting generated?

Integrity Check – Data integrity can be checked via multiple methods such as record count, file size and checksum validation.
Things to test:
  • What is the size of the transferred file, and does it match the source?
  • Does the number of columns in the source and target match?
  • Is the sequence of columns correct?
  • Do the column data types match?
  • Does the record count match between source and target?
  • Do the source and target checksums match?
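The integrity checks above can be scripted. Below is a minimal Python sketch, assuming a CSV file with a header row that is copied byte-for-byte from source to target (for example over SFTP); `profile_csv` and `compare_integrity` are hypothetical helper names. Column data types are not covered, because plain CSV carries no type information; that check needs the source and target schema metadata. The checksum comparison only makes sense when the target is expected to be byte-identical to the source.

```python
import csv
import hashlib
import io


def profile_csv(data: bytes) -> dict:
    """Collect the integrity attributes for one CSV payload (header row assumed)."""
    rows = list(csv.reader(io.StringIO(data.decode("utf-8"))))
    header, records = (rows[0], rows[1:]) if rows else ([], [])
    return {
        "size_bytes": len(data),
        "columns": header,             # a list, so column order is preserved
        "record_count": len(records),  # excludes the header row
        "sha256": hashlib.sha256(data).hexdigest(),
    }


def compare_integrity(source: bytes, target: bytes) -> dict:
    """Return one pass/fail flag per integrity check."""
    s, t = profile_csv(source), profile_csv(target)
    return {
        "file_size_match": s["size_bytes"] == t["size_bytes"],
        "column_count_match": len(s["columns"]) == len(t["columns"]),
        "column_order_match": s["columns"] == t["columns"],
        "record_count_match": s["record_count"] == t["record_count"],
        "checksum_match": s["sha256"] == t["sha256"],
    }


source = b"id,name\n1,alice\n2,bob\n"
target = b"id,name\n1,alice\n"  # one record lost in transfer
print(compare_integrity(source, target))
```

Here the record-count and checksum checks fail while the column checks pass, which points to lost rows rather than a layout change.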

Change Data Capture (CDC) – For incremental data loads, all insert, update and delete scenarios for records/fields need to be tested.

Things to test:
  • Are the new records getting inserted?
  • Are records deleted at the source marked as deleted in the target?
  • Are new versions created for updated records if history is maintained (or, if not, are the records overwritten with the latest version)?
  • Are audit logs getting generated?
  • Are error logs getting generated?
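One way to test CDC is to derive the expected inserts, updates and deletes from two consecutive source snapshots and then check the target against them. A minimal Python sketch, assuming each snapshot fits in memory as a `{primary_key: row}` dict and the target keeps a hypothetical per-key version history with a soft-delete flag; on real volumes the same comparison would run as a join in the processing engine.

```python
def expected_changes(previous: dict, current: dict) -> dict:
    """Derive inserts, updates and deletes between two source snapshots."""
    inserts = current.keys() - previous.keys()
    deletes = previous.keys() - current.keys()
    updates = {k for k in current.keys() & previous.keys() if current[k] != previous[k]}
    return {"insert": inserts, "update": updates, "delete": deletes}


def verify_cdc(previous, current, target_history, keep_history=True):
    """Check the target against the expected changes.

    target_history maps each key to its versions, oldest first, where each
    version is a (row_values, is_deleted) tuple. This layout is hypothetical.
    Returns the (change_type, key) pairs that failed.
    """
    changes = expected_changes(previous, current)
    failures = []
    for key in sorted(changes["insert"]):
        versions = target_history.get(key, [])
        if not versions or versions[-1] != (current[key], False):
            failures.append(("insert", key))
    for key in sorted(changes["update"]):
        versions = target_history.get(key, [])
        # With history: old version kept plus a new one. Without: overwritten in place.
        count_ok = len(versions) >= 2 if keep_history else len(versions) == 1
        if not count_ok or versions[-1] != (current[key], False):
            failures.append(("update", key))
    for key in sorted(changes["delete"]):
        versions = target_history.get(key, [])
        # Deleted at source -> latest target version must be flagged as deleted.
        if not versions or not versions[-1][1]:
            failures.append(("delete", key))
    return failures


previous = {1: ("alice", 10), 2: ("bob", 20), 3: ("carol", 30)}
current = {1: ("alice", 10), 2: ("bob", 25), 4: ("dave", 40)}
target = {
    1: [(("alice", 10), False)],
    2: [(("bob", 20), False), (("bob", 25), False)],
    3: [(("carol", 30), False), (("carol", 30), True)],
    4: [(("dave", 40), False)],
}
print(verify_cdc(previous, current, target))
```

An empty list means every insert, update and delete landed as expected; the `keep_history` flag switches between the "new version" and "overwrite" expectations from the checklist.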

Data Quality – Data quality is one of the most important aspects of data ingestion. Data quality rules are defined by functional SMEs after discussion with business users, and they are specific to each source system.
Things to test:
  • Are any of the key fields Null?
  • Are there any duplicate records?
  • Have all the data quality rules been applied?
  • Does the total number of good and bad records match the number of records ingested from each source?
  • Are the good records getting passed on to the next layer?
  • Are the bad records getting stored for future reference?
  • Can the bad records be traced back to the source?
  • Can the reason for each bad record be traced?
  • Can we trace the good and bad records to their respective data loads?
  • Can the good and bad records be cleaned up for a particular load?
  • Are error/audit logs getting generated?
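The null-key, duplicate and reconciliation checks, plus the traceability tags for bad records, can be sketched in Python as follows. The rule names, the `_load_id`/`_source_row`/`_reasons` tag columns and the load id format are all hypothetical; real rules come from the functional SMEs. In this sketch every copy of a duplicated key is rejected, which is one possible policy (keeping the first or latest copy is another).

```python
from collections import Counter


def split_good_bad(records, key_fields, rules, load_id):
    """Apply data quality rules and split records into good and bad.

    rules maps a rule name to a predicate that returns True when a record
    passes. Every output record is tagged with the load id and its row number
    in the source extract, so it can be traced to the load and the source.
    """
    keys = [tuple(rec.get(f) for f in key_fields) for rec in records]
    key_counts = Counter(keys)
    good, bad = [], []
    for row_num, (rec, key) in enumerate(zip(records, keys), start=1):
        reasons = []
        if any(v is None for v in key):
            reasons.append("null key field")
        elif key_counts[key] > 1:
            reasons.append("duplicate key")  # all copies are rejected here
        reasons.extend(name for name, passes in rules.items() if not passes(rec))
        tagged = {**rec, "_load_id": load_id, "_source_row": row_num}
        if reasons:
            bad.append({**tagged, "_reasons": reasons})  # reason kept for tracing
        else:
            good.append(tagged)
    return good, bad


def reconciles(ingested_count, good, bad):
    """Good + bad must account for every record ingested from the source."""
    return len(good) + len(bad) == ingested_count


def cleanup_load(rows, load_id):
    """Drop every row belonging to one load, e.g. before re-running it."""
    return [r for r in rows if r["_load_id"] != load_id]


records = [
    {"id": 1, "amount": 10},
    {"id": None, "amount": 5},
    {"id": 3, "amount": -1},
    {"id": 4, "amount": 7},
    {"id": 4, "amount": 8},
]
rules = {"amount_non_negative": lambda r: r.get("amount") is not None and r["amount"] >= 0}
good, bad = split_good_bad(records, ["id"], rules, load_id="2018-06-07-01")
print(len(good), len(bad), reconciles(len(records), good, bad))
```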

Data Storage

Data ingested from multiple sources can be stored on premise or in the cloud, depending on the deployment model chosen:
On premise – Hadoop Distributed File System (HDFS), S3-compatible object storage or NoSQL data stores.
Cloud based – S3, Glacier, HDFS, NoSQL, RDS etc.

Things to test:

Common to all storage types:
  • What is the file format of the ingested data?
  • Is the data compressed using the agreed compression format?
  • Is the integrity of the data maintained after compression?
  • Is the data archived to the correct location?
  • Is the data archived within the agreed time frame?
  • Is the data purged within the agreed time frame?
  • Is the data stored in the file system or NoSQL store accessible?
  • Is the archived data accessible?
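Two of the checks above, integrity after compression and archive/purge timing, lend themselves to a small script. A minimal Python sketch, assuming gzip compression and a hypothetical policy of archiving after 90 days and purging after 365 days; substitute the compression format and time frames actually agreed for the project.

```python
import gzip
import hashlib
from datetime import date


def compressed_copy_intact(source_bytes: bytes, stored_gzip: bytes) -> bool:
    """Decompress the stored file and compare its checksum with the source."""
    restored = gzip.decompress(stored_gzip)
    return hashlib.sha256(restored).digest() == hashlib.sha256(source_bytes).digest()


def expected_state(ingested_on: date, today: date,
                   archive_after_days: int, purge_after_days: int) -> str:
    """Lifecycle state a file should be in under the (hypothetical) policy."""
    age_days = (today - ingested_on).days
    if age_days >= purge_after_days:
        return "purged"
    if age_days >= archive_after_days:
        return "archived"
    return "active"


def lifecycle_violations(files, today, archive_after_days, purge_after_days):
    """files: (name, ingested_on, actual_state) tuples. Returns mismatches."""
    violations = []
    for name, ingested_on, actual in files:
        expected = expected_state(ingested_on, today, archive_after_days, purge_after_days)
        if actual != expected:
            violations.append((name, actual, expected))
    return violations


raw = b"id,amount\n1,10\n" * 1000
print(compressed_copy_intact(raw, gzip.compress(raw)))

today = date(2018, 6, 7)
files = [("a.gz", date(2018, 6, 1), "active"), ("b.gz", date(2017, 1, 1), "archived")]
print(lifecycle_violations(files, today, archive_after_days=90, purge_after_days=365))
```

The second file should already have been purged under this policy, so it is reported as a violation.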

Simple Storage Service (S3):
  • Is there a requirement to encrypt data in S3? If yes, is the data getting encrypted correctly?
  • Is there a requirement for versioning data in S3? If yes, is the data getting versioned?
  • Are lifecycle rules set to move data to RRS (Reduced Redundancy Storage) or Glacier? Is the data moving according to the policy?
  • Has access to the buckets been tested against the bucket policies and ACLs that have been set?
  • Is the data in S3 encrypted?
  • Can S3 data be accessed from the nodes in the cluster?

How to Test?

The most common ways to test data are:
  • Data Comparison – The simplest way is to compare the source and target files for any differences.
  • Minus Query – Minus queries run source-minus-target and target-minus-source over all the data, to make sure the extraction process did not introduce duplicate data and that all unnecessary columns are removed before the data is loaded for validation.
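A minus comparison on extracted rows can be sketched in Python with `collections.Counter`. Note that this is a multiset difference, closer to SQL `EXCEPT ALL` than to `MINUS` (which removes duplicates); counting duplicates is a deliberate choice here so that a row extracted twice shows up. It only works on data that fits in memory, which is the limitation discussed next.

```python
from collections import Counter


def minus(left, right):
    """Rows of `left` with no matching row in `right`.

    Duplicates are counted (multiset difference), so a row that appears
    twice in `left` but once in `right` is reported once.
    """
    remaining = Counter(map(tuple, left)) - Counter(map(tuple, right))
    return sorted(remaining.elements())


source = [(1, "alice"), (2, "bob")]
target = [(1, "alice"), (2, "bob"), (2, "bob"), (3, "carol")]

print(minus(source, target))  # source-minus-target: rows missing in target
print(minus(target, source))  # target-minus-source: duplicates and extra rows
```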

Neither of these methods is easy to use in the Bigdata world because the volume is huge, so different methods are needed:
  • Data Sampling – The entire data set cannot be tested record by record because of its volume, so the data needs to be tested by sampling.
  • It is very important to select the correct sample of data to ensure all scenarios are tested. Samples should be selected to cover the maximum number of variations in the data.
  • Basic Data Profiling – Another way to ensure that the data has been ingested correctly is data profiling, for example the count of records ingested, aggregations on certain columns, and minimum and maximum value ranges.
  • Running queries to check data correctness – Queries such as grouping on certain columns or fetching the top and bottom 5 records can help check whether the correct number of records has been ingested for each scenario.
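Sampling by variation and basic profiling can be sketched in Python as follows. `stratified_sample` takes up to a fixed number of records from each distinct variation (here a hypothetical `region` field), so rare scenarios are still covered; `profile` computes the count, sum, min/max and per-group counts that can be compared between source and target. This is an in-memory illustration; on a cluster the same aggregates would be computed with queries.

```python
import random
from collections import Counter, defaultdict


def stratified_sample(records, variation_of, per_variation, seed=42):
    """Take up to `per_variation` records from every distinct variation,
    so rare scenarios are not drowned out by common ones."""
    rng = random.Random(seed)  # fixed seed makes the sample reproducible
    groups = defaultdict(list)
    for rec in records:
        groups[variation_of(rec)].append(rec)
    sample = []
    for key in sorted(groups):
        bucket = groups[key]
        sample.extend(rng.sample(bucket, min(per_variation, len(bucket))))
    return sample


def profile(records, numeric_col, group_col):
    """Record count, sum and min/max of one numeric column, and counts per group."""
    values = [r[numeric_col] for r in records]
    return {
        "count": len(records),
        "sum": sum(values),
        "min": min(values, default=None),
        "max": max(values, default=None),
        "by_group": dict(Counter(r[group_col] for r in records)),
    }


source = [{"region": reg, "amount": i} for i, reg in enumerate("NSNESN")]
target = list(reversed(source))  # same data, different order

print(stratified_sample(source, lambda r: r["region"], per_variation=1))
print(profile(source, "amount", "region") == profile(target, "amount", "region"))
```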

Part 2 of the blog will cover business logic validation testing and data consumption.

Part 3 of the blog will cover performance and security testing.

Thursday, 21 September 2017

Data Lake Explained



Why Data Lake?

With the advent of Bigdata, enterprises needed new ways to store, process and analyze data. Traditional data warehouses were not capable of handling such a huge volume and variety of data. Data lakes became popular because they provided a cost-effective and technologically feasible way to meet these big data challenges.

What is Data Lake?

Data Lake is an architecture that allows collecting, storing, processing, analyzing and consuming all data that flows into an organization. A data lake stores data in its raw (as-is) or native format.
It can also be considered as a repository created from the data coming in from disparate data sources.

Pentaho CTO James Dixon coined the term data lake:
“If you think of a Data Mart as a store of bottled water – cleansed and packaged and structured for easy consumption – the data lake is a large body of water in a more natural state.”

A data lake can be imagined as a huge grid with billions of rows and columns, where each cell may contain a different type of data. One cell can contain a document, another a photograph, and another a paragraph or a single word of text. No matter where the data came from, it is simply stored in a cell.

How is a data lake different from a data warehouse?

Data lakes and data warehouses are very different because they are optimized for different purposes. There is a significant difference in the way data is stored, processed and analyzed in each of them.
(Note: DL – Data lake, DW – Data warehouse)

Data: DW stores data that is highly structured, processed and transformed. DL stores data in raw format, and it can be structured, semi-structured or unstructured.

Storage: DW follows a ‘schema on write’ approach where the data structure is decided before ingesting the data. DL follows a ‘schema on read’ (Late binding) approach where any form of data can be ingested and schema is decided while reading the data.
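The difference between the two approaches can be shown with a minimal Python sketch. The `ORDERS_SCHEMA` table and the sample records are hypothetical: the warehouse insert rejects a record that does not match its predefined structure, while the lake stores every raw line as-is and applies whichever schema the reader supplies at query time.

```python
import json

# Hypothetical warehouse table definition, fixed before any data arrives.
ORDERS_SCHEMA = {"id": int, "customer": str, "amount": float}


def dw_insert(table: list, record: dict) -> None:
    """Schema on write: the record must fit the predefined structure,
    otherwise it is rejected before it is stored."""
    if set(record) != set(ORDERS_SCHEMA):
        raise ValueError(f"columns {sorted(record)} do not match the table definition")
    table.append({col: typ(record[col]) for col, typ in ORDERS_SCHEMA.items()})


def dl_ingest(lake: list, raw_line: str) -> None:
    """Schema on read: store the raw line as-is, whatever its shape."""
    lake.append(raw_line)


def dl_read(lake: list, schema: dict):
    """Bind a schema only at read time; fields outside it are ignored and
    missing fields come back as None."""
    for line in lake:
        doc = json.loads(line)
        yield {col: None if doc.get(col) is None else typ(doc[col])
               for col, typ in schema.items()}


warehouse, lake = [], []
dw_insert(warehouse, {"id": 1, "customer": "acme", "amount": 9.5})
dl_ingest(lake, '{"id": 1, "customer": "acme", "amount": "9.5", "tags": ["new"]}')
dl_ingest(lake, '{"id": 2, "note": "free-text feedback"}')

# Two readers apply two different schemas to the same raw data.
print(list(dl_read(lake, {"id": int, "amount": float})))
print(list(dl_read(lake, {"id": int, "note": str})))
```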

Processing: DW can process structured data, whereas DL can process data in any format (structured, semi-structured and unstructured). DW follows ETL (Extract, Transform and Load): the structures are predefined and data is loaded into them after transformation. A top-down approach is used.

DL follows ELT (Extract, Load and Transform): the data is loaded first and then transformed as needed. A bottom-up approach is used.

Consumption: Since DW has a predefined structure, only predetermined questions can be answered, and because data is aggregated, visibility into the lowest levels is lost. DL offers endless possibilities for querying data in different ways, as data is stored at the lowest granularity.

Cost: Cost of building a DL is low as it uses open source technologies like Hadoop, NoSQL etc.

Flexibility: DL can adapt to change easily, as data is stored in raw format and read using ‘schema on read’. Changes in a DW require considerable time and effort.

Data Lake Architecture

Data Lake Zones

Courtesy: DZone
  • Transient Zone – This layer is usually the landing layer, where data pulled or pushed from different source systems arrives. It is a temporary zone where data is kept before it is inserted into the data lake.
  • Raw Zone – This layer stores data ingested from multiple sources in raw (native or as-is) format.
  • Trusted Zone – This layer stores data after applying data quality rules on the raw data.
  • Refined Zone – This layer stores manipulated and enriched data.
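The flow through the four zones can be sketched as a hypothetical in-memory Python model. The quality rule and enrichment function are placeholders; in a real lake each zone would typically be a separate storage location (for example, an HDFS directory or S3 prefix) and each step a separate job.

```python
from dataclasses import dataclass, field


@dataclass
class DataLake:
    """Hypothetical in-memory model of the four zones described above."""
    transient: list = field(default_factory=list)
    raw: list = field(default_factory=list)
    trusted: list = field(default_factory=list)
    refined: list = field(default_factory=list)

    def land(self, records):
        # Transient zone: landing area for data pulled or pushed from sources.
        self.transient.extend(records)

    def load_raw(self):
        # Raw zone keeps the data as-is; the transient copy is only temporary.
        self.raw.extend(self.transient)
        self.transient.clear()

    def apply_quality(self, rules):
        # Trusted zone: only raw records that pass every data quality rule.
        self.trusted = [r for r in self.raw if all(rule(r) for rule in rules)]

    def refine(self, enrich):
        # Refined zone: manipulated / enriched copies; trusted data is untouched.
        self.refined = [enrich(dict(r)) for r in self.trusted]


lake = DataLake()
lake.land([{"id": 1, "amount": 10}, {"id": 2, "amount": None}])
lake.load_raw()
lake.apply_quality([lambda r: r["amount"] is not None])
lake.refine(lambda r: {**r, "amount_band": "high" if r["amount"] >= 10 else "low"})
print(lake.refined)
```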

Benefits to Enterprise

Data Lake acts as a central storage of data and assists enterprises to get meaningful business insights.
It eliminates the need to query multiple systems existing in silos to get relevant information.
Maintenance, monitoring and security become comparatively easy as data is managed at a central location.
All kinds of data (structured, semi structured or unstructured) can be stored in the data lake.
Because it is ‘schema on read’, any kind of query can be designed on the raw data.


Data lakes serve as a single repository to store vast amounts of data in its native format. Data warehouses used in conjunction with data lakes can deliver accelerating returns; the two can complement each other.

Friday, 15 September 2017

Planning a cloud based Hadoop Environment


The first step in starting a project is to plan the Hadoop cluster. The Hadoop cluster needs to be planned carefully based on the current and anticipated future needs. There are many aspects in designing the cluster:
Hadoop distribution selection
Hardware Selection
Selection of storage
Determine data volume and number of machines

1. Hadoop distribution selection

There are multiple vendors in this space; the most prominent ones are Cloudera and Hortonworks.
The following criteria can help in picking one of them:
Cost of license
Professional Services
Integration with end user tools
Interoperability with other systems
Security and data protection
Customer preference

2. Hardware selection

Important parameters to consider are the type of workload, disk space, I/O bandwidth, computational power, memory etc.

Type of workload

The type of workload gives us an idea of what type of machine to use. The following types of workload can be considered:

Courtesy: Cloudera

Balanced Workload
Workloads that are distributed equally across the various job types (CPU bound, Disk I/O bound, or Network I/O bound). This is a good default configuration for unknown or evolving workloads.
Recommended Configuration:
T2, M4 and M3 AWS instance types are general-purpose instances; T2 instances provide a baseline level of CPU performance with the ability to burst above the baseline.

Compute Intensive
Workloads that are CPU bound and require a large number of CPU cycles and large amounts of memory to store in-process data. (Ex: natural language processing or HPCC workloads)
Example: Clustering/Classification, Complex text mining, Natural-language processing, Feature extraction
Recommended Configuration:
C4 and C3 AWS instance types are compute-optimized instances.

Memory Intensive
Workloads that are memory bound and require a huge amount of memory.
Recommended Configuration:
X1 and R4 AWS instance types are optimized for large-scale, enterprise-class, in-memory applications.

I/O Intensive
Workloads that are bound by the I/O capacity of the cluster.
For this type of workload, more disks per machine should be used.
Example: Indexing, Grouping, Data importing and exporting, Data movement and transformation
Recommended Configuration:
I3 AWS instance types are storage-optimized instances that provide Non-Volatile Memory Express (NVMe) SSD-backed instance storage, optimized for low latency, very high random I/O performance and high sequential read throughput, and they provide high IOPS at a low cost.

Unknown or evolving workload patterns
Workloads where the pattern may not be known in advance.
Recommended Configuration:
We should start with balanced workload type of configuration.
We can either install Hadoop on EC2 machines or use the Amazon EMR service. Amazon EMR can quickly provision hundreds or thousands of instances, automatically scale to match compute requirements, and shut the cluster down when the job is complete. This is very useful if you have variable or unpredictable processing requirements.

Master hardware selection

Master processes tend to be RAM-hungry but low on disk space consumption. The NameNode and JobTracker also produce plenty of logs on an active cluster, so ample space should be reserved on the disk or partition where logs are stored. NameNode disk requirements are modest in terms of storage, as all metadata must fit in memory.
Cloudera recommended specifications for NameNode/JobTracker/Standby NameNode nodes:
4–6 1TB hard disks in a JBOD configuration (1 for the OS, 2 for the FS image [RAID 1], 1 for Apache ZooKeeper, and 1 for Journal node)
2 quad-/hex-/octo-core CPUs, running at least 2-2.5GHz
64-128GB of RAM
Bonded Gigabit Ethernet or 10Gigabit Ethernet

Worker hardware selection

Worker node hardware is selected based on the type of workload. 20-30% of the machine’s raw disk capacity needs to be reserved for temporary data.

Cloudera recommended specifications for DataNode/TaskTracker nodes in a balanced Hadoop cluster:
12-24 1-4TB hard disks in a JBOD (Just a Bunch Of Disks) configuration
2 quad-/hex-/octo-core CPUs, running at least 2-2.5GHz
64-512GB of RAM
Bonded Gigabit Ethernet or 10Gigabit Ethernet (the more storage density, the higher the network throughput needed)

3. Selection of storage

There are different types of storage that Amazon provides:
Instance store – These are local stores attached to the instance and are temporary; their lifecycle depends on the instance. Since they are ephemeral, regular backups need to be taken.
Elastic Block Store (EBS) – These are persistent volumes, external to the instance and independent of its lifecycle. EBS volumes are automatically replicated within their Availability Zone to protect against component failure, offering high availability and durability.
EBS provides volumes of two major categories: SSD-backed storage volumes and HDD-backed storage volumes.
HDD-backed volumes should be used for large-block sequential workloads.
SSD-backed volumes should be used for random small-block workloads.

Simple Storage Service (S3) – S3 is an external data store accessed via a simple API; bandwidth to S3 depends on the instance. Amazon S3 has cross-region replication to automatically and asynchronously copy objects across S3 buckets in different AWS Regions, providing disaster recovery for business continuity.
Amazon Glacier - Data that requires encrypted archival storage with infrequent read access with a long recovery time objective (RTO) can be stored in Amazon Glacier more cost-effectively.

4. Determine data volume and number of machines

Data volume

Total Data volume consists of two things:
Historical Data – Existing data residing in current systems that needs to be migrated into the Bigdata solution.
Daily Incremental Data – Data ingested into the Bigdata solution as a part of daily ETL.
We should also think about the year on year growth percentage.


Data retention period

The data retention period is the time for which we want to retain data in an active state before archiving it. It varies by industry based on multiple factors such as compliance and regulatory needs and reporting requirements.
With the advent of Bigdata systems and data storage becoming cheaper by the day, it is now possible to store multiple years of data as active data instead of archiving it on tapes. Tape archival and management was cumbersome, with longer SLAs for fetching the data back.

Data Volume Estimation

Yearly Data volume estimation
Current data volume = Historical data + (Daily incremental data * 365)
Data volume after replication = Current data volume * Replication factor
Buffer = 35-40% of the current data volume
Total data = Data volume after replication + Buffer
Note: Replication factor = 3 (HDFS default). This can be changed based on the criticality of the data.
We should start small and scale horizontally as the data volume grows. Initial cluster sizing can be done for a year and we can have estimates for consecutive years based on the year on year growth.
This sizing applies to Hadoop (HDFS); it does not apply to HBase.
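The sizing formula above can be turned into a small calculator. A minimal Python sketch, assuming a 40% buffer (the upper end of the 35-40% range) and replication factor 3; for the node count it takes the 20-30% temporary-data reservation from the worker hardware section as 25%. The growth projection assumes the daily increment grows by a fixed year-on-year percentage and nothing is purged. All inputs in the usage lines are illustrative.

```python
import math


def yearly_storage_tb(historical_tb, daily_incremental_tb,
                      replication_factor=3, buffer_pct=0.40):
    """Total storage for one year, following the formula above."""
    current = historical_tb + daily_incremental_tb * 365
    return current * replication_factor + current * buffer_pct


def project_storage_tb(historical_tb, daily_incremental_tb, yoy_growth, years,
                       replication_factor=3, buffer_pct=0.40):
    """Cumulative storage at the end of each year, assuming the daily
    increment grows by `yoy_growth` each year and nothing is purged."""
    totals, current = [], historical_tb
    for year in range(years):
        current += daily_incremental_tb * (1 + yoy_growth) ** year * 365
        totals.append(current * replication_factor + current * buffer_pct)
    return totals


def worker_nodes(total_tb, disks_per_node, disk_tb, temp_reserve=0.25):
    """Data nodes needed when `temp_reserve` of raw disk is kept for temp data."""
    usable_per_node = disks_per_node * disk_tb * (1 - temp_reserve)
    return math.ceil(total_tb / usable_per_node)


total = yearly_storage_tb(historical_tb=100, daily_incremental_tb=0.5)
print(round(total, 1), worker_nodes(total, disks_per_node=12, disk_tb=4))
print([round(t, 1) for t in project_storage_tb(100, 0.5, yoy_growth=0.2, years=3)])
```

For 100 TB of history and 0.5 TB/day, the current data volume is 282.5 TB, 847.5 TB after replication, plus a 113 TB buffer, about 960.5 TB in total. With twelve 4 TB disks per node and 25% reserved, each node offers 36 TB, so 27 data nodes.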

Thursday, 14 September 2017

Designing BigData Solution (Part 2)

Designing BigData Architecture (Part 2)

In Part 1 of Designing BigData Architecture, we looked at the various activities involved in planning a BigData architecture. This article covers each of the logical layers in architecting a BigData solution.

Data Source Identification
Source profiling is one of the most important steps in deciding the architecture. It involves identifying the different source systems and categorizing them based on their nature and type.
Data sources can be of different types: Legacy systems (Mainframes, ERP, CRM), operational database, transactional database, Smart devices, Social Media websites, Data providers etc.

Points to be considered while profiling the data sources:

  • Identify the internal and external source systems
  • High-level estimate of the amount of data ingested from each source
  • Identify the mechanism used to get data – push or pull
  • Determine the type of data source – database, file, web service, stream etc.
  • Determine the type of data – structured, semi structured or unstructured

Data Ingestion Strategy and Acquisition
Data ingestion in the Hadoop world means ELT (Extract, Load and Transform), as opposed to ETL (Extract, Transform and Load) in traditional warehouses. Different ingestion tools and strategies have to be selected for different types of data.

Points to be considered:

  • Determine the frequency at which data would be ingested from each source
  • Is there a need to change the semantics of the data (append, replace etc.)?
  • Is any data validation or transformation required before ingestion (pre-processing)?
  • Segregate the data sources based on mode of ingestion – Batch or real-time

Batch Data Ingestion
Batch data ingestion may happen at regular intervals (for example, once or twice a day), depending on customer requirements. Usually the input is a CSV file, text file or JSON file.
Technologies used:
Flume: Used to ingest flat files, logs etc.
Sqoop: Used to ingest data from RDBMSs.

Real time Data ingestion
Real-time means near-zero latency and access to information whenever it is required. A stream is continuously generated data that is suited to real-time ingestion and processing.
Ex: Data coming in from sensors, Social Media etc.

Technology used:
Apache Storm – A distributed real-time computation system. Storm makes it easy to reliably process unbounded streams of data in real time.
Apache Kafka - Kafka is a distributed publish-subscribe messaging system that is designed to be fast, scalable, and durable.
Flume - Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming event data.
Spark Streaming – Spark Streaming allows reusing the same code for batch processing, joining streams against historical data, and running ad hoc queries on stream state.

Data Storage
Big data storage should be able to store large amounts of data of any type and scale as needed. We should also consider the number of IOPS (input/output operations per second) it can provide. The Hadoop Distributed File System is the most commonly used storage framework in the BigData world; others are NoSQL data stores such as MongoDB, HBase and Cassandra. One of the salient features of Hadoop storage is its ability to scale, self-manage and self-heal.

There are 2 kinds of analytical requirements that storage can support:
Synchronous – Data is analysed in real-time or near real-time, the storage should be optimized for low latency.
Asynchronous – Data is captured, recorded and analysed in batch.          

Hadoop storage is best suited for batch and asynchronous analytics requirements, while NoSQL stores are better suited for scenarios requiring real-time analytics and a flexible schema.
There are multiple storage options available, such as SAS/SATA solid-state drives (SSDs) and PCI Express (PCIe) card-based solid-state storage; PCIe is the most popular because its implementation offers the lowest latency.

Things to consider while planning storage methodology:
  • Type of data (historical or incremental)
  • Format of data (structured, semi-structured or unstructured)
  • Compression requirements
  • Frequency of incoming data
  • Query pattern on the data
  • Consumers of the data

Data Processing
With the advent of BigData, the amount of data being stored and processed has increased multifold.
Earlier, frequently accessed data was stored in dynamic RAM, but now, due to the sheer volume, it is stored on multiple disks across a number of machines connected via a network. Instead of bringing the data to the processing, processing is taken closer to the data, which significantly reduces network I/O.
Processing methodology is driven by business requirements. It can be categorized into batch, real-time or hybrid based on the SLA.

Batch Processing – Batch processing collects input over a specified interval of time and runs transformations on it on a schedule. A historical data load is a typical batch operation.
Technology Used: MapReduce, Hive, Pig

Real-time Processing – Real-time processing involves running transformations as and when data is acquired.
Technology Used: Impala, Spark, Spark SQL, Tez, Apache Drill

Hybrid Processing – It’s a combination of both batch and real-time processing needs.
The best example is the Lambda architecture.
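The Lambda architecture can be illustrated with a toy page-view counter in Python. The batch layer recomputes a view over the full master dataset, the speed layer updates a small view as events arrive, and queries merge the two; after the next batch run absorbs the recent events, the speed view is reset. The page-view example and function names are hypothetical.

```python
from collections import Counter


def batch_view(master_dataset):
    """Batch layer: recomputed from scratch over all historical events."""
    return Counter(event["page"] for event in master_dataset)


def update_speed_view(speed, event):
    """Speed layer: incrementally updated as each new event arrives."""
    speed[event["page"]] += 1


def query(page, batch, speed):
    """Serving layer: merge the precomputed batch view with the real-time view."""
    return batch[page] + speed[page]


master = [{"page": "home"}, {"page": "home"}, {"page": "cart"}]
batch, speed = batch_view(master), Counter()

for event in [{"page": "home"}, {"page": "checkout"}]:
    update_speed_view(speed, event)  # visible to queries immediately
    master.append(event)             # and kept for the next batch run

print(query("home", batch, speed))   # 2 from batch + 1 from speed

# The next batch run absorbs the recent events, so the speed view is reset.
batch, speed = batch_view(master), Counter()
print(query("home", batch, speed))
```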

Data Consumption
This layer consumes the output provided by the processing layer. Different users such as administrators, business users, business analysts, customers, vendors and partners can consume data in different formats. The output of analysis can be consumed by a recommendation engine, or business processes can be triggered based on the analysis.
Different forms of data consumption are:
Export Data sets – There can be requirements for third party data set generation. Data sets can be generated using hive export or directly from HDFS.

Reporting and visualization – Different reporting and visualization tools can connect to Hadoop using JDBC/ODBC connectivity to Hive. Tools include MicroStrategy, Talend, Pentaho, Jaspersoft, Tableau etc. Ad hoc or scheduled reports can be created.

Data Exploration – Data scientists can build models and perform deep exploration in a sandbox environment. The sandbox can be a separate cluster (recommended approach) or a separate schema within the same cluster that contains a subset of the actual data.

Ad hoc Querying – Ad hoc or interactive querying can be supported using Hive, Impala or Spark SQL.

The two biggest challenges in designing BigData Architecture are:
·         Dynamics of use case: There are a number of scenarios, as illustrated in this article, that need to be considered while designing the architecture – form and frequency of data, type of data, and type of processing and analytics required.
·         Myriad of technologies: The proliferation of tools in the market has led to a lot of confusion around what to use and when; multiple technologies offer similar features and claim to be better than the others.

To be nimble, flexible and fluid in our approach to a solution we need to use these technologies in a way that will adapt to the changing business and technology dynamics.

Designing BigData Solution (Part 1)


Designing a Bigdata architecture is a complex task in itself, given the volume, variety and velocity at which data is generated and consumed. Keeping pace with technology innovations, competing products in the market and their fitment poses a great challenge for a BigData Architect.
This article is first in the series of articles that would be focussed on illustrating components of BigData architecture, the myths around them and how they are different from the way they have been handled traditionally.


Analyse the Business problem
The first step is to look at the business problem objectively and identify whether or not it is a Bigdata problem. Sheer volume or cost may not be the deciding factor; multiple criteria such as velocity, variety, challenges with the current system and time taken for processing should be considered.

Common Use cases:
·         Data archival/Data Offload – Despite the cumbersome process and long SLAs for retrieving data from tapes, tape is the most commonly used backup method, as cost limits the amount of active data maintained in current systems. Alternatively, Hadoop facilitates storing huge amounts of data spanning years (active data) at a very low cost.
·         Process Offload – Offload jobs that consume expensive MIPS cycles or consume extensive CPU cycles on the current systems.
·         Data Lake Implementation – Data lakes help in storing and processing massive amounts of data.
·         Unstructured data processing – BigData technologies provide capabilities to store and process any amount of unstructured data natively. RDBMSs can also store unstructured data as BLOBs or CLOBs but do not provide native processing capabilities for it.
·         Datawarehouse Modernization – Integrate the capabilities of BigData and Datawarehouse to increase operational efficiency.

Vendor Selection
Vendor selection for the Hadoop distribution is often guided by the client, depending on their personal bias, the vendor's market share or existing partnerships. The vendors for Hadoop distributions are Cloudera, Hortonworks, MapR and IBM BigInsights (Cloudera and Hortonworks being the prominent ones). In terms of capabilities they are all similar, with small nuances in cost and the services they offer.

Deployment Strategy
The deployment strategy decides whether an on premise, a cloud based or a mixed deployment is required. Each has its own pros and cons. *

An on premise solution tends to be more secure (at least in the customer's mind :) and security is typically a concern for BFS and healthcare customers), as data doesn't leave the premises. However, hardware procurement and maintenance cost a lot more money, effort and time.

A cloud based solution is a cost-effective, pay-as-you-go model that provides a lot of flexibility in terms of scalability and eliminates procurement and maintenance overhead. (Prominent cloud vendors are AWS and Rackspace.)

A mixed deployment strategy gives us the best of both worlds and can be planned to retain PII data on premise and the rest in the cloud.
* The deployment strategy is purely based on the use case and the customer's security requirements.

Capacity Planning
Capacity planning plays a pivotal role in hardware and infrastructure sizing. Important factors to be considered are:
·         Data volume for one time historical load
·         Daily data ingestion volume
·         Retention period of data
·         HDFS Replication factor based on criticality of data
·         Time period for which the cluster is sized (typically 6 months to 1 year), after which the cluster is scaled horizontally as required.
·         Multi datacentre deployment

Infrastructure sizing
Infrastructure sizing is based on capacity planning and determines the hardware required: the number of machines, CPU, memory, HDD, network etc.
It also involves deciding the number of clusters/environments required. Typically we may have Development, QA, Prod and DR.

Important factors to be considered
·         Type of processing (memory- or I/O-intensive)
·         Type of disk
·         Number of disks per machine
·         Memory size
·         HDD size
·         Number of CPUs and cores
·         Data retained and stored in each environment (Ex: Dev may be 30% of prod)

Backup and Disaster Recovery planning
Backup and disaster recovery is a very important part of planning and involves the following considerations:
·         The criticality of data stored
·         RPO (Recovery Point Objective) and RTO (Recovery Time Objective) requirements.
·         Active-Active or Active-Passive Disaster recovery mechanism
·         Multi datacentre deployment
·         Backup Interval (Can be different for different type of data)

Part 2 will explain each of the logical steps in designing a BigData architecture.
