Twitter’s real-time stack: Processing billions of events with Heron and DistributedLog

At the first day of the Strata+Hadoop, Maosong Fu, Tech Lead for Realtime Compute at Twitter shared some details on Twitter’s real-time stack

img_6462

There are many industries where optimizing in real-time can have a large impact on overall business performance, leading to instant benefits in customer acquisition, retention, and marketing.

valueofdata

But how fast is real-time? It depends on the context, whether it’s financial trading, tweeting, ad impression count or monthly dashboard.

what-is-real-time

 

Earlier Twitter messaging stack

twittermessaging

Kestrel is a message queue server we use to asynchronously connect many of the services and functions underlying the Twitter product. For example, when users update, any tweets destined for SMS delivery are queued in a Kestrel; the SMS service then reads tweets from this queue and communicates with the SMS carriers for delivery to phones. This implementation isolates the behavior of SMS delivery from the behavior of the rest of the system, making SMS delivery easier to operate, maintain, and scale independently.

Scribe is a server for aggregating log data streamed in real time from a large number of servers.

Some of Kestrel’s limitations are listed in the below:

  • Durability is hard to achieve
  • Read-behind degrades performance
  • Adding subscribers is expensive
  • Scales poorly as number of queues increase
  • Cross DC replication

kestrellimitations

From Twitter Github:

We’ve deprecated Kestrel because internally we’ve shifted our attention to an alternative project based on DistributedLog, and we no longer have the resources to contribute fixes or accept pull requests. While Kestrel is a great solution up to a certain point (simple, fast, durable, and easy to deploy), it hasn’t been able to cope with Twitter’s massive scale (in terms of number of tenants, QPS, operability, diversity of workloads etc.) or operating environment (an Aurora cluster without persistent storage).

Kafka™ is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

Kafka relies on file system page cache with performance degradation when subscribers fall behind – too many random I/O

kafkalimitations

Rethinking messaging

rethinkingmessaging

Apache DistributedLog (DL) is a high-throughput, low-latency replicated log service, offering durability, replication and strong consistency as essentials for building reliable real-time applications.

distributedlogs

Event Bus

eventbus

Features of DistributedLog at Twitter:

High Performance

DL is able to provide milliseconds latency on durable writes with a large number of concurrent logs, and handle high volume reads and writes per second from thousands of clients.

Durable and Consistent

Messages are persisted on disk and replicated to store multiple copies to prevent data loss. They are guaranteed to be consistent among writers and readers in terms of strict ordering.

Efficient Fan-in and Fan-out

DL provides an efficient service layer that is optimized for running in a multi- tenant datacenter environment such as Mesos or Yarn. The service layer is able to support large scale writes (fan-in) and reads (fan-out).

Various Workloads

DL supports various workloads from latency-sensitive online transaction processing (OLTP) applications (e.g. WAL for distributed database and in-memory replicated state machines), real-time stream ingestion and computing, to analytical processing.

Multi Tenant

To support a large number of logs for multi-tenants, DL is designed for I/O isolation in real-world workloads.

Layered Architecture

DL has a modern layered architecture design, which separates the stateless service tier from the stateful storage tier. To support large scale writes (fan- in) and reads (fan-out), DL allows scaling storage independent of scaling CPU and memory.

 

 

distibutedlogs

Storm was no longer able to support Twitter’s requirements and although Twitter improved Storm’s performance eventually Twitter decided to develop Heron.

Heron is a realtime, distributed, fault-tolerant stream processing engine from Twitter. Heron is built with a wide array of architectural improvements that contribute to high efficiency gains.

heron

Heron has powered all realtime analytics with varied use cases at Twitter since 2014. Incident reports dropped by an order of magnitude demonstrating proven reliability and scalability

 

heronusecases

Heron is in production for the last 3 years, reducing hardware requirements by 3x. Heron is highly scalable both in the ability to execute large number of components for each topology and the ability to launch and track large numbers of topologies.

 

heronattwitter

Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch– and stream-processing methods. This approach to architecture attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. The two view outputs may be joined before presentation.

The way this works is that an immutable sequence of records is captured and fed into a batch system and a stream processing system in parallel. You implement your transformation logic twice, once in the batch system and once in the stream processing system. You stitch together the results from both systems at query time to produce a complete answer.

lambda-16338c9225c8e6b0c33a3f953133a4cb

Lambda Architecture: the good

lambdathegood

The problem with the Lambda Architecture is that maintaining code that needs to produce the same result in two complex distributed systems is exactly as painful as it seems like it would be.

LambdaTheBad.png

Summingbird to the Rescue! Summingbird is a library that lets you write MapReduce programs that look like native Scala or Java collection transformations and execute them on a number of well-known distributed MapReduce platforms, including Storm and Scalding.

Summingbird.png

Curious to Learn More?

curioustolearnmore

 

Interested in Heron?

Code at: https://github.com/twitter/heron

http://twitter.github.io/heron/

 

inerestedinheron

Install MongoDB Community Edition and PyMongo on OS X

  • Install Homebew, a free and open-source software package management system that simplifies the installation of software on Apple’s macOS operating system.

/usr/bin/ruby -e “$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)

  • Ensure that you’re running the newest version of Homebrew and that it has the newest list of formulae available from the main repository

brew update

  • To install the MongoDB binaries, issue the following command in a system shell:

brew install mongodb

  • Create a data directory (-p create nested directories, but only if they don’t exist already)

