connector.rs•14 kB
use chrono::DateTime;
use convex_fivetran_common::{
    config::Config,
    fivetran_sdk::{
        alter_table_response,
        create_table_response,
        describe_table_response,
        destination_connector_server::DestinationConnector,
        test_response,
        truncate_response,
        write_batch_response,
        AlterTableRequest,
        AlterTableResponse,
        BatchFileFormat,
        CapabilitiesRequest,
        CapabilitiesResponse,
        ConfigurationFormRequest,
        ConfigurationFormResponse,
        ConfigurationTest,
        CreateTableRequest,
        CreateTableResponse,
        DescribeTableRequest,
        DescribeTableResponse,
        Task,
        TestRequest,
        TestResponse,
        TruncateRequest,
        TruncateResponse,
        WriteBatchRequest,
        WriteBatchResponse,
        WriteHistoryBatchRequest,
    },
};
use convex_fivetran_destination::api_types::DeleteType;
use prost_types::Timestamp;
use tonic::{
    Request,
    Response,
    Status,
};
use crate::{
    application::{
        alter_table,
        create_table,
        describe_table,
        truncate,
        write_batch,
        DescribeTableResponse as _DescribeTableResponse,
    },
    convex_api::{
        ConvexApi,
        Destination,
    },
    log,
};
/// Implements the gRPC server endpoints used by Fivetran.
///
/// This is a field-less unit struct: it holds no state of its own. The
/// [`DestinationConnector`] implementation in this file builds a fresh
/// [`ConvexApi`] from the configuration carried by each incoming request.
#[derive(Debug)]
pub struct ConvexFivetranDestination;
/// Return type shared by the endpoint handlers in this file: a tonic
/// [`Response`] wrapping the reply message `T`, or a gRPC [`Status`] error.
type DestinationResult<T> = Result<Response<T>, Status>;
#[tonic::async_trait]
impl DestinationConnector for ConvexFivetranDestination {
    /// Returns the static description of the connector's configuration form.
    ///
    /// The request payload is ignored. The reply carries the fields produced
    /// by `Config::fivetran_fields()` and a single configuration test named
    /// `"connection"`; schema and table selection are reported as unsupported.
    async fn configuration_form(
        &self,
        _: Request<ConfigurationFormRequest>,
    ) -> DestinationResult<ConfigurationFormResponse> {
        log("configuration form request");
        Ok(Response::new(ConfigurationFormResponse {
            schema_selection_supported: false,
            table_selection_supported: false,
            fields: Config::fivetran_fields(),
            tests: vec![ConfigurationTest {
                name: "connection".to_string(),
                label: "Test connection".to_string(),
            }],
        }))
    }

    /// Reports the connector capabilities: CSV is the advertised batch file
    /// format. The request payload is ignored.
    async fn capabilities(
        &self,
        _request: Request<CapabilitiesRequest>,
    ) -> DestinationResult<CapabilitiesResponse> {
        log("capabilities request");
        Ok(Response::new(CapabilitiesResponse {
            batch_file_format: BatchFileFormat::Csv as i32,
        }))
    }

    /// Runs the connection test against the deployment described by the
    /// request's configuration.
    ///
    /// Only the `configuration` field of the request is read. Both an invalid
    /// configuration and a failed connection attempt are reported in-band as
    /// `test_response::Response::Failure`; the gRPC call itself returns `Ok`.
    async fn test(&self, request: Request<TestRequest>) -> DestinationResult<TestResponse> {
        log("test request");
        let config = match Config::from_parameters(request.into_inner().configuration) {
            Ok(config) => config,
            Err(error) => {
                return Ok(Response::new(TestResponse {
                    response: Some(test_response::Response::Failure(error.to_string())),
                }));
            },
        };
        log(&format!("test request for {}", config.deploy_url));
        let destination = ConvexApi { config };
        // Perform an API request to verify if the credentials work
        Ok(Response::new(TestResponse {
            response: Some(match destination.test_streaming_import_connection().await {
                Ok(_) => {
                    log("Successful test request");
                    test_response::Response::Success(true)
                },
                Err(e) => {
                    log(&format!("Test error: {e}"));
                    test_response::Response::Failure(e.to_string())
                },
            }),
        }))
    }

