Computer >> บทช่วยสอนคอมพิวเตอร์ >  >> การเขียนโปรแกรม >> Redis

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม

โดย สตีเฟน ซันโว

ในการสร้างแอปพลิเคชันแบบฟูลสแตกที่ใช้งานได้ มีหลายส่วนที่เคลื่อนไหวได้ที่ต้องคำนึงถึง และคุณจะต้องทำการตัดสินใจหลายอย่างที่สำคัญต่อความสำเร็จของแอปของคุณ

ตัวอย่างเช่น คุณจะใช้ภาษาอะไร และคุณจะปรับใช้บนแพลตฟอร์มใด คุณจะปรับใช้ซอฟต์แวร์คอนเทนเนอร์บนเซิร์ฟเวอร์ หรือใช้ฟังก์ชันไร้เซิร์ฟเวอร์เพื่อจัดการแบ็กเอนด์หรือไม่? คุณวางแผนที่จะใช้ API ของบริษัทอื่นเพื่อจัดการส่วนที่ซับซ้อนของแอปพลิเคชันของคุณ เช่น การตรวจสอบสิทธิ์หรือการชำระเงิน หรือไม่ คุณเก็บข้อมูลไว้ที่ไหน?

นอกจากนี้ คุณยังต้องคำนึงถึงอินเทอร์เฟซผู้ใช้ การออกแบบและการใช้งานแอปพลิเคชันของคุณ และอื่นๆ อีกมากมาย

นี่คือเหตุผลว่าทำไมแอปพลิเคชันขนาดใหญ่ที่ซับซ้อนจึงต้องอาศัยทีมพัฒนาแบบมัลติฟังก์ชั่นที่ทำงานร่วมกันเพื่อสร้างแอป

หนึ่งในวิธีที่ดีที่สุดในการเรียนรู้วิธีพัฒนาแอปพลิเคชันแบบฟูลสแตกคือการสร้างโปรเจ็กต์ที่ครอบคลุมกระบวนการพัฒนาตั้งแต่ต้นทางถึงปลายทาง คุณจะได้ผ่านการออกแบบสถาปัตยกรรม การพัฒนาบริการ API การพัฒนาอินเทอร์เฟซผู้ใช้ และสุดท้ายคือปรับใช้แอปพลิเคชันของคุณ

ดังนั้นบทช่วยสอนนี้จะแนะนำคุณตลอดกระบวนการสร้างแชทบอท AI เพื่อช่วยให้คุณเรียนรู้แนวคิดเหล่านี้ในเชิงลึก

หัวข้อบางส่วนที่เราจะกล่าวถึง ได้แก่:

  • วิธีสร้าง API ด้วย Python, FastAPI และ WebSockets
  • วิธีสร้างระบบแบบเรียลไทม์ด้วย Redis
  • วิธีสร้างอินเทอร์เฟซผู้ใช้แชทด้วย React

หมายเหตุสำคัญ: นี่คือโครงการพัฒนาซอฟต์แวร์สแต็กเต็มรูปแบบระดับกลางที่ต้องใช้ความรู้พื้นฐานเกี่ยวกับ Python และ JavaScript

ฉันได้แบ่งโครงการออกเป็นส่วนๆ อย่างระมัดระวังเพื่อให้แน่ใจว่าคุณสามารถเลือกระยะที่สำคัญสำหรับคุณได้อย่างง่ายดาย ในกรณีที่คุณไม่ต้องการเขียนโค้ดแอปพลิเคชันแบบเต็ม

คุณสามารถดาวน์โหลดพื้นที่เก็บข้อมูลแบบเต็มได้ที่ My Github ที่นี่

สารบัญ

ส่วนที่ 1

  • สถาปัตยกรรมแอปพลิเคชัน
  • วิธีการตั้งค่าสภาพแวดล้อมการพัฒนา

    ส่วนที่ 2

  • วิธีสร้างเซิร์ฟเวอร์แชทด้วย Python, FastAPI และ WebSockets
    • วิธีการตั้งค่าสภาพแวดล้อม Python
    • การตั้งค่าเซิร์ฟเวอร์ FastAPI
    • วิธีเพิ่มเส้นทางไปยัง API
    • วิธีสร้างโทเค็นเซสชันการแชทด้วย UUID
    • วิธีทดสอบ API กับบุรุษไปรษณีย์
    • เว็บซ็อกเก็ตและเครื่องมือจัดการการเชื่อมต่อ
    • การพึ่งพาการฉีดใน FastAPI

      ส่วนที่ 3

  • วิธีการสร้างระบบเรียลไทม์ด้วย Redis
    • แจกจ่ายและกระจายคิวการส่งข้อความ
    • วิธีเชื่อมต่อกับคลัสเตอร์ Redis ใน Python ด้วยไคลเอนต์ Redis
    • วิธีการทำงานกับ Redis Streams
    • วิธีสร้างโมเดลข้อมูลแชท
    • วิธีการทำงานกับ Redis JSON
    • วิธีอัปเดตการพึ่งพาโทเค็น

      ส่วนที่ 4

  • วิธีเพิ่มความฉลาดให้กับ Chatbots ด้วยโมเดล AI
    • วิธีการเริ่มต้นใช้งาน Huggingface
    • วิธีการโต้ตอบกับโมเดลภาษา
    • วิธีจำลองหน่วยความจำระยะสั้นสำหรับโมเดล AI
    • สตรีมผู้บริโภคและดึงข้อมูลแบบเรียลไทม์จากคิวข้อความ
    • วิธีอัปเดตไคลเอ็นต์การแชทด้วยการตอบกลับของ AI
    • รีเฟรชโทเค็น
    • วิธีทดสอบการแชทกับลูกค้าหลายรายในบุรุษไปรษณีย์

สถาปัตยกรรมแอปพลิเคชัน

การร่างสถาปัตยกรรมโซลูชันช่วยให้คุณเห็นภาพรวมระดับสูงของแอปพลิเคชันของคุณ เครื่องมือที่คุณตั้งใจจะใช้ และวิธีที่ส่วนประกอบต่างๆ จะสื่อสารระหว่างกัน

ฉันได้วาดสถาปัตยกรรมที่เรียบง่ายด้านล่างโดยใช้ Draw.io:

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม สถาปัตยกรรมแชทบอทแบบ Fullstack

เรามาดูรายละเอียดส่วนต่างๆ ของสถาปัตยกรรมกันดีกว่า:

ส่วนต่อประสานไคลเอนต์/ผู้ใช้

เราจะใช้ React เวอร์ชัน 18 เพื่อสร้างอินเทอร์เฟซผู้ใช้ Chat UI จะสื่อสารกับแบ็กเอนด์ผ่าน WebSockets

GPT-J-6B และ API การอนุมาน Huggingface

GPT-J-6B เป็นโมเดลภาษาเชิงกำเนิดซึ่งได้รับการฝึกฝนด้วยพารามิเตอร์ 6 พันล้านพารามิเตอร์ และทำงานอย่างใกล้ชิดกับ GPT-3 ของ OpenAI ในบางงาน

ฉันเลือกใช้ GPT-J-6B เนื่องจากเป็นรุ่นโอเพ่นซอร์สและไม่ต้องใช้โทเค็นแบบชำระเงินสำหรับกรณีการใช้งานทั่วไป

Huggingface ยังให้ API ตามความต้องการแก่เราเพื่อเชื่อมต่อกับโมเดลนี้โดยไม่มีค่าใช้จ่าย คุณสามารถอ่านเพิ่มเติมเกี่ยวกับ GPT-J-6B และ Hugging Face Inference API

แดง

เมื่อเราส่งข้อความแจ้งไปยัง GPT เราจำเป็นต้องมีวิธีจัดเก็บข้อความแจ้งและเรียกข้อมูลการตอบกลับได้อย่างง่ายดาย เราจะใช้ Redis JSON เพื่อจัดเก็บข้อมูลการแชท และยังใช้ Redis Streams เพื่อจัดการการสื่อสารแบบเรียลไทม์ด้วย API การอนุมานของ Huggingface

Redis เป็นที่จัดเก็บคีย์-ค่าในหน่วยความจำที่ช่วยให้ดึงและจัดเก็บข้อมูลที่คล้ายกับ JSON ได้อย่างรวดเร็วเป็นพิเศษ สำหรับบทแนะนำสอนการใช้งานนี้ เราจะใช้พื้นที่เก็บข้อมูล Redis ฟรีที่มีการจัดการซึ่งจัดทำโดย Redis Enterprise เพื่อการทดสอบ

ซ็อกเก็ตเว็บและ Chat API

หากต้องการส่งข้อความระหว่างไคลเอนต์และเซิร์ฟเวอร์แบบเรียลไทม์ เราจำเป็นต้องเปิดการเชื่อมต่อซ็อกเก็ต เนื่องจากการเชื่อมต่อ HTTP จะไม่เพียงพอสำหรับการสื่อสารแบบสองทิศทางแบบเรียลไทม์ระหว่างไคลเอนต์และเซิร์ฟเวอร์

