Computer >> บทช่วยสอนคอมพิวเตอร์ >  >> การเขียนโปรแกรม >> Redis

สร้างคิวข้อความที่กำหนดเองด้วยรายการ Redis และ TypeScript

คุณเคยพยายามสร้างคิวข้อความของคุณเองแต่ต้องเผชิญกับความท้าทายหรือไม่ ถ้าเป็นเช่นนั้น คุณไม่ได้อยู่คนเดียว ในบทช่วยสอนนี้ เราจะสร้างคิวข้อความตั้งแต่ต้นโดยใช้รายการ Redis แม้ว่าจะมีหลายวิธีในการสร้างคิวข้อความด้วย Redis เช่น สตรีม รายการ และ pub/subs แต่เราจะเน้นที่แนวทางที่ง่ายที่สุดและตรงไปตรงมาที่สุด นั่นก็คือ รายการ เข้าร่วมกับฉันในขณะที่เราเจาะลึกคู่มือเชิงปฏิบัตินี้

สิ่งที่เราจะใช้

  • สุดยอด
  • คู่มือ

สิ่งที่คุณต้องการ

  • บุญ
  • คู่มือ

การตั้งค่า Upstash Redis

ขั้นแรก มาตั้งค่าอินสแตนซ์ Redis กัน ในการดำเนินการนี้ เพียงไปที่ Upstash และคลิกที่ สร้างฐานข้อมูล .หลังจากนั้น ให้เลื่อนลงเพื่อค้นหาสตริงการเชื่อมต่อของคุณ ซึ่งเป็นสิ่งที่เราจะใช้ในการเชื่อมต่อลูกค้าของเรา ฉันจะไม่ลงรายละเอียดที่นี่ แต่นั่นคือสิ่งที่คุณต้องเริ่มต้น

ตัวอย่างสตริงการเชื่อมต่อ:

redis://XXXXe@social-XXX-39281.upstash.io:39281

การเริ่มโครงการ

มาเริ่มโครงการ TypeScript ของเราโดยใช้ Bun ตัวเลือกไม่ได้เพียงเพราะมันเร็วกว่า Node เท่านั้น แต่ยังตั้งค่าได้ง่ายกว่ามากอีกด้วย ใช่แล้ว มันเร็วอย่างน่าประทับใจด้วย! 🚀

mkdir upstash-mq
cd upstash-mq
 
bun init
> package name (upstash-mq-tutorial): upstash-mq
> entry point (index.ts):
> Done!
 
bun add ioredis

โครงสร้างโครงการ

 ┣ 📂src
 ┃ ┣ 📂lua-scripts
 ┃ ┃ ┣ 📜add-job.lua
 ┃ ┃ ┗ 📜remove-job.lua
 ┃ ┣ 📜index.ts
 ┃ ┣ 📜job.ts
 ┃ ┣ 📜queue.ts
 ┃ ┗ 📜utils.ts
 ┣ 📜.env
 ┣ 📜.gitignore
 ┣ 📜README.md
 ┣ 📜bun.lockb
 ┣ 📜index.ts
 ┣ 📜package.json
 ┗ 📜tsconfig.json

การแสดงภาพ 09 ของเรา จะเป็นเช่น:

สร้างคิวข้อความที่กำหนดเองด้วยรายการ Redis และ TypeScript

งาน

ชั้นเรียนงานของเราต้องการสิ่งสำคัญบางประการ ขั้นแรก เราต้องติดตามสถานะของงานแต่ละงาน ซึ่งช่วยให้เราตัดสินใจว่าจะดำเนินการหรือไม่ ลองอีกครั้ง หรือย้ายงานไปที่อื่นหากเสร็จแล้ว งานแต่ละงานยังมี ID และข้อมูลบางอย่างซึ่งจำเป็นต้องเป็นข้อมูลทั่วไปเพื่อให้เราสามารถมอบประสบการณ์การใช้งานที่ยอดเยี่ยมแก่ผู้ใช้ สุดท้ายนี้ เราจำเป็นต้องเชื่อมโยงงานแต่ละงานกับคิวและรวมชื่อคิวเพื่อให้การจัดการง่าย

นี่คือแกนหลักของ 18 ของเรา คลาส:

type OwnerQueue = {
 redis: Redis;
 queueName: string;
};
export type JobStatuses =
 | "created"
 | "waiting"
 | "active"
 | "succeeded"
 | "failed";
 
