คุณเคยพยายามสร้างคิวข้อความของคุณเองแต่ต้องเผชิญกับความท้าทายหรือไม่ ถ้าเป็นเช่นนั้น คุณไม่ได้อยู่คนเดียว ในบทช่วยสอนนี้ เราจะสร้างคิวข้อความตั้งแต่ต้นโดยใช้รายการ Redis แม้ว่าจะมีหลายวิธีในการสร้างคิวข้อความด้วย Redis เช่น สตรีม รายการ และ pub/subs แต่เราจะเน้นที่แนวทางที่ง่ายที่สุดและตรงไปตรงมาที่สุด นั่นก็คือ รายการ เข้าร่วมกับฉันในขณะที่เราเจาะลึกคู่มือเชิงปฏิบัตินี้
สิ่งที่เราจะใช้
- สุดยอด
- คู่มือ
สิ่งที่คุณต้องการ
- บุญ
- คู่มือ
การตั้งค่า Upstash Redis
ขั้นแรก มาตั้งค่าอินสแตนซ์ Redis กัน ในการดำเนินการนี้ เพียงไปที่ Upstash และคลิกที่ สร้างฐานข้อมูล .หลังจากนั้น ให้เลื่อนลงเพื่อค้นหาสตริงการเชื่อมต่อของคุณ ซึ่งเป็นสิ่งที่เราจะใช้ในการเชื่อมต่อลูกค้าของเรา ฉันจะไม่ลงรายละเอียดที่นี่ แต่นั่นคือสิ่งที่คุณต้องเริ่มต้น
ตัวอย่างสตริงการเชื่อมต่อ:
redis://XXXXe@social-XXX-39281.upstash.io:39281 การเริ่มโครงการ
มาเริ่มโครงการ TypeScript ของเราโดยใช้ Bun ตัวเลือกไม่ได้เพียงเพราะมันเร็วกว่า Node เท่านั้น แต่ยังตั้งค่าได้ง่ายกว่ามากอีกด้วย ใช่แล้ว มันเร็วอย่างน่าประทับใจด้วย! 🚀
mkdir upstash-mq
cd upstash-mq
bun init
> package name (upstash-mq-tutorial): upstash-mq
> entry point (index.ts):
> Done!
bun add ioredis โครงสร้างโครงการ
┣ 📂src
┃ ┣ 📂lua-scripts
┃ ┃ ┣ 📜add-job.lua
┃ ┃ ┗ 📜remove-job.lua
┃ ┣ 📜index.ts
┃ ┣ 📜job.ts
┃ ┣ 📜queue.ts
┃ ┗ 📜utils.ts
┣ 📜.env
┣ 📜.gitignore
┣ 📜README.md
┣ 📜bun.lockb
┣ 📜index.ts
┣ 📜package.json
┗ 📜tsconfig.json
การแสดงภาพ 09 ของเรา จะเป็นเช่น:

