Computer >> บทช่วยสอนคอมพิวเตอร์ >  >> การเขียนโปรแกรม >> Redis

สร้างแอปแชทแบบเรียลไทม์ด้วย Upstash Kafka, Redis และ Next.js

คำอธิบายโครงการ

ในบล็อกโพสต์นี้ เราจะสร้างแอปพลิเคชันการรับส่งข้อความที่อนุญาตให้ผู้ใช้สร้างไคลเอ็นต์ข้อความและห้องสนทนา นอกจากนี้ ผู้ใช้จะสามารถเข้าถึงข้อความที่ผ่านมาได้

โครงการประกอบด้วยสองหน้า หน้าแรกมีไว้สำหรับการลงทะเบียนลูกค้าโดยเฉพาะ ซึ่งคุณสามารถสร้างลูกค้าได้หลายรายด้วยชื่อที่ไม่ซ้ำใคร

สร้างแอปแชทแบบเรียลไทม์ด้วย Upstash Kafka, Redis และ Next.js

เมื่อคุณคลิกที่ชื่อผู้ใช้ของลูกค้า คุณจะถูกนำไปยังไคลเอนต์ห้องสนทนาที่เกี่ยวข้องกับผู้ใช้รายนั้น

สร้างแอปแชทแบบเรียลไทม์ด้วย Upstash Kafka, Redis และ Next.js

ตรรกะของแอปพลิเคชันแชทมีดังนี้:

ผู้ใช้สามารถสร้างไคลเอนต์ได้หลายตัวบนหน้าดัชนี โดยแต่ละไคลเอนต์มีชื่อผู้ใช้ที่ไม่ซ้ำกัน การคลิกที่ชื่อผู้ใช้ของลูกค้าจะเป็นการเปิดแท็บใหม่พร้อมกับลูกค้ารายอื่นที่มีเส้นทางเฉพาะ

ลูกค้าแต่ละรายจะเชื่อมต่อกับเซิร์ฟเวอร์ข้อความผ่านการเชื่อมต่อ WebSocket เมื่อมีการสร้างข้อความใหม่บนไคลเอนต์ ข้อความนั้นจะถูกส่งไปยังเซิร์ฟเวอร์ข้อความที่เชื่อมโยงกับไคลเอนต์นั้น

เซิร์ฟเวอร์ข้อความจะจัดการการรับส่งข้อมูลข้อความ เมื่อไคลเอนต์ส่งข้อความผ่านการเชื่อมต่อ WebSocket เซิร์ฟเวอร์จะนำข้อความนั้นไปยังนายหน้า Kafka แต่ละเซิร์ฟเวอร์ข้อความจะเรียกใช้เธรด NodeJS เพื่อจัดการข้อความขาเข้า เมื่อใช้ข้อความ ข้อความนั้นจะถูกส่งไปยังไคลเอนต์ผ่านการเชื่อมต่อ WebSocket ที่มีอยู่ หากต้องการใช้ข้อความขาเข้าทางฝั่งไคลเอ็นต์ เราจะใช้ 08 ห้องสมุด

แอปพลิเคชันจะใช้ Upstash Redis เพื่อจัดเก็บประวัติข้อความ เมื่อมีการสร้างข้อความถึง Kafka ข้อความนั้นจะคงอยู่ในฐานข้อมูล Redis ด้วย เมื่อสร้างไคลเอนต์ใหม่ ข้อความเก่าจะถูกดึงมาจาก Upstash Redis และแสดงผลในหน้าจอแชท

นี่คือภาพรวมทั่วไปของแอปพลิเคชัน:

หมายเหตุ: ในการใช้งานของเรา เราจะสร้างเซิร์ฟเวอร์ข้อความเดียวเพื่อการสาธิต โดยสามารถเพิ่มจำนวนเซิร์ฟเวอร์เพื่อรองรับการโหลดข้อความได้

สร้างแอปแชทแบบเรียลไทม์ด้วย Upstash Kafka, Redis และ Next.js

สาธิต

คุณสามารถเข้าถึงการสาธิตของแอปได้ที่นี่ แอปพลิเคชันเวอร์ชันปัจจุบันถูกปรับใช้กับ Fly

เริ่มต้นใช้งาน

