redis subscribe无法使用


 class Pubsub {
    private $redis;
    public function __construct(){
        $redis = new \Redis();
        $this->redis = $redis;

    public function callbak($redis,$channel_name,$msg){

    public function subChannel($channels){
        function callbak($redis,$channel_name,$msg){

    public function pubMsg($channel,$msg){


 public function pubAction(){
        $redis = new Pubsub();
        $message = '344';
        $channel = 'sub1';
 public function subAction(){
        $redis = new Pubsub();

Redis::subscribe() expects parameter 2 to be a valid callback, class 'Redis' does not have a method 'callbak'

class Redis_Bingfa { function __construct() { $this->Config=new Config(); $redisinfo=$this->Config->get('redis'); $this->redis=new Redis(); $this->redis->connect($redisinfo['host'],$redisinfo['port']); } public function test_subscribe(){ $this->redis->setOption(Redis::OPT_READ_TIMEOUT,-1); ini_set('default_socket_timeout', -1); $this->redis->subscribe(array('redischat'),array($this,'process_msg')); } public function process_msg($redis,$chan,$msg){ echo $chan."|".$msg."\n"; } public function test_publish(){ for($i=0;$i<10;$i++){ $this->redis->publish('redischat','user'.$i.' is bidding,price is:'.(7000+$i*200)); } } } $redisBingfa =new Redis_Bingfa(); $action=$_GET['action']; switch($action){ case 'test_subscribe': $redisBingfa->test_publish(); $redisBingfa->test_subscribe(); break; }


import redis class SendRedis(object): def __init__(self): self.pool = redis.ConnectionPool(host='', port=6379, db=0) = redis.StrictRedis(host='') self.msg_key = 'msg_redis' def send(self): pub = # 开始订阅 pub.subscribe(self.msg_key) # 订阅频道 while True: msg = input("请输入你要发送的消息(over结束):"), msg) # 开始发布消息 if msg == "over": print("停止发送") break ``` # 接收 import redis class SubscribeRedis(object): def __init__(self): self.pool = redis.ConnectionPool(host='', port=6379, db=0) = redis.StrictRedis(connection_pool=self.pool) self.msg_key = 'msg_redis' def subscribe(self): pub = # 开始订阅 pub.subscribe(self.msg_key) # 订阅频道 for item in pub.listen(): # 监听状态:有消息发布了就拿过来 print(item) if item['type'] == 'message': print(item['channel'].decode()) print(item['data']) if item['data'] == 'over': print("%s : 停止发送" % (item['channel'].decode())) pub.unsubscribe(self.msg_key) print("取消了订阅") break elif item['type'] == 'subscribe': print("获取的类型不对: %s" % item['type']) break ``` # main import threading from sendRedis import SendRedis from subscribeRedis import SubscribeRedis class MainRedis(object): def main(self): send = SendRedis() sub = SubscribeRedis() t1 = threading.Thread(target=send.send(), args=()) t2 = threading.Thread(target=sub.subscribe(), args=()) t1.start() t2.start()

Golang Redis PubSub超时

<div class="post-text" itemprop="text"> <p>So far I've been doing this:</p> <pre><code>import ( _redis "" "strconv" "time" ) type Redis struct { Connector *_redis.Client PubSub *_redis.PubSub } var redis *Redis = nil func NewRedis() bool { if redis == nil { redis = new(Redis) redis.Connector = _redis.NewClient(&amp;_redis.Options{ Addr: config.RedisHostname + ":" + strconv.FormatInt(config.RedisPort, 10), Password: "", DB: 0, }) Logger.Log(nil, "Connected to Redis") err := redis.Init() if err != nil { Logger.Fatal(nil, "Cannot setup Redis:", err.Error()) return false } return true } return false } func (this *Redis) Init() error { pubsub, err := this.Connector.Subscribe("test") if err != nil { return err } defer pubsub.Close() this.PubSub = pubsub for { msgi, err := this.PubSub.ReceiveTimeout(100 * time.Millisecond) if err != nil { Logger.Error(nil, "PubSub error:", err.Error()) err = this.PubSub.Ping("") if err != nil { Logger.Error(nil, "PubSub failure:", err.Error()) break } continue } switch msg := msgi.(type) { case *_redis.Message: Logger.Log(nil, "Received", msg.Payload, "on channel", msg.Channel) } } return nil } </code></pre> <p>My Connector is a redis.Client, it's working because I was able to publish messages as well.</p> <p>When I run my program, I get the following error: <code>PubSub error: WSARecv tcp i/o timeout</code></p> <p>Do you have any idea of what I'm doing wrong ? I'm using this package: <a href="" rel="nofollow"></a></p> </div>




<div class="post-text" itemprop="text"> <pre><code>psc := redis.PubSubConn{c} psc.Subscribe("example") func Receive() { for { switch v := psc.Receive().(type) { case redis.Message: fmt.Printf("%s: message: %s ", v.Channel, v.Data) case redis.Subscription: fmt.Printf("%s: %s %d ", v.Channel, v.Kind, v.Count) case error: return v } } } </code></pre> <p>In the above code(taken from <a href="" rel="nofollow">Redigo doc</a>), if connection is lost, all subscriptions are also lost. What will be better way to recover from lost connection and resubscribe.</p> </div>


ehcache 作一级缓存,redis作二级缓存,怎么解决脏数据问题 如果服务器A和服务器B都在本地ehcache缓存中缓存了同一个数据X,若服务器A对X修改为了Y,那么修改后的结果Y会缓存在服务器A的ehcache缓存和redis中心缓存中,但是此时服务器B取到的这个数据会是X还是Y呢?服务器A对X数据进行修改后,redis中心缓存不会主动将修改后的值Y推送到各个服务器吗?


<div class="post-text" itemprop="text"> <p>I have a docker-compose file with several containers in, two of which are supposed to communicate via a Redis DB. Both containers have connections to Reids and I can read/write from both. However I'd like a container to be triggered every time the something is added from the other container. I thought I could accomplish this via Redis Sub/Pub but it when I run the code it never triggers anything, even when I can see I've added new items to the Redis queue. </p> <p>From this I have two questions: 1. Is what I'm looking to do even possible? Can I do publish/subscribe in in two separate docker containers and expect it work as described above? 2. If it is possible, can someone please point me below where I"m going wrong with this tools? </p> <p>This is my function that I add new data to the Redis queue and then publish the data in Docker container 1. </p> <pre><code>func redisShare(key string, value string) { jobsQueue.Set(key, value, 0) //setting in the queue jobsQueue.Publish(key, value) //publishing for the other docker container to notice fmt.Println("added ", key, "with a value of ", value, "to the redis queue") } </code></pre> <p>I'm using this line in my other docker container to subscribe to the Redis queue and listen for changes: <code>redisdb.Subscribe()</code></p> <p>I would expect if something was added to the redis queue it would share the data to the other container and I'd see the message received, but right now Docker Container 2 just runs and then closes. </p> <p>Thanks! </p> </div>


程序设计是这样的:线程A将要发给设备的指令先存到send频道,线程B订阅send频道,将数据保存到缓存中,线程C将缓存中的数据发送给设备终端。线程D接收数据回复。 2年了程序跑的还算没问题。最近,设备也由7万台增长到了10万台。程序现在最多跑1个小时,再发指令给设备,却收不到回复了。 打印了一段发送到redis的send频道的日志。发现这样的情况: 发布数据给send频道的日志 09:09:58 read指令 09:10:06 read指令 09:12:39 read指令 订阅send频道,打印日志 09:12:23 09:12:40 不知道为什么会间隔这么久,求大神指点。我没有积分,可怜可怜我吧。


<div class="post-text" itemprop="text"> <p>I am using the redigo library to try to subscribe to a Redis channel and then handle a published message. How do I handle the case where it errors out? Here's what I came up with. Is this a good way to do it? Is there a better way?</p> <p>Note: this question is specific to redigo, but I think it applies to other places where reconnections need to happen.</p> <pre><code>package main import ( "fmt" "time" "" ) func main() { for { fmt.Println("connecting...") c, err := redis.Dial("tcp", "localhost:6379") if err != nil { fmt.Println("error connecting to redis") time.Sleep(5 * time.Second) continue } psc := redis.PubSubConn{c} psc.Subscribe("example") ReceiveLoop: for { switch v := psc.Receive().(type) { case redis.Message: fmt.Printf("%s: message: %s ", v.Channel, v.Data) case redis.Subscription: fmt.Printf("%s: %s %d ", v.Channel, v.Kind, v.Count) case error: fmt.Println("there was an error") fmt.Println(v) time.Sleep(5 * time.Second) break ReceiveLoop } } } } </code></pre> <p>I just put it in the main() function for the example. It would really run in some goroutine.</p> </div>


<div class="post-text" itemprop="text"> <p>Is there a concept of acknowledgements in Redis Pub/Sub?</p> <p>For example, when using RabbitMQ, I can have two workers running on separate machines and when I publish a message to the queue, only one of the workers will ack/nack it and process the message.</p> <p>However I have discovered with Redis Pub/Sub, both workers will process the message.</p> <p>Consider this simple example, I have this go routine running on two different machines/clients:</p> <pre><code>go func() { for { switch n := pubSubClient.Receive().(type) { case redis.Message: process(n.Data) case redis.Subscription: if n.Count == 0 { return } case error: log.Print(n) } } }() </code></pre> <p>When I publish a message:</p> <pre><code>conn.Do("PUBLISH", "tasks", "task A") </code></pre> <p>Both go routines will receive it and run the process function.</p> <p>Is there a way of achieving similar behaviour to RabbitMQ? E.g. first worker to ack the message will be the only one to receive it and process it.</p> </div>


<div class="post-text" itemprop="text"> <p>I have to subscribe to all the channels of my Redis db and simultaneously read data from another hash in the same db node. The following is the code I have written for this using phpredis:</p> <pre><code>$notif = new Worker; try { // connect redis $notif-&gt;connectRedisServer(); // start the worker processing $notif-&gt;startWorkerProcess(); } catch(Exception $e) { exit('Exception: '.$e-&gt;getMessage()); } class Worker { private $redis; public $recentHash = ''; public $notifHash = ''; public $serverConnectionDetails = { //Redis server connection details }; private $redis; // redis database used for getting recent record of every imei const RECENTDATA_DB = 1; public function connectRedisServer() { if(!empty($this-&gt;redis)) return $this-&gt;redis; // already connected $this-&gt;redis = new Redis(); if(!$this-&gt;redis-&gt;pconnect( $this-&gt;serverConnectionDetails['redis']['host'], $this-&gt;serverConnectionDetails['redis']['port'] )) return 'Redis connection failed.'; if(isset($this-&gt;serverConnectionDetails['redis']['password']) &amp;&amp; !$this-&gt;redis-&gt;auth($this-&gt;serverConnectionDetails['redis']['password']) ) return 'Redis authentication failed.'; return $this-&gt;redis; } public function startWorkerProcess() { $this-&gt;redis-&gt;select(self::RECENTDATA_DB); $this-&gt;redis-&gt;pSubscribe(array('*'), array($this, 'processRedisData')); } public function processRedisData($redis, $pattern, $chan, $msg) { $message = rtrim((string) $msg,","); $tData = json_decode($message, true); $tId = (int) $tData['tID']; echo "Recent Published Data:"; var_dump($tData); $data = $this-&gt;getNotifSettings($tId); if(!empty($data)) { echo "Redis Settings: "; var_dump($trackerSettings); } } public function getNotifSettings($tId) { $data = $this-&gt;redis-&gt;hGet($this-&gt;notifHash, $tId); //This command doesn't return any data even if it exists for the $tId if($data === false) return null; $data = json_decode($data, true); return $data; // Always comes up as null } } </code></pre> <p>The problem here is that once I get subscribed to all the channels on <code>db1</code> in Redis. I don't get any result if I try to run <code>HGET</code>, even though the data for the given key exists in the db. I have put additional comments in the code above to explain where the problem is. Check <code>getNotifSettings()</code> function. </p> <p>Any help will be appreciated.</p> </div>


我们分布式系统中需要实现redis 的session共享功能。现在已经实现。jedis的版本为2.9.0。但是redis还具有发布订阅消息的功能,我的系统中不需要。运维警告说,用redis的发布订阅,会导致占链接不释放,长时间会撑爆redis。我现在想要关闭redis的该功能,该如何配置。 下面是spring的配置redis的部分代码:<bean class=""> <property name="maxInactiveIntervalInSeconds" value="3600" /> <property name="redisNamespace" value="#####"/> </bean> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxTotal" value="100"/> <property name="maxIdle" value="10"/> <property name="minIdle" value="5"/> <property name="maxWaitMillis" value="3000"/> <property name="testOnBorrow" value="true"/> <property name="testOnReturn" value="true"/> <property name="testWhileIdle" value="true"/> </bean>

redis 发布、订阅者,redis重启怎么重新订阅?

redis 发布、订阅者, redis重启后怎么重新订阅? 或 redis切换怎么重新订阅问题


<div class="post-text" itemprop="text"> <p>This is referring to a previous post made on Stackoverflow asked 3 yrs ago (<a href="">Connecting directly to Redis with (client side) javascript?</a>). Are there any updates on how to call redis pub/sub from Javascript? (Other than the webdis option the chosen answer showed). I have a redis php script that is able to subscribe from php but I am having trouble calling from Javascript and getting the response. I have tried making a ajax get with jquery and it times out. Any updates would be helpful.</p> </div>


<div class="post-text" itemprop="text"> <p>I am making a PubSub using redigo and the connection is created by redis pool.</p> <p>This is the Redis Pool code:</p> <pre><code>package main import ( "os" "os/signal" "syscall" "time" "" ) type IRedis interface { Addr() string Conn() redis.Conn Set(key string, body string) error Close() } type Redis struct { addr string pool *redis.Pool } func NewRedis(addr string) *Redis { r := &amp;Redis{ addr, &amp;redis.Pool{ MaxIdle: 50000, IdleTimeout: 240 * time.Second, Dial: func() (redis.Conn, error) { c, err := redis.Dial("tcp", addr) if err != nil { return nil, err } return c, err }, TestOnBorrow: func(c redis.Conn, t time.Time) error { _, err := c.Do("PING") return err }, }, } r.cleanupHook() return r } func (r *Redis) cleanupHook() { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) signal.Notify(c, syscall.SIGTERM) signal.Notify(c, syscall.SIGKILL) go func() { &lt;-c r.pool.Close() os.Exit(0) }() } func (r *Redis) Addr() string { return r.addr } func (r *Redis) Conn() redis.Conn { return r.pool.Get() } func (r *Redis) Set(key string, body string) error { p := r.pool.Get() defer p.Close() _, err := p.Do("SET", key, body) return err } func (r *Redis) Close() { r.pool.Close() } </code></pre> <p>This is my PubSub code:</p> <pre><code>package main import ( "log" "" ) type PubSub struct { send chan string conn *redis.PubSubConn } func NewPubSub(s chan string, c *redis.PubSubConn) (*PubSub, error) { err := c.Subscribe("urls") if err != nil { return nil, err } return &amp;PubSub{s, c}, nil } func (ps *PubSub) Start() { for { switch v := ps.conn.Receive().(type) { case redis.Message: data := string(v.Data) ps.send &lt;- data case redis.Subscription: log.Printf("subscription message: %s: %s %d ", v.Channel, v.Kind, v.Count) case error: log.Println("error:", v) return } } } func (ps *PubSub) Close() { ps.conn.Close() } </code></pre> <p>and after I receive 10k messages in my PubSub channel the connection is lost and I receive an redis.Error with the message EOF</p> <p>Any thoughts why this is happen? Even when I run locally the problem happens</p> </div>

Redis Golang客户端会定期丢弃不良的PubSub连接(EOF)

<div class="post-text" itemprop="text"> <h2>What I did:</h2> <p>I'm using the <code>golang</code> Redis library from <code></code>. My client listens on a PubSub channel called 'control'. Whenever a message arrives, I handle it and continue receiving the next message. I listen endlessly and the messages can come often, or sometimes not for days.</p> <h2>What I expect:</h2> <p>I expect the redis channel to stay open endlessly and receive messages as they are sent.</p> <h2>What I experience:</h2> <p>Usually it runs well for days, but every once in a while <code>client.Receive()</code> returns <code>EOF</code> error. After this error, the client no longer receives messages on that channel. Internally, the redis client prints to stdout the following message:</p> <blockquote> <p>redis: 2019/08/29 14:18:57 pubsub.go:151: redis: discarding bad PubSub connection: EOF</p> </blockquote> <p><strong><em>Disclaimer:</em></strong> I am not certain that this error is what causes me to stop receiving messages, it just seems related.</p> <h2>Additional questions:</h2> <p>I'd like to understand why this happens, if this is normal and if reconnecting to the channel via <code>client.Subscribe()</code> whenever I encounter the behaviour is a good remedy, or should I address the root issue, whatever it may be.</p> <h2>The code:</h2> <p>Here is the entire code that handles my client (connect to redis, subscribe to channel, endlessly receive messages):</p> <pre><code>func InitAndListenAsync(log *log.Logger, sseHandler func(string, string) error) error { rootLogger = log.With(zap.String("component", "redis-client")) host := env.RedisHost port := env.RedisPort pass := env.RedisPass addr := fmt.Sprintf("%s:%s", host, port) tlsCfg := &amp;tls.Config{} client = redis.NewClient(&amp;redis.Options{ Addr: addr, Password: pass, TLSConfig: tlsCfg, }) if _, err := client.Ping().Result(); err != nil { return err } go func() { controlSub := client.Subscribe("control") defer controlSub.Close() for { in, err := controlSub.Receive() // *** SOMETIMES RETURNS EOF ERROR *** if err != nil { rootLogger.Error("failed to get feedback", zap.Error(err)) break } switch in.(type) { case *redis.Message: cm := comm.ControlMessageEvent{} payload := []byte(in.(*redis.Message).Payload) if err := json.Unmarshal(payload, &amp;cm); err != nil { rootLogger.Error("failed to parse control message", zap.Error(err)) } else if err := handleIncomingEvent(&amp;cm); err != nil { rootLogger.Error("failed to handle control message", zap.Error(err)) } default: rootLogger.Warn("Received unknown input over REDIS PubSub control channel", zap.Any("received", in)) } } }() return nil } </code></pre> </div>


redis的发布与订阅时,当redis发布了消息,publish了消息,redis缓存会存贮这个消息吗?如果存储了,key是多少,该怎么查询呢? @Test public void test(){ publishMessage.publishMessage("order", "aaaaaaa"); } 2019-12-25 15:26:31.005 INFO 17152 --- [ container-2] : 接收了order频道的消息:"aaaaaaa" 但是缓存里面没有看到这个内容


<div class="post-text" itemprop="text"> <p>I have a gateway server, which can push message to client side by using websocket, A new client connected to my server, I will generate a <code>cid</code> for it. And then I also subscribe a channel, which using <code>cid</code>. If any message publish to this channel, My server will push it to client side. For now, all unit are working fine, but when I try to test with benchmark test by <a href="" rel="nofollow noreferrer">thor</a>, it will crash, I fine the <code>DeliverMessage</code> has some issue, it would never exit, since it has a die-loop. but since redis need to subscribe something, I don't know how to avoid loop.</p> <pre><code>func (h *Hub) DeliverMessage(pool *redis.Pool) { conn := pool.Get() defer conn.Close() var gPubSubConn *redis.PubSubConn gPubSubConn = &amp;redis.PubSubConn{Conn: conn} defer gPubSubConn.Close() for { switch v := gPubSubConn.Receive().(type) { case redis.Message: // fmt.Printf("Channel=%q | Data=%s ", v.Channel, string(v.Data)) h.Push(string(v.Data)) case redis.Subscription: fmt.Printf("Subscription message: %s : %s %d ", v.Channel, v.Kind, v.Count) case error: fmt.Println("Error pub/sub, delivery has stopped", v) panic("Error pub/sub") } } } </code></pre> <p>In the main function, I have call the above function as:</p> <pre><code>go h.DeliverMessage(pool) </code></pre> <p>But when I test it with huge connection, it get me some error like:</p> <blockquote> <p>ERR max number of clients reached</p> </blockquote> <p>So, I change the redis pool size by change <code>MaxIdle</code>:</p> <pre><code>func newPool(addr string) *redis.Pool { return &amp;redis.Pool{ MaxIdle: 5000, IdleTimeout: 240 * time.Second, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) }, } } </code></pre> <p>But it still doesn't work, so I wonder to know, if there any good way to kill a goroutine after my websocket disconnected to my server on the below selection:</p> <pre><code>case client := &lt;-h.Unregister: if _, ok := h.Clients[client]; ok { delete(h.Clients, client) delete(h.Connections, client.CID) close(client.Send) if err := gPubSubConn.Unsubscribe(client.CID); err != nil { panic(err) } // TODO kill subscribe goroutine if don't client-side disconnected ... } </code></pre> <p>But How do I identify this goroutine? How can I do it like <code>unix</code> way. <code>kill -9 &lt;PID&gt;</code>?</p> </div>

Redis Pub / Sub使用Python后端和

<div class="post-text" itemprop="text"> <p>I have a PHP code which publishes data to a channel called "MESSAGE_FROM_MARS". The snippet is as follows : </p> <pre><code> function send_data_to_check_spam($feedback) { $d_id=$this-&gt;redis_connect(11); //echo $feedback; //die(); echo "&lt;b style='color:red'&gt;MESSAGE SENT TO SPAM SWATTER&lt;/b&gt;"."&lt;br&gt;"; $d_id-&gt;PUBLISH("MESSAGE_FROM_MARS",$feedback); } </code></pre> <p>There is a server side python listener which receives the published data and the snippet is as follows : </p> <pre><code>r = redis.StrictRedis(host='localhost', port=6379, db=11) def sum(a,b): print a+b def main(): sub = r.pubsub() sub.subscribe('MESSAGE_FROM_MARS') </code></pre> <p>The python code does the processing and publishes the result back. The snippet is as follows : </p> <pre><code>r.publish('SPAM_STATUS',spam) </code></pre> <p>I am trying to get the result using a websocket and the snippet is as follows. : </p> <pre><code>&lt;script&gt; var socket = io.connect(''); console.log(socket); socket.on('SPAM_STATUS', function (data) { alert("here"); console.log(data); //socket.emit('my other event', { my: 'data' }); }); &lt;/script&gt; </code></pre> <p>Everything works like a peach except that the for some reason is not getting the message from the SPAM_STATUS channel. </p> <p>What am I doing wrong? I am relatively new to, so pardon my naivety </p> </div>