เราจะใช้ FastAPI สำหรับเซิร์ฟเวอร์แชท เนื่องจากมีเซิร์ฟเวอร์ Python ที่รวดเร็วและทันสมัยสำหรับการใช้งานของเรา ตรวจสอบเอกสาร FastAPI) เพื่อเรียนรู้เพิ่มเติมเกี่ยวกับ WebSockets

วิธีการตั้งค่าสภาพแวดล้อมการพัฒนา

คุณสามารถใช้ระบบปฏิบัติการที่คุณต้องการเพื่อสร้างแอปนี้ได้ - ฉันกำลังใช้ MacOS และ Visual Studio Code เพียงตรวจสอบให้แน่ใจว่าคุณได้ติดตั้ง Python และ NodeJs แล้ว

หากต้องการตั้งค่าโครงสร้างโปรเจ็กต์ ให้สร้างโฟลเดอร์ชื่อ06 . จากนั้นสร้างสองโฟลเดอร์ภายในโปรเจ็กต์ชื่อ 17 และ 24 . เซิร์ฟเวอร์จะเก็บโค้ดไว้สำหรับส่วนหลัง ในขณะที่ไคลเอ็นต์จะเก็บโค้ดไว้สำหรับส่วนหน้า

ถัดไปภายในไดเร็กทอรีโปรเจ็กต์ เริ่มต้นพื้นที่เก็บข้อมูล Git ภายในรูทของโฟลเดอร์โปรเจ็กต์โดยใช้คำสั่ง "git init" จากนั้นสร้างไฟล์ .gitignore โดยใช้ "touch .gitignore":

git init
touch .gitignore

ในส่วนถัดไป เราจะสร้างเว็บเซิร์ฟเวอร์แชทของเราโดยใช้ FastAPI และ Python

วิธีสร้างเซิร์ฟเวอร์แชทด้วย Python, FastAPI และ WebSockets

ในส่วนนี้ เราจะสร้างเซิร์ฟเวอร์แชทโดยใช้ FastAPI เพื่อสื่อสารกับผู้ใช้ เราจะใช้ WebSockets เพื่อรับรองการสื่อสารแบบสองทิศทางระหว่างไคลเอนต์และเซิร์ฟเวอร์ เพื่อให้เราสามารถส่งการตอบกลับไปยังผู้ใช้แบบเรียลไทม์

วิธีการตั้งค่าสภาพแวดล้อม Python

เพื่อเริ่มต้นเซิร์ฟเวอร์ของเรา เราจำเป็นต้องตั้งค่าสภาพแวดล้อม Python ของเรา เปิดโฟลเดอร์โปรเจ็กต์ภายใน VS Code และเปิดเทอร์มินัล

จากรูทโปรเจ็กต์ ให้ cd ลงในไดเร็กทอรีเซิร์ฟเวอร์และรัน 33 . สิ่งนี้จะสร้างสภาพแวดล้อมเสมือนจริง สำหรับโปรเจ็กต์ Python ของเรา ซึ่งจะชื่อว่า 44 . หากต้องการเปิดใช้งานสภาพแวดล้อมเสมือน ให้รัน 50

จากนั้น ให้ติดตั้งไลบรารีสองสามไลบรารีในสภาพแวดล้อม Python ของคุณ

pip install fastapi uuid uvicorn gunicorn WebSockets python-dotenv aioredis

ถัดไปสร้างไฟล์สภาพแวดล้อมโดยเรียกใช้ 63 ในอาคารผู้โดยสาร เราจะกำหนดตัวแปรแอปและตัวแปรลับภายใน 73 ไฟล์.

เพิ่มตัวแปรสภาพแวดล้อมของแอปและตั้งค่าเป็น "การพัฒนา" ดังนี้:86 . ต่อไป เราจะตั้งค่าเซิร์ฟเวอร์การพัฒนาด้วยเซิร์ฟเวอร์ FastAPI

การตั้งค่าเซิร์ฟเวอร์ FastAPI

ที่รากของไดเร็กทอรีเซิร์ฟเวอร์ ให้สร้างไฟล์ใหม่ชื่อ 96 จากนั้นวางโค้ดด้านล่างสำหรับเซิร์ฟเวอร์การพัฒนา:

from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
 return {"msg": "API is Online"}
if __name__ == "__main__":
 if os.environ.get('APP_ENV') == "development":
 uvicorn.run("main:api", host="0.0.0.0", port=3500,
 workers=4, reload=True)
 else:
 pass

ก่อนอื่นเรา 107 และกำหนดค่าเริ่มต้นเป็น 115 . จากนั้นเรา 120 จาก 133 ไลบรารี และเริ่มต้นให้โหลดตัวแปรจาก 142 ไฟล์

จากนั้นเราจะสร้างเส้นทางทดสอบง่ายๆ เพื่อทดสอบ API เส้นทางทดสอบจะส่งคืนการตอบสนอง JSON แบบง่ายที่บอกเราว่า API ออนไลน์อยู่

สุดท้ายนี้ เราตั้งค่าเซิร์ฟเวอร์การพัฒนาโดยใช้ 159 และจัดเตรียมข้อโต้แย้งที่จำเป็น API จะทำงานบนพอร์ต 165 .

สุดท้าย ให้รันเซิร์ฟเวอร์ในเทอร์มินัลด้วย 175 . เมื่อคุณเห็น 189 ในเทอร์มินัล ไปที่ URL http://localhost:3500/test บนเบราว์เซอร์ของคุณ และคุณควรจะได้หน้าเว็บดังนี้:

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม หน้าทดสอบ API

วิธีเพิ่มเส้นทางไปยัง API

ในส่วนนี้ เราจะเพิ่มเส้นทางไปยัง API ของเรา สร้างโฟลเดอร์ใหม่ชื่อ 195 . นี่คือไดเร็กทอรีที่โค้ด API ทั้งหมดของเราจะใช้งานได้

สร้างโฟลเดอร์ย่อยชื่อ 206 ใส่ซีดีลงในโฟลเดอร์ สร้างไฟล์ใหม่ชื่อ 218 จากนั้นเพิ่มโค้ดด้านล่าง:

import os
from fastapi import APIRouter, FastAPI, WebSocket, Request
chat = APIRouter()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(request: Request):
 return None
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
 return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket = WebSocket):
 return None

เราสร้างจุดสิ้นสุดสามจุด:

  • 220 จะออกโทเค็นเซสชันให้กับผู้ใช้เพื่อเข้าถึงเซสชันการแชท เนื่องจากแอปแชทจะเปิดสู่สาธารณะ เราจึงไม่ต้องการกังวลเกี่ยวกับการตรวจสอบสิทธิ์และเพียงทำให้มันเรียบง่าย แต่เรายังคงต้องการวิธีในการระบุเซสชันผู้ใช้ที่ไม่ซ้ำกันแต่ละเซสชัน
  • 235 จะได้รับประวัติเซสชันสำหรับผู้ใช้หากการเชื่อมต่อขาดหาย ตราบใดที่โทเค็นยังคงใช้งานอยู่และไม่หมดอายุ
  • 247 จะเปิด WebSocket เพื่อส่งข้อความระหว่างไคลเอนต์และเซิร์ฟเวอร์

จากนั้น เชื่อมต่อเส้นทางการแชทกับ API หลักของเรา ก่อนอื่นเราต้อง 255 ภายใน 262 ของเรา ไฟล์. จากนั้นเราจะรวมเราเตอร์ด้วยการเรียก 277 อย่างแท้จริง วิธีการในการเริ่มต้น 282 ชั้นเรียนและผ่านการแชทเป็นข้อโต้แย้ง

อัปเดต 291 ของคุณ รหัสตามที่แสดงด้านล่าง:

from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from routes.chat import chat
load_dotenv()
api = FastAPI()
api.include_router(chat)
@api.get("/test")
async def root():
 return {"msg": "API is Online"}
if __name__ == "__main__":
 if os.environ.get('APP_ENV') == "development":
 uvicorn.run("main:api", host="0.0.0.0", port=3500,
 workers=4, reload=True)
 else:
 pass

วิธีสร้างโทเค็นเซสชันการแชทด้วย UUID

ในการสร้างโทเค็นผู้ใช้ เราจะใช้ 307 เพื่อสร้างเส้นทางแบบไดนามิกสำหรับปลายทางการแชทของเรา เนื่องจากนี่คือจุดสิ้นสุดที่เปิดเผยต่อสาธารณะ เราจึงไม่จำเป็นต้องลงรายละเอียดเกี่ยวกับ JWT และการตรวจสอบสิทธิ์

หากคุณไม่ได้ติดตั้ง 313 เริ่มแรกให้รัน 326 . ถัดไปใน chat.py นำเข้า UUID และอัปเดต 334 เส้นทางด้วยรหัสด้านล่าง:


from fastapi import APIRouter, FastAPI, WebSocket, Request, BackgroundTasks, HTTPException
import uuid
# @route POST /token
# @desc Route generating chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 token = str(uuid.uuid4())
 data = {"name": name, "token": token}
 return data

