QuickPulse SDK for the Research API (sync + async). Install with:
pip install rsi-data-quickpulse
from rsi_data.quickpulse import QuickPulse
# Minimal request, using the client as a context manager.
with QuickPulse(api_key="sk_live_...") as client:
    resp = client.ask(query="Hello", idempotency_key="example-1")
    print(resp.response)
from rsi_data.quickpulse import QuickPulse
client = QuickPulse(api_key="sk_live_...", timeout=30)
try:
    resp = client.ask(
        query="What is the recent inflation trend in India?",
        source="govt_news",
        advanced_config={
            "search_depth": "advanced",
            "recency_window": "last_12m",
            "include_inline_citations": True,
            "format_style": "markdown_report",
            "max_results_per_query": 10,
        },
        idempotency_key="msg-12345",  # enables safe automatic retries
    )
    print(resp.response)
finally:
    # Close the client even when ask() raises, so it is never left open.
    client.close()
from rsi_data.quickpulse import QuickPulse
def on_token(chunk: str) -> None:
    """Write one streamed text chunk to stdout immediately.

    No newline is appended and the stream is flushed on every call, so
    consecutive chunks appear as one continuous line of output.
    """
    print(chunk, end="", flush=True)  # stream to console/terminal
def on_status(status: dict) -> None:
    """Receive a status update from the stream; placeholder that does nothing.

    Args:
        status: Status payload,
            e.g. {"type": "searching", "message": "...", "timestamp": "..."}
    """
    pass  # update a progress bar or UI
# Stream a response: the callbacks receive tokens and status updates, while
# the loop itself only reacts to the "final" event.
with QuickPulse(api_key="sk_live_...") as client:
    for event in client.stream(
        query="Summarize current macro indicators in India",
        source="govt",
        advanced_config={"recency_window": "last_12m"},
        on_token=on_token,
        on_status=on_status,
    ):
        if event.type == "final":
            print("\n\nFinal answer:\n", event.data["response"])
import asyncio
from rsi_data.quickpulse import AsyncQuickPulse
async def main():
    """Run one non-streaming ask and one streamed query with the async client."""
    async with AsyncQuickPulse(api_key="sk_live_...") as client:
        # Ask (non-streaming)
        resp = await client.ask(query="FX outlook next year", source="govt")
        print("ASK:", resp.response)

        # Stream tokens for a live UI
        async for event in client.stream(query="Budget highlights", source="govt"):
            if event.type == "token":
                # Flush so each token shows up immediately rather than
                # waiting in the stdout buffer for the next newline.
                print(event.data, end="", flush=True)
            elif event.type == "final":
                print("\n\nFINAL:", event.data["response"])
# Entry point: run the main() coroutine to completion.
asyncio.run(main())
Idempotency keys (safe retries for ask)
import json, uuid
def make_idempotency_key(query: str, options: dict | None = None) -> str:
payload = json.dumps({"query": query, "options": options or {}}, sort_keys=True)
return str(uuid.uuid5(uuid.NAMESPACE_URL, payload))
# Derive the key from the query text plus the request options.
key = make_idempotency_key(
    query="Inflation trend",
    options={
        "source": "govt_news",
        "advanced_config": {"recency_window": "last_12m"},
    },
)
from rsi_data.quickpulse import QuickPulse, errors
client = QuickPulse(api_key="sk_live_...")
try:
    client.ask(query="Hello")
except errors.AuthError as e:
    print("Auth error:", e)  # Bad/expired token
except errors.RateLimitError as e:
    # retry_after may be absent on the exception, hence getattr with a default.
    print("Rate limit, retry after:", getattr(e, "retry_after", None))
except errors.ApiError as e:
    print("API error:", e.error_type, e.status_code)
except errors.TransportError as e:
    print("Network error:", e)
finally:
    # Always close the client, whether or not the call succeeded.
    client.close()
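# Windows (CMD)
set QUICKPULSE_API_KEY=sk_live_...
set QUICKPULSE_BASE_URL=https://api.rsi-data.com
# Windows (PowerShell)
$env:QUICKPULSE_API_KEY = "sk_live_..."
$env:QUICKPULSE_BASE_URL = "https://api.rsi-data.com"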
from rsi_data.quickpulse import QuickPulse
# API key and base URL are auto-resolved from env if not provided explicitly
# (presumably QUICKPULSE_API_KEY and QUICKPULSE_BASE_URL — TODO confirm).
client = QuickPulse()
from rsi_data.quickpulse import QuickPulse
# Explicit client configuration: timeouts and an outbound proxy.
client = QuickPulse(
    api_key="sk_live_...",
    timeout=30,  # total timeout for ask(); per-read timeout for stream()
    connect_timeout=5.0,  # matches the 5s default
    read_timeout=300.0,  # raised from the 60s default
    proxies={"all": "http://proxy.internal:8080"},
)
from rsi_data.quickpulse import QuickPulse
import sys
# Write streamed tokens straight to stdout, flushing after each one so the
# text appears as it arrives.
with QuickPulse(api_key="sk_live_...") as client:
    for event in client.stream(query="Explain RBI policy stance", source="govt"):
        if event.type == "token":
            sys.stdout.write(event.data)
            sys.stdout.flush()
        elif event.type == "final":
            print("\n\nDone.")
Safe automatic retries are enabled for ask() when an idempotency_key is provided.