bulk nodejs
Elasticsearch Bulk Insert JSON Data | by Onexlab | Medium
npm install @elastic/elasticsearch
npm install fast-glob
// elastic-glob.js
import { Client } from '@elastic/elasticsearch'
import fg from 'fast-glob'
import fs from 'fs'
/**
 * Rebuild an Elasticsearch index from the JSON files of an export directory.
 *
 * Steps, in order:
 *  1. validate `export_dir` (before anything destructive happens),
 *  2. delete `elastic_index`; a missing index is tolerated,
 *  3. collect every `*.json` file that sits directly inside a directory named
 *     like the last segment of `export_dir` (relative glob pattern),
 *  4. index a placeholder document into `elastic_index`,
 *  5. raise the total-fields limit and enable fielddata on a few text fields,
 *  6. bulk-index each file, logging rejected documents and the document count
 *     after every file.
 *
 * Results are read from the `body` property of the client's responses.
 *
 * @param {string} export_dir - Path whose last non-empty `/`-separated segment
 *   names the directory to search for; a trailing slash is accepted.
 * @param {string} elastic_node - Elasticsearch node URL handed to the client.
 * @param {string} elastic_user - Username for basic authentication.
 * @param {string} elastic_pass - Password for basic authentication.
 * @param {string} elastic_index - Index that is deleted and then repopulated.
 * @returns {Promise<void>} Resolves once every file has been processed.
 * @throws {Error} If `export_dir` contains no directory name.
 * @throws {SyntaxError} If a file is not valid JSON (message names the file).
 */
async function dumb_to_elastic(
  export_dir,
  elastic_node,
  elastic_user,
  elastic_pass,
  elastic_index
) {
  // Fail before the index is deleted: an empty name would yield a glob
  // pattern that cannot match the intended directory.
  const dir_name = last_path_segment(export_dir)
  if (dir_name === "") {
    throw new Error(`export_dir must contain a directory name, got ${JSON.stringify(export_dir)}`)
  }

  const client = new Client({
    node: elastic_node,
    auth: {
      username: elastic_user,
      password: elastic_pass
    },
    // NOTE(review): TLS certificate verification is switched off here.
    // Acceptable for a local/self-signed cluster only; confirm before using
    // this against anything reachable over an untrusted network.
    ssl: {
      rejectUnauthorized: false
    }
  })

  // Start from a clean index. A 404 is ignored so that the very first run,
  // when the index does not exist yet, does not abort.
  await client.indices.delete(
    { index: elastic_index },
    { ignore: [404] }
  )

  const glob_path = `**/${dir_name}/*.json`
  const files_to_index = await fg([glob_path])

  // Presumably relies on Elasticsearch creating the index on first write, so
  // that the settings and mapping calls below have an index to act on.
  const create_index_cmd = await client.create({
    id: "PaulWasHerePlaceholder",
    index: elastic_index,
    body: {
      hello: "world"
    }
  })
  console.log(create_index_cmd)

  await client.indices.putSettings({
    index: elastic_index,
    body: {
      "index.mapping.total_fields.limit": 4000
    }
  })

  // Text fields that get fielddata enabled.
  const fielddata_fields = [
    "msg.channel.topic_name",
    "msg.content.type",
    "msg.sender.username",
    "msg.content.reaction.b",
    "msg.channel.name"
  ]
  const properties = {}
  for (const field of fielddata_fields) {
    properties[field] = { type: "text", fielddata: true }
  }
  await client.indices.putMapping({
    index: elastic_index,
    body: { properties }
  })

  for (const file of files_to_index) {
    const documents = read_documents(file)
    if (documents.length === 0) {
      // A bulk request without any action is rejected by Elasticsearch.
      console.warn(`Skipping ${file}: no documents to index`)
      continue
    }

    // Bulk body alternates action line and document source.
    const body = documents.flatMap(doc => [{ index: { _index: elastic_index } }, doc])
    const { body: bulkResponse } = await client.bulk({ refresh: true, body })
    if (bulkResponse.errors) {
      console.log(collect_bulk_errors(bulkResponse, body))
    }

    const { body: count } = await client.count({ index: elastic_index })
    console.log(count)
  }
}

/** Return the last non-empty `/`-separated segment of `dir`, or "" if none. */
function last_path_segment(dir) {
  const segments = dir.split("/").filter(segment => segment !== "")
  return segments.length > 0 ? segments[segments.length - 1] : ""
}

/**
 * Read `file` as JSON and return its documents as an array: an array is used
 * as is, a single object becomes a one-element array, anything else is empty.
 */
function read_documents(file) {
  let parsed
  try {
    parsed = JSON.parse(fs.readFileSync(file, 'utf8'))
  } catch (err) {
    if (err instanceof SyntaxError) {
      // Same error object and type, but now it says which file is broken.
      err.message = `${file}: ${err.message}`
    }
    throw err
  }
  if (Array.isArray(parsed)) {
    return parsed
  }
  return parsed !== null && typeof parsed === "object" ? [parsed] : []
}

/** Pair every failed item of a bulk response with the action and document sent. */
function collect_bulk_errors(bulkResponse, body) {
  const errored_documents = []
  // The items array has the same order as the documents that were sent; the
  // presence of the `error` key marks a failed operation.
  bulkResponse.items.forEach((action, i) => {
    const operation = Object.keys(action)[0]
    const result = action[operation]
    if (result.error) {
      errored_documents.push({
        // A status of 429 means the document can be retried; otherwise it is
        // very likely a mapping error and the document should be fixed
        // before trying again.
        status: result.status,
        error: result.error,
        operation: body[i * 2],
        document: body[i * 2 + 1]
      })
    }
  })
  return errored_documents
}
// Expose dumb_to_elastic as a named export of this module.
export {
dumb_to_elastic
}