Computer >> คอมพิวเตอร์ >  >> การเขียนโปรแกรม >> Ruby

การสตรีมเหตุการณ์ใน Rails กับ Kafka

บริษัทต่างๆ ต้องการตอบสนองต่อความต้องการในการประมวลผลและแบ่งปันข้อมูลจำนวนมากในแบบเรียลไทม์อย่างรวดเร็ว เพื่อรับข้อมูลเชิงลึกและสร้างประสบการณ์ที่ลูกค้ามีส่วนร่วมมากขึ้น ดังนั้นการประมวลผลข้อมูลแบบเดิมจึงไม่สามารถทำได้ในโลกปัจจุบันอีกต่อไป

เพื่อให้บรรลุผลดังกล่าว คุณต้องประมวลผลข้อมูลจำนวนมากให้เร็วที่สุด แล้วจึงส่งไปยังบริการอื่นๆ เพื่อการประมวลผลเพิ่มเติม แต่ในระหว่างการดำเนินการอย่างรวดเร็วเหล่านี้ จำเป็นต้องแจ้งให้ผู้บริโภคทราบเมื่อมีเหตุการณ์เกิดขึ้น และเราสามารถทำได้โดยใช้การสตรีมกิจกรรม

นี่คือ repo ใน GitHub ที่เราจะใช้

เหตุการณ์

ก่อนพูดถึงการสตรีมกิจกรรม เรามาคุยกันก่อนว่ากิจกรรมคืออะไร เหตุการณ์ที่เกิดขึ้นภายในแอปพลิเคชันอาจเกี่ยวข้องกับกระบวนการของผู้ใช้หรือเพียงแค่การกระทำที่ส่งผลต่อธุรกิจ

เหตุการณ์แสดงถึงการเปลี่ยนแปลงสถานะ ไม่ใช่คำถามเกี่ยวกับวิธีการแก้ไขแอปพลิเคชัน พิจารณาสิ่งเหล่านี้เป็นตัวอย่าง:

  • ผู้ใช้ที่เข้าสู่ระบบบริการ
  • ธุรกรรมการชำระเงิน
  • นักเขียนที่เผยแพร่โพสต์ในบล็อก

ในกรณีส่วนใหญ่ เหตุการณ์จะทำให้เกิดเหตุการณ์มากขึ้น ตัวอย่างเช่น เมื่อผู้ใช้สมัครใช้บริการ แอปจะส่งการแจ้งเตือนไปยังอุปกรณ์ แทรกบันทึกลงในฐานข้อมูล และส่งอีเมลต้อนรับ

การสตรีมกิจกรรม

การสตรีมกิจกรรม เป็นรูปแบบการเก็บข้อมูลแบบเรียลไทม์จากแหล่งที่มาของเหตุการณ์ เช่น ฐานข้อมูล ส่วนหลักของการสตรีมกิจกรรมมีดังนี้:

  • นายหน้า :ระบบที่รับผิดชอบการจัดเก็บเหตุการณ์
  • หัวข้อ :หมวดหมู่ของเหตุการณ์
  • โปรดิวเซอร์ :ส่งกิจกรรมไปยังนายหน้าในหัวข้อเฉพาะ
  • ผู้บริโภค :อ่านเหตุการณ์
  • เหตุการณ์ :ข้อมูลที่ผู้ผลิตต้องการสื่อสารกับผู้บริโภค

หลีกเลี่ยงไม่ได้ที่จะพูดถึง เผยแพร่และสมัครรูปแบบสถาปัตยกรรม (รูปแบบ pub/sub) ณ จุดนี้; การสตรีมเหตุการณ์เป็นการนำรูปแบบดังกล่าวไปใช้ แต่มีการเปลี่ยนแปลงเหล่านี้:

  • เหตุการณ์เกิดขึ้นแทนข้อความ
  • กิจกรรมจะเรียงลำดับ โดยปกติตามเวลา
  • ผู้บริโภคสามารถอ่านเหตุการณ์จากจุดเฉพาะในหัวข้อ
  • เหตุการณ์มีความคงทนชั่วขณะ

การไหลเริ่มต้นเมื่อ ผู้ผลิต เผยแพร่ กิจกรรมใหม่ เป็น หัวข้อ (ดังที่เราเห็นก่อนหน้านี้ หัวข้อเป็นเพียงการจัดหมวดหมู่สำหรับประเภทเหตุการณ์เฉพาะ) จากนั้น ผู้บริโภค สนใจในกิจกรรมของหมวดหมู่เฉพาะสมัครสมาชิกหัวข้อนั้น สุดท้ายนี้ โบรกเกอร์ ระบุผู้บริโภคของหัวข้อและทำให้กิจกรรมที่ต้องการพร้อมใช้งาน