export class Job<T> {
 id: string;
 status: JobStatuses;
 config: OwnerQueue;
 data: T;
 
 constructor(ownerConfig: OwnerQueue, data: T, jobId = randomUUID()) {
 this.id = jobId;
 this.status = "created";
 this.data = data;
 this.config = ownerConfig;
 }
}

เพื่อสร้าง 23 โดยทั่วไป เราต้องสร้าง 37 ก่อน ตัวเองทั่วไป ส่วนที่เหลือดำเนินไปในลักษณะตรงไปตรงมา เราสามารถสร้างอินสแตนซ์ Redis แยกต่างหากสำหรับ 42 แต่ละรายการได้ องค์ประกอบ แต่การจัดการสิ่งนี้จะซับซ้อน

โชคดีที่แนวทางของเราช่วยให้กำหนดค่าอินสแตนซ์ Redis ภายในคิวได้ง่าย และเราสามารถส่งอินสแตนซ์นี้ได้ตามต้องการ หลักการเดียวกันนี้ใช้กับ 53 . เนื่องจากเราจะใช้เพื่อบันทึกงานลงในคิวบ่อยครั้ง งานของเราต้องตระหนักถึงคิวหลักเพื่อให้สามารถบันทึกงานลงในคิวได้ เราจำเป็นต้องมีสองสิ่ง:สคริปต์ Lua เพื่อโต้ตอบกับ Redis และยูทิลิตี้บางอย่าง

มาสร้างยูทิลิตี้ของเรากันก่อน:

import { JobStatuses } from "./job";
 
const MQ_PREFIX = "UpstashMQ";
 
export const formatMessageQueueKey = (queueName: string, key: string) => {
 return `${MQ_PREFIX}:${queueName}:${key}`;
};
 
export const convertToJSONString = <T>(
 data: T,
 status: JobStatuses,
): string => {
 return JSON.stringify({
 data,
 status,
 });
};

เนื่องจากการสร้างชื่อคิวด้วยตนเองทุกครั้งที่เราใช้ Redis นั้นไม่เหมาะ เราจึงสร้างยูทิลิตี้ชื่อ 62 . ยูทิลิตี้นี้เพียงเชื่อมสตริงเข้าด้วยกัน นอกจากนี้ เราจำเป็นต้องทำให้ข้อมูลของเราเป็นอนุกรมเพื่อจัดเก็บไว้ใน Redis - เราไม่สามารถส่งออบเจ็กต์ JS เป็นแหล่งข้อมูลได้ จำเป็นต้องแปลงเป็นสตริงก่อน เนื่องจากข้อมูลนั้นเป็นข้อมูลทั่วไป เราจึงได้ใช้ฟังก์ชันทั่วไป 74 เพื่อจุดประสงค์นี้

ตอนนี้เรามาเพิ่มสคริปต์ lua แรกของเรา:

เพิ่ม-job.lua

--[[
key 1 -> [prefix]:name:jobs
key 2 -> [prefix]:name:waiting
arg 1 -> job id
arg 2 -> job data
]]
 
 
local jobId = ARGV[1]
local payload = ARGV[2]
 
if redis.call("hexists", KEYS[1], jobId) == 1 then return nil end
redis.call("hset", KEYS[1], jobId, payload)
redis.call("lpush", KEYS[2], jobId)
 
return jobId

เราสามารถดำเนินการเรียกเหล่านี้กับอินสแตนซ์ Redis ของเราทีละรายการได้ดังนี้:

  • 85
  • 98
  • 102

อย่างไรก็ตาม วิธีการนี้จะส่งผลให้มีการเรียกแยกกันสามครั้ง เพื่อลดการเดินทางไปกลับไปยังเซิร์ฟเวอร์ Redis เราตั้งเป้าที่จะรวมกระบวนการทั้งหมดไว้ในการโทรครั้งเดียว

มาเพิ่ม 112 ของเรากัน วิธีการ

 private createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
 
 async save(): Promise<string | null> {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/add-job.lua").text();
 const resJobId = (await this.config.redis.eval(
 addJobToQueueScript,
 2,
 this.createQueueKey("jobs"),
 this.createQueueKey("waiting"),
 this.id,
 convertToJSONString(this.data, this.status)
 )) as string | null;
 
 if (resJobId) {
 this.id = resJobId;
 return resJobId;
 }
 return null;
 }

