Wednesday, November 2, 2011

Hadoop NextGen


Hadoop NextGen, the next major revision to the Hadoop MapReduce framework, addresses several issues with Hadoop’s scalability and design. Above all, the current version of Hadoop doesn’t scale past 4,000 nodes, and its architectural design makes it difficult to fix this and other problems without a fundamental overhaul of the code base.

Hadoop NextGen divides the responsibilities currently handled by the JobTracker into two separate components: the ResourceManager globally assigns compute resources to applications, while the per-application ApplicationMaster handles scheduling and coordination (an application is either a single MapReduce job or a DAG of MR jobs).
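
To make the division of labor concrete, here is a minimal toy sketch of the two roles (my own illustration in Scala; the names and types are made up, not the actual NextGen/YARN API): the global ResourceManager only grants containers against cluster capacity, while a per-application ApplicationMaster decides what to run in them.

    // Toy model of the NextGen split; names and types are illustrative,
    // not the real Hadoop NextGen (YARN) API.
    case class Container(node: String, memoryMb: Int)

    // The global ResourceManager only tracks cluster capacity and hands out containers.
    class ResourceManager(private var freeMemory: Map[String, Int]) {
      def requestContainers(memoryMb: Int, count: Int): Seq[Container] = synchronized {
        val grants = freeMemory.toSeq
          .filter { case (_, free) => free >= memoryMb }
          .take(count)
          .map { case (node, _) => Container(node, memoryMb) }
        grants.foreach(c => freeMemory += c.node -> (freeMemory(c.node) - memoryMb))
        grants
      }
    }

    // The per-application ApplicationMaster owns scheduling and coordination for one
    // job (or a DAG of jobs): it asks for containers and decides what to run in them.
    class ApplicationMaster(rm: ResourceManager) {
      def runMapReduceJob(mapTasks: Int): Unit = {
        val containers = rm.requestContainers(memoryMb = 1024, count = mapTasks)
        containers.foreach(c => println(s"launching map task on ${c.node}"))
        // ...monitor progress, re-request containers on task failure, etc.
      }
    }

    object NextGenToy extends App {
      val rm = new ResourceManager(Map("node1" -> 4096, "node2" -> 4096))
      new ApplicationMaster(rm).runMapReduceJob(mapTasks = 2)
    }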

Reading this blog post after Mesos, it seems like NextGen takes an approach that is quite similar to Mesos, though at a different level of abstraction. The ResourceManager acts much like Mesos’ scheduler, coordinating the global assignment of compute resources to applications (though without Mesos’ resource-offer model). At the same time, NextGen is at a lower level of abstraction – in principle, it could itself run on Mesos, sharing resources with, say, Spark, MPI, and other frameworks. That said, while Mesos could enable multiple Hadoop instances to run over the same cluster, this would not fix the scalability issues inherent in the current version, since Hadoop would still not be able to scale past 4,000 nodes for a given job. Thus, NextGen still fills a need that Mesos alone (or another resource scheduler) could not necessarily satisfy.

As far as tradeoffs go, certain parts of the NextGen framework are still centralized (e.g. the ResourceManager), so this could become a performance bottleneck in the future. Despite this, NextGen appears very promising and makes significant performance and scalability improvements to the ever-popular Hadoop framework, which should guarantee continuing popularity for Hadoop (that is, if everyone doesn’t switch to Spark first!).

Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center


Mesos is a thin layer that provides fine-grained resource sharing across diverse cluster computing frameworks. It lets, for example, Hadoop and MPI, or Hadoop and Spark run on the same cluster without having to partition the cluster or allocate a set of VMs to each framework. Instead, Mesos is based on a scheduler that delegates scheduling control to the frameworks themselves using a “resource offer” abstraction. Mesos controls how many resources to offer each framework, and each framework determines which resources (that have been offered) to accept and which tasks to run on those resources.
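
A rough sketch of the offer flow (my own toy Scala, not the real Mesos framework API): the master controls what gets offered, and each framework’s scheduler decides which offers to accept and which tasks to launch on them.

    // Toy illustration of Mesos-style resource offers; all names are made up,
    // not the actual Mesos framework API.
    case class Offer(slaveId: String, cpus: Double, memGb: Double)
    case class Task(name: String, cpus: Double, memGb: Double)

    // A framework decides which offered resources to accept and what to run on them.
    trait FrameworkScheduler {
      def resourceOffers(offers: Seq[Offer]): Map[Offer, Seq[Task]]
    }

    // e.g. a "Hadoop-like" framework that only wants slaves with at least 2 CPUs
    class HadoopLikeScheduler extends FrameworkScheduler {
      def resourceOffers(offers: Seq[Offer]): Map[Offer, Seq[Task]] =
        offers.filter(_.cpus >= 2.0)
              .map(o => o -> Seq(Task(s"map-task-on-${o.slaveId}", 2.0, 1.0)))
              .toMap
    }

    object MesosMasterToy extends App {
      // The master controls *what* is offered; the framework controls what it accepts.
      val free = Seq(Offer("slave1", 4.0, 8.0), Offer("slave2", 1.0, 2.0))
      val accepted = new HadoopLikeScheduler().resourceOffers(free)
      accepted.foreach { case (offer, tasks) =>
        tasks.foreach(t => println(s"launch ${t.name} on ${offer.slaveId}"))
      }
    }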

The main nugget here was that pushing control of resources (or at least partial control) to the frameworks sharing the cluster provides an efficient way of sharing resources (since frameworks take only what they need – and are offered only what is available), while keeping the scheduler simple. The paper identifies several trade-offs in this approach, mainly tied to the choice of decentralized vs. centralized scheduling. The model is vulnerable to fragmentation due to heterogeneous resource demands, doesn’t really work if there are inter-framework dependencies, and adds an extra level of framework complexity, since the frameworks need to interact with the scheduler explicitly. That said, the extra overhead of this interaction is not much different from interacting with a central scheduler.

Mesos differs from previous work in a number of ways. HPC and Grid schedulers are targeted at monolithic jobs on specialized hardware, so they are typically centralized (unlike Mesos). In the cloud, EC2 and Eucalyptus use a VM allocation model, which is more coarse-grained than Mesos: it results in less efficient resource utilization and doesn’t allow the fine-grained task placement that Mesos does.

It seems that Mesos could certainly be influential over the next decade, given the growth of private clouds and the fact that these clouds aren’t always utilized to their full extent—allowing room to benefit from running multiple frameworks on the same cluster. That said, many public clouds (e.g. EC2 instances) are fired up for short(er) periods of time to run a framework (e.g. Hadoop) with relatively high utilization and I wonder if Mesos is really necessary for many of these use cases.

Spark


Resilient Distributed Datasets, implemented in Spark, are a fault-tolerant distributed memory abstraction that allows in-memory computation and efficient data re-use on large clusters. An RDD is a read-only, partitioned collection of records defined either as operations on data in stable storage or on a previous RDD’s data. RDDs provide coarse-grained transformations, where an operation (e.g. map, filter, join) is applied to many data items at once, creating a new RDD. Each RDD carries lineage information: the history of the operations on previous RDDs (and their partitions) used to create it. Thus, RDDs don’t need to be materialized at all times, since their contents can be recomputed from this lineage whenever any of the data needs to be retrieved (or used in a computation). Fine-grained reads, however, are still available.
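
To illustrate the lineage idea, here is a self-contained toy sketch (not Spark’s actual RDD implementation): each dataset records only the transformation that produced it and its parent, and its contents are recomputed from that lineage on demand.

    // Minimal, illustrative lineage-tracking collection -- a toy, not Spark's RDD class.
    sealed trait ToyRDD[A] {
      // Recompute the contents from lineage; nothing is materialized eagerly.
      def compute(): Seq[A]
      def map[B](f: A => B): ToyRDD[B] = Mapped(this, f)
      def filter(p: A => Boolean): ToyRDD[A] = Filtered(this, p)
    }
    case class FromStorage[A](load: () => Seq[A]) extends ToyRDD[A] {
      def compute(): Seq[A] = load()            // base of the lineage graph
    }
    case class Mapped[A, B](parent: ToyRDD[A], f: A => B) extends ToyRDD[B] {
      def compute(): Seq[B] = parent.compute().map(f)
    }
    case class Filtered[A](parent: ToyRDD[A], p: A => Boolean) extends ToyRDD[A] {
      def compute(): Seq[A] = parent.compute().filter(p)
    }

    object LineageDemo extends App {
      val base   = FromStorage(() => Seq("ERROR foo", "INFO bar", "ERROR baz"))
      val errors = base.filter(_.startsWith("ERROR")).map(_.toUpperCase)
      // If a partition of `errors` were lost, it could be rebuilt by replaying
      // filter + map over the surviving base data -- no replication needed.
      println(errors.compute())   // List(ERROR FOO, ERROR BAZ)
    }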

Is the problem real?

The problem here—efficient data re-use and in-memory processing—is definitely a real problem, and we have seen many other attempts at solving it (Piccolo, Pregel, Nectar & DryadLINQ, etc.).

What is the solution's main idea (nugget)?

The main nugget of this paper is that tracking lineage for in-memory immutable collections provides a powerful programming paradigm for many problems that require distributed memory and efficient re-use of data, while preserving fault tolerance reasonably trivially.

Why is the solution different from previous work?

RDDs differ from previous work in that they are general-purpose (not specific to, say, graph algorithms like Pregel), immutable (unlike Piccolo, which provides a shared-memory abstraction with mutable state), and allow re-use of cached data between jobs (unlike, say, Dryad and MapReduce programs).

Does the paper (or do you) identify any fundamental/hard trade-offs?

Fundamentally, the immutability of RDDs is a trade-off, since modifications to data must be made collection-wide by transforming an RDD into a new one. This is not as big a problem as it seems, since lazy evaluation means that these transformations aren’t computed until the data is actually needed. Another trade-off of Spark is its Scala interface, which, while convenient for its interoperability with Java, doesn’t enjoy the same level of familiarity among programmers as other languages do.
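
For example, a “point update” becomes a collection-wide transformation that yields a new dataset; the sketch below uses plain Scala collections standing in for RDDs.

    // Immutable "update": rather than mutating a record in place, apply a
    // collection-wide transformation that produces a new dataset (plain Scala
    // collections stand in for RDDs here).
    object ImmutableUpdateDemo extends App {
      case class Account(id: Int, balance: Double)

      val accounts = Seq(Account(1, 100.0), Account(2, 250.0))

      // "Credit account 2" is expressed as a map over the whole collection; in Spark
      // this transformation would be lazy and only computed when the result is used.
      val credited = accounts.map(a =>
        if (a.id == 2) a.copy(balance = a.balance + 50.0) else a)

      println(credited)   // List(Account(1,100.0), Account(2,300.0))
    }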

Do you think the work will be influential in 10 years?

I think this work has the potential to be influential over the next decade: it provides a relatively simple, elegant, and versatile solution to working with large datasets effectively. Furthermore, with the growing trend of keeping datasets in memory (RAM is getting cheaper), I think Spark is on the right track. We’ll see if Spark itself catches on, but even if it doesn’t, it seems to be indicative of the direction that we’re heading (in a post-MapReduce world).

Piccolo: Building Fast, Distributed Programs with Partitioned Tables


Piccolo is a data-centric programming model for writing parallel in-memory applications across many machines. Computations are organized around a series of application kernel functions, where each kernel is launched concurrently on several nodes. These instances share distributed, mutable state through a set of in-memory key-value tables whose entries reside in the memory of the various nodes. The Piccolo runtime uses a messaging system to read and modify table entries on the nodes. This approach makes Piccolo appropriate for algorithms that benefit from access to shared intermediate state, including machine learning and graph algorithms. This is in stark contrast to MapReduce, whose data-flow programming model doesn’t expose any shared or intermediate state, so such state must be simulated (inefficiently) by joining multiple data streams.

Concurrent updates to the same key can be handled using a programmer-defined accumulation function for each table, which Piccolo executes at runtime to combine the updates to that key (alternatively, one of Piccolo’s built-in accumulation functions can be used instead). Data locality can be specified via two types of policies: co-locating kernel execution with a certain table partition, or co-locating partitions of different tables. Machine failures are handled via a global checkpoint/restore mechanism. Piccolo saves a consistent snapshot of all shared table state (using the Chandy-Lamport algorithm), but users have to save the information needed to recover the position of their kernel execution themselves (due to the complexity of checkpointing kernel execution state).
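
A toy version of the table-plus-accumulator idea (illustrative Scala; Piccolo’s real API is C++ and looks nothing like this): concurrent updates to the same key are merged by a user-supplied accumulation function, as a PageRank-style kernel would do with rank contributions.

    // Toy Piccolo-style shared table: concurrent updates to the same key are merged
    // with a user-defined accumulation function. The names are illustrative, not
    // Piccolo's real (C++) API.
    class AccumTable[K, V](accumulate: (V, V) => V) {
      private val data = scala.collection.mutable.Map[K, V]()
      def update(key: K, value: V): Unit = synchronized {
        data(key) = data.get(key).map(accumulate(_, value)).getOrElse(value)
      }
      def get(key: K): Option[V] = synchronized { data.get(key) }
    }

    object PageRankStyleDemo extends App {
      // Ranks are combined by summation, so kernel instances on different nodes can
      // send partial contributions for the same page without explicit coordination.
      val ranks = new AccumTable[String, Double](_ + _)
      ranks.update("pageA", 0.25)   // contribution from one kernel instance
      ranks.update("pageA", 0.5)    // contribution from another
      println(ranks.get("pageA"))   // Some(0.75)
    }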

There are trade-offs inherent in various parts of Piccolo. Fault tolerance is achieved through checkpointing, which adds the extra overhead of logging all changes after a snapshot is taken. Likewise, when confronted with the trade-off between providing full fault tolerance by snapshotting kernel execution state and keeping the runtime simple, the paper opted to let users implement that functionality themselves.

Database Scalability, Elasticity, and Autonomy in the Cloud


This paper investigates a number of issues associated with scaling databases and key/value stores on cloud computing infrastructure, along with an overview of current research addressing these problems. Scalability is the major problem, since RDBMSs have not traditionally been designed to scale out horizontally past a few machines. Key-value stores, on the other hand, scale very well, but providing multi-key transactions on them is difficult; the authors call this problem of multi-key atomicity “data fusion.” They have designed G-Store to provide transactional multi-key access guarantees over dynamic, non-overlapping groups of keys using a leader/follower key ownership abstraction. In contrast, “data fission” is the problem of sharding a database into relatively independent partitions.

Migration of data between partitions is another challenge, both for shared-storage and shared-nothing DB architectures. For shared storage, the authors have designed an “Iterative Copy” technique that transfers the main-memory state of the partition to avoid warm-up time at the destination. For the shared-nothing architecture, the persistent image of the database must also be migrated, which is usually much larger than just the memory state copied in Iterative Copy. To accomplish this, Zephyr introduces a synchronized phase that allows both the source and destination to execute transactions for the tenant while the data is migrated, using a combination of on-demand pull and asynchronous push of data, minimizing the window of unavailability.
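
To make the “data fusion” idea concrete, here is my own toy sketch of the leader/follower ownership pattern (an illustration of the idea only, not G-Store’s actual grouping protocol): every key in a group is owned by a single leader, so a multi-key transaction only has to coordinate with one node.

    // Toy illustration of grouping keys under a single "leader" owner so that a
    // multi-key transaction needs only single-node coordination. My own sketch of
    // the idea, not G-Store's actual protocol.
    class KeyGroup(leader: String, followers: Set[String]) {
      private val store = scala.collection.mutable.Map[String, Int]().withDefaultValue(0)
      private val keys  = followers + leader

      // Because the leader owns every key in the group, a multi-key transaction can
      // run atomically at the leader without a distributed commit protocol.
      def transact(updates: Map[String, Int]): Boolean = synchronized {
        if (!updates.keySet.subsetOf(keys)) false   // keys outside the group: reject
        else { updates.foreach { case (k, v) => store(k) += v }; true }
      }
      def read(key: String): Int = synchronized(store(key))
    }

    object KeyGroupDemo extends App {
      val game = new KeyGroup(leader = "player1", followers = Set("player2", "player3"))
      // Atomically move 10 points from player1 to player2.
      game.transact(Map("player1" -> -10, "player2" -> 10))
      val (p1, p2) = (game.read("player1"), game.read("player2"))
      println(s"player1 = $p1, player2 = $p2")   // player1 = -10, player2 = 10
    }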

The problems addressed by this paper are real problems. Scaling databases is difficult – and there don’t seem to be any great solutions to repartitioning and live migration in sight, though the techniques mentioned can minimize the problems. Transactions on multiple keys in a key-value store are also important. That said, this paper read like a laundry list of problems and solutions and I found the overall structure somewhat incoherent. The paper itself doesn’t seem to introduce anything new and is just an overview of current research, so I somewhat doubt that the paper itself will be influential.