Skip to main content
Glama
eventstream.go (17.5 kB)
// Code generated by smithy-go-codegen DO NOT EDIT.

package cloudwatchlogs

import (
	"context"
	"fmt"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream"
	"github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream/eventstreamapi"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
	smithy "github.com/aws/smithy-go"
	"github.com/aws/smithy-go/middleware"
	smithysync "github.com/aws/smithy-go/sync"
	smithyhttp "github.com/aws/smithy-go/transport/http"
	"io"
	"io/ioutil"
	"sync"
)

// GetLogObjectResponseStreamReader provides the interface for reading events from
// a stream.
//
// Events returns the channel on which decoded events are delivered, Close
// stops the reader, and Err reports the error recorded by the reader, if any.
//
// An implementation's Close method must allow multiple concurrent calls.
type GetLogObjectResponseStreamReader interface {
	Events() <-chan types.GetLogObjectResponseStream
	Close() error
	Err() error
}

// StartLiveTailResponseStreamReader provides the interface for reading events
// from a stream.
//
// Events returns the channel on which decoded events are delivered, Close
// stops the reader, and Err reports the error recorded by the reader, if any.
//
// An implementation's Close method must allow multiple concurrent calls.
type StartLiveTailResponseStreamReader interface {
	Events() <-chan types.StartLiveTailResponseStream
	Close() error
	Err() error
}

// startLiveTailResponseStreamReadEvent is a closed (sealed) interface for the
// two wrapper kinds produced by deserializeEventMessage: a regular stream
// event and the initial response.
type startLiveTailResponseStreamReadEvent interface {
	isStartLiveTailResponseStreamReadEvent()
}

// startLiveTailResponseStreamReadEventMessage wraps a regular stream event.
type startLiveTailResponseStreamReadEventMessage struct {
	Value types.StartLiveTailResponseStream
}

func (*startLiveTailResponseStreamReadEventMessage) isStartLiveTailResponseStreamReadEvent() {}

// startLiveTailResponseStreamReadEventInitialResponse wraps the value produced
// by the reader's initialResponseDeserializer for an "initial-response" event.
type startLiveTailResponseStreamReadEventInitialResponse struct {
	Value interface{}
}

func (*startLiveTailResponseStreamReadEventInitialResponse) isStartLiveTailResponseStreamReadEvent() {
}

// startLiveTailResponseStreamReader decodes event stream messages from
// eventStream on a background goroutine and publishes them on stream.
type startLiveTailResponseStreamReader struct {
	// stream is the unbuffered channel events are delivered on.
	stream chan types.StartLiveTailResponseStream
	// decoder decodes raw messages read from eventStream.
	decoder *eventstream.Decoder
	// eventStream is the underlying body; it is closed by safeClose.
	eventStream io.ReadCloser
	// err holds the error recorded by the read loop.
	// presumably only the first error set is retained — confirm against smithysync.OnceErr.
	err *smithysync.OnceErr
	// payloadBuf is the scratch buffer handed to the decoder; it is truncated
	// to zero length before every Decode call.
	payloadBuf []byte
	// done is closed exactly once (guarded by closeOnce) to signal shutdown.
	done      chan struct{}
	closeOnce sync.Once
	// initialResponseDeserializer converts an "initial-response" message into
	// the operation output value.
	initialResponseDeserializer func(*eventstream.Message) (interface{}, error)
	// initialResponse carries the deserialized initial response; it is
	// buffered with capacity 1.
	initialResponse chan interface{}
}

// newStartLiveTailResponseStreamReader builds a reader over readCloser using
// decoder and ird (the initial-response deserializer), and immediately starts
// the readEventStream goroutine before returning the reader.
func newStartLiveTailResponseStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder, ird func(*eventstream.Message) (interface{}, error)) *startLiveTailResponseStreamReader {
	w := &startLiveTailResponseStreamReader{
		stream:                      make(chan types.StartLiveTailResponseStream),
		decoder:                     decoder,
		eventStream:                 readCloser,
		err:                         smithysync.NewOnceErr(),
		done:                        make(chan struct{}),
		payloadBuf:                  make([]byte, 10*1024),
		initialResponseDeserializer: ird,
		initialResponse:             make(chan interface{}, 1),
	}

	go w.readEventStream()

	return w
}

