切換語言為:簡體

使用WebSocket和RabbitMQ實現任務進度推送

  • 爱糖宝
  • 2024-06-24
  • 2116
  • 0
  • 0

1、WebSocket簡述

WebSocket 是 HTML5 新增的 API,是一種基於 TCP 連線的持久化雙向通訊協議。

WebSocket 預設連線埠是80,執行埠是443。

WebSocket 連線地址示例(以 ws 或者 wss 開頭):ws://text.com:8080 或 wss://text.com:8080(加密)。

Springboot專案匯入WebSocket依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>


註冊WebSocket節點

@Slf4j
@Configuration
public class WebSocketConfig { 
    /**
     * ServerEndpointExporter 作用
     * 這個Bean會自動註冊使用@ServerEndpoint註解宣告的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        log.info("serverEndpointExporter被注入了");
        return new ServerEndpointExporter();
    }
}

2、RabbitMQ簡述

RabbitMQ是一款開源的訊息中介軟體,實現了高階訊息佇列協議(AMQP),提供了可靠的訊息傳遞機制,用於在分散式系統中傳遞和儲存訊息。

Springboot專案匯入RabbitMQ依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


在application.yml檔案中配置資訊:

spring:
  rabbitmq:
    host: 172.16.10.XXX
    port: 5672
    virtual-host: /
    username: XXX
    password: XXXXXX


生產者生產訊息,並向訊息佇列傳送訊息簡單示例,這裏以介面的形式呈現。

若要生產進度訊息,還需要再任務模型裡進行設計,或按分頁計算進度,或預估計算時間

@Slf4j
@RestController
@RequestMapping("/inform")
public class MQProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    ObjectMapper mapper = new ObjectMapper();

    @RequestMapping("/test")
    public Result<String> index(String msgType, String userId, String sessionId, Float result) throws JsonProcessingException {
        AnalysisTaskProgressMessage<Float> progressMsg = new AnalysisTaskProgressMessage<>(msgType, userId, sessionId, result);

        String progressMsgJson = mapper.writeValueAsString(progressMsg);
        rabbitTemplate.convertAndSend("simple.queue", progressMsgJson);
        log.info(progressMsgJson + "訊息傳送成功!");
        return Result.OK("訊息傳送成功!");
    }
}


3、註冊WebSocket端點

@Slf4j
@ServerEndpoint(value = "/webSocket/analysisTask/{userId}", encoders = {BaseModelEncoder.class})
@Component
public class AnalysisTaskWSSvr {
    /**
     * concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。
     */
    private static ConcurrentHashMap<AnalysisTaskCompositeId, AnalysisTaskWSSvr> webSocketMap = new ConcurrentHashMap<>();

    /**
     * 記錄當前線上使用者的ID
     */
    private static Set<String> onlineUserSet = new CopyOnWriteArraySet<>();

    /**
     * 與某個客戶端的連線會話,需要透過它來給客戶端傳送資料
     */
    private Session session;

    /**
     * 接收 AnalysisTaskCompositeId
     */
    private AnalysisTaskCompositeId analysisTaskCompositeId;