ข้อดีของการสตรีมเหตุการณ์

  • ดีคัปปลิ้ง ไม่มีการพึ่งพาระหว่างผู้เผยแพร่และผู้บริโภคเพราะพวกเขาไม่จำเป็นต้องรู้จักกัน นอกจากนี้ เหตุการณ์ไม่ได้ระบุการกระทำ ดังนั้นผู้บริโภคจำนวนมากจึงสามารถรับเหตุการณ์เดียวกันและดำเนินการต่างๆ ได้

  • เวลาแฝงต่ำ เหตุการณ์ถูกแยกออกและให้ผู้บริโภคใช้งานได้ทุกเวลา มันสามารถเกิดขึ้นได้ในหน่วยมิลลิวินาที

  • อิสรภาพ อย่างที่เราทราบกันดีว่าผู้เผยแพร่โฆษณาและผู้บริโภคมีความเป็นอิสระ ดังนั้นทีมต่างๆ จึงสามารถทำงานร่วมกับพวกเขาได้โดยใช้เหตุการณ์เดียวกันสำหรับการดำเนินการหรือวัตถุประสงค์อื่น

  • ความทนทานต่อความผิดพลาด แพลตฟอร์มการสตรีมเหตุการณ์บางอย่างช่วยให้คุณจัดการกับความล้มเหลวของผู้บริโภค ตัวอย่างเช่น ผู้บริโภคสามารถบันทึกตำแหน่งของตนและเริ่มต้นจากจุดนั้นอีกครั้งหากมีข้อผิดพลาดเกิดขึ้น

  • การจัดการตามเวลาจริง จะได้รับคำติชมแบบเรียลไทม์ ดังนั้นผู้ใช้จึงไม่ต้องรอเป็นนาทีหรือชั่วโมงเพื่อดูการตอบกลับของกิจกรรม

  • ประสิทธิภาพสูง แพลตฟอร์มกิจกรรมสามารถรองรับข้อความจำนวนมากได้เนื่องจากเวลาในการตอบสนองต่ำ เช่น หลายพันเหตุการณ์ในหนึ่งวินาที

ข้อเสียของการสตรีมเหตุการณ์

  • การตรวจสอบ เครื่องมือสตรีมเหตุการณ์บางอย่างไม่มีเครื่องมือตรวจสอบที่สมบูรณ์ พวกเขาเรียกร้องให้มีการติดตั้งเครื่องมือเพิ่มเติม เช่น Datadog หรือ New Relic

  • การกำหนดค่า การกำหนดค่าในเครื่องมือบางอย่างอาจดูล้นหลามแม้กระทั่งกับคนที่มีประสบการณ์ มีพารามิเตอร์มากมาย และบางครั้ง คุณจำเป็นต้องรู้เชิงลึกเกี่ยวกับตัวแบบเพื่อนำไปใช้

  • ไลบรารีไคลเอนต์ การนำ Kafka ไปใช้ในภาษาอื่นที่ไม่ใช่ Java ไม่ใช่เรื่องง่าย บางครั้ง ไลบรารีของไคลเอ็นต์ไม่ทันสมัย ​​แสดงความไม่เสถียร หรือไม่มีทางเลือกมากมายให้เลือก

เครื่องมือยอดนิยมสำหรับการสตรีมกิจกรรมคือ Apache Kafka . เครื่องมือนี้อนุญาตให้ผู้ใช้ส่ง จัดเก็บ และขอข้อมูลได้ทุกที่ทุกเวลาที่ต้องการ มาคุยกันครับ

อาปาเช่ คาฟคา

"Apache Kafka เป็นแพลตฟอร์มการสตรีมเหตุการณ์แบบโอเพนซอร์สแบบกระจายที่ใช้โดยบริษัทหลายพันแห่งสำหรับไปป์ไลน์ข้อมูลประสิทธิภาพสูง การวิเคราะห์การสตรีม การรวมข้อมูล และแอปพลิเคชันที่สำคัญต่อภารกิจ"

Apache Kafka ได้รับการออกแบบมาโดยเฉพาะสำหรับการส่งบันทึกแบบเรียลไทม์ เหมาะสำหรับแอปพลิเคชันที่ต้องการสิ่งต่อไปนี้:

  • การแลกเปลี่ยนข้อมูลที่เชื่อถือได้ระหว่างส่วนประกอบต่างๆ
  • ความสามารถในการแบ่งปริมาณงานการส่งข้อความเมื่อข้อกำหนดของแอปพลิเคชันเปลี่ยนไป
  • การรับส่งข้อมูลแบบเรียลไทม์สำหรับการประมวลผลข้อมูล

