
Sunday, August 9, 2020

Creating a simple application to process tweets from Twitter and add them to a Kinesis Data Stream and Kinesis Firehose - Python, AWS Kinesis, Tweepy, Boto, Twitter API, Firehose, Lambda




Here we will create an application (coded in Python) that reads tweets from Twitter using its API, pushes them to a Kinesis Data Stream, and consumes the data from the other end of the stream. We will use the Tweepy library to read the tweets via the Twitter API, and the Boto library to write to and read from the Kinesis Data Stream. Additionally, we will configure a Kinesis Firehose to read from the Kinesis Data Stream, transform the data, and write it to S3.


Technologies used :

AWS Kinesis Data Stream :
Amazon Kinesis Data Streams is a scalable and durable real-time data streaming service. Unlike message queues, the data is retained for a specific duration and different consumers can read the data at different positions in the stream. Each data stream is made up of shards, each of which is a uniquely identified sequence of data records in the stream (a mini pipe inside the main pipeline). Each shard supports up to 2 MB/sec for reads and up to 1,000 records/sec or 1 MB/sec for writes. If you need more capacity, you add more shards. You are charged based on the number of shards, the data written, and the period for which data is retained in the stream.

Python Tweepy Library :
This Python library is used to read information from and write information to the Twitter API.

Python Boto Library :
This is a Python library for performing AWS operations. We use it here to write data to, and read data from, the Kinesis Data Stream.

AWS Kinesis Firehose : 
It is a service that reads real-time data from a Kinesis Data Stream or a direct API and delivers it to various targets such as S3, Redshift, and Elasticsearch.

AWS S3 :
Amazon's cloud object storage service.

AWS Lambda Function:
A serverless computing (backend services) offering from AWS. Lambda supports the following languages as of this writing: Ruby, Python, Node.js, Java, .NET (C#), and Go.


Let's Begin :

To access tweets, you need to create a developer account with Twitter. This is fairly easy.
Apply for developer access :

https://developer.twitter.com/en/apply-for-access

You can apply as a student and select read-only access to tweets.
Verify your email and you will be asked to create your first app. Enter an app name of your liking and click on Get keys. On the next screen your keys will be displayed, so make sure you save them somewhere safe.




Now let's check whether the API works as expected. We will call a test API URL to get a sample tweet.

curl -X GET -H "Authorization: Bearer <BEARER_TOKEN>" https://api.twitter.com/labs/2/tweets/1138505981460193280?

Replace <BEARER_TOKEN> with the bearer token you received above and run this command in the command prompt/terminal.
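If you prefer Python over curl, a minimal sketch of the same test using the requests library (assuming requests is installed; the tweet ID is just the sample from the curl call above) would be:


import requests

# Paste the bearer token you received from the developer portal
BEARER_TOKEN = "<BEARER_TOKEN>"

# Same Labs v2 endpoint as the curl example above
url = "https://api.twitter.com/labs/2/tweets/1138505981460193280"
headers = {"Authorization": "Bearer " + BEARER_TOKEN}

response = requests.get(url, headers=headers)
print(response.status_code)
print(response.json())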



Coming to our Python code, we first need to install the boto and tweepy libraries (assuming Python 3 is already installed).


python3 -m pip install tweepy

python3 -m pip install boto



Make sure they are installed.


python3 -m pip list



Now create a Python script - Twitter_Test.py
Initially we will write code to read tweets from the API and print them on the screen.



import tweepy as tw

consumer_key = 'zdNolW7xxuI8Pcck93xxxxx'
consumer_secret = 'OSxcaaaaaYSSfd9XwCrwgevvvvv6RRnuMfTwccYKE2xxxxx'

# Authenticate with the Twitter API using app-only authentication
auth = tw.AppAuthHandler(consumer_key, consumer_secret)
api = tw.API(auth)

# Fetch the 5 most popular tweets matching the keyword "aws"
# (Note: in Tweepy 4.x, api.search was renamed to api.search_tweets)
for tweet in tw.Cursor(api.search, q='aws', result_type='popular').items(5):
    print(tweet.text)

Copy and paste the consumer key and secret you received into the consumer_key and consumer_secret variables. The above code authenticates using the API keys and then fetches the top 5 popular tweets matching the keyword "aws". It then prints the tweets on the screen.




Now that we can successfully read tweets from Twitter, we will proceed to write this data to our Kinesis Data Stream.
To start with, we need to create a data stream in AWS. Search for Kinesis in the AWS console and click on Create data stream. Enter a name for your stream and set the number of open shards to 1 for now. A stream can contain multiple shards, and each shard contains a sequence of data records. Click on Create data stream.
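If you prefer to create the stream from code rather than the console, a rough sketch using the same boto library would look like the following (the stream name kinesis_test matches what we use later; the stream takes a little while to become ACTIVE before it can be used):


from boto import kinesis

conn = kinesis.connect_to_region("us-east-1")

# Create a stream named "kinesis_test" with a single open shard
conn.create_stream("kinesis_test", 1)

# The stream is usable only once its status becomes ACTIVE
status = conn.describe_stream("kinesis_test")["StreamDescription"]["StreamStatus"]
print(status)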






We will now modify our code in Twitter_Test.py. We will import the boto library and connect to Kinesis in the us-east-1 region. Then we will insert each tweet into the data stream using kinesis.put_record(), which accepts the data stream name, the data, and a partition key as arguments. We can keep the partition key as "1" for now. We print the response to the screen just for debugging purposes.

import tweepy as tw
from boto import kinesis

consumer_key = 'xxxxxsDduI8P0A6k9xxxxxx'
consumer_secret = 'xxxbYSSfd9XwCxxxxb2MZX6RRnuxxxxxxxxxx'

auth = tw.AppAuthHandler(consumer_key, consumer_secret)
api = tw.API(auth)

# Connect to Kinesis in the us-east-1 region
kinesis = kinesis.connect_to_region("us-east-1")

# Search recent tweets for "kinesis" and push each one into the data stream
for tweet in tw.Cursor(api.search, q='kinesis', result_type='recent').items(5):
    # put_record(stream_name, data, partition_key); the response is printed for debugging
    print(kinesis.put_record("kinesis_test", tweet.text, "1"))
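The boto library used here is the older legacy SDK; if you are on the newer boto3 SDK instead, an equivalent producer would look roughly like this sketch (boto3 expects the data as bytes and a named PartitionKey; the keys below are placeholders):


import boto3
import tweepy as tw

consumer_key = '<CONSUMER_KEY>'
consumer_secret = '<CONSUMER_SECRET>'

auth = tw.AppAuthHandler(consumer_key, consumer_secret)
api = tw.API(auth)

kinesis_client = boto3.client("kinesis", region_name="us-east-1")

for tweet in tw.Cursor(api.search, q='kinesis', result_type='recent').items(5):
    response = kinesis_client.put_record(
        StreamName="kinesis_test",
        Data=tweet.text.encode("utf-8"),
        PartitionKey="1",
    )
    print(response)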

We will create a new Python script - Twitter_Consumer_Test.py - to read the data from the other end of the data stream. Here we use the shard ID "shardId-000000000000" since we have just one shard. Next we get a shard iterator, which is nothing but the pointer from where we need to start reading data; once we read a batch of records we move on to the next pointer. We have written an infinite while loop to keep polling the data stream for new records. kinesis.get_shard_iterator() accepts the data stream name, the shard ID, and an iterator type (LATEST, TRIM_HORIZON, AT_TIMESTAMP, etc.); we use LATEST so that only newly added records are returned. kinesis.get_records() then fetches up to 5 records from that iterator, and NextShardIterator in its response is the pointer for the next read. We have added a sleep to wait for 1 second before checking the data stream again.

import time
from boto import kinesis

# Connect to Kinesis in the us-east-1 region
kinesis = kinesis.connect_to_region("us-east-1")

# The stream has a single shard, so its ID is always shardId-000000000000
shard_id = 'shardId-000000000000'

# LATEST returns only records added after the iterator was obtained
shard_itr = kinesis.get_shard_iterator("kinesis_test", shard_id, "LATEST")["ShardIterator"]

# Poll the stream forever, reading up to 5 records at a time
while True:
    tweets = kinesis.get_records(shard_itr, limit=5)
    for tweet in tweets["Records"]:
        print(tweet["Data"])

    # Move to the next iterator and wait a second before polling again
    shard_itr = tweets["NextShardIterator"]
    time.sleep(1)
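Again, if you are using boto3 rather than the legacy boto library, a rough consumer sketch would be (boto3 returns each record's Data as raw bytes):


import time
import boto3

kinesis_client = boto3.client("kinesis", region_name="us-east-1")

shard_id = "shardId-000000000000"
shard_itr = kinesis_client.get_shard_iterator(
    StreamName="kinesis_test",
    ShardId=shard_id,
    ShardIteratorType="LATEST",
)["ShardIterator"]

while True:
    result = kinesis_client.get_records(ShardIterator=shard_itr, Limit=5)
    for record in result["Records"]:
        print(record["Data"].decode("utf-8"))

    shard_itr = result["NextShardIterator"]
    time.sleep(1)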

We can run Twitter_Consumer_Test.py first, which will keep polling for new data.
In another window we can run Twitter_Test.py, which will insert data into the data stream.
As soon as the data is inserted, you will see the tweets printed by Twitter_Consumer_Test.py.


python3 Twitter_Consumer_Test.py



python3 Twitter_Test.py



As you can see above, Twitter_Consumer_Test.py has read the data and printed the tweets on the screen.


You also have the option to connect the stream to an AWS Kinesis Data Firehose for delivering the data to S3, Redshift, Elasticsearch, etc. Before delivery, you can also transform the data using a Lambda function.

Create a new Data Firehose in Kinesis.

Select Kinesis Data Stream as the source.



Enable transformation if you want to apply data transformation rules before delivery. Below I am creating a new Lambda function in Node.js to process the data. The Lambda function receives the input records and returns the transformed records.

I have selected the first option to create a base code.




You can test your function with a test event and make sure there are no errors.



Once your Lambda function is ready, go back to the Firehose setup page and select the Lambda function from the drop-down.
Next, select S3 as the destination if you want the data delivered as files to S3. Create a new bucket or select an existing one.




When you create the IAM role, make sure it has access to the Kinesis Data Stream, Lambda function execution, and S3 operations.




In the following Node.js Lambda function code we are:

1) Writing the transformed data to Test_File.json in the output-testing S3 bucket.
2) Returning the records to Kinesis Firehose so it can write them to the target S3 bucket - input_testing



var AWS = require('aws-sdk');
var s3 = new AWS.S3();

console.log('Loading function');

exports.handler = async (event) => {
    /* Process the list of records and transform them */
    console.log(event.records);
    const output = event.records.map((record) => ({
        /* This transformation is the "identity" transformation, the data is left intact */
        recordId: record.recordId,
        result: 'Ok',
        data: record.data,
    }));
    console.log(`Processing completed. Successful records: ${output.length}.`);

    /* Additionally write the transformed batch to our own bucket for debugging */
    var params = { Bucket: "output-testing", Key: "Test_File.json", Body: JSON.stringify({ records: output }) };
    try {
        const res = await s3.putObject(params).promise();
        console.log('Successfully saved object to output-testing/Test_File.json:', res);
    } catch (err) {
        console.log(err);
    }

    /* Return the transformed records to Kinesis Firehose for delivery to the destination bucket */
    return { records: output };
};
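
Since Lambda also supports Python (as noted earlier), a rough Python sketch of the same identity transform, without the extra debug write to S3, would be:


import base64

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        # Firehose delivers the record data base64-encoded; decode it if you need to inspect or modify it
        payload = base64.b64decode(record["data"])

        # Identity transformation: pass the data through unchanged
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(payload).decode("utf-8"),
        })

    print("Processing completed. Successful records: {}".format(len(output)))
    return {"records": output}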


You will see the files created by Firehose under a path similar to this:

 

And files created by Lambda :