Building a Robust and Real-time Chat-GPT Bot for Sentiment Analysis: Leveraging Prompt Engineering, Apache PySpark, Airflow, and Twitter API

Harshit Gola
May 18, 2023 · 8 min read


How do you deploy a scalable, fault-tolerant, and resilient Chat-GPT bot capable of providing real-time insights using prompt engineering and big data solutions?

Today, we will go through the steps involved in creating a Chat-GPT bot that fetches data from Twitter using Python APIs and uses open-source big data tools and prompt engineering to generate real-time insights with Chat-GPT 3.5. I will cover both the high-level and low-level details of designing the system.

Please note: this project only demonstrates the capabilities and can serve as a model for tackling a bigger and more impactful use case.

Contents:
Section 1 - System Design
(I) Scalability
(II) Resiliency

Section 2 - Architecture Setup
Step 1: Install Docker.
Step 2: Install the containers using the Docker Image.
Why Airflow?
Why PySpark?

Section 3 - The Code
Step 1: Fetch Tweets using Tweepy package.
Step 2: Setup Chat-GPT Module for Prompt-Engineering.
Step 3: Setup DAGs in Airflow

Section 4 - Future Applications

Section 1 — System Design:

(I) Scalability — The system is designed to scale dynamically as the volume of user data grows or shrinks with demand.

(i) The system can use distributed computing to run the PySpark code on the Apache Spark engine.

(ii) The system can run the code on a scheduled frequency using Apache Airflow DAGs, which makes it possible to generate automated alerts on a timely basis.

(II) Resiliency — The system is designed to account for server failure. If a container shuts down or becomes unresponsive for some unforeseen reason, a copy of the data is still maintained in the Postgres server and can be recovered.

Section 2 — Architecture Setup:

In the upcoming sections, we will demonstrate how to set up a streamlined development environment with Apache Airflow, the Apache Spark engine, and a Postgres server that is lightweight, self-contained, and effortless to deploy within a matter of minutes.

By leveraging Docker-Compose, we can optimize our development workflow, enabling rapid iteration cycles. With the simple act of launching a handful of docker containers, we will be ready to design our personalized workflows.

Time to get our hands dirty!

Step 1: Install Docker Desktop using the GUI and install the Docker CLI using Homebrew:

⦿ Here are the steps I followed to install the CLI via Homebrew from the Visual Studio Code terminal:

brew install --cask docker
docker --version

Step 2: Install the containers using the Docker Image

(i) Clone the project folder from my below-mentioned repository, which contains the docker-compose YAML file and a .env file with the environment variables for the container setup.

⦿ These files provide the basic configuration for the environment: variables that set the database credentials, the Airflow user, and a few further settings.

  git clone https://github.com/harshit1795/Airflow_setup.git

(ii) Now open the terminal and make sure you have Visual Studio Code; otherwise, open the cloned repo with your own IDE.

cd Airflow_setup 
code .

⦿ You should see a similar screen in your IDE with the docker-compose.yml file.

⦿ Just head over to the terminal and spin up all the necessary containers by running:

docker compose up -d

⦿ The key components include the scheduler, web server, metadatabase (PostgreSQL), and the airflow-init task responsible for initializing the database.

I also created an environment for PySpark using docker build to handle large volumes of data in case demand grows. You can access the Dockerfile here, and the installation steps are the same as for the other containers.

git clone https://github.com/harshit1795/Pyspark_setup.git
cd Pyspark_setup
docker build --tag tag_name .
docker run -d -p 8888:8888 -p 4040:4040 --name jupyter-lab -v "<host_path>:<container_path>" tag_name

(iii) Now, check whether all the Docker containers are running, and start each one if it is not:

docker ps -a
docker start jupyter-lab
docker start airflow-init
docker start airflow-scheduler
docker start airflow-webserver

⦿ Or, you can run the container manually via the Docker UI by hitting the ‘play’ button as shown below.

(iv) Now we can access the Airflow webserver at http://localhost:8080/home in any browser. This is the port we defined for port forwarding while configuring the Docker containers.

Why Airflow?

Apache Airflow is an open-source platform designed to execute diverse workflows and data pipelines. Airflow leverages the Python programming language to specify pipelines. Users have the flexibility to use for loops to define pipelines, execute bash commands, utilize external modules such as pandas, sklearn, or GCP and AWS libraries for cloud service management, and explore a wide range of other possibilities.

Apache Airflow can be used to schedule:

  • ETL pipelines that extract data from multiple sources and run Spark jobs or any other data transformations.
  • Training machine learning models.
  • Report generation.
  • Backups and similar DevOps operations.

Although Airflow can run on a single machine, it is designed to be deployed in a distributed manner. This is because Airflow consists of separate parts: a webserver, a scheduler, and a worker.

Why PySpark?

PySpark is an interface for Apache Spark in Python. With PySpark, you can write Python and SQL-like commands to manipulate and analyze data in a distributed processing environment.
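
As a quick illustration, here is a minimal PySpark snippet that could run inside the jupyter-lab container; the file name and column name are assumptions for this sketch, not taken from the repo.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Start (or reuse) a local Spark session inside the PySpark container
spark = SparkSession.builder.appName("tweet-analysis").getOrCreate()

# Load the tweets saved earlier into a distributed DataFrame (file name is illustrative)
tweets_df = spark.read.csv("tweets.csv", header=True, inferSchema=True)

# A simple distributed transformation: count tweets per day
(tweets_df
 .groupBy(F.to_date("created_at").alias("tweet_date"))
 .agg(F.count("*").alias("tweet_count"))
 .orderBy("tweet_date")
 .show(10))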

Great job! We have now successfully created our environment and learned some basic big data tooling concepts.

Section 3 — The Code

You can access the Project code here:

https://github.com/harshit1795/Twitter-ChatGPT_Sentiment_Analysis.git

Step 1: Fetch Tweets using the Tweepy package.

(i) I used Twitter’s Tweepy library to fetch the latest tweets from Amazon’s customer care account ‘AmazonHelp’ and stored them in a data frame using the pandas library.

(ii) After setting up the tokens and API key on the Twitter Developer Portal, I was able to retrieve the latest tweets using the user_timeline() and Cursor() methods. You can learn more about the other Tweepy functions here.

(iii) Then, I performed some data cleaning and transformations and saved the results in a data frame using the pandas library, as sketched below.
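
For reference, a condensed sketch of steps (ii) and (iii) is shown below; the credentials are placeholders and the cleaning steps are simplified compared to the notebook in the repo.

import tweepy
import pandas as pd

# Credentials from the Twitter Developer Portal (placeholders)
auth = tweepy.OAuth1UserHandler(
    "API_KEY", "API_SECRET", "ACCESS_TOKEN", "ACCESS_TOKEN_SECRET"
)
api = tweepy.API(auth, wait_on_rate_limit=True)

# Fetch the latest tweets from the AmazonHelp timeline via a cursor
tweets = tweepy.Cursor(
    api.user_timeline,
    screen_name="AmazonHelp",
    tweet_mode="extended",
).items(100)

# Basic cleaning/transformation into a pandas DataFrame
df = pd.DataFrame(
    [{"created_at": t.created_at, "text": t.full_text} for t in tweets]
)
df["text"] = df["text"].str.replace(r"http\S+", "", regex=True).str.strip()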

Step 2: Setup Chat-GPT Module for Prompt-Engineering

(i) You will be required to set up a paid account and install the necessary packages in order to get started with ChatGPT’s API.

Set up a paid ChatGPT account

(ii) After the initial setup, we can use a ‘get_completion’ helper built around ChatGPT’s API, which ingests a prompt and returns a response based on the action we define in the prompt, shown below in the third line of code.

Defining the ChatGPT function for transforming incoming text into meaningful prompts.
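
For reference, a minimal sketch of such a ‘get_completion’ helper is shown below, assuming the openai Python package that was current in mid-2023; the model name and temperature are assumptions, not taken from the repo.

import openai

openai.api_key = "YOUR_OPENAI_API_KEY"  # placeholder; use the key from your paid account

def get_completion(prompt, model="gpt-3.5-turbo"):
    """Send a single prompt to the chat API and return the model's reply."""
    messages = [{"role": "user", "content": prompt}]
    response = openai.ChatCompletion.create(
        model=model,
        messages=messages,
        temperature=0,  # deterministic output suits classification-style prompts
    )
    return response.choices[0].message["content"]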

(iii) As an example, I created a simple prompt to determine the tone of each tweet and categorize it as ‘Negative’, ‘Positive’, or ‘Neutral’, labeled using the ChatGPT 3.5 LLM.

As you can see below, ChatGPT has added another column, ‘summary’, to our data frame showing the tone of each tweet. We can collect such data and build metrics on top of it to track user sentiment for a product or service, or to get a general idea of what people are talking about.

Prompt Engineering using ChatGPT 3.5
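
A hedged sketch of how such a prompt can be applied row by row to create the ‘summary’ column is shown below; it reuses the get_completion helper and the pandas DataFrame from the earlier sketches, and the exact prompt wording in the repo may differ.

def classify_tweet(tweet_text):
    """Ask the model to label the tone of a single tweet."""
    prompt = (
        "Determine the tone of the following tweet and answer with exactly "
        "one word: Negative, Positive, or Neutral.\n\n"
        f"Tweet: {tweet_text}"
    )
    return get_completion(prompt)

# Add the sentiment label as a new 'summary' column of the DataFrame built earlier
df["summary"] = df["text"].apply(classify_tweet)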

Similarly, we can create different prompts to determine and extract more useful information from the incoming data for various other use cases.

Step 3: Setup DAGs in Airflow

First of all, make sure to run the below-mentioned commands to install the required Python modules in the airflow-init container.

sudo apt-get update
sudo apt install python3-pip
sudo pip install apache-airflow
sudo pip install pandas
sudo pip install s3fs
sudo pip install tweepy

(i) To get real-time updates and run the code periodically using Airflow, we need to create a .py file in the ‘dags’ folder located in the directory where Airflow was installed. This .py file contains the DAG definition that instructs Airflow to run a piece of Python code on a schedule.

(ii) Here is the main DAG definition we need to create: inside the Airflow folder, navigate to ‘dags’ and create a new file called ‘filename’.py. The DAG just requires the name of the function (‘run_my_function’) that we want to execute on a schedule.

Provide the start_date and schedule_interval.

Basic DAG function Configuration
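
As a reference, a minimal version of such a DAG file could look like the sketch below; the dag_id, schedule, start date, and import path are illustrative, not copied from the repo.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Illustrative import: the module holding the pipeline code described above
from twitter_sentiment_pipeline import run_my_function

with DAG(
    dag_id="twitter_sentiment_dag",
    start_date=datetime(2023, 5, 1),
    schedule_interval="@hourly",  # how often Airflow should run the pipeline
    catchup=False,
) as dag:

    run_pipeline = PythonOperator(
        task_id="run_my_function",
        python_callable=run_my_function,
    )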

(iii) The final step is to execute the DAG and see the results by clicking Run > Graph > Log.

http://localhost:8080/home
Manually Triggering the DAG

As you can see below, the Airflow DAG successfully executed the code and produced the same results that we initially obtained by running the code manually in the Jupyter notebook environment.

Alternatively, you can run the command below to trigger the DAG from the terminal.

docker exec -it --user airflow airflow-scheduler bash -c "airflow dags trigger <dag_id>"
Airflow DAG logs showing the output

Awesome! We have now successfully tested our data pipeline to get automated insights on a scheduled basis.

Section 4 — Future Applications

I built this project to fetch data from Twitter. From the exploratory data analysis, we can see that users have posted a significant number of ‘Negative’ tweets lately, which could affect the company’s brand image.
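
For instance, a minimal pandas sketch for checking the label distribution produced earlier (assuming the ‘summary’ column from Section 3) could look like this:

# Count how many tweets fall into each sentiment bucket
sentiment_counts = df["summary"].value_counts()
print(sentiment_counts)

# Share of negative tweets, a simple metric to monitor over time
negative_share = (df["summary"] == "Negative").mean()
print(f"Negative tweets: {negative_share:.1%}")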

Similarly, we can perform the same steps to identify sentiment for other companies, or take data from other platforms like LinkedIn, Reddit, etc., to do a similar analysis.

Future goal: We can try generating responses based on the prompts and retweeting or replying to the same users about their complaints and dissatisfaction with products and services, which could result in greater customer loyalty and a stronger brand reputation, as sketched below.

import tweepy

# Authenticate with the credentials from the Twitter Developer Portal (placeholders)
auth = tweepy.OAuth1UserHandler(
    "API_KEY", "API_SECRET", "ACCESS_TOKEN", "ACCESS_TOKEN_SECRET"
)

# Create API object
api = tweepy.API(auth)

# ID of the tweet you want to retweet
tweet_id = "1234567890"

# Additional message to post alongside the retweet
message = "This is my retweet!"

# Retweet the tweet, then post the message as a separate status update
try:
    api.retweet(tweet_id)
    api.update_status(message)
    print("Retweet successful!")
except tweepy.TweepyException as e:  # tweepy.TweepError in Tweepy < 4.0
    print("Error occurred while retweeting:", e)