มาใช้ Kafka ในแอปพลิเคชัน Rails กันเถอะ!

การใช้ Kafka กับ Rails

อัญมณีที่มีชื่อเสียงที่สุดที่ใช้ Kafka ใน Ruby เรียกว่า ruby-kafka โดย Zendesk และยอดเยี่ยมมาก! ถึงกระนั้น คุณต้องดำเนินการทั้งหมดด้วยตนเอง ซึ่งเป็นเหตุผลว่าทำไมเราจึงมี "เฟรมเวิร์ก" ที่สร้างขึ้นด้วย ruby-kafka นอกจากนี้ยังช่วยเราในการกำหนดค่าและขั้นตอนการดำเนินการทั้งหมด

Karafka เป็นเฟรมเวิร์กที่ใช้เพื่อทำให้การพัฒนาแอปพลิเคชัน Ruby บน Apache Kafka ง่ายขึ้น

ในการทำงานกับ Kafka จำเป็นต้องติดตั้ง Java เนื่องจาก Kafka เป็นแอปพลิเคชัน Scala และ Java จึงต้องติดตั้ง Zookeeper

ก่อนการติดตั้ง ฉันต้องการอธิบายเล็กน้อยเกี่ยวกับ Zookeeper Zookeeper เป็นบริการแบบรวมศูนย์ที่จำเป็นสำหรับ Kafka; จะส่งการแจ้งเตือนในกรณีที่มีการเปลี่ยนแปลง เช่น การสร้างหัวข้อใหม่ ความผิดพลาดของนายหน้า การลบนายหน้า การลบหัวข้อ และอื่นๆ

งานหลักคือการจัดการโบรกเกอร์ Kafka รักษารายการด้วยเมตาดาต้าที่เกี่ยวข้อง และอำนวยความสะดวกกลไกการตรวจสอบสภาพ นอกจากนี้ยังช่วยในการเลือกโบรกเกอร์ชั้นนำสำหรับพาร์ติชันต่างๆ ของหัวข้อ

ข้อกำหนด

สำหรับ MacOS:

ตอนนี้ มาติดตั้ง Java และ Zookeeper ด้วยคำสั่งต่อไปนี้:

brew install java
brew install zookeeper

จากนั้น เราสามารถดำเนินการติดตั้ง Kafka ต่อไปได้:

brew install kafka

เมื่อเราติดตั้ง Kafka และ Zookeeper แล้ว จำเป็นต้องเริ่มบริการด้วยวิธีนี้:

brew services start zookeeper
brew services start kafka

สำหรับ Windows และ Linux:

คำแนะนำ:

  1. การติดตั้งจาวา
  2. ดาวน์โหลด Zookeeper

การติดตั้งราง

เพียงสร้างแอปพลิเคชัน Rails อย่างง่ายตามปกติ:

rails new karafka_example

และเพิ่ม karafka gem ภายใน Gemfile:

gem 'karafka'

จากนั้นเรียกใช้ bundle install เพื่อติดตั้งอัญมณีที่เพิ่งเพิ่มเข้ามา และอย่าลืมเรียกใช้คำสั่งต่อไปนี้เพื่อรับทุกสิ่งของ Karafka:

bundle exec karafka install

คำสั่งนั้นควรสร้างไฟล์ที่น่าสนใจ:อันแรกคือ karafka.rb ในไดเรกทอรีราก app/consumers/application_consumer.rb และ app/responders/application_responder.rb .

โปรแกรมเริ่มต้น Karafka

karafka.rb file เหมือนกับโปรแกรม initializer ที่แยกจาก Rails config ช่วยให้คุณสามารถกำหนดค่าแอปพลิเคชัน Karafka และวาดเส้นทางบางเส้นทาง ซึ่งคล้ายกับ API เป็นเส้นทางแอปพลิเคชัน Rails แต่สำหรับหัวข้อและผู้บริโภค

โปรดิวเซอร์

โปรดิวเซอร์ มีหน้าที่สร้างกิจกรรม และเราสามารถเพิ่มลงใน app/responders โฟลเดอร์ ตอนนี้ มาสร้างผู้ผลิตอย่างง่ายสำหรับผู้ใช้:

# app/responders/users_responder.rb

class UsersResponder < ApplicationResponder
  topic :users

  def respond(event_payload)
    respond_to :users, event_payload
  end
end

ผู้บริโภค

ผู้บริโภค มีหน้าที่อ่านเหตุการณ์/ข้อความทั้งหมดที่ส่งจากผู้ผลิต นี่เป็นเพียงผู้บริโภคที่บันทึกข้อความที่ได้รับ