ในโค้ดด้านบน ลูกค้าระบุชื่อของตนซึ่งจำเป็น เราทำการตรวจสอบอย่างรวดเร็วเพื่อให้แน่ใจว่าช่องชื่อไม่ว่างเปล่า จากนั้นสร้างโทเค็นโดยใช้ uuid4

ข้อมูลเซสชันเป็นพจนานุกรมอย่างง่ายสำหรับชื่อและโทเค็น ท้ายที่สุดแล้ว เราจะต้องคงข้อมูลเซสชันนี้ไว้และตั้งค่าการหมดเวลา แต่ตอนนี้เราเพียงส่งคืนข้อมูลดังกล่าวไปยังไคลเอ็นต์

วิธีทดสอบ API กับบุรุษไปรษณีย์

เนื่องจากเราจะทดสอบตำแหน่งข้อมูล WebSocket เราจึงจำเป็นต้องใช้เครื่องมือเช่นบุรุษไปรษณีย์ที่อนุญาต (เนื่องจากเอกสารผยองเริ่มต้นบน FastAPI ไม่รองรับ WebSockets)

ในบุรุษไปรษณีย์ สร้างคอลเลกชันสำหรับสภาพแวดล้อมการพัฒนาของคุณและส่งคำขอ POST ไปที่ 345 การระบุชื่อเป็นพารามิเตอร์การสืบค้นและส่งผ่านค่า คุณควรได้รับคำตอบตามที่แสดงด้านล่าง:

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม บุรุษไปรษณีย์สร้างโทเค็น

เว็บซ็อกเก็ตและตัวจัดการการเชื่อมต่อ

ในรูท src ให้สร้างโฟลเดอร์ใหม่ชื่อ 350 และเพิ่มไฟล์ชื่อ 365 . ในไฟล์นี้ เราจะกำหนดคลาสที่ควบคุมการเชื่อมต่อกับ WebSockets ของเรา และวิธีการช่วยเหลือทั้งหมดในการเชื่อมต่อและยกเลิกการเชื่อมต่อ

ใน 377 เพิ่มรหัสด้านล่าง:


from fastapi import WebSocket
class ConnectionManager:
 def __init__(self):
 self.active_connections: List[WebSocket] = []
 async def connect(self, websocket: WebSocket):
 await websocket.accept()
 self.active_connections.append(websocket)
 def disconnect(self, websocket: WebSocket):
 self.active_connections.remove(websocket)
 async def send_personal_message(self, message: str, websocket: WebSocket):
 await websocket.send_text(message)

386 คลาสเริ่มต้นด้วย 399 คุณลักษณะที่เป็นรายการการเชื่อมต่อที่ใช้งานอยู่

จากนั้นอะซิงโครนัส 409 วิธีการจะยอมรับ 413 และเพิ่มลงในรายการการเชื่อมต่อที่ใช้งานอยู่ในขณะที่ 427 วิธีการจะลบ 432 จากรายการการเชื่อมต่อที่ใช้งานอยู่

สุดท้าย 440 วิธีการจะรับข้อความและ 450 เราต้องการส่งข้อความถึงและส่งข้อความแบบอะซิงโครนัส

WebSockets เป็นหัวข้อที่กว้างมาก และเราคัดลอกเฉพาะส่วนต่างๆ ที่นี่เท่านั้น อย่างไรก็ตาม สิ่งนี้ควรจะเพียงพอที่จะสร้างการเชื่อมต่อหลายรายการและจัดการข้อความไปยังการเชื่อมต่อเหล่านั้นแบบอะซิงโครนัส

คุณสามารถอ่านเพิ่มเติมเกี่ยวกับการเขียนโปรแกรม FastAPI Websockets และ Sockets ได้

หากต้องการใช้ 462 นำเข้าและเริ่มต้นภายใน 475 และอัปเดต 488 เส้นทาง WebSocket ด้วยรหัสด้านล่าง:

from ..socket.connection import ConnectionManager
manager = ConnectionManager()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
 await manager.connect(websocket)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

ใน 492 ซึ่งใช้ WebSocket เราจะเพิ่ม websocket ใหม่ให้กับตัวจัดการการเชื่อมต่อและเรียกใช้ 500 เพื่อให้แน่ใจว่าซ็อกเก็ตยังคงเปิดอยู่ ยกเว้นในกรณีที่ปลั๊กไฟถูกถอดออก

ในขณะที่การเชื่อมต่อเปิดอยู่ เราได้รับข้อความใด ๆ ที่ส่งโดยไคลเอนต์ด้วย 512 และพิมพ์ไปที่เทอร์มินัลในตอนนี้

จากนั้นเราจะส่งการตอบกลับแบบฮาร์ดโค้ดกลับไปยังลูกค้าในตอนนี้ ในที่สุดข้อความที่ได้รับจากไคลเอนต์จะถูกส่งไปยังโมเดล AI และการตอบกลับที่ส่งกลับไปยังลูกค้าจะเป็นการตอบกลับจากโมเดล AI

ในบุรุษไปรษณีย์ เราสามารถทดสอบตำแหน่งข้อมูลนี้ได้โดยการสร้างคำขอ WebSocket ใหม่และเชื่อมต่อกับตำแหน่งข้อมูล WebSocket 526 .

เมื่อคุณคลิกเชื่อมต่อ บานหน้าต่างข้อความจะแสดงว่าไคลเอ็นต์ API เชื่อมต่อกับ URL และซ็อกเก็ตเปิดอยู่

หากต้องการทดสอบ ให้ส่งข้อความ "Hello Bot" ไปยังเซิร์ฟเวอร์แชท และคุณควรได้รับการตอบกลับการทดสอบทันที "การตอบสนอง:การจำลองการตอบสนองจากบริการ GPT" ดังที่แสดงด้านล่าง:

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม ทดสอบการแชทกับบุรุษไปรษณีย์

การพึ่งพาการฉีดใน FastAPI

เพื่อให้สามารถแยกความแตกต่างระหว่างเซสชันไคลเอ็นต์ที่แตกต่างกันสองเซสชันและจำกัดเซสชันการแชท เราจะใช้โทเค็นแบบกำหนดเวลาซึ่งส่งผ่านเป็นพารามิเตอร์แบบสอบถามไปยังการเชื่อมต่อ WebSocket

ในโฟลเดอร์ซ็อกเก็ต ให้สร้างไฟล์ชื่อ 535 จากนั้นเพิ่มโค้ดด้านล่าง:

from fastapi import WebSocket, status, Query
from typing import Optional
async def get_token(
 websocket: WebSocket,
 token: Optional[str] = Query(None),
):
 if token is None or token == "":
 await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
 return token

ฟังก์ชัน get_token รับ WebSocket และโทเค็น จากนั้นตรวจสอบว่าโทเค็นไม่มีหรือเป็นโมฆะ

หากเป็นกรณีนี้ ฟังก์ชันจะส่งกลับสถานะการละเมิดนโยบาย และหากมี ฟังก์ชันจะส่งคืนโทเค็นเท่านั้น ในที่สุดเราจะขยายฟังก์ชันนี้ในภายหลังด้วยการตรวจสอบโทเค็นเพิ่มเติม

หากต้องการใช้ฟังก์ชันนี้ เราจะแทรกฟังก์ชันนี้ลงใน 541 เส้นทาง FastAPI มีคลาส Depends เพื่อแทรกการขึ้นต่อกันได้อย่างง่ายดาย ดังนั้นเราจึงไม่ต้องปรับแต่งการตกแต่ง

อัปเดต 551 เส้นทางไปยังสิ่งต่อไปนี้:

from ..socket.utils import get_token
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

ตอนนี้เมื่อคุณพยายามเชื่อมต่อกับ 567 จุดสิ้นสุดในบุรุษไปรษณีย์ คุณจะได้รับข้อผิดพลาด 403 ระบุโทเค็นเป็นพารามิเตอร์การสืบค้นและระบุค่าใดๆ ให้กับโทเค็นในตอนนี้ จากนั้นคุณควรจะสามารถเชื่อมต่อได้เหมือนเมื่อก่อน เพียงแต่ตอนนี้การเชื่อมต่อต้องใช้โทเค็น

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม ทดสอบการแชทบุรุษไปรษณีย์ด้วยโทเค็น

ยินดีด้วยที่ได้มาไกลขนาดนี้! 571ของคุณ ไฟล์ควรมีลักษณะดังนี้:

import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
chat = APIRouter()
manager = ConnectionManager()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
 token = str(uuid.uuid4())
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 data = {"name": name, "token": token}
 return data
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
 return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

ในส่วนถัดไปของบทช่วยสอนนี้ เราจะเน้นไปที่การจัดการสถานะของแอปพลิเคชันของเราและการส่งข้อมูลระหว่างไคลเอนต์และเซิร์ฟเวอร์

วิธีสร้างระบบเรียลไทม์ด้วย Redis

ขณะนี้แอปพลิเคชันของเราไม่ได้จัดเก็บสถานะใด ๆ และไม่มีวิธีใดที่จะระบุผู้ใช้หรือจัดเก็บและเรียกข้อมูลการแชท นอกจากนี้เรายังส่งคืนการตอบกลับแบบฮาร์ดโค้ดให้กับลูกค้าในระหว่างเซสชันการแชท