    /// Looks up a table in the destination.
    ///
    /// The destination table name is derived from the request's schema and
    /// table names via `fivetran_req_to_table_name`. The reply is `NotFound`
    /// when the table does not exist, `Table` when it does, and `Task` (with
    /// the error message) when the configuration is invalid or the lookup
    /// fails. The gRPC call itself returns `Ok` in all of these cases.
    async fn describe_table(
        &self,
        request: Request<DescribeTableRequest>,
    ) -> DestinationResult<DescribeTableResponse> {
        log("describe table request");
        let DescribeTableRequest {
            configuration,
            schema_name,
            table_name,
        } = request.into_inner();
        let config = match Config::from_parameters(configuration) {
            Ok(config) => config,
            Err(error) => {
                return Ok(Response::new(DescribeTableResponse {
                    response: Some(describe_table_response::Response::Task(Task {
                        message: error.to_string(),
                    })),
                }));
            },
        };
        let table_name = fivetran_req_to_table_name(schema_name, table_name);
        log(&format!("describe table request for {}", config.deploy_url));
        let destination = ConvexApi { config };
        Ok(Response::new(DescribeTableResponse {
            response: Some(match describe_table(destination, table_name).await {
                Ok(_DescribeTableResponse::NotFound) => {
                    log("Successful describe table request (table not found)");
                    describe_table_response::Response::NotFound(true)
                },
                Ok(_DescribeTableResponse::Table(table)) => {
                    log("Successful describe table request (table found)");
                    describe_table_response::Response::Table(table)
                },
                Err(err) => {
                    log(&format!("Describe table error: {err}"));
                    describe_table_response::Response::Task(Task {
                        message: err.to_string(),
                    })
                },
            }),
        }))
    }

    /// Creates a table in the destination.
    ///
    /// The table's name is replaced by the schema-qualified name built by
    /// `fivetran_req_to_table_name` before it is handed to
    /// `application::create_table`. An invalid configuration, a missing
    /// `table` argument, or a failure while creating the table is reported
    /// in-band as a `Task` response; the gRPC call itself returns `Ok`.
    async fn create_table(
        &self,
        request: Request<CreateTableRequest>,
    ) -> DestinationResult<CreateTableResponse> {
        log("create table request");
        let CreateTableRequest {
            configuration,
            schema_name,
            table,
        } = request.into_inner();
        let config = match Config::from_parameters(configuration) {
            Ok(config) => config,
            Err(error) => {
                return Ok(Response::new(CreateTableResponse {
                    response: Some(create_table_response::Response::Task(Task {
                        message: error.to_string(),
                    })),
                }));
            },
        };
        log(&format!("create table request for {}", config.deploy_url));
        let destination = ConvexApi { config };
        let Some(mut table) = table else {
            return Ok(Response::new(CreateTableResponse {
                response: Some(create_table_response::Response::Task(Task {
                    message: "Missing table argument".to_string(),
                })),
            }));
        };
        table.name = fivetran_req_to_table_name(schema_name, table.name);
        Ok(Response::new(CreateTableResponse {
            response: Some(match create_table(destination, table).await {
                Ok(_) => {
                    log("Successful create table request");
                    create_table_response::Response::Success(true)
                },
                Err(e) => {
                    log(&format!("Create table error: {e}"));
                    create_table_response::Response::Task(Task {
                        message: e.to_string(),
                    })
                },
            }),
        }))
    }

    /// Alters a table in the destination.
    ///
    /// Mirrors `create_table`: the table is renamed to its schema-qualified
    /// name and forwarded to `application::alter_table`. An invalid
    /// configuration, a missing `table` argument, or a failure while altering
    /// the table is reported in-band as a `Task` response.
    async fn alter_table(
        &self,
        request: Request<AlterTableRequest>,
    ) -> DestinationResult<AlterTableResponse> {
        log("alter table request");
        let AlterTableRequest {
            configuration,
            schema_name,
            table,
        } = request.into_inner();
        let config = match Config::from_parameters(configuration) {
            Ok(config) => config,
            Err(error) => {
                return Ok(Response::new(AlterTableResponse {
                    response: Some(alter_table_response::Response::Task(Task {
                        message: error.to_string(),
                    })),
                }));
            },
        };
        log(&format!("alter table request for {}", config.deploy_url));
        let destination = ConvexApi { config };
        let Some(mut table) = table else {
            return Ok(Response::new(AlterTableResponse {
                response: Some(alter_table_response::Response::Task(Task {
                    message: "Missing table argument".to_string(),
                })),
            }));
        };
        table.name = fivetran_req_to_table_name(schema_name, table.name);
        Ok(Response::new(AlterTableResponse {
            response: Some(match alter_table(destination, table).await {
                Ok(_) => {
                    log("Successful alter table request");
                    alter_table_response::Response::Success(true)
                },
                Err(e) => {
                    log(&format!("Alter table error: {e}"));
                    alter_table_response::Response::Task(Task {
                        message: e.to_string(),
                    })
                },
            }),
        }))
    }

