swoole实现一个类似mqtt的websocket订阅主题消息推送服务代码

swoole实现一个类似mqtt的websocket订阅主题消息推送服务代码

<a href='/tag/swoole.html'>swoole</a>实现一个类似mqtt的websocket订阅主题消息推送服务代码

大家知道mqtt是一个基于主题推送的消息发送接收中间件,还有kalfa,那么如何使用swoole基于websocket打造一款与浏览器直接通讯进行主题订阅和消息推送的服务呢,今天我们就来写一个简单的类似kalfa消息订阅推送的功能代码。

一、swoole打造websocket消息订阅推送服务

我们使用swoole来搭建,通过内存全景变量client_fd和topic_list来存储会话句柄及主题列表,这个以后可以存到redis或数据库中。

第一次连接的时候,将会话句柄存在client_fd中,接收到订阅主题的时候,就向topic_list中写入一个主题的数组,内容是会话句柄,当收到消息的时候就向订阅改主题的客户端会话连接句柄群发送消息,具体代码如下:

<?php
$_client_fd = [];

$_topic_list = [];
$server = new Swoole\Websocket\Server("0.0.0.0", 9502);

$server->on('open', function($server, $req) {
    echo "connection open: {$req->fd}\n";
    global $_client_fd;
    $_client_fd[] = $req->fd;
});


$server->on('message', function($server, $frame) {


    $_jsondata = json_decode($frame->data, true);
    if (isset($_jsondata['type'])) {
        if ($_jsondata['type'] == 'heart') {
            $server->push($frame->fd, $frame->data);
            return;
        }

        if ($_jsondata['type'] == 'subs' && isset($_jsondata['name'])) {

            global $_topic_list;
            print_r($_topic_list);
            if (isset($_topic_list[$_jsondata['name']])) {
                if (!in_array($frame->fd, $_topic_list[$_jsondata['name']])) {
                    echo "订阅成功\n";
                    $_topic_list[$_jsondata['name']][] = $frame->fd;
                }
            } else {
                echo "订阅成功\n";
                $_topic_list[$_jsondata['name']][] = $frame->fd;
            }
            print_r($_topic_list);


        }

        if ($_jsondata['type'] == 'unsubs' && isset($_jsondata['name'])) {

            global $_topic_list;
            print_r($_topic_list);
            print_r($frame->fd);
            if (isset($_topic_list[$_jsondata['name']])) {
                foreach ($_topic_list[$_jsondata['name']] as $k => $val) {
                    if ($frame->fd == $val) {
                        unset($_topic_list[$_jsondata['name']][$k]);
                    }
                }

            }
            print_r($_topic_list);


        }

        if ($_jsondata['type'] == 'send' && isset($_jsondata['name'])) {

            global $_topic_list;
            if (isset($_topic_list[$_jsondata['name']])) {
                foreach ($_topic_list[$_jsondata['name']] as $fd) {
                    if ($frame->fd != $fd) {
                        $server->push($fd, $frame->data);
                    }
                }

            }



        }
    }
    //自动解析
    echo "received message: {$frame->data}\n";
    //以二进制流发送数据
    //  $server->push($frame->fd, "hello水电费", SWOOLE_WEBSOCKET_OPCODE_BINARY);

});

$server->on('close', function($server, $fd) {
    echo "connection close: {$fd}\n";
    global $_client_fd;

    foreach ($_client_fd as $k => $val) {
        if ($fd == $val) {
...

点击查看剩余70%

{{collectdata}}

网友评论0