编辑
2023-09-22
后端
00

主题思路

上次简单演示了一下SSE使用,接下来使用SSE来实现一个发布订阅模式,主体逻辑大概如下,服务端负责维护一个主题集合,主题可以由任何人通过POST请求来创建,订阅者发送一个GET请求,包含要订阅的主题名,然后和服务端建立一个SSE连接,开始接收消息,发布者发送一个POST请求,包含主题名和需要发布的消息,服务端收到发布者发布的信息,将这条信息推送给所有订阅了这个主题的人。代码实现也很简单,为了简化实现,我们用gin来实现。

接下来先来定义一下主题

go
type Topic struct { Name string Subscribers map[string]chan<- string }

其中每个订阅者都有一个消息发送通道,消息会先到chan,后面到订阅时候通过轮询队列来发送信息

go
type PubSubServer struct { Topics map[string]*Topic } func NewPubSubServer() *PubSubServer { return &PubSubServer{ Topics: make(map[string]*Topic), } }

服务实例。接下来定义三个路由

go
pubSubServer := NewPubSubServer() router := gin.Default() router.POST("/topic", pubSubServer.createTopicHandler) router.GET("/subto", pubSubServer.subscribeHandler) router.POST("/pushto", pubSubServer.pushToTopicHandler) log.Println("PubSub server started on port 8000") log.Fatal(router.Run(":8000"))

这就是主函数逻辑,在代码中我会尽量使用英文,为了提升英语能力,这三个路由主要是创建主题,订阅主题,发布主题。

接下来是代码主体

go
func (ps *PubSubServer) createTopicHandler(c *gin.Context) { topicName := c.PostForm("TopicName") if topicName == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"}) return } _, exists := ps.Topics[topicName] if exists { c.JSON(http.StatusBadRequest, gin.H{"error": "Topic already exists"}) return } topic := &Topic{ Name: topicName, Subscribers: make(map[string]chan<- string), } ps.Topics[topicName] = topic c.JSON(http.StatusCreated, gin.H{"message": fmt.Sprintf("Topic created: %s", topicName)}) } func (ps *PubSubServer) subscribeHandler(c *gin.Context) { topicName := c.Query("TopicName") if topicName == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"}) return } topic, exists := ps.Topics[topicName] if !exists { c.JSON(http.StatusBadRequest, gin.H{"error": "Topic does not exist"}) return } messageChan := make(chan string) topic.Subscribers[c.ClientIP()] = messageChan c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") c.Header("Access-Control-Allow-Origin", "*") // Write the response to the HTTP response stream flusher, _ := c.Writer.(http.Flusher) for { select { case message := <-messageChan: // Write message to response stream fmt.Fprintf(c.Writer, "data: %s\n", message) // Refresh the response stream and send data to the client flusher.Flush() case <-c.Writer.CloseNotify(): delete(topic.Subscribers, c.ClientIP()) log.Printf("Subscriber disconnected: %s", c.ClientIP()) return } } } func (ps *PubSubServer) pushToTopicHandler(c *gin.Context) { topicName := c.PostForm("TopicName") if topicName == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"}) return } topic, exists := ps.Topics[topicName] if !exists { c.JSON(http.StatusBadRequest, gin.H{"error": "Topic does not exist"}) return } message := c.PostForm("Message") if message == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "Message is required"}) return } for _, subscriber := range topic.Subscribers { subscriber <- message } c.JSON(http.StatusOK, gin.H{"message": "Message pushed to topic"}) }

以上代码都很简单,不多讲解,接下来实现一个简单的基于命令行的客户端,

go
package main import ( "bufio" "fmt" "io" "log" "net/http" "net/url" "os" "strings" ) func main() { reader := bufio.NewReader(os.Stdin) for { fmt.Println("Select an action:") fmt.Println("1. Create a topic") fmt.Println("2. Subscribe to a topic") fmt.Println("3. Push a message to a topic") fmt.Println("0. Exit") input, _ := reader.ReadString('\n') input = strings.TrimSpace(input) switch input { case "1": createTopic(reader) case "2": subscribeToTopic(reader) case "3": pushMessageToTopic(reader) case "0": fmt.Println("Exiting...") return default: fmt.Println("Invalid input. Please try again.") } } } func createTopic(reader *bufio.Reader) { fmt.Print("Enter the topic name: ") topicName, _ := reader.ReadString('\n') topicName = strings.TrimSpace(topicName) data := url.Values{} data.Set("TopicName", topicName) resp, err := http.PostForm("http://localhost:8000/topic", data) if err != nil { fmt.Println("Error creating topic:", err) return } defer resp.Body.Close() if resp.StatusCode == http.StatusCreated { fmt.Println("Topic created successfully") } else { fmt.Println("Error creating topic:", resp.Status) } } func subscribeToTopic(reader *bufio.Reader) { // 获取订阅主题 fmt.Print("Enter the topic name: ") topicName, _ := reader.ReadString('\n') topicName = strings.TrimSpace(topicName) // 构建查询参数 queryParams := url.Values{} queryParams.Set("TopicName", topicName) // 创建 SSE 连接 url := "http://localhost:8000/subto?" + queryParams.Encode() req, err := http.NewRequest("GET", url, nil) if err != nil { log.Fatal("Error creating request:", err) } req.Header.Set("Accept", "text/event-stream") client := &http.Client{} fmt.Println("开始做") resp, err := client.Do(req) fmt.Println("做完了") if err != nil { log.Fatal("Error connecting to server:", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { log.Fatal("Failed to subscribe to topic:", resp.Status) } go readEvents(resp.Body) select {} } func readEvents(body io.Reader) { reader := bufio.NewReader(body) for { line, err := reader.ReadString('\n') if err != nil { log.Fatal("Error reading event:", err) } fmt.Println("Received event:", line) } } func pushMessageToTopic(reader *bufio.Reader) { fmt.Print("Enter the topic name: ") topicName, _ := reader.ReadString('\n') topicName = strings.TrimSpace(topicName) fmt.Print("Enter the message to push: ") message, _ := reader.ReadString('\n') message = strings.TrimSpace(message) data := url.Values{} data.Set("TopicName", topicName) data.Set("Message", message)
编辑
2023-09-22
后端
00

HTTP响应状态码404表示"Not Found",指示服务器无法找到请求的资源。

当收到HTTP响应状态码404时,说明请求的URL或资源不存在。这可能是由以下原因引起的:

  1. 错误的URL:请检查您发送的请求中的URL是否正确。确保URL的路径和文件名等信息准确无误。

  2. 资源不存在:请求的资源可能已被移动、删除或重命名。确保您请求的资源确实存在于服务器上。

  3. 访问权限限制:某些服务器可能会限制对特定资源的访问权限。请确保您具有足够的权限来访问所请求的资源。

编辑
2023-09-22
后端
00

HTTP响应状态码405表示"Method Not Allowed",指示服务器不允许使用请求中指定

编辑
2023-09-22
后端
00

HTTP 400 状态码是客户端错误的一种表示,它表示服务器无法理解或处理客户端发送的请求。HTTP

编辑
2023-09-21
后端
00

我们来写一个简单的SSE服务端,SSE全名Server-Send-Event服务端五秒钟向客户端