    /// Truncates a table in the destination.
    ///
    /// The optional `utc_delete_before` timestamp is converted to a chrono
    /// `DateTime` and forwarded to `application::truncate`. The presence of
    /// the `soft` field selects `DeleteType::SoftDelete`; its absence selects
    /// `DeleteType::HardDelete`. The `synced_column` field is ignored.
    ///
    /// An invalid configuration, a timestamp that cannot be represented, or a
    /// failure while truncating is reported in-band as a `Task` response; the
    /// gRPC call itself returns `Ok`.
    async fn truncate(
        &self,
        request: Request<TruncateRequest>,
    ) -> DestinationResult<TruncateResponse> {
        log("truncate request");
        let TruncateRequest {
            configuration,
            schema_name,
            table_name,
            synced_column: _,
            utc_delete_before,
            soft,
        } = request.into_inner();
        let config = match Config::from_parameters(configuration) {
            Ok(config) => config,
            Err(error) => {
                return Ok(Response::new(TruncateResponse {
                    response: Some(truncate_response::Response::Task(Task {
                        message: error.to_string(),
                    })),
                }));
            },
        };
        let table_name = fivetran_req_to_table_name(schema_name, table_name);
        log(&format!("truncate request for {}", config.deploy_url));
        // The timestamp comes straight from the request, so it is validated
        // instead of trusted: `nanos` is a signed protobuf field, and an
        // unchecked `as u32` cast followed by `expect` would panic the handler
        // on a negative or out-of-range value.
        let delete_before = match utc_delete_before {
            None => None,
            Some(Timestamp { seconds, nanos }) => {
                let converted = u32::try_from(nanos)
                    .ok()
                    .and_then(|subsec_nanos| DateTime::from_timestamp(seconds, subsec_nanos));
                match converted {
                    Some(timestamp) => Some(timestamp),
                    None => {
                        let message = format!(
                            "Invalid utc_delete_before timestamp (seconds: {seconds}, nanos: \
                             {nanos})"
                        );
                        log(&format!("Truncate error: {message}"));
                        return Ok(Response::new(TruncateResponse {
                            response: Some(truncate_response::Response::Task(Task { message })),
                        }));
                    },
                }
            },
        };
        let delete_type = match soft {
            Some(_) => DeleteType::SoftDelete,
            None => DeleteType::HardDelete,
        };
        let destination = ConvexApi { config };
        Ok(Response::new(TruncateResponse {
            response: Some(
                match truncate(destination, table_name, delete_before, delete_type).await {
                    Ok(_) => {
                        log("Successful truncate request");
                        truncate_response::Response::Success(true)
                    },
                    Err(e) => {
                        log(&format!("Truncate error: {e}"));
                        truncate_response::Response::Task(Task {
                            message: e.to_string(),
                        })
                    },
                },
            ),
        }))
    }

    /// Writes a batch of replace/update/delete files to the destination.
    ///
    /// The table is renamed to its schema-qualified name, then the table,
    /// keys, the three file lists and the file parameters are forwarded to
    /// `application::write_batch`. An invalid configuration, a missing
    /// `table` or `file_params` argument, or a failure while writing is
    /// reported in-band as a `Task` response; the gRPC call itself returns
    /// `Ok`.
    async fn write_batch(
        &self,
        request: Request<WriteBatchRequest>,
    ) -> DestinationResult<WriteBatchResponse> {
        log("write batch request");
        let WriteBatchRequest {
            configuration,
            schema_name,
            table,
            keys,
            replace_files,
            update_files,
            delete_files,
            file_params,
        } = request.into_inner();
        let config = match Config::from_parameters(configuration) {
            Ok(config) => config,
            Err(error) => {
                return Ok(Response::new(WriteBatchResponse {
                    response: Some(write_batch_response::Response::Task(Task {
                        message: error.to_string(),
                    })),
                }));
            },
        };
        log(&format!("write batch request for {}", config.deploy_url));
        let destination = ConvexApi { config };
        let Some(mut table) = table else {
            return Ok(Response::new(WriteBatchResponse {
                response: Some(write_batch_response::Response::Task(Task {
                    message: "Missing table argument".to_string(),
                })),
            }));
        };
        table.name = fivetran_req_to_table_name(schema_name, table.name);
        let Some(file_params) = file_params else {
            return Ok(Response::new(WriteBatchResponse {
                response: Some(write_batch_response::Response::Task(Task {
                    message: "Missing file_params argument".to_string(),
                })),
            }));
        };
        Ok(Response::new(WriteBatchResponse {
            response: Some(
                match write_batch(
                    destination,
                    table,
                    keys,
                    replace_files,
                    update_files,
                    delete_files,
                    file_params,
                )
                .await
                {
                    Ok(_) => {
                        log("Successful batch write request");
                        write_batch_response::Response::Success(true)
                    },
                    Err(e) => {
                        log(&format!("Batch write error: {e}"));
                        write_batch_response::Response::Task(Task {
                            message: e.to_string(),
                        })
                    },
                },
            ),
        }))
    }

    /// History-mode batch writes are not supported.
    ///
    /// # Errors
    ///
    /// Always returns a gRPC `Status::unimplemented` error; the request
    /// payload is ignored.
    async fn write_history_batch(
        &self,
        _request: Request<WriteHistoryBatchRequest>,
    ) -> DestinationResult<WriteBatchResponse> {
        log("write history batch request");
        Err(Status::unimplemented("write history batch not implemented"))
    }
}
/// Builds the destination table name for a Fivetran schema/table pair by
/// joining the two names with a single underscore (`<schema>_<table>`).
fn fivetran_req_to_table_name(fivetran_schema_name: String, fivetran_table_name: String) -> String {
    // Reuse the owned schema string as the buffer and append to it in place.
    let mut qualified_name = fivetran_schema_name;
    qualified_name.reserve(fivetran_table_name.len() + 1);
    qualified_name.push('_');
    qualified_name.push_str(&fivetran_table_name);
    qualified_name
}