// Events returns the receive side of the channel that stream events are
// delivered on. The channel is closed when the read loop exits.
func (r *startLiveTailResponseStreamReader) Events() <-chan types.StartLiveTailResponseStream {
	return r.stream
}

// readEventStream is the read loop run on the reader's goroutine. It decodes
// and deserializes messages until EOF, an error, or shutdown via done.
//
// Deferred calls run in reverse order on exit: initialResponse is closed
// first, then stream, and finally Close is called on the reader.
func (r *startLiveTailResponseStreamReader) readEventStream() {
	defer r.Close()
	defer close(r.stream)
	defer close(r.initialResponse)

	for {
		// Reuse the backing array of the scratch buffer for each message.
		r.payloadBuf = r.payloadBuf[0:0]
		decodedMessage, err := r.decoder.Decode(r.eventStream, r.payloadBuf)
		if err != nil {
			// A clean end of stream is not recorded as an error.
			if err == io.EOF {
				return
			}
			// A decode failure observed after done is closed is not recorded;
			// otherwise the error is stored for Err().
			select {
			case <-r.done:
				return
			default:
				r.err.SetError(err)
				return
			}
		}

		event, err := r.deserializeEventMessage(&decodedMessage)
		if err != nil {
			r.err.SetError(err)
			return
		}

		switch ev := event.(type) {
		case *startLiveTailResponseStreamReadEventInitialResponse:
			// Non-blocking send: if the single buffer slot is already
			// occupied, the value is dropped and the loop continues.
			select {
			case r.initialResponse <- ev.Value:
			case <-r.done:
				return
			default:
			}
		case *startLiveTailResponseStreamReadEventMessage:
			// Blocks until a consumer receives the event or the reader is closed.
			select {
			case r.stream <- ev.Value:
			case <-r.done:
				return
			}
		default:
			r.err.SetError(fmt.Errorf("unexpected event wrapper: %T", event))
			return
		}
	}
}

// deserializeEventMessage classifies msg by its message-type header and
// converts it into a read event wrapper or an error.
//
//   - Event messages require an event-type header; "initial-response" is
//     handled by initialResponseDeserializer, any other event type by the
//     generated stream deserializer.
//   - Exception messages are returned as the error produced by the generated
//     exception deserializer.
//   - Error messages become a *smithy.GenericAPIError; the code defaults to
//     "UnknownError" and the message defaults to that same initial value when
//     the respective headers are absent.
//   - Any other message type yields an *UnknownEventMessageError holding a
//     clone of msg.
//
// An error is also returned when the message-type header is missing.
func (r *startLiveTailResponseStreamReader) deserializeEventMessage(msg *eventstream.Message) (startLiveTailResponseStreamReadEvent, error) {
	messageType := msg.Headers.Get(eventstreamapi.MessageTypeHeader)
	if messageType == nil {
		return nil, fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader)
	}

	switch messageType.String() {
	case eventstreamapi.EventMessageType:
		eventType := msg.Headers.Get(eventstreamapi.EventTypeHeader)
		if eventType == nil {
			return nil, fmt.Errorf("%s event header not present", eventstreamapi.EventTypeHeader)
		}

		if eventType.String() == "initial-response" {
			v, err := r.initialResponseDeserializer(msg)
			if err != nil {
				return nil, err
			}
			return &startLiveTailResponseStreamReadEventInitialResponse{Value: v}, nil
		}

		var v types.StartLiveTailResponseStream
		if err := awsAwsjson11_deserializeEventStreamStartLiveTailResponseStream(&v, msg); err != nil {
			return nil, err
		}
		return &startLiveTailResponseStreamReadEventMessage{Value: v}, nil

	case eventstreamapi.ExceptionMessageType:
		return nil, awsAwsjson11_deserializeEventStreamExceptionStartLiveTailResponseStream(msg)

	case eventstreamapi.ErrorMessageType:
		errorCode := "UnknownError"
		// errorMessage starts as the default code, not the header-provided code.
		errorMessage := errorCode
		if header := msg.Headers.Get(eventstreamapi.ErrorCodeHeader); header != nil {
			errorCode = header.String()
		}
		if header := msg.Headers.Get(eventstreamapi.ErrorMessageHeader); header != nil {
			errorMessage = header.String()
		}
		return nil, &smithy.GenericAPIError{
			Code:    errorCode,
			Message: errorMessage,
		}

	default:
		// Clone the message before retaining it in the error value.
		// presumably because msg's payload aliases the reused payloadBuf — confirm.
		mc := msg.Clone()
		return nil, &UnknownEventMessageError{
			Type:    messageType.String(),
			Message: &mc,
		}

	}
}