งาน
ชั้นเรียนงานของเราต้องการสิ่งสำคัญบางประการ ขั้นแรก เราต้องติดตามสถานะของงานแต่ละงาน ซึ่งช่วยให้เราตัดสินใจว่าจะดำเนินการหรือไม่ ลองอีกครั้ง หรือย้ายงานไปที่อื่นหากเสร็จแล้ว งานแต่ละงานยังมี ID และข้อมูลบางอย่างซึ่งจำเป็นต้องเป็นข้อมูลทั่วไปเพื่อให้เราสามารถมอบประสบการณ์การใช้งานที่ยอดเยี่ยมแก่ผู้ใช้ สุดท้ายนี้ เราจำเป็นต้องเชื่อมโยงงานแต่ละงานกับคิวและรวมชื่อคิวเพื่อให้การจัดการง่าย
นี่คือแกนหลักของ 18 ของเรา คลาส:
type OwnerQueue = {
redis: Redis;
queueName: string;
};
export type JobStatuses =
| "created"
| "waiting"
| "active"
| "succeeded"
| "failed";
export class Job<T> {
id: string;
status: JobStatuses;
config: OwnerQueue;
data: T;
constructor(ownerConfig: OwnerQueue, data: T, jobId = randomUUID()) {
this.id = jobId;
this.status = "created";
this.data = data;
this.config = ownerConfig;
}
}
เพื่อสร้าง 23 โดยทั่วไป เราต้องสร้าง 37 ก่อน ตัวเองทั่วไป ส่วนที่เหลือดำเนินไปในลักษณะตรงไปตรงมา เราสามารถสร้างอินสแตนซ์ Redis แยกต่างหากสำหรับ 42 แต่ละรายการได้ องค์ประกอบ แต่การจัดการสิ่งนี้จะซับซ้อน
โชคดีที่แนวทางของเราช่วยให้กำหนดค่าอินสแตนซ์ Redis ภายในคิวได้ง่าย และเราสามารถส่งอินสแตนซ์นี้ได้ตามต้องการ หลักการเดียวกันนี้ใช้กับ 53 . เนื่องจากเราจะใช้เพื่อบันทึกงานลงในคิวบ่อยครั้ง งานของเราต้องตระหนักถึงคิวหลักเพื่อให้สามารถบันทึกงานลงในคิวได้ เราจำเป็นต้องมีสองสิ่ง:สคริปต์ Lua เพื่อโต้ตอบกับ Redis และยูทิลิตี้บางอย่าง
มาสร้างยูทิลิตี้ของเรากันก่อน:
import { JobStatuses } from "./job";
const MQ_PREFIX = "UpstashMQ";
export const formatMessageQueueKey = (queueName: string, key: string) => {
return `${MQ_PREFIX}:${queueName}:${key}`;
};
export const convertToJSONString = <T>(
data: T,
status: JobStatuses,
): string => {
return JSON.stringify({
data,
status,
});
};
เนื่องจากการสร้างชื่อคิวด้วยตนเองทุกครั้งที่เราใช้ Redis นั้นไม่เหมาะ เราจึงสร้างยูทิลิตี้ชื่อ 62 . ยูทิลิตี้นี้เพียงเชื่อมสตริงเข้าด้วยกัน นอกจากนี้ เราจำเป็นต้องทำให้ข้อมูลของเราเป็นอนุกรมเพื่อจัดเก็บไว้ใน Redis - เราไม่สามารถส่งออบเจ็กต์ JS เป็นแหล่งข้อมูลได้ จำเป็นต้องแปลงเป็นสตริงก่อน เนื่องจากข้อมูลนั้นเป็นข้อมูลทั่วไป เราจึงได้ใช้ฟังก์ชันทั่วไป 74 เพื่อจุดประสงค์นี้
ตอนนี้เรามาเพิ่มสคริปต์ lua แรกของเรา:
เพิ่ม-job.lua
--[[
key 1 -> [prefix]:name:jobs
key 2 -> [prefix]:name:waiting
arg 1 -> job id
arg 2 -> job data
]]
local jobId = ARGV[1]
local payload = ARGV[2]
if redis.call("hexists", KEYS[1], jobId) == 1 then return nil end
redis.call("hset", KEYS[1], jobId, payload)
redis.call("lpush", KEYS[2], jobId)
return jobId เราสามารถดำเนินการเรียกเหล่านี้กับอินสแตนซ์ Redis ของเราทีละรายการได้ดังนี้:
85รหัส>98รหัส>102รหัส>
อย่างไรก็ตาม วิธีการนี้จะส่งผลให้มีการเรียกแยกกันสามครั้ง เพื่อลดการเดินทางไปกลับไปยังเซิร์ฟเวอร์ Redis เราตั้งเป้าที่จะรวมกระบวนการทั้งหมดไว้ในการโทรครั้งเดียว
มาเพิ่ม 112 ของเรากัน วิธีการ
private createQueueKey(key: string) {
return formatMessageQueueKey(this.config.queueName, key);
}
async save(): Promise<string | null> {
const addJobToQueueScript = await Bun.file("./src/lua-scripts/add-job.lua").text();
const resJobId = (await this.config.redis.eval(
addJobToQueueScript,
2,
this.createQueueKey("jobs"),
this.createQueueKey("waiting"),
this.id,
convertToJSONString(this.data, this.status)
)) as string | null;
if (resJobId) {
this.id = resJobId;
return resJobId;
}
return null;
}
รหัสนั้นตรงไปตรงมา แต่ให้ฉันอธิบายเพิ่มเติม หลังจากสร้างสคริปต์ Lua แล้ว เราจะเรียกมันว่า 124 ซึ่งจำเป็นสำหรับการรันสคริปต์ Lua พารามิเตอร์สำหรับ 136 มีดังนี้:
- พารามิเตอร์แรกต้องใช้สคริปต์
- พารามิเตอร์ตัวที่สองระบุจำนวนอาร์กิวเมนต์
- พารามิเตอร์ที่สามและสี่ใช้สำหรับคีย์
- สุดท้ายนี้ เราผ่านการโต้แย้งที่แท้จริง
ก่อนที่เราจะจบ 145 เราจะเพิ่มวิธีการอีกสองสามวิธีสำหรับอนาคต
fromId = async <T>(jobId: string): Promise<Job<T> | null> => {
const jobData = await this.config.redis.hget(this.createQueueKey("jobs"), jobId);
if (jobData) {
return this.fromData<T>(jobId, jobData);
}
return null;
};
private fromData = <T>(jobId: string, stringifiedJobData: string): Job<T> => {
const parsedData = JSON.parse(stringifiedJobData) as Job<T>;
const job = new Job<T>(this.config, parsedData.data, jobId);
job.status = parsedData.status;
return job;
};
ขณะนี้เราอาจไม่ต้องการฟังก์ชันเหล่านี้ แต่จะมีความสำคัญเมื่อเราเริ่มดำเนินการงานในอนาคต ณ จุดนั้น เราจะมีเพียง ID ของงาน 151 เท่านั้น และเราจะต้องมีวิธีการในการสร้างงานของเราใหม่ตั้งแต่ต้น นี่คือสิ่งที่ 164 สำเร็จ โดยดึงข้อมูลงานจาก Redis แปลงเป็นอินสแตนซ์งาน และส่งคืน เพื่อให้คิวประมวลผลงานนี้ในภายหลังได้
ไปยังคิว
หลังจากกรอก 177 แล้ว ในส่วน เราจะมาเจาะลึกรายละเอียดของ Queue กัน นี่คือวัตถุประสงค์ของเรา:
- เรามุ่งหวังให้คิวของเราเก็บรักษาหรือลบข้อมูลเมื่อสำเร็จหรือล้มเหลว เนื่องจากอาจมีบางกรณีที่จำเป็นต้องประมวลผลใหม่ในภายหลัง
- เราตั้งใจที่จะออกแบบคิวให้ทำงานพร้อมกัน ทำให้งานหลายงานทำงานพร้อมกันได้
- เป้าหมายของเราคือการอนุญาตให้ส่งผ่านฟังก์ชันเรียกกลับสำหรับการประมวลผลข้อมูล ฟังก์ชันนี้ควรอนุมานประเภทของงานเพื่อประสบการณ์นักพัฒนาที่ดีขึ้น
- เราวางแผนที่จะเปิดใช้งานการโทร
181จากภายในคิว ทำให้เราส่งอินสแตนซ์ Redis และ195ได้ . - สุดท้ายนี้ เราต้องการให้แน่ใจว่าจะสามารถทำลายคิวได้หากจำเป็น และลบงานออกจากคิวได้
มาเริ่มต้นด้วยการกำหนดคิวของเรา
export type QueueConfig = {
redis: Redis;
queueName: string;
keepOnSuccess?: boolean;
keepOnFailure?: boolean;
};
export class Queue extends EventEmitter {
config: QueueConfig;
concurrency = 0;
worker: any;
running = 0;
queued = 0;
constructor(config: QueueConfig) {
super();
this.config = {
redis: config.redis,
queueName: config.queueName,
keepOnFailure: config.keepOnFailure ?? true,
keepOnSuccess: config.keepOnSuccess ?? true,
};
}
createQueueKey(key: string) {
return formatMessageQueueKey(this.config.queueName, key);
}
} คลาส Queue มีการกำหนดค่าเพื่อยอมรับข้อมูลภายนอก เช่น ชื่อคิว อินสแตนซ์ Redis และการตั้งค่าสำหรับการเก็บรักษาหรือการลบข้อมูล เรายินดีรับการใช้งาน Redis ใดๆ ที่ผู้ใช้ต้องการ แม้ว่าเราจะมีการตั้งค่าพิเศษสำหรับ Upstash ☺ ความยืดหยุ่นนี้ทำให้ผู้ใช้สามารถรวมคิวเข้ากับระบบที่มีอยู่ได้อย่างง่ายดาย
และชั้นเรียนของเราได้ขยาย Event Emitter เพื่อแจ้งเตือนพวกเขาเมื่อมีบางอย่างเกิดขึ้น
นี่คือตัวอย่างการเริ่มต้น:
const queue = new Queue({
redis: new Redis(process.env.UPSTASH_REDIS_URL),
queueName: "upstash-rocks",
keepOnFailure: true,
keepOnSuccess: true,
}); เพิ่ม
async add<T>(payload: T) {
return new Job<T>(this.config, payload).save();
} งานของเรารับรายละเอียดการกำหนดค่าของคิวพาเรนต์และเพย์โหลด ซึ่งเป็นข้อมูลที่จะจัดเก็บไว้ใน Redis จากนั้นเราก็บันทึกมัน
ตอนนี้เราทำได้:
const queue = new Queue({
redis: new Redis(process.env.UPSTASH_REDIS_URL!),
queueName: "mytest-queue",
keepOnFailure: true,
keepOnSuccess: true,
});
const payload = {
upstash: "best-redis-ever",
};
await queue.add(payload); ตอนนี้ เราต้องการวิธีประมวลผล
กำลังประมวลผล
นี่เป็นส่วนที่ท้าทายที่สุดในการดำเนินการคิวของเรา เรากำหนดให้ผู้ใช้ระบุจำนวนกระบวนการที่เกิดขึ้นพร้อมกัน และจัดเตรียมผู้ปฏิบัติงาน ฟังก์ชันเรียกกลับเพื่อประมวลผลงานที่จะอนุมานประเภทของงาน นอกจากนี้ เรายังจำเป็นต้องมีกลไกในการติดตามจำนวนงานที่กำลังดำเนินอยู่และอยู่ในคิว ทำให้เราสามารถเลือกงานถัดไปจากคิวได้อย่างปลอดภัย
async process<TJobPayload>(
worker: (job: TJobPayload) => void,
concurrency: number
): Promise<void> {
this.concurrency = concurrency;
this.worker = worker;
this.running = 0;
this.queued = 1;
this.jobTick();
}
วัตถุประสงค์หลักของการยอมรับ 204 ทั่วไป คือการปรับปรุงประสบการณ์ของนักพัฒนาซอฟต์แวร์ให้กับผู้ใช้ของเรา เรามุ่งมั่นที่จะทำให้พวกเขาได้รับประโยชน์จากระบบอัจฉริยะเมื่อใช้คิวของเรา ผู้ใช้ทราบดีว่าพวกเขาได้เก็บข้อมูลเช่น 212 ในงาน แต่ TypeScript ต้องการความช่วยเหลือในการมอบ Intellisense ที่แม่นยำ นี่คือเหตุผลที่เรามีกลไกนี้เพื่อแจ้ง TypeScript และบังคับให้อนุมานเพื่อประสบการณ์ของนักพัฒนาที่ดีขึ้น
ก่อนที่จะก้าวไปสู่ 228 เรามาพิจารณากระบวนการนี้อย่างรอบคอบ:
- เนื่องจากคิวของเราดำเนินการแบบ FIFO (เข้าก่อนออกก่อน) เราจึงต้องเริ่มต้นด้วยการเปิดงานจากด้านขวาของคิว
- ต่อไป เราจะเรียกใช้ฟังก์ชันผู้ปฏิบัติงานของเราในงานนี้
- หลังจากงานเสร็จสิ้น เราจะแจ้งผลลัพธ์ให้กับผู้ใช้ของเรา
- สุดท้ายนี้ เราเรียก
230อีกครั้งเพื่อประมวลผลงานถัดไป
ดังนั้น 240 จะประกอบด้วยส่วนสำคัญสามส่วนนี้"
private jobTick() {
this.getNextJob()
.then(async (jobId) => {
this.running += 1;
this.queued -= 1;
if (this.running + this.queued < this.concurrency) {
this.queued += 1;
setImmediate(this.jobTick);
}
if (!jobId) {
return;
}
const jobCreatedById = await new Job(this.config, null).fromId(jobId);
if (jobCreatedById) {
await this.executeJob(jobCreatedById);
} else {
console.error(`Job not found with ID: ${jobId}`);
}
})
.catch((error) => {
console.error("Error in jobTick:", error);
})
.finally(() => {
setImmediate(() => this.jobTick());
});
}
ฉันจะพยายามอธิบายฟังก์ชันตามฟังก์ชัน เริ่มต้นด้วย 259 ป>
private async getNextJob() {
try {
const jobId = await this.config.redis.brpoplpush(
this.createQueueKey("waiting"),
this.createQueueKey("active"),
0
);
return jobId;
} catch (error) {
console.error("Error fetching the next job:", error);
throw error;
}
}
เราเพียงแค่โทรหา Redis แต่ด้วยแนวทางเชิงกลยุทธ์:เราใช้การโทรแบบบล็อกรวมกับ 260 เพื่อลดจำนวนรอบการเดินทาง การใช้การบล็อคการโทรนั้นเป็นไปโดยเจตนา เราต้องการป้องกันไม่ให้พนักงานคนอื่นประมวลผลงานเดียวกันพร้อมกัน เพื่อหลีกเลี่ยงสภาพการแข่งขัน นอกจากนี้ เรายังย้ายงานจากสถานะ 'กำลังรอ' เป็น 'ใช้งานอยู่' เพื่อเตรียมงานให้พร้อมสำหรับขั้นตอนต่อไปในกระบวนการนี้อย่างมีประสิทธิภาพ
this.getNextJob().then(async (jobId) => {
this.running += 1;
this.queued -= 1;
if (this.running + this.queued < this.concurrency) {
this.queued += 1;
setImmediate(this.jobTick);
}
if (!jobId) {
return;
}
const jobCreatedById = await new Job(this.config, null).fromId(jobId);
if (jobCreatedById) {
await this.executeJob(jobCreatedById);
} else {
console.error(`Job not found with ID: ${jobId}`);
}
});
ตอนนี้เรามี 277 แล้ว เราเพิ่มจำนวนงานที่กำลังทำงานอยู่หนึ่งงานและลดจำนวนที่อยู่ในคิวลงหนึ่งรายการ นอกจากนี้ เรายังพยายามเริ่มต้นงานใหม่ให้มากที่สุดเท่าที่จะเป็นไปได้โดยคำนึงถึงขีดจำกัดการทำงานพร้อมกัน:
if (this.running + this.queued < this.concurrency) {
this.queued += 1;
setImmediate(this.jobTick);
}
หากทุกอย่างเป็นไปด้วยดี เราจะสร้าง 286 ของเราต่อไป ใช้ 297 .เมื่อเราสร้างงานของเราขึ้นมาใหม่ตาม ID ของมันสำเร็จแล้ว เราก็ดำเนินการงานต่อไปด้วยฟังก์ชันผู้ปฏิบัติงานของเรา
ไปที่ 303 กันดีกว่า ป>
private async executeJob<TJobPayload>(jobCreatedById: Job<TJobPayload>) {
let hasError = false;
try {
await this.worker(jobCreatedById.data);
this.running -= 1;
this.queued += 1;
} catch (error) {
hasError = true;
} finally {
const [jobStatus, job] = await this.finishJob<TJobPayload>(jobCreatedById, hasError);
this.emit(jobStatus, job.id);
return;
}
}
ตอนนี้เรามีข้อมูลของงานแล้ว เราจะส่งข้อมูลนี้ไปยัง 316 ของเรา . หากดำเนินการสำเร็จ เราจะเพิ่มจำนวนงานที่อยู่ในคิวหนึ่งรายการ และลดจำนวนงานที่รันอยู่ลงหนึ่งรายการ ขั้นตอนนี้มีความสำคัญ หากไม่ได้รับการจัดการอย่างถูกต้อง อาจส่งผลต่อความสามารถของเราในการเปิดตัวงานใหม่ไปพร้อมๆ กัน ในกรณีที่เกิดข้อผิดพลาดระหว่างการทำงานของพนักงาน เราเพียงแค่เปลี่ยน 326 ธง สุดท้ายนี้ เราเรียก 339 ด้วย 349 ของเรา และ 350 ตั้งค่าสถานะและปล่อยสถานะด้วย 368 .
หมายเหตุด้านข้าง:ขณะนี้ผู้ใช้สามารถฟังการอัปเดตที่ปล่อยออกมาเช่นนี้ได้
queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));
ไปที่ 373 กันดีกว่า ป>
private async finishJob<TJobPayload>(
job: Job<TJobPayload>,
hasFailed?: boolean
): Promise<[JobStatuses, Job<TJobPayload>]> {
const multi = this.config.redis.multi();
multi.lrem(this.createQueueKey("active"), 0, job.id);
if (hasFailed) {
if (this.config.keepOnFailure) {
multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
multi.sadd(this.createQueueKey("failed"), job.id);
} else {
multi.hdel(this.createQueueKey("jobs"), job.id);
}
job.status = "failed";
} else {
if (this.config.keepOnSuccess) {
multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
multi.sadd(this.createQueueKey("succeeded"), job.id);
} else {
multi.hdel(this.createQueueKey("jobs"), job.id);
}
job.status = "succeeded";
}
await multi.exec();
return [job.status, job];
}
สิ่งสำคัญที่นี่คือการใช้ 386 เนื่องจากเป้าหมายของเราคือการลดการเดินทางไปกลับให้น้อยที่สุดโดยใช้ 395 Redis เลื่อนการดำเนินการออกไปจนกว่าเราจะเรียก 402 .หากผู้ใช้ตั้งค่า 418 และ 429รหัส> เพื่อรักษาข้อมูล เราจะสร้างสองชุด:ชุดหนึ่งมีข้อมูลของงาน และอีกชุดหนึ่งมีรายการรหัสงานสำหรับการเข้าถึงข้อมูลนี้ วิธีการนี้ใช้ได้กับทั้งสถานการณ์ความสำเร็จและความล้มเหลว โดยปกติแล้วเราจะปรับสถานะของงานให้เหมาะสม สุดท้าย เรารันคำสั่งหลายคำสั่งด้วย 433 และส่งคืนสถานะของงานและตัวงานเองเพื่อวัตถุประสงค์ในการปล่อยเหตุการณ์
สุดท้ายนี้ เรามีวิธีที่เหลืออีกสองวิธี ซึ่งฉันจะไม่ลงรายละเอียดอย่างกว้างขวาง เนื่องจากเป็นวิธีที่ใช้แนวคิดที่เราคุ้นเคยอยู่แล้ว
async removeJob(jobId: string) {
const addJobToQueueScript = await Bun.file("./src/lua-scripts/remove-job.lua").text();
return await this.config.redis.eval(
addJobToQueueScript,
5,
this.createQueueKey("succeeded"),
this.createQueueKey("failed"),
this.createQueueKey("waiting"),
this.createQueueKey("active"),
this.createQueueKey("jobs"),
jobId
);
}
async destroy() {
const args = ["id", "jobs", "waiting", "active", "succeeded", "failed"].map((key) =>
this.createQueueKey(key)
);
const res = await this.config.redis.del(...args);
return res;
} remove-job.lua
--[[
key 1 -> [prefix]:test:succeeded
key 2 -> [prefix]:test:failed
key 3 -> [prefix]:test:waiting
key 4 -> [prefix]:test:active
key 5 -> [prefix]:test:jobs
arg 1 -> jobId
]]
local jobId = ARGV[1]
if (redis.call("sismember", KEYS[1], jobId) + redis.call("sismember", KEYS[2], jobId)) == 0 then
redis.call("lrem", KEYS[3], 0, jobId)
redis.call("lrem", KEYS[4], 0, jobId)
end
redis.call("srem", KEYS[1], jobId)
redis.call("srem", KEYS[2], jobId)
redis.call("hdel", KEYS[5], jobId)
448รหัส> คือการล้างคิวทั้งหมดอย่างสมบูรณ์ และอีกอย่างคือการลบงานเฉพาะออกจากคิว
มาดูการทำงานทุกอย่างกัน
import { sleep } from "bun";
import Redis from "ioredis";
import { Queue } from "./queue";
type Payload = {
id: number;
data: string;
};
const queue = new Queue({
redis: new Redis(process.env.UPSTASH_REDIS_URL),
queueName: "mytest-queue",
});
async function main() {
await generateQueueItems(queue, 20);
console.log("Sleep starting for 5 sec");
await sleep(5000);
queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));
await queue.process<Payload>((job) => {
console.log("Processing job:", job.data);
sleep(1000);
}, 3);
}
main();
async function generateQueueItems(queue: Queue, itemCount: number) {
for (let i = 0; i < itemCount; i++) {
const payload = {
id: i,
data: `dummy-data-${i}`,
// Add more properties as needed for your testing
};
const jobId = await queue.add(payload);
console.log(`Added item ${i} with jobId: ${jobId}`);
}
} การท้าทายโบนัส
- ใช้ตรรกะการลองใหม่ด้วย Exponential Backoffs สำหรับทั้งการเข้าถึง Redis และกระบวนการของผู้ปฏิบัติงาน
- พัฒนากลไกการรับประกัน 'อย่างน้อยหนึ่งครั้ง'
- พยายามดำเนินการผู้ปฏิบัติงานใน Service Workers เพื่อประสิทธิภาพที่ดีขึ้น
- เพิ่มงานที่กำหนดเวลาไว้
สรุป
วิธีที่ดีที่สุดในการเรียนรู้บางสิ่งคือการสร้างมันขึ้นมา และแนวทางที่ดียิ่งขึ้นคือการใช้ Upstash Redis โยกต่อไป
🔗 ที่อยู่ Github ของโครงการ