ต่อไปนี้เป็นขั้นตอนในการสร้างแอปพลิเคชันแชท:

  1. การสร้างฐานข้อมูล Upstash Redis
  2. การสร้างคลัสเตอร์ Upstash Kafka
  3. การสร้างแอปพลิเคชันถัดไป (ส่วนหน้า)
  4. การสร้างเซิร์ฟเวอร์ข้อความ WebSocket
  5. การปรับใช้แอปพลิเคชันไปยัง Fly.io

การสร้างฐานข้อมูล Upstash Redis

ไปที่ Upstash Console และเข้าสู่ระบบ จากนั้นไปที่ Redis แท็บ คลิก สร้างฐานข้อมูล ปุ่ม.

สร้างแอปแชทแบบเรียลไทม์ด้วย Upstash Kafka, Redis และ Next.js

เพียงเท่านี้ Redis ของเราก็พร้อมใช้งานแล้ว! เราจะกลับมาที่คอนโซล Redis เพื่อรับข้อมูลรับรอง

การสร้างคลัสเตอร์ Upstash Kafka

ตอนนี้ เปลี่ยนไปใช้ คาฟคา Tab แล้วคลิกสร้างคลัสเตอร์ ปุ่ม พิมพ์ชื่อคลัสเตอร์และดำเนินการต่อ จากนั้น สร้างหัวข้อ Kafka และยืนยัน

สร้างแอปแชทแบบเรียลไทม์ด้วย Upstash Kafka, Redis และ Next.js

การสร้างแอปถัดไป

ขั้นแรก สร้างและนำทางไปยังโฟลเดอร์รูทของแอปพลิเคชันของคุณจากเทอร์มินัล เราจะเก็บแอปถัดไปและเซิร์ฟเวอร์ไว้ในโฟลเดอร์นี้

mkdir chat-app
cd chat-app

จากนั้นสร้างแอปถัดไปของคุณ

$ npx create-next-app@latest
 
✔ What is your project named? … next-chat-app
✔ Would you like to use TypeScript? … Yes
✔ Would you like to use ESLint? … Yes
✔ Would you like to use Tailwind CSS? … No
✔ Would you like to use `src/` directory? … No
✔ Would you like to use App Router? (recommended) … No
✔ Would you like to customize the default import alias? … No

การจัดการข้อมูลรับรอง

เราจะสร้างไฟล์ชื่อ 19 เพื่อจัดเก็บข้อมูลประจำตัว เราไม่จำเป็นต้องคัดลอกและวางข้อมูลรับรองซ้ำแล้วซ้ำอีก เราจะนำเข้าจากไฟล์นี้เท่านั้น

ขั้นแรก สร้าง 24 ไฟล์.

จากนั้นไปที่คอนโซล Redis และคัดลอก/วาง 32 และ 41 ข้อมูลประจำตัวเป็น 50 ไฟล์.

สร้างแอปแชทแบบเรียลไทม์ด้วย Upstash Kafka, Redis และ Next.js

สุดท้าย เปลี่ยนไปใช้คอนโซล Kafka และโอน 63 , 73 , 87

สร้างแอปแชทแบบเรียลไทม์ด้วย Upstash Kafka, Redis และ Next.js

ตอนนี้ 91 ของคุณ ไฟล์ควรมีลักษณะคล้ายกัน

.env
UPSTASH_REDIS_REST_URL=...
UPSTASH_REDIS_REST_TOKEN=...
 
UPSTASH_KAFKA_REST_URL=...
UPSTASH_KAFKA_REST_USERNAME=...
UPSTASH_KAFKA_REST_PASSWORD=...

ตอนนี้เราได้กำหนดค่าข้อมูลรับรองแล้ว เราอาจดำเนินการสมัครต่อ

หน้าลงทะเบียนลูกค้า

หน้าดัชนีจะมีการดำเนินการลงทะเบียน/สร้างลูกค้า เมื่อส่งชื่อผู้ใช้แล้ว ลูกค้าใหม่จะถูกสร้างขึ้นและแสดงรายการภายใต้ ลูกค้าปัจจุบัน ตาราง

หน้า/index.tsx
import { useState } from "react";
import Link from "next/link";
 
import { Redis } from "@upstash/redis";
 
import styles from "@/styles/Home.module.css";
 