// ErrorSet returns the channel exposed by the reader's OnceErr.
// presumably it is closed once an error has been recorded — confirm against smithysync.OnceErr.
func (r *startLiveTailResponseStreamReader) ErrorSet() <-chan struct{} {
	return r.err.ErrorSet()
}

// Close shuts the reader down at most once (via closeOnce) and returns the
// reader's recorded error, if any. It may be called repeatedly.
func (r *startLiveTailResponseStreamReader) Close() error {
	r.closeOnce.Do(r.safeClose)
	return r.Err()
}

// safeClose closes the done channel and then the underlying event stream. It
// must only run once, which Close guarantees through closeOnce. The error
// returned by eventStream.Close is not inspected.
func (r *startLiveTailResponseStreamReader) safeClose() {
	close(r.done)
	r.eventStream.Close()

}

// Err returns the error recorded by the read loop, if any.
func (r *startLiveTailResponseStreamReader) Err() error {
	return r.err.Err()
}

// Closed returns the done channel, which is closed when the reader is closed.
func (r *startLiveTailResponseStreamReader) Closed() <-chan struct{} {
	return r.done
}

// getLogObjectResponseStreamReadEvent is a closed (sealed) interface for the
// two wrapper kinds produced by deserializeEventMessage: a regular stream
// event and the initial response.
type getLogObjectResponseStreamReadEvent interface {
	isGetLogObjectResponseStreamReadEvent()
}

// getLogObjectResponseStreamReadEventMessage wraps a regular stream event.
type getLogObjectResponseStreamReadEventMessage struct {
	Value types.GetLogObjectResponseStream
}

func (*getLogObjectResponseStreamReadEventMessage) isGetLogObjectResponseStreamReadEvent() {}

// getLogObjectResponseStreamReadEventInitialResponse wraps the value produced
// by the reader's initialResponseDeserializer for an "initial-response" event.
type getLogObjectResponseStreamReadEventInitialResponse struct {
	Value interface{}
}

func (*getLogObjectResponseStreamReadEventInitialResponse) isGetLogObjectResponseStreamReadEvent() {}

// getLogObjectResponseStreamReader decodes event stream messages from
// eventStream on a background goroutine and publishes them on stream.
type getLogObjectResponseStreamReader struct {
	// stream is the unbuffered channel events are delivered on.
	stream chan types.GetLogObjectResponseStream
	// decoder decodes raw messages read from eventStream.
	decoder *eventstream.Decoder
	// eventStream is the underlying body; it is closed by safeClose.
	eventStream io.ReadCloser
	// err holds the error recorded by the read loop.
	// presumably only the first error set is retained — confirm against smithysync.OnceErr.
	err *smithysync.OnceErr
	// payloadBuf is the scratch buffer handed to the decoder; it is truncated
	// to zero length before every Decode call.
	payloadBuf []byte
	// done is closed exactly once (guarded by closeOnce) to signal shutdown.
	done      chan struct{}
	closeOnce sync.Once
	// initialResponseDeserializer converts an "initial-response" message into
	// the operation output value.
	initialResponseDeserializer func(*eventstream.Message) (interface{}, error)
	// initialResponse carries the deserialized initial response; it is
	// buffered with capacity 1.
	initialResponse chan interface{}
}

