strands.event_loop

This package provides the core event loop implementation for the agents SDK.

The event loop enables conversational AI agents to process messages, execute tools, and handle errors in a controlled, iterative manner.

strands.event_loop.event_loop

This module implements the central event loop.

The event loop allows agents to:

  1. Process conversation messages
  2. Execute tools based on model requests
  3. Handle errors and recovery strategies
  4. Manage recursive execution cycles

event_loop_cycle(model, system_prompt, messages, tool_config, tool_handler, thread_pool, event_loop_metrics, event_loop_parent_span, kwargs) async

Execute a single cycle of the event loop.

This core function processes a single conversation turn, handling model inference, tool execution, and error recovery. It manages the entire lifecycle of a conversation turn, including:

  1. Initializing cycle state and metrics
  2. Checking execution limits
  3. Processing messages with the model
  4. Handling tool execution requests
  5. Managing recursive calls for multi-turn tool interactions
  6. Collecting and reporting metrics
  7. Error handling and recovery

Parameters:

  • model (Model, required): Provider for running model inference.
  • system_prompt (Optional[str], required): System prompt instructions for the model.
  • messages (Messages, required): Conversation history messages.
  • tool_config (Optional[ToolConfig], required): Configuration for available tools.
  • tool_handler (Optional[ToolHandler], required): Handler for executing tools.
  • thread_pool (Optional[ThreadPoolExecutor], required): Optional thread pool for parallel tool execution.
  • event_loop_metrics (EventLoopMetrics, required): Metrics tracking object for the event loop.
  • event_loop_parent_span (Optional[Span], required): Span for the parent of this event loop.
  • kwargs (dict[str, Any], required): Additional arguments including:
      • request_state: State maintained across cycles
      • event_loop_cycle_id: Unique ID for this cycle
      • event_loop_cycle_span: Current tracing Span for this cycle

Yields:

  AsyncGenerator[dict[str, Any], None]: Model and tool invocation events. The last event is a tuple containing:

    • StopReason: Reason the model stopped generating (e.g., "tool_use")
    • Message: The generated message from the model
    • EventLoopMetrics: Updated metrics for the event loop
    • Any: Updated request state

Raises:

  • EventLoopException: If an error occurs during execution.
  • ContextWindowOverflowException: If the input is too large for the model.
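
A minimal consumption sketch: drive one cycle and unpack the final "stop" event. The surrounding objects (model provider, tool handler, metrics, parent span) are assumed to be constructed elsewhere, and run_single_cycle is a hypothetical helper name, not part of the SDK.

from strands.event_loop.event_loop import event_loop_cycle

async def run_single_cycle(model, system_prompt, messages, tool_config, tool_handler,
                           thread_pool, event_loop_metrics, event_loop_parent_span):
    async for event in event_loop_cycle(
        model,
        system_prompt,
        messages,
        tool_config,
        tool_handler,
        thread_pool,
        event_loop_metrics,
        event_loop_parent_span,
        kwargs={},
    ):
        if "callback" in event:
            continue  # streaming callbacks (text deltas, tool use updates) arrive here
        if "stop" in event:
            # Final event: (StopReason, Message, EventLoopMetrics, request_state)
            stop_reason, message, metrics, request_state = event["stop"]
            return stop_reason, message, metrics, request_state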

Source code in strands/event_loop/event_loop.py
async def event_loop_cycle(
    model: Model,
    system_prompt: Optional[str],
    messages: Messages,
    tool_config: Optional[ToolConfig],
    tool_handler: Optional[ToolHandler],
    thread_pool: Optional[ThreadPoolExecutor],
    event_loop_metrics: EventLoopMetrics,
    event_loop_parent_span: Optional[trace.Span],
    kwargs: dict[str, Any],
) -> AsyncGenerator[dict[str, Any], None]:
    """Execute a single cycle of the event loop.

    This core function processes a single conversation turn, handling model inference, tool execution, and error
    recovery. It manages the entire lifecycle of a conversation turn, including:

    1. Initializing cycle state and metrics
    2. Checking execution limits
    3. Processing messages with the model
    4. Handling tool execution requests
    5. Managing recursive calls for multi-turn tool interactions
    6. Collecting and reporting metrics
    7. Error handling and recovery

    Args:
        model: Provider for running model inference.
        system_prompt: System prompt instructions for the model.
        messages: Conversation history messages.
        tool_config: Configuration for available tools.
        tool_handler: Handler for executing tools.
        thread_pool: Optional thread pool for parallel tool execution.
        event_loop_metrics: Metrics tracking object for the event loop.
        event_loop_parent_span: Span for the parent of this event loop.
        kwargs: Additional arguments including:

            - request_state: State maintained across cycles
            - event_loop_cycle_id: Unique ID for this cycle
            - event_loop_cycle_span: Current tracing Span for this cycle

    Yields:
        Model and tool invocation events. The last event is a tuple containing:

            - StopReason: Reason the model stopped generating (e.g., "tool_use")
            - Message: The generated message from the model
            - EventLoopMetrics: Updated metrics for the event loop
            - Any: Updated request state

    Raises:
        EventLoopException: If an error occurs during execution
        ContextWindowOverflowException: If the input is too large for the model
    """
    # Initialize cycle state
    kwargs["event_loop_cycle_id"] = uuid.uuid4()

    # Initialize state and get cycle trace
    if "request_state" not in kwargs:
        kwargs["request_state"] = {}
    attributes = {"event_loop_cycle_id": str(kwargs.get("event_loop_cycle_id"))}
    cycle_start_time, cycle_trace = event_loop_metrics.start_cycle(attributes=attributes)
    kwargs["event_loop_cycle_trace"] = cycle_trace

    yield {"callback": {"start": True}}
    yield {"callback": {"start_event_loop": True}}

    # Create tracer span for this event loop cycle
    tracer = get_tracer()
    cycle_span = tracer.start_event_loop_cycle_span(
        event_loop_kwargs=kwargs, messages=messages, parent_span=event_loop_parent_span
    )
    kwargs["event_loop_cycle_span"] = cycle_span

    # Create a trace for the stream_messages call
    stream_trace = Trace("stream_messages", parent_id=cycle_trace.id)
    cycle_trace.add_child(stream_trace)

    # Clean up orphaned empty tool uses
    clean_orphaned_empty_tool_uses(messages)

    # Process messages with exponential backoff for throttling
    message: Message
    stop_reason: StopReason
    usage: Any
    metrics: Metrics

    # Retry loop for handling throttling exceptions
    current_delay = INITIAL_DELAY
    for attempt in range(MAX_ATTEMPTS):
        model_id = model.config.get("model_id") if hasattr(model, "config") else None
        model_invoke_span = tracer.start_model_invoke_span(
            messages=messages,
            parent_span=cycle_span,
            model_id=model_id,
        )

        try:
            # TODO: To maintain backwards compatibility, we need to combine the stream event with kwargs before yielding
            #       to the callback handler. This will be revisited when migrating to strongly typed events.
            async for event in stream_messages(model, system_prompt, messages, tool_config):
                if "callback" in event:
                    yield {"callback": {**event["callback"], **(kwargs if "delta" in event["callback"] else {})}}

            stop_reason, message, usage, metrics = event["stop"]
            kwargs.setdefault("request_state", {})

            if model_invoke_span:
                tracer.end_model_invoke_span(model_invoke_span, message, usage, stop_reason)
            break  # Success! Break out of retry loop

        except ContextWindowOverflowException as e:
            if model_invoke_span:
                tracer.end_span_with_error(model_invoke_span, str(e), e)
            raise e

        except ModelThrottledException as e:
            if model_invoke_span:
                tracer.end_span_with_error(model_invoke_span, str(e), e)

            if attempt + 1 == MAX_ATTEMPTS:
                yield {"callback": {"force_stop": True, "force_stop_reason": str(e)}}
                raise e

            logger.debug(
                "retry_delay_seconds=<%s>, max_attempts=<%s>, current_attempt=<%s> "
                "| throttling exception encountered "
                "| delaying before next retry",
                current_delay,
                MAX_ATTEMPTS,
                attempt + 1,
            )
            time.sleep(current_delay)
            current_delay = min(current_delay * 2, MAX_DELAY)

            yield {"callback": {"event_loop_throttled_delay": current_delay, **kwargs}}

        except Exception as e:
            if model_invoke_span:
                tracer.end_span_with_error(model_invoke_span, str(e), e)
            raise e

    try:
        # Add message in trace and mark the end of the stream messages trace
        stream_trace.add_message(message)
        stream_trace.end()

        # Add the response message to the conversation
        messages.append(message)
        yield {"callback": {"message": message}}

        # Update metrics
        event_loop_metrics.update_usage(usage)
        event_loop_metrics.update_metrics(metrics)

        # If the model is requesting to use tools
        if stop_reason == "tool_use":
            if not tool_handler:
                raise EventLoopException(
                    Exception("Model requested tool use but no tool handler provided"),
                    kwargs["request_state"],
                )

            if tool_config is None:
                raise EventLoopException(
                    Exception("Model requested tool use but no tool config provided"),
                    kwargs["request_state"],
                )

            # Handle tool execution
            events = _handle_tool_execution(
                stop_reason,
                message,
                model,
                system_prompt,
                messages,
                tool_config,
                tool_handler,
                thread_pool,
                event_loop_metrics,
                event_loop_parent_span,
                cycle_trace,
                cycle_span,
                cycle_start_time,
                kwargs,
            )
            async for event in events:
                yield event

            return

        # End the cycle and return results
        event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)
        if cycle_span:
            tracer.end_event_loop_cycle_span(
                span=cycle_span,
                message=message,
            )
    except EventLoopException as e:
        if cycle_span:
            tracer.end_span_with_error(cycle_span, str(e), e)

        # Don't yield or log the exception - we already did it when we
        # raised the exception and we don't need that duplication.
        raise
    except ContextWindowOverflowException as e:
        if cycle_span:
            tracer.end_span_with_error(cycle_span, str(e), e)
        raise e
    except Exception as e:
        if cycle_span:
            tracer.end_span_with_error(cycle_span, str(e), e)

        # Handle any other exceptions
        yield {"callback": {"force_stop": True, "force_stop_reason": str(e)}}
        logger.exception("cycle failed")
        raise EventLoopException(e, kwargs["request_state"]) from e

    yield {"stop": (stop_reason, message, event_loop_metrics, kwargs["request_state"])}

recurse_event_loop(model, system_prompt, messages, tool_config, tool_handler, thread_pool, event_loop_metrics, event_loop_parent_span, kwargs) async

Make a recursive call to event_loop_cycle with the current state.

This function is used when the event loop needs to continue processing after tool execution.

Parameters:

  • model (Model, required): Provider for running model inference.
  • system_prompt (Optional[str], required): System prompt instructions for the model.
  • messages (Messages, required): Conversation history messages.
  • tool_config (Optional[ToolConfig], required): Configuration for available tools.
  • tool_handler (Optional[ToolHandler], required): Handler for tool execution.
  • thread_pool (Optional[ThreadPoolExecutor], required): Optional thread pool for parallel tool execution.
  • event_loop_metrics (EventLoopMetrics, required): Metrics tracking object for the event loop.
  • event_loop_parent_span (Optional[Span], required): Span for the parent of this event loop.
  • kwargs (dict[str, Any], required): Arguments to pass through to event_loop_cycle.

Yields:

  AsyncGenerator[dict[str, Any], None]: Results from event_loop_cycle, where the last result contains:

    • StopReason: Reason the model stopped generating
    • Message: The generated message from the model
    • EventLoopMetrics: Updated metrics for the event loop
    • Any: Updated request state

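A hedged sketch of how a caller might continue the loop after tool execution; all arguments are assumed to carry over from the enclosing event_loop_cycle invocation, and kwargs must still hold the current event_loop_cycle_trace.

from strands.event_loop.event_loop import recurse_event_loop

async def continue_after_tools(model, system_prompt, messages, tool_config, tool_handler,
                               thread_pool, event_loop_metrics, event_loop_parent_span, kwargs):
    # messages is expected to already end with the toolResult message.
    async for event in recurse_event_loop(
        model=model,
        system_prompt=system_prompt,
        messages=messages,
        tool_config=tool_config,
        tool_handler=tool_handler,
        thread_pool=thread_pool,
        event_loop_metrics=event_loop_metrics,
        event_loop_parent_span=event_loop_parent_span,
        kwargs=kwargs,
    ):
        yield event
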
Source code in strands/event_loop/event_loop.py
async def recurse_event_loop(
    model: Model,
    system_prompt: Optional[str],
    messages: Messages,
    tool_config: Optional[ToolConfig],
    tool_handler: Optional[ToolHandler],
    thread_pool: Optional[ThreadPoolExecutor],
    event_loop_metrics: EventLoopMetrics,
    event_loop_parent_span: Optional[trace.Span],
    kwargs: dict[str, Any],
) -> AsyncGenerator[dict[str, Any], None]:
    """Make a recursive call to event_loop_cycle with the current state.

    This function is used when the event loop needs to continue processing after tool execution.

    Args:
        model: Provider for running model inference
        system_prompt: System prompt instructions for the model
        messages: Conversation history messages
        tool_config: Configuration for available tools
        tool_handler: Handler for tool execution
        thread_pool: Optional thread pool for parallel tool execution.
        event_loop_metrics: Metrics tracking object for the event loop.
        event_loop_parent_span: Span for the parent of this event loop.
        kwargs: Arguments to pass through event_loop_cycle


    Yields:
        Results from event_loop_cycle where the last result contains:

            - StopReason: Reason the model stopped generating
            - Message: The generated message from the model
            - EventLoopMetrics: Updated metrics for the event loop
            - Any: Updated request state
    """
    cycle_trace = kwargs["event_loop_cycle_trace"]

    # Recursive call trace
    recursive_trace = Trace("Recursive call", parent_id=cycle_trace.id)
    cycle_trace.add_child(recursive_trace)

    yield {"callback": {"start": True}}

    events = event_loop_cycle(
        model=model,
        system_prompt=system_prompt,
        messages=messages,
        tool_config=tool_config,
        tool_handler=tool_handler,
        thread_pool=thread_pool,
        event_loop_metrics=event_loop_metrics,
        event_loop_parent_span=event_loop_parent_span,
        kwargs=kwargs,
    )
    async for event in events:
        yield event

    recursive_trace.end()

strands.event_loop.streaming

Utilities for handling streaming responses from language models.

extract_usage_metrics(event)

Extracts usage metrics from the metadata chunk.

Parameters:

  • event (MetadataEvent, required): The metadata event emitted by the model stream.

Returns:

  • tuple[Usage, Metrics]: The extracted usage metrics and latency.
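
For illustration, a hand-built metadata event (token counts and latency are made up) and the extraction call:

from strands.event_loop.streaming import extract_usage_metrics

# Hypothetical metadata chunk with made-up values.
event = {
    "usage": {"inputTokens": 12, "outputTokens": 34, "totalTokens": 46},
    "metrics": {"latencyMs": 250},
}

usage, metrics = extract_usage_metrics(event)
# usage == {"inputTokens": 12, "outputTokens": 34, "totalTokens": 46}
# metrics == {"latencyMs": 250}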

Source code in strands/event_loop/streaming.py
def extract_usage_metrics(event: MetadataEvent) -> tuple[Usage, Metrics]:
    """Extracts usage metrics from the metadata chunk.

    Args:
        event: The metadata event.

    Returns:
        The extracted usage metrics and latency.
    """
    usage = Usage(**event["usage"])
    metrics = Metrics(**event["metrics"])

    return usage, metrics

handle_content_block_delta(event, state)

Handles content block delta updates by appending text, tool input, or reasoning content to the state.

Parameters:

  • event (ContentBlockDeltaEvent, required): Delta event.
  • state (dict[str, Any], required): The current state of message processing.

Returns:

  • tuple[dict[str, Any], dict[str, Any]]: The updated state with appended text, tool input, or reasoning content, and the callback event to emit.
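
A small sketch of feeding a text delta through this handler; the event and state shapes follow the source below, and the values are illustrative:

from strands.event_loop.streaming import handle_content_block_delta

state = {"text": "", "current_tool_use": {}, "reasoningText": "", "signature": ""}
event = {"delta": {"text": "Hello"}}

state, callback_event = handle_content_block_delta(event, state)
# state["text"] == "Hello"
# callback_event == {"callback": {"data": "Hello", "delta": {"text": "Hello"}}}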

Source code in strands/event_loop/streaming.py
def handle_content_block_delta(
    event: ContentBlockDeltaEvent, state: dict[str, Any]
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Handles content block delta updates by appending text, tool input, or reasoning content to the state.

    Args:
        event: Delta event.
        state: The current state of message processing.

    Returns:
        Updated state with appended text or tool input.
    """
    delta_content = event["delta"]

    callback_event = {}

    if "toolUse" in delta_content:
        if "input" not in state["current_tool_use"]:
            state["current_tool_use"]["input"] = ""

        state["current_tool_use"]["input"] += delta_content["toolUse"]["input"]
        callback_event["callback"] = {"delta": delta_content, "current_tool_use": state["current_tool_use"]}

    elif "text" in delta_content:
        state["text"] += delta_content["text"]
        callback_event["callback"] = {"data": delta_content["text"], "delta": delta_content}

    elif "reasoningContent" in delta_content:
        if "text" in delta_content["reasoningContent"]:
            if "reasoningText" not in state:
                state["reasoningText"] = ""

            state["reasoningText"] += delta_content["reasoningContent"]["text"]
            callback_event["callback"] = {
                "reasoningText": delta_content["reasoningContent"]["text"],
                "delta": delta_content,
                "reasoning": True,
            }

        elif "signature" in delta_content["reasoningContent"]:
            if "signature" not in state:
                state["signature"] = ""

            state["signature"] += delta_content["reasoningContent"]["signature"]
            callback_event["callback"] = {
                "reasoning_signature": delta_content["reasoningContent"]["signature"],
                "delta": delta_content,
                "reasoning": True,
            }

    return state, callback_event

handle_content_block_start(event)

Handles the start of a content block by extracting tool usage information if any.

Parameters:

  • event (ContentBlockStartEvent, required): Start event.

Returns:

  • dict[str, Any]: Dictionary with the tool use id and name if the block is a tool use request, empty dictionary otherwise.
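
For example, a start event announcing a tool use (identifiers here are illustrative):

from strands.event_loop.streaming import handle_content_block_start

event = {"start": {"toolUse": {"toolUseId": "tooluse_abc123", "name": "calculator"}}}

current_tool_use = handle_content_block_start(event)
# {"toolUseId": "tooluse_abc123", "name": "calculator", "input": ""}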

Source code in strands/event_loop/streaming.py
def handle_content_block_start(event: ContentBlockStartEvent) -> dict[str, Any]:
    """Handles the start of a content block by extracting tool usage information if any.

    Args:
        event: Start event.

    Returns:
        Dictionary with tool use id and name if tool use request, empty dictionary otherwise.
    """
    start: ContentBlockStart = event["start"]
    current_tool_use = {}

    if "toolUse" in start and start["toolUse"]:
        tool_use_data = start["toolUse"]
        current_tool_use["toolUseId"] = tool_use_data["toolUseId"]
        current_tool_use["name"] = tool_use_data["name"]
        current_tool_use["input"] = ""

    return current_tool_use

handle_content_block_stop(state)

Handles the end of a content block by finalizing tool usage, text content, or reasoning content.

Parameters:

  • state (dict[str, Any], required): The current state of message processing.

Returns:

  • dict[str, Any]: Updated state with the finalized content block.
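
A sketch of finalizing an accumulated tool use whose streamed input is a complete JSON string (all values are illustrative):

from strands.event_loop.streaming import handle_content_block_stop

state = {
    "content": [],
    "current_tool_use": {
        "toolUseId": "tooluse_abc123",
        "name": "calculator",
        "input": '{"expression": "2 + 2"}',
    },
    "text": "",
    "reasoningText": "",
}

state = handle_content_block_stop(state)
# The JSON input string is parsed and the block is appended to state["content"]:
# [{"toolUse": {"toolUseId": "tooluse_abc123", "name": "calculator",
#               "input": {"expression": "2 + 2"}}}]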

Source code in strands/event_loop/streaming.py
def handle_content_block_stop(state: dict[str, Any]) -> dict[str, Any]:
    """Handles the end of a content block by finalizing tool usage, text content, or reasoning content.

    Args:
        state: The current state of message processing.

    Returns:
        Updated state with finalized content block.
    """
    content: list[ContentBlock] = state["content"]

    current_tool_use = state["current_tool_use"]
    text = state["text"]
    reasoning_text = state["reasoningText"]

    if current_tool_use:
        if "input" not in current_tool_use:
            current_tool_use["input"] = ""

        try:
            current_tool_use["input"] = json.loads(current_tool_use["input"])
        except ValueError:
            current_tool_use["input"] = {}

        tool_use_id = current_tool_use["toolUseId"]
        tool_use_name = current_tool_use["name"]

        tool_use = ToolUse(
            toolUseId=tool_use_id,
            name=tool_use_name,
            input=current_tool_use["input"],
        )
        content.append({"toolUse": tool_use})
        state["current_tool_use"] = {}

    elif text:
        content.append({"text": text})
        state["text"] = ""

    elif reasoning_text:
        content.append(
            {
                "reasoningContent": {
                    "reasoningText": {
                        "text": state["reasoningText"],
                        "signature": state["signature"],
                    }
                }
            }
        )
        state["reasoningText"] = ""

    return state

handle_message_start(event, message)

Handles the start of a message by setting the role in the message dictionary.

Parameters:

  • event (MessageStartEvent, required): A message start event.
  • message (Message, required): The message dictionary being constructed.

Returns:

  • Message: Updated message dictionary with the role set.

Source code in strands/event_loop/streaming.py
def handle_message_start(event: MessageStartEvent, message: Message) -> Message:
    """Handles the start of a message by setting the role in the message dictionary.

    Args:
        event: A message start event.
        message: The message dictionary being constructed.

    Returns:
        Updated message dictionary with the role set.
    """
    message["role"] = event["role"]
    return message

handle_message_stop(event)

Handles the end of a message by returning the stop reason.

Parameters:

  • event (MessageStopEvent, required): Stop event.

Returns:

  • StopReason: The reason for stopping the stream.

Source code in strands/event_loop/streaming.py
def handle_message_stop(event: MessageStopEvent) -> StopReason:
    """Handles the end of a message by returning the stop reason.

    Args:
        event: Stop event.

    Returns:
        The reason for stopping the stream.
    """
    return event["stopReason"]

handle_redact_content(event, messages, state)

Handles redacting content from the input or output.

Parameters:

  • event (RedactContentEvent, required): Redact content event.
  • messages (Messages, required): Agent messages.
  • state (dict[str, Any], required): The current state of message processing.

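A sketch with both redaction fields set (the redaction strings are illustrative); the function mutates messages and state in place:

from strands.event_loop.streaming import handle_redact_content

messages = [{"role": "user", "content": [{"text": "original user input"}]}]
state = {"message": {"role": "assistant", "content": [{"text": "original response"}]}}
event = {
    "redactUserContentMessage": "[user input redacted]",
    "redactAssistantContentMessage": "[assistant output redacted]",
}

handle_redact_content(event, messages, state)
# messages[-1]["content"] == [{"text": "[user input redacted]"}]
# state["message"]["content"] == [{"text": "[assistant output redacted]"}]
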
Source code in strands/event_loop/streaming.py
def handle_redact_content(event: RedactContentEvent, messages: Messages, state: dict[str, Any]) -> None:
    """Handles redacting content from the input or output.

    Args:
        event: Redact Content Event.
        messages: Agent messages.
        state: The current state of message processing.
    """
    if event.get("redactUserContentMessage") is not None:
        messages[-1]["content"] = [{"text": event["redactUserContentMessage"]}]  # type: ignore

    if event.get("redactAssistantContentMessage") is not None:
        state["message"]["content"] = [{"text": event["redactAssistantContentMessage"]}]

process_stream(chunks, messages) async

Processes the response stream from the API, constructing the final message and extracting usage metrics.

Parameters:

  • chunks (AsyncIterable[StreamEvent], required): The chunks of the response stream from the model.
  • messages (Messages, required): The agent's messages.

Returns:

  • AsyncGenerator[dict[str, Any], None]: The reason for stopping, the constructed message, and the usage metrics, delivered in the final "stop" event.
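
A runnable sketch that drives process_stream with a hand-built chunk sequence emulating a short text-only response (the chunk contents are illustrative):

import asyncio

from strands.event_loop.streaming import process_stream

async def fake_chunks():
    # Minimal, made-up event sequence for a plain text answer.
    for chunk in [
        {"messageStart": {"role": "assistant"}},
        {"contentBlockDelta": {"delta": {"text": "Hi there!"}}},
        {"contentBlockStop": {}},
        {"messageStop": {"stopReason": "end_turn"}},
    ]:
        yield chunk

async def main():
    async for event in process_stream(fake_chunks(), messages=[]):
        if "stop" in event:
            stop_reason, message, usage, metrics = event["stop"]
            print(stop_reason, message)

asyncio.run(main())
# end_turn {'role': 'assistant', 'content': [{'text': 'Hi there!'}]}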

Source code in strands/event_loop/streaming.py
async def process_stream(
    chunks: AsyncIterable[StreamEvent],
    messages: Messages,
) -> AsyncGenerator[dict[str, Any], None]:
    """Processes the response stream from the API, constructing the final message and extracting usage metrics.

    Args:
        chunks: The chunks of the response stream from the model.
        messages: The agent's messages.

    Returns:
        The reason for stopping, the constructed message, and the usage metrics.
    """
    stop_reason: StopReason = "end_turn"

    state: dict[str, Any] = {
        "message": {"role": "assistant", "content": []},
        "text": "",
        "current_tool_use": {},
        "reasoningText": "",
        "signature": "",
    }
    state["content"] = state["message"]["content"]

    usage: Usage = Usage(inputTokens=0, outputTokens=0, totalTokens=0)
    metrics: Metrics = Metrics(latencyMs=0)

    async for chunk in chunks:
        yield {"callback": {"event": chunk}}

        if "messageStart" in chunk:
            state["message"] = handle_message_start(chunk["messageStart"], state["message"])
        elif "contentBlockStart" in chunk:
            state["current_tool_use"] = handle_content_block_start(chunk["contentBlockStart"])
        elif "contentBlockDelta" in chunk:
            state, callback_event = handle_content_block_delta(chunk["contentBlockDelta"], state)
            yield callback_event
        elif "contentBlockStop" in chunk:
            state = handle_content_block_stop(state)
        elif "messageStop" in chunk:
            stop_reason = handle_message_stop(chunk["messageStop"])
        elif "metadata" in chunk:
            usage, metrics = extract_usage_metrics(chunk["metadata"])
        elif "redactContent" in chunk:
            handle_redact_content(chunk["redactContent"], messages, state)

    yield {"stop": (stop_reason, state["message"], usage, metrics)}

remove_blank_messages_content_text(messages)

Remove or replace blank text in message content.

Parameters:

  • messages (Messages, required): Conversation messages to update.

Returns:

  • Messages: Updated messages.
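
For example (message contents are illustrative):

from strands.event_loop.streaming import remove_blank_messages_content_text

messages = [
    {"role": "assistant", "content": [{"text": "   "}]},
    {
        "role": "assistant",
        "content": [
            {"toolUse": {"toolUseId": "tooluse_abc123", "name": "calculator", "input": {}}},
            {"text": ""},
        ],
    },
]

messages = remove_blank_messages_content_text(messages)
# First message: the blank text is replaced with "[blank text]".
# Second message: the blank text item is removed because the message has a toolUse.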

Source code in strands/event_loop/streaming.py
def remove_blank_messages_content_text(messages: Messages) -> Messages:
    """Remove or replace blank text in message content.

    Args:
        messages: Conversation messages to update.

    Returns:
        Updated messages.
    """
    removed_blank_message_content_text = False
    replaced_blank_message_content_text = False

    for message in messages:
        # only modify assistant messages
        if "role" in message and message["role"] != "assistant":
            continue

        if "content" in message:
            content = message["content"]
            has_tool_use = any("toolUse" in item for item in content)

            if has_tool_use:
                # Remove blank 'text' items for assistant messages
                before_len = len(content)
                content[:] = [item for item in content if "text" not in item or item["text"].strip()]
                if not removed_blank_message_content_text and before_len != len(content):
                    removed_blank_message_content_text = True
            else:
                # Replace blank 'text' with '[blank text]' for assistant messages
                for item in content:
                    if "text" in item and not item["text"].strip():
                        replaced_blank_message_content_text = True
                        item["text"] = "[blank text]"

    if removed_blank_message_content_text:
        logger.debug("removed blank message context text")
    if replaced_blank_message_content_text:
        logger.debug("replaced blank message context text")

    return messages

stream_messages(model, system_prompt, messages, tool_config) async

Streams messages to the model and processes the response.

Parameters:

  • model (Model, required): Model provider.
  • system_prompt (Optional[str], required): The system prompt to send.
  • messages (Messages, required): List of messages to send.
  • tool_config (Optional[ToolConfig], required): Configuration for the tools to use.

Returns:

  • AsyncGenerator[dict[str, Any], None]: The reason for stopping, the final message, and the usage metrics, delivered in the final "stop" event.
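
A consumption sketch; it assumes an already-configured model provider instance, passes no tools, and uses a hypothetical helper name:

from strands.event_loop.streaming import stream_messages

async def ask(model, messages):
    async for event in stream_messages(
        model,
        system_prompt="You are a helpful assistant.",
        messages=messages,
        tool_config=None,
    ):
        if "stop" in event:
            stop_reason, message, usage, metrics = event["stop"]
            return stop_reason, message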

Source code in strands/event_loop/streaming.py
async def stream_messages(
    model: Model,
    system_prompt: Optional[str],
    messages: Messages,
    tool_config: Optional[ToolConfig],
) -> AsyncGenerator[dict[str, Any], None]:
    """Streams messages to the model and processes the response.

    Args:
        model: Model provider.
        system_prompt: The system prompt to send.
        messages: List of messages to send.
        tool_config: Configuration for the tools to use.

    Returns:
        The reason for stopping, the final message, and the usage metrics.
    """
    logger.debug("model=<%s> | streaming messages", model)

    messages = remove_blank_messages_content_text(messages)
    tool_specs = [tool["toolSpec"] for tool in tool_config.get("tools", [])] or None if tool_config else None

    chunks = model.converse(messages, tool_specs, system_prompt)
    async for event in process_stream(chunks, messages):
        yield event