รหัสนั้นตรงไปตรงมา แต่ให้ฉันอธิบายเพิ่มเติม หลังจากสร้างสคริปต์ Lua แล้ว เราจะเรียกมันว่า 124 ซึ่งจำเป็นสำหรับการรันสคริปต์ Lua พารามิเตอร์สำหรับ 136 มีดังนี้:

  • พารามิเตอร์แรกต้องใช้สคริปต์
  • พารามิเตอร์ตัวที่สองระบุจำนวนอาร์กิวเมนต์
  • พารามิเตอร์ที่สามและสี่ใช้สำหรับคีย์
  • สุดท้ายนี้ เราผ่านการโต้แย้งที่แท้จริง

ก่อนที่เราจะจบ 145 เราจะเพิ่มวิธีการอีกสองสามวิธีสำหรับอนาคต

fromId = async <T>(jobId: string): Promise<Job<T> | null> => {
 const jobData = await this.config.redis.hget(this.createQueueKey("jobs"), jobId);
 if (jobData) {
 return this.fromData<T>(jobId, jobData);
 }
 return null;
 };
 
private fromData = <T>(jobId: string, stringifiedJobData: string): Job<T> => {
 const parsedData = JSON.parse(stringifiedJobData) as Job<T>;
 const job = new Job<T>(this.config, parsedData.data, jobId);
 job.status = parsedData.status;
 return job;
};

ขณะนี้เราอาจไม่ต้องการฟังก์ชันเหล่านี้ แต่จะมีความสำคัญเมื่อเราเริ่มดำเนินการงานในอนาคต ณ จุดนั้น เราจะมีเพียง ID ของงาน 151 เท่านั้น และเราจะต้องมีวิธีการในการสร้างงานของเราใหม่ตั้งแต่ต้น นี่คือสิ่งที่ 164 สำเร็จ โดยดึงข้อมูลงานจาก Redis แปลงเป็นอินสแตนซ์งาน และส่งคืน เพื่อให้คิวประมวลผลงานนี้ในภายหลังได้

ไปยังคิว

หลังจากกรอก 177 แล้ว ในส่วน เราจะมาเจาะลึกรายละเอียดของ Queue กัน นี่คือวัตถุประสงค์ของเรา:

  • เรามุ่งหวังให้คิวของเราเก็บรักษาหรือลบข้อมูลเมื่อสำเร็จหรือล้มเหลว เนื่องจากอาจมีบางกรณีที่จำเป็นต้องประมวลผลใหม่ในภายหลัง
  • เราตั้งใจที่จะออกแบบคิวให้ทำงานพร้อมกัน ทำให้งานหลายงานทำงานพร้อมกันได้
  • เป้าหมายของเราคือการอนุญาตให้ส่งผ่านฟังก์ชันเรียกกลับสำหรับการประมวลผลข้อมูล ฟังก์ชันนี้ควรอนุมานประเภทของงานเพื่อประสบการณ์นักพัฒนาที่ดีขึ้น
  • เราวางแผนที่จะเปิดใช้งานการโทร 181 จากภายในคิว ทำให้เราส่งอินสแตนซ์ Redis และ 195 ได้ .
  • สุดท้ายนี้ เราต้องการให้แน่ใจว่าจะสามารถทำลายคิวได้หากจำเป็น และลบงานออกจากคิวได้

มาเริ่มต้นด้วยการกำหนดคิวของเรา

export type QueueConfig = {
 redis: Redis;
 queueName: string;
 keepOnSuccess?: boolean;
 keepOnFailure?: boolean;
};
 
export class Queue extends EventEmitter {
 config: QueueConfig;
 concurrency = 0;
 worker: any;
 running = 0;
 queued = 0;
 
 constructor(config: QueueConfig) {
 super();
 this.config = {
 redis: config.redis,
 queueName: config.queueName,
 keepOnFailure: config.keepOnFailure ?? true,
 keepOnSuccess: config.keepOnSuccess ?? true,
 };
 }
 
 createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
}

