什么是Kafka?
Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。
优势好处
可靠性 - Kafka是分布式,分区,复制和容错的。
可扩展性 - Kafka消息传递系统轻松缩放,无需停机。
耐用性 - Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。
性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。
Kafka非常快,并保证零停机和零数据丢失。
使用场景
Kafka可以在许多用例中使用。 其中一些列出如下 -
指标 - Kafka通常用于操作监控数据。 这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。
日志聚合解决方案 - Kafka可用于跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。
流处理 - 流行的框架(如Storm和Spark Streaming)从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。 Kafka的强耐久性在流处理的上下文中也非常有用。
安装kafka及php扩展
安装 kafka 需要先安装 jdk。
1、 安装java,并设置相关的环境变量
> wget https://download.java.net/openjdk/jdk7u75/ri/openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> mv java-se-7u75-ri/ /opt/
> export JAVA_HOME=/opt/java-se-7u75-ri
> export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
> export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
#验证安装
> java -verison
openjdk version "1.7.0_75"
OpenJDK Runtime Environment (build 1.7.0_75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
2、安装kafka,这里以0.10.2版本为例
> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
> tar zxvf kafka_2.11-0.10.2.0.tgz
> mv kafka_2.11-0.10.2.0/ /opt/kafka
> cd /opt/kafka
#启动zookeeper(注意尾部增加&是为了能够退出命令行执行下一个命令,以免第一个命令停止了,无法执行第二个命令)
> bin/zookeeper-server-start.sh config/zookeeper.properties &
#启动kafka(注意尾部增加&是为了能够退出命令行执行下一个命令,以免第一个命令停止了,无法执行第二个命令)
> bin/kafka-server-start.sh config/server.properties &
#尝试创建一个topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
#生产者写入消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
#消费者消费消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
配置开机自启动
在 /lib/systemd/system/ 目录下创建 zookeeper服务和kafka服务 的配置文件。
# vim zookeeper.service
zookeeper.service 添加内容:
[Unit] Description=Zookeeper service After=network.target [Service] Type=simple Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" User=root Group=root ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target# vim kafka.service
[Unit] Description=Apache Kafka server (broker) After=network.target zookeeper.service [Service] Type=simple Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" User=root Group=root ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties ExecStop=/opt/kafka/bin/kafka-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target
3、安装kafka的C操作库
wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz tar zxvf librdkafka-1.3.0.tar.gz cd librdkafka-1.3.0 ./configure make && make install
4、安装php的kafka扩展 ,这里选择php-rdkafka扩展 https://github.com/arnaud-lb/php-rdkafka
wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz tar zxvf php-rdkafka-4.0.2.tar.gz cd php-rdkafka-4.0.2 /usr/local/php/bin/phpize ./configure --with-php-config=/usr/local/php/bin/php-config make && make install
我们看到编译后的rdkafka.so的目录,保存下来
vi /usr/local/php/etc/php.ini
修改php.ini,加入 extension=上面的目录/rdkafka.so
重启php
service php-fpm restart
示例代码
好了,这个时候就可以通过php连接kafka了,示例代码如下:
<?php //消息生产 $rcf = new RdKafka\Conf(); $rcf->set('group.id', 'test'); //topicname $cf = new RdKafka\TopicConf(); $cf->set('offset.store.method', 'broker'); $cf->set('auto.offset.reset', 'smallest'); $rk = new RdKafka\Producer($rcf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); //brokeraddr $topic = $rk->newTopic("test", $cf); //topicname for ($i = 0; $i < 10; $i++) { $topic->produce(0, 0, 'test' . $i); } //消息消费 $rcf = new RdKafka\Conf(); $rcf->set('group.id', 'test'); $rcf->set('broker.version.fallback', '0.8.2'); //brokername,kafkaversion $cf = new RdKafka\TopicConf(); $cf->set('auto.offset.reset', 'smallest'); $cf->set('auto.commit.enable', true); $rk = new RdKafka\Consumer($rcf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); //brokeraddr $topic = $rk->newTopic("test", $cf); //topicname,topicobject $topic->consumeStart(0, 10); //partition,offset $msg = $topic->consume(0, 1000); //partition,timeout var_dump($msg);
网友评论0