Sunday, May 9, 2010

Facebook has the world's largest Hadoop cluster!

It is not a secret anymore!

The Datawarehouse Hadoop cluster at Facebook has become the largest known Hadoop storage cluster in the world. Here are some of the details about this single HDFS cluster:
  • 21 PB of storage in a single HDFS cluster
  • 2000 machines
  • 12 TB per machine (a few machines have 24 TB each)
  • 1200 machines with 8 cores each + 800 machines with 16 cores each
  • 32 GB of RAM per machine
  • 15 map-reduce tasks per machine
That's a total of more than 21 PB of configured storage capacity! This is larger than the previously largest known cluster, Yahoo!'s, at 14 PB. Here are the cluster statistics from the HDFS cluster at Facebook:












Hadoop started at Yahoo! and full marks to Yahoo! for developing such critical infrastructure technology in the open. I started working with Hadoop when I joined Yahoo! in 2006. Hadoop was in its infancy at that time and I was fortunate to be part of the core set of Hadoop engineers at Yahoo!. Many thanks to Doug Cutting for creating Hadoop and Eric14 for convincing the executive management at Yahoo! to develop Hadoop as open source software.

Facebook engineers work closely with the Hadoop engineering team at Yahoo! to push Hadoop to greater scalability and performance. Facebook has many Hadoop clusters; the largest among them is the one that is used for Datawarehousing. Here are some statistics that describe a few characteristics of Facebook's Datawarehousing Hadoop cluster:
  • 12 TB of compressed data added per day
  • 800 TB of compressed data scanned per day
  • 25,000 map-reduce jobs per day
  • 65 million files in HDFS
  • 30,000 simultaneous clients to the HDFS NameNode
A majority of this data arrives via scribe, as described in scribe-hdfs integration. This data is loaded into Hive. Hive provides a very elegant way to query the data stored in Hadoop. Almost 99.9% of Hadoop jobs at Facebook are generated by a Hive front-end system. We provide lots more details about our scale of operations in our paper at SIGMOD titled Datawarehousing and Analytics Infrastructure at Facebook.

Here are two pictorial representations of the rate of growth of the Hadoop cluster:



Details about our Hadoop configuration

I have fielded many questions from developers and system administrators about the Hadoop configuration that is deployed in the Facebook Hadoop Datawarehouse. Some of these questions are from Linux kernel developers who would like to make Linux swapping work better with Hadoop workloads; other questions are from JVM developers who may attempt to make Hadoop run faster for processes with large heap sizes; yet others are from GPU architects who would like to port a Hadoop workload to run on GPUs. To enable this type of outside research, here are the details about Facebook's Hadoop warehouse configuration. I hope this open sharing of infrastructure details from Facebook jumpstarts the research community to design ways and means to optimize systems for Hadoop usage.



Sunday, April 25, 2010

The Curse of the Singletons! The Vertical Scalability of Hadoop NameNode

Introduction

HDFS is designed to be a highly scalable storage system, and sites at Facebook and Yahoo have 20PB size file systems in production deployments. The HDFS NameNode is the master of the Hadoop Distributed File System (HDFS). It maintains the critical data structures of the entire file system. Most of the HDFS design has focussed on scalability of the system, i.e. the ability to support a large number of slave nodes in the cluster and an even larger number of files and blocks. However, a 20PB size cluster with 30K simultaneous clients requesting service from a single NameNode means that the NameNode has to run on a high-end non-commodity machine. There have been some efforts to scale the NameNode horizontally, i.e. allow the NameNode to run on multiple machines. I will defer analyzing those horizontal-scalability efforts to a future blog post; instead, let's discuss ways and means to make our singleton NameNode support an even greater load.


What are the bottlenecks of the NameNode?

Network: We have around 2000 nodes in our cluster and each node is running 9 mappers and 6 reducers simultaneously. This means that there are around 30K simultaneous clients requesting service from the NameNode. The Hive Metastore and the HDFS RaidNode impose additional load on the NameNode. The Hadoop RPCServer has a singleton Listener Thread that pulls data from all incoming RPCs and hands it to a bunch of NameNode handler threads. Only after all the incoming parameters of the RPC are copied and deserialized by the Listener Thread do the NameNode handler threads get to process the RPC. One CPU core on our NameNode machine is completely consumed by the Listener Thread. This means that during times of high load, the Listener Thread is unable to copy and deserialize all incoming RPC data in time, thus leading to clients encountering RPC socket errors. This is one big bottleneck to the vertical scalability of the NameNode.
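The 30K figure follows directly from the numbers in the paragraph above. A quick back-of-the-envelope check (the variable names are only for illustration):

```python
# Back-of-the-envelope check of the client count quoted above.
nodes = 2000             # approximate cluster size
mappers_per_node = 9
reducers_per_node = 6

tasks_per_node = mappers_per_node + reducers_per_node
simultaneous_clients = nodes * tasks_per_node

print(tasks_per_node)         # 15
print(simultaneous_clients)   # 30000
```

Every one of those tasks is a potential HDFS client, before counting the Hive Metastore and the RaidNode.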

CPU: The second bottleneck to scalability is the fact that most critical sections of the NameNode are protected by a singleton lock called the FSNamesystem lock. I had done some major restructuring of this code about three years ago via HADOOP-1269 but even that is not enough for supporting current workloads. Our NameNode machine has 8 cores, but a fully loaded system can use only about 2 cores simultaneously on average, the reason being that most NameNode handler threads are serialized by the FSNamesystem lock.

Memory: The NameNode stores all its metadata in the main memory of the singleton machine on which it is deployed. In our cluster, we have about 60 million files and 80 million blocks; this requires the NameNode to have a heap size of about 58GB. This is huge! There isn't any more memory left to grow the NameNode's heap size! What can we do to support an even greater number of files and blocks in our system?
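To put the heap size in perspective, here is a rough per-object estimate. It is only a sketch: it assumes "58GB" means 58 * 2**30 bytes and charges the whole heap to files and blocks, so it overstates the true cost of a single file or block entry.

```python
# Rough estimate of NameNode heap consumed per namespace object.
# Assumptions (not from the cluster itself): 58GB = 58 * 2**30 bytes, and
# the entire heap is attributed to files and blocks.
files = 60_000_000
blocks = 80_000_000
heap_bytes = 58 * 2**30

objects = files + blocks
bytes_per_object = heap_bytes / objects

print(f"{objects:,} objects, about {bytes_per_object:.0f} bytes of heap each")
```

This works out to a few hundred bytes of heap per file or block, which is why the object count, and not the volume of data stored, is what limits a single NameNode.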

Can we break the impasse?

RPC Server: We enhanced the Hadoop RPC Server to have a pool of Reader Threads that work in conjunction with the Listener Thread. The Listener Thread accepts a new connection from a client and then hands over the work of RPC-parameter-deserialization to one of the Reader Threads. In our case, we configured the pool to have 8 Reader Threads. This change has doubled the number of RPCs that the NameNode can process at full throttle. This change has been contributed to the Apache code via HADOOP-6713.
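The hand-off pattern described above can be sketched in a few lines. This is a toy model and not the HADOOP-6713 code: the queues stand in for client connections, `decode`/`split` stands in for RPC parameter deserialization, and all names are made up for illustration. Python threads also will not show the CPU speedup; the point is only the structure.

```python
import queue
import threading

NUM_READERS = 8  # pool size mentioned above


def reader_loop(inbox, results):
    """Reader thread: deserialize raw requests handed over by the listener."""
    while True:
        raw = inbox.get()
        if raw is None:              # sentinel: no more work
            break
        # Stand-in for RPC parameter deserialization.
        results.put(raw.decode("utf-8").split(":"))


def listener(raw_requests, inboxes):
    """Listener: does no parsing, only hands work to readers round-robin."""
    for i, raw in enumerate(raw_requests):
        inboxes[i % len(inboxes)].put(raw)
    for inbox in inboxes:
        inbox.put(None)


def run(raw_requests):
    results = queue.Queue()
    inboxes = [queue.Queue() for _ in range(NUM_READERS)]
    readers = [threading.Thread(target=reader_loop, args=(inbox, results))
               for inbox in inboxes]
    for t in readers:
        t.start()
    listener(raw_requests, inboxes)
    for t in readers:
        t.join()
    decoded = []
    while not results.empty():       # safe: all readers have finished
        decoded.append(results.get())
    return sorted(decoded)


print(run([b"stat:/a", b"open:/b", b"create:/c"]))
```

The listener's per-request cost drops to a queue insert, so the expensive copy-and-deserialize step no longer funnels through a single core.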

The above change allowed a simulated workload to consume 4 CPU cores out of a total of 8 CPU cores in the NameNode machine. Sadly enough, we still cannot get it to use all 8 CPU cores!

FSNamesystem lock: A review of our workload showed that our NameNode typically has the following distribution of requests:
  • stat a file or directory 47%
  • open a file for read 42%
  • create a new file 3%
  • create a new directory 3%
  • rename a file 2%
  • delete a file 1%
The first two operations constitute about 90% of the workload for the NameNode and are read-only operations: they do not change file system metadata and do not trigger any synchronous transactions (the access time of a file is updated asynchronously). This means that if we change the FSNamesystem lock to a Readers-Writer lock we can achieve the full power of all processing cores in our NameNode machine. We did just that, and we saw yet another doubling of the processing rate of the NameNode! The load simulator can now make the NameNode process use all 8 CPU cores of the machine simultaneously. This code has been contributed to Apache Hadoop via HDFS-1093.
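For readers unfamiliar with the idea, here is a minimal readers-writer lock protecting a toy namespace. It is a sketch of the technique only, not the HDFS-1093 patch; the `ReadWriteLock` and `Namespace` classes are hypothetical. (In Java, the standard library offers `java.util.concurrent.locks.ReentrantReadWriteLock` for the same purpose.)

```python
import threading


class ReadWriteLock:
    """Minimal readers-writer lock: many concurrent readers or one writer."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0        # number of threads holding the read lock
        self._writer = False     # True while a writer holds the lock

    def acquire_read(self):
        with self._cond:
            while self._writer:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    def acquire_write(self):
        with self._cond:
            while self._writer or self._readers > 0:
                self._cond.wait()
            self._writer = True

    def release_write(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()


class Namespace:
    """Toy file system metadata guarded by a single readers-writer lock."""

    def __init__(self):
        self._lock = ReadWriteLock()
        self._files = {}

    def stat(self, path):
        # Read-only operation: takes the shared lock.
        self._lock.acquire_read()
        try:
            return self._files.get(path)
        finally:
            self._lock.release_read()

    def create(self, path, size=0):
        # Mutation: takes the exclusive lock.
        self._lock.acquire_write()
        try:
            self._files[path] = size
        finally:
            self._lock.release_write()
```

With roughly 90% of requests on the `stat`/open side, most handler threads take the shared lock and no longer wait on one another. Note that this simple version can starve writers under a steady stream of readers; a production lock needs a fairness policy.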

The memory bottleneck issue is still unresolved. People have asked me if the NameNode can keep some portion of its metadata on disk, but this will require a change in the locking model design first. One cannot hold the FSNamesystem lock while reading in data from the disk: this will cause all other threads to block, thus throttling the performance of the NameNode. Could one use flash memory effectively here? Maybe an LRU cache of file system metadata will work well with current metadata access patterns? If anybody has good ideas here, please share them with the Apache Hadoop community.

In a Nutshell

The two proposed enhancements have improved NameNode scalability by a factor of 8. Sweet, isn't it?

Saturday, February 6, 2010

Hadoop AvatarNode High Availability



Our Use-Case

The Hadoop Distributed File System's (HDFS) NameNode is a single point of failure. This has been a major stumbling block in using HDFS for a 24x7 type of deployment. It has been a topic of discussion among a wide circle of engineers.

I am part of a team that is operating a cluster of 1200 nodes and a total size of 12 PB. This cluster is currently running hadoop 0.20. The NameNode is configured to write its transaction logs to a shared NFS filer. The biggest cause of service downtime is when we need to deploy hadoop patches to our cluster. A fix to the DataNode software is easy to deploy without cluster downtime: we deploy new code and restart a few DataNodes at a time. We can afford to do this because the DFSClient is equipped (via multiple retries to different replicas of the same block) to handle transient DataNode outages. The major problem arises when new software has to be deployed to our NameNode. An HDFS NameNode restart for a cluster of our size typically takes about an hour. During this time, the applications that use the Hadoop service cannot run. This led us to believe that some kind of High Availability (HA) for the NameNode is needed. We wanted a design that can support a failover in about a minute.

Why AvatarNode?

We took a serious look at the options available to us. The HDFS BackupNode is not available in 0.20, and upgrading our cluster to hadoop 0.21 is not a feasible option for us because it has to go through extensive time-consuming testing before we can deploy it in production. We started off white-boarding the simplest solution: the existence of a Primary NameNode and a Standby NameNode in our HDFS cluster. And we did not have to worry about split-brain scenarios and IO-fencing methods... the administrator ensures that the Primary NameNode is really, really dead before converting the Standby NameNode to become the Primary NameNode. We also did not want to change the code of the NameNode one single bit and wanted to build HA as a software layer on top of HDFS. We wanted a daemon that can switch from being a Primary to a Standby and vice-versa... as if switching avatars... and this is how our AvatarNode was born!

The word avatar means a variant-phase or incarnation of a single entity, and we thought it appropriate that a wrapper that can make the NameNode play two different roles be named the AvatarNode. Our design of the AvatarNode is such that it is a pure wrapper around the existing NameNode that exists in Hadoop 0.20; thus the AvatarNode inherits the scalability, performance and robustness of the NameNode.

What is an AvatarNode?

The AvatarNode encapsulates an instance of the NameNode. You can start an AvatarNode in the Primary avatar or the Standby avatar. If you start it as a Primary avatar, then it behaves exactly as the NameNode you know now. In fact, it runs exactly the same code as the NameNode. The AvatarNode in its Primary avatar writes the HDFS transaction logs into the shared NFS filer. You can then start another instance of the AvatarNode on a different machine, but this time telling it to start as a Standby avatar.

The Standby AvatarNode encapsulates a NameNode and a SecondaryNameNode within it. The Standby AvatarNode continuously keeps reading the HDFS transaction logs from the same NFS filer and keeps feeding those transactions to the encapsulated NameNode instance. The NameNode that runs within the Standby AvatarNode is kept in SafeMode. Keeping the Standby AvatarNode in SafeMode prevents it from performing any active duties of the NameNode, while still keeping a copy of all NameNode metadata hot in its in-memory data structures.

HDFS clients are configured to access the NameNode via a Virtual IP Address (VIP). At the time of failover, the administrator kills the Primary AvatarNode on machine M1 and instructs the Standby AvatarNode on machine M2 (via a command line utility) to assume a Primary avatar. It is guaranteed that the Standby AvatarNode ingests all committed transactions because it reopens the edits log and consumes all transactions till the end of the file; this guarantee depends on the fact that NFS-v3 supports close-to-open cache coherency semantics. The Standby AvatarNode finishes ingestion of all transactions from the shared NFS filer and then leaves SafeMode. Then the administrator switches the VIP from M1 to M2 thus causing all HDFS clients to start accessing the NameNode instance on M2. This is practically instantaneous and the failover typically takes a few seconds!
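The ingest-then-switch sequence above can be modelled in miniature. This is a deliberately simplified sketch: a Python list stands in for the edits log on the NFS filer, the two operation names are invented, and the `StandbyAvatar` class is hypothetical rather than part of the AvatarNode code.

```python
class StandbyAvatar:
    """Toy standby: replays a shared edits log into in-memory metadata."""

    def __init__(self, shared_log):
        self.shared_log = shared_log   # stands in for the edits file on NFS
        self.applied = 0               # transactions replayed so far
        self.namespace = set()
        self.safe_mode = True          # a standby performs no active duties

    def ingest(self):
        """Replay every transaction appended since the last call."""
        for op, path in self.shared_log[self.applied:]:
            if op == "create":
                self.namespace.add(path)
            elif op == "delete":
                self.namespace.discard(path)
        self.applied = len(self.shared_log)

    def become_primary(self):
        """Failover: drain the log to its end, then leave safe mode."""
        self.ingest()
        self.safe_mode = False


# The primary appends to the log while the standby tails it.
log = [("create", "/a"), ("create", "/b")]
standby = StandbyAvatar(log)
standby.ingest()
log.append(("delete", "/a"))   # last transaction before the primary is killed
standby.become_primary()
print(sorted(standby.namespace), standby.safe_mode)
```

The important ordering is inside `become_primary`: the standby consumes the log to its end before it leaves SafeMode, and only then does the administrator move the VIP.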

There is no need to run a separate instance of the SecondaryNameNode. The AvatarNode, when run in the Standby avatar, performs the duties of the SecondaryNameNode too. The Standby AvatarNode continually ingests transactions from the Primary, but once in a while it pauses the ingestion, invokes the SecondaryNameNode to create and upload a checkpoint to the Primary AvatarNode, and then resumes the ingestion of transaction logs.

The AvatarDataNode

The AvatarDataNode is a wrapper around the vanilla DataNode found in hadoop 0.20. It is configured to send block reports and blockReceived messages to two AvatarNodes. The AvatarDataNode does not use the VIP to talk to the AvatarNode(s) (only HDFS clients use the VIP). An alternative to this approach would have been to make the Primary AvatarNode forward block reports to the Standby AvatarNode. We discounted this alternative approach because it adds code complexity to the Primary AvatarNode: the Primary AvatarNode would have to do buffering and flow control as part of forwarding block reports.

The Hadoop DataNode is already equipped to send (and retry if needed) blockReceived messages to the NameNode. We extend this functionality by making the AvatarDataNode send blockReceived messages to both the Primary and Standby AvatarNodes.

Does the failover affect HDFS clients?

An HDFS client that was reading a file caches the location of the replicas of almost all blocks of a file at the time of opening the file. This means that HDFS readers are not impacted by AvatarNode failover.

What happens to files that were being written by a client when the failover occurred? The Hadoop 0.20 NameNode does not record new block allocations to a file until the time when the file is closed. This means that a client that was writing to a file when the failover occurred will encounter an IO exception after a successful failover event. This is somewhat tolerable for map-reduce jobs because the Hadoop MapReduce framework will retry any failed tasks. This solution also works well for HBase because HBase issues a sync/hflush call to persist HDFS file contents before marking an HBase transaction as completed. In the future, it is possible that HDFS may be enhanced to record every new block allocation in the HDFS transaction log, details here. In that case, a failover event will not impact HDFS writers either.

Other NameNode Failover techniques

People have asked me to compare the AvatarNode HA implementation with the Hadoop with DRBD and Linux HA approach. Both approaches require that the Primary writes transaction logs to a shared device. The difference is that the Standby AvatarNode is a hot standby whereas the DRBD-LinuxHA solution is a cold standby. The failover time for our AvatarNode is about a minute, whereas a failover with DRBD-LinuxHA would take about an hour for our cluster, which has about 50 million files. The reason that the AvatarNode can function as a hot standby is that the Standby AvatarNode has an instance of the NameNode encapsulated within it, and this encapsulated NameNode receives messages from the DataNodes, thus keeping its metadata state fully up-to-date.

Another implementation that I have heard about is from a team at China Mobile Research Institute; their approach is described here.

Where is the code?

This code has been contributed to the Apache HDFS project via HDFS-976. A prerequisite for this patch is HDFS-966.

Monday, November 9, 2009

HDFS High Availability

I have encountered plenty of questions about the single point of failure for the HDFS NameNode. The most common concern is that if the NameNode dies, then the whole cluster is unavailable. This means that HDFS is unsuitable for applications that need a high degree of uptime. This is not a problem when you run map-reduce jobs on HDFS, especially because a map-reduce system is a batch system and the uptime requirements of a batch system are typically not that stringent.

In recent times, I have started to see lots of distributed applications that are using HDFS as a general purpose storage system. These applications range from multimedia servers that store mails, photos, videos, etc. to systems that store updates from thousands of distributed sensors. These applications prefer HDFS because they can store a large volume of data. These applications do not use map-reduce to mine any of the information stored in HDFS. However, these applications do need consistency and availability of data. Brewer's CAP Theorem states that most distributed systems need some tradeoffs among Consistency, Availability and Partition tolerance. HDFS does an excellent job of providing Consistency of data at all times. Traditionally, it did not address Availability and Partition tolerance. That could change to a certain extent with the HDFS 0.21 release. HDFS 0.21 has a new entity called the BackupNode that receives real-time updates about transactions from the NameNode. This is the first step in making HDFS highly available.

Here is a slide-deck that I wanted to present at ApacheCon 2009 about the current state of affairs regarding High Availability with HDFS.

Monday, October 19, 2009

Hadoop discussions at Microsoft Research

I was invited to present a talk about Hadoop File System Architecture at Microsoft Research in Seattle. This is a research group focussed on long-term research, so it is no surprise that they are interested in knowing how a growing company like Facebook is using Hadoop to its advantage.

I met a few folks who chatted with me about how Microsoft SQL Server is being modified to handle large-scale databases. These folks heartily agreed with a comment I made in my presentation that Dr. DeWitt and Dr. Stonebraker are missing the point when they compare performance numbers between Hadoop and traditional Database systems... rather than comparing the scalability and fault-tolerance of these systems. I had learned some of the fundamentals of Database systems from Professor DeWitt during my graduate studies at the University of Wisconsin Madison, but Dr. DeWitt is a Microsoft employee now!

The fact that Facebook uses the SQL interface of Hive layered over Hadoop makes it even more interesting to Microsoft. They wanted to know the performance difference between Hive and Pig and would like to compare them to their distributed-SQL-Server software.

Here are the slides I used for my presentation.

Friday, October 2, 2009

I presented a set of slides that describes the Hadoop development at Facebook at the HadoopWorld conference in New York today. It was well received by more than 100 people. I have presented at many conferences on the West Coast, but this is the first time I have presented at a conference in New York... there are more hadoop users here, versus mostly hadoop developers on the West Coast. There were plenty of questions, especially about Hadoop-Archive and Realtime-Hadoop. There were people asking me questions about HDFS Symbolic links and the HDFS-scribe copier.

Earlier, I visited the University of Notre Dame to conduct a department seminar and present a guest lecture for the graduate students at the Department of Computer Science. There is plenty of interesting research being led by Prof Douglas Thain. One interesting research idea that came up was to place HDFS block replicas by analyzing HDFS access patterns. It is possible that we can provide HDFS datanode/namenode logs to researchers who can analyze these logs to come up with better algorithms for HDFS block replica placement.

Monday, September 14, 2009

HDFS block replica placement in your hands now!

Most Hadoop administrators set the default replication factor for their files to be three. The main assumption here is that if you keep three copies of the data, your data is safe. I have observed this to be true in the big clusters that we manage and operate. In actuality, administrators are managing two failure aspects: data corruption and data availability.

If all the datanodes on which the replicas of a block exist catch fire at the same time, then that data is lost and cannot be recovered. Or if an administrative error causes all the existing replicas of a block to be deleted, then it is a catastrophic failure. This is data corruption. On the other hand, if a rack switch goes down for some time, the datanodes on that rack are inaccessible during that time. When that faulty rack switch is fixed, the data on the rack rejoins the HDFS cluster and life goes on as usual. This is a data availability issue; in this case data was not corrupted or lost, it was just unavailable for some time. HDFS keeps three copies of a block on three different datanodes to protect against true data corruption. HDFS also tries to distribute these three replicas on more than one rack to protect against data availability issues. The fact that HDFS actively monitors any failed datanode(s) and upon failure detection immediately schedules re-replication of blocks (if needed) implies that three copies of data on three different nodes is sufficient to avoid corrupted files.

HDFS uses a simple but highly effective policy to allocate replicas for a block. If a process that is running on any of the HDFS cluster nodes opens a file for writing a block, then one replica of that block is allocated on the same machine on which the client is running. The second replica is allocated on a randomly chosen rack that is different from the rack on which the first replica was allocated. The third replica is allocated on a randomly chosen machine on the same remote rack that was chosen in the earlier step. This means that a block is present on two unique racks. One point to note is that there is no relationship between replicas of different blocks of the same file as far as their location is concerned. Each block is allocated independently.
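The policy above is easy to express in code. The following is a minimal sketch of the description, not the HDFS implementation: the `choose_replicas` function and the `topology` dictionary are hypothetical, and the sketch assumes the writer runs on a datanode and that at least one other rack has two or more nodes.

```python
import random


def choose_replicas(writer_node, topology, rng=random):
    """Pick three datanodes following the default policy described above.

    topology maps a rack name to the list of datanodes on that rack.
    """
    local_rack = next(rack for rack, nodes in topology.items()
                      if writer_node in nodes)

    # Replica 1: the machine on which the writing client runs.
    first = writer_node

    # Replica 2: a node on a randomly chosen rack other than the local one.
    # Simplification: only racks with at least two nodes are considered.
    remote_racks = [rack for rack, nodes in topology.items()
                    if rack != local_rack and len(nodes) >= 2]
    remote_rack = rng.choice(remote_racks)

    # Replica 3: a different node on that same remote rack.
    second, third = rng.sample(topology[remote_rack], 2)
    return [first, second, third]


topology = {
    "rackA": ["a1", "a2"],
    "rackB": ["b1", "b2", "b3"],
    "rackC": ["c1", "c2"],
}
print(choose_replicas("a1", topology))
```

Calling the function once per block reproduces the independence noted above: two blocks of the same file written from `a1` share the first replica's node, but their remote racks are chosen separately.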

The above algorithm is great for availability and scalability. However, there are scenarios where co-locating many blocks of the same file on the same set of datanode(s) or rack(s) is beneficial for performance reasons. For example, if many blocks of the same file are present on the same datanode(s), a single mapper instance could process all these blocks using the CombineFileInputFormat. Similarly, if a dataset contains many small files that are co-located on the same datanode(s) or rack(s), one can use CombineFileInputFormat to process all these files together with fewer mapper instances. If an application always uses one dataset with another dataset (think Hive or Pig join), then co-locating these two datasets on the same set of datanodes is beneficial.

Another reason one might want to allocate replicas using a different policy is to ensure that replicas and their parity blocks truly reside in different failure domains. The erasure code work in HDFS could effectively bring down the physical replication factor of a file to about 1.5 (while keeping the logical replication factor at 3) if it can place replicas of all blocks in a stripe more intelligently.

Yet another reason, however exotic, is to allow HDFS to place replicas based on the HeatMap of your cluster. If one of the nodes in the cluster is at a higher temperature than another, then it might be better to prefer the cooler node while allocating a new replica. If you want to experiment with HDFS across two data centers, you might want to try out new policies for replica placement.

Well, now you can finally get your hands dirty! HDFS-385 is part of the Hadoop trunk and will be part of the next major HDFS 0.21 release. This feature provides a way for the adventurous developer to write Java code that specifies how HDFS should allocate replicas of blocks of a file. The API is experimental in nature, and could change in the near future if we discover any inefficiencies in it. Please let the Hadoop community know if you need any changes in this API or if you come across novel uses of this API.