eventstream.go•17.5 kB
// Code generated by smithy-go-codegen DO NOT EDIT.
package cloudwatchlogs
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream"
"github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream/eventstreamapi"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
smithy "github.com/aws/smithy-go"
"github.com/aws/smithy-go/middleware"
smithysync "github.com/aws/smithy-go/sync"
smithyhttp "github.com/aws/smithy-go/transport/http"
"io"
"io/ioutil"
"sync"
)
// GetLogObjectResponseStreamReader provides the interface for reading events from
// a stream.
//
// The reader's Close method must allow multiple concurrent calls.
type GetLogObjectResponseStreamReader interface {
	// Events returns a receive-only channel of GetLogObject stream events.
	Events() <-chan types.GetLogObjectResponseStream
	// Close closes the reader. It must tolerate concurrent calls.
	Close() error
	// Err returns the error associated with the stream, if any.
	Err() error
}
// StartLiveTailResponseStreamReader provides the interface for reading events
// from a stream.
//
// The reader's Close method must allow multiple concurrent calls.
type StartLiveTailResponseStreamReader interface {
	// Events returns a receive-only channel of StartLiveTail stream events.
	Events() <-chan types.StartLiveTailResponseStream
	// Close closes the reader. It must tolerate concurrent calls.
	Close() error
	// Err returns the error associated with the stream, if any.
	Err() error
}
// startLiveTailResponseStreamReadEvent is a marker interface for the values
// produced when a StartLiveTail stream message is deserialized: either a
// regular event or the initial response.
type startLiveTailResponseStreamReadEvent interface {
	isStartLiveTailResponseStreamReadEvent()
}

// startLiveTailResponseStreamReadEventMessage wraps a regular stream event.
type startLiveTailResponseStreamReadEventMessage struct {
	Value types.StartLiveTailResponseStream
}

// isStartLiveTailResponseStreamReadEvent marks the type as a read event.
func (*startLiveTailResponseStreamReadEventMessage) isStartLiveTailResponseStreamReadEvent() {}

// startLiveTailResponseStreamReadEventInitialResponse wraps the value
// deserialized from an "initial-response" event.
type startLiveTailResponseStreamReadEventInitialResponse struct {
	Value interface{}
}

