Integrate Services with Azure Service Bus
Introduction
Azure Service Bus is an enterprise-grade message broker that decouples AI pipeline stages. A document ingestion API sends to a queue, an embedding service processes messages independently, and a vector indexing service receives the results; no service needs to know about the others. The broker handles retries, dead-lettering, durable at-least-once delivery, and (with sessions) ordering.
Service Bus Concepts
Service Bus flow: Producer → Queue/Topic → Consumers (a Topic fans out to multiple subscriptions)
1. Queues vs Topics/Subscriptions
| # | Feature | Queue | Topic + Subscriptions |
|---|---|---|---|
| 1 | Consumers | Competing receivers; each message is processed by one of them | Each subscription gets its own copy, so multiple receivers see it |
| 2 | Pattern | Point-to-point | Pub/Sub (fan-out) |
| 3 | Use for | Task queue, work distribution | Events that multiple services must react to |
| 4 | AI example | Document → embedding worker (one worker processes it) | New AI result → logging + analytics + webhook (3 subscriptions) |
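To make the delivery difference concrete, here is a minimal in-memory model in plain Python (no Azure SDK). The class names are hypothetical, and the round-robin hand-out only stands in for "whichever receiver asks first"; the real broker does not promise round-robin.

```python
from collections import deque
from itertools import cycle
from typing import Deque, Dict, List


class InMemoryQueue:
    """Point-to-point: competing consumers share one queue; each message goes to one of them."""

    def __init__(self) -> None:
        self._messages: Deque[str] = deque()

    def send(self, body: str) -> None:
        self._messages.append(body)

    def drain(self, consumer_names: List[str]) -> Dict[str, List[str]]:
        # Round-robin hand-out stands in for "whichever receiver asks first".
        received: Dict[str, List[str]] = {name: [] for name in consumer_names}
        turn = cycle(consumer_names)
        while self._messages:
            received[next(turn)].append(self._messages.popleft())
        return received


class InMemoryTopic:
    """Pub/sub: every subscription gets its own copy of every message."""

    def __init__(self, subscription_names: List[str]) -> None:
        self._subscriptions: Dict[str, Deque[str]] = {name: deque() for name in subscription_names}

    def send(self, body: str) -> None:
        for pending in self._subscriptions.values():
            pending.append(body)  # fan-out: one copy per subscription

    def drain(self, subscription_name: str) -> List[str]:
        pending = self._subscriptions[subscription_name]
        bodies = list(pending)
        pending.clear()
        return bodies


queue = InMemoryQueue()
for doc in ["doc-1", "doc-2", "doc-3"]:
    queue.send(doc)
print(queue.drain(["worker-a", "worker-b"]))
# {'worker-a': ['doc-1', 'doc-3'], 'worker-b': ['doc-2']}

topic = InMemoryTopic(["logging", "analytics", "webhook"])
topic.send("result-42")
print([topic.drain(name) for name in ["logging", "analytics", "webhook"]])
# [['result-42'], ['result-42'], ['result-42']]
```

Three documents split across two workers, but one result reaches all three subscribers.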
2. Message Properties
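- MessageId: application-defined unique identifier. With duplicate detection enabled on the queue or topic, a resend with the same MessageId inside the detection window is discarded (idempotent send).
- SessionId: groups related messages. Required for session-based FIFO ordering. All messages with the same SessionId are delivered, in order, to the receiver that holds that session's lock.
- ScheduledEnqueueTimeUtc: delays message delivery until a future time.
- TimeToLive (TTL): the message expires if it is not consumed before the TTL elapses. Expired messages move to the DLQ only when dead-lettering on message expiration is enabled; otherwise they are dropped.
- ApplicationProperties: custom key-value metadata, used by subscription filters.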
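Duplicate detection is easiest to picture as a time-windowed record of MessageIds already accepted. The sketch below is a toy model under that assumption: the DuplicateDetector class is hypothetical, and the window is passed in explicitly rather than read from queue settings. It shows why a producer retry that reuses the same message_id does not create a second copy.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List


class DuplicateDetector:
    """Toy model of duplicate detection: a send whose MessageId was already
    accepted within the detection window is discarded."""

    def __init__(self, window: timedelta) -> None:
        self.window = window
        self._first_seen: Dict[str, datetime] = {}  # MessageId -> time it was accepted
        self.accepted: List[str] = []

    def send(self, message_id: str, body: str, now: datetime) -> bool:
        seen_at = self._first_seen.get(message_id)
        if seen_at is not None and now - seen_at < self.window:
            # Duplicate inside the window: dropped silently (the real send call still succeeds).
            return False
        self._first_seen[message_id] = now
        self.accepted.append(body)
        return True


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
detector = DuplicateDetector(window=timedelta(minutes=10))
print(detector.send("doc-456", "first attempt", t0))                               # True
print(detector.send("doc-456", "retry after timeout", t0 + timedelta(seconds=30)))  # False
print(detector.send("doc-456", "much later", t0 + timedelta(minutes=11)))           # True
print(detector.accepted)  # ['first attempt', 'much later']
```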
3. Receive Modes: The Exam Trap
- Peek-lock (default, recommended): the message is locked for the lock duration (default 60 seconds). If processing succeeds, call complete(). If processing fails, call abandon() or let the lock expire; the message returns to the queue for retry and its delivery count increases. Guarantees at-least-once delivery.
- Receive-and-Delete: the message is deleted as soon as it is received. Fast, but if processing fails the message is lost forever. Use only when losing an occasional message is acceptable.
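The difference between the two modes only shows up when a consumer dies before settling. This hypothetical, stdlib-only model (ToyQueue, Message, and a simulated clock in seconds are illustrative, not SDK types) crashes a consumer under each mode and then tries again after the 60-second lock would have expired.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(eq=False)  # identity comparison, so remove() drops exactly this message
class Message:
    body: str
    delivery_count: int = 0
    locked_until: float = 0.0  # simulated clock, in seconds


@dataclass
class ToyQueue:
    lock_duration: float = 60.0
    messages: List[Message] = field(default_factory=list)

    def receive(self, now: float, mode: str) -> Optional[Message]:
        for msg in self.messages:
            if msg.locked_until <= now:  # skip messages another receiver has locked
                msg.delivery_count += 1
                if mode == "receive_and_delete":
                    self.messages.remove(msg)  # gone before the consumer even starts
                else:  # "peek_lock"
                    msg.locked_until = now + self.lock_duration
                return msg
        return None

    def complete(self, msg: Message) -> None:
        self.messages.remove(msg)  # success: delete for good

    def abandon(self, msg: Message) -> None:
        msg.locked_until = 0.0  # release the lock so the message can be redelivered now


def crashing_consumer(queue: ToyQueue, mode: str, now: float) -> None:
    msg = queue.receive(now, mode)
    raise RuntimeError(f"worker crashed while processing {msg.body}")  # never settles


for mode in ("peek_lock", "receive_and_delete"):
    q = ToyQueue(messages=[Message("doc-456")])
    try:
        crashing_consumer(q, mode, now=0.0)
    except RuntimeError:
        pass
    retry = q.receive(now=61.0, mode=mode)  # after the 60 s lock has expired
    print(mode, "->", retry.body if retry else "message lost")
# peek_lock -> doc-456
# receive_and_delete -> message lost
```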
Send and Receive Messages with the SDK
1. Send Messages
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential

# Passwordless auth: the identity needs the "Azure Service Bus Data Sender" role
credential = DefaultAzureCredential()
client = ServiceBusClient("mynamespace.servicebus.windows.net", credential)

with client.get_queue_sender("document-processing") as sender:
    msg = ServiceBusMessage(
        body='{"document_id": "doc-456", "url": "https://..."}',
        message_id="doc-456",  # For duplicate detection
        application_properties={"model": "gpt-4o", "priority": "high"},
    )
    sender.send_messages(msg)
2. Receive Messages (Peek-Lock)
# Default receive mode is peek-lock; iteration stops after 5 s with no new messages
with client.get_queue_receiver("document-processing", max_wait_time=5) as receiver:
    for msg in receiver:
        try:
            process_document(str(msg))  # Your business logic
            receiver.complete_message(msg)  # Removes from queue
        except Exception as e:
            receiver.abandon_message(msg)  # Releases the lock; message is redelivered
            # Or: receiver.dead_letter_message(msg, reason="ProcessingFailed", error_description=str(e))
3. Send Large Payloads: Claim-Check Pattern
Service Bus max message size is 256 KB on the Standard tier and 100 MB on Premium with large message support. Source documents, long model outputs, and batches of embeddings can easily exceed this.
- Upload large content to Azure Blob Storage
- Send only the blob URI in the Service Bus message body
- Receiver reads URI, fetches blob, processes large content
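The three steps can be sketched end to end without Azure by swapping Blob Storage for an in-memory dict. upload_to_blob and download_from_blob below are hypothetical stand-ins matching the helper names used in the snippet that follows, the example.blob.core.windows.net URI is made up, and the size check applies the 256 KB Standard limit to the body only; a real producer should leave headroom for headers and properties.

```python
import json
import uuid
from typing import Dict

MAX_INLINE_BYTES = 256 * 1024  # Standard tier limit (headers count too in reality)

# Hypothetical stand-in for Azure Blob Storage: maps a URI to stored bytes.
fake_blob_store: Dict[str, bytes] = {}


def upload_to_blob(data: bytes) -> str:
    uri = f"https://example.blob.core.windows.net/payloads/{uuid.uuid4()}"
    fake_blob_store[uri] = data
    return uri


def download_from_blob(uri: str) -> bytes:
    return fake_blob_store[uri]


def build_message_body(document_id: str, payload: bytes) -> str:
    """Producer side: inline small (UTF-8 text) payloads, claim-check large ones."""
    inline = json.dumps({"document_id": document_id, "content": payload.decode("utf-8")})
    if len(inline.encode("utf-8")) <= MAX_INLINE_BYTES:
        return inline
    # Too big: store the payload and send only the ticket (blob URI).
    return json.dumps({"document_id": document_id, "blob_uri": upload_to_blob(payload)})


def read_message_body(body: str) -> bytes:
    """Consumer side: follow the claim-check ticket if there is one."""
    data = json.loads(body)
    if "blob_uri" in data:
        return download_from_blob(data["blob_uri"])
    return data["content"].encode("utf-8")


small = build_message_body("doc-1", b"short text")
large = build_message_body("doc-2", b"x" * (300 * 1024))
print("blob_uri" in json.loads(small), "blob_uri" in json.loads(large))  # False True
print(len(read_message_body(large)))  # 307200
```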
import json

# Producer (send only the reference)
blob_uri = upload_to_blob(large_document_bytes)  # your Blob Storage helper
sender.send_messages(ServiceBusMessage(
    body=json.dumps({"blob_uri": blob_uri, "document_id": "doc-123"})
))

# Consumer (fetch the actual content)
msg_data = json.loads(str(msg))
content = download_from_blob(msg_data["blob_uri"])  # your Blob Storage helper
Dead-Letter Queue, Sessions, and Filters
1. Dead-Letter Queue (DLQ)
A sub-queue is created automatically for every queue and subscription. Messages land in the DLQ when:
- The delivery count exceeds MaxDeliveryCount (default 10 delivery attempts)
- A consumer explicitly calls dead_letter_message()
- The message TTL expires before it is consumed (when dead-lettering on message expiration is enabled)
- A subscription filter throws an error during evaluation
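DLQ path: queuename/$deadletterqueue (for a subscription: topicname/Subscriptions/subscriptionname/$deadletterqueue). In the Python SDK, pass sub_queue=ServiceBusSubQueue.DEAD_LETTER to get_queue_receiver(). Messages stay in the DLQ until you receive them, so re-process or investigate failed messages there.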
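The MaxDeliveryCount rule can be modeled as a peek-lock loop that abandons failures and dead-letters a message once it has used up its delivery attempts. This is a local sketch, not the SDK: QueueWithDlq and ReceivedMessage are hypothetical, though dead_letter_reason mirrors the property of the same name on received messages and MaxDeliveryCountExceeded is the reason the broker records. Storing the handler's exception text as the error description is a choice of this sketch only.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass(eq=False)
class ReceivedMessage:
    body: str
    delivery_count: int = 0
    dead_letter_reason: Optional[str] = None
    dead_letter_error_description: Optional[str] = None


@dataclass
class QueueWithDlq:
    max_delivery_count: int = 10  # Service Bus default MaxDeliveryCount
    active: List[ReceivedMessage] = field(default_factory=list)
    dead_letter: List[ReceivedMessage] = field(default_factory=list)  # plays queuename/$deadletterqueue

    def deliver_all(self, handler: Callable[[str], None]) -> None:
        """Peek-lock loop: complete on success, abandon on failure,
        dead-letter once the message has used all of its delivery attempts."""
        while self.active:
            msg = self.active.pop(0)
            msg.delivery_count += 1
            try:
                handler(msg.body)
            except Exception as exc:
                if msg.delivery_count >= self.max_delivery_count:
                    msg.dead_letter_reason = "MaxDeliveryCountExceeded"
                    msg.dead_letter_error_description = str(exc)  # last handler error (sketch only)
                    self.dead_letter.append(msg)
                else:
                    self.active.append(msg)  # abandoned: back on the queue for another try
            # On success the message is not re-added, i.e. it was completed.


attempts = {"doc-good": 0, "doc-poison": 0}


def handler(body: str) -> None:
    attempts[body] += 1
    if body == "doc-poison":
        raise ValueError("unparseable document")


q = QueueWithDlq(active=[ReceivedMessage("doc-good"), ReceivedMessage("doc-poison")])
q.deliver_all(handler)
print(attempts)  # {'doc-good': 1, 'doc-poison': 10}
print([(m.body, m.dead_letter_reason) for m in q.dead_letter])
# [('doc-poison', 'MaxDeliveryCountExceeded')]
```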
2. Sessions: FIFO Ordering Per Key
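Without sessions, Service Bus does not guarantee processing order: competing consumers and retries can reorder messages. Sessions guarantee FIFO ordering for messages with the same session_id, on a queue or subscription created with sessions enabled. This is essential for conversation chains in AI chatbots, where messages from the same user must be processed in order.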
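Session semantics can be modeled as one FIFO per session_id plus an exclusive lock per session. The sketch below (SessionQueue is a hypothetical in-memory class, not an SDK type) interleaves two users' messages and shows that each receiver gets one user's whole conversation, in send order.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Set


class SessionQueue:
    """Toy session-enabled queue: one FIFO per session_id, and a receiver
    locks a whole session at a time."""

    def __init__(self) -> None:
        self._sessions: Dict[str, Deque[str]] = {}  # dicts keep insertion order
        self._locked: Set[str] = set()

    def send(self, body: str, session_id: str) -> None:
        self._sessions.setdefault(session_id, deque()).append(body)

    def accept_next_session(self) -> Optional[str]:
        """Lock the first unlocked session that has pending messages."""
        for session_id, pending in self._sessions.items():
            if pending and session_id not in self._locked:
                self._locked.add(session_id)  # exclusive: no other receiver gets this session
                return session_id
        return None

    def receive_all(self, session_id: str) -> List[str]:
        pending = self._sessions[session_id]
        bodies = list(pending)  # FIFO within the session
        pending.clear()
        self._locked.discard(session_id)  # release the session lock when done
        return bodies


q = SessionQueue()
# Two users' chat turns arrive interleaved.
for body, user in [("u1-step-1", "user-001"), ("u2-step-1", "user-002"),
                   ("u1-step-2", "user-001"), ("u2-step-2", "user-002"),
                   ("u1-step-3", "user-001")]:
    q.send(body, session_id=user)

receiver_a = q.accept_next_session()  # 'user-001'
receiver_b = q.accept_next_session()  # 'user-002', because user-001 is locked by receiver A
print(receiver_a, q.receive_all(receiver_a))
# user-001 ['u1-step-1', 'u1-step-2', 'u1-step-3']
print(receiver_b, q.receive_all(receiver_b))
# user-002 ['u2-step-1', 'u2-step-2']
```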
# Requires a session-enabled queue
with client.get_queue_sender("my-queue") as sender:
    sender.send_messages(ServiceBusMessage(body="step-2", session_id="user-001"))

# Receiver acquires an exclusive lock on one session
with client.get_queue_receiver("my-queue", session_id="user-001", max_wait_time=5) as session_receiver:
    for msg in session_receiver:
        process_in_order(msg)  # your business logic
        session_receiver.complete_message(msg)
3. Topic Filters: Route Messages to Subscriptions
- Boolean filter: TrueRuleFilter() matches all messages; FalseRuleFilter() matches none.
- SQL filter: SQL-like expression over ApplicationProperties, e.g. priority = 'high' AND model = 'gpt-4o'
- Correlation filter: matches specific property values exactly. More efficient than SQL filters.
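Routing can be pictured as each subscription holding a predicate over the message's application properties, with every matching subscription receiving a copy. The sketch below models the three filter kinds as plain Python functions; it does not parse Service Bus SQL syntax (the SQL rule is hand-translated), and the subscription names are hypothetical.

```python
from typing import Callable, Dict, List

Properties = Dict[str, object]
Rule = Callable[[Properties], bool]


def true_filter() -> Rule:
    return lambda props: True  # like TrueRuleFilter: everything


def false_filter() -> Rule:
    return lambda props: False  # like FalseRuleFilter: nothing


def correlation_filter(**expected: object) -> Rule:
    # Exact match on every listed property, like a correlation filter on user properties.
    return lambda props: all(props.get(k) == v for k, v in expected.items())


def high_priority_gpt4o(props: Properties) -> bool:
    # Hand-written equivalent of: priority = 'high' AND model = 'gpt-4o'
    return props.get("priority") == "high" and props.get("model") == "gpt-4o"


def route(props: Properties, subscriptions: Dict[str, Rule]) -> List[str]:
    """Every subscription whose rule matches gets its own copy of the message."""
    return [name for name, rule in subscriptions.items() if rule(props)]


subscriptions = {
    "audit-log": true_filter(),
    "premium-queue": high_priority_gpt4o,
    "mini-batch": correlation_filter(model="gpt-4o-mini"),
    "paused": false_filter(),
}
print(route({"model": "gpt-4o", "priority": "high"}, subscriptions))
# ['audit-log', 'premium-queue']
print(route({"model": "gpt-4o-mini", "priority": "low"}, subscriptions))
# ['audit-log', 'mini-batch']
```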
Exercise: Async Document Processing Pipeline
- Create a Service Bus namespace and queue
- Send document processing messages with application_properties for model type
- Implement a consumer with peek-lock: complete on success, abandon on failure
- Observe messages landing in the DLQ after 10 failed delivery attempts (the default MaxDeliveryCount)
- Implement claim-check: upload a large JSON payload to Blob, send URI in message
- Create a topic with two subscriptions + SQL filter to route by model type
Knowledge Check
- Q: New AI result must notify 3 different services. Queue or Topic? A: Topic with 3 subscriptions (fan-out)
- Q: Consumer crashes mid-processing. Which receive mode re-queues the message? A: Peek-lock (lock expires → message returns to queue)
- Q: Message payload is 5 MB embedding vector. Service Bus max is 256 KB. Solution? A: Claim-check pattern (store blob in Azure Storage, send URI)
- Q: Chat messages per user must be processed in order. Feature needed? A: Sessions (session_id = userId)
- Q: Where do messages go after 10 failed delivery attempts? A: Dead-letter queue (DLQ)
Summary
Service Bus decouples AI pipeline stages. Use Queues for point-to-point, Topics for pub/sub fan-out. Always use Peek-lock for reliability: complete on success, abandon or dead-letter on failure. Use claim-check for large payloads. Use sessions for per-key FIFO. Monitor the DLQ for stuck messages. Authenticate with managed identity plus the Azure Service Bus Data Sender/Receiver roles.
🧠 Memory Tricks
Queue vs Topic: Queue = one consumer wins. Topic = everyone wins (pub/sub).
Peek-lock flow: Peek (lock) → Process → Complete (or Abandon/DLQ). Think: "Look before you delete."
Claim-check: "Check your coat at the door": store the big thing (blob), carry the ticket (URI).
Exam Summary Card
| Scenario | Answer |
|---|---|
| 1 service processes each message | Queue |
| Multiple services need each message | Topic + subscriptions |
| No data loss on consumer crash | Peek-lock + complete()/abandon() |
| Message > 256 KB (Standard) | Claim-check pattern (blob + URI) |
| Messages per user in order | Sessions (session_id=userId) |
| Messages that keep failing | Dead-letter queue (after max delivery count) |
| Route to subscription based on attribute | SQL filter on ApplicationProperties |
Azure Functions + Service Bus Trigger
Service Bus integrates natively with Azure Functions for serverless processing. The function runs for each new message; on the Consumption plan it scales to zero when the queue is empty and scales out automatically under load. This suits AI background processing without managing infrastructure.
import azure.functions as func

app = func.FunctionApp()

@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="document-processing",
    connection="ServiceBusConnection",  # name of an app setting, not the connection string itself
)
def process_document(msg: func.ServiceBusMessage) -> None:
    doc_data = msg.get_body().decode("utf-8")
    # process document, generate embedding, store in vector DB
Azure Service Bus
Key Facts
- Queue → Point-to-point → ONE consumer processes each message
- Topic + Subscriptions → Pub/Sub → ALL subscriptions get a copy (fan-out)
- Peek-lock → Lock → Process → complete() or abandon() → no message loss
- Receive-and-Delete → Deleted on receive → fast, but the message is lost on crash
- DLQ path → queuename/$deadletterqueue → after max delivery count (default 10)
- Claim-check pattern → Large payload (>256 KB) → Blob Storage, send URI in message
- Sessions → FIFO per session_id → use for per-user conversation ordering
- SQL filter → Route topic messages by ApplicationProperties key/value
💻 Commands & Patterns
import json
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential
client = ServiceBusClient(
    "mynamespace.servicebus.windows.net",
    DefaultAzureCredential())
# Send
with client.get_queue_sender("doc-queue") as s:
    s.send_messages(ServiceBusMessage(
        body='{"doc_id":"123"}', message_id="doc-123",
        application_properties={"priority": "high"}))
# Receive (peek-lock)
with client.get_queue_receiver("doc-queue", max_wait_time=5) as r:
    for msg in r:
        try:
            process(str(msg))  # your business logic
            r.complete_message(msg)
        except Exception:
            r.abandon_message(msg)  # retry
# Claim-check (upload_to_blob is your Blob Storage helper)
with client.get_queue_sender("doc-queue") as s:
    blob_uri = upload_to_blob(large_content)
    s.send_messages(ServiceBusMessage(json.dumps({"uri": blob_uri})))
Build and Deploy Serverless APIs with Azure Functions
Introduction to Azure Functions
Azure Functions is serverless compute: code runs in response to events (HTTP requests, queue messages, timers, blob uploads) without you managing servers. For AI apps: trigger on Service Bus, call Azure OpenAI, and write results to Cosmos DB with no infrastructure to manage.
Triggers and Output Bindings
1. HTTP Trigger
import azure.functions as func
import json

app = func.FunctionApp()

@app.route(route="embed", methods=["POST"])
def embed_text(req: func.HttpRequest) -> func.HttpResponse:
    try:
        body = req.get_json()
    except ValueError:  # body missing or not valid JSON
        return func.HttpResponse("Expected a JSON body", status_code=400)
    embedding = generate_embedding(body.get("text", ""))  # your embedding helper
    return func.HttpResponse(
        json.dumps({"embedding": embedding}),
        mimetype="application/json"
    )
2. Service Bus Trigger
@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="embed-queue",
    connection="ServiceBusConnection"
)
def process_embedding(msg: func.ServiceBusMessage) -> None:
    body = json.loads(msg.get_body().decode())
    embedding = generate_embedding(body["text"])
    store_in_cosmos(body["doc_id"], embedding)
    # Auto-completed on success; an unhandled exception abandons the message, and it is
    # dead-lettered once MaxDeliveryCount is reached. No complete() call needed.
3. Timer Trigger (NCRONTAB)
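The six NCRONTAB fields are easy to misread as five-field Unix cron. A small matcher makes the field order explicit. This hypothetical helper supports only *, single numbers, a-b ranges, */step, and comma lists (no names like MON), and requires every field to match, so it may differ from the Functions runtime on edge cases.

```python
from datetime import datetime

# Lowest legal value per field: second, minute, hour, day, month, day-of-week
FIELD_MINIMUMS = (0, 0, 0, 1, 1, 0)


def _field_matches(expr: str, value: int, minimum: int) -> bool:
    """Match one field: '*', 'n', 'a-b', '*/step', or a comma-separated mix."""
    for part in expr.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps count from the field's minimum, e.g. day */2 means 1, 3, 5, ...
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif "-" in part:
            low, high = (int(x) for x in part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def ncrontab_matches(schedule: str, when: datetime) -> bool:
    """True if `when` fires under {second} {minute} {hour} {day} {month} {day-of-week}."""
    fields = schedule.split()
    if len(fields) != 6:
        raise ValueError(f"NCRONTAB needs 6 fields, got {len(fields)}")
    day_of_week = (when.weekday() + 1) % 7  # cron convention: 0 = Sunday
    values = (when.second, when.minute, when.hour, when.day, when.month, day_of_week)
    return all(_field_matches(f, v, m) for f, v, m in zip(fields, values, FIELD_MINIMUMS))


nightly = "0 0 2 * * *"  # the daily_cleanup schedule: 02:00:00 every day
print(ncrontab_matches(nightly, datetime(2024, 5, 14, 2, 0, 0)))   # True
print(ncrontab_matches(nightly, datetime(2024, 5, 14, 2, 0, 30)))  # False: second must be 0
print(ncrontab_matches("0 */5 * * * *", datetime(2024, 5, 14, 9, 35, 0)))  # True
print(ncrontab_matches("0 0 9 * * 1-5", datetime(2024, 5, 18, 9, 0, 0)))   # False: a Saturday
```

A five-field Unix expression such as "0 2 * * *" is rejected here rather than silently shifted by one field.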
@app.timer_trigger(
    arg_name="timer",
    schedule="0 0 2 * * *"  # 02:00:00 every day (UTC by default)
)
def daily_cleanup(timer: func.TimerRequest) -> None:
    cleanup_old_embeddings()
# NCRONTAB: {second} {minute} {hour} {day} {month} {day-of-week}
4. Output Binding: No SDK Boilerplate
@app.route(route="process", methods=["POST"])
@app.cosmos_db_output(
    arg_name="doc",
    database_name="ai-db",
    container_name="results",
    connection="CosmosConnection"
)
def process_and_store(
    req: func.HttpRequest,
    doc: func.Out[func.Document]
) -> func.HttpResponse:
    result = process(req.get_json())  # must return a dict that includes an "id" field
    doc.set(func.Document.from_dict(result))  # the binding writes it to Cosmos DB
    return func.HttpResponse("Stored", status_code=202)
The output binding writes the item for you: declare it as a func.Out parameter and call doc.set(value). This is the exam-preferred pattern for "write to Cosmos/Queue from Functions".
Hosting Plans
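| Plan | Cold Start | VNet Integration | Max Execution Timeout |
|---|---|---|---|
| Consumption | Yes | No | 10 min (default 5 min) |
| Premium | No (pre-warmed instances) | Yes | Unbounded (default 30 min) |
| Dedicated (App Service) | No (with Always On) | Yes | Unbounded (default 30 min) |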
Durable Functions: Fan-Out / Fan-In
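The fan-out/fan-in shape itself is independent of Durable Functions. As a rough local analogy, asyncio.gather plays the role of context.task_all in the orchestrator below, with a hypothetical embed_doc coroutine that fakes the model call. Unlike Durable Functions, this sketch has no checkpointing or replay, so nothing survives if the process dies.

```python
import asyncio
from typing import Dict, List


async def embed_doc(doc: Dict[str, str]) -> List[float]:
    """Hypothetical activity: fakes an embedding call with a short sleep."""
    await asyncio.sleep(0.01)  # stands in for network latency
    text = doc["content"]
    return [float(len(text)), float(text.count(" "))]  # toy 2-d "embedding"


async def embedding_orchestrator(docs: List[Dict[str, str]]) -> List[List[float]]:
    # Fan-out: start one task per document so they run concurrently.
    tasks = [asyncio.create_task(embed_doc(d)) for d in docs]
    # Fan-in: wait for all of them; results come back in input order.
    return list(await asyncio.gather(*tasks))


docs = [{"content": "hello world"}, {"content": "service bus"},
        {"content": "durable functions fan out"}]
embeddings = asyncio.run(embedding_orchestrator(docs))
print(f"Done: {len(embeddings)} docs")  # Done: 3 docs
print(embeddings[0])  # [11.0, 1.0]
```

Because the three sleeps overlap, the whole run takes roughly one activity's latency rather than three.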
import azure.functions as func
import azure.durable_functions as df

app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION)

@app.orchestration_trigger(context_name="context")
def embedding_orchestrator(context: df.DurableOrchestrationContext):
    docs = context.get_input()
    # Fan-out: schedule one activity per doc; they run in parallel
    tasks = [context.call_activity("embed_doc", d) for d in docs]
    # Fan-in: wait for all
    embeddings = yield context.task_all(tasks)
    yield context.call_activity("store_all", embeddings)  # assumes a store_all activity exists
    return f"Done: {len(embeddings)} docs"

@app.activity_trigger(input_name="doc")
def embed_doc(doc: dict) -> list:
    return generate_embedding(doc["content"])
The orchestrator must be deterministic because it is replayed from its history: no datetime.now() (use context.current_utc_datetime), no random, no direct external calls. All side effects go through call_activity().
Exercise
- Create a Function App (Python 3.11, Consumption plan)
- Add an HTTP trigger that returns embeddings for POSTed text
- Add a Service Bus trigger consuming from embed-queue
- Use a Cosmos DB output binding (no SDK code needed)
- Deploy with func azure functionapp publish my-app
Summary
Azure Functions: HTTP triggers for REST APIs, Service Bus triggers for reliable queue processing, Timer triggers for scheduled jobs. Output bindings eliminate SDK boilerplate. Durable Functions: orchestrator + activities for fan-out/fan-in parallel AI workloads. Orchestrator must be deterministic. Premium plan = no cold starts + VNet for private endpoints.