export default function Home() {
 const [usernameInput, setUsernameInput] = useState<string>("");
 const [usernameList, setUsernameList] = useState<string[]>(Array<string>);
 
 const handleInputChange = (e: React.ChangeEvent<HTMLInputElement>): void => {
 const inputValue: string = e.target.value;
 setUsernameInput(inputValue);
 };
 
 const addUsernameClient = (e: React.FormEvent<HTMLFormElement>): void => {
 e.preventDefault();
 setUsernameList([...usernameList, usernameInput]);
 setUsernameInput("");
 };
 return (
 <div className={styles.container}>
 <div className={styles.welcomeSection}>
 <h1>Welcome to the demo message app!</h1>
 <p>
 This application uses Upstash Kafka for message passing, and Upstash
 Redis for state management.
 <br />
 <br />
 To get started, create several clients by typing in unique usernames to
 the input section below and submitting.
 <br />
 <br />
 The usernames will be added to the list of current clients. Click on a
 username to open a new tab with that client&apos;s message display.
 <br />
 <br />
 You can have multiple sessions open at once.
 </p>
 </div>
 <form className={styles.formSection} onSubmit={addUsernameClient}>
 <input
 type="text"
 className={styles.formInput}
 value={usernameInput}
 onChange={handleInputChange}
 ></input>
 
 <button className={styles.formSubmit} type="submit">
 Create the client!
 </button>
 </form>
 <div className={styles.clientListSection}>
 <p className={styles.clientListHeader}>Current Clients</p>
 <div className={styles.clientList}>
 {usernameList.map((username, i) => {
 return (
 <Link
 href={`/user/${username}`}
 key={`${i}`}
 className={styles.userClient}
 target={"_blank"}
 >
 <p>{username}</p>
 </Link>
 );
 })}
 </div>
 </div>
 </div>
 );
}

หากคุณต้องการรีเซ็ตประวัติการแชททุกครั้งที่โหลดแอปใหม่ คุณสามารถใช้ฟังก์ชันต่อไปนี้:

หน้า/index.tsx
export async function getServerSideProps() {
 const redis = new Redis({
 url: process.env.UPSTASH_REDIS_REST_URL,
 token: process.env.UPSTASH_REDIS_REST_TOKEN,
 });
 
 await redis.del("messagesList");
 
 return {
 props: {},
 };
}

ด้วยเหตุนี้ หน้าดัชนีจึงพร้อมใช้งาน เรียกใช้ 106 คำสั่งใน 117 โฟลเดอร์เพื่อดูหน้าดัชนีใช้งานจริง

หน้าไคลเอ็นต์ข้อความ

หากต้องการใช้การกำหนดเส้นทางแบบไดนามิกสำหรับไคลเอนต์ เราจะสร้างโฟลเดอร์ชื่อ 126 โครงสร้างโฟลเดอร์นี้จะช่วยให้เราสามารถสร้างเส้นทางแบบไดนามิกสำหรับลูกค้าแต่ละรายตามชื่อผู้ใช้ของพวกเขา

นี่คือองค์ประกอบหลักของลูกค้า ส่วนประกอบนี้จะเก็บสถานะของรายการข้อความ ชื่อผู้ใช้ ฯลฯ เรากำลังใช้ useWebSocket hook เพื่อสร้างเหตุการณ์ข้อความ การเชื่อมต่อ และการตัดการเชื่อมต่อจาก WebSocket เมื่อมีการส่งเหตุการณ์ข้อความ ข้อความจะถูกเพิ่มไปยังรายการข้อความและส่วนประกอบ MessageDisplay จะถูกเรนเดอร์อีกครั้ง

/pages/user/[ชื่อผู้ใช้].tsx
import { useState } from "react";
import { useRouter } from "next/router";
 
import { Redis } from "@upstash/redis";
import useWebSocket from "react-use-websocket";
 
import styles from "@/styles/Home.module.css";
 
type Message = {
 id: number;
 sender: string;
 text: string;
};
 
