package scanner
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"buf.build/gen/go/safedep/api/grpc/go/safedep/services/malysis/v1/malysisv1grpc"
malysisv1pb "buf.build/gen/go/safedep/api/protocolbuffers/go/safedep/messages/malysis/v1"
packagev1 "buf.build/gen/go/safedep/api/protocolbuffers/go/safedep/messages/package/v1"
malysisv1 "buf.build/gen/go/safedep/api/protocolbuffers/go/safedep/services/malysis/v1"
"github.com/safedep/dry/adapters"
"github.com/safedep/vet/pkg/common/logger"
"github.com/safedep/vet/pkg/common/utils"
"github.com/safedep/vet/pkg/models"
"google.golang.org/grpc"
)
// errMalysisPollRetryInFuture is the error a query worker attaches to a
// result when it dequeues a request whose nextRetryAt is still in the
// future. In that case no call to the Malysis service was made.
var errMalysisPollRetryInFuture = errors.New("retry in future")
// MalysisMalwareEnricherConfig holds the tunables of the Malysis based
// malware enricher.
type MalysisMalwareEnricherConfig struct {
// Timeout for the enricher starting from the time of initialization.
// It is the deadline of the context created by the constructor, which the
// query/result workers and Wait observe. It is intended to cover both
// submitting packages and waiting for the analysis to complete.
// NOTE(review): submitPackageForAnalysis derives its context from
// context.Background(), so a single submission is bounded only by
// GrpcOperationTimeout, not by this value.
Timeout time.Duration
// Timeout for the gRPC operation to the Malysis service. This timeout
// is for a single gRPC operation. During submission the same per-call
// context is also passed to the GitHub commit hash resolution.
GrpcOperationTimeout time.Duration
// Number of worker goroutines started to poll the analysis results.
QueryWorkerCount int
// Maximum number of retries for querying the analysis results for a given
// analysis identifier. The result worker gives up on a request once its
// retry counter has reached this value.
MaxQueryRetries int
}
// DefaultMalysisMalwareEnricherConfig returns the stock configuration:
// a 5 minute overall timeout, 10 second gRPC calls, 10 polling workers
// and at most 10 query retries per analysis identifier.
func DefaultMalysisMalwareEnricherConfig() MalysisMalwareEnricherConfig {
	var cfg MalysisMalwareEnricherConfig

	cfg.Timeout = 5 * time.Minute
	cfg.GrpcOperationTimeout = 10 * time.Second
	cfg.QueryWorkerCount = 10
	cfg.MaxQueryRetries = 10

	return cfg
}
// malysisMalwareEnricher submits packages to the Malysis service for malware
// analysis and applies the polled analysis reports back onto the packages.
// Submission happens in Enrich; polling and result application happen in
// background goroutines started by the constructor.
type malysisMalwareEnricher struct {
// gRPC connection supplied by the caller; client is built on top of it.
cc *grpc.ClientConn
// Malysis service client used to submit packages and fetch reports.
client malysisv1grpc.MalwareAnalysisServiceClient
// Timeouts, worker count and retry limit.
config MalysisMalwareEnricherConfig
// Used to resolve the commit hash of GitHub Actions packages before
// they are submitted.
githubClient *adapters.GithubClient
// Cache of analysis identifiers to poll and apply the results
// to all packages that were submitted for analysis. Malysis internally
// maintains a cache for analysis result for a given package. It will
// not re-analyse the package if it has already been analysed or submitted
// for analysis. The map is keyed by analysis identifier.
queryCache map[string][]*models.Package
// Guards queryCache.
qcLock sync.Mutex
// Channel to submit analysis identifiers to the query worker
queryChannel chan *analysisQueryRequest
// Channel to push results by the query worker
resultsChannel chan *analysisQueryResult
// Wait group to synchronize between submissions and completions.
// Incremented once per successfully submitted package in Enrich and
// decremented by the result worker when a request is finished.
wg sync.WaitGroup
// Waiting context. Created by the constructor with config.Timeout as its
// deadline; observed by Wait and by delayed re-enqueue goroutines.
ctx context.Context
}
// Compile-time check that *malysisMalwareEnricher satisfies PackageMetaEnricher.
var _ PackageMetaEnricher = (*malysisMalwareEnricher)(nil)
// analysisSubmissionResult is what submitPackageForAnalysis returns on
// success.
type analysisSubmissionResult struct {
// Analysis identifier returned by the AnalyzePackage call; used afterwards
// to poll for the report.
analysisId string
}
// analysisQueryRequest is the unit of work handed to the query workers: one
// poll of the report for a given analysis identifier.
type analysisQueryRequest struct {
// Identifier of the analysis whose report is polled.
analysisId string
// Incremented each time the request is enqueued for immediate delivery;
// compared against MaxQueryRetries by the result worker.
retryCount int
// Earliest time at which a query worker will actually call the service
// for this request. The zero value means "not scheduled yet".
nextRetryAt time.Time
}
// analysisQueryResult is the message a query worker pushes to the result
// worker after handling an analysisQueryRequest.
type analysisQueryResult struct {
// The request this result belongs to.
req *analysisQueryRequest
// Response of GetAnalysisReport; left unset when the worker did not call
// the service because the request was still in its backoff period.
response *malysisv1.GetAnalysisReportResponse
// Error of the GetAnalysisReport call, or errMalysisPollRetryInFuture when
// the request was still in its backoff period.
err error
}
// NewMalysisMalwareEnricher builds a Malysis backed malware enricher and
// starts its background result and query workers.
//
// cc is the gRPC connection to the Malysis service and gha the GitHub client
// used to resolve GitHub Actions versions to commit hashes; both are
// required and an error is returned when either is nil. config.Timeout
// becomes the deadline of the context shared by the workers and Wait.
func NewMalysisMalwareEnricher(cc *grpc.ClientConn,
	gha *adapters.GithubClient,
	config MalysisMalwareEnricherConfig,
) (*malysisMalwareEnricher, error) {
	if cc == nil {
		return nil, errors.New("grpc client connection is required")
	}

	if gha == nil {
		return nil, errors.New("github client is required")
	}

	ctx, cancelFn := context.WithTimeout(context.Background(), config.Timeout)

	enricher := &malysisMalwareEnricher{
		cc:             cc,
		githubClient:   gha,
		client:         malysisv1grpc.NewMalwareAnalysisServiceClient(cc),
		config:         config,
		queryCache:     make(map[string][]*models.Package),
		queryChannel:   make(chan *analysisQueryRequest, 10000),
		resultsChannel: make(chan *analysisQueryResult, 10000),
		ctx:            ctx,
	}

	if err := enricher.startResultWorker(ctx); err != nil {
		cancelFn()
		return nil, fmt.Errorf("failed to start result worker: %w", err)
	}

	if err := enricher.startQueryWorker(ctx); err != nil {
		cancelFn()
		return nil, fmt.Errorf("failed to start query worker: %w", err)
	}

	// Release the context shortly after its deadline. time.AfterFunc runs the
	// callback in its own goroutine, so no extra goroutine is needed here.
	// The timer is armed only on the success path because the error paths
	// above have already cancelled the context.
	_ = time.AfterFunc(config.Timeout+(500*time.Millisecond), cancelFn)

	return enricher, nil
}
// Name returns the human readable name of this enricher.
func (e *malysisMalwareEnricher) Name() string {
	const enricherName = "Malysis Malware Enricher"

	return enricherName
}
// Enrich submits pkg for malware analysis and registers it so that the
// background workers poll the report and apply it to the package later.
// The dependency callback is not used.
//
// We will submit all packages for analysis because our backend has caching.
// This will ensure that eventually the packages will get analysed and
// subsequent scans will have better coverage in terms of malware analysis.
//
// An error is returned only when the submission itself fails; the outcome of
// the analysis is delivered asynchronously, see Wait.
func (e *malysisMalwareEnricher) Enrich(pkg *models.Package,
	_ PackageDependencyCallbackFn,
) error {
	// Submit for analysis
	res, err := e.submitPackageForAnalysis(pkg)
	if err != nil {
		return fmt.Errorf("failed to submit package for analysis: %w", err)
	}

	// One pending completion per submitted package; the result worker
	// calls Done when the corresponding query request is finished.
	e.wg.Add(1)

	// Remember the package under its analysis identifier so the result
	// worker can apply the report to every package sharing that identifier.
	// Appending to a missing key works on the nil slice, so no explicit
	// initialisation is needed.
	e.qcLock.Lock()
	e.queryCache[res.analysisId] = append(e.queryCache[res.analysisId], pkg)
	e.qcLock.Unlock()

	// Enqueue only after the lock is released: the send may block when the
	// queue is full, and the result worker needs the same lock to make
	// progress.
	e.enqueueAnalysisForQuery(&analysisQueryRequest{analysisId: res.analysisId})

	return nil
}
// Wait blocks until every pending enrichment has been completed, in which
// case it returns nil, or until the enricher context is done, in which case
// it returns the context's error.
func (e *malysisMalwareEnricher) Wait() error {
	drained := make(chan struct{})

	// Bridge the wait group to a channel so it can take part in the select.
	go func() {
		defer close(drained)
		e.wg.Wait()
	}()

	select {
	case <-drained:
		return nil
	case <-e.ctx.Done():
		return e.ctx.Err()
	}
}
// enqueueAnalysisForQuery hands req to the query workers.
//
// If req is still inside its backoff period (now is before nextRetryAt) the
// hand-over is deferred to a goroutine that waits until shortly after
// nextRetryAt. Otherwise the retry counter is incremented, the next backoff
// deadline is set to 5 seconds times the new retry count from now, and the
// request is sent immediately.
//
// Every send gives up once the enricher context is done, so neither the
// caller nor a delayed goroutine can stay blocked on a full queue after the
// workers have exited.
func (e *malysisMalwareEnricher) enqueueAnalysisForQuery(req *analysisQueryRequest) {
	// send delivers req unless the enricher has been shut down first.
	send := func() {
		select {
		case <-e.ctx.Done():
		case e.queryChannel <- req:
		}
	}

	// If we are in the backoff period, we will schedule an enqueue
	// operation in the future else we will submit the request immediately.
	if time.Now().Before(req.nextRetryAt) {
		// We are creating unbounded go routines here. This is acceptable
		// because the number of retries is limited and each go routine exits
		// after delivering its request or when the context is done.
		go func(ctx context.Context) {
			// Fire slightly after nextRetryAt so the query worker does not
			// see the request as still being in its backoff period.
			timer := time.NewTimer(time.Until(req.nextRetryAt.Add(100 * time.Millisecond)))
			defer timer.Stop()

			select {
			case <-ctx.Done():
				return
			case <-timer.C:
			}

			send()
		}(e.ctx)

		return
	}

	req.retryCount++
	req.nextRetryAt = time.Now().Add(5 * time.Second * time.Duration(req.retryCount))
	send()
}
// submitPackageForAnalysis sends pkg to the Malysis service and returns the
// analysis identifier assigned to it.
//
// For GitHub Actions packages the version is first resolved to a commit hash
// and that hash is submitted instead of the declared version. The resolution
// and the submission share one context limited by GrpcOperationTimeout.
func (e *malysisMalwareEnricher) submitPackageForAnalysis(pkg *models.Package) (*analysisSubmissionResult, error) {
	logger.Infof("[Malware Analysis] Submitting package for malware analysis: %s/%s/%s",
		pkg.Manifest.Ecosystem, pkg.PackageDetails.Name, pkg.PackageDetails.Version)

	target := &packagev1.PackageVersion{
		Package: &packagev1.Package{
			Ecosystem: pkg.GetControlTowerSpecEcosystem(),
			Name:      pkg.GetName(),
		},
		Version: pkg.GetVersion(),
	}

	opCtx, release := context.WithTimeout(context.Background(), e.config.GrpcOperationTimeout)
	defer release()

	// GitHub Actions tags and branches are mutable, so the declared version
	// cannot be trusted to keep pointing at the same code. Pin the analysis
	// to the commit hash the version currently resolves to.
	if target.GetPackage().GetEcosystem() == packagev1.Ecosystem_ECOSYSTEM_GITHUB_ACTIONS {
		logger.Debugf("[Malware Analysis] Resolving commit hash for GitHub Actions package: %s/%s",
			pkg.GetName(), pkg.GetVersion())

		segments := strings.Split(pkg.GetName(), "/")
		if len(segments) != 2 {
			return nil, fmt.Errorf("invalid package name: %s for GitHub Actions - should be in the format <owner>/<repo>", pkg.GetName())
		}
		owner, repo := segments[0], segments[1]

		sha, err := utils.ResolveGitHubRepositoryCommitSHA(opCtx, e.githubClient, owner, repo, pkg.GetVersion())
		if err != nil {
			return nil, fmt.Errorf("failed to resolve commit hash for GitHub Actions package: %w", err)
		}

		logger.Debugf("[Malware Analysis] Resolved commit hash for GitHub Actions package: %s/%s@%s",
			owner, repo, sha)

		// Submit the resolved hash in place of the declared version.
		target.Version = sha
	}

	submission, err := e.client.AnalyzePackage(opCtx, &malysisv1.AnalyzePackageRequest{
		Target: &malysisv1pb.PackageAnalysisTarget{
			PackageVersion: target,
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to submit package for analysis: %w", err)
	}

	return &analysisSubmissionResult{analysisId: submission.AnalysisId}, nil
}
// startResultWorker starts the goroutine that consumes results produced by
// the query workers. It always returns nil.
//
// For every result the worker does one of the following:
//   - analysis reported as failed by the service: log and finish, no retry;
//   - query error or analysis not yet completed: re-enqueue the request
//     unless MaxQueryRetries has been reached, in which case log and finish;
//   - completed analysis: apply the report to every package registered for
//     the analysis identifier and finish.
//
// "Finish" means calling Done on the wait group exactly once for the request.
// The goroutine exits when ctx is done or the results channel is closed.
func (e *malysisMalwareEnricher) startResultWorker(ctx context.Context) error {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-e.resultsChannel:
				if !ok {
					return
				}

				// Extract the response and error from the message
				res, err := msg.response, msg.err

				// Check for incomplete analysis
				if (res != nil) && err == nil {
					// If the analysis explicitly failed at the service provider, we will
					// log the error and will NOT retry the polling.
					if res.Status == malysisv1.AnalysisStatus_ANALYSIS_STATUS_FAILED {
						logger.Errorf("[Malware Analysis] Analysis Id: %s failed with error: %s",
							msg.req.analysisId, res.GetErrorMessage())

						e.wg.Done()
						continue
					}

					// For any other status, we will retry the polling by
					// treating the result as an error.
					if res.Status != malysisv1.AnalysisStatus_ANALYSIS_STATUS_COMPLETED {
						err = fmt.Errorf("analysis is not completed: %s", res.Status)
					}
				}

				if err != nil {
					// A request bounced by a query worker because it is still in
					// its backoff period never reached the service. It must not be
					// judged against the retry limit, otherwise the last permitted
					// query is never issued (and with a limit of 1, none at all).
					bounced := errors.Is(err, errMalysisPollRetryInFuture)

					if !bounced && msg.req.retryCount >= e.config.MaxQueryRetries {
						logger.Errorf("[Malware Analysis] Max retries exceeded for analysis: %s", msg.req.analysisId)

						e.wg.Done()
						continue
					}

					e.enqueueAnalysisForQuery(msg.req)
					continue
				}

				// At this point, we will not retry the query any more
				func() {
					defer e.wg.Done()

					e.qcLock.Lock()
					defer e.qcLock.Unlock()

					if res == nil {
						logger.Errorf("[Malware Analysis] Empty response for analysis: %s", msg.req.analysisId)
						return
					}

					if res.GetReport() == nil {
						logger.Errorf("[Malware Analysis] Empty report for analysis: %s", msg.req.analysisId)
						return
					}

					// Apply the results to all packages that were submitted for analysis
					if packages, ok := e.queryCache[msg.req.analysisId]; ok {
						for _, pkg := range packages {
							logger.Debugf("[Malware Analysis] Applying results to package: %s/%s/%s",
								pkg.Manifest.GetControlTowerSpecEcosystem(), pkg.GetName(), pkg.GetVersion())

							// Here we only enrich the package with the malware analysis result.
							// We do not make a decision based on the result.
							pkg.SetMalwareAnalysisResult(&models.MalwareAnalysisResult{
								AnalysisId:         msg.req.analysisId,
								Report:             res.GetReport(),
								VerificationRecord: res.GetVerificationRecord(),
							})
						}
					}
				}()
			}
		}
	}()

	return nil
}
// startQueryWorker starts QueryWorkerCount goroutines that poll the Malysis
// service for analysis reports. It always returns nil.
//
// Each worker takes requests from the query channel. A request that is still
// in its backoff period is not sent to the service; it is reported back with
// errMalysisPollRetryInFuture. Any other request results in one
// GetAnalysisReport call limited by GrpcOperationTimeout, whose response and
// error are pushed to the results channel. Workers exit when ctx is done or
// the query channel is closed.
func (e *malysisMalwareEnricher) startQueryWorker(ctx context.Context) error {
	// fetchReport performs a single poll. It is a separate function so that
	// the per-call timeout context is released as soon as the call returns;
	// a defer placed directly in the worker loop would only run when the
	// worker exits and would pile up one pending cancel per poll.
	fetchReport := func(req *analysisQueryRequest) (*malysisv1.GetAnalysisReportResponse, error) {
		opCtx, cancelFn := context.WithTimeout(ctx, e.config.GrpcOperationTimeout)
		defer cancelFn()

		return e.client.GetAnalysisReport(opCtx, &malysisv1.GetAnalysisReportRequest{
			AnalysisId: req.analysisId,
		})
	}

	for i := 0; i < e.config.QueryWorkerCount; i++ {
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case req, ok := <-e.queryChannel:
					if !ok {
						return
					}

					// If we are in the backoff period, we will not query the service
					if time.Now().Before(req.nextRetryAt) {
						logger.Debugf("[Malware Analysis] Retrying query for analysis report: %s in %s",
							req.analysisId, time.Until(req.nextRetryAt))

						e.resultsChannel <- &analysisQueryResult{
							req: req,
							err: errMalysisPollRetryInFuture,
						}

						continue
					}

					res, err := fetchReport(req)

					e.resultsChannel <- &analysisQueryResult{
						req:      req,
						response: res,
						err:      err,
					}
				}
			}
		}()
	}

	return nil
}