主题思路
上次简单演示了一下SSE使用,接下来使用SSE来实现一个发布订阅模式,主体逻辑大概如下,服务端负责维护一个主题集合,主题可以由任何人通过POST请求来创建,订阅者发送一个GET请求,包含要订阅的主题名,然后和服务端建立一个SSE连接,开始接收消息,发布者发送一个POST请求,包含主题名和需要发布的消息,服务端收到发布者发布的信息,将这条信息推送给所有订阅了这个主题的人。代码实现也很简单,为了简化实现,我们用gin来实现。
接下来先来定义一下主题
gotype Topic struct {
Name string
Subscribers map[string]chan<- string
}
其中每个订阅者都有一个消息发送通道,消息会先到chan,后面到订阅时候通过轮询队列来发送信息
gotype PubSubServer struct {
Topics map[string]*Topic
}
func NewPubSubServer() *PubSubServer {
return &PubSubServer{
Topics: make(map[string]*Topic),
}
}
服务实例。接下来定义三个路由
gopubSubServer := NewPubSubServer()
router := gin.Default()
router.POST("/topic", pubSubServer.createTopicHandler)
router.GET("/subto", pubSubServer.subscribeHandler)
router.POST("/pushto", pubSubServer.pushToTopicHandler)
log.Println("PubSub server started on port 8000")
log.Fatal(router.Run(":8000"))
这就是主函数逻辑,在代码中我会尽量使用英文,为了提升英语能力,这三个路由主要是创建主题,订阅主题,发布主题。
接下来是代码主体
gofunc (ps *PubSubServer) createTopicHandler(c *gin.Context) {
topicName := c.PostForm("TopicName")
if topicName == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"})
return
}
_, exists := ps.Topics[topicName]
if exists {
c.JSON(http.StatusBadRequest, gin.H{"error": "Topic already exists"})
return
}
topic := &Topic{
Name: topicName,
Subscribers: make(map[string]chan<- string),
}
ps.Topics[topicName] = topic
c.JSON(http.StatusCreated, gin.H{"message": fmt.Sprintf("Topic created: %s", topicName)})
}
func (ps *PubSubServer) subscribeHandler(c *gin.Context) {
topicName := c.Query("TopicName")
if topicName == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"})
return
}
topic, exists := ps.Topics[topicName]
if !exists {
c.JSON(http.StatusBadRequest, gin.H{"error": "Topic does not exist"})
return
}
messageChan := make(chan string)
topic.Subscribers[c.ClientIP()] = messageChan
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
// Write the response to the HTTP response stream
flusher, _ := c.Writer.(http.Flusher)
for {
select {
case message := <-messageChan:
// Write message to response stream
fmt.Fprintf(c.Writer, "data: %s\n", message)
// Refresh the response stream and send data to the client
flusher.Flush()
case <-c.Writer.CloseNotify():
delete(topic.Subscribers, c.ClientIP())
log.Printf("Subscriber disconnected: %s", c.ClientIP())
return
}
}
}
func (ps *PubSubServer) pushToTopicHandler(c *gin.Context) {
topicName := c.PostForm("TopicName")
if topicName == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"})
return
}
topic, exists := ps.Topics[topicName]
if !exists {
c.JSON(http.StatusBadRequest, gin.H{"error": "Topic does not exist"})
return
}
message := c.PostForm("Message")
if message == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "Message is required"})
return
}
for _, subscriber := range topic.Subscribers {
subscriber <- message
}
c.JSON(http.StatusOK, gin.H{"message": "Message pushed to topic"})
}
以上代码都很简单,不多讲解,接下来实现一个简单的基于命令行的客户端,
gopackage main
import (
"bufio"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"strings"
)
func main() {
reader := bufio.NewReader(os.Stdin)
for {
fmt.Println("Select an action:")
fmt.Println("1. Create a topic")
fmt.Println("2. Subscribe to a topic")
fmt.Println("3. Push a message to a topic")
fmt.Println("0. Exit")
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input)
switch input {
case "1":
createTopic(reader)
case "2":
subscribeToTopic(reader)
case "3":
pushMessageToTopic(reader)
case "0":
fmt.Println("Exiting...")
return
default:
fmt.Println("Invalid input. Please try again.")
}
}
}
func createTopic(reader *bufio.Reader) {
fmt.Print("Enter the topic name: ")
topicName, _ := reader.ReadString('\n')
topicName = strings.TrimSpace(topicName)
data := url.Values{}
data.Set("TopicName", topicName)
resp, err := http.PostForm("http://localhost:8000/topic", data)
if err != nil {
fmt.Println("Error creating topic:", err)
return
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusCreated {
fmt.Println("Topic created successfully")
} else {
fmt.Println("Error creating topic:", resp.Status)
}
}
func subscribeToTopic(reader *bufio.Reader) {
// 获取订阅主题
fmt.Print("Enter the topic name: ")
topicName, _ := reader.ReadString('\n')
topicName = strings.TrimSpace(topicName)
// 构建查询参数
queryParams := url.Values{}
queryParams.Set("TopicName", topicName)
// 创建 SSE 连接
url := "http://localhost:8000/subto?" + queryParams.Encode()
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatal("Error creating request:", err)
}
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
fmt.Println("开始做")
resp, err := client.Do(req)
fmt.Println("做完了")
if err != nil {
log.Fatal("Error connecting to server:", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Fatal("Failed to subscribe to topic:", resp.Status)
}
go readEvents(resp.Body)
select {}
}
func readEvents(body io.Reader) {
reader := bufio.NewReader(body)
for {
line, err := reader.ReadString('\n')
if err != nil {
log.Fatal("Error reading event:", err)
}
fmt.Println("Received event:", line)
}
}
func pushMessageToTopic(reader *bufio.Reader) {
fmt.Print("Enter the topic name: ")
topicName, _ := reader.ReadString('\n')
topicName = strings.TrimSpace(topicName)
fmt.Print("Enter the message to push: ")
message, _ := reader.ReadString('\n')
message = strings.TrimSpace(message)
data := url.Values{}
data.Set("TopicName", topicName)
data.Set("Message", message)
HTTP响应状态码404表示"Not Found",指示服务器无法找到请求的资源。
当收到HTTP响应状态码404时,说明请求的URL或资源不存在。这可能是由以下原因引起的:
错误的URL:请检查您发送的请求中的URL是否正确。确保URL的路径和文件名等信息准确无误。
资源不存在:请求的资源可能已被移动、删除或重命名。确保您请求的资源确实存在于服务器上。
访问权限限制:某些服务器可能会限制对特定资源的访问权限。请确保您具有足够的权限来访问所请求的资源。