swoole实现一个类似mqtt的websocket订阅主题消息推送服务代码

swoole实现一个类似mqtt的websocket订阅主题消息推送服务代码

<a href='/tag/swoole.html'>swoole</a>实现一个类似mqtt的websocket订阅主题消息推送服务代码

大家知道mqtt是一个基于主题推送的消息发送接收中间件,还有kalfa,那么如何使用swoole基于websocket打造一款与浏览器直接通讯进行主题订阅和消息推送的服务呢,今天我们就来写一个简单的类似kalfa消息订阅推送的功能代码。

一、swoole打造websocket消息订阅推送服务

我们使用swoole来搭建,通过内存全景变量client_fd和topic_list来存储会话句柄及主题列表,这个以后可以存到redis或数据库中。

第一次连接的时候,将会话句柄存在client_fd中,接收到订阅主题的时候,就向topic_list中写入一个主题的数组,内容是会话句柄,当收到消息的时候就向订阅改主题的客户端会话连接句柄群发送消息,具体代码如下:

<?php
$_client_fd = [];

$_topic_list = [];
$server = new Swoole\Websocket\Server("0.0.0.0", 9502);

$server->on('open', function($server, $req) {
    echo "connection open: {$req->fd}\n";
    global $_client_fd;
    $_client_fd[] = $req->fd;
});


$server->on('message', function($server, $frame) {


    $_jsondata = json_decode($frame->data, true);
    if (isset($_jsondata['type'])) {
        if ($_jsondata['type'] == 'heart') {
            $server->push($frame->fd, $frame->data);
            return;
        }

        if ($_jsondata['type'] == 'subs' && isset($_jsondata['name'])) {

            global $_topic_list;
            print_r($_topic_list);
            if (isset($_topic_list[$_jsondata['name']])) {
                if (!in_array($frame->fd, $_topic_list[$_jsondata['name']])) {
                    echo "订阅成功\n";
                    $_topic_list[$_jsondata['name']][] = $frame->fd;
                }
            } else {
                echo "订阅成功\n";
                $_topic_list[$_jsondata['name']][] = $frame->fd;
            }
            print_r($_topic_list);


        }

        if ($_jsondata['type'] == 'unsubs' && isset($_jsondata['name'])) {

            global $_topic_list;
            print_r($_topic_list);
            print_r($frame->fd);
            if (isset($_topic_list[$_jsondata['name']])) {
                foreach ($_topic_list[$_jsondata['name']] as $k => $val) {
                    if ($frame->fd == $val) {
                        unset($_topic_list[$_jsondata['name']][$k]);
                    }
                }

            }
            print_r($_topic_list);


        }

        if ($_jsondata['type'] == 'send' && isset($_jsondata['name'])) {

            global $_topic_list;
            if (isset($_topic_list[$_jsondata['name']])) {
                foreach ($_topic_list[$_jsondata['name']] as $fd) {
                    if ($frame->fd != $fd) {
                        $server->push($fd, $frame->data);
                    }
                }

            }



        }
    }
    //自动解析
    echo "received message: {$frame->data}\n";
    //以二进制流发送数据
    //  $server->push($frame->fd, "hello水电费", SWOOLE_WEBSOCKET_OPCODE_BINARY);

});

$server->on('close', function($server, $fd) {
    echo "connection close: {$fd}\n";
    global $_client_fd;

    foreach ($_client_fd as $k => $val) {
        if ($fd == $val) {
            unset($_client_fd[$k]);
        }
    }



    print_r($_client_fd);
    global $_topic_list;
    foreach ($_topic_list as &$fds) {

        foreach ($fds as $k => $val) {
            if ($fd == $val) {
                unset($fds[$k]);
            }
        }


    }
    print_r($_topic_list);

});

$server->start();

二、html端的websocket订阅代码

html客户端通过websocket连接服务端,并且手动订阅自己感兴趣的主题,还可以取消订阅,最后服务器端就会根据客户端订阅的主题推送消息,具体代码如下:

