Computer >> คอมพิวเตอร์ >  >> การเขียนโปรแกรม >> ฐานข้อมูล

การแยกวิเคราะห์ไฟล์ csv ด้วย Filebeat และ Elasticsearch Ingest Pipelines

การแยกวิเคราะห์ไฟล์ csv ด้วย Filebeat และ Elasticsearch Ingest Pipelines

หนึ่งในคุณสมบัติใหม่ที่ยอดเยี่ยมที่สุดใน Elasticsearch 5 คือโหนดการนำเข้า ซึ่งเพิ่มการประมวลผลแบบ Logstash บางอย่างให้กับคลัสเตอร์ Elasticsearch จึงสามารถแปลงข้อมูลก่อนที่จะสร้างดัชนีได้โดยไม่ต้องใช้บริการและ/หรือโครงสร้างพื้นฐานอื่นเพื่อทำ ย้อนกลับไป เราได้โพสต์บล็อกสั้นๆ เกี่ยวกับวิธีแยกวิเคราะห์ไฟล์ csv ด้วย Logstash ดังนั้นฉันจึงอยากจะนำเสนอเวอร์ชันไปป์ไลน์ที่นำเข้ามาเพื่อการเปรียบเทียบ

สิ่งที่เราจะแสดงที่นี่คือตัวอย่างการใช้ Filebeat เพื่อส่งข้อมูลไปยังไปป์ไลน์นำเข้า จัดทำดัชนี และแสดงภาพด้วย Kibana

ข้อมูล

มีแหล่งข้อมูลดีๆ มากมายสำหรับข้อมูลฟรี แต่เนื่องจากพวกเราส่วนใหญ่ที่ ObjectRocket อยู่ในออสติน รัฐเท็กซัส เราจะใช้ข้อมูลบางส่วนจาก data.austintexas.gov ชุดข้อมูลการตรวจสอบร้านอาหารเป็นชุดข้อมูลขนาดพอเหมาะที่มีข้อมูลที่เกี่ยวข้องเพียงพอที่จะทำให้เราเห็นภาพตัวอย่างในโลกแห่งความเป็นจริง

ด้านล่างนี้คือสองสามบรรทัดจากชุดข้อมูลนี้เพื่อให้คุณเข้าใจถึงโครงสร้างของข้อมูล:

Restaurant Name,Zip Code,Inspection Date,Score,Address,Facility ID,Process Description
Westminster Manor,78731,07/21/2015,96,"4100 JACKSON AVE
AUSTIN, TX 78731
(30.314499, -97.755166)",2800365,Routine Inspection
Wieland Elementary,78660,10/02/2014,100,"900 TUDOR HOUSE RD
AUSTIN, TX 78660
(30.422862, -97.640183)",10051637,Routine Inspection

DOH… นี่จะไม่ใช่บรรทัดเดียวที่ดี เป็นมิตรต่อกรณีเข้าทำงาน แต่นั่นก็ใช้ได้ อย่างที่คุณจะได้เห็น Filebeat มีความสามารถในตัวเพื่อจัดการรายการหลายบรรทัดและแก้ปัญหาการขึ้นบรรทัดใหม่ที่ฝังอยู่ในข้อมูล

หมายเหตุบรรณาธิการ:ฉันกำลังวางแผนเกี่ยวกับตัวอย่างง่ายๆ ที่ดีโดยมี "ปัญหา" เล็กน้อย แต่ในท้ายที่สุด ฉันคิดว่าอาจน่าสนใจที่ได้เห็นเครื่องมือบางอย่างที่ Elastic Stack ให้คุณแก้ไขสถานการณ์เหล่านี้

การตั้งค่าไฟล์บีท

ขั้นตอนแรกคือการทำให้ Filebeat พร้อมที่จะเริ่มจัดส่งข้อมูลไปยังคลัสเตอร์ Elasticsearch ของคุณ เมื่อคุณดาวน์โหลด Filebeat แล้ว (ลองใช้เวอร์ชันเดียวกับคลัสเตอร์ ES ของคุณ) และแตกไฟล์ออกมา การติดตั้งผ่านไฟล์กำหนดค่า filebeat.yml ที่รวมมานั้นทำได้ง่ายมาก สำหรับสถานการณ์ของเรา นี่คือการกำหนดค่าที่ฉันใช้

filebeat.prospectors:
- input_type: log
  paths:
    - /Path/To/logs/*.csv

  # Ignore the first line with column headings
  exclude_lines: ["^Restaurant Name,"]

  # Identifies the last two columns as the end of an entry and then prepends the previous lines to it
  multiline.pattern: ',\d+,[^\",]+$'
  multiline.negate: true
  multiline.match: before

#================================ Outputs =====================================

output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["https://dfw-xxxxx-0.es.objectrocket.com:xxxxx", "https://dfw-xxxxx-1.es.objectrocket.com:xxxxx", "https://dfw-xxxxx-2.es.objectrocket.com:xxxxx", "https://dfw-xxxxx-3.es.objectrocket.com:xxxxx"]
  pipeline: "inspectioncsvs"

  # Optional protocol and basic auth credentials.
  username: "esuser"
  password: "supersecretpassword"

ทุกอย่างค่อนข้างตรงไปตรงมาที่นี่ คุณมีส่วนที่จะระบุตำแหน่งและวิธีที่จะดึงไฟล์อินพุตและส่วนเพื่อระบุตำแหน่งที่จะจัดส่งข้อมูล ส่วนเดียวที่ฉันจะเรียกโดยเฉพาะคือบิตหลายบรรทัดและชิ้นส่วนการกำหนดค่า Elasticsearch

เนื่องจากการจัดรูปแบบสำหรับชุดข้อมูลนี้ไม่ได้เข้มงวดมากนัก ด้วยการใช้เครื่องหมายคำพูดคู่และการขึ้นบรรทัดใหม่ที่ไม่สอดคล้องกัน ทางเลือกที่ดีที่สุดคือมองหาจุดสิ้นสุดของรายการ ซึ่งประกอบด้วย ID ตัวเลขตามด้วยประเภทการตรวจสอบ โดยไม่มีการเปลี่ยนแปลงมากหรือ double-quotes/newline จากนั้น Filebeat จะจัดคิวบรรทัดที่ไม่ตรงกันและต่อท้ายบรรทัดสุดท้ายที่ตรงกับรูปแบบ หากข้อมูลของคุณสะอาดกว่าและยึดติดกับรูปแบบบรรทัดเดียวต่อรูปแบบรายการ คุณแทบจะเพิกเฉยการตั้งค่าหลายบรรทัดได้

เมื่อดูที่ส่วนผลลัพธ์ของ Elasticsearch จะเป็นการตั้งค่า Elasticsearch มาตรฐานพร้อมการเพิ่มชื่อของไปป์ไลน์ที่คุณต้องการใช้กับไปป์ไลน์:directive หากคุณอยู่ในบริการ ObjectRocket คุณสามารถดึงข้อมูลโค้ดเอาต์พุตจากแท็บ "เชื่อมต่อ" ใน UI ซึ่งจะมีโฮสต์ที่เหมาะสมทั้งหมดไว้ล่วงหน้า และเพียงเพิ่มไปป์ไลน์แล้วกรอกผู้ใช้และรหัสผ่านของคุณ . นอกจากนี้ ตรวจสอบให้แน่ใจว่าคุณได้เพิ่ม IP ของระบบลงใน ACL ของคลัสเตอร์แล้ว หากยังไม่ได้ดำเนินการ

การสร้างไปป์ไลน์การนำเข้า

ตอนนี้เรามีข้อมูลอินพุตและ Filebeat ที่พร้อมใช้งานแล้ว เราสามารถสร้างและปรับแต่งไปป์ไลน์การนำเข้าของเราได้ งานหลักที่ไปป์ไลน์ต้องดำเนินการคือ:

  • แบ่งเนื้อหา csv เป็นฟิลด์ที่ถูกต้อง
  • แปลงคะแนนการตรวจสอบเป็นจำนวนเต็ม
  • ตั้งค่า @timestamp สนาม
  • ล้างการจัดรูปแบบข้อมูลอื่นๆ

นี่คือไปป์ไลน์ที่สามารถทำได้ทั้งหมด:

PUT _ingest/pipeline/inspectioncsvs
{
  "description" : "Convert Restaurant inspections csv data to indexed data",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{REST_NAME:RestaurantName},%{REST_ZIP:ZipCode},%{MONTHNUM2:InspectionMonth}/%{MONTHDAY:InspectionDay}/%{YEAR:InspectionYear},%{NUMBER:Score},\"%{DATA:StreetAddress}\n%{DATA:City},?\\s+%{WORD:State}\\s*%{NUMBER:ZipCode2}\\s*\n\\(?%{DATA:Location}\\)?\",%{NUMBER:FacilityID},%{DATA:InspectionType}$"],
        "pattern_definitions": {
          "REST_NAME": "%{DATA}|%{QUOTEDSTRING}",
          "REST_ZIP": "%{QUOTEDSTRING}|%{NUMBER}"
        }
      }
    },
    {
      "grok": {
        "field": "ZipCode",
        "patterns": [".*%{ZIP:ZipCode}\"?$"],
        "pattern_definitions": {
          "ZIP": "\\d{5}"
        }
      }
    },
    {
      "convert": {
        "field" : "Score",
        "type": "integer"
      }
    },
    {
      "set": {
        "field" : "@timestamp",
        "value" : "//"
      }
    },
    {
      "date" : {
        "field" : "@timestamp",
        "formats" : ["yyyy/MM/dd"]
      }
    }
  ],
  "on_failure" : [
    {
      "set" : {
        "field" : "error",
        "value" : " - Error processing message - "
      }
    }
  ]
}

ไม่เหมือนกับ Logstash ไปป์ไลน์การนำเข้าไม่มี (ในขณะที่เขียนบทความนี้) มีตัวประมวลผล/ปลั๊กอิน csv ดังนั้น คุณจะต้องแปลง csv ด้วยตัวคุณเอง ฉันใช้ตัวประมวลผล grok ในการยกของหนัก เนื่องจากแต่ละแถวมีเพียงไม่กี่คอลัมน์ สำหรับข้อมูลที่มีคอลัมน์มากขึ้น ตัวประมวลผล grok อาจมีขนดก ดังนั้นอีกทางเลือกหนึ่งคือการใช้ตัวประมวลผลแบบแยกส่วนและการเขียนสคริปต์ที่ไม่เจ็บปวดเพื่อประมวลผลบรรทัดในลักษณะวนซ้ำมากขึ้น คุณอาจสังเกตเห็นตัวประมวลผล grok ตัวที่สองซึ่งมีไว้เพื่อจัดการกับสองวิธีที่แตกต่างกันในการป้อนรหัสไปรษณีย์ในชุดข้อมูลนี้

เพื่อจุดประสงค์ในการดีบัก ฉันได้รวมส่วน on_failure ทั่วไปซึ่งจะตรวจจับข้อผิดพลาดทั้งหมดและพิมพ์ว่าตัวประมวลผลประเภทใดที่ล้มเหลวและข้อความใดที่ทำลายไปป์ไลน์ ทำให้การดีบักง่ายขึ้น ฉันสามารถสอบถามดัชนีของฉันสำหรับเอกสารใดๆ ที่มีการตั้งค่าข้อผิดพลาด และสามารถดีบักด้วย API จำลองได้ เพิ่มเติมเกี่ยวกับที่ตอนนี้…

ทดสอบระบบท่อ

ตอนนี้เราได้กำหนดค่าไพพ์ไลน์การนำเข้าแล้ว มาทดสอบและรันมันด้วย API จำลอง ขั้นแรกคุณจะต้องมีเอกสารตัวอย่าง คุณสามารถทำได้สองวิธี คุณสามารถเรียกใช้ Filebeat โดยไม่ต้องตั้งค่าไปป์ไลน์ จากนั้นเพียงแค่หยิบเอกสารที่ยังไม่ได้ประมวลผลจาก Elasticsearch หรือคุณสามารถเรียกใช้ Filebeat โดยเปิดใช้งานเอาต์พุตคอนโซล โดยแสดงความคิดเห็นในส่วน Elasticsearch และเพิ่มสิ่งต่อไปนี้ลงในไฟล์ yml:

output.console:
  pretty: true

นี่คือตัวอย่างเอกสารที่ฉันหยิบมาจากสภาพแวดล้อมของฉัน:

POST _ingest/pipeline/inspectioncsvs/_simulate
{
  "docs" : [
    {
      "_index": "inspections",
      "_type": "log",
      "_id": "AVpsUYR_du9kwoEnKsSA",
      "_score": 1,
      "_source": {
        "@timestamp": "2017-03-31T18:22:25.981Z",
        "beat": {
          "hostname": "systemx",
          "name": "RestReviews",
          "version": "5.1.1"
        },
        "input_type": "log",
        "message": "Wieland Elementary,78660,10/02/2014,100,\"900 TUDOR HOUSE RD\nAUSTIN, TX 78660\n(30.422862, -97.640183)\",10051637,Routine Inspection",
        "offset": 2109798,
        "source": "/Path/to/my/logs/Restaurant_Inspection_Scores.csv",
        "tags": [
          "debug",
          "reviews"
        ],
        "type": "log"
      }
    }
  ]
}

และการตอบสนอง (ฉันได้ตัดมันลงไปยังฟิลด์ที่เราพยายามจะตั้งค่า):

{
  "docs": [
    {
      "doc": {
        "_id": "AVpsUYR_du9kwoEnKsSA",
        "_type": "log",
        "_index": "inspections",
        "_source": {
          "InspectionType": "Routine Inspection",
          "ZipCode": "78660",
          "InspectionMonth": "10",
          "City": "AUSTIN",
          "message": "Wieland Elementary,78660,10/02/2014,100,\"900 TUDOR HOUSE RD\nAUSTIN, TX 78660\n(30.422862, -97.640183)\",10051637,Routine Inspection",
          "RestaurantName": "Wieland Elementary",
          "FacilityID": "10051637",
          "Score": 100,
          "StreetAddress": "900 TUDOR HOUSE RD",
          "State": "TX",
          "InspectionDay": "02",
          "InspectionYear": "2014",
          "ZipCode2": "78660",
          "Location": "30.422862, -97.640183"
        },
        "_ingest": {
          "timestamp": "2017-03-31T20:36:59.574+0000"
        }
      }
    }
  ]
}

ไปป์ไลน์ประสบความสำเร็จอย่างแน่นอน แต่ที่สำคัญที่สุด ข้อมูลทั้งหมดดูเหมือนจะมาถูกที่แล้ว

รันไฟล์บีต

ก่อนที่เราจะเรียกใช้ Filebeat เราจะทำสิ่งสุดท้าย ส่วนนี้เป็นทางเลือกโดยสมบูรณ์ หากคุณต้องการเพียงแค่ทำความคุ้นเคยกับไปป์ไลน์การนำเข้า แต่ถ้าคุณต้องการใช้ฟิลด์ตำแหน่งที่เราตั้งค่าในตัวประมวลผล grok เป็น Geo-point คุณจะต้องเพิ่มการแมปไปยัง filebeat template.json โดยเพิ่มข้อมูลต่อไปนี้ในส่วนคุณสมบัติ:

"Location": {
  "type": "geo_point"
},

เท่านี้ก็เรียบร้อย เราสามารถเปิด Filebeat ได้ด้วยการเรียกใช้ ./filebeat -e -c filebeat.yml -d “elasticsearch”

การใช้ข้อมูล

GET /filebeat-*/_count
{}