ในบทช่วยสอนในส่วนนี้ เราจะกล่าวถึงสิ่งต่อไปนี้:

  • วิธีเชื่อมต่อกับ คลัสเตอร์ Redis ใน Python และตั้งค่า ไคลเอ็นต์ Redis
  • วิธีจัดเก็บและเรียกข้อมูลด้วย Redis JSON
  • วิธีการตั้งค่า สตรีม Redis เป็นคิวข้อความระหว่างเว็บเซิร์ฟเวอร์และสภาพแวดล้อมของผู้ปฏิบัติงาน

Redis และคิวการส่งข้อความแบบกระจาย

Redis คือที่เก็บข้อมูลในหน่วยความจำแบบโอเพ่นซอร์สที่คุณสามารถใช้เป็นฐานข้อมูล แคช นายหน้าข้อความ และกลไกการสตรีมได้ รองรับโครงสร้างข้อมูลจำนวนหนึ่งและเป็นโซลูชันที่สมบูรณ์แบบสำหรับแอปพลิเคชันแบบกระจายที่มีความสามารถแบบเรียลไทม์

Redis Enterprise Cloud คือบริการคลาวด์ที่มีการจัดการเต็มรูปแบบที่ Redis มอบให้ ซึ่งช่วยให้เราปรับใช้คลัสเตอร์ Redis ในขนาดที่ไม่จำกัดโดยไม่ต้องกังวลเรื่องโครงสร้างพื้นฐาน

เราจะใช้อินสแตนซ์ Redis Enterprise Cloud ฟรีสำหรับบทช่วยสอนนี้ คุณสามารถเริ่มต้นใช้งาน Redis Cloud ได้ฟรีที่นี่ และทำตามบทช่วยสอนนี้เพื่อตั้งค่าฐานข้อมูล Redis และ Redis Insight ซึ่งเป็น GUI เพื่อโต้ตอบกับ Redis

เมื่อคุณตั้งค่าฐานข้อมูล Redis แล้ว ให้สร้างโฟลเดอร์ใหม่ในรูทโปรเจ็กต์ (นอกโฟลเดอร์เซิร์ฟเวอร์) ชื่อ 582 .

เราจะแยกสภาพแวดล้อมของผู้ปฏิบัติงานออกจากเว็บเซิร์ฟเวอร์ เพื่อที่ว่าเมื่อลูกค้าส่งข้อความไปยัง WebSocket ของเรา เว็บเซิร์ฟเวอร์จะไม่ต้องจัดการคำขอไปยังบริการของบุคคลที่สาม นอกจากนี้ยังสามารถปล่อยทรัพยากรให้กับผู้ใช้รายอื่นได้อีกด้วย

การสื่อสารเบื้องหลังกับ API การอนุมานได้รับการจัดการโดยบริการสำหรับผู้ปฏิบัติงานนี้ผ่านทาง Redis

คำขอจากไคลเอนต์ที่เชื่อมต่อทั้งหมดจะถูกผนวกเข้ากับคิวข้อความ (ตัวสร้าง) ในขณะที่ผู้ปฏิบัติงานใช้ข้อความ ส่งคำขอไปยัง API การอนุมาน และเพิ่มการตอบกลับต่อคิวการตอบกลับ

เมื่อ API ได้รับการตอบกลับ API จะส่งกลับไปยังไคลเอ็นต์

ในระหว่างการเดินทางระหว่างผู้ผลิตและผู้บริโภค ลูกค้าสามารถส่งข้อความได้หลายข้อความ และข้อความเหล่านี้จะถูกจัดคิวและตอบกลับตามลำดับ

ตามหลักการแล้ว เราอาจให้ผู้ปฏิบัติงานรายนี้ทำงานบนเซิร์ฟเวอร์ที่แตกต่างอย่างสิ้นเชิงในสภาพแวดล้อมของตัวเอง แต่สำหรับตอนนี้ เราจะสร้างสภาพแวดล้อม Python ของตัวเองบนเครื่องของเรา

คุณอาจสงสัยว่า ทำไมเราถึงต้องการคนงาน ลองนึกภาพสถานการณ์ที่เว็บเซิร์ฟเวอร์สร้างคำขอไปยังบริการของบุคคลที่สามด้วย ซึ่งหมายความว่าในขณะที่รอการตอบสนองจากบริการของบุคคลที่สามระหว่างการเชื่อมต่อซ็อกเก็ต เซิร์ฟเวอร์จะถูกบล็อกและทรัพยากรจะถูกเชื่อมโยงจนกว่าจะได้รับการตอบสนองจาก API

คุณสามารถทดลองใช้ได้โดยสร้างการนอนหลับแบบสุ่ม 594 ก่อนที่จะส่งการตอบกลับแบบฮาร์ดโค้ดและส่งข้อความใหม่ จากนั้นลองเชื่อมต่อกับโทเค็นอื่นในเซสชันบุรุษไปรษณีย์ใหม่

คุณจะสังเกตเห็นว่าเซสชันการแชทจะไม่เชื่อมต่อจนกว่าโหมดสลีปแบบสุ่มจะหมดเวลา

แม้ว่าเราจะสามารถใช้เทคนิคอะซิงโครนัสและกลุ่มผู้ปฏิบัติงานในการตั้งค่าเซิร์ฟเวอร์ที่เน้นการใช้งานจริงมากขึ้น แต่นั่นจะไม่เพียงพอเมื่อจำนวนผู้ใช้พร้อมกันเพิ่มขึ้น

ท้ายที่สุดแล้ว เราต้องการหลีกเลี่ยงการผูกมัดทรัพยากรของเว็บเซิร์ฟเวอร์โดยใช้ Redis เพื่อเป็นนายหน้าในการสื่อสารระหว่าง API การแชทของเรากับ API ของบริษัทอื่น

จากนั้นให้เปิดเทอร์มินัลใหม่ ซีดีลงในโฟลเดอร์ผู้ปฏิบัติงาน และสร้างและเปิดใช้งานสภาพแวดล้อมเสมือน Python ใหม่ ซึ่งคล้ายกับที่เราทำในตอนที่ 1

ถัดไป ติดตั้งการอ้างอิงต่อไปนี้:

pip install aiohttp aioredis python-dotenv

วิธีเชื่อมต่อกับคลัสเตอร์ Redis ใน Python ด้วยไคลเอนต์ Redis

เราจะใช้ไคลเอ็นต์ aioredis เพื่อเชื่อมต่อกับฐานข้อมูล Redis นอกจากนี้เรายังใช้ไลบรารีคำขอเพื่อส่งคำขอไปยัง API การอนุมาน Huggingface

สร้างสองไฟล์ 602 และ 618 . จากนั้นสร้างโฟลเดอร์ชื่อ 620 . นอกจากนี้ ให้สร้างโฟลเดอร์ชื่อ 630 และเพิ่มไฟล์ใหม่ชื่อ 649 .

ใน 650 เพิ่มโค้ดต่อไปนี้ – และตรวจสอบให้แน่ใจว่าคุณอัปเดตฟิลด์ด้วยข้อมูลประจำตัวที่ให้ไว้ในคลัสเตอร์ Redis ของคุณ

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>

ใน config.py ให้เพิ่มคลาส Redis ด้านล่าง:

import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection

เราสร้างวัตถุ Redis และเริ่มต้นพารามิเตอร์ที่ต้องการจากตัวแปรสภาพแวดล้อม จากนั้นเราสร้างวิธีการอะซิงโครนัส 665 เพื่อสร้างการเชื่อมต่อ Redis และส่งคืนพูลการเชื่อมต่อที่ได้รับจาก 676 วิธีการ 688 .

ต่อไป เราจะทดสอบการเชื่อมต่อ Redis ใน main.py โดยเรียกใช้โค้ดด้านล่าง สิ่งนี้จะสร้างพูลการเชื่อมต่อ Redis ใหม่ ตั้งค่า "คีย์" คีย์ธรรมดา และกำหนดสตริง "ค่า" ให้กับมัน


from src.redis.config import Redis
import asyncio
async def main():
 redis = Redis()
 redis = await redis.create_connection()
 print(redis)
 await redis.set("key", "value")
if __name__ == "__main__":
 asyncio.run(main())

ตอนนี้เปิด Redis Insight (หากคุณทำตามบทช่วยสอนเพื่อดาวน์โหลดและติดตั้ง) คุณจะเห็นสิ่งนี้:

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม การทดสอบข้อมูลเชิงลึก Redis

วิธีทำงานกับสตรีม Redis

ตอนนี้เรามีการตั้งค่าสภาพแวดล้อมของผู้ปฏิบัติงานแล้ว เราสามารถสร้างผู้ผลิตบนเว็บเซิร์ฟเวอร์และผู้บริโภคบนผู้ปฏิบัติงานได้

ก่อนอื่น มาสร้างคลาส Redis ของเราอีกครั้งบนเซิร์ฟเวอร์ ใน 693 สร้างโฟลเดอร์ชื่อ 706 และเพิ่มสองไฟล์ 716 และ 729 .