    /**
     * 連線建立成功呼叫的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId, @PathParam("requestId") String requestId) {
        // 設定長連線時間
        // session.setMaxIdleTimeout(3600000);
        this.session = session;
        AnalysisTaskCompositeId analysisTaskId = new AnalysisTaskCompositeId(userId, this.session.getId());
        this.analysisTaskCompositeId = analysisTaskId;
        if (webSocketMap.containsKey(analysisTaskId)) {
            webSocketMap.remove(analysisTaskId);
            webSocketMap.put(analysisTaskId, this);
        } else {
            // 加入set中
            webSocketMap.put(analysisTaskId, this);
            addOnlineCount(analysisTaskId.getUserId());
        }
        try {
            AnalysisTaskWSSvr.sendSuccessMessage(analysisTaskId);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        log.info("使用者連線:" + userId + ",當前線上人數為:" + getOnlineCount());
    }

    /**
     * 連線關閉呼叫的方法
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(analysisTaskCompositeId)) {
            webSocketMap.remove(analysisTaskCompositeId);
            //從set中刪除
            subOnlineCount(analysisTaskCompositeId.getUserId());
        }
        log.info("使用者退出:" + analysisTaskCompositeId.getUserId() + ",當前線上人數為:" + getOnlineCount());
    }

    /**
     * 收到客戶端訊息後呼叫的方法
     *
     * @param message 客戶端傳送過來的訊息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("使用者訊息:" + analysisTaskCompositeId.getUserId() + ", 報文:" + message);
        if (StringUtils.isEmpty(message)) return;
        // TODO: 接收資訊,呼叫相關服務
        
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.warn("使用者錯誤:" + this.analysisTaskCompositeId.getUserId() + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    public static void sendSuccessMessage(AnalysisTaskCompositeId compositeId) throws IOException {
        AnalysisTaskWSSvr webSocketServer = webSocketMap.get(compositeId);
        webSocketServer.session.getBasicRemote().sendText(String.valueOf(compositeId));
    }

    public static int getOnlineCount() {
        return onlineUserSet.size();
    }

    public static void addOnlineCount(String userId) {
        if (!onlineUserSet.contains(userId)) {
            onlineUserSet.add(userId);
        }
    }

    public static void subOnlineCount(String userId) {
        if (onlineUserSet.contains(userId)) {
            onlineUserSet.remove(userId);
        }
    }
}


4、監聽佇列,消費訊息

構建訊息監聽器,同時呼叫相關功能處理訊息,這裏有多種方式可以選擇:

1、採用Direct訂閱模型,不同的訊息被不同的佇列進行消費;

2、採用簡單佇列模式,生產者生產的訊息採用同一個方式進行封裝,並標註好訊息型別,監聽器接收到訊息後會根據訊息型別分發給不同的websocket連線。

@Slf4j
@Configuration
@RequiredArgsConstructor
public class AnalysisTaskProgressListener {
    /**
     * 監聽任務進度狀態
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "analysis.queue1", durable = "true"),
            exchange = @Exchange(name = "analysisTask", type = ExchangeTypes.DIRECT),
            key = "task1"
    ))
    public void listenTaskProgress(String msg) {
        executeTask(msg);
    }
    
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        executeTask(msg);
    }
    
    ```
    private static void executeTask(String msg) {
        log.info("監聽到訊息" + msg);
        JSONObject jsonMsg = JSON.parseObject(msg);
        // 對訊息型別做判斷
        AnalysisTaskWSSvr.sendProgressMessage(
                jsonMsg.getString("userId"),
                jsonMsg.getString("sessionId"),
                jsonMsg.getString("taskId"),
                jsonMsg.getString("taskType"),
                jsonMsg.getFloatValue("progress")
        );
    }
}


5、使用vue3構建WebSocket連線

這裏使用vue3構建前端介面,並建立WebSocket連線

WebSocket連線:ws://localhost:8889/webSocket/analysisTask/#{userId}

export default class SocketService {
    static instance = null;
    static get Instance() {
      if (!this.instance) {
        this.instance = new SocketService();
      }
      return this.instance;
    }
    // 和服務端連線的socket物件
    ws = null;
    // 儲存回撥函式
    callBackMapping = {};
    // 標識是否連線成功
    connected = false;
    // 記錄重試的次數
    sendRetryCount = 0;
    // 重新連線嘗試的次數
    connectRetryCount = 0;
    //  定義連線伺服器的方法
    connect() {
      // 連線伺服器
      if (!window.WebSocket) {
        return console.log('您的瀏覽器不支援WebSocket');
      }
      // let token = $.cookie('123');
      // let token = '4E6EF539AAF119D82AC4C2BC84FBA21F';
      let url = 'ws://localhost:8889/webSocket/analysisTask/001';
      this.ws = new WebSocket(url);
      // 連線成功的事件
      this.ws.onopen = () => {
        console.log('連線服務端成功了');
        
        this.connected = true;
        // 重置重新連線的次數
        this.connectRetryCount = 0;
      };
      // 1.連線服務端失敗
      // 2.當連線成功之後, 伺服器關閉的情況
      this.ws.onclose = () => {
        console.log('連線服務端失敗');
        this.connected = false;
        this.connectRetryCount++;
        setTimeout(() => {
          this.connect();
        }, 500 * this.connectRetryCount);
      };
      // 得到服務端傳送過來的資料
      this.ws.onmessage = msg => {
        console.log(msg.data, '從服務端獲取到了資料');
      };
    }
    // 回撥函式的註冊
    registerCallBack(socketType, callBack) {
      this.callBackMapping[socketType] = callBack;
    }
    // 取消某一個回撥函式
    unRegisterCallBack(socketType) {
      this.callBackMapping[socketType] = null;
    }
    // 傳送資料的方法
    send(data) {
      // 判斷此時此刻有沒有連線成功
      if (this.connected) {
        this.sendRetryCount = 0;
        try {
          this.ws.send(JSON.stringify(data));
        } catch (e) {
          this.ws.send(data);
        }
      } else {
        this.sendRetryCount++;
        setTimeout(() => {
          this.send(data);
        }, this.sendRetryCount * 500);
      }
    }
  }


<script setup>
import SocketService from "../websocket/SocketService.js"
import { reactive } from "vue";

const data = reactive({
  socketServe: SocketService.Instance,
});

const sendData = () => {
  data.socketServe.send({
    "type": "SoilFertilityAnalysisTask",
    "taskId": "002"
  });
  console.log('傳送資料');
};

const buildWS = () => {
  SocketService.Instance.connect();
  data.socketServe = SocketService.Instance;
  data.socketServe.registerCallBack('callback1', data.socketServe);
  setTimeout(() => {
    sendData()
  }, 1000)
}
</script>

<template>
  <div>
    <button @click="buildWS">執行</button>
  </div>
</template>

<style scoped>
.read-the-docs {
  color: #888;
}
</style>


效果展示如下:

使用WebSocket和RabbitMQ實現任務進度推送

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.