คลาส Queue มีการกำหนดค่าเพื่อยอมรับข้อมูลภายนอก เช่น ชื่อคิว อินสแตนซ์ Redis และการตั้งค่าสำหรับการเก็บรักษาหรือการลบข้อมูล เรายินดีรับการใช้งาน Redis ใดๆ ที่ผู้ใช้ต้องการ แม้ว่าเราจะมีการตั้งค่าพิเศษสำหรับ Upstash ☺ ความยืดหยุ่นนี้ทำให้ผู้ใช้สามารถรวมคิวเข้ากับระบบที่มีอยู่ได้อย่างง่ายดาย

และชั้นเรียนของเราได้ขยาย Event Emitter เพื่อแจ้งเตือนพวกเขาเมื่อมีบางอย่างเกิดขึ้น

นี่คือตัวอย่างการเริ่มต้น:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "upstash-rocks",
 keepOnFailure: true,
 keepOnSuccess: true,
});

เพิ่ม

async add<T>(payload: T) {
 return new Job<T>(this.config, payload).save();
 }

งานของเรารับรายละเอียดการกำหนดค่าของคิวพาเรนต์และเพย์โหลด ซึ่งเป็นข้อมูลที่จะจัดเก็บไว้ใน Redis จากนั้นเราก็บันทึกมัน

ตอนนี้เราทำได้:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL!),
 queueName: "mytest-queue",
 keepOnFailure: true,
 keepOnSuccess: true,
});
 
const payload = {
 upstash: "best-redis-ever",
};
 
await queue.add(payload);

ตอนนี้ เราต้องการวิธีประมวลผล

กำลังประมวลผล

นี่เป็นส่วนที่ท้าทายที่สุดในการดำเนินการคิวของเรา เรากำหนดให้ผู้ใช้ระบุจำนวนกระบวนการที่เกิดขึ้นพร้อมกัน และจัดเตรียมผู้ปฏิบัติงาน ฟังก์ชันเรียกกลับเพื่อประมวลผลงานที่จะอนุมานประเภทของงาน นอกจากนี้ เรายังจำเป็นต้องมีกลไกในการติดตามจำนวนงานที่กำลังดำเนินอยู่และอยู่ในคิว ทำให้เราสามารถเลือกงานถัดไปจากคิวได้อย่างปลอดภัย

 async process<TJobPayload>(
 worker: (job: TJobPayload) => void,
 concurrency: number
 ): Promise<void> {
 this.concurrency = concurrency;
 this.worker = worker;
 this.running = 0;
 this.queued = 1;
 
 this.jobTick();
 }

วัตถุประสงค์หลักของการยอมรับ 204 ทั่วไป คือการปรับปรุงประสบการณ์ของนักพัฒนาซอฟต์แวร์ให้กับผู้ใช้ของเรา เรามุ่งมั่นที่จะทำให้พวกเขาได้รับประโยชน์จากระบบอัจฉริยะเมื่อใช้คิวของเรา ผู้ใช้ทราบดีว่าพวกเขาได้เก็บข้อมูลเช่น 212 ในงาน แต่ TypeScript ต้องการความช่วยเหลือในการมอบ Intellisense ที่แม่นยำ นี่คือเหตุผลที่เรามีกลไกนี้เพื่อแจ้ง TypeScript และบังคับให้อนุมานเพื่อประสบการณ์ของนักพัฒนาที่ดีขึ้น

ก่อนที่จะก้าวไปสู่ 228 เรามาพิจารณากระบวนการนี้อย่างรอบคอบ:

  • เนื่องจากคิวของเราดำเนินการแบบ FIFO (เข้าก่อนออกก่อน) เราจึงต้องเริ่มต้นด้วยการเปิดงานจากด้านขวาของคิว
  • ต่อไป เราจะเรียกใช้ฟังก์ชันผู้ปฏิบัติงานของเราในงานนี้
  • หลังจากงานเสร็จสิ้น เราจะแจ้งผลลัพธ์ให้กับผู้ใช้ของเรา
  • สุดท้ายนี้ เราเรียก 230 อีกครั้งเพื่อประมวลผลงานถัดไป

ดังนั้น 240 จะประกอบด้วยส่วนสำคัญสามส่วนนี้"

private jobTick() {
 this.getNextJob()
 .then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
 })
 .catch((error) => {
 console.error("Error in jobTick:", error);
 })
 .finally(() => {
 setImmediate(() => this.jobTick());
 });
 }

