Revolutionize Your AI Development Process with Kestra: The Ultimate AI Pipeline Builder!
As an AI developer, I am always on the lookout for tools that can help me streamline my workflow and make my development process more efficient. That's why I was so excited when I came across Kestra, the ultimate AI pipeline builder. In this article, I will introduce you to Kestra and show you how it can revolutionize your AI development process.
Introduction to Kestra: The Ultimate AI Pipeline Builder
Kestra is a powerful tool that enables you to build and manage your entire AI development pipeline from start to finish. With Kestra, you can easily integrate your favorite AI frameworks and tools, such as MindsDB, SingleStore, ChatGPT, and OpenAI, to create a seamless workflow that will help you develop and deploy AI models faster than ever before.
What are MindsDB, SingleStore, and ChatGPT?
Before we dive into how Kestra can help you revolutionize your AI development process, let's take a quick look at some of the AI frameworks and tools that Kestra integrates with.
MindsDB is an open-source machine learning framework that makes it easy to build predictive models using SQL. With MindsDB, you can train models directly in your database, making it easy to integrate machine learning into your existing workflows.
SingleStore is a distributed SQL database that provides high performance and scalability. With SingleStore, you can easily store and analyze large amounts of data, making it an ideal choice for AI applications that require real-time processing.
ChatGPT (needs no introduction) is a conversational AI service developed by OpenAI and built on its GPT family of large language models. The underlying models are available through the OpenAI API, so you can build chatbots and other NLP applications that understand and respond to natural language input.
Let’s build an intelligent travel mate!
Now that you have a better understanding of some of the AI frameworks and tools that Kestra integrates with, let's walk through a use case that demonstrates how Kestra, MindsDB and GPT can be used to build a personalized and location-based travel activities recommendation system.
The Problem Statement
Let’s consider a hypothetical company, ‘travelvisor.com’, a popular travel website that offers millions of accommodation options to travelers worldwide. The website wants to add personalized travel activity recommendations, driven by weather and location, to enhance the overall travel experience for its customers. With this feature, travelers receive tailored suggestions based on their interests, the weather forecast, and their location, making it easier for them to plan and enjoy their trip to the fullest.
A Simple Solution!
Here is the high-level idea of what we will be building:
Step 0: Get the complete tech stack up and running!
Kestra: Using Docker deployment for this blog
SingleStore DB: Using free tier cloud account
MindsDB: Using free tier cloud account
OpenAI Key (Optional): Only if you want to bring your own key
Step 1: Exploration of the dataset utilized
The dataset is generated by a Python script. It contains each location and its forecasted weather for the next day at 12:00 noon.
Data Format
Using ‘weather2’ Python library to fetch weather forecast
Using ‘singlestoredb’ Python library to ingest the generated data in SingleStore DB.
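For reference, one record of this dataset can be pictured as in the sketch below. The fields mirror the columns of the locations_weather table created later in the pipeline; the WeatherRow class, the to_insert_params helper, and the sample values are hypothetical and exist only for illustration.

```python
from dataclasses import dataclass, astuple


@dataclass(frozen=True)
class WeatherRow:
    """One forecast record for tomorrow at 12:00 (hypothetical helper type)."""
    location: str               # maps to the Location column
    forecast_temp: float        # maps to the ForecastTemp column
    forecast_wind_speed: float  # maps to the ForecastWindSpeed column (m/s)
    forecast_precip: float      # maps to the ForecastPrecip column


def to_insert_params(row: WeatherRow) -> tuple:
    """Return the values in the column order used by the INSERT statement."""
    return astuple(row)


# Sample values are made up for illustration only.
sample = WeatherRow("Miami", 29.0, 4.5, 0.0)
params = to_insert_params(sample)
```

A tuple in this order is what a parameterized INSERT with four %s placeholders would expect.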
Step 2: Create an AI Model in MindsDB
In MindsDB, AI Tables are machine learning models that are saved as virtual tables in a database. They make predictions from your data: you can run time series, regression, and classification predictions inside your database by querying an AI Table with simple SQL statements, and get the results almost immediately.
Model Creation
Navigate to the MindsDB query editor and execute the query below:
CREATE MODEL mindsdb.demo_travelguru_model
PREDICT response
USING
    engine = 'openai',
    max_tokens = 300,
    model_name = 'gpt-4',
    prompt_template = 'You are a travel guru with witty sense of humor. Here is the forecast weather of {{Location}} tomorrow midday, temperature would be {{ForecastTemp}}, wind speed would be {{ForecastWindSpeed}} m/s and precipitation amount would be {{ForecastPrecip}}. Suggest top 3 leisure activities in two categories - family and solo traveller.';
prompt_template passes the context to GPT and shapes its response: each {{column}} placeholder is filled with the value from the matching column of the input row.
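To make the placeholder idea concrete, here is a minimal Python sketch of this kind of substitution. It is an illustration only and not MindsDB's actual implementation; render_prompt is a hypothetical helper, and the template is a shortened version of the one used above.

```python
import re

# Matches placeholders such as {{Location}} or {{ ForecastTemp }}.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_prompt(template: str, row: dict) -> str:
    """Replace each {{column}} placeholder with the matching value from row."""
    def substitute(match):
        column = match.group(1)
        if column not in row:
            raise KeyError(f"row has no column named {column!r}")
        return str(row[column])

    return PLACEHOLDER.sub(substitute, template)


template = (
    "Forecast for {{Location}}: temperature {{ForecastTemp}}, "
    "wind speed {{ForecastWindSpeed}} m/s."
)
# Sample values are made up for illustration only.
row = {"Location": "Tokyo", "ForecastTemp": 18.5, "ForecastWindSpeed": 3.2}
prompt = render_prompt(template, row)
```

Raising an error on a missing column is a design choice of this sketch: it surfaces a typo in a placeholder name early instead of sending a half-filled prompt.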
Monitor the model
Once the model is created, it may not be ready to use right away. To monitor its progress, run the query below.
SELECT * FROM mindsdb.models WHERE name='demo_travelguru_model';
This returns the details of the model you just created. Its status reads "generating" while the model is still being trained and "complete" when it is done.
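If you want to wait for the model programmatically, the check can be wrapped in a small polling loop. The sketch below is one possible approach: wait_until_complete and fetch_status are hypothetical names, and fetch_status stands in for whatever code runs the monitoring query and returns the model's status value.

```python
import time
from typing import Callable


def wait_until_complete(
    fetch_status: Callable[[], str],
    poll_seconds: float = 5.0,
    max_polls: int = 60,
) -> str:
    """Poll fetch_status() until it stops returning 'generating'.

    Returns the first status that is not 'generating', so the caller can
    tell 'complete' apart from any other final state.
    """
    for _ in range(max_polls):
        status = fetch_status()
        if status != "generating":
            return status
        time.sleep(poll_seconds)
    raise TimeoutError("model was still generating after the last poll")


# Stand-in for a real query: reports 'generating' twice, then 'complete'.
fake_statuses = iter(["generating", "generating", "complete"])
final_status = wait_until_complete(lambda: next(fake_statuses), poll_seconds=0)
```

Capping the number of polls keeps a stuck model from blocking the caller forever.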
Step 3: Connect MindsDB with SingleStore
MindsDB is a versatile tool that can work with almost any data source. This tool has integrations with numerous databases such as MongoDB, PostgreSQL, MySQL, Airtable, and DB2, to name a few.
Just run the query below with your own connection settings, and MindsDB will be able to query data from the connected datastore!
-- Integrate datastore with mindsdb
CREATE DATABASE singlestore_datasource
WITH
ENGINE = 'singlestore',
PARAMETERS = {
"host": "<HOST>",
"port": 3306,
"database": "<DB>",
"user": "<USER>",
"password": "<PASSWD>"
};
To explore the data from MindsDB, execute a simple SELECT statement:
select * from singlestore_datasource.locations_weather
Step 4: Generate the intelligent recommendations!
Once the model is complete, you can use it to make predictions. There are two ways to query the model.
On demand queries: Mostly used for unit testing of the model or from an interactive conversational application.
SELECT response FROM mindsdb.demo_travelguru_model
WHERE
Location = "<LOCATION>"
AND ForecastTemp="<TEMP>"
AND ForecastWindSpeed="<WINDSPEED>"
AND ForecastPrecip="<PRECIP>";
Voilà! We got our first conversational AI response from the model, in exactly the format we asked for!
Batch queries: Mostly used for generating recommendations in bulk. All you need to do is join two tables. That’s it! Everything in SQL.
SELECT
s.Location,
r.response
FROM singlestore_datasource.locations_weather s
JOIN mindsdb.demo_travelguru_model r
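Conceptually, joining a data table with a model yields one prediction per input row. The Python sketch below mimics that behavior with a stub in place of the real model; batch_recommend and stub_predict are hypothetical names, and the sample rows are made up.

```python
from typing import Callable, Iterable


def batch_recommend(
    rows: Iterable[dict],
    predict: Callable[[dict], str],
) -> list:
    """Pair every location with the response predicted for its row."""
    return [
        {"Location": row["Location"], "response": predict(row)}
        for row in rows
    ]


def stub_predict(row: dict) -> str:
    """Placeholder for the model call; a real response would come from GPT."""
    return f"(activities for {row['Location']} at {row['ForecastTemp']})"


# Sample rows are made up for illustration only.
weather_rows = [
    {"Location": "London", "ForecastTemp": 12.0},
    {"Location": "Miami", "ForecastTemp": 29.0},
]
results = batch_recommend(weather_rows, stub_predict)
```

The output has the same two columns as the SQL query above: the location and the generated response.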
Step 5: Save results back in datastore
MindsDB not only reads data from a variety of datastores but can also write data back into them. This makes pushing results back into the system very easy.
Again, it’s just a SQL query: INSERT INTO ... SELECT.
INSERT INTO singlestore_datasource.recommendations_on_locations_weather
(
SELECT
s.Location,
r.response
FROM singlestore_datasource.locations_weather s
JOIN mindsdb.demo_travelguru_model r
);
After the recommendations are created, other downstream applications can use them for different business purposes such as sending emails or mobile app notifications.
Step 6: Orchestrate and Automate everything!
We've learned how to make AI recommendations step by step, but in reality, we need a tool like Kestra to bring all those steps together into a pipeline.
Let’s visualize the Kestra pipeline:
Take a look at the pipeline code:
id: kestra-singlestore-mindsdb-chatgpt-openai-integration
namespace: io.kestra.demo
description:
  This flow will predict response using GPT-4
tasks:
  - id: "IngestionStage"
    type: "io.kestra.core.tasks.scripts.Python"
    inputFiles:
      main.py: |
        from kestra import Kestra
        import weather
        import singlestoredb as s2
        # Define the list of locations
        locations = ["Miami", "New York", "Los Angeles", "London", "Tokyo"]
        # Connect to the database (the port is passed as an integer)
        conn = s2.connect(host='<HOST>', port=3306, user='admin', password='<PASSWD>', database='testdb')
        # Define the SQL statement for creating the table
        create_table_stmt = '''CREATE TABLE locations_weather (
            ID INT PRIMARY KEY AUTO_INCREMENT,
            Location VARCHAR(255),
            ForecastTemp FLOAT,
            ForecastWindSpeed FLOAT,
            ForecastPrecip FLOAT
        )'''
        # Define the SQL statement for inserting data into the table
        insert_data_stmt = 'INSERT INTO locations_weather (Location, ForecastTemp, ForecastWindSpeed, ForecastPrecip) VALUES (%s, %s, %s, %s)'
        # Create the table
        with conn.cursor() as cur:
            cur.execute(create_table_stmt)
        # Loop through the locations and get the forecasts
        for location in locations:
            print(f"Getting forecast for {location}...")
            forecast = weather.forecast(location)
            fcast_temp = forecast.tomorrow["12:00"].temp
            fcast_wind_speed = forecast.tomorrow["12:00"].wind.speed
            fcast_precip = forecast.tomorrow["12:00"].precip
            # Insert the location and forecast data into the table
            with conn.cursor() as cur:
                cur.execute(insert_data_stmt, (location, fcast_temp, fcast_wind_speed, fcast_precip))
                inserted_id = cur.lastrowid
            print(f"Forecast for {location} (ID: {inserted_id}) inserted into table")
        # Commit the changes to the database
        conn.commit()
        # Close the database connection
        conn.close()
        print("Weather forecasts inserted into table")
    requirements:
      - weather2
      - singlestoredb
    dockerOptions:
      image: python
    runner: DOCKER
  - id: "DatastoreConnectivityStage"
    type: "io.kestra.plugin.jdbc.mysql.Query"
    url: jdbc:mysql://cloud.mindsdb.com:3306/
    username: <USER>
    password: <PASSWD>
    sql: |
      CREATE DATABASE singlestore_datasource
      WITH
      ENGINE = 'singlestore',
      PARAMETERS = {
        "host": "<HOST>",
        "port": 3306,
        "database": "testdb",
        "user": "admin",
        "password": "<PASSWD>"
      };
  - id: "TravelActivitiesPredictionStage"
    type: "io.kestra.plugin.jdbc.mysql.Query"
    url: jdbc:mysql://cloud.mindsdb.com:3306/
    username: <USER>
    password: <PASSWD>
    sql: |
      INSERT INTO singlestore_datasource.recommendations_on_locations_weather
      (
        SELECT
          s.Location,
          r.response
        FROM singlestore_datasource.locations_weather s
        JOIN mindsdb.demo_travelguru_model r
      );
  - id: "SanityTestStage"
    type: "io.kestra.plugin.jdbc.mysql.Query"
    url: jdbc:mysql://cloud.mindsdb.com:3306/
    username: <USER>
    password: <PASSWD>
    sql: |
      select * from singlestore_datasource.recommendations_on_locations_weather limit 2;
    fetchOne: true
Since MindsDB operates on the same wire protocol as MySQL, Kestra does not require a separate MindsDB plugin. Instead, we can use the MySQL plugin.
Breaking the code into pieces:
Stage #1: Data generation and data load stage (Python Task)
Stage #2: MindsDB and SingleStore integration stage (MySQL Task)
Stage #3: Conversational AI response generation stage (MySQL Task)
Stage #4: Sanity check stage (MySQL Task)
Monitor our first AI pipeline:
First, take a look at the Topology tab.
Monitor the progress of the tasks in the Gantt tab.
Finally, take a look at the generated output in the Outputs tab.
Conclusion: Why Kestra is the Ultimate AI Pipeline Builder
As you can see, Kestra is a powerful tool that can help you revolutionize your AI development process. With Kestra, you can easily integrate your favorite AI frameworks and tools, such as MindsDB, SingleStore, ChatGPT, and OpenAI, to create a seamless workflow that will help you develop and deploy AI models faster than ever before.