// isStartLiveTailResponseStreamReadEvent marks the type as a read event.
func (*startLiveTailResponseStreamReadEventInitialResponse) isStartLiveTailResponseStreamReadEvent() {
}
// startLiveTailResponseStreamReader reads StartLiveTail events from an event
// stream body and publishes them on a channel.
type startLiveTailResponseStreamReader struct {
	// stream carries decoded events to the consumer; exposed through Events.
	stream chan types.StartLiveTailResponseStream
	// decoder decodes raw event stream messages read from eventStream.
	decoder *eventstream.Decoder
	// eventStream is the underlying body; safeClose closes it.
	eventStream io.ReadCloser
	// err holds the error reported by Err.
	err *smithysync.OnceErr
	// payloadBuf is the scratch buffer passed to decoder.Decode; it is reset
	// to zero length before every decode.
	payloadBuf []byte
	// done is closed by safeClose and exposed through Closed.
	done chan struct{}
	// closeOnce ensures safeClose runs at most once across Close calls.
	closeOnce sync.Once
	// initialResponseDeserializer converts an "initial-response" event
	// message into the value sent on initialResponse.
	initialResponseDeserializer func(*eventstream.Message) (interface{}, error)
	// initialResponse delivers the deserialized initial response; the
	// constructor creates it with a buffer of one.
	initialResponse chan interface{}
}
// newStartLiveTailResponseStreamReader builds a reader over readCloser and
// immediately starts a goroutine that decodes messages with decoder.
//
// ird deserializes the "initial-response" event; its result is delivered on
// the reader's initialResponse channel, which has room for a single value.
func newStartLiveTailResponseStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder, ird func(*eventstream.Message) (interface{}, error)) *startLiveTailResponseStreamReader {
	reader := new(startLiveTailResponseStreamReader)

	// Input side.
	reader.eventStream = readCloser
	reader.decoder = decoder
	reader.payloadBuf = make([]byte, 10*1024)
	reader.initialResponseDeserializer = ird

	// Output and lifecycle side.
	reader.stream = make(chan types.StartLiveTailResponseStream)
	reader.initialResponse = make(chan interface{}, 1)
	reader.done = make(chan struct{})
	reader.err = smithysync.NewOnceErr()

	go reader.readEventStream()

	return reader
}
// Events returns the channel on which decoded StartLiveTail events are
// delivered. readEventStream closes this channel when it returns.
func (r *startLiveTailResponseStreamReader) Events() <-chan types.StartLiveTailResponseStream {
	return r.stream
}
// readEventStream is the reader goroutine's main loop. It decodes messages
// from the body one at a time, deserializes each, and forwards it to either
// the initialResponse channel or the events channel.
//
// The loop ends on io.EOF, on any decode or deserialize error, on an
// unexpected wrapper type, or once done is closed. On exit the deferred calls
// run in reverse order: initialResponse is closed, then stream, then the
// reader itself.
func (r *startLiveTailResponseStreamReader) readEventStream() {
	defer r.Close()
	defer close(r.stream)
	defer close(r.initialResponse)

	for {
		// Reuse the scratch buffer for every message.
		r.payloadBuf = r.payloadBuf[:0]

		msg, decodeErr := r.decoder.Decode(r.eventStream, r.payloadBuf)
		if decodeErr == io.EOF {
			return
		}
		if decodeErr != nil {
			// A decode error seen after the reader was closed is not recorded.
			select {
			case <-r.done:
			default:
				r.err.SetError(decodeErr)
			}
			return
		}

		wrapped, convErr := r.deserializeEventMessage(&msg)
		if convErr != nil {
			r.err.SetError(convErr)
			return
		}

		switch typed := wrapped.(type) {
		case *startLiveTailResponseStreamReadEventMessage:
			// Block until the consumer takes the event or the reader closes.
			select {
			case r.stream <- typed.Value:
			case <-r.done:
				return
			}
		case *startLiveTailResponseStreamReadEventInitialResponse:
			// Non-blocking: if the buffered slot is taken, the value is dropped.
			select {
			case r.initialResponse <- typed.Value:
			case <-r.done:
				return
			default:
			}
		default:
			r.err.SetError(fmt.Errorf("unexpected event wrapper: %T", wrapped))
			return
		}
	}
}
// deserializeEventMessage converts a decoded message into a read event,
// dispatching on the message-type header.
//
// Event messages become either an initial-response wrapper (event type
// "initial-response") or a regular event wrapper. Exception and error
// messages are returned as errors, as are a missing header and any
// unrecognized message type (reported as *UnknownEventMessageError).
func (r *startLiveTailResponseStreamReader) deserializeEventMessage(msg *eventstream.Message) (startLiveTailResponseStreamReadEvent, error) {
	typeHeader := msg.Headers.Get(eventstreamapi.MessageTypeHeader)
	if typeHeader == nil {
		return nil, fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader)
	}

	switch kind := typeHeader.String(); kind {
	case eventstreamapi.ExceptionMessageType:
		return nil, awsAwsjson11_deserializeEventStreamExceptionStartLiveTailResponseStream(msg)

	case eventstreamapi.ErrorMessageType:
		// Both fields fall back to "UnknownError" when their header is absent.
		apiErr := &smithy.GenericAPIError{
			Code:    "UnknownError",
			Message: "UnknownError",
		}
		if h := msg.Headers.Get(eventstreamapi.ErrorCodeHeader); h != nil {
			apiErr.Code = h.String()
		}
		if h := msg.Headers.Get(eventstreamapi.ErrorMessageHeader); h != nil {
			apiErr.Message = h.String()
		}
		return nil, apiErr

	case eventstreamapi.EventMessageType:
		// Handled after the switch.

	default:
		clone := msg.Clone()
		return nil, &UnknownEventMessageError{
			Type:    kind,
			Message: &clone,
		}
	}

	eventHeader := msg.Headers.Get(eventstreamapi.EventTypeHeader)
	if eventHeader == nil {
		return nil, fmt.Errorf("%s event header not present", eventstreamapi.EventTypeHeader)
	}

	if eventHeader.String() != "initial-response" {
		var event types.StartLiveTailResponseStream
		if err := awsAwsjson11_deserializeEventStreamStartLiveTailResponseStream(&event, msg); err != nil {
			return nil, err
		}
		return &startLiveTailResponseStreamReadEventMessage{Value: event}, nil
	}

	initial, err := r.initialResponseDeserializer(msg)
	if err != nil {
		return nil, err
	}
	return &startLiveTailResponseStreamReadEventInitialResponse{Value: initial}, nil
}
// ErrorSet returns the channel provided by the underlying OnceErr.
// NOTE(review): presumably it is closed once an error has been recorded —
// confirm against smithy-go's sync.OnceErr.
func (r *startLiveTailResponseStreamReader) ErrorSet() <-chan struct{} {
	return r.err.ErrorSet()
}
// Close shuts the reader down and returns the reader's current error, if any.
// The shutdown itself runs at most once; repeated calls only report Err.
func (r *startLiveTailResponseStreamReader) Close() error {
	r.closeOnce.Do(r.safeClose)
	return r.Err()
}
// safeClose closes the done channel and then the underlying body. It is only
// invoked through closeOnce, because closing done twice would panic.
func (r *startLiveTailResponseStreamReader) safeClose() {
	// done is closed first; readEventStream does not record decode errors
	// once done is closed.
	close(r.done)
	// The body's close error is intentionally not reported.
	r.eventStream.Close()
}
// Err returns the error held by the reader's OnceErr, or nil if none is set.
func (r *startLiveTailResponseStreamReader) Err() error {
	return r.err.Err()
}
// Closed returns the done channel, which safeClose closes when the reader is
// shut down.
func (r *startLiveTailResponseStreamReader) Closed() <-chan struct{} {
	return r.done
}
// getLogObjectResponseStreamReadEvent is a marker interface for the values
// produced when a GetLogObject stream message is deserialized: either a
// regular event or the initial response.
type getLogObjectResponseStreamReadEvent interface {
	isGetLogObjectResponseStreamReadEvent()
}