// newGetLogObjectResponseStreamReader builds a reader over readCloser using
// decoder and ird (the initial-response deserializer), and immediately starts
// the readEventStream goroutine before returning the reader.
func newGetLogObjectResponseStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder, ird func(*eventstream.Message) (interface{}, error)) *getLogObjectResponseStreamReader {
	w := &getLogObjectResponseStreamReader{
		stream:                      make(chan types.GetLogObjectResponseStream),
		decoder:                     decoder,
		eventStream:                 readCloser,
		err:                         smithysync.NewOnceErr(),
		done:                        make(chan struct{}),
		payloadBuf:                  make([]byte, 10*1024),
		initialResponseDeserializer: ird,
		initialResponse:             make(chan interface{}, 1),
	}

	go w.readEventStream()

	return w
}

// Events returns the receive side of the channel that stream events are
// delivered on. The channel is closed when the read loop exits.
func (r *getLogObjectResponseStreamReader) Events() <-chan types.GetLogObjectResponseStream {
	return r.stream
}

// readEventStream is the read loop run on the reader's goroutine. It decodes
// and deserializes messages until EOF, an error, or shutdown via done.
//
// Deferred calls run in reverse order on exit: initialResponse is closed
// first, then stream, and finally Close is called on the reader.
func (r *getLogObjectResponseStreamReader) readEventStream() {
	defer r.Close()
	defer close(r.stream)
	defer close(r.initialResponse)

	for {
		// Reuse the backing array of the scratch buffer for each message.
		r.payloadBuf = r.payloadBuf[0:0]
		decodedMessage, err := r.decoder.Decode(r.eventStream, r.payloadBuf)
		if err != nil {
			// A clean end of stream is not recorded as an error.
			if err == io.EOF {
				return
			}
			// A decode failure observed after done is closed is not recorded;
			// otherwise the error is stored for Err().
			select {
			case <-r.done:
				return
			default:
				r.err.SetError(err)
				return
			}
		}

		event, err := r.deserializeEventMessage(&decodedMessage)
		if err != nil {
			r.err.SetError(err)
			return
		}

		switch ev := event.(type) {
		case *getLogObjectResponseStreamReadEventInitialResponse:
			// Non-blocking send: if the single buffer slot is already
			// occupied, the value is dropped and the loop continues.
			select {
			case r.initialResponse <- ev.Value:
			case <-r.done:
				return
			default:
			}
		case *getLogObjectResponseStreamReadEventMessage:
			// Blocks until a consumer receives the event or the reader is closed.
			select {
			case r.stream <- ev.Value:
			case <-r.done:
				return
			}
		default:
			r.err.SetError(fmt.Errorf("unexpected event wrapper: %T", event))
			return
		}
	}
}

// deserializeEventMessage classifies msg by its message-type header and
// converts it into a read event wrapper or an error.
//
//   - Event messages require an event-type header; "initial-response" is
//     handled by initialResponseDeserializer, any other event type by the
//     generated stream deserializer.
//   - Exception messages are returned as the error produced by the generated
//     exception deserializer.
//   - Error messages become a *smithy.GenericAPIError; the code defaults to
//     "UnknownError" and the message defaults to that same initial value when
//     the respective headers are absent.
//   - Any other message type yields an *UnknownEventMessageError holding a
//     clone of msg.
//
// An error is also returned when the message-type header is missing.
func (r *getLogObjectResponseStreamReader) deserializeEventMessage(msg *eventstream.Message) (getLogObjectResponseStreamReadEvent, error) {
	messageType := msg.Headers.Get(eventstreamapi.MessageTypeHeader)
	if messageType == nil {
		return nil, fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader)
	}

	switch messageType.String() {
	case eventstreamapi.EventMessageType:
		eventType := msg.Headers.Get(eventstreamapi.EventTypeHeader)
		if eventType == nil {
			return nil, fmt.Errorf("%s event header not present", eventstreamapi.EventTypeHeader)
		}

		if eventType.String() == "initial-response" {
			v, err := r.initialResponseDeserializer(msg)
			if err != nil {
				return nil, err
			}
			return &getLogObjectResponseStreamReadEventInitialResponse{Value: v}, nil
		}

		var v types.GetLogObjectResponseStream
		if err := awsAwsjson11_deserializeEventStreamGetLogObjectResponseStream(&v, msg); err != nil {
			return nil, err
		}
		return &getLogObjectResponseStreamReadEventMessage{Value: v}, nil

	case eventstreamapi.ExceptionMessageType:
		return nil, awsAwsjson11_deserializeEventStreamExceptionGetLogObjectResponseStream(msg)

	case eventstreamapi.ErrorMessageType:
		errorCode := "UnknownError"
		// errorMessage starts as the default code, not the header-provided code.
		errorMessage := errorCode
		if header := msg.Headers.Get(eventstreamapi.ErrorCodeHeader); header != nil {
			errorCode = header.String()
		}
		if header := msg.Headers.Get(eventstreamapi.ErrorMessageHeader); header != nil {
			errorMessage = header.String()
		}
		return nil, &smithy.GenericAPIError{
			Code:    errorCode,
			Message: errorMessage,
		}

	default:
		// Clone the message before retaining it in the error value.
		// presumably because msg's payload aliases the reused payloadBuf — confirm.
		mc := msg.Clone()
		return nil, &UnknownEventMessageError{
			Type:    messageType.String(),
			Message: &mc,
		}

	}
}