mkdir -p ./data/db

  • Before running mongodb for the first time, ensure that the user account running mongodb has read and write permissions for the directory

sudo chmod 765 data

  • Run MongoDB

mongod –dbpath data/db

  • To stop MongoDB, press Control+C in the terminal where the mongo instance is running

Install PyMongo

pip install pymongo

  • In a Python interactive shell:

import pymongo

from pymongo import MongoClient

RoboMongo

  • Create a Connection

client = MongoClient()

  • Access Database Objects

MongoDB creates new databases implicitly upon their first use.

db = client.test

  • Query for All Documents in a Collection

cursor = db.restaurants.find()

for document in cursor: print(document)

  • Query by a Top Level Field

cursor = db.restaurants.find({“borough”: “Manhattan”})

for document in cursor: print(document)

  • Query by a Field in an Embedded Document

cursor = db.restaurants.find({“address.zipcode”: “10075”})

for document in cursor: print(document)

  • Query by a Field in an Array

cursor = db.restaurants.find({“grades.grade”: “B”})

for document in cursor: print(document)

 

  • Insert a Document

Insert a document into a collection named restaurants. The operation will create the collection if the collection does not currently exist.

result = db.restaurants.insert_one(

{

“address”: {            “street”: “2 Avenue”,            “zipcode”: “10075”,            “building”: “1480”,            “coord”: [-73.9557413, 40.7720266]        },

“borough”: “Manhattan”,

“cuisine”: “Italian”,

“grades”: [

{                “date”: datetime.strptime(“2014-10-01”, “%Y-%m-%d“),                “grade”: “A”,                “score”: 11            },

{                “date”: datetime.strptime(“2014-01-16”, “%Y-%m-%d“),                “grade”: “B”,                “score”: 17            }        ],

“name”: “Vella”,

“restaurant_id”: “41704620”

})

result.inserted_id

 

Git Pull While Ignoring Local Changes

Git pull may result with the following error message:

error: Your local changes to the following files would be overwritten by merge

Please, commit your changes or stash them before you can merge.

The following solution works for me:

Fetch all changes:

git fetch --all

Reset

git reset --hard origin/master

Pull

git pull

 

Changing the Game with Data and Insights – Data Science Singapore

Another great Data Science Singapore (DSSG) event! Hong Cao from McLaren Applied Technologies shared his insights on applications of data science at McLaren.

The first project is using economic sensors for continuous human conditions monitoring, including sleep quality, gait and activities, perceived stress and cognitive performance.

DataScience

Gait outlier analysis provides unique insight on fatigue levels while exercising, probability of injury and post surgery performance and recovery.

Gait Analysis Data Science

DataScience(1)DataScience(3)

A related study looks into how biotelemetry assist in patient treatment such as ALS (Amyotrophic Lateral Sclerosis) disease progression monitoring. The prototype tools collect heart rate, activity and speech data to analyse disease progression.

DataScience(3)

HRV (Heart Rate Variability) features are extracted from both the time and from the frequency domains.

DataScience(4)

Activity score is derived from the three-axis accelerometer data.

DataScience(5)DataScience(6)

The second project was a predictive failure POC, to help determine the condition of Haul Trucks in order to predict when a failure might happen. The cost of having an excavator go down in the field is $5 million a day, while the cost of losing a haul truck is $1.8 million per day. If you can prevent it from going down in the field, that makes a huge difference

DataScience(7)DataScience(8)DataScience(9)DataScience(10)DataScience(11)DataScience(12)

Enhance your Privacy: 3 Easy Steps To Setup Opera’s Unlimited and FREE VPN

Opera is the first major browser maker to integrate an unlimited and free VPN or virtual private network. Now, you don’t have to download VPN extensions or pay for VPN subscriptions to access blocked websites and to shield your browsing when on public Wi-Fi.

OperaVPNOverall

 

With a free, unlimited, native VPN that just works out-of-the-box and doesn’t require any subscription, Opera wants to make VPNs available to everyone.

According to Global Web Index, more than half a billion people (24 percent of the world’s online population) have tried or are currently using VPN services. According to the research, the primary reasons people use a VPN are for better access to entertainment content, browser anonymity, and the ability to access sites restricted by their workplace or country.

  1. Download & Install

To test this new feature, start by downloading and installing Opera’s Developer version 

After downloading OperaSetupDeveloper.zip, unzip it and double-click the Opera Installer.

 OperaInstallerInternet

Click Open

OperaInstaller

Complete the installation process.

  1. To enable the VPN function, click the Opera menu, then scroll down to Preferences…

 

OperaDeveloperPreferences

Under Privacy & Security, click the checkbox to enable the VPN function

OperaPrivacySecurityDeveloper

  1. Open a new browser tab or window in Opera and click on the “VPN” blue button that is in the URL link bar, pull down the ‘Virtual Location’ menu to choose the IP region to mimic (currently Canada, Germany, United States).

OperaVPNOK

Now that the Opera VPN has been enabled, you can toggle the VPN off by clicking on the VPN button and flipping the switch to the OFF position, and back on again by returning to the same menu and flipping it back to the ON position

Prior to this new feature Opera recommended VPN provider was SurfEasy VPN, a company that was bought by Opera about a year ago. The cheapest SurfEasy plan starts from $6.49 a month.

OperaSurfEasyPlan

Most of the times the VPN worked well, but occasionally I’ve received the following error message “VPN is temporarily unavailable. Opera is resolving this issue”. At this stage instability is expected