// getLogObjectResponseStreamReadEventMessage wraps a regular stream event.
type getLogObjectResponseStreamReadEventMessage struct {
	Value types.GetLogObjectResponseStream
}

// isGetLogObjectResponseStreamReadEvent marks the type as a read event.
func (*getLogObjectResponseStreamReadEventMessage) isGetLogObjectResponseStreamReadEvent() {}

// getLogObjectResponseStreamReadEventInitialResponse wraps the value
// deserialized from an "initial-response" event.
type getLogObjectResponseStreamReadEventInitialResponse struct {
	Value interface{}
}

// isGetLogObjectResponseStreamReadEvent marks the type as a read event.
func (*getLogObjectResponseStreamReadEventInitialResponse) isGetLogObjectResponseStreamReadEvent() {}
// getLogObjectResponseStreamReader reads GetLogObject events from an event
// stream body and publishes them on a channel.
type getLogObjectResponseStreamReader struct {
	// stream carries decoded events to the consumer; exposed through Events.
	stream chan types.GetLogObjectResponseStream
	// decoder decodes raw event stream messages read from eventStream.
	decoder *eventstream.Decoder
	// eventStream is the underlying body; safeClose closes it.
	eventStream io.ReadCloser
	// err holds the error reported by Err.
	err *smithysync.OnceErr
	// payloadBuf is the scratch buffer passed to decoder.Decode; it is reset
	// to zero length before every decode.
	payloadBuf []byte
	// done is closed by safeClose and exposed through Closed.
	done chan struct{}
	// closeOnce ensures safeClose runs at most once across Close calls.
	closeOnce sync.Once
	// initialResponseDeserializer converts an "initial-response" event
	// message into the value sent on initialResponse.
	initialResponseDeserializer func(*eventstream.Message) (interface{}, error)
	// initialResponse delivers the deserialized initial response; the
	// constructor creates it with a buffer of one.
	initialResponse chan interface{}
}
// newGetLogObjectResponseStreamReader builds a reader over readCloser and
// immediately starts a goroutine that decodes messages with decoder.
//
// ird deserializes the "initial-response" event; its result is delivered on
// the reader's initialResponse channel, which has room for a single value.
func newGetLogObjectResponseStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder, ird func(*eventstream.Message) (interface{}, error)) *getLogObjectResponseStreamReader {
	reader := new(getLogObjectResponseStreamReader)

	// Input side.
	reader.eventStream = readCloser
	reader.decoder = decoder
	reader.payloadBuf = make([]byte, 10*1024)
	reader.initialResponseDeserializer = ird

	// Output and lifecycle side.
	reader.stream = make(chan types.GetLogObjectResponseStream)
	reader.initialResponse = make(chan interface{}, 1)
	reader.done = make(chan struct{})
	reader.err = smithysync.NewOnceErr()

	go reader.readEventStream()

	return reader
}
// Events returns the channel on which decoded GetLogObject events are
// delivered. readEventStream closes this channel when it returns.
func (r *getLogObjectResponseStreamReader) Events() <-chan types.GetLogObjectResponseStream {
	return r.stream
}
// readEventStream is the reader goroutine's main loop. It decodes messages
// from the body one at a time, deserializes each, and forwards it to either
// the initialResponse channel or the events channel.
//
// The loop ends on io.EOF, on any decode or deserialize error, on an
// unexpected wrapper type, or once done is closed. On exit the deferred calls
// run in reverse order: initialResponse is closed, then stream, then the
// reader itself.
func (r *getLogObjectResponseStreamReader) readEventStream() {
	defer r.Close()
	defer close(r.stream)
	defer close(r.initialResponse)

	for {
		// Reuse the scratch buffer for every message.
		r.payloadBuf = r.payloadBuf[:0]

		msg, decodeErr := r.decoder.Decode(r.eventStream, r.payloadBuf)
		if decodeErr == io.EOF {
			return
		}
		if decodeErr != nil {
			// A decode error seen after the reader was closed is not recorded.
			select {
			case <-r.done:
			default:
				r.err.SetError(decodeErr)
			}
			return
		}

		wrapped, convErr := r.deserializeEventMessage(&msg)
		if convErr != nil {
			r.err.SetError(convErr)
			return
		}

		switch typed := wrapped.(type) {
		case *getLogObjectResponseStreamReadEventMessage:
			// Block until the consumer takes the event or the reader closes.
			select {
			case r.stream <- typed.Value:
			case <-r.done:
				return
			}
		case *getLogObjectResponseStreamReadEventInitialResponse:
			// Non-blocking: if the buffered slot is taken, the value is dropped.
			select {
			case r.initialResponse <- typed.Value:
			case <-r.done:
				return
			default:
			}
		default:
			r.err.SetError(fmt.Errorf("unexpected event wrapper: %T", wrapped))
			return
		}
	}
}
// deserializeEventMessage converts a decoded message into a read event,
// dispatching on the message-type header.
//
// Event messages become either an initial-response wrapper (event type
// "initial-response") or a regular event wrapper. Exception and error
// messages are returned as errors, as are a missing header and any
// unrecognized message type (reported as *UnknownEventMessageError).
func (r *getLogObjectResponseStreamReader) deserializeEventMessage(msg *eventstream.Message) (getLogObjectResponseStreamReadEvent, error) {
	typeHeader := msg.Headers.Get(eventstreamapi.MessageTypeHeader)
	if typeHeader == nil {
		return nil, fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader)
	}

	switch kind := typeHeader.String(); kind {
	case eventstreamapi.ExceptionMessageType:
		return nil, awsAwsjson11_deserializeEventStreamExceptionGetLogObjectResponseStream(msg)

	case eventstreamapi.ErrorMessageType:
		// Both fields fall back to "UnknownError" when their header is absent.
		apiErr := &smithy.GenericAPIError{
			Code:    "UnknownError",
			Message: "UnknownError",
		}
		if h := msg.Headers.Get(eventstreamapi.ErrorCodeHeader); h != nil {
			apiErr.Code = h.String()
		}
		if h := msg.Headers.Get(eventstreamapi.ErrorMessageHeader); h != nil {
			apiErr.Message = h.String()
		}
		return nil, apiErr

	case eventstreamapi.EventMessageType:
		// Handled after the switch.

	default:
		clone := msg.Clone()
		return nil, &UnknownEventMessageError{
			Type:    kind,
			Message: &clone,
		}
	}

	eventHeader := msg.Headers.Get(eventstreamapi.EventTypeHeader)
	if eventHeader == nil {
		return nil, fmt.Errorf("%s event header not present", eventstreamapi.EventTypeHeader)
	}

	if eventHeader.String() != "initial-response" {
		var event types.GetLogObjectResponseStream
		if err := awsAwsjson11_deserializeEventStreamGetLogObjectResponseStream(&event, msg); err != nil {
			return nil, err
		}
		return &getLogObjectResponseStreamReadEventMessage{Value: event}, nil
	}

	initial, err := r.initialResponseDeserializer(msg)
	if err != nil {
		return nil, err
	}
	return &getLogObjectResponseStreamReadEventInitialResponse{Value: initial}, nil
}
// ErrorSet returns the channel provided by the underlying OnceErr.
// NOTE(review): presumably it is closed once an error has been recorded —
// confirm against smithy-go's sync.OnceErr.
func (r *getLogObjectResponseStreamReader) ErrorSet() <-chan struct{} {
	return r.err.ErrorSet()
}
// Close shuts the reader down and returns the reader's current error, if any.
// The shutdown itself runs at most once; repeated calls only report Err.
func (r *getLogObjectResponseStreamReader) Close() error {
	r.closeOnce.Do(r.safeClose)
	return r.Err()
}
// safeClose closes the done channel and then the underlying body. It is only
// invoked through closeOnce, because closing done twice would panic.
func (r *getLogObjectResponseStreamReader) safeClose() {
	// done is closed first; readEventStream does not record decode errors
	// once done is closed.
	close(r.done)
	// The body's close error is intentionally not reported.
	r.eventStream.Close()
}
// Err returns the error held by the reader's OnceErr, or nil if none is set.
func (r *getLogObjectResponseStreamReader) Err() error {
	return r.err.Err()
}
// Closed returns the done channel, which safeClose closes when the reader is
// shut down.
func (r *getLogObjectResponseStreamReader) Closed() <-chan struct{} {
	return r.done
}
// awsAwsjson11_deserializeOpEventStreamGetLogObject is the deserialize
// middleware that attaches an event stream reader to GetLogObject output.
type awsAwsjson11_deserializeOpEventStreamGetLogObject struct {
	// LogEventStreamWrites is populated from the client log mode's
	// request-event-message setting; HandleDeserialize does not read it.
	LogEventStreamWrites bool
	// LogEventStreamReads is forwarded to the decoder's LogMessages option.
	LogEventStreamReads bool
}
// ID returns the identifier of this middleware,
// "OperationEventStreamDeserializer".
func (*awsAwsjson11_deserializeOpEventStreamGetLogObject) ID() string {
	return "OperationEventStreamDeserializer"
}
// HandleDeserialize runs the rest of the deserialize chain, then wraps the
// HTTP response body in an event stream reader, waits for the stream's
// initial response, copies it into the operation output, and attaches the
// event stream to that output.
//
// Errors are returned when the request or response is not an HTTP transport
// type, when the next handler fails, when out.Result has an unexpected type,
// or when no usable initial response arrives. If the stream failed before an
// initial response was delivered, the error recorded by the reader is
// returned so the real cause is not hidden behind a type-mismatch message.
//
// On any error the reader (if created) is closed and the response body is
// drained and closed.
func (m *awsAwsjson11_deserializeOpEventStreamGetLogObject) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
	out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
) {
	defer func() {
		if err == nil {
			return
		}
		m.closeResponseBody(out)
	}()

	logger := middleware.GetLogger(ctx)

	if _, ok := in.Request.(*smithyhttp.Request); !ok {
		return out, metadata, fmt.Errorf("unknown transport type: %T", in.Request)
	}

	out, metadata, err = next.HandleDeserialize(ctx, in)
	if err != nil {
		return out, metadata, err
	}

	deserializeOutput, ok := out.RawResponse.(*smithyhttp.Response)
	if !ok {
		return out, metadata, fmt.Errorf("unknown transport type: %T", out.RawResponse)
	}

	output, ok := out.Result.(*GetLogObjectOutput)
	if out.Result != nil && !ok {
		return out, metadata, fmt.Errorf("unexpected output result type: %T", out.Result)
	}
	// Covers both a nil Result and a Result holding a nil *GetLogObjectOutput,
	// which would otherwise be dereferenced below.
	if output == nil {
		output = &GetLogObjectOutput{}
		out.Result = output
	}

	eventReader := newGetLogObjectResponseStreamReader(
		deserializeOutput.Body,
		eventstream.NewDecoder(func(options *eventstream.DecoderOptions) {
			options.Logger = logger
			options.LogMessages = m.LogEventStreamReads
		}),
		awsAwsjson11_deserializeEventMessageResponseGetLogObjectOutput,
	)
	defer func() {
		if err == nil {
			return
		}
		_ = eventReader.Close()
	}()

	ir, received := <-eventReader.initialResponse
	if !received {
		// The channel was closed without delivering a value. The reader
		// records its error before closing the channel, so surface it here.
		if streamErr := eventReader.Err(); streamErr != nil {
			return out, metadata, streamErr
		}
	}

	irv, ok := ir.(*GetLogObjectOutput)
	if !ok {
		return out, metadata, fmt.Errorf("unexpected output result type: %T", ir)
	}
	*output = *irv

	output.eventStream = NewGetLogObjectEventStream(func(stream *GetLogObjectEventStream) {
		stream.Reader = eventReader
	})

	go output.eventStream.waitStreamClose()

	return out, metadata, nil
}
// closeResponseBody drains and closes the HTTP response body carried in out.
// It does nothing when out holds no HTTP response or the response has no
// body. Errors from draining and closing are deliberately ignored.
func (*awsAwsjson11_deserializeOpEventStreamGetLogObject) closeResponseBody(out middleware.DeserializeOutput) {
	resp, ok := out.RawResponse.(*smithyhttp.Response)
	if !ok || resp == nil || resp.Body == nil {
		return
	}

	_, _ = io.Copy(ioutil.Discard, resp.Body)
	_ = resp.Body.Close()
}
// addEventStreamGetLogObjectMiddleware inserts the GetLogObject event stream
// deserializer into the stack's Deserialize step, positioned before
// "OperationDeserializer". Event message logging flags are taken from
// options.ClientLogMode. Any insertion error is returned.
func addEventStreamGetLogObjectMiddleware(stack *middleware.Stack, options Options) error {
	deserializer := &awsAwsjson11_deserializeOpEventStreamGetLogObject{
		LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(),
		LogEventStreamReads:  options.ClientLogMode.IsResponseEventMessage(),
	}
	return stack.Deserialize.Insert(deserializer, "OperationDeserializer", middleware.Before)
}
// awsAwsjson11_deserializeOpEventStreamStartLiveTail is the deserialize
// middleware that attaches an event stream reader to StartLiveTail output.
type awsAwsjson11_deserializeOpEventStreamStartLiveTail struct {
	// LogEventStreamWrites is populated from the client log mode's
	// request-event-message setting; HandleDeserialize does not read it.
	LogEventStreamWrites bool
	// LogEventStreamReads is forwarded to the decoder's LogMessages option.
	LogEventStreamReads bool
}
// ID returns the identifier of this middleware,
// "OperationEventStreamDeserializer".
func (*awsAwsjson11_deserializeOpEventStreamStartLiveTail) ID() string {
	return "OperationEventStreamDeserializer"
}
// HandleDeserialize runs the rest of the deserialize chain, then wraps the
// HTTP response body in an event stream reader, waits for the stream's
// initial response, copies it into the operation output, and attaches the
// event stream to that output.
//
// Errors are returned when the request or response is not an HTTP transport
// type, when the next handler fails, when out.Result has an unexpected type,
// or when no usable initial response arrives. If the stream failed before an
// initial response was delivered, the error recorded by the reader is
// returned so the real cause is not hidden behind a type-mismatch message.
//
// On any error the reader (if created) is closed and the response body is
// drained and closed.
func (m *awsAwsjson11_deserializeOpEventStreamStartLiveTail) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
	out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
) {
	defer func() {
		if err == nil {
			return
		}
		m.closeResponseBody(out)
	}()

	logger := middleware.GetLogger(ctx)

	if _, ok := in.Request.(*smithyhttp.Request); !ok {
		return out, metadata, fmt.Errorf("unknown transport type: %T", in.Request)
	}

	out, metadata, err = next.HandleDeserialize(ctx, in)
	if err != nil {
		return out, metadata, err
	}

	deserializeOutput, ok := out.RawResponse.(*smithyhttp.Response)
	if !ok {
		return out, metadata, fmt.Errorf("unknown transport type: %T", out.RawResponse)
	}

	output, ok := out.Result.(*StartLiveTailOutput)
	if out.Result != nil && !ok {
		return out, metadata, fmt.Errorf("unexpected output result type: %T", out.Result)
	}
	// Covers both a nil Result and a Result holding a nil *StartLiveTailOutput,
	// which would otherwise be dereferenced below.
	if output == nil {
		output = &StartLiveTailOutput{}
		out.Result = output
	}

	eventReader := newStartLiveTailResponseStreamReader(
		deserializeOutput.Body,
		eventstream.NewDecoder(func(options *eventstream.DecoderOptions) {
			options.Logger = logger
			options.LogMessages = m.LogEventStreamReads
		}),
		awsAwsjson11_deserializeEventMessageResponseStartLiveTailOutput,
	)
	defer func() {
		if err == nil {
			return
		}
		_ = eventReader.Close()
	}()

	ir, received := <-eventReader.initialResponse
	if !received {
		// The channel was closed without delivering a value. The reader
		// records its error before closing the channel, so surface it here.
		if streamErr := eventReader.Err(); streamErr != nil {
			return out, metadata, streamErr
		}
	}

	irv, ok := ir.(*StartLiveTailOutput)
	if !ok {
		return out, metadata, fmt.Errorf("unexpected output result type: %T", ir)
	}
	*output = *irv

	output.eventStream = NewStartLiveTailEventStream(func(stream *StartLiveTailEventStream) {
		stream.Reader = eventReader
	})

	go output.eventStream.waitStreamClose()

	return out, metadata, nil
}
// closeResponseBody drains and closes the HTTP response body carried in out.
// It does nothing when out holds no HTTP response or the response has no
// body. Errors from draining and closing are deliberately ignored.
func (*awsAwsjson11_deserializeOpEventStreamStartLiveTail) closeResponseBody(out middleware.DeserializeOutput) {
	resp, ok := out.RawResponse.(*smithyhttp.Response)
	if !ok || resp == nil || resp.Body == nil {
		return
	}

	_, _ = io.Copy(ioutil.Discard, resp.Body)
	_ = resp.Body.Close()
}
// addEventStreamStartLiveTailMiddleware inserts the StartLiveTail event stream
// deserializer into the stack's Deserialize step, positioned before
// "OperationDeserializer". Event message logging flags are taken from
// options.ClientLogMode. Any insertion error is returned.
func addEventStreamStartLiveTailMiddleware(stack *middleware.Stack, options Options) error {
	deserializer := &awsAwsjson11_deserializeOpEventStreamStartLiveTail{
		LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(),
		LogEventStreamReads:  options.ClientLogMode.IsResponseEventMessage(),
	}
	return stack.Deserialize.Insert(deserializer, "OperationDeserializer", middleware.Before)
}
// UnknownEventMessageError provides an error when a message is received from the stream,
// but the reader is unable to determine what kind of message it is.
type UnknownEventMessageError struct {
	// Type is the value of the message-type header that was not recognized.
	Type string
	// Message is a clone of the message that could not be classified.
	Message *eventstream.Message
}
// Error returns the error message string, which includes the unrecognized
// message type.
func (e *UnknownEventMessageError) Error() string {
	const prefix = "unknown event stream message type, "
	return prefix + e.Type
}
// setSafeEventStreamClientLogMode adjusts o's client log mode for operations
// that return an event stream. For "GetLogObject" and "StartLiveTail" it
// toggles the response side only; any other operation leaves o untouched.
func setSafeEventStreamClientLogMode(o *Options, operation string) {
	streamsResponse := operation == "GetLogObject" || operation == "StartLiveTail"
	if !streamsResponse {
		return
	}
	toggleEventStreamClientLogMode(o, false, true)
}
// toggleEventStreamClientLogMode replaces "with body" logging by plain
// logging on the selected sides of o.ClientLogMode.
//
// When request is true and request-with-body logging is enabled, that flag is
// cleared and aws.LogRequest is set. When response is true and
// response-with-body logging is enabled, that flag is cleared and
// aws.LogResponse is set. The resulting mode is written back to o.
func toggleEventStreamClientLogMode(o *Options, request, response bool) {
	current := o.ClientLogMode

	downgradeRequest := request && current.IsRequestWithBody()
	if downgradeRequest {
		current.ClearRequestWithBody()
		current |= aws.LogRequest
	}

	downgradeResponse := response && current.IsResponseWithBody()
	if downgradeResponse {
		current.ClearResponseWithBody()
		current |= aws.LogResponse
	}

	o.ClientLogMode = current
}