ใน 730 ให้เพิ่มโค้ดด้านล่างเหมือนกับที่เราทำกับสภาพแวดล้อมของผู้ปฏิบัติงาน:

import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection

ในไฟล์ .env ให้เพิ่มข้อมูลประจำตัว Redis ด้วย:

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>

สุดท้ายใน 740 เพิ่มรหัสต่อไปนี้:


from .config import Redis
class Producer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def add_to_stream(self, data: dict, stream_channel):
 try:
 msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
 print(f"Message id {msg_id} added to {stream_channel} stream")
 return msg_id
 except Exception as e:
 print(f"Error sending msg to stream => {e}")

เราสร้างคลาส Producer ที่เริ่มต้นด้วยไคลเอนต์ Redis เราใช้ไคลเอ็นต์นี้เพื่อเพิ่มข้อมูลลงในสตรีมด้วย 758 ซึ่งรับข้อมูลและชื่อช่อง Redis

คำสั่ง Redis สำหรับการเพิ่มข้อมูลลงในช่องสตรีมคือ 763 และมีทั้งฟังก์ชันระดับสูงและระดับต่ำใน aioredis

ถัดไป หากต้องการเรียกใช้ Producer ที่สร้างขึ้นใหม่ ให้อัปเดต 776 และ WebSocket 785 จุดสิ้นสุดเช่นด้านล่าง สังเกตชื่อช่องที่อัปเดต 792 .


from ..redis.producer import Producer
from ..redis.config import Redis
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 redis_client = await redis.create_connection()
 producer = Producer(redis_client)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 stream_data = {}
 stream_data[token] = data
 await producer.add_to_stream(stream_data, "message_channel")
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

จากนั้น ในบุรุษไปรษณีย์ ให้สร้างการเชื่อมต่อและส่งข้อความจำนวนเท่าใดก็ได้ที่ระบุว่า 808 . คุณควรพิมพ์ข้อความสตรีมไปที่เทอร์มินัลดังนี้:

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม การทดสอบข้อความช่องสัญญาณเทอร์มินัล

ใน Redis Insight คุณจะเห็น 819 ใหม่ สร้างและคิวประทับเวลาที่เต็มไปด้วยข้อความที่ส่งจากไคลเอนต์ คิวที่ประทับเวลานี้มีความสำคัญต่อการรักษาลำดับของข้อความ

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม ช่อง Redis Insight

วิธีสร้างโมเดลข้อมูลแชท

ต่อไป เราจะสร้างแบบจำลองสำหรับข้อความแชทของเรา โปรดจำไว้ว่าเรากำลังส่งข้อมูลข้อความผ่าน WebSockets แต่ข้อมูลการแชทของเราต้องเก็บข้อมูลมากกว่าแค่ข้อความ เราจำเป็นต้องประทับเวลาเมื่อมีการส่งแชท สร้าง ID สำหรับแต่ละข้อความ และรวบรวมข้อมูลเกี่ยวกับเซสชันแชท จากนั้นจัดเก็บข้อมูลนี้ในรูปแบบ JSON

เราสามารถจัดเก็บข้อมูล JSON นี้ไว้ใน Redis เพื่อที่เราจะได้ไม่สูญเสียประวัติการแชทเมื่อการเชื่อมต่อขาดหาย เนื่องจาก WebSocket ของเราจะไม่จัดเก็บสถานะ

ใน 823 สร้างโฟลเดอร์ใหม่ชื่อ 838 . จากนั้นสร้างไฟล์ชื่อ 849 ใน 852 เพิ่มรหัสต่อไปนี้:

from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
 id = uuid.uuid4()
 msg: str
 timestamp = str(datetime.now())
class Chat(BaseModel):
 token: str
 messages: List[Message]
 name: str
 session_start = str(datetime.now())

เรากำลังใช้ 861 ของ Pydantic คลาสเพื่อสร้างโมเดลข้อมูลการแชท 878 class จะเก็บข้อมูลเกี่ยวกับเซสชันแชทเดียว จะจัดเก็บโทเค็น ชื่อผู้ใช้ และการประทับเวลาที่สร้างขึ้นโดยอัตโนมัติสำหรับเวลาเริ่มต้นเซสชันการแชทโดยใช้ 880 .

ข้อความที่ส่งและรับภายในเซสชันการแชทนี้จะถูกเก็บไว้ที่ 895 คลาสที่สร้างรหัสแชทได้ทันทีโดยใช้ 905 . ข้อมูลเดียวที่เราจำเป็นต้องระบุเมื่อเริ่มต้น 910 นี้ class คือข้อความ

วิธีทำงานกับ Redis JSON

ในการใช้ความสามารถของ Redis JSON ในการจัดเก็บประวัติการแชทของเรา เราจำเป็นต้องติดตั้ง rejson ที่แล็บ Redis มอบให้

ในเทอร์มินัลให้ cd ลงใน 922 และติดตั้ง rejson ด้วย 934 . จากนั้นอัปเดต 940 ของคุณ คลาสใน 954 เพื่อรวม 960 วิธีการ:


import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 self.REDIS_HOST = os.environ['REDIS_HOST']
 self.REDIS_PORT = os.environ['REDIS_PORT']
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection
 def create_rejson_connection(self):
 self.redisJson = Client(host=self.REDIS_HOST,
 port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
 return self.redisJson

เรากำลังเพิ่ม 974 วิธีเชื่อมต่อกับ Redis ด้วย rejson 988 . สิ่งนี้ทำให้เรามีวิธีในการสร้างและจัดการข้อมูล JSON ใน Redis ซึ่งไม่สามารถใช้ได้กับ aioredis

ถัดไปใน 993 เราสามารถอัปเดต 1008 ได้ จุดสิ้นสุดเพื่อสร้าง 1010 ใหม่ อินสแตนซ์และจัดเก็บข้อมูลเซสชันใน Redis JSON ดังนี้:

@chat.post("/token")
async def token_generator(name: str, request: Request):
 token = str(uuid.uuid4())
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 # Create new chat session
 json_client = redis.create_rejson_connection()
 chat_session = Chat(
 token=token,
 messages=[],
 name=name
 )
 # Store chat session in redis JSON with the token as key
 json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
 # Set a timeout for redis data
 redis_client = await redis.create_connection()
 await redis_client.expire(str(token), 3600)
 return chat_session.dict()

หมายเหตุ:เนื่องจากนี่คือแอปสาธิต ฉันจึงไม่ต้องการจัดเก็บข้อมูลการแชทใน Redis นานเกินไป ดังนั้นฉันจึงเพิ่มการหมดเวลา 60 นาทีบนโทเค็นโดยใช้ไคลเอนต์ aioredis (rejson ไม่ได้ใช้การหมดเวลา) ซึ่งหมายความว่าหลังจากผ่านไป 60 นาที ข้อมูลเซสชันการแชทจะหายไป

นี่เป็นสิ่งจำเป็นเนื่องจากเราไม่ได้ตรวจสอบสิทธิ์ผู้ใช้ และเราต้องการถ่ายโอนข้อมูลแชทหลังจากระยะเวลาที่กำหนด ขั้นตอนนี้เป็นทางเลือก และคุณไม่จำเป็นต้องรวมไว้

ถัดไปในบุรุษไปรษณีย์ เมื่อคุณส่งคำขอ POST เพื่อสร้างโทเค็นใหม่ คุณจะได้รับคำตอบที่มีโครงสร้างเหมือนกับด้านล่างนี้ คุณยังสามารถตรวจสอบ Redis Insight เพื่อดูข้อมูลแชทของคุณที่จัดเก็บโดยมีโทเค็นเป็นคีย์ JSON และข้อมูลเป็นค่า

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม ตัวสร้างโทเค็นอัปเดตแล้ว

วิธีอัปเดตการพึ่งพาโทเค็น

ขณะนี้เราได้สร้างและจัดเก็บโทเค็นแล้ว นี่เป็นเวลาที่ดีในการอัปเดต 1029 การพึ่งพาใน 1032 ของเรา เว็บซ็อกเก็ต เราทำเช่นนี้เพื่อตรวจสอบโทเค็นที่ถูกต้องก่อนเริ่มเซสชันการแชท

ใน 1049 อัปเดต 1053 ฟังก์ชั่นเพื่อตรวจสอบว่าโทเค็นมีอยู่ในอินสแตนซ์ Redis หรือไม่ หากเป็นเช่นนั้น เราจะส่งคืนโทเค็น ซึ่งหมายความว่าการเชื่อมต่อซ็อกเก็ตนั้นถูกต้อง หากไม่มีเราจะปิดการเชื่อมต่อ

โทเค็นที่สร้างโดย 1060 จะหยุดอยู่หลังจาก 60 นาที ดังนั้นเราจึงมีตรรกะง่ายๆ ในส่วนหน้าเพื่อเปลี่ยนเส้นทางผู้ใช้เพื่อสร้างโทเค็นใหม่ หากมีการสร้างการตอบสนองข้อผิดพลาดขณะพยายามเริ่มแชท


from ..redis.config import Redis
async def get_token(
 websocket: WebSocket,
 token: Optional[str] = Query(None),
):
 if token is None or token == "":
 await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
 redis_client = await redis.create_connection()
 isexists = await redis_client.exists(token)
 if isexists == 1:
 return token
 else:
 await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not authenticated or expired token")

หากต้องการทดสอบการขึ้นต่อกัน ให้เชื่อมต่อกับเซสชันแชทด้วยโทเค็นแบบสุ่มที่เราใช้ และคุณควรได้รับข้อผิดพลาด 403 (โปรดทราบว่าคุณต้องลบโทเค็นใน Redis Insight ด้วยตนเอง)

ตอนนี้คัดลอกโทเค็นที่สร้างขึ้นเมื่อคุณส่งคำขอโพสต์ไปที่ 1072 จุดสิ้นสุด (หรือสร้างคำขอใหม่) และวางเป็นค่าให้กับพารามิเตอร์การสืบค้นโทเค็นที่ต้องการโดย 1084 เว็บซ็อกเก็ต จากนั้นเชื่อมต่อ คุณควรจะเชื่อมต่อได้สำเร็จ

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม เซสชันสนทนาด้วยโทเค็น

เมื่อนำมารวมกันแล้ว chat.py ของคุณควรมีลักษณะดังนี้


import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
 token = str(uuid.uuid4())
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 # Create nee chat session
 json_client = redis.create_rejson_connection()
 chat_session = Chat(
 token=token,
 messages=[],
 name=name
 )
 print(chat_session.dict())
 # Store chat session in redis JSON with the token as key
 json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
 # Set a timeout for redis data
 redis_client = await redis.create_connection()
 await redis_client.expire(str(token), 3600)
 return chat_session.dict()
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
 return None
# @route Websocket /chat
# @desc Socket for chat bot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 redis_client = await redis.create_connection()
 producer = Producer(redis_client)
 json_client = redis.create_rejson_connection()
 try:
 while True:
 data = await websocket.receive_text()
 stream_data = {}
 stream_data[token] = data
 await producer.add_to_stream(stream_data, "message_channel")
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

ทำได้ดีมากที่มาถึงขนาดนี้! ในส่วนถัดไป เราจะมุ่งเน้นไปที่การสื่อสารกับโมเดล AI และการจัดการการถ่ายโอนข้อมูลระหว่างไคลเอนต์ เซิร์ฟเวอร์ ผู้ปฏิบัติงาน และ API ภายนอก

วิธีเพิ่มความฉลาดให้กับ Chatbots ด้วยโมเดล AI

ในส่วนนี้ เราจะมุ่งเน้นไปที่การสร้าง Wrapper เพื่อสื่อสารกับโมเดล Transformer ส่งข้อความแจ้งจากผู้ใช้ไปยัง API ในรูปแบบการสนทนา และรับและแปลงการตอบกลับสำหรับแอปพลิเคชันแชทของเรา

วิธีการเริ่มต้นใช้งาน Huggingface

เราจะไม่สร้างหรือปรับใช้โมเดลภาษาใดๆ บน Hugginface แต่เราจะมุ่งเน้นไปที่การใช้ API การอนุมานแบบเร่งของ Huggingface เพื่อเชื่อมต่อกับโมเดลที่ได้รับการฝึกอบรมล่วงหน้าแทน

โมเดลที่เราจะใช้คือรุ่น GPT-J-6B จัดทำโดย EleutherAI เป็นโมเดลภาษาเชิงกำเนิดซึ่งได้รับการฝึกฝนด้วยพารามิเตอร์ 6 พันล้านพารามิเตอร์

Huggingface ให้ API แบบจำกัดตามความต้องการแก่เราเพื่อเชื่อมต่อกับโมเดลนี้โดยไม่มีค่าใช้จ่าย

ในการเริ่มต้นกับ Huggingface ให้สร้างบัญชีฟรี ในการตั้งค่าของคุณ ให้สร้างโทเค็นเพื่อการเข้าถึงใหม่ สำหรับโทเค็นสูงสุด 30,000 รายการ Huggingface ให้การเข้าถึง API การอนุมานได้ฟรี

คุณสามารถตรวจสอบการใช้งาน API ของคุณได้ที่นี่ ตรวจสอบให้แน่ใจว่าคุณเก็บโทเค็นนี้ไว้อย่างปลอดภัยและอย่าเปิดเผยต่อสาธารณะ

หมายเหตุ:เราจะใช้การเชื่อมต่อ HTTP เพื่อสื่อสารกับ API เนื่องจากเราใช้บัญชีฟรี แต่บัญชี PRO Huggingface รองรับการสตรีมด้วย WebSockets ดูการทำงานแบบขนานและแบบแบตช์

ซึ่งสามารถช่วยปรับปรุงเวลาตอบสนองระหว่างโมเดลและแอปพลิเคชันแชทของเราได้อย่างมาก และหวังว่าจะกล่าวถึงวิธีนี้ในบทความติดตามผล

วิธีการโต้ตอบกับโมเดลภาษา

ขั้นแรก เราเพิ่มข้อมูลรับรองการเชื่อมต่อ Huggingface ให้กับไฟล์ .env ภายในไดเร็กทอรีผู้ปฏิบัติงานของเรา

export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B

ถัดไปใน 1093 สร้างโฟลเดอร์ชื่อ 1105 จากนั้นเพิ่มไฟล์ 1118 . จากนั้นเพิ่มคลาส GPT ด้านล่าง:

import os
from dotenv import load_dotenv
import requests
import json
load_dotenv()
class GPT:
 def __init__(self):
 self.url = os.environ.get('MODEL_URL')
 self.headers = {
 "Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
 self.payload = {
 "inputs": "",
 "parameters": {
 "return_full_text": False,
 "use_cache": True,
 "max_new_tokens": 25
 }
 }
 def query(self, input: str) -> list:
 self.payload["inputs"] = input
 data = json.dumps(self.payload)
 response = requests.request(
 "POST", self.url, headers=self.headers, data=data)
 print(json.loads(response.content.decode("utf-8")))
 return json.loads(response.content.decode("utf-8"))
if __name__ == "__main__":
 GPT().query("Will artificial intelligence help humanity conquer the universe?")

1121 คลาสเริ่มต้นด้วยโมเดล Huggingface 1133 , การรับรองความถูกต้อง 1143 และกำหนดไว้ล่วงหน้า 1155 . แต่อินพุตเพย์โหลดเป็นฟิลด์ไดนามิกที่ระบุโดย 1169 และอัปเดตก่อนที่เราจะส่งคำขอไปยังจุดสิ้นสุด Huggingface

สุดท้ายนี้ เราทดสอบสิ่งนี้โดยการเรียกใช้วิธีการสืบค้นบนอินสแตนซ์ของคลาส GPT โดยตรง ในเทอร์มินัล ให้รัน 1178 และคุณควรได้รับคำตอบเช่นนี้ (เพียงจำไว้ว่าคำตอบของคุณจะแตกต่างไปจากนี้อย่างแน่นอน):

[{'generated_text': ' (AI) could solve all the problems on this planet? I am of the opinion that in the short term artificial intelligence is much better than human beings, but in the long and distant future human beings will surpass artificial intelligence.\n\nIn the distant'}]

ต่อไป เราจะเพิ่มการปรับแต่งบางอย่างให้กับอินพุตเพื่อทำให้การโต้ตอบกับโมเดลมีการสนทนากันมากขึ้นโดยการเปลี่ยนรูปแบบของอินพุต

อัปเดต 1184 คลาสเช่นนั้น:


class GPT:
 def __init__(self):
 self.url = os.environ.get('MODEL_URL')
 self.headers = {
 "Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
 self.payload = {
 "inputs": "",
 "parameters": {
 "return_full_text": False,
 "use_cache": False,
 "max_new_tokens": 25
 }
 }
 def query(self, input: str) -> list:
 self.payload["inputs"] = f"Human: {input} Bot:"
 data = json.dumps(self.payload)
 response = requests.request(
 "POST", self.url, headers=self.headers, data=data)
 data = json.loads(response.content.decode("utf-8"))
 text = data[0]['generated_text']
 res = str(text.split("Human:")[0]).strip("\n").strip()
 return res
if __name__ == "__main__":
 GPT().query("Will artificial intelligence help humanity conquer the universe?")

เราอัปเดตอินพุตด้วยสตริงลิเทอรัล 1194 . อินพุตของมนุษย์จะถูกวางไว้ในสตริงและบอทจะจัดเตรียมการตอบกลับ รูปแบบอินพุตนี้จะเปลี่ยน GPT-J6B ให้เป็นโมเดลการสนทนา การเปลี่ยนแปลงอื่นๆ ที่คุณอาจสังเกตเห็น ได้แก่

  • use_cache:คุณสามารถทำให้เป็นเท็จได้ หากคุณต้องการให้โมเดลสร้างการตอบสนองใหม่เมื่ออินพุตเหมือนกัน ฉันขอแนะนำให้ปล่อยให้สิ่งนี้เป็น True ในการใช้งานจริงเพื่อป้องกันไม่ให้โทเค็นฟรีของคุณหมดลง หากผู้ใช้เพียงแค่ส่งสแปมบอทด้วยข้อความเดียวกัน การใช้แคชไม่ได้โหลดการตอบสนองใหม่จากโมเดลจริงๆ
  • return_full_text:เป็นเท็จ เนื่องจากเราไม่จำเป็นต้องส่งคืนอินพุต เรามีอยู่แล้ว เมื่อเราได้รับคำตอบ เราจะตัด "Bot:" และช่องว่างนำหน้า/ต่อท้ายออกจากคำตอบ และส่งกลับเฉพาะข้อความตอบกลับ

วิธีจำลองหน่วยความจำระยะสั้นสำหรับโมเดล AI

สำหรับทุกข้อมูลใหม่ที่เราส่งไปยังโมเดล ไม่มีทางที่โมเดลจะจดจำประวัติการสนทนาได้ นี่เป็นสิ่งสำคัญหากเราต้องการเก็บบริบทในการสนทนา

แต่โปรดจำไว้ว่าเมื่อจำนวนโทเค็นที่เราส่งไปยังโมเดลเพิ่มขึ้น การประมวลผลก็จะมีราคาแพงขึ้น และเวลาตอบสนองก็นานขึ้นด้วย

เราจึงต้องหาวิธีดึงประวัติระยะสั้นแล้วส่งไปที่โมเดล นอกจากนี้เรายังต้องหาจุดที่น่าสนใจด้วย - เราต้องการดึงข้อมูลในอดีตและส่งไปยังโมเดลจำนวนเท่าใด

ในการจัดการประวัติการแชท เราต้องถอยกลับไปที่ฐานข้อมูล JSON ของเรา เราจะใช้ 1201 เพื่อรับข้อมูลแชทล่าสุด จากนั้นเมื่อเราได้รับคำตอบ ให้เพิ่มการตอบกลับเข้ากับฐานข้อมูล JSON

อัปเดต 1212 เพื่อรวม 1225 วิธีการ นอกจากนี้ ให้อัปเดตไฟล์ .env ด้วยข้อมูลการตรวจสอบสิทธิ์ และตรวจสอบให้แน่ใจว่าติดตั้ง rejson แล้ว

1237 ของคุณ ควรมีลักษณะเช่นนี้:


import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 self.REDIS_HOST = os.environ['REDIS_HOST']
 self.REDIS_PORT = os.environ['REDIS_PORT']
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection
 def create_rejson_connection(self):
 self.redisJson = Client(host=self.REDIS_HOST,
 port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
 return self.redisJson

ในขณะที่ไฟล์ .env ของคุณควรมีลักษณะดังนี้:

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B

ถัดไปใน 1245 สร้างไฟล์ใหม่ชื่อ 1251 และเพิ่มโค้ดด้านล่าง:

from .config import Redis
from rejson import Path
class Cache:
 def __init__(self, json_client):
 self.json_client = json_client
 async def get_chat_history(self, token: str):
 data = self.json_client.jsonget(
 str(token), Path.rootPath())
 return data

แคชเริ่มต้นได้ด้วยไคลเอนต์ rejson และวิธีการ 1260 รับโทเค็นเพื่อรับประวัติการแชทสำหรับโทเค็นนั้นจาก Redis ตรวจสอบให้แน่ใจว่าคุณนำเข้าวัตถุ Path จาก rejson

ต่อไป ให้อัพเดต 1270 ด้วยรหัสด้านล่าง:

from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
redis = Redis()
async def main():
 json_client = redis.create_rejson_connection()
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)
if __name__ == "__main__":
 asyncio.run(main())

ฉันได้ฮาร์ดโค้ดโทเค็นตัวอย่างที่สร้างจากการทดสอบครั้งก่อนในบุรุษไปรษณีย์ หากคุณไม่ได้สร้างโทเค็น เพียงส่งคำขอใหม่ไปที่ 1280 และคัดลอกโทเค็น จากนั้นเรียกใช้ 1291 ในอาคารผู้โดยสาร คุณควรเห็นข้อมูลในเทอร์มินัลดังนี้:

{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}

ต่อไปเราต้องเพิ่ม 1308 วิธีการ 1319 ของเรา คลาสที่เพิ่มข้อความไปยัง Redis สำหรับโทเค็นเฉพาะ


 async def add_message_to_cache(self, token: str, message_data: dict):
 self.json_client.jsonarrappend(
 str(token), Path('.messages'), message_data)

1321 วิธีการจัดทำโดย rejson จะเพิ่มข้อความใหม่ต่อท้ายอาร์เรย์ข้อความ

โปรดทราบว่าในการเข้าถึงอาร์เรย์ข้อความ เราจำเป็นต้องระบุ 1334 เป็นข้อโต้แย้งต่อเส้นทาง หากข้อมูลข้อความของคุณมีโครงสร้างที่แตกต่าง/ซ้อนกัน เพียงระบุเส้นทางไปยังอาร์เรย์ที่คุณต้องการผนวกข้อมูลใหม่เข้าไป

หากต้องการทดสอบวิธีนี้ ให้อัปเดตฟังก์ชันหลักในไฟล์ main.py ด้วยโค้ดด้านล่าง:

async def main():
 json_client = redis.create_rejson_connection()
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", message_data={
 "id": "1",
 "msg": "Hello",
 "timestamp": "2022-07-16 13:20:01.092109"
 })
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)

