目录
Redis pub/sub(Publish,Subscribe)
Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。
Publisher/subscribe 简易模型
Pub/Sub功能(means Publish,Subscribe)即发布及订阅功能")
1. 时间非耦合:发布者和订阅者不必同时在线,它们不必同时参与交互。
2. 空间非耦合:发布者和订阅者不必相互知道对方所在的位置。发布者通过事件服务发布事件,订阅者通过事件服务间接获得事件。发布者和订阅者不需要拥有直接到对方的引用,也不必知道有多少个订阅者或者是发布者参与交互。
3. 同步非耦合:发布者/订阅者是异步模式。发布者可不断地生产事件,而订阅者(通过一个回调)则可异步地得到产生事件的通知。
[](http://photo.blog.sina.com.cn/showpic.html#blogid=62b832910100xok2&url=http://s6.sinaimg.cn/orignal/62b83291tbb66a806a975)
分类:
按照订阅方式分为基于主题(topic-based)、基于内容(content-based)、基于类型(type-based)的pub/sub方式。
总结:
Pub/Sub是可适用于可扩展要求高、松散耦合系统的分布式交互模型。
在抽象层中,它的时间非耦合、空间非耦合和同步非耦合性可允许参与者不依赖另一个而独立操作,具有一定的可扩展性;然而在实现层,可扩展性仍受其他原因的牵制。
例如:1、灵活的订阅要求复杂的过滤和路由算法;
2、高可用性开销(事件侦听、日志重传);
3、消息认可带来的网络流量消耗;
4、庞大的订阅者数据带来的系统开销;
基于事件的Pub/Sub中间件的开发与利用在一定程度上可以提高系统的效率。
Redis-cli连接测试命令行参考_附件__I_
协议测试命令
l 订阅者(redis-cli)连接订阅
/usr/local/bin/redis-cli -h 10.54.37.212 -p 6380
redis-cli充当订阅者订阅(first 与second)两个topic
SUBSCRIBE first second
l 发布者(redis-cli)连接发布
/usr/local/bin/redis-cli -h 10.54.37.212 -p 6380
Pubish first wangbin
Publish second wangbin_second
l 订阅者(redis-cli)信息接收
l 取消订阅UNSUBSCRIBE测试
Redis-cli对取消订阅有bug可以使用telnet 进行测试
取消订阅测试成功;
l 匹配模式订阅PSUBSCRIBE测试
订阅者:
发布者:
publish news.wangbin wangbin_patterncommand
l 匹配模式取消订阅PUNSUBSCRIBE测试
在telnet下测试如下:
PSUBSCRIBE news. wangbin
punsubscribe wangbin*
测试成功!
https://github.com/nicolasff/phpredis pecl扩展包目前只提供了两个接口 publish subscribe
phpredis是c写的php模块
https://github.com/jamm/Memory/blob/master/RedisServer.php
php这个是php是基于redis protocol的fsocketopen链接后操作的类库,提供的接口比较全面;publish可以进入数据,但是subscrbie没有阻塞;
可以在原类包当中修改其加入对阻塞模形的支持;
发布功能:
$redis = new Redis();
$res = $redis->connect($REDIS\_HOSTS\['CACHE'\]\['host'\], $REDIS_HOSTS['CACHE']['port'], 1 );
$res = $redis->publish($key,$value);
定阅功能:
$redis = new Redis();
$res = $redis->pconnect($REDIS\_HOSTS\['CACHE'\]\['host'\], $REDIS_HOSTS['CACHE']['port']);
$res = $redis->subscribe(array($key),array('SinaRedis','subscribe_handler'));
第二个参数为回调方法;
public static function subscribe_handler($redis, $channel, $msg){
print_r($redis);
echo $chan;
echo $msg;
return true;
}
定阅 redis_subscriber.php
SinaRedis::subscribe('wangbin_test');
发布redis_publisher.php
测试收到订阅者收到发布的内容
Php-redis扩展bug
段错误:
无论是connect 还是pcconnect 当超时断后,会报错segmentation fault;
超时设置bug
仍会断开解决方法:ini_set('default_socket_timeout', -1);// it works fine
Web Server端测试
Web端订阅应用
Apache下ob_flush flush后可以输出,但是会漏下最后一条;求解决方案
Nginx下 会报504错误;求解决方法
Nginx 模块与redis直接连接呢?
Php-Fpm有没有什么设置呢
Wget https://github.com/downloads/andymccurdy/redis-py/redis-2.2.4.tar.gz
tar -xvf redis-2.2.4.tar.gz
cd redis-2.2.4
python setup.py intall
安装成功
交互模式下测试:
发布者:
Python
Type "help", "copyright", "credits" or "license" for more information.
> import redis> r = redis.Redis('10.54.37.212',6380);
> r.publish('wangbin_test','this is a information');
59L
订阅者:
[root@hadoop-master1 python]# python
Python 2.4.3 (#1, Sep 3 2009, 15:37:12)
[GCC 4.1.2 20080704 (Red Hat 4.1.2-46)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
> import redis> r = redis.Redis('10.54.37.212',6380);
> r.subscribe('wangbin_test');
> for msg in r.listen(): \
... print msg
{'pattern': None, 'type': 'message', 'channel': 'wangbin_test', 'data': 'this is a information'}
收到json字符串
Publisher.py
Subscribe.py
Wget http://nodejs.org/dist/v0.6.6/node-v0.6.6.tar.gz
Tar –xvf node-v0.6.6.tar.gz
Cd node-v0.6.6
./configure
Make
Make install
Node 安装在/usr/localbin
添加到环境变量里面:
export PATH=/usr/local/bin/:$PATH
安装:npm
curl -O http://npmjs.org/install.sh
sh install.sh
安装成功
Tar 版本过低,升级下
wget http://ftp.gnu.org/gnu/tar/tar-1.26.tar.gz
./configure --prefix=/usr --bindir=/bin --libexecdir=/usr/bin
Make
Make install
Tar –version
tar (GNU tar) 1.26 升级成功
安装socket.io express hiredis redis
/usr/local/bin/npm install socket.io
/usr/local/bin/npm install express
/usr/local/bin/npm install hiredis redis
事件驱动用例:
Publish.js
var redis = require("redis");
try{
var client = redis.createClient(6380,'10.54.37.212');
client.on(
"error",
function(err){
console.log("err"+err);
}
);
client.on('ready',
function(){
client.publish('wangbin_test',"test,i am test");
client.publish('wangbin_test2',"test, i am test2");
client.end();
}
);
}
catch(e){
console.log("err:"+e);
}
Subscribe.js
var redis = require("redis");
try{
var client = redis.createClient(6380,'10.54.37.212');
client.on(
"error",
function(err){
console.log("err"+err);
}
);
client.on('ready',
function(){
client.subscribe('wangbin_test');
client.subscribe('wangbin_test2');
//client.end();
}
);
client.on('subscribe',
function(channel,count){
console.log("channel:" + channel + ", count:"+count);
}
);
client.on('message',
function(channel,message){
console.log("channel:" + channel + ", msg:"+message);
}
);
client.on('unsubscribe',
function(channel,count){
console.log("channel:" + channel + ", count:"+count);
}
);
}
catch(e){
console.log("err:"+e);
}
测试结果:
另外还支持:pmessage, psubscribe, punsubscribe 事件监听;
---------------------------------------------------------------------------------------------------------
Node.js Socket.io redis pub/sub主动推送数据到浏览器前端
另外还支持:pmessage, psubscribe, punsubscribe 事件监听;
Server.js
var app = require('http').createServer(handler)
, io = require('socket.io').listen(app)
, fs = require('fs');
var redis = require('redis');
var redis_client = redis.createClient(6380,'10.54.37.212');
app.listen(3000);
function handler (req, res) {
fs.readFile(__dirname + '/index.html',
function (err, data) {
if (err) {
res.writeHead(500);
return res.end('Error loading index.html');
}
res.writeHead(200);
res.end(data);
});
}
io.sockets.on('connection', function (socket) {
socket.emit('news', { hello: 'world' });
redis_client.subscribe('wangbin_test');
redis_client.on('message',
function(channel,message){
socket.emit('news', message);
console.log("channel:" + channel + ", msg:"+message);
}
);
socket.on('my other event', function (data) {
console.log(data);
});
});
Index.html
<script src="/socket.io/socket.io.js"></script>
<script>
var socket = io.connect('http://10.54.37.212');
socket.on('news', function (data) {
alert(data);
socket.emit('my other event', { my: 'data' });
});
</script>
Redis-cli发送消息:
Publish wangbin_test test
Server.js订阅监控收到消息后通过socket.io主动push到brower
浏览器收到消息
redis-benchmark是redis的读写性能工具
生成测试数据源:
for i in `seq 10000000` ;do echo -ne 'publish wangbin_test "'$i'"\r\n' >> publish.data; done;
发布:
for i in $(seq 10);do (cat publish.data;sleep 100;) | nc 10.54.37.212 6380 ;done;
订阅(模拟多个客户端同时订阅)
(echo -ne 'subscribe wangbin_test\r\n';sleep 10000) | nc 10.54.37.212 6380 | grep "\".*\"" > subscribe_01.txt &
(echo -ne 'subscribe wangbin_test\r\n';sleep 10000) | nc 10.54.37.212 6380 | grep "\".*\"" > subscribe_02.txt &
(echo -ne 'subscribe wangbin_test\r\n';sleep 10000) | nc 10.54.37.212 6380 | grep "\".*\"" > subscribe_03.txt &
(echo -ne 'subscribe wangbin_test\r\n';sleep 10000) | nc 10.54.37.212 6380 | grep "\".*\"" > subscribe_04.txt &
(echo -ne 'subscribe wangbin_test\r\n';sleep 10000) | nc 10.54.37.212 6380 | grep "\".*\"" > subscribe_05.txt &
(echo -ne 'subscribe wangbin_test\r\n';sleep 10000) | nc 10.54.37.212 6380 | grep "\".*\"" > subscribe_06.txt &
性能监控测试
watch -d -n 1 'wc -l subscribe_*.txt '
或者:
for i in $(seq 15); do wc -l subscribe_*.txt > result_$i.txt ;sleep 1; done;
测试结果为:
每秒返回的数据为4-5w/s
其他PECL 扩展[SAM]Simple Asynchronous Messaging
i. Redis-cli命令信息:
-h <hostname> 域名或ip
-p <port> 端口号 (default: 6379)
-s <socket> 例如:/usr/local/bin/redis-cli -s /tmp/redis.sock
-a <password> 密码
-r <repeat> 重复次数
-i <interval> 重复时间隔时间
-n <db> 数据库个数
-x 最后的标准输入
-d <delimiter> 分隔符
--raw 原始格式回复
--latency 采集延迟模式
参考:
[1] http://redis.io/topics/pubsub
[2] http://code.google.com/p/redis/wiki/PublishSubscribe
[3] http://xmpp.org/extensions/xep-0060.html
[4] http://www.php.net/manual/en/book.sam.php
[5]https://github.com/mranney/node_redis重要
[6] http://socket.io/
[7] https://github.com/LearnBoost/socket.io
[8] https://github.com/LearnBoost/socket.io
此文章源自于【http://blog.sina.com.cn/s/blog_62b832910100xok2.html】
Original url: Access
Created at: 2019-03-11 19:17:49
Category: default
Tags: none
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论