Examples
Topics
To run these examples asynchronously, import Client from upstash_qstash.asyncio and await the methods.
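As a minimal sketch of the async variant, the snippet below lists topic names with an awaited call. It assumes that `client.topics()` stays a plain call on the async client and that only the topic methods themselves (here `list()`) are awaited. `list_topic_names` and `main` are hypothetical helpers for illustration, not part of the SDK.

```python
import asyncio


async def list_topic_names(client):
    """Return the names of all topics using an async QStash client."""
    topics = client.topics()          # assumed to be a regular (non-awaited) call
    all_topics = await topics.list()  # async variant: the method is awaited
    return [topic["name"] for topic in all_topics]


async def main():
    # Imported inside the function so the helper above can be tried
    # without the SDK installed.
    from upstash_qstash.asyncio import Client

    client = Client("<QSTASH_TOKEN>")
    print(await list_topic_names(client))


# To run against QStash with a real token, uncomment:
# asyncio.run(main())
```

The helper takes the client as a parameter so it can be exercised with a stub object in tests, without a network connection.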
Create a topic and add 2 endpoints
from upstash_qstash import Client
client = Client("<QSTASH_TOKEN>")
topics = client.topics()
topics.upsert_or_add_endpoints(
    {
        "name": "topic_name",
        "endpoints": [
            {"url": "https://my-endpoint-1"},
            {"url": "https://my-endpoint-2"},
        ],
    }
)
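When the endpoint URLs come from a list or from configuration, the request dict used above can be built programmatically. The following is a small sketch. `build_topic_payload` is a hypothetical helper, not part of upstash_qstash; it only reproduces the dict shape shown in the example.

```python
def build_topic_payload(name, urls):
    """Build the dict shape passed to upsert_or_add_endpoints.

    Hypothetical helper for illustration; not provided by the SDK.
    """
    return {
        "name": name,
        "endpoints": [{"url": url} for url in urls],
    }


payload = build_topic_payload(
    "topic_name",
    ["https://my-endpoint-1", "https://my-endpoint-2"],
)
print(payload)
```

The resulting `payload` could then be passed as `topics.upsert_or_add_endpoints(payload)`.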
Get topic by name
from upstash_qstash import Client
client = Client("<QSTASH_TOKEN>")
topics = client.topics()
topic = topics.get("topic_name")
print(topic["name"], topic["endpoints"])
List topics
from upstash_qstash import Client
client = Client("<QSTASH_TOKEN>")
topics = client.topics()
all_topics = topics.list()
for topic in all_topics:
    print(topic["name"], topic["endpoints"])
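The listed topics can also be turned into a lookup table. The sketch below maps each topic name to its endpoint URLs. It assumes each returned endpoint is a dict with a "url" key, mirroring the request shape used when creating a topic; the actual response shape may differ. `endpoint_urls_by_topic` and `sample_topics` are illustrative names, and the sample data stands in for the result of `topics.list()`.

```python
def endpoint_urls_by_topic(all_topics):
    """Map each topic name to the list of its endpoint URLs."""
    return {
        topic["name"]: [endpoint["url"] for endpoint in topic["endpoints"]]
        for topic in all_topics
    }


# Sample data standing in for topics.list(); the shape is an assumption.
sample_topics = [
    {
        "name": "topic_name",
        "endpoints": [
            {"url": "https://my-endpoint-1"},
            {"url": "https://my-endpoint-2"},
        ],
    },
    {"name": "empty_topic", "endpoints": []},
]

index = endpoint_urls_by_topic(sample_topics)
print(index["topic_name"])
```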
Remove an endpoint from a topic
from upstash_qstash import Client
client = Client("<QSTASH_TOKEN>")
topics = client.topics()
topics.remove_endpoints(
    {
        "name": "topic_name",
        "endpoints": [
            {"url": "https://my-endpoint-1"},
        ],
    }
)
Delete a topic
from upstash_qstash import Client
client = Client("<QSTASH_TOKEN>")
topics = client.topics()
topics.delete("topic_name")
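The calls shown in this section can be chained into one sequence. The sketch below strings them together in order: create, read back, remove one endpoint, delete. `run_topic_lifecycle` is a hypothetical helper, and `RecordingTopics` is a stand-in that only records which methods were called, so the example runs without a token or network access. With the real SDK, the object returned by `client.topics()` would be passed in its place.

```python
def run_topic_lifecycle(topics, name, urls):
    """Create a topic, read it back, drop its first endpoint, then delete it."""
    endpoints = [{"url": url} for url in urls]
    topics.upsert_or_add_endpoints({"name": name, "endpoints": endpoints})
    topic = topics.get(name)
    topics.remove_endpoints({"name": name, "endpoints": endpoints[:1]})
    topics.delete(name)
    return topic


class RecordingTopics:
    """Stand-in for client.topics() that records calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def upsert_or_add_endpoints(self, req):
        self.calls.append(("upsert_or_add_endpoints", req["name"]))

    def get(self, name):
        self.calls.append(("get", name))
        return {"name": name, "endpoints": []}

    def remove_endpoints(self, req):
        self.calls.append(("remove_endpoints", req["name"]))

    def delete(self, name):
        self.calls.append(("delete", name))


stub = RecordingTopics()
run_topic_lifecycle(
    stub,
    "topic_name",
    ["https://my-endpoint-1", "https://my-endpoint-2"],
)
print([call[0] for call in stub.calls])
```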