
Streaming Tools


Supported in ADK: Python v0.5.0 (Experimental)

Streaming tools allow tools (functions) to stream intermediate results back to agents, and agents can respond to those intermediate results as they arrive. For example, you can use a streaming tool to monitor changes in a stock price and have an agent react to them. As another example, an agent can monitor a video stream and report when something in the stream changes.
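To make the idea concrete, here is a minimal, framework-free sketch (it does not use ADK). A mock price feed yields intermediate results, and a stand-in consumer playing the role of the agent reacts to each one as soon as it arrives, before the generator has finished. The names mock_price_feed and react_to_prices and the alert threshold are hypothetical, chosen only for illustration.

```python
import asyncio
from typing import AsyncGenerator, List


async def mock_price_feed(symbol: str, prices: List[int]) -> AsyncGenerator[str, None]:
    """Hypothetical streaming tool: yields one price update at a time."""
    for price in prices:
        await asyncio.sleep(0.01)  # simulate waiting for the next market tick
        yield f"the price for {symbol} is {price}"


async def react_to_prices(symbol: str, prices: List[int], threshold: int) -> List[str]:
    """Stand-in for the agent: reacts to each intermediate result immediately."""
    reactions = []
    async for update in mock_price_feed(symbol, prices):
        price = int(update.rsplit(" ", 1)[-1])  # last word of the message is the price
        if price > threshold:
            reactions.append(f"ALERT: {update}")
        else:
            reactions.append(f"ok: {update}")
    return reactions


if __name__ == "__main__":
    for line in asyncio.run(react_to_prices("XYZ", [300, 400, 900, 500], threshold=800)):
        print(line)
```

Each reaction is produced while the feed is still running, which is the behavior streaming tools enable for agents.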


Info

This is only supported in streaming (live) agents/APIs.

To define a streaming tool, you must adhere to the following:


  1. Asynchronous Function: The tool must be an async Python function.
  2. AsyncGenerator Return Type: The function must be annotated to return an AsyncGenerator. The first type parameter of AsyncGenerator is the type of data you yield (e.g., str for text messages, or a custom object for structured data). The second type parameter is typically None if the generator doesn't receive values via asend().
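Both requirements can be checked with the standard library. The sketch below defines a hypothetical tool, watch_counter, and a hypothetical helper, looks_like_streaming_tool, that uses inspect.isasyncgenfunction and typing.get_type_hints to confirm a function is an async generator annotated to return AsyncGenerator. It only illustrates the two rules above; it is not how ADK itself validates tools.

```python
import asyncio
import collections.abc
import inspect
import typing
from typing import AsyncGenerator


async def watch_counter(limit: int) -> AsyncGenerator[str, None]:
    """Hypothetical streaming tool: yields a text update per step."""
    for i in range(1, limit + 1):
        await asyncio.sleep(0)  # placeholder for real asynchronous work
        yield f"count is {i}"


def looks_like_streaming_tool(func) -> bool:
    """True if func is an async generator function annotated to return AsyncGenerator."""
    if not inspect.isasyncgenfunction(func):
        return False
    return_hint = typing.get_type_hints(func).get("return")
    # typing.AsyncGenerator[...] has collections.abc.AsyncGenerator as its origin
    return typing.get_origin(return_hint) is collections.abc.AsyncGenerator


async def collect(limit: int) -> list:
    """Drain the generator so its yielded values can be inspected."""
    return [update async for update in watch_counter(limit)]


print(looks_like_streaming_tool(watch_counter))  # True
print(asyncio.run(collect(3)))  # ['count is 1', 'count is 2', 'count is 3']
```

The second type argument (None here) is what a caller would pass through asend(); plain streaming tools never receive values, so None is the usual choice.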

We support two types of streaming tools:

  • Simple type: streaming tools that take only non-video/audio inputs; they do not receive the video/audio streams that you feed to adk web or the ADK runner.
  • Video streaming tools: these work only with video streaming. The video stream (the stream that you feed to adk web or the ADK runner) is passed into the function through the reserved input_stream: LiveRequestQueue parameter.
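A video streaming tool usually receives frames faster than a model can analyze them, so the monitor_video_stream example on this page drains whatever is queued and keeps only the newest JPEG frame. The sketch below isolates that pattern using a plain asyncio.Queue and a hypothetical Frame dataclass as stand-ins for ADK's LiveRequestQueue and its request objects; it does not call ADK.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Frame:
    """Hypothetical stand-in for a live request carrying a media blob."""
    mime_type: str
    data: bytes


async def latest_jpeg(queue: "asyncio.Queue[Frame]") -> Optional[Frame]:
    """Drain everything currently queued and return only the newest JPEG frame."""
    latest = None
    while not queue.empty():
        frame = await queue.get()
        if frame.mime_type == "image/jpeg":
            latest = frame  # older frames are discarded
    return latest


async def demo() -> Optional[Frame]:
    queue: asyncio.Queue = asyncio.Queue()
    for item in [
        Frame("image/jpeg", b"frame-1"),
        Frame("audio/pcm", b"audio"),  # non-image data is skipped
        Frame("image/jpeg", b"frame-2"),
    ]:
        queue.put_nowait(item)
    return await latest_jpeg(queue)


print(asyncio.run(demo()))
```

Processing only the newest frame keeps the tool's latency bounded: a slow model call never leaves a growing backlog of stale frames behind it.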

Now let's define an agent that can monitor stock price changes and monitor video stream changes.


import asyncio
from typing import AsyncGenerator

from google.adk.agents import LiveRequestQueue
from google.adk.agents.llm_agent import Agent
from google.adk.tools.function_tool import FunctionTool
from google.genai import Client
from google.genai import types as genai_types


async def monitor_stock_price(stock_symbol: str) -> AsyncGenerator[str, None]:
    """This function will monitor the price for the given stock_symbol in a continuous, streaming and asynchronous way."""
    print(f"Start monitoring stock price for {stock_symbol}!")

    # Let's mock stock price change.
    await asyncio.sleep(4)
    price_alert1 = f"the price for {stock_symbol} is 300"
    yield price_alert1
    print(price_alert1)

    await asyncio.sleep(4)
    price_alert1 = f"the price for {stock_symbol} is 400"
    yield price_alert1
    print(price_alert1)

    await asyncio.sleep(20)
    price_alert1 = f"the price for {stock_symbol} is 900"
    yield price_alert1
    print(price_alert1)

    await asyncio.sleep(20)
    price_alert1 = f"the price for {stock_symbol} is 500"
    yield price_alert1
    print(price_alert1)


# for video streaming, `input_stream: LiveRequestQueue` is required and reserved key parameter for ADK to pass video streams in.
async def monitor_video_stream(
    input_stream: LiveRequestQueue,
) -> AsyncGenerator[str, None]:
    """Monitor how many people are in the video streams."""
    print("start monitoring_video_stream!")
    client = Client(vertexai=False)
    prompt_text = (
        "Count the number of people in this image. Just respond with a numeric"
        " number."
    )
    last_count = None
    while True:
        last_valid_req = None
        print("Start monitoring loop")

        # Use this loop to pull the latest images and discard the old ones.
        # Note: _queue is a private attribute of LiveRequestQueue and may change between ADK versions.
        while input_stream._queue.qsize() != 0:
            live_req = await input_stream.get()

            if live_req.blob is not None and live_req.blob.mime_type == "image/jpeg":
                last_valid_req = live_req

        # If we found a valid image, process it.
        if last_valid_req is not None:
            print("Processing the most recent frame from the queue")

            # Create an image part using the blob's data and MIME type.
            image_part = genai_types.Part.from_bytes(
                data=last_valid_req.blob.data, mime_type=last_valid_req.blob.mime_type
            )

            contents = genai_types.Content(
                role="user",
                parts=[image_part, genai_types.Part.from_text(text=prompt_text)],
            )

            # Call the model through the async client so the event loop is not blocked.
            response = await client.aio.models.generate_content(
                model="gemini-2.0-flash-exp",
                contents=contents,
                config=genai_types.GenerateContentConfig(
                    system_instruction=(
                        "You are a helpful video analysis assistant. You can count"
                        " the number of people in this image or video. Just respond"
                        " with a numeric number."
                    )
                ),
            )
            current_count = response.candidates[0].content.parts[0].text
            if last_count is None:
                # Remember the first count; only report later changes.
                last_count = current_count
            elif current_count != last_count:
                last_count = current_count
                # Yield the text so the output matches AsyncGenerator[str, None].
                yield current_count
                print("response:", current_count)

        # Wait before checking for new images.
        await asyncio.sleep(0.5)


# Use this exact function to help ADK stop your streaming tools when requested.
# For example, to stop `monitor_stock_price`, the agent will invoke
# stop_streaming(function_name="monitor_stock_price").
def stop_streaming(function_name: str):
    """Stop streaming.

    Args:
        function_name: The name of the streaming function to stop.
    """
    pass


root_agent = Agent(
    model="gemini-2.0-flash-exp",
    name="video_streaming_agent",
    instruction="""
      You are a monitoring agent. You can do video monitoring and stock price monitoring
      using the provided tools/functions.
      When users want to monitor a video stream,
      you can use the monitor_video_stream function to do that. When monitor_video_stream
      returns an alert, you should tell the users.
      When users want to monitor a stock price, you can use monitor_stock_price.
      Don't ask too many questions. Don't be too talkative.
      """,
    tools=[
        monitor_video_stream,
        monitor_stock_price,
        FunctionTool(stop_streaming),
    ]
)
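The stop_streaming function above is only a marker: when the agent calls it, ADK does the actual stopping, and those internals are not shown on this page. As an assumption-labeled illustration of how a running streaming tool can be stopped in plain asyncio, the sketch below drives a hypothetical endless generator, ticker, inside a task registered under its function name, and a hypothetical helper, stop_by_name, cancels that task. This is not ADK's implementation.

```python
import asyncio
from typing import AsyncGenerator, Dict, List


async def ticker(symbol: str) -> AsyncGenerator[str, None]:
    """Hypothetical endless streaming tool."""
    n = 0
    while True:
        n += 1
        yield f"{symbol} update {n}"
        await asyncio.sleep(0.01)


async def consume(gen: AsyncGenerator[str, None], sink: List[str]) -> None:
    """Forward every intermediate result to a sink (stand-in for the agent)."""
    async for item in gen:
        sink.append(item)


async def stop_by_name(tasks: Dict[str, asyncio.Task], function_name: str) -> bool:
    """Cancel the running task registered under function_name, if any."""
    task = tasks.pop(function_name, None)
    if task is None:
        return False
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # cancellation is the expected way this task ends
    return True


async def demo() -> List[str]:
    received: List[str] = []
    tasks = {"ticker": asyncio.create_task(consume(ticker("XYZ"), received))}
    await asyncio.sleep(0.05)  # let a few updates stream in
    stopped = await stop_by_name(tasks, "ticker")
    count_at_stop = len(received)
    await asyncio.sleep(0.05)  # no new updates should arrive after cancellation
    assert stopped and len(received) == count_at_stop
    return received


print(asyncio.run(demo()))
```

Keying tasks by function name mirrors the stop_streaming(function_name=...) call shape, so the agent only needs to know which tool to stop, not how it is running.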

Here are some sample queries to test:

  • Help me monitor the stock price for $XYZ stock.
  • Help me monitor how many people are there in the video stream.