ในโพสต์ของวันนี้ เราจะใช้ระบบประมวลผลพื้นหลังที่ไร้เดียงสาเพื่อความสนุก! เราอาจเรียนรู้บางสิ่งไปพร้อมกันโดยมองเข้าไปในภายในของระบบประมวลผลเบื้องหลังยอดนิยม เช่น Sidekiq ผลิตภัณฑ์ของความสนุกนี้ไม่ได้มีไว้สำหรับการใช้งานจริง
สมมติว่าเรามีงานในแอปพลิเคชันของเราที่โหลดเว็บไซต์หนึ่งหรือหลายเว็บไซต์และแยกชื่อเว็บไซต์ เนื่องจากเราไม่มีอิทธิพลใดๆ ต่อประสิทธิภาพของเว็บไซต์เหล่านี้ เราจึงต้องการทำงานนอกเธรดหลักของเรา (หรือคำขอปัจจุบัน—หากเรากำลังสร้างเว็บแอปพลิเคชัน) แต่ทำงานในเบื้องหลัง
การห่อหุ้มงาน
ก่อนที่เราจะเข้าสู่การประมวลผลเบื้องหลัง เรามาสร้างวัตถุบริการเพื่อทำงานในมือก่อน เราจะใช้ OpenURI และ Nokogiri เพื่อแยกเนื้อหาของแท็กชื่อ
require 'open-uri'
require 'nokogiri'
class TitleExtractorService
def call(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
การเรียกใช้บริการจะพิมพ์ชื่อของ URL ที่กำหนด
TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
ใช้งานได้ตามที่คาดไว้ แต่มาดูกันว่าเราสามารถปรับปรุงไวยากรณ์เล็กน้อยเพื่อให้ดูและรู้สึกเหมือนกับระบบประมวลผลพื้นหลังอื่นๆ ได้หรือไม่ ด้วยการสร้าง Magique::Worker
โมดูล เราสามารถเพิ่มน้ำตาลวากยสัมพันธ์ให้กับวัตถุบริการได้
module Magique
module Worker
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def perform_now(*args)
new.perform(*args)
end
end
def perform(*)
raise NotImplementedError
end
end
end
โมดูลเพิ่ม perform
วิธีการอินสแตนซ์ของผู้ปฏิบัติงานและ perform_now
วิธีไปยังคลาสผู้ปฏิบัติงานเพื่อให้การเรียกใช้ดีขึ้นเล็กน้อย
มารวมโมดูลไว้ในวัตถุบริการของเรา ขณะที่เราดำเนินการอยู่ เรามาเปลี่ยนชื่อเป็น TitleExtractorWorker
กัน และเปลี่ยน call
วิธีการ perform
.
class TitleExtractorWorker
include Magique::Worker
def perform(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
การวิงวอนยังคงมีผลลัพธ์เหมือนเดิม แต่ชัดเจนขึ้นเล็กน้อยว่าเกิดอะไรขึ้น
TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
การนำการประมวลผลแบบอะซิงโครนัสไปใช้
ตอนนี้เราแยกชื่อเรื่องได้แล้ว เราก็สามารถดึงชื่อทั้งหมดจากบทความ Ruby Magic ที่ผ่านมาได้ ในการทำเช่นนี้ สมมติว่าเรามี RUBYMAGIC
คงที่พร้อมรายการ URL ทั้งหมดของบทความที่ผ่านมา
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_now(url)
end
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
เราได้รับชื่อบทความที่ผ่านมา แต่ต้องใช้เวลาสักครู่ในการดึงข้อมูลทั้งหมด นั่นเป็นเพราะเรารอจนกว่าคำขอแต่ละรายการจะเสร็จสิ้นก่อนที่จะดำเนินการต่อไป
มาปรับปรุงกันด้วยการแนะนำ perform_async
วิธีการโมดูลผู้ปฏิบัติงานของเรา เพื่อเพิ่มความเร็ว จะสร้างเธรดใหม่สำหรับแต่ละ URL
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Thread.new { new.perform(*args) }
end
end
end
end
หลังจากเปลี่ยนการเรียกใช้เป็น TitleExtractorWorker.perform_async(url)
, เราได้รับชื่อทั้งหมดเกือบจะในครั้งเดียว อย่างไรก็ตาม นี่ยังหมายความว่าเรากำลังเปิดมากกว่า 20 การเชื่อมต่อไปยังบล็อก Ruby Magic ในครั้งเดียว (ขออภัยที่ทำให้บล็อกของคุณยุ่งเหยิง! 😅)
หากคุณกำลังติดตามการใช้งานของคุณเองและทดสอบสิ่งนี้นอกกระบวนการที่ใช้เวลานาน (เช่น เว็บเซิร์ฟเวอร์) อย่าลืมเพิ่มบางอย่างเช่น loop { sleep 1 }
ที่ส่วนท้ายของสคริปต์เพื่อให้แน่ใจว่ากระบวนการจะไม่ยุติในทันที
การจัดคิวงาน
ด้วยวิธีการสร้างเธรดใหม่สำหรับทุกการร้องขอ ในที่สุดเราจะถึงขีดจำกัดทรัพยากร (ทั้งทางฝั่งของเราและบนเว็บไซต์ที่เรากำลังเข้าถึง) เมื่อเราอยากเป็นพลเมืองดี เรามาเปลี่ยนการใช้งานเป็นสิ่งที่ไม่พร้อมกันแต่ไม่รู้สึกเหมือนเป็นการโจมตีแบบปฏิเสธการให้บริการ
วิธีทั่วไปในการแก้ปัญหานี้คือการใช้รูปแบบผู้ผลิต/ผู้บริโภค ผู้ผลิตอย่างน้อยหนึ่งรายพุชงานไปยังคิวในขณะที่ผู้บริโภคหนึ่งรายหรือมากกว่าใช้งานจากคิวและประมวลผล
คิวนั้นเป็นรายการขององค์ประกอบ ในทางทฤษฎี อาร์เรย์แบบธรรมดาจะทำงานได้ดี อย่างไรก็ตาม ในขณะที่เรากำลังจัดการกับการทำงานพร้อมกัน เราจำเป็นต้องตรวจสอบให้แน่ใจว่าผู้ผลิตหรือผู้บริโภครายเดียวเท่านั้นที่สามารถเข้าถึงคิวได้ในแต่ละครั้ง หากเราไม่ระวังเรื่องนี้ สิ่งต่างๆ จะจบลงด้วยความโกลาหล เช่นเดียวกับคนสองคนที่พยายามจะเปิดประตูพร้อมกัน
ปัญหานี้เรียกว่าปัญหาของผู้ผลิต-ผู้บริโภค และมีวิธีแก้ไขหลายวิธี โชคดีที่เป็นปัญหาที่พบบ่อยมากและ Ruby จัดส่ง Queue
ที่เหมาะสม การใช้งานที่เราสามารถใช้ได้โดยไม่ต้องกังวลกับการซิงโครไนซ์เธรด
ในการใช้งาน ให้ตรวจสอบให้แน่ใจว่าทั้งผู้ผลิตและผู้บริโภคสามารถเข้าถึงคิวได้ เราทำได้โดยเพิ่มวิธีการเรียนใน Magique
โมดูลและกำหนดอินสแตนซ์ของ Queue
ไปเลย
module Magique
def self.backend
@backend
end
def self.backend=(backend)
@backend = backend
end
end
Magique.backend = Queue.new
ต่อไป เราเปลี่ยน perform_async
การนำไปใช้เพื่อผลักดันงานไปยังคิวแทนที่จะสร้างเธรดใหม่ของตัวเอง งานจะแสดงเป็นแฮชรวมถึงการอ้างอิงถึงคลาสผู้ปฏิบัติงานตลอดจนอาร์กิวเมนต์ที่ส่งไปยัง perform_async
วิธีการ
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Magique.backend.push(worker: self, args: args)
end
end
end
end
ด้วยเหตุนี้เราจึงทำสิ่งต่างๆ ด้านโปรดิวเซอร์เสร็จแล้ว ต่อไปมาดูฝั่งผู้บริโภคกัน
ผู้บริโภคแต่ละรายเป็นเธรดที่แยกจากกันที่รับงานจากคิวและดำเนินการ แทนที่จะหยุดหลังจากงานหนึ่ง เช่น เธรด ผู้ใช้บริการรับงานอื่นจากคิวและดำเนินการ เป็นต้น นี่คือการใช้งานพื้นฐานของผู้บริโภคที่เรียกว่า Magique::Processor
. โปรเซสเซอร์แต่ละตัวสร้างเธรดใหม่ที่วนซ้ำไม่รู้จบ สำหรับการทำซ้ำทุกครั้ง มันพยายามดึงงานใหม่จากคิว สร้างอินสแตนซ์ใหม่ของคลาสผู้ปฏิบัติงาน และเรียก perform
วิธีการด้วยอาร์กิวเมนต์ที่กำหนด
module Magique
class Processor
def self.start(concurrency = 1)
concurrency.times { |n| new("Processor #{n}") }
end
def initialize(name)
thread = Thread.new do
loop do
payload = Magique.backend.pop
worker_class = payload[:worker]
worker_class.new.perform(*payload[:args])
end
end
thread.name = name
end
end
end
นอกจากการวนรอบการประมวลผลแล้ว เรายังเพิ่มวิธีการอำนวยความสะดวกที่เรียกว่า Magique::Processor.start
. สิ่งนี้ทำให้เราสามารถหมุนโปรเซสเซอร์หลายตัวพร้อมกันได้ แม้ว่าการตั้งชื่อชุดข้อความจะไม่จำเป็นจริงๆ แต่จะช่วยให้เราทราบได้ว่าสิ่งต่างๆ ทำงานตามที่คาดไว้จริงหรือไม่
มาปรับผลลัพธ์ของ TitleExtractorWorker
. ของเรา เพื่อใส่ชื่อกระทู้ปัจจุบัน
puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"
ในการทดสอบการตั้งค่าการประมวลผลเบื้องหลัง ขั้นแรกเราต้องสร้างชุดตัวประมวลผลก่อนที่จะจัดคิวงานของเรา
Magique.backend = Queue.new
Magique::Processor.start(5)
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
เมื่อดำเนินการนี้ เรายังคงได้รับชื่อบทความทั้งหมด แม้ว่าจะไม่เร็วเท่ากับการใช้เธรดแยกกันสำหรับงานทุกอย่าง แต่ก็ยังเร็วกว่าการใช้งานครั้งแรกที่ไม่มีการประมวลผลเบื้องหลัง ด้วยชื่อโปรเซสเซอร์ที่เพิ่มเข้ามา เรายังสามารถยืนยันได้ว่าโปรเซสเซอร์ทั้งหมดทำงานผ่านคิว การปรับจำนวนโปรเซสเซอร์ที่ทำงานพร้อมกันทำให้สามารถค้นหาจุดสมดุลระหว่างความเร็วในการประมวลผลกับข้อจำกัดของทรัพยากรที่มีอยู่ได้
การขยายไปสู่หลายกระบวนการและเครื่องจักร
จนถึงตอนนี้ การนำระบบประมวลผลพื้นหลังของเราไปใช้ในปัจจุบันก็ทำงานได้ดีพอตัว มันยัง จำกัด อยู่ที่กระบวนการเดียวกัน งานที่ต้องใช้ทรัพยากรมากจะยังคงส่งผลต่อประสิทธิภาพของกระบวนการทั้งหมด ในขั้นตอนสุดท้าย มาดูการกระจายภาระงานในหลายกระบวนการและอาจถึงหลายเครื่อง
คิวเป็นเพียงการเชื่อมต่อระหว่างผู้ผลิตและผู้บริโภค ตอนนี้กำลังใช้การใช้งานในหน่วยความจำ มาหาแรงบันดาลใจเพิ่มเติมจาก Sidekiq และใช้คิวโดยใช้ Redis
Redis รองรับรายการที่ช่วยให้เราพุชและดึงงานได้ นอกจากนี้ Redis Ruby gem ยังปลอดภัยต่อเธรด และคำสั่ง Redis เพื่อแก้ไขรายการนั้นเป็นแบบปรมาณู คุณสมบัติเหล่านี้ทำให้สามารถใช้กับระบบประมวลผลพื้นหลังแบบอะซิงโครนัสของเราได้โดยไม่มีปัญหาในการซิงโครไนซ์
มาสร้างคิวสำรอง Redis ที่ใช้ push
และ shift
วิธีการเช่นเดียวกับ Queue
ที่เราใช้ก่อนหน้านี้
require 'json'
require 'redis'
module Magique
module Backend
class Redis
def initialize(connection = ::Redis.new)
@connection = connection
end
def push(job)
@connection.lpush('magique:queue', JSON.dump(job))
end
def shift
_queue, job = @connection.brpop('magique:queue')
payload = JSON.parse(job, symbolize_names: true)
payload[:worker] = Object.const_get(payload[:worker])
payload
end
end
end
end
เนื่องจาก Redis ไม่รู้อะไรเลยเกี่ยวกับออบเจ็กต์ Ruby เราจึงต้องทำให้งานของเราเป็น JSON ก่อนจัดเก็บไว้ในฐานข้อมูลโดยใช้ lpush
คำสั่งที่เพิ่มองค์ประกอบที่ด้านหน้าของรายการ
ในการดึงงานจากคิว เราใช้ brpop
คำสั่งซึ่งรับองค์ประกอบสุดท้ายจากรายการ หากรายการว่างเปล่า รายการจะถูกบล็อกจนกว่าจะมีองค์ประกอบใหม่ นี่เป็นวิธีที่ดีในการหยุดโปรเซสเซอร์ของเราชั่วคราวเมื่อไม่มีงานใดๆ สุดท้าย หลังจากได้รับงานจาก Redis แล้ว เราต้องค้นหาคลาส Ruby จริงตามชื่อของผู้ปฏิบัติงานโดยใช้ Object.const_get
.
ในขั้นตอนสุดท้าย เรามาแยกสิ่งต่าง ๆ ออกเป็นหลายกระบวนการ ในด้านผู้ผลิต สิ่งเดียวที่เราต้องทำคือเปลี่ยนแบ็กเอนด์เป็นคิว Redis ที่นำมาใช้ใหม่
# ...
Magique.backend = Magique::Backend::Redis.new
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
ในด้านของผู้บริโภค เราสามารถหลีกเลี่ยงได้ดังนี้:
# ...
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
loop { sleep 1 }
เมื่อดำเนินการแล้ว กระบวนการของผู้บริโภคจะรอให้งานใหม่มาถึงคิว เมื่อเราเริ่มกระบวนการผลิตที่พุชงานเข้าคิว เราจะเห็นว่างานเหล่านั้นได้รับการประมวลผลทันที
สนุกอย่างมีความรับผิดชอบและอย่าใช้สิ่งนี้ในการผลิต
แม้ว่าเราจะรักษามันให้ห่างไกลจากการตั้งค่าในโลกแห่งความเป็นจริงที่คุณจะใช้ในการผลิต (อย่าทำเช่นนั้น!) เราได้ดำเนินการสองสามขั้นตอนในการสร้างตัวประมวลผลพื้นหลัง เราเริ่มต้นด้วยการทำให้กระบวนการทำงานเป็นบริการพื้นหลัง จากนั้นเราก็ทำให้มัน async และใช้ Queue
เพื่อแก้ปัญหาผู้ผลิต-ผู้บริโภค จากนั้นเราขยายกระบวนการเป็นหลายกระบวนการหรือหลายเครื่องโดยใช้ Redis แทนที่จะเป็นการใช้งานในหน่วยความจำ
ดังที่ได้กล่าวไว้ก่อนหน้านี้ นี่เป็นการนำระบบการประมวลผลเบื้องหลังไปใช้อย่างง่าย มีหลายสิ่งที่ขาดหายไปและไม่ได้จัดการอย่างชัดเจน ซึ่งรวมถึง (แต่ไม่จำกัดเพียง) การจัดการข้อผิดพลาด หลายคิว การตั้งเวลา การรวมการเชื่อมต่อ และการจัดการสัญญาณ
อย่างไรก็ตาม เราสนุกกับการเขียนสิ่งนี้และหวังว่าคุณจะสนุกกับการดูระบบประมวลผลพื้นหลัง บางทีคุณอาจจะเอาของไปสักหนึ่งหรือสองอย่าง