Skip to main content
Glama
dispatcher.lua (7.55 kB)
-- Unit conversion factors used to express memory figures in megabytes.
-- NOTE(review): kb_to_mb uses a decimal divisor (1000) while bytes_to_mb uses
-- a binary one (1024 * 1024); confirm which convention the consumers expect.
kb_to_mb = 1 / 1000
bytes_to_mb = 1 / (1024 * 1024)

-- Per-tag FIFO buffers of incoming records. Initialised with `or` so that
-- already-buffered content is kept if this chunk runs again in the same state.
buffered_records = buffered_records or {}

-- Processes whose metrics are reported, and the buffer-key prefix used for
-- each of them ("<key>_memory" / "<key>_cpu").
process_names = {"lunar_engine", "fluent-bit", "haproxy", "squid"}
process_to_buffer_key = {
  ["lunar_engine"] = "engine",
  ["fluent-bit"] = "fluent",
  ["haproxy"] = "haproxy",
  ["squid"] = "squid"
}

--- Tell whether a process with exactly this name is currently running.
-- Shells out to `pgrep -x`; output is discarded.
-- @param process_name name passed verbatim to pgrep
-- @return true when pgrep reports success, false otherwise
local function process_exists(process_name)
  local status = os.execute("pgrep -x " .. process_name .. " > /dev/null 2>&1")
  -- Lua 5.1 returns the numeric exit status (0 on success); Lua 5.2+ returns
  -- `true` as the first result on success. Accept both.
  return status == 0 or status == true
end

--- Build a map of process name -> boolean "is running" for every entry of
-- `process_names`.
local function get_running_processes()
  local running_processes = {}
  for _, process_name in ipairs(process_names) do
    running_processes[process_name] = process_exists(process_name)
  end
  return running_processes
end

-- Lists every buffer tag that must hold a record before a combined record can
-- be produced: the three system tags plus memory/cpu for each running process.
local function required_buffer_tags(processes)
  -- system_cpu is included because the combined record consumes it as well.
  local tags = {"system_memory", "system_cpu", "system_disk"}
  for process_name, process_running in pairs(processes) do
    local key = process_to_buffer_key[process_name]
    if process_running and key then
      tags[#tags + 1] = key .. "_memory"
      tags[#tags + 1] = key .. "_cpu"
    end
  end
  return tags
end

--- Check that a buffer has been created for every required tag.
-- @param processes map of process name -> running flag
-- @return true when every required buffer exists, false otherwise
local function check_all_values_exists(processes)
  for _, tag in ipairs(required_buffer_tags(processes)) do
    if not buffered_records[tag] then
      return false
    end
  end
  return true
end

--- Check that every required buffer holds at least one record.
-- A missing buffer counts as empty instead of raising an error.
-- @param processes map of process name -> running flag
-- @return true when every required buffer is non-empty, false otherwise
local function check_all_values_not_empty(processes)
  for _, tag in ipairs(required_buffer_tags(processes)) do
    local buffer = buffered_records[tag]
    if not buffer or #buffer == 0 then
      return false
    end
  end
  return true
end

--- Truncate (floor) a number to two decimal places.
local function truncate_to_two_decimal_places(value)
  local multiplier = 10^2
  return math.floor(value * multiplier) / multiplier
end

--- Tell whether `value` can be converted by `tonumber`.
local function is_numeric(value)
  return tonumber(value) ~= nil
end

--- Placeholder entry reported for a process that is not running.
local function create_disabled_record()
  return {
    ["cpu"] = { ["used_percentage"] = 0 },
    ["memory"] = { ["used_mb"] = 0, ["total_mb"] = 0, ["used_percentage"] = 0 },
    ["fd"] = 0,
    ["uptime"] = "00:00",
    ["running"] = false
  }
end

--- Merge a parsed cpu record and a parsed memory record into one entry.
-- The `cpu` table itself is extended and returned.
-- @param cpu table produced by the cpu parser, or nil
-- @param memory table with `memory` and `fd` fields, or nil
-- @return the merged table, or nil when either input is missing
local function create_record(cpu, memory)
  if not cpu or not memory then
    return nil
  end
  local merged = cpu
  merged["memory"] = memory["memory"]
  merged["fd"] = memory["fd"]
  merged["running"] = true
  return merged
end

--- Read a whole file.
-- @param file_path path to open for reading
-- @return the file content, or nil when the file cannot be opened
function read_file(file_path)
  local file = io.open(file_path, "r")
  if not file then
    return nil
  end
  local content = file:read("*a")
  file:close()
  return content
end

--- Read the current memory usage and limit from the cgroup files.
-- @param max_memory fallback total used when the cgroup limit is not numeric
--   (converted with kb_to_mb); may be nil
-- @return usage in MB and limit in MB, each rounded to one decimal place;
--   0 is returned for a value that cannot be determined
function get_cgroup_memory_info(max_memory)
  local usage = 0
  local limit = 0

  local usage_raw = read_file("/sys/fs/cgroup/memory.current")
  if usage_raw then
    -- Unparseable content is treated as 0 rather than raising on nil arithmetic.
    usage = (tonumber(usage_raw) or 0) * bytes_to_mb
  end

  local limit_raw = read_file("/sys/fs/cgroup/memory.max")
  if limit_raw and is_numeric(limit_raw) then
    limit = tonumber(limit_raw) * bytes_to_mb
  elseif max_memory then
    -- Reached when the limit file is missing or non-numeric (cgroup v2 writes
    -- the literal "max" when no limit is configured).
    limit = (tonumber(max_memory) or 0) * kb_to_mb
  end

  local usage_mb = math.floor(usage * 10 + 0.5) / 10
  local limit_mb = math.floor(limit * 10 + 0.5) / 10
  return usage_mb, limit_mb
end

--- Build the system memory section of the combined record.
-- @param record table whose "Mem.total" field is used as the fallback limit;
--   may be nil
-- @return table with `used_mb`, `total_mb` and `used_percentage`
function add_memory_info(record)
  local usage_mb, limit_mb = get_cgroup_memory_info(record and record["Mem.total"])

  -- Without a known limit the ratio would be inf/nan; report 0 instead.
  local used_percentage = 0
  if limit_mb > 0 then
    used_percentage = (usage_mb / limit_mb) * 100
  end

  return {
    ["used_mb"] = truncate_to_two_decimal_places(usage_mb),
    ["total_mb"] = truncate_to_two_decimal_places(limit_mb),
    ["used_percentage"] = truncate_to_two_decimal_places(used_percentage)
  }
end

--- Build the system cpu section of the combined record.
-- @param record table whose "cpu_p" field holds the cpu percentage; may be nil
-- @return table with `used_percentage` (0 when the value is missing or not
--   numeric)
function add_cpu_info(record)
  local cpu_percentage = record and tonumber(record["cpu_p"])
  return {
    ["used_percentage"] = truncate_to_two_decimal_places(cpu_percentage or 0)
  }
end
--- Parse disk totals out of a record's "exec" output.
-- The output must contain two integer amounts, each followed by a letter
-- suffix and separated by whitespace (e.g. "100G 42G").
-- @param record table with an "exec" string field; may be nil
-- @return table with `total_gb` and `used_gb`, or nil when nothing parses
function add_disk_info(record)
  local exec_output = record and record["exec"]
  if not exec_output then
    return nil
  end

  local total, used = string.match(exec_output, "(%d+)%a+%s+(%d+)%a")
  if not total or not used then
    return nil
  end

  return {
    ["total_gb"] = tonumber(total),
    ["used_gb"] = tonumber(used)
  }
end

--- Convert a process memory record into the reported shape.
-- @param record table with "mem.VmRSS" (scaled by bytes_to_mb) and "fd"
--   fields; may be nil
-- @return table with `memory.used_mb` and `fd`, or nil when either field is
--   missing
function convert_process_memory_to_mb(record)
  if not record then
    return nil
  end

  local rss, fd = record["mem.VmRSS"], record["fd"]
  if not rss or not fd then
    return nil
  end

  -- A non-numeric value falls back to 0 instead of raising on arithmetic.
  local used_mb = truncate_to_two_decimal_places((tonumber(rss) or 0) * bytes_to_mb)
  return {
    ["memory"] = { ["used_mb"] = used_mb },
    ["fd"] = tonumber(fd) or 0
  }
end

--- Parse cpu usage and uptime out of a record's "exec" output.
-- Expects a number followed by whitespace and a non-blank token
-- (e.g. "1.5 01:23").
-- @param record table with an "exec" string field; may be nil
-- @return table with `cpu.used_percentage` and `uptime`, or nil when the
--   output is missing or the first token is not numeric
function get_cpu_stats(record)
  local exec_output = record and record["exec"]
  if not exec_output then
    return nil
  end

  local cpu, uptime = string.match(exec_output, "([%d%.]+)%s+(%S+)")
  local cpu_value = cpu and tonumber(cpu)
  if not cpu_value or not uptime then
    return nil
  end

  return {
    ["cpu"] = { ["used_percentage"] = cpu_value },
    ["uptime"] = uptime
  }
end

-- Removes and returns the oldest buffered record for `tag`; nil when the
-- buffer does not exist or is empty.
local function take_oldest(tag)
  local buffer = buffered_records[tag]
  if not buffer then
    return nil
  end
  return table.remove(buffer, 1)
end

--- Consume one buffered record per required tag and assemble the combined
-- record: a `system` section plus one entry per process buffer key.
-- Processes that are not running get a disabled placeholder entry.
-- @param running_processes map of process name -> running flag
-- @return the combined record table
function generate_combined_record(running_processes)
  local combined_record = {
    ["system"] = {
      ["memory"] = add_memory_info(take_oldest("system_memory")),
      ["cpu"] = add_cpu_info(take_oldest("system_cpu")),
      ["disk"] = add_disk_info(take_oldest("system_disk"))
    }
  }

  for process_name, process_running in pairs(running_processes) do
    local key = process_to_buffer_key[process_name]
    if process_running then
      local mem_record = convert_process_memory_to_mb(take_oldest(key .. "_memory"))
      local cpu_record = get_cpu_stats(take_oldest(key .. "_cpu"))
      -- create_record yields nil when either part failed to parse, in which
      -- case the key is simply absent from the combined record.
      combined_record[key] = create_record(cpu_record, mem_record)
    else
      combined_record[key] = create_disabled_record()
    end
  end

  return combined_record
end

-- Extracts the integer sample value from one metrics line of the form
-- `<metric_name>{...} <digits>`; nil when the line is for another metric.
local function metric_value(line, metric_name)
  if not line:find(metric_name .. "{", 1, true) then
    return nil
  end
  return tonumber(line:match(metric_name .. "{.*} (%d+)"))
end

--- Fetch the local metrics endpoint with wget and sum two counters.
-- The port comes from METRICS_LISTEN_PORT (default "3000").
-- @return table with `api_call_total` (sum of api_call_count_total samples)
--   and `api_call_engine` (sum of flow_invocations_total samples); both 0
--   when the endpoint cannot be read
local function process_metrics()
  local record = { ["api_call_total"] = 0, ["api_call_engine"] = 0 }

  local metrics_port = os.getenv("METRICS_LISTEN_PORT") or "3000"
  -- The port is interpolated into a shell command line, so anything that is
  -- not purely digits is rejected instead of being handed to the shell.
  if not metrics_port:match("^%d+$") then
    return record
  end

  local url = "'http://127.0.0.1:" .. metrics_port .. "/metrics'"
  local handle = io.popen("wget -qO- " .. url)
  if not handle then
    return record
  end

  local wget_output = handle:read("*a")
  -- Close before inspecting the output so the handle is never leaked.
  handle:close()
  if not wget_output then
    return record
  end

  local api_call_count_total = 0
  local flow_invocations_total = 0
  for line in wget_output:gmatch("[^\n]+") do
    local api_calls = metric_value(line, "api_call_count_total")
    if api_calls then
      api_call_count_total = api_call_count_total + api_calls
    elseif not line:find("api_call_count_total{", 1, true) then
      local invocations = metric_value(line, "flow_invocations_total")
      if invocations then
        flow_invocations_total = flow_invocations_total + invocations
      end
    end
  end

  record["api_call_total"] = api_call_count_total
  record["api_call_engine"] = flow_invocations_total
  return record
end

--- Filter entry point: buffer the incoming record under its tag and, once
-- every required buffer holds a record, emit one combined record.
-- NOTE(review): the return codes look like the Fluent Bit Lua filter
-- convention (-1 = drop the record, 2 = record replaced, timestamp kept) —
-- confirm against the filter documentation.
-- @param tag tag of the incoming record, used as the buffer name
-- @param timestamp passed through unchanged
-- @param record the incoming record
-- @return code, timestamp, and either a one-element list holding the combined
--   record or nil
function buffer_and_dispatch(tag, timestamp, record)
  if not buffered_records[tag] then
    buffered_records[tag] = {}
  end
  table.insert(buffered_records[tag], record)

  -- The combined record consumes a system_cpu sample too, so wait for one.
  local system_cpu = buffered_records["system_cpu"]
  if not system_cpu or #system_cpu == 0 then
    return -1, timestamp, nil
  end

  local running_processes = get_running_processes()
  if check_all_values_exists(running_processes)
    and check_all_values_not_empty(running_processes) then
    local combined_record = generate_combined_record(running_processes)
    combined_record.api_call_metrics = process_metrics()
    return 2, timestamp, { combined_record }
  end

  return -1, timestamp, nil
end

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.