Skip to content

Commit

Permalink
openapi ingress (ts, py) & egress (py) (#1993)
Browse files Browse the repository at this point in the history
  • Loading branch information
DatGuyJonathan authored Jan 21, 2025
1 parent db5c857 commit 1b8edf1
Show file tree
Hide file tree
Showing 10 changed files with 427 additions and 19 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apps/framework-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ opentelemetry-otlp = { version = "0.27.0", default-features = false, features =
] }
opentelemetry-http = { version = "0.27.0", features = ["reqwest"] }
prometheus-client = "0.22.2"
serde_yaml = "0.9.34"

[dev-dependencies]
clickhouse = { version = "0.11.5", features = ["uuid", "test-util"] }
Expand Down
46 changes: 43 additions & 3 deletions apps/framework-cli/src/cli/local_webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use log::Level::{Debug, Trace};
use std::collections::{HashMap, HashSet};
use std::env;
use std::env::VarError;
use std::fs;
use std::future::Future;
use std::io::ErrorKind;
use std::net::SocketAddr;
Expand Down Expand Up @@ -224,6 +225,7 @@ struct ManagementService<I: InfraMapProvider + Clone> {
is_prod: bool,
metrics: Arc<Metrics>,
infra_map: I,
openapi_path: Option<PathBuf>,
}

impl Service<Request<Incoming>> for RouteService {
Expand Down Expand Up @@ -263,6 +265,7 @@ impl<I: InfraMapProvider + Clone + Send + 'static> Service<Request<Incoming>>
self.metrics.clone(),
// here we're either cloning the reference or the RwLock
self.infra_map.clone(),
self.openapi_path.clone(),
req,
))
}
Expand Down Expand Up @@ -354,6 +357,39 @@ async fn metrics_route(metrics: Arc<Metrics>) -> Result<Response<Full<Bytes>>, h
Ok(response)
}

async fn openapi_route(
is_prod: bool,
openapi_path: Option<PathBuf>,
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
if is_prod {
return Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Full::new(Bytes::from(
"OpenAPI spec not available in production",
)))
.unwrap());
}

if let Some(path) = openapi_path {
match fs::read_to_string(path) {
Ok(contents) => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/yaml")
.body(Full::new(Bytes::from(contents)))
.unwrap()),
Err(_) => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Full::new(Bytes::from("Failed to read OpenAPI spec file")))
.unwrap()),
}
} else {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Full::new(Bytes::from("OpenAPI spec file not found")))
.unwrap())
}
}

fn bad_json_response(e: serde_json::Error) -> Response<Full<Bytes>> {
show_message!(
MessageType::Error,
Expand Down Expand Up @@ -774,6 +810,7 @@ async fn management_router<I: InfraMapProvider>(
is_prod: bool,
metrics: Arc<Metrics>,
infra_map: I,
openapi_path: Option<PathBuf>,
req: Request<Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
let level = if req.uri().path().ends_with(METRICS_LOGS_PATH) {
Expand Down Expand Up @@ -804,6 +841,7 @@ async fn management_router<I: InfraMapProvider>(
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Full::new(Bytes::from(res)))
}
(&hyper::Method::GET, "openapi.yaml") => openapi_route(is_prod, openapi_path).await,
_ => route_not_found_response(),
};

Expand Down Expand Up @@ -870,7 +908,7 @@ impl Webserver {
},
);
}
APIType::EGRESS => {
APIType::EGRESS { .. } => {
consumption_apis
.write()
.await
Expand All @@ -884,7 +922,7 @@ impl Webserver {
APIType::INGRESS { .. } => {
route_table.remove(&api_endpoint.path);
}
APIType::EGRESS => {
APIType::EGRESS { .. } => {
consumption_apis
.write()
.await
Expand All @@ -911,7 +949,7 @@ impl Webserver {
},
);
}
APIType::EGRESS => {
APIType::EGRESS { .. } => {
// Nothing to do, we don't need to update the route table
}
}
Expand All @@ -932,6 +970,7 @@ impl Webserver {
infra_map: I,
project: Arc<Project>,
metrics: Arc<Metrics>,
openapi_path: Option<PathBuf>,
) {
//! Starts the local webserver
let socket = self.socket().await;
Expand Down Expand Up @@ -990,6 +1029,7 @@ impl Webserver {
is_prod: project.is_production,
metrics,
infra_map,
openapi_path,
};

let graceful = GracefulShutdown::new();
Expand Down
27 changes: 22 additions & 5 deletions apps/framework-cli/src/cli/routines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio::time::{interval, Duration};

use crate::framework::core::execute::execute_initial_infra_change;
use crate::framework::core::plan::plan_changes;

use crate::cli::routines::openapi::openapi;
use crate::framework::controller::RouteMeta;
use crate::framework::core::execute::execute_initial_infra_change;
use crate::framework::core::infrastructure_map::InfrastructureMap;
use crate::framework::core::plan::plan_changes;
use crate::infrastructure::olap::clickhouse_alt_client::{get_pool, store_infrastructure_map};
use crate::infrastructure::processes::cron_registry::CronRegistry;
use crate::infrastructure::processes::kafka_clickhouse_sync::clickhouse_writing_pause_button;
Expand All @@ -117,6 +117,7 @@ pub mod logs;
pub mod ls;
pub mod metrics_console;
pub mod migrate;
pub mod openapi;
pub mod peek;
pub mod ps;
pub mod streaming;
Expand Down Expand Up @@ -473,6 +474,8 @@ pub async fn start_development_mode(
)
.await?;

let openapi_file = openapi(&project).await?;

plan.target_infra_map
.store_in_redis(&*redis_client.lock().await)
.await?;
Expand All @@ -493,7 +496,14 @@ pub async fn start_development_mode(

info!("Starting web server...");
web_server
.start(route_table, consumption_apis, infra_map, project, metrics)
.start(
route_table,
consumption_apis,
infra_map,
project,
metrics,
Some(openapi_file),
)
.await;

{
Expand Down Expand Up @@ -579,7 +589,14 @@ pub async fn start_production_mode(
let infra_map: &'static InfrastructureMap = Box::leak(Box::new(plan.target_infra_map));

web_server
.start(route_table, consumption_apis, infra_map, project, metrics)
.start(
route_table,
consumption_apis,
infra_map,
project,
metrics,
None,
)
.await;

{
Expand Down
Loading

0 comments on commit 1b8edf1

Please sign in to comment.