export default function MessageApp(props: { messagesData: Message[] }) {
 const { messagesData } = props;
 const { username } = useRouter().query;
 const [inputText, setInputText] = useState<string>("");
 const [messageList, setMessageList] = useState<Message[]>(messagesData);
 const [messageCounter, setMessageCounter] = useState<number>(0);
 
 const handleMessage = function (message: Message) {
 const nextMessages = [...messageList, message];
 setMessageList(nextMessages);
 };
 
 // handling WebSocket events
 const { sendMessage } = useWebSocket("ws://localhost:8080", {
 share: true,
 filter: () => false,
 onOpen: () => {
 console.log("WebSocket connection!");
 return "connection";
 },
 
 onMessage: (message) => {
 const data = JSON.parse(message.data);
 const { sender, text }: { sender: string; text: string } = data;
 const messageData: Message = {
 id: messageCounter,
 sender: sender,
 text: text,
 };
 setMessageCounter(messageCounter + 1);
 handleMessage(messageData);
 return message;
 },
 
 onClose: () => {
 console.log("WebSocket disconnected!");
 return "disconnected";
 },
 });
 
 function handleSendMessage(messageText: string) {
 const messageData = {
 sender: username,
 text: messageText,
 };
 
 sendMessage(JSON.stringify(messageData));
 }
 
 return (
 <div className={styles.Container}>
 <MessageDisplay messages={messageList} />
 <MessageInput
 inputText={inputText}
 setInputText={setInputText}
 handleSendMessage={handleSendMessage}
 />
 </div>
 );
}

นี่คือคอมโพเนนต์ MessageDisplay และ MessageInput:

/pages/user/[ชื่อผู้ใช้].tsx
const MessageDisplay = function (props: { messages: Message[] }) {
 const { messages } = props;
 
 return (
 <div className={styles.messageContainer}>
 {messages.map((message) => (
 <MessageBubble
 key={message.id}
 sender={message.sender}
 text={message.text}
 />
 ))}
 </div>
 );
};
 
const MessageInput = (props: {
 inputText: string;
 setInputText: (msg: string) => void;
 handleSendMessage: (msg: string) => void;
}) => {
 const { inputText, setInputText, handleSendMessage } = props;
 
 const handleInputChange = (
 e: React.ChangeEvent<HTMLInputElement>
 ): void => {
 const inputValue: string = e.target.value;
 setInputText(inputValue);
 };
 
 const handleSubmit = (e: React.FormEvent<HTMLFormElement>): void => {
 e.preventDefault();
 handleSendMessage(inputText);
 if (inputText.trim() !== "") {
 setInputText(" ");
 }
 };
 
 return (
 <form className={styles.inputSection} onSubmit={handleSubmit}>
 <input
 className={styles.inputText}
 type="text"
 value={inputText}
 onChange={handleInputChange}
 ></input>
 <button className={styles.inputSendButton} type="submit">
 Send
 </button>
 </form>
 );
};
 
const MessageBubble = (props: {
 sender: string;
 text: string;
 key: number;
}) => {
 const { sender, text } = props;
 
 const { username } = useRouter().query;
 
 const isSender = sender === username;
 const senderClass = isSender ? "sender" : "receiver";
 return (
 <div className={`${styles["messageBubble"]} ${styles[senderClass]}`}>
 <div className={styles.messageSender}>
 {isSender ? "You" : sender}
 </div>
 <div className={styles.messageText}>{text}</div>
 </div>
 );
};

เพื่อมอบประวัติการแชทให้กับลูกค้า เราจะใช้ 139 ฟังก์ชั่น

/pages/user/[ชื่อผู้ใช้].tsx
export async function getServerSideProps() {
 const redis = new Redis({
 url: process.env.UPSTASH_REDIS_REST_URL,
 token: process.env.UPSTASH_REDIS_REST_TOKEN,
 });
 
 const messagesData = (await redis.lrange("messagesList", 0, -1)).reverse();
 
 return {
 props: {
 messagesData,
 },
 };
}

แอป Next.js ของเราใช้งานได้แล้ว รีเฟรชเพจ สร้างไคลเอนต์ และนำทางหนึ่งในนั้น คุณจะเห็นหน้าลูกค้า แต่ถึงกระนั้น เราจำเป็นต้องมีเซิร์ฟเวอร์ข้อความเพื่อจัดการการไหลของข้อความ

การสร้างเซิร์ฟเวอร์ข้อความ