เรากำลังส่งข้อความฮาร์ดโค้ดไปยังแคช และรับประวัติการแชทจากแคช เมื่อคุณเรียกใช้ 1349 ในเทอร์มินัลภายในไดเร็กทอรีของผู้ปฏิบัติงาน คุณควรได้รับข้อความลักษณะนี้พิมพ์ในเทอร์มินัล โดยมีข้อความเพิ่มลงในอาร์เรย์ข้อความ

{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [{'id': '1', 'msg': 'Hello', 'timestamp': '2022-07-16 13:20:01.092109'}], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}

สุดท้ายนี้ เราจำเป็นต้องอัปเดตฟังก์ชันหลักเพื่อส่งข้อมูลข้อความไปยังโมเดล GPT และอัปเดตอินพุตด้วย 4 ตัวสุดท้าย ข้อความที่ส่งระหว่างไคลเอนต์และโมเดล

ก่อนอื่น มาอัปเดต 1359 ของเรากันก่อน ทำงานด้วยอาร์กิวเมนต์ "แหล่งที่มา" ใหม่ที่จะแจ้งให้เราทราบว่าข้อความนั้นเป็นมนุษย์หรือบอท We can then use this arg to add the "Human:" or "Bot:" tags to the data before storing it in the cache.

Update the 1362 method in the Cache class like so:

 async def add_message_to_cache(self, token: str, source: str, message_data: dict):
 if source == "human":
 message_data['msg'] = "Human: " + (message_data['msg'])
 elif source == "bot":
 message_data['msg'] = "Bot: " + (message_data['msg'])
 self.json_client.jsonarrappend(
 str(token), Path('.messages'), message_data)

Then update the main function in main.py in the worker directory, and run 1370 to see the new results in the Redis database.

async def main():
 json_client = redis.create_rejson_connection()
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
 "id": "1",
 "msg": "Hello",
 "timestamp": "2022-07-16 13:20:01.092109"
 })
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)

Next, we need to update the main function to add new messages to the cache, read the previous 4 messages from the cache, and then make an API call to the model using the query method. It'll have a payload consisting of a composite string of the last 4 messages.

You can always tune the number of messages in the history you want to extract, but I think 4 messages is a pretty good number for a demo.

In 1381 , create a new folder schema. Then create a new file named 1399 and paste our message schema in chat.py like so:

from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
 id = str(uuid.uuid4())
 msg: str
 timestamp = str(datetime.now())

Next, update the main.py file like below:

async def main():
 json_client = redis.create_rejson_connection()
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
 "id": "3",
 "msg": "I would like to go to the moon to, would you take me?",
 "timestamp": "2022-07-16 13:20:01.092109"
 })
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)
 message_data = data['messages'][-4:]
 input = ["" + i['msg'] for i in message_data]
 input = " ".join(input)
 res = GPT().query(input=input)
 msg = Message(
 msg=res
 )
 print(msg)
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="bot", message_data=msg.dict())

In the code above, we add new message data to the cache. This message will ultimately come from the message queue. Next we get the chat history from the cache, which will now include the most recent data we added.

Note that we are using the same hard-coded token to add to the cache and get from the cache, temporarily just to test this out.

Next, we trim off the cache data and extract only the last 4 items. Then we consolidate the input data by extracting the msg in a list and join it to an empty string.

Finally, we create a new Message instance for the bot response and add the response to the cache specifying the source as "bot"

Next, run 1404 a couple of times, changing the human message and id as desired with each run. You should have a full conversation input and output with the model.

Open Redis Insight and you should have something similar to the below:

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม Conversational Chat

Stream Consumer and Real-time Data Pull from the Message Queue

