Are you an LLM? Read llms.txt for a summary of the docs, or llms-full.txt for the full context.
Skip to content

Subscriptions

What this teaches: the graphql-transport-ws model, when to multiplex over a Session, and when to open a dedicated connection with WsClient::subscribe.

Mental model

WsClient is a cheap configuration value. Every live socket is a Session — an Arc-backed handle that owns a background tokio task driving one WebSocket. Subscriptions are tagged with protocol-level ids and routed back to per-stream channels.

The protocol is graphql-transport-ws (the graphql-ws rewrite that async-graphql speaks). The SDK sends a Ping every 15 seconds to stay under the server's 30-second keepalive_timeout.

Two entry points

Entry pointConnectionsWhen to use
Session::subscribeOne socket, many subscriptionsLong-lived process with several streams. Pay the handshake cost once.
WsClient::subscribeOne socket per subscriptionOne-shot stream, short-lived task, or trivial demo.

Multiplexed Session

use {
    anyhow::Result,
    dango_sdk::{SubscribeBlock, SubscribeTrades, WsClient, subscribe_block, subscribe_trades},
    futures::StreamExt,
};
 
#[tokio::main]
async fn main() -> Result<()> {
    let session = WsClient::new("wss://api-mainnet.dango.zone/graphql")?
        .connect()
        .await?;
 
    let mut blocks = session
        .subscribe::<SubscribeBlock>(subscribe_block::Variables {})
        .await?;
    let mut trades = session
        .subscribe::<SubscribeTrades>(subscribe_trades::Variables {
            base_denom:  "dango".into(),
            quote_denom: "bridge/usdc".into(),
        })
        .await?;
 
    loop {
        tokio::select! {
            Some(item) = blocks.next() => println!("block: {item:?}"),
            Some(item) = trades.next() => println!("trade: {item:?}"),
            else => break,
        }
    }
    Ok(())
}

The Session and every clone close together. The connection drops when the last clone and every derived stream have been dropped — see Session.

Dedicated WsClient::subscribe

use {
    anyhow::Result,
    dango_sdk::{SubscribeBlock, WsClient, subscribe_block},
    futures::StreamExt,
};
 
#[tokio::main]
async fn main() -> Result<()> {
    let ws = WsClient::new("wss://api-mainnet.dango.zone/graphql")?;
    let mut stream = ws
        .subscribe::<SubscribeBlock>(subscribe_block::Variables {})
        .await?;
 
    while let Some(item) = stream.next().await {
        println!("{item:?}");
    }
    Ok(())
}

Convenience: connect + subscribe + drop session when the stream ends.

Sugar: SubscriptionVariables

Each of the 13 codegen Variables types implements SubscriptionVariables, letting you call vars.subscribe(&ws) instead of ws.subscribe::<Q>(vars):

use {
    dango_sdk::{SubscriptionVariables, WsClient, subscribe_block},
    futures::StreamExt,
};
 
let ws = WsClient::new("wss://api-mainnet.dango.zone/graphql")?;
let mut stream = subscribe_block::Variables {}.subscribe(&ws).await?;
while let Some(item) = stream.next().await {
    println!("{item:?}");
}
// Ok::<(), anyhow::Error>(())

Stream item shape

type Item<T> = Result<graphql_client::Response<T>, dango_sdk::WsError>;
  • Err(WsError) — transport-level or subscription-protocol failure. Inspect the variant; some are terminal.
  • Ok(Response { data, errors, .. }) — a server-side payload. GraphQL errors live in errors; data may be None if every selection errored.
while let Some(item) = stream.next().await {
    match item {
        Ok(resp) => {
            if let Some(errors) = resp.errors {
                eprintln!("graphql: {errors:?}");
            }
            if let Some(data) = resp.data {
                println!("{data:?}");
            }
        }
        Err(err) => {
            eprintln!("ws: {err}");
            break;
        }
    }
}
// Ok::<(), anyhow::Error>(())

Limits

The server caps each connection at 30 concurrent subscriptions. When you need more, shard across multiple WsClient/Session pairs.

Next