โดย สตีเฟน ซันโว
ในการสร้างแอปพลิเคชันแบบฟูลสแตกที่ใช้งานได้ มีหลายส่วนที่เคลื่อนไหวได้ที่ต้องคำนึงถึง และคุณจะต้องทำการตัดสินใจหลายอย่างที่สำคัญต่อความสำเร็จของแอปของคุณ
ตัวอย่างเช่น คุณจะใช้ภาษาอะไร และคุณจะปรับใช้บนแพลตฟอร์มใด คุณจะปรับใช้ซอฟต์แวร์คอนเทนเนอร์บนเซิร์ฟเวอร์ หรือใช้ฟังก์ชันไร้เซิร์ฟเวอร์เพื่อจัดการแบ็กเอนด์หรือไม่? คุณวางแผนที่จะใช้ API ของบริษัทอื่นเพื่อจัดการส่วนที่ซับซ้อนของแอปพลิเคชันของคุณ เช่น การตรวจสอบสิทธิ์หรือการชำระเงิน หรือไม่ คุณเก็บข้อมูลไว้ที่ไหน? ป>
นอกจากนี้ คุณยังต้องคำนึงถึงอินเทอร์เฟซผู้ใช้ การออกแบบและการใช้งานแอปพลิเคชันของคุณ และอื่นๆ อีกมากมาย ป>
นี่คือเหตุผลว่าทำไมแอปพลิเคชันขนาดใหญ่ที่ซับซ้อนจึงต้องอาศัยทีมพัฒนาแบบมัลติฟังก์ชั่นที่ทำงานร่วมกันเพื่อสร้างแอป
หนึ่งในวิธีที่ดีที่สุดในการเรียนรู้วิธีพัฒนาแอปพลิเคชันแบบฟูลสแตกคือการสร้างโปรเจ็กต์ที่ครอบคลุมกระบวนการพัฒนาตั้งแต่ต้นทางถึงปลายทาง คุณจะได้ผ่านการออกแบบสถาปัตยกรรม การพัฒนาบริการ API การพัฒนาอินเทอร์เฟซผู้ใช้ และสุดท้ายคือปรับใช้แอปพลิเคชันของคุณ ป>
ดังนั้นบทช่วยสอนนี้จะแนะนำคุณตลอดกระบวนการสร้างแชทบอท AI เพื่อช่วยให้คุณเรียนรู้แนวคิดเหล่านี้ในเชิงลึก
หัวข้อบางส่วนที่เราจะกล่าวถึง ได้แก่:
- วิธีสร้าง API ด้วย Python, FastAPI และ WebSockets
- วิธีสร้างระบบแบบเรียลไทม์ด้วย Redis
- วิธีสร้างอินเทอร์เฟซผู้ใช้แชทด้วย React
หมายเหตุสำคัญ: นี่คือโครงการพัฒนาซอฟต์แวร์สแต็กเต็มรูปแบบระดับกลางที่ต้องใช้ความรู้พื้นฐานเกี่ยวกับ Python และ JavaScript ป>
ฉันได้แบ่งโครงการออกเป็นส่วนๆ อย่างระมัดระวังเพื่อให้แน่ใจว่าคุณสามารถเลือกระยะที่สำคัญสำหรับคุณได้อย่างง่ายดาย ในกรณีที่คุณไม่ต้องการเขียนโค้ดแอปพลิเคชันแบบเต็ม
คุณสามารถดาวน์โหลดพื้นที่เก็บข้อมูลแบบเต็มได้ที่ My Github ที่นี่
สารบัญ
ส่วนที่ 1
- สถาปัตยกรรมแอปพลิเคชัน
- วิธีการตั้งค่าสภาพแวดล้อมการพัฒนา
ส่วนที่ 2
- วิธีสร้างเซิร์ฟเวอร์แชทด้วย Python, FastAPI และ WebSockets
- วิธีการตั้งค่าสภาพแวดล้อม Python
- การตั้งค่าเซิร์ฟเวอร์ FastAPI
- วิธีเพิ่มเส้นทางไปยัง API
- วิธีสร้างโทเค็นเซสชันการแชทด้วย UUID
- วิธีทดสอบ API กับบุรุษไปรษณีย์
- เว็บซ็อกเก็ตและเครื่องมือจัดการการเชื่อมต่อ
- การพึ่งพาการฉีดใน FastAPI
ส่วนที่ 3
- วิธีการสร้างระบบเรียลไทม์ด้วย Redis
- แจกจ่ายและกระจายคิวการส่งข้อความ
- วิธีเชื่อมต่อกับคลัสเตอร์ Redis ใน Python ด้วยไคลเอนต์ Redis
- วิธีการทำงานกับ Redis Streams
- วิธีสร้างโมเดลข้อมูลแชท
- วิธีการทำงานกับ Redis JSON
- วิธีอัปเดตการพึ่งพาโทเค็น
ส่วนที่ 4
- วิธีเพิ่มความฉลาดให้กับ Chatbots ด้วยโมเดล AI
- วิธีการเริ่มต้นใช้งาน Huggingface
- วิธีการโต้ตอบกับโมเดลภาษา
- วิธีจำลองหน่วยความจำระยะสั้นสำหรับโมเดล AI
- สตรีมผู้บริโภคและดึงข้อมูลแบบเรียลไทม์จากคิวข้อความ
- วิธีอัปเดตไคลเอ็นต์การแชทด้วยการตอบกลับของ AI
- รีเฟรชโทเค็น
- วิธีทดสอบการแชทกับลูกค้าหลายรายในบุรุษไปรษณีย์
สถาปัตยกรรมแอปพลิเคชัน
การร่างสถาปัตยกรรมโซลูชันช่วยให้คุณเห็นภาพรวมระดับสูงของแอปพลิเคชันของคุณ เครื่องมือที่คุณตั้งใจจะใช้ และวิธีที่ส่วนประกอบต่างๆ จะสื่อสารระหว่างกัน ป>
ฉันได้วาดสถาปัตยกรรมที่เรียบง่ายด้านล่างโดยใช้ Draw.io:
สถาปัตยกรรมแชทบอทแบบ Fullstack ป>
เรามาดูรายละเอียดส่วนต่างๆ ของสถาปัตยกรรมกันดีกว่า:
ส่วนต่อประสานไคลเอนต์/ผู้ใช้
เราจะใช้ React เวอร์ชัน 18 เพื่อสร้างอินเทอร์เฟซผู้ใช้ Chat UI จะสื่อสารกับแบ็กเอนด์ผ่าน WebSockets
GPT-J-6B และ API การอนุมาน Huggingface
GPT-J-6B เป็นโมเดลภาษาเชิงกำเนิดซึ่งได้รับการฝึกฝนด้วยพารามิเตอร์ 6 พันล้านพารามิเตอร์ และทำงานอย่างใกล้ชิดกับ GPT-3 ของ OpenAI ในบางงาน ป>
ฉันเลือกใช้ GPT-J-6B เนื่องจากเป็นรุ่นโอเพ่นซอร์สและไม่ต้องใช้โทเค็นแบบชำระเงินสำหรับกรณีการใช้งานทั่วไป ป>
Huggingface ยังให้ API ตามความต้องการแก่เราเพื่อเชื่อมต่อกับโมเดลนี้โดยไม่มีค่าใช้จ่าย คุณสามารถอ่านเพิ่มเติมเกี่ยวกับ GPT-J-6B และ Hugging Face Inference API
แดง
เมื่อเราส่งข้อความแจ้งไปยัง GPT เราจำเป็นต้องมีวิธีจัดเก็บข้อความแจ้งและเรียกข้อมูลการตอบกลับได้อย่างง่ายดาย เราจะใช้ Redis JSON เพื่อจัดเก็บข้อมูลการแชท และยังใช้ Redis Streams เพื่อจัดการการสื่อสารแบบเรียลไทม์ด้วย API การอนุมานของ Huggingface ป>
Redis เป็นที่จัดเก็บคีย์-ค่าในหน่วยความจำที่ช่วยให้ดึงและจัดเก็บข้อมูลที่คล้ายกับ JSON ได้อย่างรวดเร็วเป็นพิเศษ สำหรับบทแนะนำสอนการใช้งานนี้ เราจะใช้พื้นที่เก็บข้อมูล Redis ฟรีที่มีการจัดการซึ่งจัดทำโดย Redis Enterprise เพื่อการทดสอบ
ซ็อกเก็ตเว็บและ Chat API
หากต้องการส่งข้อความระหว่างไคลเอนต์และเซิร์ฟเวอร์แบบเรียลไทม์ เราจำเป็นต้องเปิดการเชื่อมต่อซ็อกเก็ต เนื่องจากการเชื่อมต่อ HTTP จะไม่เพียงพอสำหรับการสื่อสารแบบสองทิศทางแบบเรียลไทม์ระหว่างไคลเอนต์และเซิร์ฟเวอร์ ป>
เราจะใช้ FastAPI สำหรับเซิร์ฟเวอร์แชท เนื่องจากมีเซิร์ฟเวอร์ Python ที่รวดเร็วและทันสมัยสำหรับการใช้งานของเรา ตรวจสอบเอกสาร FastAPI) เพื่อเรียนรู้เพิ่มเติมเกี่ยวกับ WebSockets
วิธีการตั้งค่าสภาพแวดล้อมการพัฒนา
คุณสามารถใช้ระบบปฏิบัติการที่คุณต้องการเพื่อสร้างแอปนี้ได้ - ฉันกำลังใช้ MacOS และ Visual Studio Code เพียงตรวจสอบให้แน่ใจว่าคุณได้ติดตั้ง Python และ NodeJs แล้ว ป>
หากต้องการตั้งค่าโครงสร้างโปรเจ็กต์ ให้สร้างโฟลเดอร์ชื่อ06 . จากนั้นสร้างสองโฟลเดอร์ภายในโปรเจ็กต์ชื่อ 17 และ 24รหัส> . เซิร์ฟเวอร์จะเก็บโค้ดไว้สำหรับส่วนหลัง ในขณะที่ไคลเอ็นต์จะเก็บโค้ดไว้สำหรับส่วนหน้า
ถัดไปภายในไดเร็กทอรีโปรเจ็กต์ เริ่มต้นพื้นที่เก็บข้อมูล Git ภายในรูทของโฟลเดอร์โปรเจ็กต์โดยใช้คำสั่ง "git init" จากนั้นสร้างไฟล์ .gitignore โดยใช้ "touch .gitignore":
git init
touch .gitignore
ในส่วนถัดไป เราจะสร้างเว็บเซิร์ฟเวอร์แชทของเราโดยใช้ FastAPI และ Python
วิธีสร้างเซิร์ฟเวอร์แชทด้วย Python, FastAPI และ WebSockets
ในส่วนนี้ เราจะสร้างเซิร์ฟเวอร์แชทโดยใช้ FastAPI เพื่อสื่อสารกับผู้ใช้ เราจะใช้ WebSockets เพื่อรับรองการสื่อสารแบบสองทิศทางระหว่างไคลเอนต์และเซิร์ฟเวอร์ เพื่อให้เราสามารถส่งการตอบกลับไปยังผู้ใช้แบบเรียลไทม์
วิธีการตั้งค่าสภาพแวดล้อม Python
เพื่อเริ่มต้นเซิร์ฟเวอร์ของเรา เราจำเป็นต้องตั้งค่าสภาพแวดล้อม Python ของเรา เปิดโฟลเดอร์โปรเจ็กต์ภายใน VS Code และเปิดเทอร์มินัล
จากรูทโปรเจ็กต์ ให้ cd ลงในไดเร็กทอรีเซิร์ฟเวอร์และรัน 33 . สิ่งนี้จะสร้างสภาพแวดล้อมเสมือนจริง สำหรับโปรเจ็กต์ Python ของเรา ซึ่งจะชื่อว่า 44 . หากต้องการเปิดใช้งานสภาพแวดล้อมเสมือน ให้รัน 50 ป>
จากนั้น ให้ติดตั้งไลบรารีสองสามไลบรารีในสภาพแวดล้อม Python ของคุณ
pip install fastapi uuid uvicorn gunicorn WebSockets python-dotenv aioredis
ถัดไปสร้างไฟล์สภาพแวดล้อมโดยเรียกใช้ 63 ในอาคารผู้โดยสาร เราจะกำหนดตัวแปรแอปและตัวแปรลับภายใน 73 ไฟล์. ป>
เพิ่มตัวแปรสภาพแวดล้อมของแอปและตั้งค่าเป็น "การพัฒนา" ดังนี้:86 . ต่อไป เราจะตั้งค่าเซิร์ฟเวอร์การพัฒนาด้วยเซิร์ฟเวอร์ FastAPI
การตั้งค่าเซิร์ฟเวอร์ FastAPI
ที่รากของไดเร็กทอรีเซิร์ฟเวอร์ ให้สร้างไฟล์ใหม่ชื่อ 96 จากนั้นวางโค้ดด้านล่างสำหรับเซิร์ฟเวอร์การพัฒนา:
from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass
ก่อนอื่นเรา 107 และกำหนดค่าเริ่มต้นเป็น 115 . จากนั้นเรา 120 จาก 133 ไลบรารี และเริ่มต้นให้โหลดตัวแปรจาก 142 ไฟล์
จากนั้นเราจะสร้างเส้นทางทดสอบง่ายๆ เพื่อทดสอบ API เส้นทางทดสอบจะส่งคืนการตอบสนอง JSON แบบง่ายที่บอกเราว่า API ออนไลน์อยู่ ป>
สุดท้ายนี้ เราตั้งค่าเซิร์ฟเวอร์การพัฒนาโดยใช้ 159 และจัดเตรียมข้อโต้แย้งที่จำเป็น API จะทำงานบนพอร์ต 165 .
สุดท้าย ให้รันเซิร์ฟเวอร์ในเทอร์มินัลด้วย 175 . เมื่อคุณเห็น 189 ในเทอร์มินัล ไปที่ URL http://localhost:3500/test บนเบราว์เซอร์ของคุณ และคุณควรจะได้หน้าเว็บดังนี้:
หน้าทดสอบ API ป>
วิธีเพิ่มเส้นทางไปยัง API
ในส่วนนี้ เราจะเพิ่มเส้นทางไปยัง API ของเรา สร้างโฟลเดอร์ใหม่ชื่อ 195 . นี่คือไดเร็กทอรีที่โค้ด API ทั้งหมดของเราจะใช้งานได้ ป>
สร้างโฟลเดอร์ย่อยชื่อ 206 ใส่ซีดีลงในโฟลเดอร์ สร้างไฟล์ใหม่ชื่อ 218 จากนั้นเพิ่มโค้ดด้านล่าง:
import os
from fastapi import APIRouter, FastAPI, WebSocket, Request
chat = APIRouter()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(request: Request):
return None
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket = WebSocket):
return None
เราสร้างจุดสิ้นสุดสามจุด:
220รหัส> จะออกโทเค็นเซสชันให้กับผู้ใช้เพื่อเข้าถึงเซสชันการแชท เนื่องจากแอปแชทจะเปิดสู่สาธารณะ เราจึงไม่ต้องการกังวลเกี่ยวกับการตรวจสอบสิทธิ์และเพียงทำให้มันเรียบง่าย แต่เรายังคงต้องการวิธีในการระบุเซสชันผู้ใช้ที่ไม่ซ้ำกันแต่ละเซสชัน235รหัส> จะได้รับประวัติเซสชันสำหรับผู้ใช้หากการเชื่อมต่อขาดหาย ตราบใดที่โทเค็นยังคงใช้งานอยู่และไม่หมดอายุ247รหัส> จะเปิด WebSocket เพื่อส่งข้อความระหว่างไคลเอนต์และเซิร์ฟเวอร์
จากนั้น เชื่อมต่อเส้นทางการแชทกับ API หลักของเรา ก่อนอื่นเราต้อง 255 ภายใน 262 ของเรา ไฟล์. จากนั้นเราจะรวมเราเตอร์ด้วยการเรียก 277 อย่างแท้จริง วิธีการในการเริ่มต้น 282 ชั้นเรียนและผ่านการแชทเป็นข้อโต้แย้ง ป>
อัปเดต 291 ของคุณ รหัสตามที่แสดงด้านล่าง:
from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from routes.chat import chat
load_dotenv()
api = FastAPI()
api.include_router(chat)
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass
วิธีสร้างโทเค็นเซสชันการแชทด้วย UUID
ในการสร้างโทเค็นผู้ใช้ เราจะใช้ 307 เพื่อสร้างเส้นทางแบบไดนามิกสำหรับปลายทางการแชทของเรา เนื่องจากนี่คือจุดสิ้นสุดที่เปิดเผยต่อสาธารณะ เราจึงไม่จำเป็นต้องลงรายละเอียดเกี่ยวกับ JWT และการตรวจสอบสิทธิ์ ป>
หากคุณไม่ได้ติดตั้ง 313 เริ่มแรกให้รัน 326 . ถัดไปใน chat.py นำเข้า UUID และอัปเดต 334 เส้นทางด้วยรหัสด้านล่าง:
from fastapi import APIRouter, FastAPI, WebSocket, Request, BackgroundTasks, HTTPException
import uuid
# @route POST /token
# @desc Route generating chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
token = str(uuid.uuid4())
data = {"name": name, "token": token}
return data
ในโค้ดด้านบน ลูกค้าระบุชื่อของตนซึ่งจำเป็น เราทำการตรวจสอบอย่างรวดเร็วเพื่อให้แน่ใจว่าช่องชื่อไม่ว่างเปล่า จากนั้นสร้างโทเค็นโดยใช้ uuid4 ป>
ข้อมูลเซสชันเป็นพจนานุกรมอย่างง่ายสำหรับชื่อและโทเค็น ท้ายที่สุดแล้ว เราจะต้องคงข้อมูลเซสชันนี้ไว้และตั้งค่าการหมดเวลา แต่ตอนนี้เราเพียงส่งคืนข้อมูลดังกล่าวไปยังไคลเอ็นต์
วิธีทดสอบ API กับบุรุษไปรษณีย์
เนื่องจากเราจะทดสอบตำแหน่งข้อมูล WebSocket เราจึงจำเป็นต้องใช้เครื่องมือเช่นบุรุษไปรษณีย์ที่อนุญาต (เนื่องจากเอกสารผยองเริ่มต้นบน FastAPI ไม่รองรับ WebSockets) ป>
ในบุรุษไปรษณีย์ สร้างคอลเลกชันสำหรับสภาพแวดล้อมการพัฒนาของคุณและส่งคำขอ POST ไปที่ 345 การระบุชื่อเป็นพารามิเตอร์การสืบค้นและส่งผ่านค่า คุณควรได้รับคำตอบตามที่แสดงด้านล่าง:
บุรุษไปรษณีย์สร้างโทเค็น ป>
เว็บซ็อกเก็ตและตัวจัดการการเชื่อมต่อ
ในรูท src ให้สร้างโฟลเดอร์ใหม่ชื่อ 350 และเพิ่มไฟล์ชื่อ 365 . ในไฟล์นี้ เราจะกำหนดคลาสที่ควบคุมการเชื่อมต่อกับ WebSockets ของเรา และวิธีการช่วยเหลือทั้งหมดในการเชื่อมต่อและยกเลิกการเชื่อมต่อ ป>
ใน 377 เพิ่มรหัสด้านล่าง:
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
จากนั้นอะซิงโครนัส
สุดท้าย
WebSockets เป็นหัวข้อที่กว้างมาก และเราคัดลอกเฉพาะส่วนต่างๆ ที่นี่เท่านั้น อย่างไรก็ตาม สิ่งนี้ควรจะเพียงพอที่จะสร้างการเชื่อมต่อหลายรายการและจัดการข้อความไปยังการเชื่อมต่อเหล่านั้นแบบอะซิงโครนัส ป>
คุณสามารถอ่านเพิ่มเติมเกี่ยวกับการเขียนโปรแกรม FastAPI Websockets และ Sockets ได้
หากต้องการใช้
ใน
ในขณะที่การเชื่อมต่อเปิดอยู่ เราได้รับข้อความใด ๆ ที่ส่งโดยไคลเอนต์ด้วย
จากนั้นเราจะส่งการตอบกลับแบบฮาร์ดโค้ดกลับไปยังลูกค้าในตอนนี้ ในที่สุดข้อความที่ได้รับจากไคลเอนต์จะถูกส่งไปยังโมเดล AI และการตอบกลับที่ส่งกลับไปยังลูกค้าจะเป็นการตอบกลับจากโมเดล AI
ในบุรุษไปรษณีย์ เราสามารถทดสอบตำแหน่งข้อมูลนี้ได้โดยการสร้างคำขอ WebSocket ใหม่และเชื่อมต่อกับตำแหน่งข้อมูล WebSocket
เมื่อคุณคลิกเชื่อมต่อ บานหน้าต่างข้อความจะแสดงว่าไคลเอ็นต์ API เชื่อมต่อกับ URL และซ็อกเก็ตเปิดอยู่ ป>
หากต้องการทดสอบ ให้ส่งข้อความ "Hello Bot" ไปยังเซิร์ฟเวอร์แชท และคุณควรได้รับการตอบกลับการทดสอบทันที "การตอบสนอง:การจำลองการตอบสนองจากบริการ GPT" ดังที่แสดงด้านล่าง:
เพื่อให้สามารถแยกความแตกต่างระหว่างเซสชันไคลเอ็นต์ที่แตกต่างกันสองเซสชันและจำกัดเซสชันการแชท เราจะใช้โทเค็นแบบกำหนดเวลาซึ่งส่งผ่านเป็นพารามิเตอร์แบบสอบถามไปยังการเชื่อมต่อ WebSocket ป>
ในโฟลเดอร์ซ็อกเก็ต ให้สร้างไฟล์ชื่อ
ฟังก์ชัน get_token รับ WebSocket และโทเค็น จากนั้นตรวจสอบว่าโทเค็นไม่มีหรือเป็นโมฆะ ป>
หากเป็นกรณีนี้ ฟังก์ชันจะส่งกลับสถานะการละเมิดนโยบาย และหากมี ฟังก์ชันจะส่งคืนโทเค็นเท่านั้น ในที่สุดเราจะขยายฟังก์ชันนี้ในภายหลังด้วยการตรวจสอบโทเค็นเพิ่มเติม
หากต้องการใช้ฟังก์ชันนี้ เราจะแทรกฟังก์ชันนี้ลงใน
อัปเดต
ตอนนี้เมื่อคุณพยายามเชื่อมต่อกับ
ยินดีด้วยที่ได้มาไกลขนาดนี้!
ในส่วนถัดไปของบทช่วยสอนนี้ เราจะเน้นไปที่การจัดการสถานะของแอปพลิเคชันของเราและการส่งข้อมูลระหว่างไคลเอนต์และเซิร์ฟเวอร์
ขณะนี้แอปพลิเคชันของเราไม่ได้จัดเก็บสถานะใด ๆ และไม่มีวิธีใดที่จะระบุผู้ใช้หรือจัดเก็บและเรียกข้อมูลการแชท นอกจากนี้เรายังส่งคืนการตอบกลับแบบฮาร์ดโค้ดให้กับลูกค้าในระหว่างเซสชันการแชท ป>
ในบทช่วยสอนในส่วนนี้ เราจะกล่าวถึงสิ่งต่อไปนี้:
Redis คือที่เก็บข้อมูลในหน่วยความจำแบบโอเพ่นซอร์สที่คุณสามารถใช้เป็นฐานข้อมูล แคช นายหน้าข้อความ และกลไกการสตรีมได้ รองรับโครงสร้างข้อมูลจำนวนหนึ่งและเป็นโซลูชันที่สมบูรณ์แบบสำหรับแอปพลิเคชันแบบกระจายที่มีความสามารถแบบเรียลไทม์ ป>
Redis Enterprise Cloud คือบริการคลาวด์ที่มีการจัดการเต็มรูปแบบที่ Redis มอบให้ ซึ่งช่วยให้เราปรับใช้คลัสเตอร์ Redis ในขนาดที่ไม่จำกัดโดยไม่ต้องกังวลเรื่องโครงสร้างพื้นฐาน ป>
เราจะใช้อินสแตนซ์ Redis Enterprise Cloud ฟรีสำหรับบทช่วยสอนนี้ คุณสามารถเริ่มต้นใช้งาน Redis Cloud ได้ฟรีที่นี่ และทำตามบทช่วยสอนนี้เพื่อตั้งค่าฐานข้อมูล Redis และ Redis Insight ซึ่งเป็น GUI เพื่อโต้ตอบกับ Redis
เมื่อคุณตั้งค่าฐานข้อมูล Redis แล้ว ให้สร้างโฟลเดอร์ใหม่ในรูทโปรเจ็กต์ (นอกโฟลเดอร์เซิร์ฟเวอร์) ชื่อ
เราจะแยกสภาพแวดล้อมของผู้ปฏิบัติงานออกจากเว็บเซิร์ฟเวอร์ เพื่อที่ว่าเมื่อลูกค้าส่งข้อความไปยัง WebSocket ของเรา เว็บเซิร์ฟเวอร์จะไม่ต้องจัดการคำขอไปยังบริการของบุคคลที่สาม นอกจากนี้ยังสามารถปล่อยทรัพยากรให้กับผู้ใช้รายอื่นได้อีกด้วย ป>
การสื่อสารเบื้องหลังกับ API การอนุมานได้รับการจัดการโดยบริการสำหรับผู้ปฏิบัติงานนี้ผ่านทาง Redis
คำขอจากไคลเอนต์ที่เชื่อมต่อทั้งหมดจะถูกผนวกเข้ากับคิวข้อความ (ตัวสร้าง) ในขณะที่ผู้ปฏิบัติงานใช้ข้อความ ส่งคำขอไปยัง API การอนุมาน และเพิ่มการตอบกลับต่อคิวการตอบกลับ ป>
เมื่อ API ได้รับการตอบกลับ API จะส่งกลับไปยังไคลเอ็นต์ ป>
ในระหว่างการเดินทางระหว่างผู้ผลิตและผู้บริโภค ลูกค้าสามารถส่งข้อความได้หลายข้อความ และข้อความเหล่านี้จะถูกจัดคิวและตอบกลับตามลำดับ ป>
ตามหลักการแล้ว เราอาจให้ผู้ปฏิบัติงานรายนี้ทำงานบนเซิร์ฟเวอร์ที่แตกต่างอย่างสิ้นเชิงในสภาพแวดล้อมของตัวเอง แต่สำหรับตอนนี้ เราจะสร้างสภาพแวดล้อม Python ของตัวเองบนเครื่องของเรา
คุณอาจสงสัยว่า ทำไมเราถึงต้องการคนงาน ลองนึกภาพสถานการณ์ที่เว็บเซิร์ฟเวอร์สร้างคำขอไปยังบริการของบุคคลที่สามด้วย ซึ่งหมายความว่าในขณะที่รอการตอบสนองจากบริการของบุคคลที่สามระหว่างการเชื่อมต่อซ็อกเก็ต เซิร์ฟเวอร์จะถูกบล็อกและทรัพยากรจะถูกเชื่อมโยงจนกว่าจะได้รับการตอบสนองจาก API ป>
คุณสามารถทดลองใช้ได้โดยสร้างการนอนหลับแบบสุ่ม
คุณจะสังเกตเห็นว่าเซสชันการแชทจะไม่เชื่อมต่อจนกว่าโหมดสลีปแบบสุ่มจะหมดเวลา
แม้ว่าเราจะสามารถใช้เทคนิคอะซิงโครนัสและกลุ่มผู้ปฏิบัติงานในการตั้งค่าเซิร์ฟเวอร์ที่เน้นการใช้งานจริงมากขึ้น แต่นั่นจะไม่เพียงพอเมื่อจำนวนผู้ใช้พร้อมกันเพิ่มขึ้น ป>
ท้ายที่สุดแล้ว เราต้องการหลีกเลี่ยงการผูกมัดทรัพยากรของเว็บเซิร์ฟเวอร์โดยใช้ Redis เพื่อเป็นนายหน้าในการสื่อสารระหว่าง API การแชทของเรากับ API ของบริษัทอื่น
จากนั้นให้เปิดเทอร์มินัลใหม่ ซีดีลงในโฟลเดอร์ผู้ปฏิบัติงาน และสร้างและเปิดใช้งานสภาพแวดล้อมเสมือน Python ใหม่ ซึ่งคล้ายกับที่เราทำในตอนที่ 1
ถัดไป ติดตั้งการอ้างอิงต่อไปนี้:
เราจะใช้ไคลเอ็นต์ aioredis เพื่อเชื่อมต่อกับฐานข้อมูล Redis นอกจากนี้เรายังใช้ไลบรารีคำขอเพื่อส่งคำขอไปยัง API การอนุมาน Huggingface ป>
สร้างสองไฟล์
ใน
ใน config.py ให้เพิ่มคลาส Redis ด้านล่าง:
เราสร้างวัตถุ Redis และเริ่มต้นพารามิเตอร์ที่ต้องการจากตัวแปรสภาพแวดล้อม จากนั้นเราสร้างวิธีการอะซิงโครนัส
ต่อไป เราจะทดสอบการเชื่อมต่อ Redis ใน main.py โดยเรียกใช้โค้ดด้านล่าง สิ่งนี้จะสร้างพูลการเชื่อมต่อ Redis ใหม่ ตั้งค่า "คีย์" คีย์ธรรมดา และกำหนดสตริง "ค่า" ให้กับมัน
ตอนนี้เปิด Redis Insight (หากคุณทำตามบทช่วยสอนเพื่อดาวน์โหลดและติดตั้ง) คุณจะเห็นสิ่งนี้:
ตอนนี้เรามีการตั้งค่าสภาพแวดล้อมของผู้ปฏิบัติงานแล้ว เราสามารถสร้างผู้ผลิตบนเว็บเซิร์ฟเวอร์และผู้บริโภคบนผู้ปฏิบัติงานได้ ป>
ก่อนอื่น มาสร้างคลาส Redis ของเราอีกครั้งบนเซิร์ฟเวอร์ ใน
ใน
ในไฟล์ .env ให้เพิ่มข้อมูลประจำตัว Redis ด้วย:
สุดท้ายใน
เราสร้างคลาส Producer ที่เริ่มต้นด้วยไคลเอนต์ Redis เราใช้ไคลเอ็นต์นี้เพื่อเพิ่มข้อมูลลงในสตรีมด้วย
คำสั่ง Redis สำหรับการเพิ่มข้อมูลลงในช่องสตรีมคือ
ถัดไป หากต้องการเรียกใช้ Producer ที่สร้างขึ้นใหม่ ให้อัปเดต
จากนั้น ในบุรุษไปรษณีย์ ให้สร้างการเชื่อมต่อและส่งข้อความจำนวนเท่าใดก็ได้ที่ระบุว่า
ใน Redis Insight คุณจะเห็น
ต่อไป เราจะสร้างแบบจำลองสำหรับข้อความแชทของเรา โปรดจำไว้ว่าเรากำลังส่งข้อมูลข้อความผ่าน WebSockets แต่ข้อมูลการแชทของเราต้องเก็บข้อมูลมากกว่าแค่ข้อความ เราจำเป็นต้องประทับเวลาเมื่อมีการส่งแชท สร้าง ID สำหรับแต่ละข้อความ และรวบรวมข้อมูลเกี่ยวกับเซสชันแชท จากนั้นจัดเก็บข้อมูลนี้ในรูปแบบ JSON ป>
เราสามารถจัดเก็บข้อมูล JSON นี้ไว้ใน Redis เพื่อที่เราจะได้ไม่สูญเสียประวัติการแชทเมื่อการเชื่อมต่อขาดหาย เนื่องจาก WebSocket ของเราจะไม่จัดเก็บสถานะ
ใน
เรากำลังใช้
ข้อความที่ส่งและรับภายในเซสชันการแชทนี้จะถูกเก็บไว้ที่
ในการใช้ความสามารถของ Redis JSON ในการจัดเก็บประวัติการแชทของเรา เราจำเป็นต้องติดตั้ง rejson ที่แล็บ Redis มอบให้ ป>
ในเทอร์มินัลให้ cd ลงใน
เรากำลังเพิ่ม
ถัดไปใน
หมายเหตุ:เนื่องจากนี่คือแอปสาธิต ฉันจึงไม่ต้องการจัดเก็บข้อมูลการแชทใน Redis นานเกินไป ดังนั้นฉันจึงเพิ่มการหมดเวลา 60 นาทีบนโทเค็นโดยใช้ไคลเอนต์ aioredis (rejson ไม่ได้ใช้การหมดเวลา) ซึ่งหมายความว่าหลังจากผ่านไป 60 นาที ข้อมูลเซสชันการแชทจะหายไป ป>
นี่เป็นสิ่งจำเป็นเนื่องจากเราไม่ได้ตรวจสอบสิทธิ์ผู้ใช้ และเราต้องการถ่ายโอนข้อมูลแชทหลังจากระยะเวลาที่กำหนด ขั้นตอนนี้เป็นทางเลือก และคุณไม่จำเป็นต้องรวมไว้
ถัดไปในบุรุษไปรษณีย์ เมื่อคุณส่งคำขอ POST เพื่อสร้างโทเค็นใหม่ คุณจะได้รับคำตอบที่มีโครงสร้างเหมือนกับด้านล่างนี้ คุณยังสามารถตรวจสอบ Redis Insight เพื่อดูข้อมูลแชทของคุณที่จัดเก็บโดยมีโทเค็นเป็นคีย์ JSON และข้อมูลเป็นค่า
ขณะนี้เราได้สร้างและจัดเก็บโทเค็นแล้ว นี่เป็นเวลาที่ดีในการอัปเดต
ใน
โทเค็นที่สร้างโดย
หากต้องการทดสอบการขึ้นต่อกัน ให้เชื่อมต่อกับเซสชันแชทด้วยโทเค็นแบบสุ่มที่เราใช้ และคุณควรได้รับข้อผิดพลาด 403 (โปรดทราบว่าคุณต้องลบโทเค็นใน Redis Insight ด้วยตนเอง)
ตอนนี้คัดลอกโทเค็นที่สร้างขึ้นเมื่อคุณส่งคำขอโพสต์ไปที่
เมื่อนำมารวมกันแล้ว chat.py ของคุณควรมีลักษณะดังนี้
ทำได้ดีมากที่มาถึงขนาดนี้! ในส่วนถัดไป เราจะมุ่งเน้นไปที่การสื่อสารกับโมเดล AI และการจัดการการถ่ายโอนข้อมูลระหว่างไคลเอนต์ เซิร์ฟเวอร์ ผู้ปฏิบัติงาน และ API ภายนอก
ในส่วนนี้ เราจะมุ่งเน้นไปที่การสร้าง Wrapper เพื่อสื่อสารกับโมเดล Transformer ส่งข้อความแจ้งจากผู้ใช้ไปยัง API ในรูปแบบการสนทนา และรับและแปลงการตอบกลับสำหรับแอปพลิเคชันแชทของเรา
เราจะไม่สร้างหรือปรับใช้โมเดลภาษาใดๆ บน Hugginface แต่เราจะมุ่งเน้นไปที่การใช้ API การอนุมานแบบเร่งของ Huggingface เพื่อเชื่อมต่อกับโมเดลที่ได้รับการฝึกอบรมล่วงหน้าแทน ป>
โมเดลที่เราจะใช้คือรุ่น GPT-J-6B จัดทำโดย EleutherAI เป็นโมเดลภาษาเชิงกำเนิดซึ่งได้รับการฝึกฝนด้วยพารามิเตอร์ 6 พันล้านพารามิเตอร์ ป>
Huggingface ให้ API แบบจำกัดตามความต้องการแก่เราเพื่อเชื่อมต่อกับโมเดลนี้โดยไม่มีค่าใช้จ่าย
ในการเริ่มต้นกับ Huggingface ให้สร้างบัญชีฟรี ในการตั้งค่าของคุณ ให้สร้างโทเค็นเพื่อการเข้าถึงใหม่ สำหรับโทเค็นสูงสุด 30,000 รายการ Huggingface ให้การเข้าถึง API การอนุมานได้ฟรี ป>
คุณสามารถตรวจสอบการใช้งาน API ของคุณได้ที่นี่ ตรวจสอบให้แน่ใจว่าคุณเก็บโทเค็นนี้ไว้อย่างปลอดภัยและอย่าเปิดเผยต่อสาธารณะ
หมายเหตุ:เราจะใช้การเชื่อมต่อ HTTP เพื่อสื่อสารกับ API เนื่องจากเราใช้บัญชีฟรี แต่บัญชี PRO Huggingface รองรับการสตรีมด้วย WebSockets ดูการทำงานแบบขนานและแบบแบตช์ ป>
ซึ่งสามารถช่วยปรับปรุงเวลาตอบสนองระหว่างโมเดลและแอปพลิเคชันแชทของเราได้อย่างมาก และหวังว่าจะกล่าวถึงวิธีนี้ในบทความติดตามผล
ขั้นแรก เราเพิ่มข้อมูลรับรองการเชื่อมต่อ Huggingface ให้กับไฟล์ .env ภายในไดเร็กทอรีผู้ปฏิบัติงานของเรา
ถัดไปใน
สุดท้ายนี้ เราทดสอบสิ่งนี้โดยการเรียกใช้วิธีการสืบค้นบนอินสแตนซ์ของคลาส GPT โดยตรง ในเทอร์มินัล ให้รัน
ต่อไป เราจะเพิ่มการปรับแต่งบางอย่างให้กับอินพุตเพื่อทำให้การโต้ตอบกับโมเดลมีการสนทนากันมากขึ้นโดยการเปลี่ยนรูปแบบของอินพุต ป>
อัปเดต
เราอัปเดตอินพุตด้วยสตริงลิเทอรัล
สำหรับทุกข้อมูลใหม่ที่เราส่งไปยังโมเดล ไม่มีทางที่โมเดลจะจดจำประวัติการสนทนาได้ นี่เป็นสิ่งสำคัญหากเราต้องการเก็บบริบทในการสนทนา ป>
แต่โปรดจำไว้ว่าเมื่อจำนวนโทเค็นที่เราส่งไปยังโมเดลเพิ่มขึ้น การประมวลผลก็จะมีราคาแพงขึ้น และเวลาตอบสนองก็นานขึ้นด้วย ป>
เราจึงต้องหาวิธีดึงประวัติระยะสั้นแล้วส่งไปที่โมเดล นอกจากนี้เรายังต้องหาจุดที่น่าสนใจด้วย - เราต้องการดึงข้อมูลในอดีตและส่งไปยังโมเดลจำนวนเท่าใด
ในการจัดการประวัติการแชท เราต้องถอยกลับไปที่ฐานข้อมูล JSON ของเรา เราจะใช้
อัปเดต
ในขณะที่ไฟล์ .env ของคุณควรมีลักษณะดังนี้:
ถัดไปใน
แคชเริ่มต้นได้ด้วยไคลเอนต์ rejson และวิธีการ
ต่อไป ให้อัพเดต
ฉันได้ฮาร์ดโค้ดโทเค็นตัวอย่างที่สร้างจากการทดสอบครั้งก่อนในบุรุษไปรษณีย์ หากคุณไม่ได้สร้างโทเค็น เพียงส่งคำขอใหม่ไปที่
ต่อไปเราต้องเพิ่ม
โปรดทราบว่าในการเข้าถึงอาร์เรย์ข้อความ เราจำเป็นต้องระบุ
หากต้องการทดสอบวิธีนี้ ให้อัปเดตฟังก์ชันหลักในไฟล์ main.py ด้วยโค้ดด้านล่าง:
เรากำลังส่งข้อความฮาร์ดโค้ดไปยังแคช และรับประวัติการแชทจากแคช เมื่อคุณเรียกใช้
สุดท้ายนี้ เราจำเป็นต้องอัปเดตฟังก์ชันหลักเพื่อส่งข้อมูลข้อความไปยังโมเดล GPT และอัปเดตอินพุตด้วย 4 ตัวสุดท้าย ข้อความที่ส่งระหว่างไคลเอนต์และโมเดล ป>
ก่อนอื่น มาอัปเดต
Update the
Then update the main function in main.py in the worker directory, and run
Next, we need to update the main function to add new messages to the cache, read the previous 4 messages from the cache, and then make an API call to the model using the query method. It'll have a payload consisting of a composite string of the last 4 messages.
You can always tune the number of messages in the history you want to extract, but I think 4 messages is a pretty good number for a demo.
In
Next, update the main.py file like below:
In the code above, we add new message data to the cache. This message will ultimately come from the message queue. Next we get the chat history from the cache, which will now include the most recent data we added. ป>
Note that we are using the same hard-coded token to add to the cache and get from the cache, temporarily just to test this out. ป>
Next, we trim off the cache data and extract only the last 4 items. Then we consolidate the input data by extracting the msg in a list and join it to an empty string. ป>
Finally, we create a new Message instance for the bot response and add the response to the cache specifying the source as "bot"
Next, run
Open Redis Insight and you should have something similar to the below:
Next, we want to create a consumer and update our
In
The
Next, update the
This is quite the update, so let's take it step by step:
We use a
Next, we await new messages from the message_channel by calling our
Once we get a response, we then add the response to the cache using the
So far, we are sending a chat message from the client to the message_channel (which is received by the worker that queries the AI model) to get a response. ป>
Next, we need to send this response to the client. As long as the socket connection is still open, the client should be able to receive the response. ป>
If the connection is closed, the client can always get a response from the chat history using the
In
Next, in the
Next, we need to let the client know when we receive responses from the worker in the
Note that we also need to check which client the response is for by adding logic to check if the token connected is equal to the token in the response. Then we delete the message in the response queue once it's been read.
In
Next, update the
Finally, we need to update the
In
Next, in
Now, when we send a GET request to the
If the token has not timed out, the data will be sent to the user. Or it'll send a 400 response if the token is not found.
Finally, we will test the chat system by creating multiple chat sessions in Postman, connecting multiple clients in Postman, and chatting with the bot on the clients. ป>
Lastly, we will try to get the chat history for the clients and hopefully get a proper response.
Let's have a quick recap as to what we have achieved with our chat system. The chat client creates a token for each chat session with a client. This token is used to identify each client, and each message sent by clients connected to or web server is queued in a Redis channel (message_chanel), identified by the token.
Our worker environment reads from this channel. It does not have any clue who the client is (except that it's a unique token) and uses the message in the queue to send requests to the Huggingface inference API.
When it gets a response, the response is added to a response channel and the chat history is updated. The client listening to the response_channel immediately sends the response to the client once it receives a response with its token.
If the socket is still open, this response is sent. If the socket is closed, we are certain that the response is preserved because the response is added to the chat history. The client can get the history, even if a page refresh happens or in the event of a lost connection.
Congratulations on getting this far! You have been able to build a working chat system. ป>
In follow-up articles, I will focus on building a chat user interface for the client, creating unit and functional tests, fine-tuning our worker environment for faster response time with WebSockets and asynchronous requests, and ultimately deploying the chat application on AWS.
This Article is part of a series on building full-stack intelligent chatbots with tools like Python, React, Huggingface, Redis, and so on. You can follow the full series on my blog:blog.stephensanwo.dev - AI ChatBot Series**
You can download the full repository on My Github Repository ป>
I wrote this tutorial in collaboration with Redis. Need help getting started with Redis? Try the following resources:
เรียนรู้การเขียนโค้ดฟรี หลักสูตรโอเพ่นซอร์สของ freeCodeCamp ช่วยให้ผู้คนมากกว่า 40,000 คนได้งานในตำแหน่งนักพัฒนา เริ่มต้น 386รหัส> คลาสเริ่มต้นด้วย 399 คุณลักษณะที่เป็นรายการการเชื่อมต่อที่ใช้งานอยู่ ป> 409 วิธีการจะยอมรับ 413 และเพิ่มลงในรายการการเชื่อมต่อที่ใช้งานอยู่ในขณะที่ 427 วิธีการจะลบ 432 จากรายการการเชื่อมต่อที่ใช้งานอยู่ ป> 440 วิธีการจะรับข้อความและ 450 เราต้องการส่งข้อความถึงและส่งข้อความแบบอะซิงโครนัส462 นำเข้าและเริ่มต้นภายใน 475 และอัปเดต 488 เส้นทาง WebSocket ด้วยรหัสด้านล่าง:from ..socket.connection import ConnectionManager
manager = ConnectionManager()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
492 ซึ่งใช้ WebSocket เราจะเพิ่ม websocket ใหม่ให้กับตัวจัดการการเชื่อมต่อและเรียกใช้ 500 เพื่อให้แน่ใจว่าซ็อกเก็ตยังคงเปิดอยู่ ยกเว้นในกรณีที่ปลั๊กไฟถูกถอดออก ป> 512 และพิมพ์ไปที่เทอร์มินัลในตอนนี้ ป> 526 . ป> ทดสอบการแชทกับบุรุษไปรษณีย์ ป>
การพึ่งพาการฉีดใน FastAPI
535 จากนั้นเพิ่มโค้ดด้านล่าง:from fastapi import WebSocket, status, Query
from typing import Optional
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return token
541 เส้นทาง FastAPI มีคลาส Depends เพื่อแทรกการขึ้นต่อกันได้อย่างง่ายดาย ดังนั้นเราจึงไม่ต้องปรับแต่งการตกแต่ง ป> 551 เส้นทางไปยังสิ่งต่อไปนี้:from ..socket.utils import get_token
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
567 จุดสิ้นสุดในบุรุษไปรษณีย์ คุณจะได้รับข้อผิดพลาด 403 ระบุโทเค็นเป็นพารามิเตอร์การสืบค้นและระบุค่าใดๆ ให้กับโทเค็นในตอนนี้ จากนั้นคุณควรจะสามารถเชื่อมต่อได้เหมือนเมื่อก่อน เพียงแต่ตอนนี้การเชื่อมต่อต้องใช้โทเค็นทดสอบการแชทบุรุษไปรษณีย์ด้วยโทเค็น ป>
571ของคุณ ไฟล์ควรมีลักษณะดังนี้:import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
chat = APIRouter()
manager = ConnectionManager()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
data = {"name": name, "token": token}
return data
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
วิธีสร้างระบบเรียลไทม์ด้วย Redis
Redis และคิวการส่งข้อความแบบกระจาย
582 . ป> 594 ก่อนที่จะส่งการตอบกลับแบบฮาร์ดโค้ดและส่งข้อความใหม่ จากนั้นลองเชื่อมต่อกับโทเค็นอื่นในเซสชันบุรุษไปรษณีย์ใหม่ ป> pip install aiohttp aioredis python-dotenv
วิธีเชื่อมต่อกับคลัสเตอร์ Redis ใน Python ด้วยไคลเอนต์ Redis
602 และ 618รหัส> . จากนั้นสร้างโฟลเดอร์ชื่อ 620 . นอกจากนี้ ให้สร้างโฟลเดอร์ชื่อ 630 และเพิ่มไฟล์ใหม่ชื่อ 649 . ป> 650 เพิ่มโค้ดต่อไปนี้ – และตรวจสอบให้แน่ใจว่าคุณอัปเดตฟิลด์ด้วยข้อมูลประจำตัวที่ให้ไว้ในคลัสเตอร์ Redis ของคุณexport REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
665 เพื่อสร้างการเชื่อมต่อ Redis และส่งคืนพูลการเชื่อมต่อที่ได้รับจาก 676 วิธีการ 688รหัส> .
from src.redis.config import Redis
import asyncio
async def main():
redis = Redis()
redis = await redis.create_connection()
print(redis)
await redis.set("key", "value")
if __name__ == "__main__":
asyncio.run(main())
การทดสอบข้อมูลเชิงลึก Redis ป>
วิธีทำงานกับสตรีม Redis
693 สร้างโฟลเดอร์ชื่อ 706 และเพิ่มสองไฟล์ 716 และ 729 . ป> 730 ให้เพิ่มโค้ดด้านล่างเหมือนกับที่เราทำกับสภาพแวดล้อมของผู้ปฏิบัติงาน:import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
740 เพิ่มรหัสต่อไปนี้:
from .config import Redis
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel):
try:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
except Exception as e:
print(f"Error sending msg to stream => {e}")
758 ซึ่งรับข้อมูลและชื่อช่อง Redis ป> 763 และมีทั้งฟังก์ชันระดับสูงและระดับต่ำใน aioredis776 และ WebSocket 785 จุดสิ้นสุดเช่นด้านล่าง สังเกตชื่อช่องที่อัปเดต 792 .
from ..redis.producer import Producer
from ..redis.config import Redis
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
try:
while True:
data = await websocket.receive_text()
print(data)
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
808 . คุณควรพิมพ์ข้อความสตรีมไปที่เทอร์มินัลดังนี้:การทดสอบข้อความช่องสัญญาณเทอร์มินัล ป>
819 ใหม่ สร้างและคิวประทับเวลาที่เต็มไปด้วยข้อความที่ส่งจากไคลเอนต์ คิวที่ประทับเวลานี้มีความสำคัญต่อการรักษาลำดับของข้อความช่อง Redis Insight ป>
วิธีสร้างโมเดลข้อมูลแชท
823 สร้างโฟลเดอร์ใหม่ชื่อ 838 . จากนั้นสร้างไฟล์ชื่อ 849 ใน 852 เพิ่มรหัสต่อไปนี้:from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = uuid.uuid4()
msg: str
timestamp = str(datetime.now())
class Chat(BaseModel):
token: str
messages: List[Message]
name: str
session_start = str(datetime.now())
861 ของ Pydantic คลาสเพื่อสร้างโมเดลข้อมูลการแชท 878รหัส> class จะเก็บข้อมูลเกี่ยวกับเซสชันแชทเดียว จะจัดเก็บโทเค็น ชื่อผู้ใช้ และการประทับเวลาที่สร้างขึ้นโดยอัตโนมัติสำหรับเวลาเริ่มต้นเซสชันการแชทโดยใช้ 880 . ป> 895 คลาสที่สร้างรหัสแชทได้ทันทีโดยใช้ 905 . ข้อมูลเดียวที่เราจำเป็นต้องระบุเมื่อเริ่มต้น 910 นี้ class คือข้อความวิธีทำงานกับ Redis JSON
922 และติดตั้ง rejson ด้วย 934 . จากนั้นอัปเดต 940 ของคุณ คลาสใน 954 เพื่อรวม 960 วิธีการ:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
974 วิธีเชื่อมต่อกับ Redis ด้วย rejson 988 . สิ่งนี้ทำให้เรามีวิธีในการสร้างและจัดการข้อมูล JSON ใน Redis ซึ่งไม่สามารถใช้ได้กับ aioredis993 เราสามารถอัปเดต 1008 ได้ จุดสิ้นสุดเพื่อสร้าง 1010 ใหม่ อินสแตนซ์และจัดเก็บข้อมูลเซสชันใน Redis JSON ดังนี้:@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create new chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
ตัวสร้างโทเค็นอัปเดตแล้ว ป>
วิธีอัปเดตการพึ่งพาโทเค็น
1029 การพึ่งพาใน 1032 ของเรา เว็บซ็อกเก็ต เราทำเช่นนี้เพื่อตรวจสอบโทเค็นที่ถูกต้องก่อนเริ่มเซสชันการแชท ป> 1049 อัปเดต 1053 ฟังก์ชั่นเพื่อตรวจสอบว่าโทเค็นมีอยู่ในอินสแตนซ์ Redis หรือไม่ หากเป็นเช่นนั้น เราจะส่งคืนโทเค็น ซึ่งหมายความว่าการเชื่อมต่อซ็อกเก็ตนั้นถูกต้อง หากไม่มีเราจะปิดการเชื่อมต่อ ป> 1060 จะหยุดอยู่หลังจาก 60 นาที ดังนั้นเราจึงมีตรรกะง่ายๆ ในส่วนหน้าเพื่อเปลี่ยนเส้นทางผู้ใช้เพื่อสร้างโทเค็นใหม่ หากมีการสร้างการตอบสนองข้อผิดพลาดขณะพยายามเริ่มแชท
from ..redis.config import Redis
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
redis_client = await redis.create_connection()
isexists = await redis_client.exists(token)
if isexists == 1:
return token
else:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not authenticated or expired token")
1072 จุดสิ้นสุด (หรือสร้างคำขอใหม่) และวางเป็นค่าให้กับพารามิเตอร์การสืบค้นโทเค็นที่ต้องการโดย 1084 เว็บซ็อกเก็ต จากนั้นเชื่อมต่อ คุณควรจะเชื่อมต่อได้สำเร็จเซสชันสนทนาด้วยโทเค็น ป>
import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create nee chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
print(chat_session.dict())
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chat bot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
วิธีเพิ่มความฉลาดให้กับ Chatbots ด้วยโมเดล AI
วิธีการเริ่มต้นใช้งาน Huggingface
วิธีการโต้ตอบกับโมเดลภาษา
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
1093 สร้างโฟลเดอร์ชื่อ 1105 จากนั้นเพิ่มไฟล์ 1118 . จากนั้นเพิ่มคลาส GPT ด้านล่าง:import os
from dotenv import load_dotenv
import requests
import json
load_dotenv()
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": True,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = input
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
print(json.loads(response.content.decode("utf-8")))
return json.loads(response.content.decode("utf-8"))
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
1121รหัส> คลาสเริ่มต้นด้วยโมเดล Huggingface 1133 , การรับรองความถูกต้อง 1143 และกำหนดไว้ล่วงหน้า 1155 . แต่อินพุตเพย์โหลดเป็นฟิลด์ไดนามิกที่ระบุโดย 1169 และอัปเดตก่อนที่เราจะส่งคำขอไปยังจุดสิ้นสุด Huggingface1178 และคุณควรได้รับคำตอบเช่นนี้ (เพียงจำไว้ว่าคำตอบของคุณจะแตกต่างไปจากนี้อย่างแน่นอน):[{'generated_text': ' (AI) could solve all the problems on this planet? I am of the opinion that in the short term artificial intelligence is much better than human beings, but in the long and distant future human beings will surpass artificial intelligence.\n\nIn the distant'}]
1184 คลาสเช่นนั้น:
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": False,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = f"Human: {input} Bot:"
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
data = json.loads(response.content.decode("utf-8"))
text = data[0]['generated_text']
res = str(text.split("Human:")[0]).strip("\n").strip()
return res
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
1194 . อินพุตของมนุษย์จะถูกวางไว้ในสตริงและบอทจะจัดเตรียมการตอบกลับ รูปแบบอินพุตนี้จะเปลี่ยน GPT-J6B ให้เป็นโมเดลการสนทนา การเปลี่ยนแปลงอื่นๆ ที่คุณอาจสังเกตเห็น ได้แก่
วิธีจำลองหน่วยความจำระยะสั้นสำหรับโมเดล AI
1201 เพื่อรับข้อมูลแชทล่าสุด จากนั้นเมื่อเราได้รับคำตอบ ให้เพิ่มการตอบกลับเข้ากับฐานข้อมูล JSON1212 เพื่อรวม 1225 วิธีการ นอกจากนี้ ให้อัปเดตไฟล์ .env ด้วยข้อมูลการตรวจสอบสิทธิ์ และตรวจสอบให้แน่ใจว่าติดตั้ง rejson แล้ว1237 ของคุณ ควรมีลักษณะเช่นนี้:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
1245 สร้างไฟล์ใหม่ชื่อ 1251 และเพิ่มโค้ดด้านล่าง:from .config import Redis
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
1260 รับโทเค็นเพื่อรับประวัติการแชทสำหรับโทเค็นนั้นจาก Redis ตรวจสอบให้แน่ใจว่าคุณนำเข้าวัตถุ Path จาก rejson1270 ด้วยรหัสด้านล่าง:from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
if __name__ == "__main__":
asyncio.run(main())
1280 และคัดลอกโทเค็น จากนั้นเรียกใช้ 1291 ในอาคารผู้โดยสาร คุณควรเห็นข้อมูลในเทอร์มินัลดังนี้:{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
1308 วิธีการ 1319 ของเรา คลาสที่เพิ่มข้อความไปยัง Redis สำหรับโทเค็นเฉพาะ
async def add_message_to_cache(self, token: str, message_data: dict):
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
1321รหัส> วิธีการจัดทำโดย rejson จะเพิ่มข้อความใหม่ต่อท้ายอาร์เรย์ข้อความ ป> 1334 เป็นข้อโต้แย้งต่อเส้นทาง หากข้อมูลข้อความของคุณมีโครงสร้างที่แตกต่าง/ซ้อนกัน เพียงระบุเส้นทางไปยังอาร์เรย์ที่คุณต้องการผนวกข้อมูลใหม่เข้าไปasync def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
1349 ในเทอร์มินัลภายในไดเร็กทอรีของผู้ปฏิบัติงาน คุณควรได้รับข้อความลักษณะนี้พิมพ์ในเทอร์มินัล โดยมีข้อความเพิ่มลงในอาร์เรย์ข้อความ{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [{'id': '1', 'msg': 'Hello', 'timestamp': '2022-07-16 13:20:01.092109'}], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
1359 ของเรากันก่อน ทำงานด้วยอาร์กิวเมนต์ "แหล่งที่มา" ใหม่ที่จะแจ้งให้เราทราบว่าข้อความนั้นเป็นมนุษย์หรือบอท We can then use this arg to add the "Human:" or "Bot:" tags to the data before storing it in the cache.1362 method in the Cache class like so: async def add_message_to_cache(self, token: str, source: str, message_data: dict):
if source == "human":
message_data['msg'] = "Human: " + (message_data['msg'])
elif source == "bot":
message_data['msg'] = "Bot: " + (message_data['msg'])
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
1370 to see the new results in the Redis database.async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
1381 , create a new folder schema. Then create a new file named 1399 and paste our message schema in chat.py like so:from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = str(uuid.uuid4())
msg: str
timestamp = str(datetime.now())
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "3",
"msg": "I would like to go to the moon to, would you take me?",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="bot", message_data=msg.dict())
1404 a couple of times, changing the human message and id as desired with each run. You should have a full conversation input and output with the model. ป> Conversational Chat ป>
Stream Consumer and Real-time Data Pull from the Message Queue
1419 to connect to the message queue. We want it to pull the token data in real-time, as we are currently hard-coding the tokens and message inputs.1425 create a new file named 1435 . Add a 1441 class with the code below:class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
1459 class is initialized with a Redis client. The 1463 method pulls a new message from the queue from the message channel, using the 1471 method provided by aioredis.1489 file with a while loop to keep the connection to the message channel alive, like so:
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(token)
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
1492 loop so that the worker can be online listening to messages from the queue. ป> 1504 method. If we have a message in the queue, we extract the message_id, token, and message. Then we create a new instance of the Message class, add the message to the cache, and then get the last 4 messages. We set it as input to the GPT model 1511 method. ป> 1521 method, then delete the message from the queue.How to Update the Chat Client with the AI Response
1538 จุดสิ้นสุด1548 create a new file named 1554 , and add a 1565 class similar to what we had on the chat web server:
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel) -> bool:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
1578 file, update the main function to initialize the producer, create a stream data, and send the response to a 1583 using the 1597 method:from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
from src.redis.producer import Producer
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
producer = Producer(redis_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
stream_data = {}
stream_data[str(token)] = str(msg.dict())
await producer.add_to_stream(stream_data, "response_channel")
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
1603 socket endpoint. We do this by listening to the response stream. We do not need to include a while loop here as the socket will be listening as long as the connection is open.1617 create a new file named stream.py and add our 1621 class like this:from .config import Redis
class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
1633 socket endpoint like so:from ..redis.stream import StreamConsumer
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
consumer = StreamConsumer(redis_client)
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[str(token)] = str(data)
await producer.add_to_stream(stream_data, "message_channel")
response = await consumer.consume_stream(stream_channel="response_channel", block=0)
print(response)
for stream, messages in response:
for message in messages:
response_token = [k.decode('utf-8')
for k, v in message[1].items()][0]
if token == response_token:
response_message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(message[0].decode('utf-8'))
print(token)
print(response_token)
await manager.send_personal_message(response_message, websocket)
await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
except WebSocketDisconnect:
manager.disconnect(websocket)
Refresh Token
1641 endpoint to get the chat history from the Redis database using our 1650 class. ป> 1669 , add a 1673 file and add the code below:
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
1682 import the 1697 class and update the 1703 endpoint to the below:
from ..redis.cache import Cache
@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
json_client = redis.create_rejson_connection()
cache = Cache(json_client)
data = await cache.get_chat_history(token)
if data == None:
raise HTTPException(
status_code=400, detail="Session expired or does not exist")
else:
return data
1716 endpoint with any token, the endpoint will fetch the data from the Redis database. ป> How to Test the Chat with multiple Clients in Postman
Recap