{
  "count": 25081,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  }
}

นั่นเป็นสัญญาณที่ดี! มาดูกันว่าเรามีข้อผิดพลาดหรือไม่:

GET /filebeat-*/_search
{
    "query": {
        "exists" : { "field" : "error" }
    }
}

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}

อีกหนึ่งสัญญาณที่ดี!

ตอนนี้เราพร้อมที่จะแสดงภาพและแสดงข้อมูลของเราใน Kibana แล้ว เราสามารถดำเนินการสร้างแดชบอร์ด Kibana ได้อีกครั้ง แต่เนื่องจากเรามีวันที่ ชื่อร้านอาหาร คะแนน และสถานที่ เราจึงมีอิสระมากมายในการสร้างภาพที่สวยงาม

การแยกวิเคราะห์ไฟล์ csv ด้วย Filebeat และ Elasticsearch Ingest Pipelines

หมายเหตุสุดท้าย

อีกครั้งที่ไปป์ไลน์การส่งผ่านข้อมูลมีประสิทธิภาพมากและสามารถจัดการกับการเปลี่ยนแปลงได้อย่างง่ายดาย คุณสามารถย้ายการประมวลผลทั้งหมดของคุณไปที่ Elasticsearch และใช้ Beats แบบเบาบนโฮสต์ของคุณเท่านั้น โดยไม่ต้องใช้ Logstash ที่ใดที่หนึ่งในไปป์ไลน์ อย่างไรก็ตาม ยังมีช่องว่างในโหนดการนำเข้าเมื่อเทียบกับ Logstash ตัวอย่างเช่น จำนวนโปรเซสเซอร์ที่มีอยู่ในไพพ์ไลน์การนำเข้ายังคงมีจำกัด ดังนั้นงานง่ายๆ เช่น การแยกวิเคราะห์ CSV จึงไม่ง่ายเหมือนใน Logstash ดูเหมือนว่าทีม Elasticsearch จะเปิดตัวโปรเซสเซอร์ใหม่เป็นประจำ ดังนั้นหวังว่ารายการความแตกต่างจะเล็กลงเรื่อยๆ