linux下安装kafka及php扩展记录

linux下安装kafka及php扩展记录


什么是Kafka

Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。

优势好处

可靠性 - Kafka是分布式,分区,复制和容错的。

可扩展性 - Kafka消息传递系统轻松缩放,无需停机。

耐用性 - Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。

性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。

Kafka非常快,并保证零停机和零数据丢失。

使用场景

Kafka可以在许多用例中使用。 其中一些列出如下 -

指标 - Kafka通常用于操作监控数据。 这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。

日志聚合解决方案 - Kafka可用于跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。

流处理 - 流行的框架(如Storm和Spark Streaming)从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。 Kafka的强耐久性在流处理的上下文中也非常有用。

安装kafka及php扩展

安装 kafka 需要先安装 jdk。

1、 安装java,并设置相关的环境变量

> wget https://download.java.net/openjdk/jdk7u75/ri/openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> mv java-se-7u75-ri/ /opt/
> export JAVA_HOME=/opt/java-se-7u75-ri
> export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
> export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar

#验证安装

> java -verison
openjdk version "1.7.0_75"
OpenJDK Runtime Environment (build 1.7.0_75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)

2、安装kafka,这里以0.10.2版本为例

> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
> tar zxvf kafka_2.11-0.10.2.0.tgz
> mv kafka_2.11-0.10.2.0/ /opt/kafka
> cd /opt/kafka

#启动zookeeper(注意尾部增加&是为了能够退出命令行执行下一个命令,以免第一个命令停止了,无法执行第二个命令)
> bin/zookeeper-server-start.sh config/zookeeper.properties &

#启动kafka(注意尾部增加&是为了能够退出命令行执行下一个命令,以免第一个命令停止了,无法执行第二个命令)

> bin/kafka-server-start.sh config/server.properties &

#尝试创建一个topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

#生产者写入消息

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

#消费者消费消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

配置开机自启动

在 /lib/systemd/system/ 目录下创建 zookeeper服务和kafka服务 的配置文件。

# vim zookeeper.service

zookeeper.service 添加内容:

[Unit]
Description=Zookeeper service
After=network.target

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
User=root
Group=root
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
# vim kafka.service
kafka.service 添加内容:
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
User=root
Group=root
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

注:以上两个文件 根据自己的 jdk 和 kafka 安装目录相应的修改。

刷新配置。

# systemctl daemon-reload

zookeeper、kafka服务加入开机自启。

#systemctl enable zookeeper

#systemctl enable kafka

使用systemctl启动/关闭/重启 zookeeper、kafka服务systemctl start/stop/restart zookeeper/kafka。

注:启动kafka前必须先启动zookeeper 。

# systemctl start zookeeper

# systemctl start kafka

查看状态。

# systemctl status zookeeper

# systemctl status kafka

3、安装kafka的C操作库

wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz
tar zxvf librdkafka-1.3.0.tar.gz
cd librdkafka-1.3.0
./configure
make && make install

linux下安装kafka及php扩展记录

4、安装php的kafka扩展 ,这里选择php-rdkafka扩展 https://github.com/arnaud-lb/php-rdkafka

wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz
tar zxvf php-rdkafka-4.0.2.tar.gz
cd php-rdkafka-4.0.2
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config
make && make install

linux下安装kafka及php扩展记录

我们看到编译后的rdkafka.so的目录,保存下来

vi /usr/local/php/etc/php.ini

修改php.ini,加入 extension=上面的目录/rdkafka.so

  重启php

service php-fpm restart

示例代码

好了,这个时候就可以通过php连接kafka了,示例代码如下:

<?php
//消息生产
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test'); //topicname
$cf = new RdKafka\TopicConf();
$cf->set('offset.store.method', 'broker');
$cf->set('auto.offset.reset', 'smallest');
$rk = new RdKafka\Producer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname
for ($i = 0; $i < 10; $i++) {
    $topic->produce(0, 0, 'test' . $i);
}
//消息消费
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test');
$rcf->set('broker.version.fallback', '0.8.2'); //brokername,kafkaversion
$cf = new RdKafka\TopicConf();
$cf->set('auto.offset.reset', 'smallest');
$cf->set('auto.commit.enable', true);
$rk = new RdKafka\Consumer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname,topicobject
$topic->consumeStart(0, 10); //partition,offset
$msg = $topic->consume(0, 1000); //partition,timeout
var_dump($msg);

{{collectdata}}

网友评论0