ฉันจะพยายามอธิบายฟังก์ชันตามฟังก์ชัน เริ่มต้นด้วย 259

 private async getNextJob() {
 try {
 const jobId = await this.config.redis.brpoplpush(
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 0
 );
 return jobId;
 } catch (error) {
 console.error("Error fetching the next job:", error);
 throw error;
 }
 }
 

เราเพียงแค่โทรหา Redis แต่ด้วยแนวทางเชิงกลยุทธ์:เราใช้การโทรแบบบล็อกรวมกับ 260 เพื่อลดจำนวนรอบการเดินทาง การใช้การบล็อคการโทรนั้นเป็นไปโดยเจตนา เราต้องการป้องกันไม่ให้พนักงานคนอื่นประมวลผลงานเดียวกันพร้อมกัน เพื่อหลีกเลี่ยงสภาพการแข่งขัน นอกจากนี้ เรายังย้ายงานจากสถานะ 'กำลังรอ' เป็น 'ใช้งานอยู่' เพื่อเตรียมงานให้พร้อมสำหรับขั้นตอนต่อไปในกระบวนการนี้อย่างมีประสิทธิภาพ

this.getNextJob().then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
});

ตอนนี้เรามี 277 แล้ว เราเพิ่มจำนวนงานที่กำลังทำงานอยู่หนึ่งงานและลดจำนวนที่อยู่ในคิวลงหนึ่งรายการ นอกจากนี้ เรายังพยายามเริ่มต้นงานใหม่ให้มากที่สุดเท่าที่จะเป็นไปได้โดยคำนึงถึงขีดจำกัดการทำงานพร้อมกัน:

if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
}

หากทุกอย่างเป็นไปด้วยดี เราจะสร้าง 286 ของเราต่อไป ใช้ 297 .เมื่อเราสร้างงานของเราขึ้นมาใหม่ตาม ID ของมันสำเร็จแล้ว เราก็ดำเนินการงานต่อไปด้วยฟังก์ชันผู้ปฏิบัติงานของเรา

ไปที่ 303 กันดีกว่า

private async executeJob<TJobPayload>(jobCreatedById: Job<TJobPayload>) {
 let hasError = false;
 try {
 await this.worker(jobCreatedById.data);
 this.running -= 1;
 this.queued += 1;
 } catch (error) {
 hasError = true;
 } finally {
 const [jobStatus, job] = await this.finishJob<TJobPayload>(jobCreatedById, hasError);
 this.emit(jobStatus, job.id);
 return;
 }
 }

ตอนนี้เรามีข้อมูลของงานแล้ว เราจะส่งข้อมูลนี้ไปยัง 316 ของเรา . หากดำเนินการสำเร็จ เราจะเพิ่มจำนวนงานที่อยู่ในคิวหนึ่งรายการ และลดจำนวนงานที่รันอยู่ลงหนึ่งรายการ ขั้นตอนนี้มีความสำคัญ หากไม่ได้รับการจัดการอย่างถูกต้อง อาจส่งผลต่อความสามารถของเราในการเปิดตัวงานใหม่ไปพร้อมๆ กัน ในกรณีที่เกิดข้อผิดพลาดระหว่างการทำงานของพนักงาน เราเพียงแค่เปลี่ยน 326 ธง สุดท้ายนี้ เราเรียก 339 ด้วย 349 ของเรา และ 350 ตั้งค่าสถานะและปล่อยสถานะด้วย 368 .

หมายเหตุด้านข้าง:ขณะนี้ผู้ใช้สามารถฟังการอัปเดตที่ปล่อยออกมาเช่นนี้ได้

queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));

ไปที่ 373 กันดีกว่า