โครงสร้างของเซิร์ฟเวอร์ค่อนข้างเรียบง่าย เราจะใช้ Node.js, ws Library และ Upstash Kafka เพื่อให้มันใช้งานได้ ขั้นแรก สร้าง 149 โฟลเดอร์ภายใน 152 .

mkdir server
cd server

ภายใน 166 โฟลเดอร์ เราจะติดตั้งข้อกำหนดและกำหนดค่าไฟล์

npm install typescript ws tsc @upstash/kafka @types/ws
tsc --init

จากนั้น เราจะสร้างไคลเอ็นต์ WebSocket, Kafka Producer และ Kafka Consumer ภายใน 177 ไฟล์:

/server/message_server.ts
import * as http from "http";
 
import { Kafka } from "@upstash/kafka";
import { Redis } from "@upstash/redis";
import { WebSocket } from "ws";
 
const server = http.createServer();
const wss = new WebSocket.Server({ server });
 
server.listen(8080, () => {
 console.log("Server is running on port 8080");
});
 
const kafka = new Kafka({
 url: process.env.UPSTASH_KAFKA_REST_URL,
 username: process.env.UPSTASH_KAFKA_REST_USERNAME,
 password: process.env.UPSTASH_KAFKA_REST_PASSWORD,
});
 
const redis = new Redis({
 url: process.env.UPSTASH_REDIS_REST_URL,
 token: process.env.UPSTASH_REDIS_REST_TOKEN,
});
 
const consumer = kafka.consumer();
 
const producer = kafka.producer();
 
const clients = new Set<WebSocket>();

เพื่อโต้ตอบกับ WebSocket เรากำลังสร้าง 188 และ 190 เหตุการณ์

/server/message_server.ts
wss.on("connection", async (connection, req) => {
 clients.add(connection);
 console.log(`New client connected!`);
 
 connection.on("message", async (message) => {
 const jsonMessage = message.toString();
 
 console.log("Received message:", JSON.parse(jsonMessage));
 
 producer.produce("chat", jsonMessage);
 });
 
 connection.on("close", () => {
 console.log(`Client disconnected:`);
 clients.delete(connection);
 });
});

สุดท้ายนี้ เราจะสร้างและรันเธรดที่ใช้ข้อความตามช่วงเวลาที่กำหนดไว้ล่วงหน้า:

/server/message_server.ts
async function run() {
 while (true) {
 const messages = await consumer.consume({
 consumerGroupId: "group_1",
 instanceId: "instance_1",
 topics: ["chat"],
 autoOffsetReset: "earliest",
 });
 
 if (messages.length != 0) {
 for (let i = 0; i < messages.length; i++) {
 await redis.lpush("messagesList", messages[i].value);
 console.log(`Message sending: ${messages[i].value}`);
 
 clients.forEach((connection: WebSocket) => {
 connection.send(messages[i].value);
 });
 }
 }
 
 console.log("Run!");
 
 await new Promise((resolve) => setTimeout(resolve, 1000));
 }
}

ทุกอย่างพร้อมแล้ว แอพของเราน่าจะใช้งานได้อย่างมีเสน่ห์ในตอนนี้ หากคุณเรียกใช้เซิร์ฟเวอร์ข้อความบนเครื่องและรีเฟรชเพจไคลเอนต์ คุณจะเห็นข้อความที่ส่งระหว่างไคลเอนต์ คำสั่งด้านล่างจะแปลงไฟล์ TS และเรียกใช้เซิร์ฟเวอร์บน 209

tsc message_server.ts
node message_server.js

การปรับใช้

เราจะใช้ Fly.io สำหรับการปรับใช้ โปรดสร้างบัญชีก่อนที่เราจะเริ่มต้น หากคุณยังไม่มี

การปรับใช้เซิร์ฟเวอร์ข้อความ

ไปที่ 215 โฟลเดอร์และติดตั้ง 228 เครื่องมือ CLI และอนุญาตผ่านเชลล์

npm install flyctl
flyctl auth login

หากต้องการสร้างไฟล์การกำหนดค่า ให้เรียกใช้ 232 . สิ่งนี้จะสร้าง 245 . ไปที่ 259 และแทรกบรรทัดต่อไปนี้สำหรับการกำหนดค่าการเชื่อมต่อ WebSocket:

fly.toml
[[services]]
 internal_port = 8080
 protocol = "tcp"
 
 [services.concurrency]
 hard_limit = 25
 soft_limit = 20
 
 [[services.ports]]
 handlers = ["http"]
 port = "80"
 
 [[services.ports]]
 handlers = ["tls", "http"]
 port = "443"
 
 [[services.tcp_checks]]
 interval = 10000
 timeout = 2000

ตอนนี้เป็นขั้นตอนสุดท้ายสำหรับเซิร์ฟเวอร์ เรียกใช้ 268 และเราพร้อมแล้ว! เมื่อกระบวนการปรับใช้เสร็จสิ้น flyctl จะจัดเตรียมอุปกรณ์ปลายทางสำหรับเซิร์ฟเวอร์ของคุณ โปรดคัดลอกปลายทางนั้น ในกรณีของเรา จุดสิ้นสุดคือ 276 .

การปรับใช้แอปพลิเคชันถัดไป

ก่อนที่จะปรับใช้แอปพลิเคชัน Next.js เราจำเป็นต้องฝังจุดสิ้นสุดการปรับใช้ของเซิร์ฟเวอร์ข้อความ โปรดแทนที่ WebSocket URL ใน 282 ไฟล์จาก 294 ไปยังจุดสิ้นสุดจาก 304 รวมกับ 314 คำนำหน้า ในกรณีของเรา มันคือ 325 .

จากนั้นใน 332 โฟลเดอร์ ให้รันคำสั่งเดียวกันกับ 340 . คราวนี้เราไม่จำเป็นต้องแก้ไข 352 ไฟล์ เพื่อให้เราสามารถดำเนินการต่อไปโดยไม่มีขั้นตอนนั้นได้

flyctl init
flyctl deploy

เราทำเสร็จแล้ว! หากคุณเรียกใช้ 368 คำสั่ง คุณจะถูกนำไปยังโปรเจ็กต์ที่ปรับใช้ของคุณ

บทสรุปและข้อเสนอแนะ

ขอบคุณที่ติดตามกัน!

คุณสามารถค้นหาพื้นที่เก็บข้อมูล Github ของโปรเจ็กต์ได้ที่นี่

หากคุณต้องการทำงานในโครงการต่อไป นี่คือคำแนะนำบางส่วน:

  • ในปัจจุบัน เมื่อใดก็ตามที่โหลดหน้าซ้ำ ข้อความที่จัดเก็บไว้ใน Upstash Redis จะถูกล้างทิ้ง ลักษณะการทำงานนี้ถูกควบคุมโดยโค้ดใน 370 โดยเฉพาะภายใน 380 ฟังก์ชั่น อย่างไรก็ตาม ปัญหาสำคัญเกิดขึ้นเมื่อผู้ใช้ตัดสินใจที่จะโหลดหน้าซ้ำ ซึ่งส่งผลให้มีการลบประวัติการแชทสำหรับผู้เข้าร่วมทั้งหมดในห้องสนทนา
    เพื่อแก้ไขปัญหานี้ วิธีแก้ไขที่แนะนำคือการใช้ส่วนขยายของ TTL สำหรับประวัติการแชททุกครั้งที่ส่งข้อความ การปรับปรุงนี้จะทำให้แน่ใจได้ว่าประวัติการแชทยังคงสามารถเข้าถึงได้และเก็บรักษาไว้แม้จะโหลดหน้าซ้ำแล้วก็ตาม

  • คุณสามารถใช้คุณสมบัติห้องสนทนาได้หลายห้อง เพื่อให้บรรลุเป้าหมายนี้ คุณสามารถสร้างหัวข้อ Kafka หลายหัวข้อพร้อมชื่อเฉพาะสำหรับห้องสนทนาแต่ละห้องได้ อีกวิธีหนึ่งคือการจัดการบนเซิร์ฟเวอร์ข้อความโดยใช้โครงสร้างข้อมูลที่ถูกต้อง

  • คุณยังสามารถใช้เซิร์ฟเวอร์ข้อความหลายเซิร์ฟเวอร์และตัวจัดสรรภาระงานเพื่อใช้แนวทางปฏิบัติในการออกแบบระบบที่ดีที่สุด

หากคุณมีคำถามใดๆ คุณสามารถติดต่อเราได้ที่ fahreddin@upstash.com