<!DOCTYPE html>
<html>

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width,initial-scale=1.0,maximum=1.0,minimum=1.0,user-scalable=0" />
    <script type="text/javascript" src="//repo.bfw.wiki/bfwrepo/js/jquery-3.2.1.min.js"></script>
    <script type="text/javascript">
        $(function() {
    	var lockReconnect = false;//避免重复连接
    	var ws = null; //WebSocket的引用
    	var wsUrl = "web.debug.only.bfw.wiki:9502"; //这个要与后端提供的相同
    	//创建WebSocket连接,如果不确定浏览器是否支持,可以使用socket.js做连接
    	function createWebSocket(url){
    		 try {
    			 if ('WebSocket' in window) {
    		        ws = new WebSocket("ws://" + url + "/socketServer");
    		     }
    	         initEventHandle();
    	     } catch (e) {
    	         reconnect(wsUrl);
    	     }
    	}
    	
    	function reconnect(url) {
            if(lockReconnect) return;
            lockReconnect = true;
            //没连接上会一直重连,设置延迟避免请求过多
            setTimeout(function () {
                createWebSocket(wsUrl);
                console.log("正在重连,当前时间"+new Date())
                lockReconnect = false;
            }, 5000); //这里设置重连间隔(ms)
        }
    	
    	 /*********************初始化开始**********************/
    	function initEventHandle() {
    		// 连接成功建立后响应
    		ws.onopen = function() {
    			console.log("成功连接到" + wsUrl);
    			//心跳检测重置
    			heartCheck.reset().start();
    		}
    		// 收到服务器消息后响应
    		ws.onmessage = function(e) {
    			//如果获取到消息,心跳检测重置
                //拿到任何消息都说明当前连接是正常的
                heartCheck.reset().start();
                //Json转换成Object
                var msg = eval('(' + e.data + ')');
                
                if(msg.type == "heart"){
                    console.log("心跳监测");
                	//忽略心跳的信息,因为只要有消息进来,断线重连就会重置不会触发
                }else{
                    console.log(msg);
                	//处理消息的业务逻辑
                }
            }
    
    		// 连接关闭后响应
    		ws.onclose = function() {
    			console.log("关闭连接");
    			reconnect(wsUrl);//重连
    		}
    		ws.onerror = function () {
    			reconnect(wsUrl);//重连
    		};
    	} 
    	/***************初始化结束***********************/
    	 //心跳检测
        var heartCheck = {
            timeout: 15000,//毫秒
            timeoutObj: null,
            serverTimeoutObj: null,
            reset: function(){
                clearTimeout(this.timeoutObj);
                clearTimeout(this.serverTimeoutObj);
                return this;
            },
            start: function(){
                var self = this;
                this.timeoutObj = setTimeout(function(){
                    //这里发送一个心跳,后端收到后,返回一个心跳消息,
                    //onmessage拿到返回的心跳就说明连接正常
                    ws.send(JSON.stringify({
            'type':"heart",
            
          }));
                    console.log("HeartBeat");
                    self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
                        ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
                    }, self.timeout)
                }, this.timeout)
            }
        }
    
    // 发送字符串消息
    
    	$("#subs").click(function() {
    	    
    	   
    		if (ws.readyState ==1) {
    			//自定义消息串,让后端接收
    			ws.send( JSON.stringify({
            'type':"subs","name":"test"
            
          }));
    		}else{
    			alert("当前连接超时,请刷新重试!");
    		}
    		return false;
    	});
    	$("#unsubs").click(function() {
    	    
    	   
    		if (ws.readyState ==1) {
    			//自定义消息串,让后端接收
    			ws.send( JSON.stringify({
            'type':"unsubs","name":"test"
            
          }));
    		}else{
    			alert("当前连接超时,请刷新重试!");
    		}
    		return false;
    	});
    	
    		$("#send").click(function() {
    	    
    	   
    		if (ws.readyState ==1) {
    			//自定义消息串,让后端接收
    			ws.send( JSON.stringify({
            'type':"send","name":"test","data":"ddddddddd"
            
          }));
    		}else{
    			alert("当前连接超时,请刷新重试!");
    		}
    		return false;
    	});
    	// 强制退出
    	window.onunload = function() {
    		ws.close();
    	}
    	
    	
    	createWebSocket(wsUrl);/**启动连接**/
    });
    </script>

</head>

<body>
    <button id="send">
        发送数据
    </button>
    <button id="subs">
        订阅主题
    </button>
    <button id="unsubs">
        取消订阅
    </button>
</body>

</html>

三、应用场景

可以用在及时通讯及群聊应用里,方便服务端更便捷的推送消息。

{{collectdata}}

网友评论0