// ErrorSet returns the channel exposed by the reader's OnceErr.
// presumably it is closed once an error has been recorded — confirm against smithysync.OnceErr.
func (r *getLogObjectResponseStreamReader) ErrorSet() <-chan struct{} {
	return r.err.ErrorSet()
}

// Close shuts the reader down at most once (via closeOnce) and returns the
// reader's recorded error, if any. It may be called repeatedly.
func (r *getLogObjectResponseStreamReader) Close() error {
	r.closeOnce.Do(r.safeClose)
	return r.Err()
}

// safeClose closes the done channel and then the underlying event stream. It
// must only run once, which Close guarantees through closeOnce. The error
// returned by eventStream.Close is not inspected.
func (r *getLogObjectResponseStreamReader) safeClose() {
	close(r.done)
	r.eventStream.Close()

}

// Err returns the error recorded by the read loop, if any.
func (r *getLogObjectResponseStreamReader) Err() error {
	return r.err.Err()
}

// Closed returns the done channel, which is closed when the reader is closed.
func (r *getLogObjectResponseStreamReader) Closed() <-chan struct{} {
	return r.done
}

// awsAwsjson11_deserializeOpEventStreamGetLogObject is the deserialize
// middleware that attaches an event stream reader to GetLogObjectOutput.
type awsAwsjson11_deserializeOpEventStreamGetLogObject struct {
	// LogEventStreamWrites is set from the client log mode; it is not read by
	// any code visible in this file.
	LogEventStreamWrites bool
	// LogEventStreamReads enables message logging on the stream decoder.
	LogEventStreamReads bool
}

// ID returns the middleware identifier.
func (*awsAwsjson11_deserializeOpEventStreamGetLogObject) ID() string {
	return "OperationEventStreamDeserializer"
}

// HandleDeserialize invokes the next handler, then wraps the HTTP response
// body in an event stream reader, waits for the initial response, copies it
// into the operation output, and attaches the event stream to that output.
//
// On any error return the response body is drained and closed, and, once the
// reader exists, the reader is closed as well (both via deferred functions
// that inspect the named result err).
func (m *awsAwsjson11_deserializeOpEventStreamGetLogObject) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
	out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
) {
	defer func() {
		if err == nil {
			return
		}
		m.closeResponseBody(out)
	}()

	logger := middleware.GetLogger(ctx)

	// The assertion only validates the transport type; the request itself is unused.
	request, ok := in.Request.(*smithyhttp.Request)
	if !ok {
		return out, metadata, fmt.Errorf("unknown transport type: %T", in.Request)
	}
	_ = request

	out, metadata, err = next.HandleDeserialize(ctx, in)
	if err != nil {
		return out, metadata, err
	}

	deserializeOutput, ok := out.RawResponse.(*smithyhttp.Response)
	if !ok {
		return out, metadata, fmt.Errorf("unknown transport type: %T", out.RawResponse)
	}
	_ = deserializeOutput

	// Reuse a result set by an inner handler, or create an empty output if none exists.
	output, ok := out.Result.(*GetLogObjectOutput)
	if out.Result != nil && !ok {
		return out, metadata, fmt.Errorf("unexpected output result type: %T", out.Result)
	} else if out.Result == nil {
		output = &GetLogObjectOutput{}
		out.Result = output
	}

	eventReader := newGetLogObjectResponseStreamReader(
		deserializeOutput.Body,
		eventstream.NewDecoder(func(options *eventstream.DecoderOptions) {
			options.Logger = logger
			options.LogMessages = m.LogEventStreamReads

		}),
		awsAwsjson11_deserializeEventMessageResponseGetLogObjectOutput,
	)
	defer func() {
		if err == nil {
			return
		}
		_ = eventReader.Close()
	}()

	// Blocks until the reader delivers the initial response or closes the
	// channel (in which case ir is nil and the assertion below fails).
	// NOTE(review): in that failure case the returned error reports the type
	// of ir rather than eventReader.Err(), so a stream error seen before the
	// initial response is not surfaced here — confirm this is intended.
	ir := <-eventReader.initialResponse

	irv, ok := ir.(*GetLogObjectOutput)
	if !ok {
		return out, metadata, fmt.Errorf("unexpected output result type: %T", ir)
	}
	// Overwrite the output with the initial response before attaching the stream.
	*output = *irv
	output.eventStream = NewGetLogObjectEventStream(func(stream *GetLogObjectEventStream) {
		stream.Reader = eventReader
	})

	// waitStreamClose is defined elsewhere; it is run on its own goroutine.
	go output.eventStream.waitStreamClose()

	return out, metadata, nil
}