# app/consumers/users_consumer.rb

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params}"
  end
end

เราใช้ params เพื่อรับเหตุการณ์ แต่ถ้าคุณจะอ่านเหตุการณ์เป็นชุดและคุณมี config config.batch_fetching ตามจริง คุณควรใช้ params_batch .

กำลังทดสอบ

ในการเรียกใช้บริการ Karafka ของเรา (บริการที่จะได้ยินเหตุการณ์) ให้ไปที่คอนโซล เปิดแท็บใหม่ ไปที่โครงการ Rails และเรียกใช้:

bundle exec karafka server

กิจกรรมที่ประสบความสำเร็จ

ตอนนี้ เปิดแท็บคอนโซลอื่น ไปที่โปรเจ็กต์ Rails แล้วพิมพ์สิ่งนี้:

rails c

ที่นั่น มาสร้างกิจกรรมด้วยการตอบกลับของเรา:

> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })

หากคุณตรวจสอบคอนโซล Rails เราจะได้รับข้อความนี้หลังจากสร้างกิจกรรมแล้ว:

Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}

และในแท็บบริการ Karafka คุณจะเห็นสิ่งนี้:

New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092

แต่ถ้าคุณต้องการแค่เพย์โหลดข้อความ คุณสามารถเพิ่ม params.payload ในผู้บริโภคของคุณและคุณจะมีสิ่งนี้:

Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer

เหตุการณ์ที่ล้มเหลว

คุณสร้างโมเดลผู้ใช้ที่มีแอตทริบิวต์บางอย่างได้ เช่น email , first_name และ last_name รันคำสั่งต่อไปนี้:

rails g model User email first_name last_name

จากนั้น คุณสามารถเรียกใช้การย้ายข้อมูลด้วยสิ่งนี้:

rails db:migrate

ตอนนี้ ให้เพิ่มการตรวจสอบดังนี้:

class User < ApplicationRecord
  validates :email, uniqueness: true
end

สุดท้าย เราสามารถเปลี่ยนแปลงผู้บริโภคได้:

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

ดังนั้น มาสร้างสองกิจกรรมด้วยอีเมลเดียวกัน:

UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } } )

UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } } )

ด้วยสิ่งนี้ เหตุการณ์แรกจะถูกสร้างขึ้นในฐานข้อมูล:

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.1ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (9.6ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "1"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.0ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer

แต่อันที่สองจะล้มเหลวเพราะเรามีการตรวจสอบที่ระบุว่าอีเมลนั้นไม่ซ้ำกัน หากคุณพยายามเพิ่มระเบียนอื่นด้วยอีเมลที่มีอยู่ คุณจะเห็นดังนี้:

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Exists? (0.3ms)  SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2  [["email", "batman@mail.com"], ["LIMIT", 1]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (0.2ms)  ROLLBACK
  ↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken

คุณสามารถดูข้อผิดพลาดในบรรทัดสุดท้าย ActiveRecord::RecordInvalid: Validation failed: Email has already been taken . แต่สิ่งที่น่าสนใจของที่นี่คือ Kafka จะพยายามจัดการกิจกรรมครั้งแล้วครั้งเล่า แม้ว่าคุณจะรีสตาร์ทเซิร์ฟเวอร์ Karafka เซิร์ฟเวอร์จะพยายามประมวลผลเหตุการณ์ล่าสุด คาฟคารู้ได้อย่างไรว่าจะเริ่มต้นอย่างไร

หากคุณเห็นคอนโซลของคุณ หลังจากเกิดข้อผิดพลาด คุณจะเห็นสิ่งนี้:

[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42

มันจะบอกคุณว่าออฟเซ็ตใดถูกประมวลผล:ในกรณีนี้ มันถูกออฟเซ็ต 42 ดังนั้น หากคุณเริ่มบริการ Karafka ใหม่ บริการจะเริ่มในออฟเซ็ตนั้น

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches

จะยังคงล้มเหลวเนื่องจากเรามีการตรวจสอบอีเมลในรูปแบบผู้ใช้ของเรา ณ จุดนี้ ให้หยุดเซิร์ฟเวอร์ Karafka ลบหรือแสดงความคิดเห็นการตรวจสอบนั้น และเริ่มเซิร์ฟเวอร์ของคุณอีกครั้ง คุณจะเห็นว่ากิจกรรมดำเนินการสำเร็จอย่างไร:

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (3.8ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "2"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.5ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed

สุดท้าย คุณสามารถเห็นข้อความนี้ในบรรทัดสุดท้าย:Marking users/0:43 as processed .

โทรกลับ

นี่คือสิ่งที่ Karafka นำเสนอ:คุณสามารถใช้การโทรกลับใน Consumer ของคุณได้ ในการทำเช่นนั้น คุณจะต้องนำเข้าโมดูลและใช้งานเท่านั้น จากนั้นเปิด UserConsumer และเพิ่มสิ่งนี้:

class UsersConsumer < ApplicationConsumer
  include Karafka::Consumers::Callbacks

  before_poll do
    Karafka.logger.info "*** Checking something new for #{topic.name}"
  end

  after_poll do
    Karafka.logger.info '*** We just checked for new messages!'
  end

  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

โพลเป็นสื่อกลางที่เราดึงข้อมูลบันทึกตามการชดเชยพาร์ติชั่นปัจจุบัน ดังนั้นการโทรกลับเหล่านั้น before_poll และ after_poll เหมือนกับชื่อของพวกเขา ถูกประหารชีวิตในขณะนั้น เราเพิ่งบันทึกข้อความ และคุณสามารถเห็นข้อความเหล่านั้นในเซิร์ฟเวอร์ Karafka ของคุณ อันหนึ่งก่อนดึงข้อมูล และอีกอันหลังจากนั้น:

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!

การเต้นของหัวใจ

การเต้นของหัวใจเป็นเพียงวิธีที่เราในฐานะผู้บริโภคพูดกับคาฟคาว่าเรายังมีชีวิตอยู่ มิฉะนั้น คาฟคาจะถือว่าผู้บริโภคเสียชีวิต

ใน Karafka เรามีการกำหนดค่าเริ่มต้นที่จะทำสิ่งนี้ในช่วงเวลาหนึ่ง มันคือ kafka.heartbeat_interval และค่าเริ่มต้นคือ 10 วินาที คุณสามารถเห็นการเต้นของหัวใจนี้ในเซิร์ฟเวอร์ Karafka ของคุณ

*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!

ด้วย Sending heartbeat... , Kafka รู้ว่าเรายังมีชีวิตอยู่และเราเป็นสมาชิกที่ถูกต้องของกลุ่มผู้บริโภค นอกจากนี้เรายังสามารถใช้บันทึกได้มากขึ้น

มุ่งมั่น

การทำเครื่องหมายออฟเซ็ตว่าใช้แล้วเรียกว่าการออฟเซ็ต ใน Kafka เราบันทึกการชดเชยโดยการเขียนไปยังหัวข้อ Kafka ภายในที่เรียกว่าหัวข้อ offsets ระบบจะถือว่าข้อความถูกใช้ไปก็ต่อเมื่อมีการคอมมิตออฟเซ็ตกับหัวข้อออฟเซ็ต

Karafka มีการกำหนดค่าเพื่อดำเนินการนี้โดยอัตโนมัติในแต่ละครั้ง การกำหนดค่าคือ kafka.offset_commit_interval และค่าของมันคือ 10 วินาทีโดยค่าเริ่มต้น ด้วยสิ่งนี้ Karakfa จะทำการชดเชยทุก ๆ 10 วินาที และคุณสามารถดูข้อความนั้นในเซิร์ฟเวอร์ Karafka ของคุณ:

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!

Committing offsets: users/0:44 บอกเราว่าการชดเชยใดที่กระทำ; ในกรณีของฉัน มันบอก Kafka ว่าสามารถคอมมิตหมายเลขออฟเซ็ต 44 จากหัวข้อ 0 ได้ ด้วยวิธีนี้ หากมีอะไรเกิดขึ้นกับบริการของเรา Karafka สามารถเริ่มต้นอีกครั้งเพื่อประมวลผลกิจกรรมจากออฟเซ็ตนั้น

บทสรุป

การสตรีมกิจกรรมช่วยให้เราเร็วขึ้น ใช้ข้อมูลได้ดีขึ้น และออกแบบประสบการณ์ผู้ใช้ที่ดียิ่งขึ้น ตามความเป็นจริง หลายบริษัทใช้รูปแบบนี้เพื่อสื่อสารบริการทั้งหมดของตนและเพื่อให้สามารถตอบสนองต่อเหตุการณ์ต่างๆ ได้แบบเรียลไทม์ ดังที่ฉันได้กล่าวไว้ก่อนหน้านี้ มีทางเลือกอื่นนอกเหนือจาก Karafka ที่คุณสามารถใช้กับ Rails ได้ คุณมีพื้นฐานอยู่แล้ว ตอนนี้ อย่าลังเลที่จะทดลองกับพวกเขา

ข้อมูลอ้างอิง

  • https://kafka.apache.org/
  • https://github.com/karafka/karafka
  • https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern