Computer >> คอมพิวเตอร์ >  >> การเขียนโปรแกรม >> Ruby

การเรียนรู้โดยการสร้าง ระบบประมวลผลเบื้องหลังใน Ruby

ในโพสต์ของวันนี้ เราจะใช้ระบบประมวลผลพื้นหลังที่ไร้เดียงสาเพื่อความสนุก! เราอาจเรียนรู้บางสิ่งไปพร้อมกันโดยมองเข้าไปในภายในของระบบประมวลผลเบื้องหลังยอดนิยม เช่น Sidekiq ผลิตภัณฑ์ของความสนุกนี้ไม่ได้มีไว้สำหรับการใช้งานจริง

สมมติว่าเรามีงานในแอปพลิเคชันของเราที่โหลดเว็บไซต์หนึ่งหรือหลายเว็บไซต์และแยกชื่อเว็บไซต์ เนื่องจากเราไม่มีอิทธิพลใดๆ ต่อประสิทธิภาพของเว็บไซต์เหล่านี้ เราจึงต้องการทำงานนอกเธรดหลักของเรา (หรือคำขอปัจจุบัน—หากเรากำลังสร้างเว็บแอปพลิเคชัน) แต่ทำงานในเบื้องหลัง

การห่อหุ้มงาน

ก่อนที่เราจะเข้าสู่การประมวลผลเบื้องหลัง เรามาสร้างวัตถุบริการเพื่อทำงานในมือก่อน เราจะใช้ OpenURI และ Nokogiri เพื่อแยกเนื้อหาของแท็กชื่อ

require 'open-uri'
require 'nokogiri'
 
class TitleExtractorService
  def call(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

การเรียกใช้บริการจะพิมพ์ชื่อของ URL ที่กำหนด

TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

ใช้งานได้ตามที่คาดไว้ แต่มาดูกันว่าเราสามารถปรับปรุงไวยากรณ์เล็กน้อยเพื่อให้ดูและรู้สึกเหมือนกับระบบประมวลผลพื้นหลังอื่นๆ ได้หรือไม่ ด้วยการสร้าง Magique::Worker โมดูล เราสามารถเพิ่มน้ำตาลวากยสัมพันธ์ให้กับวัตถุบริการได้

module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end
 
    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end
 
    def perform(*)
      raise NotImplementedError
    end
  end
end

โมดูลเพิ่ม perform วิธีการอินสแตนซ์ของผู้ปฏิบัติงานและ perform_now วิธีไปยังคลาสผู้ปฏิบัติงานเพื่อให้การเรียกใช้ดีขึ้นเล็กน้อย

มารวมโมดูลไว้ในวัตถุบริการของเรา ขณะที่เราดำเนินการอยู่ เรามาเปลี่ยนชื่อเป็น TitleExtractorWorker กัน และเปลี่ยน call วิธีการ perform .

class TitleExtractorWorker
  include Magique::Worker
 
  def perform(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

การวิงวอนยังคงมีผลลัพธ์เหมือนเดิม แต่ชัดเจนขึ้นเล็กน้อยว่าเกิดอะไรขึ้น

TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

การนำการประมวลผลแบบอะซิงโครนัสไปใช้

ตอนนี้เราแยกชื่อเรื่องได้แล้ว เราก็สามารถดึงชื่อทั้งหมดจากบทความ Ruby Magic ที่ผ่านมาได้ ในการทำเช่นนี้ สมมติว่าเรามี RUBYMAGIC คงที่พร้อมรายการ URL ทั้งหมดของบทความที่ผ่านมา

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_now(url)
end
 
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

เราได้รับชื่อบทความที่ผ่านมา แต่ต้องใช้เวลาสักครู่ในการดึงข้อมูลทั้งหมด นั่นเป็นเพราะเรารอจนกว่าคำขอแต่ละรายการจะเสร็จสิ้นก่อนที่จะดำเนินการต่อไป

มาปรับปรุงกันด้วยการแนะนำ perform_async วิธีการโมดูลผู้ปฏิบัติงานของเรา เพื่อเพิ่มความเร็ว จะสร้างเธรดใหม่สำหรับแต่ละ URL

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end

หลังจากเปลี่ยนการเรียกใช้เป็น TitleExtractorWorker.perform_async(url) , เราได้รับชื่อทั้งหมดเกือบจะในครั้งเดียว อย่างไรก็ตาม นี่ยังหมายความว่าเรากำลังเปิดมากกว่า 20 การเชื่อมต่อไปยังบล็อก Ruby Magic ในครั้งเดียว (ขออภัยที่ทำให้บล็อกของคุณยุ่งเหยิง! 😅)

หากคุณกำลังติดตามการใช้งานของคุณเองและทดสอบสิ่งนี้นอกกระบวนการที่ใช้เวลานาน (เช่น เว็บเซิร์ฟเวอร์) อย่าลืมเพิ่มบางอย่างเช่น loop { sleep 1 } ที่ส่วนท้ายของสคริปต์เพื่อให้แน่ใจว่ากระบวนการจะไม่ยุติในทันที

การจัดคิวงาน

ด้วยวิธีการสร้างเธรดใหม่สำหรับทุกการร้องขอ ในที่สุดเราจะถึงขีดจำกัดทรัพยากร (ทั้งทางฝั่งของเราและบนเว็บไซต์ที่เรากำลังเข้าถึง) เมื่อเราอยากเป็นพลเมืองดี เรามาเปลี่ยนการใช้งานเป็นสิ่งที่ไม่พร้อมกันแต่ไม่รู้สึกเหมือนเป็นการโจมตีแบบปฏิเสธการให้บริการ

วิธีทั่วไปในการแก้ปัญหานี้คือการใช้รูปแบบผู้ผลิต/ผู้บริโภค ผู้ผลิตอย่างน้อยหนึ่งรายพุชงานไปยังคิวในขณะที่ผู้บริโภคหนึ่งรายหรือมากกว่าใช้งานจากคิวและประมวลผล

คิวนั้นเป็นรายการขององค์ประกอบ ในทางทฤษฎี อาร์เรย์แบบธรรมดาจะทำงานได้ดี อย่างไรก็ตาม ในขณะที่เรากำลังจัดการกับการทำงานพร้อมกัน เราจำเป็นต้องตรวจสอบให้แน่ใจว่าผู้ผลิตหรือผู้บริโภครายเดียวเท่านั้นที่สามารถเข้าถึงคิวได้ในแต่ละครั้ง หากเราไม่ระวังเรื่องนี้ สิ่งต่างๆ จะจบลงด้วยความโกลาหล เช่นเดียวกับคนสองคนที่พยายามจะเปิดประตูพร้อมกัน

ปัญหานี้เรียกว่าปัญหาของผู้ผลิต-ผู้บริโภค และมีวิธีแก้ไขหลายวิธี โชคดีที่เป็นปัญหาที่พบบ่อยมากและ Ruby จัดส่ง Queue ที่เหมาะสม การใช้งานที่เราสามารถใช้ได้โดยไม่ต้องกังวลกับการซิงโครไนซ์เธรด

ในการใช้งาน ให้ตรวจสอบให้แน่ใจว่าทั้งผู้ผลิตและผู้บริโภคสามารถเข้าถึงคิวได้ เราทำได้โดยเพิ่มวิธีการเรียนใน Magique โมดูลและกำหนดอินสแตนซ์ของ Queue ไปเลย

module Magique
  def self.backend
    @backend
  end
 
  def self.backend=(backend)
    @backend = backend
  end
end
 
Magique.backend = Queue.new

ต่อไป เราเปลี่ยน perform_async การนำไปใช้เพื่อผลักดันงานไปยังคิวแทนที่จะสร้างเธรดใหม่ของตัวเอง งานจะแสดงเป็นแฮชรวมถึงการอ้างอิงถึงคลาสผู้ปฏิบัติงานตลอดจนอาร์กิวเมนต์ที่ส่งไปยัง perform_async วิธีการ

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Magique.backend.push(worker: self, args: args)
      end
    end
  end
end

ด้วยเหตุนี้เราจึงทำสิ่งต่างๆ ด้านโปรดิวเซอร์เสร็จแล้ว ต่อไปมาดูฝั่งผู้บริโภคกัน

ผู้บริโภคแต่ละรายเป็นเธรดที่แยกจากกันที่รับงานจากคิวและดำเนินการ แทนที่จะหยุดหลังจากงานหนึ่ง เช่น เธรด ผู้ใช้บริการรับงานอื่นจากคิวและดำเนินการ เป็นต้น นี่คือการใช้งานพื้นฐานของผู้บริโภคที่เรียกว่า Magique::Processor . โปรเซสเซอร์แต่ละตัวสร้างเธรดใหม่ที่วนซ้ำไม่รู้จบ สำหรับการทำซ้ำทุกครั้ง มันพยายามดึงงานใหม่จากคิว สร้างอินสแตนซ์ใหม่ของคลาสผู้ปฏิบัติงาน และเรียก perform วิธีการด้วยอาร์กิวเมนต์ที่กำหนด

module Magique
  class Processor
    def self.start(concurrency = 1)
      concurrency.times { |n| new("Processor #{n}") }
    end
 
    def initialize(name)
      thread = Thread.new do
        loop do
          payload = Magique.backend.pop
          worker_class = payload[:worker]
          worker_class.new.perform(*payload[:args])
        end
      end
 
      thread.name = name
    end
  end
end

นอกจากการวนรอบการประมวลผลแล้ว เรายังเพิ่มวิธีการอำนวยความสะดวกที่เรียกว่า Magique::Processor.start . สิ่งนี้ทำให้เราสามารถหมุนโปรเซสเซอร์หลายตัวพร้อมกันได้ แม้ว่าการตั้งชื่อชุดข้อความจะไม่จำเป็นจริงๆ แต่จะช่วยให้เราทราบได้ว่าสิ่งต่างๆ ทำงานตามที่คาดไว้จริงหรือไม่

มาปรับผลลัพธ์ของ TitleExtractorWorker . ของเรา เพื่อใส่ชื่อกระทู้ปัจจุบัน

puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"

ในการทดสอบการตั้งค่าการประมวลผลเบื้องหลัง ขั้นแรกเราต้องสร้างชุดตัวประมวลผลก่อนที่จะจัดคิวงานของเรา

Magique.backend = Queue.new
Magique::Processor.start(5)
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end
 
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

เมื่อดำเนินการนี้ เรายังคงได้รับชื่อบทความทั้งหมด แม้ว่าจะไม่เร็วเท่ากับการใช้เธรดแยกกันสำหรับงานทุกอย่าง แต่ก็ยังเร็วกว่าการใช้งานครั้งแรกที่ไม่มีการประมวลผลเบื้องหลัง ด้วยชื่อโปรเซสเซอร์ที่เพิ่มเข้ามา เรายังสามารถยืนยันได้ว่าโปรเซสเซอร์ทั้งหมดทำงานผ่านคิว การปรับจำนวนโปรเซสเซอร์ที่ทำงานพร้อมกันทำให้สามารถค้นหาจุดสมดุลระหว่างความเร็วในการประมวลผลกับข้อจำกัดของทรัพยากรที่มีอยู่ได้

การขยายไปสู่หลายกระบวนการและเครื่องจักร

จนถึงตอนนี้ การนำระบบประมวลผลพื้นหลังของเราไปใช้ในปัจจุบันก็ทำงานได้ดีพอตัว มันยัง จำกัด อยู่ที่กระบวนการเดียวกัน งานที่ต้องใช้ทรัพยากรมากจะยังคงส่งผลต่อประสิทธิภาพของกระบวนการทั้งหมด ในขั้นตอนสุดท้าย มาดูการกระจายภาระงานในหลายกระบวนการและอาจถึงหลายเครื่อง

คิวเป็นเพียงการเชื่อมต่อระหว่างผู้ผลิตและผู้บริโภค ตอนนี้กำลังใช้การใช้งานในหน่วยความจำ มาหาแรงบันดาลใจเพิ่มเติมจาก Sidekiq และใช้คิวโดยใช้ Redis

Redis รองรับรายการที่ช่วยให้เราพุชและดึงงานได้ นอกจากนี้ Redis Ruby gem ยังปลอดภัยต่อเธรด และคำสั่ง Redis เพื่อแก้ไขรายการนั้นเป็นแบบปรมาณู คุณสมบัติเหล่านี้ทำให้สามารถใช้กับระบบประมวลผลพื้นหลังแบบอะซิงโครนัสของเราได้โดยไม่มีปัญหาในการซิงโครไนซ์

มาสร้างคิวสำรอง Redis ที่ใช้ push และ shift วิธีการเช่นเดียวกับ Queue ที่เราใช้ก่อนหน้านี้

require 'json'
require 'redis'
 
module Magique
  module Backend
    class Redis
      def initialize(connection = ::Redis.new)
        @connection = connection
      end
 
      def push(job)
        @connection.lpush('magique:queue', JSON.dump(job))
      end
 
      def shift
        _queue, job = @connection.brpop('magique:queue')
        payload = JSON.parse(job, symbolize_names: true)
        payload[:worker] = Object.const_get(payload[:worker])
        payload
      end
    end
  end
end

เนื่องจาก Redis ไม่รู้อะไรเลยเกี่ยวกับออบเจ็กต์ Ruby เราจึงต้องทำให้งานของเราเป็น JSON ก่อนจัดเก็บไว้ในฐานข้อมูลโดยใช้ lpush คำสั่งที่เพิ่มองค์ประกอบที่ด้านหน้าของรายการ

ในการดึงงานจากคิว เราใช้ brpop คำสั่งซึ่งรับองค์ประกอบสุดท้ายจากรายการ หากรายการว่างเปล่า รายการจะถูกบล็อกจนกว่าจะมีองค์ประกอบใหม่ นี่เป็นวิธีที่ดีในการหยุดโปรเซสเซอร์ของเราชั่วคราวเมื่อไม่มีงานใดๆ สุดท้าย หลังจากได้รับงานจาก Redis แล้ว เราต้องค้นหาคลาส Ruby จริงตามชื่อของผู้ปฏิบัติงานโดยใช้ Object.const_get .

ในขั้นตอนสุดท้าย เรามาแยกสิ่งต่าง ๆ ออกเป็นหลายกระบวนการ ในด้านผู้ผลิต สิ่งเดียวที่เราต้องทำคือเปลี่ยนแบ็กเอนด์เป็นคิว Redis ที่นำมาใช้ใหม่

# ...
 
Magique.backend = Magique::Backend::Redis.new
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

ในด้านของผู้บริโภค เราสามารถหลีกเลี่ยงได้ดังนี้:

# ...
 
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
 
loop { sleep 1 }

เมื่อดำเนินการแล้ว กระบวนการของผู้บริโภคจะรอให้งานใหม่มาถึงคิว เมื่อเราเริ่มกระบวนการผลิตที่พุชงานเข้าคิว เราจะเห็นว่างานเหล่านั้นได้รับการประมวลผลทันที

สนุกอย่างมีความรับผิดชอบและอย่าใช้สิ่งนี้ในการผลิต

แม้ว่าเราจะรักษามันให้ห่างไกลจากการตั้งค่าในโลกแห่งความเป็นจริงที่คุณจะใช้ในการผลิต (อย่าทำเช่นนั้น!) เราได้ดำเนินการสองสามขั้นตอนในการสร้างตัวประมวลผลพื้นหลัง เราเริ่มต้นด้วยการทำให้กระบวนการทำงานเป็นบริการพื้นหลัง จากนั้นเราก็ทำให้มัน async และใช้ Queue เพื่อแก้ปัญหาผู้ผลิต-ผู้บริโภค จากนั้นเราขยายกระบวนการเป็นหลายกระบวนการหรือหลายเครื่องโดยใช้ Redis แทนที่จะเป็นการใช้งานในหน่วยความจำ

ดังที่ได้กล่าวไว้ก่อนหน้านี้ นี่เป็นการนำระบบการประมวลผลเบื้องหลังไปใช้อย่างง่าย มีหลายสิ่งที่ขาดหายไปและไม่ได้จัดการอย่างชัดเจน ซึ่งรวมถึง (แต่ไม่จำกัดเพียง) การจัดการข้อผิดพลาด หลายคิว การตั้งเวลา การรวมการเชื่อมต่อ และการจัดการสัญญาณ

อย่างไรก็ตาม เราสนุกกับการเขียนสิ่งนี้และหวังว่าคุณจะสนุกกับการดูระบบประมวลผลพื้นหลัง บางทีคุณอาจจะเอาของไปสักหนึ่งหรือสองอย่าง