// closeResponseBody drains and closes the HTTP response body carried by out,
// if there is one. Errors from draining and closing are deliberately ignored.
func (*awsAwsjson11_deserializeOpEventStreamGetLogObject) closeResponseBody(out middleware.DeserializeOutput) {
	if resp, ok := out.RawResponse.(*smithyhttp.Response); ok && resp != nil && resp.Body != nil {
		_, _ = io.Copy(ioutil.Discard, resp.Body)
		_ = resp.Body.Close()
	}
}

// addEventStreamGetLogObjectMiddleware inserts the GetLogObject event stream
// deserializer into the stack's Deserialize step, positioned before the
// middleware with ID "OperationDeserializer". The logging flags are derived
// from options.ClientLogMode. It returns the error from Insert, if any.
func addEventStreamGetLogObjectMiddleware(stack *middleware.Stack, options Options) error {
	if err := stack.Deserialize.Insert(&awsAwsjson11_deserializeOpEventStreamGetLogObject{
		LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(),
		LogEventStreamReads:  options.ClientLogMode.IsResponseEventMessage(),
	}, "OperationDeserializer", middleware.Before); err != nil {
		return err
	}
	return nil

}

// awsAwsjson11_deserializeOpEventStreamStartLiveTail is the deserialize
// middleware that attaches an event stream reader to StartLiveTailOutput.
type awsAwsjson11_deserializeOpEventStreamStartLiveTail struct {
	// LogEventStreamWrites is set from the client log mode; it is not read by
	// any code visible in this file.
	LogEventStreamWrites bool
	// LogEventStreamReads enables message logging on the stream decoder.
	LogEventStreamReads bool
}

// ID returns the middleware identifier.
func (*awsAwsjson11_deserializeOpEventStreamStartLiveTail) ID() string {
	return "OperationEventStreamDeserializer"
}

