TG客服

Google Cloud Pub/Sub如何配置消息过滤,只接收感兴趣的消息

⏱️2026-04-06 09:00 👁️3
```html

Google Cloud Pub/Sub 消息过滤配置 📢

想要只接收感兴趣的消息吗?Pub/Sub 消息过滤可以帮到你!🎉 让我们一起来看看如何配置吧!

1. 理解过滤器的概念 🤔

过滤器是附加在订阅上的表达式。当发布到 Topic 的消息的属性与过滤器表达式匹配时,订阅才会接收该消息。

2. 创建 Topic 和 Subscription ☁️

首先,你需要一个 Topic 和至少一个 Subscription。你可以使用 Google Cloud Console, gcloud CLI, 或者客户端库来创建它们。

使用 gcloud CLI 创建 Topic:

gcloud pubsub topics create your-topic-name

使用 gcloud CLI 创建 Subscription:

gcloud pubsub subscriptions create your-subscription-name --topic=your-topic-name

3. 设置消息属性 🏷️

在发布消息时,你需要设置消息的属性(Attributes)。属性是键值对,用于过滤消息。

例如,你可以设置一个名为 "event_type" 的属性,值为 "order_created""user_registered"

发布带有属性的消息 (使用 gcloud CLI):

gcloud pubsub topics publish your-topic-name --message="Hello, world!" --attribute="event_type=order_created"

4. 配置 Subscription 过滤器 ⚙️

现在,我们可以配置 Subscription 的过滤器,只接收具有特定属性的消息。

过滤器使用 Common Expression Language (CEL) 编写。

示例 1: 接收 event_type"order_created" 的消息

过滤器表达式: attributes.event_type = "order_created"

示例 2: 接收 price 大于 100 的消息

过滤器表达式: attributes.price > 100

示例 3: 接收 region"US""EU" 的消息

过滤器表达式: attributes.region == "US" || attributes.region == "EU"

使用 gcloud CLI 更新 Subscription 并添加过滤器:

gcloud pubsub subscriptions update your-subscription-name --filter='attributes.event_type = "order_created"'

5. 测试过滤器 🧪

发布一些消息,确保只有符合过滤器条件的消息被 Subscription 接收。

你可以使用 Google Cloud Console 或客户端库来拉取 Subscription 中的消息。

6. CEL 语法注意事项 ⚠️

* 属性名称区分大小写。 * 字符串需要用双引号括起来。 * 可以使用逻辑运算符 && (AND), || (OR), 和 ! (NOT)。 * 可以使用比较运算符 ==, !=, >, >=, <, <=。 * 可以使用 in 运算符检查属性值是否在列表中 (例如: attributes.color in ["red", "blue", "green"])。 * 可以使用 has 运算符检查属性是否存在 (例如: has(attributes.my_attribute))。

7. 更多高级用法 🚀

* 过滤消息正文: 虽然不推荐,但你可以使用 message.data 来过滤消息正文(需要先进行 Base64 解码)。 * 使用函数: CEL 支持一些内置函数,例如字符串处理函数。 * 复杂表达式: 可以组合多个条件来创建更复杂的过滤器。

8. 注意事项和最佳实践 💡

* 性能: 复杂的过滤器可能会影响性能。尽量保持过滤器简单高效。 * 错误处理: 如果过滤器表达式无效,Subscription 将无法接收任何消息。 * 监控: 监控 Subscription 的消息积压情况,确保消息能够及时处理。

9. 示例代码 (Python) 🐍


from google.cloud import pubsub_v1

# 发布消息
def publish_message(topic_name, message, attributes):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)
    data = message.encode("utf-8")
    future = publisher.publish(topic_path, data, **attributes)
    print(f"Published message ID: {future.result()}")

# 拉取消息
def pull_messages(subscription_name):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_name)
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": 10},
    )

    for received_message in response.received_messages:
        print(f"Received: {received_message.message.data.decode('utf-8')}")
        print(f"Attributes: {received_message.message.attributes}")

        # 确认消息
        subscriber.acknowledge(
            request={"subscription": subscription_path, "ack_ids": [received_message.ack_id]}
        )

project_id = "your-project-id"
topic_name = "your-topic-name"
subscription_name = "your-subscription-name"

# 发布消息示例
publish_message(topic_name, "Order created!", {"event_type": "order_created"})
publish_message(topic_name, "User registered!", {"event_type": "user_registered"})
publish_message(topic_name, "Order updated!", {"event_type": "order_updated", "price": "120"})


# 拉取消息示例 (假设 Subscription 过滤器设置为 attributes.event_type = "order_created")
pull_messages(subscription_name)

希望以上信息能够帮助你配置 Pub/Sub 消息过滤!👍 Good luck! 🍀

```

国际云自助站点

我们提供一站式多云服务管理平台,支持阿里云国际、腾讯云国际、AWS(亚马逊云)和GCP(谷歌云)等主流国际云厂商。无论是新账户申请、余额充值,还是日常管理与监控,平台均可统一操作,大幅提升管理效率。同时支持余额预警、异常通知等推送功能,帮助用户实时掌握各云平台资源状态,防止因欠费导致业务中断。平台还支持多账号集中管理,适用于个人站长、跨境电商、开发团队等多场景使用需求,真正实现高效、安全、灵活的多云资源协同管理。

热门文章
更多>