Next, we want to create a consumer and update our 1419 to connect to the message queue. We want it to pull the token data in real-time, as we are currently hard-coding the tokens and message inputs.

In 1425 create a new file named 1435 . Add a 1441 class with the code below:

class StreamConsumer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def consume_stream(self, count: int, block: int, stream_channel):
 response = await self.redis_client.xread(
 streams={stream_channel: '0-0'}, count=count, block=block)
 return response
 async def delete_message(self, stream_channel, message_id):
 await self.redis_client.xdel(stream_channel, message_id)

The 1459 class is initialized with a Redis client. The 1463 method pulls a new message from the queue from the message channel, using the 1471 method provided by aioredis.

Next, update the 1489 file with a while loop to keep the connection to the message channel alive, like so:


from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
redis = Redis()
async def main():
 json_client = redis.create_rejson_connection()
 redis_client = await redis.create_connection()
 consumer = StreamConsumer(redis_client)
 cache = Cache(json_client)
 print("Stream consumer started")
 print("Stream waiting for new messages")
 while True:
 response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
 if response:
 for stream, messages in response:
 # Get message from stream, and extract token, message data and message id
 for message in messages:
 message_id = message[0]
 token = [k.decode('utf-8')
 for k, v in message[1].items()][0]
 message = [v.decode('utf-8')
 for k, v in message[1].items()][0]
 print(token)
 # Create a new message instance and add to cache, specifying the source as human
 msg = Message(msg=message)
 await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
 # Get chat history from cache
 data = await cache.get_chat_history(token=token)
 # Clean message input and send to query
 message_data = data['messages'][-4:]
 input = ["" + i['msg'] for i in message_data]
 input = " ".join(input)
 res = GPT().query(input=input)
 msg = Message(
 msg=res
 )
 print(msg)
 await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
 # Delete messaage from queue after it has been processed
 await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
 asyncio.run(main())

This is quite the update, so let's take it step by step:

We use a 1492 loop so that the worker can be online listening to messages from the queue.

Next, we await new messages from the message_channel by calling our 1504 method. If we have a message in the queue, we extract the message_id, token, and message. Then we create a new instance of the Message class, add the message to the cache, and then get the last 4 messages. We set it as input to the GPT model 1511 method.

Once we get a response, we then add the response to the cache using the 1521 method, then delete the message from the queue.

How to Update the Chat Client with the AI Response

So far, we are sending a chat message from the client to the message_channel (which is received by the worker that queries the AI model) to get a response.

Next, we need to send this response to the client. As long as the socket connection is still open, the client should be able to receive the response.

If the connection is closed, the client can always get a response from the chat history using the 1538 จุดสิ้นสุด

In 1548 create a new file named 1554 , and add a 1565 class similar to what we had on the chat web server:


class Producer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def add_to_stream(self, data: dict, stream_channel) -> bool:
 msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
 print(f"Message id {msg_id} added to {stream_channel} stream")
 return msg_id

Next, in the 1578 file, update the main function to initialize the producer, create a stream data, and send the response to a 1583 using the 1597 method:

from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
from src.redis.producer import Producer
redis = Redis()
async def main():
 json_client = redis.create_rejson_connection()
 redis_client = await redis.create_connection()
 consumer = StreamConsumer(redis_client)
 cache = Cache(json_client)
 producer = Producer(redis_client)
 print("Stream consumer started")
 print("Stream waiting for new messages")
 while True:
 response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
 if response:
 for stream, messages in response:
 # Get message from stream, and extract token, message data and message id
 for message in messages:
 message_id = message[0]
 token = [k.decode('utf-8')
 for k, v in message[1].items()][0]
 message = [v.decode('utf-8')
 for k, v in message[1].items()][0]
 # Create a new message instance and add to cache, specifying the source as human
 msg = Message(msg=message)
 await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
 # Get chat history from cache
 data = await cache.get_chat_history(token=token)
 # Clean message input and send to query
 message_data = data['messages'][-4:]
 input = ["" + i['msg'] for i in message_data]
 input = " ".join(input)
 res = GPT().query(input=input)
 msg = Message(
 msg=res
 )
 stream_data = {}
 stream_data[str(token)] = str(msg.dict())
 await producer.add_to_stream(stream_data, "response_channel")
 await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
 # Delete messaage from queue after it has been processed
 await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
 asyncio.run(main())

Next, we need to let the client know when we receive responses from the worker in the 1603 socket endpoint. We do this by listening to the response stream. We do not need to include a while loop here as the socket will be listening as long as the connection is open.

Note that we also need to check which client the response is for by adding logic to check if the token connected is equal to the token in the response. Then we delete the message in the response queue once it's been read.

In 1617 create a new file named stream.py and add our 1621 class like this:

from .config import Redis
class StreamConsumer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def consume_stream(self, count: int, block: int, stream_channel):
 response = await self.redis_client.xread(
 streams={stream_channel: '0-0'}, count=count, block=block)
 return response
 async def delete_message(self, stream_channel, message_id):
 await self.redis_client.xdel(stream_channel, message_id)

Next, update the 1633 socket endpoint like so:

from ..redis.stream import StreamConsumer
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 redis_client = await redis.create_connection()
 producer = Producer(redis_client)
 json_client = redis.create_rejson_connection()
 consumer = StreamConsumer(redis_client)
 try:
 while True:
 data = await websocket.receive_text()
 stream_data = {}
 stream_data[str(token)] = str(data)
 await producer.add_to_stream(stream_data, "message_channel")
 response = await consumer.consume_stream(stream_channel="response_channel", block=0)
 print(response)
 for stream, messages in response:
 for message in messages:
 response_token = [k.decode('utf-8')
 for k, v in message[1].items()][0]
 if token == response_token:
 response_message = [v.decode('utf-8')
 for k, v in message[1].items()][0]
 print(message[0].decode('utf-8'))
 print(token)
 print(response_token)
 await manager.send_personal_message(response_message, websocket)
 await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
 except WebSocketDisconnect:
 manager.disconnect(websocket)

Refresh Token

Finally, we need to update the 1641 endpoint to get the chat history from the Redis database using our 1650 class.

In 1669 , add a 1673 file and add the code below:


from rejson import Path
class Cache:
 def __init__(self, json_client):
 self.json_client = json_client
 async def get_chat_history(self, token: str):
 data = self.json_client.jsonget(
 str(token), Path.rootPath())
 return data

Next, in 1682 import the 1697 class and update the 1703 endpoint to the below:


from ..redis.cache import Cache
@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
 json_client = redis.create_rejson_connection()
 cache = Cache(json_client)
 data = await cache.get_chat_history(token)
 if data == None:
 raise HTTPException(
 status_code=400, detail="Session expired or does not exist")
 else:
 return data

Now, when we send a GET request to the 1716 endpoint with any token, the endpoint will fetch the data from the Redis database.

If the token has not timed out, the data will be sent to the user. Or it'll send a 400 response if the token is not found.

How to Test the Chat with multiple Clients in Postman

Finally, we will test the chat system by creating multiple chat sessions in Postman, connecting multiple clients in Postman, and chatting with the bot on the clients.

Lastly, we will try to get the chat history for the clients and hopefully get a proper response.

แชทบอท AI ผู้เชี่ยวชาญ:สร้างบอทที่ขับเคลื่อนด้วย GPT อันทรงพลังด้วย Redis และ Python – คู่มือที่ครอบคลุม

Recap

Let's have a quick recap as to what we have achieved with our chat system. The chat client creates a token for each chat session with a client. This token is used to identify each client, and each message sent by clients connected to or web server is queued in a Redis channel (message_chanel), identified by the token.

Our worker environment reads from this channel. It does not have any clue who the client is (except that it's a unique token) and uses the message in the queue to send requests to the Huggingface inference API.

When it gets a response, the response is added to a response channel and the chat history is updated. The client listening to the response_channel immediately sends the response to the client once it receives a response with its token.

If the socket is still open, this response is sent. If the socket is closed, we are certain that the response is preserved because the response is added to the chat history. The client can get the history, even if a page refresh happens or in the event of a lost connection.

Congratulations on getting this far! You have been able to build a working chat system.

In follow-up articles, I will focus on building a chat user interface for the client, creating unit and functional tests, fine-tuning our worker environment for faster response time with WebSockets and asynchronous requests, and ultimately deploying the chat application on AWS.

This Article is part of a series on building full-stack intelligent chatbots with tools like Python, React, Huggingface, Redis, and so on. You can follow the full series on my blog:blog.stephensanwo.dev - AI ChatBot Series**

You can download the full repository on My Github Repository

I wrote this tutorial in collaboration with Redis. Need help getting started with Redis? Try the following resources:

  • Try Redis Cloud free of charge
  • Watch this video on the benefits of Redis Cloud over other Redis providers
  • Redis Developer Hub - tools, guides, and tutorials about Redis
  • RedisInsight Desktop GUI

เรียนรู้การเขียนโค้ดฟรี หลักสูตรโอเพ่นซอร์สของ freeCodeCamp ช่วยให้ผู้คนมากกว่า 40,000 คนได้งานในตำแหน่งนักพัฒนา เริ่มต้น