private async finishJob<TJobPayload>(
 job: Job<TJobPayload>,
 hasFailed?: boolean
 ): Promise<[JobStatuses, Job<TJobPayload>]> {
 const multi = this.config.redis.multi();
 
 multi.lrem(this.createQueueKey("active"), 0, job.id);
 
 if (hasFailed) {
 if (this.config.keepOnFailure) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("failed"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "failed";
 } else {
 if (this.config.keepOnSuccess) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("succeeded"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "succeeded";
 }
 
 await multi.exec();
 return [job.status, job];
 }

สิ่งสำคัญที่นี่คือการใช้ 386 เนื่องจากเป้าหมายของเราคือการลดการเดินทางไปกลับให้น้อยที่สุดโดยใช้ 395 Redis เลื่อนการดำเนินการออกไปจนกว่าเราจะเรียก 402 .หากผู้ใช้ตั้งค่า 418 และ 429 เพื่อรักษาข้อมูล เราจะสร้างสองชุด:ชุดหนึ่งมีข้อมูลของงาน และอีกชุดหนึ่งมีรายการรหัสงานสำหรับการเข้าถึงข้อมูลนี้ วิธีการนี้ใช้ได้กับทั้งสถานการณ์ความสำเร็จและความล้มเหลว โดยปกติแล้วเราจะปรับสถานะของงานให้เหมาะสม สุดท้าย เรารันคำสั่งหลายคำสั่งด้วย 433 และส่งคืนสถานะของงานและตัวงานเองเพื่อวัตถุประสงค์ในการปล่อยเหตุการณ์

สุดท้ายนี้ เรามีวิธีที่เหลืออีกสองวิธี ซึ่งฉันจะไม่ลงรายละเอียดอย่างกว้างขวาง เนื่องจากเป็นวิธีที่ใช้แนวคิดที่เราคุ้นเคยอยู่แล้ว

 async removeJob(jobId: string) {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/remove-job.lua").text();
 return await this.config.redis.eval(
 addJobToQueueScript,
 5,
 this.createQueueKey("succeeded"),
 this.createQueueKey("failed"),
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 this.createQueueKey("jobs"),
 jobId
 );
 }
 
 async destroy() {
 const args = ["id", "jobs", "waiting", "active", "succeeded", "failed"].map((key) =>
 this.createQueueKey(key)
 );
 const res = await this.config.redis.del(...args);
 return res;
 }

remove-job.lua

--[[
key 1 -> [prefix]:test:succeeded
key 2 -> [prefix]:test:failed
key 3 -> [prefix]:test:waiting
key 4 -> [prefix]:test:active
key 5 -> [prefix]:test:jobs
arg 1 -> jobId
]]
 
local jobId = ARGV[1]
 
if (redis.call("sismember", KEYS[1], jobId) + redis.call("sismember", KEYS[2], jobId)) == 0 then
 redis.call("lrem", KEYS[3], 0, jobId)
 redis.call("lrem", KEYS[4], 0, jobId)
end
 
redis.call("srem", KEYS[1], jobId)
redis.call("srem", KEYS[2], jobId)
redis.call("hdel", KEYS[5], jobId)
 

448 คือการล้างคิวทั้งหมดอย่างสมบูรณ์ และอีกอย่างคือการลบงานเฉพาะออกจากคิว

มาดูการทำงานทุกอย่างกัน

import { sleep } from "bun";
import Redis from "ioredis";
 
import { Queue } from "./queue";
 
type Payload = {
 id: number;
 data: string;
};
 
const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "mytest-queue",
});
 
async function main() {
 await generateQueueItems(queue, 20);
 console.log("Sleep starting for 5 sec");
 await sleep(5000);
 
 queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));
 await queue.process<Payload>((job) => {
 console.log("Processing job:", job.data);
 sleep(1000);
 }, 3);
}
 
main();
 
async function generateQueueItems(queue: Queue, itemCount: number) {
 for (let i = 0; i < itemCount; i++) {
 const payload = {
 id: i,
 data: `dummy-data-${i}`,
 // Add more properties as needed for your testing
 };
 const jobId = await queue.add(payload);
 console.log(`Added item ${i} with jobId: ${jobId}`);
 }
}

การท้าทายโบนัส

  • ใช้ตรรกะการลองใหม่ด้วย Exponential Backoffs สำหรับทั้งการเข้าถึง Redis และกระบวนการของผู้ปฏิบัติงาน
  • พัฒนากลไกการรับประกัน 'อย่างน้อยหนึ่งครั้ง'
  • พยายามดำเนินการผู้ปฏิบัติงานใน Service Workers เพื่อประสิทธิภาพที่ดีขึ้น
  • เพิ่มงานที่กำหนดเวลาไว้

สรุป

วิธีที่ดีที่สุดในการเรียนรู้บางสิ่งคือการสร้างมันขึ้นมา และแนวทางที่ดียิ่งขึ้นคือการใช้ Upstash Redis โยกต่อไป

🔗 ที่อยู่ Github ของโครงการ