Skip to main content
Glama
global_log.go9.77 kB
package log import ( "bufio" "context" "fmt" "regexp" "sort" "strings" "time" "github.com/gin-gonic/gin" "github.com/weibaohui/k8m/pkg/comm/utils/amis" "github.com/weibaohui/k8m/pkg/service" "github.com/weibaohui/kom/kom" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // GlobalLogEntry 全局日志条目 type GlobalLogEntry struct { ID uint `json:"id"` ClusterName string `json:"cluster_name"` Namespace string `json:"namespace,omitempty"` NodeName string `json:"node_name,omitempty"` PodName string `json:"pod_name,omitempty"` Container string `json:"container,omitempty"` LogLevel string `json:"log_level"` Source string `json:"source"` Message string `json:"message"` Timestamp time.Time `json:"timestamp"` } // parseLogLevel 从日志消息中解析日志级别 func parseLogLevel(message string) string { patterns := []struct { regex *regexp.Regexp level string }{ {regexp.MustCompile(`(?i)^\s*\[?(FATAL|CRITICAL)\]?\s*[::]?`), "FATAL"}, {regexp.MustCompile(`(?i)^\s*\[?(ERROR|ERR)\]?\s*[::]?`), "ERROR"}, {regexp.MustCompile(`(?i)^\s*\[?(WARNING|WARN)\]?\s*[::]?`), "WARN"}, {regexp.MustCompile(`(?i)^\s*\[?(INFO|INFORMATION)\]?\s*[::]?`), "INFO"}, {regexp.MustCompile(`(?i)^\s*\[?(DEBUG|DBG)\]?\s*[::]?`), "DEBUG"}, {regexp.MustCompile(`(?i)^\s*\[?(TRACE|TRC)\]?\s*[::]?`), "TRACE"}, {regexp.MustCompile(`(?i)\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}.*?\s+(FATAL|CRITICAL)\s*[::]?`), "FATAL"}, {regexp.MustCompile(`(?i)\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}.*?\s+(ERROR|ERR)\s*[::]?`), "ERROR"}, {regexp.MustCompile(`(?i)\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}.*?\s+(WARNING|WARN)\s*[::]?`), "WARN"}, {regexp.MustCompile(`(?i)\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}.*?\s+(INFO|INFORMATION)\s*[::]?`), "INFO"}, {regexp.MustCompile(`(?i)\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}.*?\s+(DEBUG|DBG)\s*[::]?`), "DEBUG"}, {regexp.MustCompile(`(?i)\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}.*?\s+(TRACE|TRC)\s*[::]?`), "TRACE"}, } // 优先检查明确的日志级别标识 for _, pattern := range patterns { if pattern.regex.MatchString(message) { return pattern.level } } // 检查是否包含错误相关的独立词汇 if regexp.MustCompile(`(?i)\b(error|failed|exception|panic|crash)\b`).MatchString(message) { return "ERROR" } if regexp.MustCompile(`(?i)\b(warning|warn)\b`).MatchString(message) { return "WARN" } if regexp.MustCompile(`(?i)\b(debug)\b`).MatchString(message) { return "DEBUG" } // 默认返回 INFO return "INFO" } // ListGlobalLog 全局日志列表 func (lc *Controller) ListGlobalLog(c *gin.Context) { cluster := c.Query("cluster") namespace := c.Query("namespace") nodeName := c.Query("node_name") podName := c.Query("pod_name") container := c.Query("container") keyword := c.Query("keyword") logLevel := c.Query("log_level") source := c.Query("source") startTime := c.Query("start_time") endTime := c.Query("end_time") // 集群是必需的 if cluster == "" { amis.WriteJsonError(c, fmt.Errorf("cluster parameter is required")) return } // 手动设置集群到上下文中,因为 /mgm/ 路径不会经过集群中间件 c.Set("cluster", cluster) // 检查集群是否连接 if !service.ClusterService().IsConnected(cluster) { amis.WriteJsonError(c, fmt.Errorf("cluster %s is not connected", cluster)) return } ctx := amis.GetContextWithUser(c) // 使用 GetSelectedCluster 来获取集群,保持与其他 API 的一致性 selectedCluster, err := amis.GetSelectedCluster(c) if err != nil { amis.WriteJsonError(c, err) return } var logs []*GlobalLogEntry // 根据查询参数决定查询策略 if podName != "" { // 查询特定 Pod 的日志 logs, err = lc.queryPodLogs(ctx, selectedCluster, namespace, nodeName, podName, container, keyword, logLevel, source, startTime, endTime) } else if namespace != "" || nodeName != "" { // 查询特定命名空间或节点下的 Pod 日志 logs, err = lc.queryFilteredPodsLogs(ctx, selectedCluster, namespace, nodeName, container, keyword, logLevel, source, startTime, endTime) } else { // 查询整个集群的 Pod 日志(限制数量) logs, err = lc.queryClusterLogs(ctx, selectedCluster, container, keyword, logLevel, source, startTime, endTime) } if err != nil { amis.WriteJsonError(c, err) return } // 按时间倒序排序 sort.Slice(logs, func(i, j int) bool { return logs[i].Timestamp.After(logs[j].Timestamp) }) amis.WriteJsonListWithTotal(c, int64(len(logs)), logs) } // queryPodLogs 查询特定 Pod 的日志 func (lc *Controller) queryPodLogs(ctx context.Context, cluster, namespace, nodeName, podName, container, keyword, logLevel, source, startTime, endTime string) ([]*GlobalLogEntry, error) { var pods []v1.Pod q := kom.Cluster(cluster).WithContext(ctx).Resource(&v1.Pod{}) if namespace != "" { q = q.Namespace(namespace) } listOpt := metav1.ListOptions{FieldSelector: fmt.Sprintf("metadata.name=%s", podName)} if nodeName != "" { listOpt.FieldSelector += fmt.Sprintf(",spec.nodeName=%s", nodeName) } if err := q.List(&pods, listOpt).Error; err != nil { return nil, fmt.Errorf("failed to list pods: %v", err) } if len(pods) == 0 { return []*GlobalLogEntry{}, nil } var allLogs []*GlobalLogEntry tail := int64(200) // 单个 Pod 获取更多日志 for _, pod := range pods { logs, err := lc.fetchPodLogs(ctx, cluster, pod, container, tail) if err != nil { continue // 跳过错误的 Pod,继续处理其他 } allLogs = append(allLogs, lc.filterLogs(logs, keyword, logLevel, source, startTime, endTime)...) } return allLogs, nil } // queryFilteredPodsLogs 查询特定命名空间或节点下的 Pod 日志 func (lc *Controller) queryFilteredPodsLogs(ctx context.Context, cluster, namespace, nodeName, container, keyword, logLevel, source, startTime, endTime string) ([]*GlobalLogEntry, error) { var pods []v1.Pod q := kom.Cluster(cluster).WithContext(ctx).Resource(&v1.Pod{}) if namespace != "" { q = q.Namespace(namespace) } listOpt := metav1.ListOptions{} if nodeName != "" { listOpt.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeName) } if err := q.List(&pods, listOpt).Error; err != nil { return nil, fmt.Errorf("failed to list pods: %v", err) } if len(pods) == 0 { return []*GlobalLogEntry{}, nil } // 限制 Pod 数量以避免过多请求 if len(pods) > 10 { pods = pods[:10] } var allLogs []*GlobalLogEntry tail := int64(100) for _, pod := range pods { logs, err := lc.fetchPodLogs(ctx, cluster, pod, container, tail) if err != nil { continue } allLogs = append(allLogs, lc.filterLogs(logs, keyword, logLevel, source, startTime, endTime)...) } return allLogs, nil } // queryClusterLogs 查询整个集群的 Pod 日志 func (lc *Controller) queryClusterLogs(ctx context.Context, cluster, container, keyword, logLevel, source, startTime, endTime string) ([]*GlobalLogEntry, error) { var pods []v1.Pod if err := kom.Cluster(cluster).WithContext(ctx).Resource(&v1.Pod{}).List(&pods, metav1.ListOptions{}).Error; err != nil { return nil, fmt.Errorf("failed to list pods: %v", err) } if len(pods) == 0 { return []*GlobalLogEntry{}, nil } // 严格限制全集群查询的 Pod 数量 if len(pods) > 20 { pods = pods[:20] } var allLogs []*GlobalLogEntry tail := int64(50) for _, pod := range pods { logs, err := lc.fetchPodLogs(ctx, cluster, pod, container, tail) if err != nil { continue } allLogs = append(allLogs, lc.filterLogs(logs, keyword, logLevel, source, startTime, endTime)...) } return allLogs, nil } // fetchPodLogs 从单个 Pod 获取日志 func (lc *Controller) fetchPodLogs(ctx context.Context, cluster string, pod v1.Pod, container string, tail int64) ([]*GlobalLogEntry, error) { opt := &v1.PodLogOptions{TailLines: &tail} if container != "" { opt.Container = container } stream, err := service.PodService().StreamPodLogs(ctx, cluster, pod.Namespace, pod.Name, opt) if err != nil || stream == nil { return nil, err } defer stream.Close() var logs []*GlobalLogEntry scanner := bufio.NewScanner(stream) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 10*1024*1024) id := uint(1) for scanner.Scan() { line := scanner.Text() if line == "" { continue } logs = append(logs, &GlobalLogEntry{ ID: id, ClusterName: cluster, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName, PodName: pod.Name, Container: container, LogLevel: parseLogLevel(line), Source: "pod", Message: line, Timestamp: time.Now(), }) id++ } return logs, scanner.Err() } // filterLogs 根据条件过滤日志 func (lc *Controller) filterLogs(logs []*GlobalLogEntry, keyword, logLevel, source, startTime, endTime string) []*GlobalLogEntry { var filtered []*GlobalLogEntry var startT, endT time.Time var err error if startTime != "" { startT, err = time.Parse("2006-01-02 15:04:05", startTime) if err != nil { startT = time.Time{} } } if endTime != "" { endT, err = time.Parse("2006-01-02 15:04:05", endTime) if err != nil { endT = time.Time{} } } for _, log := range logs { // 关键字过滤 if keyword != "" && !strings.Contains(log.Message, keyword) { continue } // 日志级别过滤 if logLevel != "" && log.LogLevel != logLevel { continue } // 来源过滤 if source != "" && log.Source != source { continue } // 时间范围过滤 if !startT.IsZero() && log.Timestamp.Before(startT) { continue } if !endT.IsZero() && log.Timestamp.After(endT) { continue } filtered = append(filtered, log) } return filtered }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/weibaohui/k8m'

If you have feedback or need assistance with the MCP directory API, please join our Discord server