// HandleDeserialize invokes the next handler, then wraps the HTTP response
// body in an event stream reader, waits for the initial response, copies it
// into the operation output, and attaches the event stream to that output.
//
// On any error return the response body is drained and closed, and, once the
// reader exists, the reader is closed as well (both via deferred functions
// that inspect the named result err).
func (m *awsAwsjson11_deserializeOpEventStreamStartLiveTail) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
	out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
) {
	defer func() {
		if err == nil {
			return
		}
		m.closeResponseBody(out)
	}()

	logger := middleware.GetLogger(ctx)

	// The assertion only validates the transport type; the request itself is unused.
	request, ok := in.Request.(*smithyhttp.Request)
	if !ok {
		return out, metadata, fmt.Errorf("unknown transport type: %T", in.Request)
	}
	_ = request

	out, metadata, err = next.HandleDeserialize(ctx, in)
	if err != nil {
		return out, metadata, err
	}

	deserializeOutput, ok := out.RawResponse.(*smithyhttp.Response)
	if !ok {
		return out, metadata, fmt.Errorf("unknown transport type: %T", out.RawResponse)
	}
	_ = deserializeOutput

	// Reuse a result set by an inner handler, or create an empty output if none exists.
	output, ok := out.Result.(*StartLiveTailOutput)
	if out.Result != nil && !ok {
		return out, metadata, fmt.Errorf("unexpected output result type: %T", out.Result)
	} else if out.Result == nil {
		output = &StartLiveTailOutput{}
		out.Result = output
	}

	eventReader := newStartLiveTailResponseStreamReader(
		deserializeOutput.Body,
		eventstream.NewDecoder(func(options *eventstream.DecoderOptions) {
			options.Logger = logger
			options.LogMessages = m.LogEventStreamReads

		}),
		awsAwsjson11_deserializeEventMessageResponseStartLiveTailOutput,
	)
	defer func() {
		if err == nil {
			return
		}
		_ = eventReader.Close()
	}()

	// Blocks until the reader delivers the initial response or closes the
	// channel (in which case ir is nil and the assertion below fails).
	// NOTE(review): in that failure case the returned error reports the type
	// of ir rather than eventReader.Err(), so a stream error seen before the
	// initial response is not surfaced here — confirm this is intended.
	ir := <-eventReader.initialResponse

	irv, ok := ir.(*StartLiveTailOutput)
	if !ok {
		return out, metadata, fmt.Errorf("unexpected output result type: %T", ir)
	}
	// Overwrite the output with the initial response before attaching the stream.
	*output = *irv
	output.eventStream = NewStartLiveTailEventStream(func(stream *StartLiveTailEventStream) {
		stream.Reader = eventReader
	})

	// waitStreamClose is defined elsewhere; it is run on its own goroutine.
	go output.eventStream.waitStreamClose()

	return out, metadata, nil
}

// closeResponseBody drains and closes the HTTP response body carried by out,
// if there is one. Errors from draining and closing are deliberately ignored.
func (*awsAwsjson11_deserializeOpEventStreamStartLiveTail) closeResponseBody(out middleware.DeserializeOutput) {
	if resp, ok := out.RawResponse.(*smithyhttp.Response); ok && resp != nil && resp.Body != nil {
		_, _ = io.Copy(ioutil.Discard, resp.Body)
		_ = resp.Body.Close()
	}
}

// addEventStreamStartLiveTailMiddleware inserts the StartLiveTail event stream
// deserializer into the stack's Deserialize step, positioned before the
// middleware with ID "OperationDeserializer". The logging flags are derived
// from options.ClientLogMode. It returns the error from Insert, if any.
func addEventStreamStartLiveTailMiddleware(stack *middleware.Stack, options Options) error {
	if err := stack.Deserialize.Insert(&awsAwsjson11_deserializeOpEventStreamStartLiveTail{
		LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(),
		LogEventStreamReads:  options.ClientLogMode.IsResponseEventMessage(),
	}, "OperationDeserializer", middleware.Before); err != nil {
		return err
	}
	return nil

}

// UnknownEventMessageError provides an error when a message is received from the stream,
// but the reader is unable to determine what kind of message it is.
type UnknownEventMessageError struct {
	// Type is the value of the message-type header that was not recognized.
	Type string
	// Message points to a clone of the offending message.
	Message *eventstream.Message
}

// Error returns the error message string.
func (e *UnknownEventMessageError) Error() string { return "unknown event stream message type, " + e.Type } func setSafeEventStreamClientLogMode(o *Options, operation string) { switch operation { case "GetLogObject": toggleEventStreamClientLogMode(o, false, true) return case "StartLiveTail": toggleEventStreamClientLogMode(o, false, true) return default: return } } func toggleEventStreamClientLogMode(o *Options, request, response bool) { mode := o.ClientLogMode if request && mode.IsRequestWithBody() { mode.ClearRequestWithBody() mode |= aws.LogRequest } if response && mode.IsResponseWithBody() { mode.ClearResponseWithBody() mode |= aws.LogResponse } o.ClientLogMode = mode }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mozillazg/kube-audit-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.