admin管理员组文章数量:1332881
#[tokio::main]
async fn main() {
println!("Starting Tokio runtime...");
if tokio::runtime::Handle::try_current().is_ok() {
println!("Tokio runtime is active.");
} else {
println!("No active Tokio runtime.");
}
let local = tokio::task::LocalSet::new();
println!("LocalSet created.");
dotenv().ok();
let services = Arc::new(services::new_services());
let connected_clients: ConnectedClients = Arc::new(Mutex::new(HashMap::new()));
let make_svc = make_service_fn(|_conn| {
let services = Arc::clone(&services);
let clients = Arc::clone(&connected_clients);
async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
handle_request(req, services.clone(), clients.clone())
}))
}
});
let port: u16 = env::var("PORT")
.unwrap_or_else(|_| "8080".to_string())
.parse()
.expect("PORT must be a valid number");
let addr = format!("127.0.0.1:{}", port).parse().expect("Invalid address");
local.run_until(async {
let server = Server::bind(&addr).serve(make_svc);
println!("Server listening on {}", addr);
if let Err(e) = server.await {
eprintln!("Server error: {}", e);
}
}).await;
}
async fn handle_request(
mut req: Request<Body>,
_services: Arc<Services>,
clients: ConnectedClients
) -> Result<Response<Body>, hyper::Error> {
if req.uri().path() == "/ws" {
println!("/ws path entered");
// Check if it's a WebSocket request
if !is_websocket_request(&req) {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("Not a WebSocket request"))
.unwrap());
}
println!("ITS A WS CONNECTION!");
// Extract token from query parameters
let token = extract_token_from_query(req.uri());
let (mut user_id,
mut _id,
mut stores,
mut first_name,
mut last_name,
mut is_admin,
mut route,
mut device_id
) = (
String::new(),
String::new(),
Vec::new(),
String::new(),
String::new(),
false,
String::new(),
None,
);
match token {
Some(ref t) => {
// Token validation logic
println!("Entered into match token");
let secret_key = std::env::var("JWT_SECRET").unwrap_or_else(|_| "default_secret".to_string());
let decoding_key = DecodingKey::from_secret(secret_key.as_ref());
let validation = Validation::default();
// TOKEN VALIDATION AND PARSING
}
// Get the WebSocket key from headers
let ws_key = req.headers().get(SEC_WEBSOCKET_KEY).and_then(|v| v.to_str().ok()).unwrap();
// Generate accept key
let accept_key = derive_accept_key(ws_key.as_bytes());
println!("WebSocket Key: {:?}", ws_key);
println!("Accept Key: {}", accept_key);
let mut response = Response::builder()
.status(StatusCode::SWITCHING_PROTOCOLS)
.header(CONNECTION, "Upgrade")
.header(UPGRADE, "websocket")
.header(SEC_WEBSOCKET_ACCEPT, accept_key);
// Add protocol if present
if let Some(protocol) = req.headers().get(SEC_WEBSOCKET_PROTOCOL) {
response = response.header(SEC_WEBSOCKET_PROTOCOL, protocol);
}
let response = response.body(Body::empty()).unwrap();
println!("response : {:#?}", response);
// Perform WebSocket upgrade and wrapping outside `tokio::spawn`
let device_id_clone = device_id.clone();
let local = LocalSet::new();
println!("local {:#?}", local);
if tokio::runtime::Handle::try_current().is_err() {
println!("No active Tokio runtime");
}
println!("Incoming request: {:?}", req);
local.spawn_local(async move{
println!("Entered into local spawn");
match on(req).await {
Ok(upgraded) => {
println!("Connection upgraded");
let ws_stream: WebSocketStream<Upgraded> = WebSocketStream::from_raw_socket(
upgraded,
Role::Server,
Some(WebSocketConfig::default())
).await;
let shared_ws_stream = Arc::new(RwLock::new(ws_stream));
let shared_ws_stream_clone = Arc::clone(&shared_ws_stream);
let user_connection: Connection = Connection::new(
None,
None,
shared_ws_stream,
device_id.clone(),
None,
None,
);
println!("WebSocket stream created");
if let Err(e) = handle_websocket_stream(shared_ws_stream_clone, clients).await {
eprintln!("WebSocket handler error: {}", e);
}
}
Err(e) => eprintln!("Failed to upgrade connection: {}", e),
}
});
Ok(response)
} else {
Ok(Response::new(Body::from("404 not found!")))
}
}
Trying to make a websocket connection in rust, but the connection breaks abruptly after connecting for 1-2 milliseconds Below are the terminal logs:
Starting Tokio runtime...
Tokio runtime is active.
LocalSet created.
Server listening on 127.0.0.1:8080
/ws path entered
WebSocket request validation:
Upgrade header present: true
Connection upgrade: true
Has WebSocket key: true
Has correct version: true
ITS A WS CONNECTION!
Entered into match token
Token validated successfully
CALLED
WebSocket Key: "rTNBeHrwfawHZ9uhlw3M1Q=="
Accept Key: pJhyS7Aq49YTSEt5997ylXf8Z7g=
response : Response {
status: 101,
version: HTTP/1.1,
headers: {
"connection": "Upgrade",
"upgrade": "websocket",
"sec-websocket-accept": "pJhyS7Aq49YTSEt5997ylXf8Z7g=",
},
body: Body(
Empty,
),
}
local LocalSet
Incoming request: Request { method: GET, uri: /ws?token=JWT_TOKEN, version: HTTP/1.1, headers: {"sec-websocket-version": "13", "sec-websocket-key": "rTNBeHrwfawHZ9uhlw3M1Q==", "connection": "Upgrade", "upgrade": "websocket", "sec-websocket-extensions": "permessage-deflate; client_max_window_bits", "host": "localhost:8080"}, body: Body(Empty) }
**The Code is not entering into local.spawn_local block so its not able to make a connection, as per me. If anyone has any idea how to solve this, please share. **
Here's the full error after adding the local.await before Ok(response)
future cannot be sent between threads safely
within `hyper::proto::h2::server::H2Stream<impl futures::Future<Output = Result<hyper::Response<Body>, hyper::Error>>, Body>`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `hyper::common::exec::Exec: hyper::common::exec::ConnStreamExec<_, _>`
the trait `hyper::common::exec::ConnStreamExec<F, B>` is implemented for `hyper::common::exec::Exec`
Second error at server.await in main function:
the trait bound `hyper::common::exec::Exec: hyper::common::exec::ConnStreamExec<impl futures::Future<Output = Result<hyper::Response<Body>, hyper::Error>>, Body>` is not satisfied
the trait `hyper::common::exec::ConnStreamExec<F, B>` is implemented for `hyper::common::exec::Exec`
required for `NewSvcTask<AddrStream, {async block@src/main.rs:114:9: 118:10}, ServiceFn<..., ...>, ..., ...>` to implement `futures::Future`
required for `hyper::common::exec::Exec` to implement `hyper::common::exec::NewSvcExec<AddrStream, {async block@src/main.rs:114:9: 118:10}, hyper::service::util::ServiceFn<{closure@src/main.rs:115:46: 115:71}, Body>, hyper::common::exec::Exec, hyper::server::server::NoopWatcher>`
1 redundant requirement hidden
required for `hyper::Server<AddrIncoming, hyper::service::make::MakeServiceFn<{closure@src/main.rs:111:36: 111:43}>>` to implement `futures::Future`
required for `hyper::Server<AddrIncoming, hyper::service::make::MakeServiceFn<{closure@src/main.rs:111:36: 111:43}>>` to implement `std::future::IntoFuture`
consider using `--verbose` to print the full type name to the console
#[tokio::main]
async fn main() {
println!("Starting Tokio runtime...");
if tokio::runtime::Handle::try_current().is_ok() {
println!("Tokio runtime is active.");
} else {
println!("No active Tokio runtime.");
}
let local = tokio::task::LocalSet::new();
println!("LocalSet created.");
dotenv().ok();
let services = Arc::new(services::new_services());
let connected_clients: ConnectedClients = Arc::new(Mutex::new(HashMap::new()));
let make_svc = make_service_fn(|_conn| {
let services = Arc::clone(&services);
let clients = Arc::clone(&connected_clients);
async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
handle_request(req, services.clone(), clients.clone())
}))
}
});
let port: u16 = env::var("PORT")
.unwrap_or_else(|_| "8080".to_string())
.parse()
.expect("PORT must be a valid number");
let addr = format!("127.0.0.1:{}", port).parse().expect("Invalid address");
local.run_until(async {
let server = Server::bind(&addr).serve(make_svc);
println!("Server listening on {}", addr);
if let Err(e) = server.await {
eprintln!("Server error: {}", e);
}
}).await;
}
async fn handle_request(
mut req: Request<Body>,
_services: Arc<Services>,
clients: ConnectedClients
) -> Result<Response<Body>, hyper::Error> {
if req.uri().path() == "/ws" {
println!("/ws path entered");
// Check if it's a WebSocket request
if !is_websocket_request(&req) {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("Not a WebSocket request"))
.unwrap());
}
println!("ITS A WS CONNECTION!");
// Extract token from query parameters
let token = extract_token_from_query(req.uri());
let (mut user_id,
mut _id,
mut stores,
mut first_name,
mut last_name,
mut is_admin,
mut route,
mut device_id
) = (
String::new(),
String::new(),
Vec::new(),
String::new(),
String::new(),
false,
String::new(),
None,
);
match token {
Some(ref t) => {
// Token validation logic
println!("Entered into match token");
let secret_key = std::env::var("JWT_SECRET").unwrap_or_else(|_| "default_secret".to_string());
let decoding_key = DecodingKey::from_secret(secret_key.as_ref());
let validation = Validation::default();
// TOKEN VALIDATION AND PARSING
}
// Get the WebSocket key from headers
let ws_key = req.headers().get(SEC_WEBSOCKET_KEY).and_then(|v| v.to_str().ok()).unwrap();
// Generate accept key
let accept_key = derive_accept_key(ws_key.as_bytes());
println!("WebSocket Key: {:?}", ws_key);
println!("Accept Key: {}", accept_key);
let mut response = Response::builder()
.status(StatusCode::SWITCHING_PROTOCOLS)
.header(CONNECTION, "Upgrade")
.header(UPGRADE, "websocket")
.header(SEC_WEBSOCKET_ACCEPT, accept_key);
// Add protocol if present
if let Some(protocol) = req.headers().get(SEC_WEBSOCKET_PROTOCOL) {
response = response.header(SEC_WEBSOCKET_PROTOCOL, protocol);
}
let response = response.body(Body::empty()).unwrap();
println!("response : {:#?}", response);
// Perform WebSocket upgrade and wrapping outside `tokio::spawn`
let device_id_clone = device_id.clone();
let local = LocalSet::new();
println!("local {:#?}", local);
if tokio::runtime::Handle::try_current().is_err() {
println!("No active Tokio runtime");
}
println!("Incoming request: {:?}", req);
local.spawn_local(async move{
println!("Entered into local spawn");
match on(req).await {
Ok(upgraded) => {
println!("Connection upgraded");
let ws_stream: WebSocketStream<Upgraded> = WebSocketStream::from_raw_socket(
upgraded,
Role::Server,
Some(WebSocketConfig::default())
).await;
let shared_ws_stream = Arc::new(RwLock::new(ws_stream));
let shared_ws_stream_clone = Arc::clone(&shared_ws_stream);
let user_connection: Connection = Connection::new(
None,
None,
shared_ws_stream,
device_id.clone(),
None,
None,
);
println!("WebSocket stream created");
if let Err(e) = handle_websocket_stream(shared_ws_stream_clone, clients).await {
eprintln!("WebSocket handler error: {}", e);
}
}
Err(e) => eprintln!("Failed to upgrade connection: {}", e),
}
});
Ok(response)
} else {
Ok(Response::new(Body::from("404 not found!")))
}
}
Trying to make a websocket connection in rust, but the connection breaks abruptly after connecting for 1-2 milliseconds Below are the terminal logs:
Starting Tokio runtime...
Tokio runtime is active.
LocalSet created.
Server listening on 127.0.0.1:8080
/ws path entered
WebSocket request validation:
Upgrade header present: true
Connection upgrade: true
Has WebSocket key: true
Has correct version: true
ITS A WS CONNECTION!
Entered into match token
Token validated successfully
CALLED
WebSocket Key: "rTNBeHrwfawHZ9uhlw3M1Q=="
Accept Key: pJhyS7Aq49YTSEt5997ylXf8Z7g=
response : Response {
status: 101,
version: HTTP/1.1,
headers: {
"connection": "Upgrade",
"upgrade": "websocket",
"sec-websocket-accept": "pJhyS7Aq49YTSEt5997ylXf8Z7g=",
},
body: Body(
Empty,
),
}
local LocalSet
Incoming request: Request { method: GET, uri: /ws?token=JWT_TOKEN, version: HTTP/1.1, headers: {"sec-websocket-version": "13", "sec-websocket-key": "rTNBeHrwfawHZ9uhlw3M1Q==", "connection": "Upgrade", "upgrade": "websocket", "sec-websocket-extensions": "permessage-deflate; client_max_window_bits", "host": "localhost:8080"}, body: Body(Empty) }
**The Code is not entering into local.spawn_local block so its not able to make a connection, as per me. If anyone has any idea how to solve this, please share. **
Here's the full error after adding the local.await before Ok(response)
future cannot be sent between threads safely
within `hyper::proto::h2::server::H2Stream<impl futures::Future<Output = Result<hyper::Response<Body>, hyper::Error>>, Body>`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `hyper::common::exec::Exec: hyper::common::exec::ConnStreamExec<_, _>`
the trait `hyper::common::exec::ConnStreamExec<F, B>` is implemented for `hyper::common::exec::Exec`
Second error at server.await in main function:
the trait bound `hyper::common::exec::Exec: hyper::common::exec::ConnStreamExec<impl futures::Future<Output = Result<hyper::Response<Body>, hyper::Error>>, Body>` is not satisfied
the trait `hyper::common::exec::ConnStreamExec<F, B>` is implemented for `hyper::common::exec::Exec`
required for `NewSvcTask<AddrStream, {async block@src/main.rs:114:9: 118:10}, ServiceFn<..., ...>, ..., ...>` to implement `futures::Future`
required for `hyper::common::exec::Exec` to implement `hyper::common::exec::NewSvcExec<AddrStream, {async block@src/main.rs:114:9: 118:10}, hyper::service::util::ServiceFn<{closure@src/main.rs:115:46: 115:71}, Body>, hyper::common::exec::Exec, hyper::server::server::NoopWatcher>`
1 redundant requirement hidden
required for `hyper::Server<AddrIncoming, hyper::service::make::MakeServiceFn<{closure@src/main.rs:111:36: 111:43}>>` to implement `futures::Future`
required for `hyper::Server<AddrIncoming, hyper::service::make::MakeServiceFn<{closure@src/main.rs:111:36: 111:43}>>` to implement `std::future::IntoFuture`
consider using `--verbose` to print the full type name to the console
Share
Improve this question
edited Nov 21, 2024 at 12:32
Sagar Maheshwari
asked Nov 21, 2024 at 5:46
Sagar MaheshwariSagar Maheshwari
72 bronze badges
1 Answer
Reset to default 0A LocalSet
doesn't do anything until you actually await it somehow. local.spawn_local
doesn't start driving the provided future if the LocalSet
itself isn't being driven itself. This is why the code doesn't run -- you send the future to the LocalSet
and then you drop it. This is not too different from writing let foo = async move { ... };
, never awaiting foo
, and then wondering why the code in the async block doesn't run.
You could add local.await;
before Ok(response)
for example. Note that this will block the outer function until the future given to the LocalSet
finishes. It's not clear from your (long and convoluted) example code whether this is actually the correct solution to the problem, however it would cause the code to run, which fixes the primary issue that you're seeing.
It's not clear why you're even using LocalSet
here since you could just await the future directly. I don't see any utility LocalSet
is giving you whatsoever.
本文标签: Not able to make websocket connection in RustStack Overflow
版权声明:本文标题:Not able to make websocket connection in Rust - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1742311783a2451011.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论