如何基于RabbitMQ实现优先级队列实现

如何基于RabbitMQ实现优先级队列_百度知道
如何基于RabbitMQ实现优先级队列
提问者采纳
arguments[&rabbitmq_server-3,Program Files&#92,并且这个插件被列在RabbitMQ官方网站中了;rabbitmq&#47,因为RabbitMQ的C#客户端优先级是用byte来传递的:会列出两大版本的插件目录(选择对应目录进入下载;其他注意的地方在装了优先级队列插件的rabbitmq-server实例中。这样:申明队列时需要加入自定义的属性发送消息到rabbitmq时://www..BasicPublish(&plugins(版本号可能会变化)把ez文件拷贝过去.,
}exchange&quot.).Priority = (byte)msg,应用层面上又需要优先级队列:&#47?进入rabbitmq安装目录;;route&quot,只实现了Consumer的优先级处理,因此只定义了3个级别)有2个地方需要改动;RabbitMQ Server&#92:我们先要定义优先级枚举,否则会报错.rabbitmq,默认路径在,lib//string, object&gt,因此需求来了:先定义3个级别的优先级;lib&#47:http。但是:C,设置自定义属性internal static IDictionary& QueueArguments
IDictionary&usr&#47:如何为RabbitMQ加入优先级队列特性,进入plugins目录; arguments=new Dictionary&lt、中;//定义队列优先级为10个级别
/community-plugins,然后运行命令来enable这个插件centos下;/x-max-priority&quot.4&#47,RabbitMQ到目前为止,然后运行列举插件列表命令, object&gt.Serialize2Bytes(msg)):rabbitmq_priority_queue执行,如。地址如下,迫于种种原因;(), SerializerUtility:&#92,将上面这个ez文件拷贝到plugins目录中。 C#代码端需要作出的更改下面看看客户端类库的编写.html插件安装不要立刻下载这个url中的那个链接,所有的Durable队列必须用如上的方式.CreateBasicProperties();string,得知RabbitMQ虽然官方没有支持此特性, QueueArguments),只是为了简化.3、高(其实可以定义很多级别://www,要先根据你想要更新目标的rabbitmq版本再去另外一个地方下载相应插件,官方还没有实现优先级队列,设置x-max-priority属性;
其他类似问题
为您推荐:
优先级的相关知识
等待您来回答
下载知道APP
随时随地咨询
出门在外也不愁今天看啥 热点:
如何基于RabbitMQ实现优先级队列,rabbitmq优先级队列概述
由于种种原因,RabbitMQ到目前为止,官方还没有实现优先级队列,只实现了Consumer的优先级处理。
但是,迫于种种原因,应用层面上又需要优先级队列,因此需求来了:如何为RabbitMQ加入优先级队列特性。
查询资料后,得知RabbitMQ虽然官方没有支持此特性,但是社区已经有相关优先级队列插件了,并且这个插件被列在RabbitMQ官方网站中了。
地址如下:/community-plugins.html
不要立刻下载这个url中的那个链接,要先根据你想要更新目标的rabbitmq版本再去另外一个地方下载相应插件,如:
会列出两大版本的插件目录(选择对应目录进入下载,否则会报错...):
插件如何安装?
进入rabbitmq安装目录,进入plugins目录,将上面这个ez文件拷贝到plugins目录中,然后运行命令来enable这个插件
centos下,默认路径在:/usr/lib/rabbitmq/lib/rabbitmq_server-3.3.4/plugins(版本号可能会变化)
windows下,默认路径在:C:\Program Files\RabbitMQ Server\rabbitmq_server-3.3.4\plugins(版本号可能会变化)
把ez文件拷贝过去,然后运行列举插件列表命令:
找到这个优先级队列插件名为:rabbitmq_priority_queue
执行:rabbitmq-plugins enable rabbitmq_priority_queue
ok,重新启动rabbitmq-server服务。
这样,server端的配置算完成了。
C#代码端需要作出的更改
下面看看客户端类库的编写:
我们先要定义优先级枚举,继承自byte,因为RabbitMQ的C#客户端优先级是用byte来传递的:
先定义3个级别的优先级:低、中、高(其实可以定义很多级别,只是为了简化,因此只定义了3个级别)
有2个地方需要改动:
internal static IDictionary&string, object& QueueArguments
IDictionary&string, object& arguments=new Dictionary&string, object&();
arguments["x-max-priority"] = 10;//定义队列优先级为10个级别
channel.QueueDeclare("queueName", true, false, false, QueueArguments);//QueueArguments就是上面定义的这个dictionary
var headers = channel.CreateBasicProperties();
headers.Priority = (byte)msg.P//在这里把继承自byte的枚举转换成byte
channel.BasicPublish("exchange", "route", headers, SerializerUtility.Serialize2Bytes(msg));
&其他注意的地方
在装了优先级队列插件的rabbitmq-server实例中,所有的Durable队列必须用如上的方式,设置x-max-priority属性,否则rabbitmq-server服务会crash
楼主、你好、windows是保护机制、用户程序只能接受消息、是不能对优先级进行排序的、如果想安排优先级、需要调整好线程、或者是换一种优先级高的消息、谢谢、
优先级队列 是不同于先进先出队列的另一种队列。每次从队列中取出的是具有最高优先权的元素  优先队列的类定义  #include &assert.h&  #include &iostream.h&  $include &stdlib.h&  const int maxPQSize = 50; //缺省元素个数  template &class Type& class PQueue {  public:  PQueue ( );  ~PQueue ( ) { delete [ ] }  void PQInsert ( const Type & item );  Type PQRemove ( );   void makeEmpty ( ) { count = 0; }  int IsEmpty ( ) const   { return count == 0; }   int IsFull ( ) const   { return count == maxPQS }   int Length ( ) const { }  private:  Type * //存放数组   //队列元素计数   }   优先队列是0个或多个元素的集合,每个元素都有一个优先权或值,对优先队列执行的操作有1) 查找;2) 插入一个新元素;3) 删除.在最小优先队列(min priorityq u e u e)中,查找操作用来搜索优先权最小的元素,删除操作用来删除该元素;对于最大优先队列(max priority queue),查找操作用来搜索优先权最大的元素,删除操作用来删除该元素.优先权队列中的元素可以有相同的优先权,查找与删除操作可根据任意优先权进行.  最大优先权队列的抽象数据类型描述如ADT 9-1所示,最小优先队列的抽象数据类型描述与之类似,只需将最大改为最小即可.  ADT 最大优先队列的抽象数据类型描述抽象数据类型  M a x P r i o r i t y Q u e u e{  实例 有限的元素集合,每个元素都有一个优先权  操作  Create ( ):创建一个空的优先队列  Size ( ):返回队列中的元素数目  Max ( ):返回具有最大优先权的元素  I n s e rt (x):将x插入队列  DeleteMax (x):从队列中删除具有最大优先权的元素,并将该元素返回至x  }  优先队列插入和删除元素的复杂度都是O(lgn),所以很快。  另一种描述方法是采用有序线性表,当元素按递增次序排列,使用链表时则按递减次序排列,这两种描述方法的删除时间均为( 1 ),插入操作所需时间为(n).  例:  假设我们对机器服务进行收费.每个用户每次使用机器所付费用都是相同的,但每个  用户所需要服务时间都不同.为获得最大利润,假设只要有用户机器就不会空闲,我们可以把  等待使用该机器的用户组织成一个最小优先队列,优先权即为用户所需服务时间.当一个新的  用户需要使用机器时,将他/她的请求加入优先队列.一旦机器可用,则为需要最少服务时间  (即具有最高优先权)的用户提供服务.  如果每个用户所需时间相同,但用户愿意支付的费用不同,则可以用支付费用作为优先权,  一......余下全文>>
相关搜索:
相关阅读:
相关频道:
&&&&&&&&&&&&&&&&
Asp.Net教程最近更新RabbitMQ与Erlang - 为程序员服务
RabbitMQ与Erlang
Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用),那么消息传递有没有开销?有,但是基本可以忽略,消息传递在单机上的本质就是内存复制,要知道DDR3 SDRAM的内存带宽约为16GB/s。
RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器,那么Erlang从语言层面带给RabbitMQ有哪些直接好处?
1. 高并发。
Erlang进程是完全由Erlang虚拟机进行调度和生命周期管理的一种数据结构,它与操作系统进程及线程完全没关系,也不存在数值上的什么对应关系。实际上,一个Erlang虚拟机对应一个操作系统进程,一个Erlang进程调度器对应一个操作系统线程,一般来说,有多少个CPU核就有多少个调度器。Erlang进程都非常轻量级,初始状态只包括几百个字节的PCB和233个字大小的私有堆栈,并且创建和销毁也非常轻量,都在微秒数量级。这些特征使得一个Erlang虚拟机里允许同时存在成千上万个进程,这些进程可以被公平地调度到各个CPU核上执行,因此可以在多核的场景下充分利用资源。
在RabbitMQ的进程模型里,AMQP概念里的channel和queue都被设计成了Erlang进程,从接受客户端连接请求开始,到消息最终持久化到磁盘,消息经过的进程链如下:
上图中,tcp_acceptor进程用于接收客户端连接,然后初始化出rabbit_reader,rabbit_writer和rabbit_channel进程。rabbit_reader进程用于接收客户端数据,解析AMQP帧。rabbit_writer进程用于向客户端返回数据。rabbit_channel进程解析AMQP方法,然后进行消息路由等操作,是RabbitMQ的核心进程。rabbit_amqqueue_process是队列进程,rabbit_msg_store是负责进行消息持久化的进程,这两种类型进程都是RabbitMQ启动或者创建队列时创建的。从数量角度看,整个系统中存在,一个tcp_acceptor进程,一个rabbit_msg_store进程,多少个队列就有多少个rabbit_amqqueue_process进程,每条客户端连接对应一个rabbit_reader和rabbit_writer进程,至多对应65535个rabbit_channel进程。结合进程的数量,RabbitMQ的进程模型也可以描述如下图:
RabbitMQ这种细粒度的进程模型正是得益于Erlang的高并发性。
2. 软实时。
Erlang的软实时特性可以从两方面看出。
首先是Erlang也是一门GC语言,但是Erlang的垃圾回收是以Erlang进程为粒度的。因为Erlang的消息传递和进程私有堆机制,使得按进程进行GC很容易实现,不必对一块内存或一个对象进行额外的引用计数。虽然对于单个进程来说,GC期间是“Stop The World”,但是前面也说过一个Erlang应用允许同时存在成千上万个进程,因此一个进程STW对于系统整体性能影响几乎微乎其微。另外,当进程需要销毁时,这个进程占用的所有内存可以直接回收,因为这块内存中的数据都是这个进程私有的。
另一方面,Erlang虚拟机对进程的调度采用的是抢占式策略。每个进程的运行周期都会分配到一定数量的reduction,当进程在进行外部模块函数调用,BIF调用,甚至算术运算都会减少reduction数量,当reduction数量减为0时,此进程交出CPU使用权,被其他进程抢占。相比于一些基于时间分片的软实时系统调度算法,reduction机制更加关注的是进程在执行期间能做多少事情,而不是时间上的绝对平均。
RabbitMQ将每个队列设计为一个Erlang进程,由于进程GC也是采用分代策略,当新老生代一起参与Major GC时,Erlang虚拟机会新开内存,根据root set将存活的对象拷贝至新空间,这个过程会造成新老内存空间同时存在,极端情况下,一个队列可能短期内需要两倍的内存占用量,因此设置内存流控阈值vm_memory_high_watermark时需要注意,默认的0.4就是一个保险的值,当超过0.5时,就有可能造成系统内存被瞬间吃完,RabbitMQ程序被系统OOM Killer杀掉。
3. 分布式。
Erlang可以说原生支持分布式,先看一段程序:
run(Node) -&
Pid = spawn(Node, fun ping/0),
Pid ! self(),
ping -& ok
ping() -&
From -& From ! ping
上述程序演示的是一个分布式并发程序,运用了spawn/2,!,receive…end这三个并发原语。spawn/2用于创建进程,!用于异步发送消息,receive…end用于接收消息,注意spawn/2的第一个参数Node,它表示节点名称,这意味着对于应用来说,Pid ! Msg 就是将Msg消息发送到某一个Erlang进程,而无论这个进程是本地进程还是存在于远程的某个节点上,Erlang虚拟机会帮应用搞定一切底层通信机制。也就是说,物理节点分布式对上层Erlang应用来说是透明的。
这为实现RabbitMQ的集群和HA policy机制提供了极大的便利,主节点只要维护哪个是Pid(master),哪几个是slave_pids(slave)信息就行,根据不同的类型(publish和非publish),对队列操作进行replication。
4. 健壮性。
在Erlang的设计哲学里,有一个重要的概念就是“let it crash”。Erlang不提倡防御式编程,它认为程序既然遇到错误就应该让它崩溃,对于一个健壮的系统来说,崩溃不要紧,关键要重新起来。Erlang提供一种supervisor的行为模式,用于构建一棵健壮的进程监督树。监督者进程本身不包含业务逻辑,它只负责监控子进程,当子进程退出时,监督者进程可以根据一些策略将子进程重启。据说爱立信用Erlang写的AXD301交换机系统,可靠性为9个9,这意味着运行20年差不多有1秒的不可用时间,如此高的可靠性就是supervisor行为模式及其背后任其崩溃思想的极致体现(当然也离不开Erlang另外一个法宝代码热更新)。
在RabbitMQ里,supervisor行为模式运用得非常多,基本上每个worker进程都有相应的监督者进程,层层监督。比如下图所示的网络层进程监督树模型(已做过简化):
椭圆表示进程,矩形表示重启策略,one_for_all表示一个进程挂了其监督者进程的其他子进程也会被重启,比如一个rabbit_reader进程挂了,那么rabbit_channel_sup3进程也会重启,然后所有rabbit_channel根据AMQP协议协商后重新创建。simple_one_for_one则表示一种需要时再初始化的子进程重启策略,适用于一些动态添加子进程的场景,比如图中的rabbit_channel进程和tcp_acceptor进程。
––
擅写水文,立志于写出白痴都看得懂的技术文章。
原文地址:, 感谢原作者分享。
您可能感兴趣的代码[转]消息队列RabbitMQ入门介绍
(一)基本概念
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。我曾经对这门语言挺有兴趣,学过一段时间,后来没坚持。RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。
RabbitMQ的结构图如下:
几个概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing
key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
(1)exchange持久化,在声明时指定durable =& 1
(2)queue持久化,在声明时指定durable =& 1
(3)消息持久化,在投递时指定delivery_mode =& 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
(二)应用实际
我使用Linux服务器(ubuntu 9.10 64位),安装RabbitMQ非常方便。
先运行如下命令安装erlang:
apt-get install erlang-nox
再到上下载RabbitMQ的安装包,如下安装:
dpkg -i rabbitmq-server_2.6.1-1_all.deb
安装完后,使用
/etc/init.d/rabbitmq-server start|stop|restart
来启动、停止、重启rabbitmq。
在正式应用之前,我们先在RabbitMQ里创建一个vhost,加一个用户,并设置该用户的权限。
使用rabbitmqctl客户端工具,在根目录下创建”/pyhtest”这个vhost:
rabbitmqctl add_vhost /pyhtest
创建一个用户名”pyh”,设置密码”pyh1234&P:
rabbitmqctl add_user pyh pyh1234
设置pyh用户对/pyhtest这个vhost拥有全部权限:
rabbitmqctl set_permissions -p /pyhtest pyh “.*” “.*” “.*”
后面三个”*”代表pyh用户拥有对/pyhtest的配置、写、读全部权限
设置好后,开始编程,我用Perl写一个消息投递程序(producer):
#!/usr/bin/perl
use Net::RabbitMQ;
use UUID::T
my $channel = 1000; # channel ID,可以随意指定,只要不冲突
my $queuename = “pyh_queue”; # 队列名
my $exchange = “pyh_exchange”; # 交换机名
my $routing_key = “test”; # routing key
my $mq = Net::RabbitMQ-&new(); # 创建一个RabbitMQ对象
$mq-&connect(“localhost”, { vhost =& “/pyhtest”, user =&
“pyh”, password =& “pyh1234&P }); # 建立连接
$mq-&channel_open($channel); # 打开一个channel
$mq-&exchange_declare($channel, $exchange, {durable =& 1}); #
声明一个持久化的交换机
$mq-&queue_declare($channel, $queuename, {durable =& 1}); #
声明一个持久化的队列
$mq-&queue_bind($channel, $queuename, $exchange, $routing_key);
# 使用routing key在交换机和队列间建立绑定
for (my $i=0;$i&;$i++) { # 循环1000万次
my $string = create_UUID_as_string(UUID_V1); # 产生一条UUID作为消息主体
$mq-&publish($channel, $routing_key, $string, { exchange =&
$exchange }, { delivery_mode =& 2 }); #
将消息结合key以持久化模式投递到交换机
$mq-&disconnect(); # 断开连接
消息接受程序(consumer)大概如下:
#!/usr/bin/perl
use Net::RabbitMQ;
my $channel = 1001;
my $queuename = “pyh_queue”;
my $mq = Net::RabbitMQ-&new();
$mq-&connect(“localhost”, { vhost=&”/pyhtest”, user =&
“pyh”, password =& “pyh1234&P });
$mq-&channel_open($channel);
while (1) {
my $hashref = $mq-&get($channel, $queuename);
last unless defined $
print $hashref-&{message_count}, “: “,
$hashref-&{body},”\n”;
$mq-&disconnect();
consumer连接后只要指定队列就可获取到消息。
上述程序共投递1000万条消息,每条消息36字节(UUID),打开持久化,共耗时17分多钟(包括产生UUID的时间),每秒投递消息约9500条。测试机器是8G内存、8核志强CPU。
投递完后,在/var/lib/rabbitmq/mnesia/rabbit@${hostname}/msg_store_persistent目录,产生2G多的持久化消息数据。在运行consumer程序后,这些数据都会消失,因为消息已经被消费了。
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。celery rabbitmq实现任务队列的异步执行 - 推酷
celery rabbitmq实现任务队列的异步执行
celery这东西在任务调度方面,很有一套的,用了他也有几年了,下面就给大家介绍下我以前使用过celery的项目。
Hello ,最近总是被爬虫,标记下博客的原文地址
对于上面的场景,我曾经用tornado和gevent的方案解决,但是在我的理解范围下,感觉还是不算成熟。 tornado把任务异步后,总是影响了他的高性能。再说gevent,他是个模拟事件的东西,但是后面由于认识越来越多,有时候因为对调的其他接口不给力,导致我们一下子处理将近1000多个请求,这时候会出现莫名的bug,听说已经有patch解决了。
那么为啥要用celery ?
很简单,就是把堵塞的任务,扔到mq里面,让其他人来搞。搞的定、搞不定都回给你callback信息。&
那问题来了 ! 这又和rabbitmq又有啥关系?
和rabbitmq的关系只是在于,celery没有消息存储功能,他需要介质,比如rabbitmq redis mysql mongodb 都是可以的。有这个可控的东西,你也可以在库里面搞搞。推荐使用rabbitmq,他的速度和可用性都很高。如果你想后期方面的处理broker,那还是用redis吧。 redis的语法相对的简单点。&
Celery和RabbitMQ是两个层面的东西。
Celery是一个分布式的任务队列。它的基本工作就是管理分配任务到不同的服务器,并且取得结果。至于说服务器之间是如何进行通信的?这个Celery本身不能解决。
所以,RabbitMQ作为一个消息队列管理工具被引入到和Celery集成,负责处理服务器之间的通信任务。
现在的Celery早已增加了一些对Redis,MongoDB之类的支持。原因是RabbitMQ尽管足够强大,但对于一些相对简单的业务环境来说可能太多(复杂)了一些。这样用户可以有多一些的选择。
celery的介绍 :&
Celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。
celery是用Python编写的,但该协议可以在任何语言实现。它也可以与其他语言通过webhooks实现。
建议的消息代理RabbitMQ的,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, ,和数据库(使用SQLAlchemy的或Django的 ORM) 。
celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。
pip install celery
pip install celery
对于消息的存储方案,可以用rabbitmq也可以用redis。 要过任务不是那么要命的话,我个人会用redis,毕竟这东西,学习成本很小的。
yum -y install rabbitmq-server
yum - y install rabbitmq - server
rabbitmqctl
rabbitmq-env
rabbitmq-server
rabbitmqctl&& rabbitmq - env&& rabbitmq - server
停止rabbitmq server的命令是
/usr/local/sbin/rabbitmqctl stop
rabbitmqctl
rabbitmq-env
rabbitmq-server
/ usr / local / sbin / rabbitmqctl stop
rabbitmqctl&& rabbitmq - env&& rabbitmq - server
我们可以看看rabbitmq的状态:
要是喜欢用redis,那就需要安装 celery redis相应的模块 !
celery启动配置文件:
#coding:utf-8
import sys
sys.path.insert(0, os.getcwd())
CELERY_IMPORTS = (&tasks&, )
CELERY_RESULT_BACKEND = &amqp&
BROKER_HOST = &localhost&
BROKER_PORT = 5672
BROKER_USER = &guest&
BROKER_PASSWORD = &guest&
BROKER_VHOST = &/&
#coding:utf-8
import sys
sys . path . insert ( 0 , os . getcwd ( ) )
CELERY_IMPORTS = ( &tasks& , )
CELERY_RESULT_BACKEND = &amqp&
BROKER_HOST = &localhost&
BROKER_PORT = 5672
BROKER_USER = &guest&
BROKER_PASSWORD = &guest&
BROKER_VHOST = &/&
一个简单的测试,这个模块里面的内容也就是需要异步起来的任务。
from celery.task import task
import time
def add(x, y):
time.sleep(5)
return x + y
from celery . task import task
import time
@ task ( )
def add ( x , y ) :
time . sleep ( 5 )
return x + y
我们来测试下 ~
大家有注意那个False吗? 为啥会出现,因为我的tasks.py里面定义了让他sleep 5秒钟,我马上要数据,肯定是没有的,等5秒过了后,我再去提取数据,就可以了。
celery的参数众多的 ~
你可以等待结果来完成,但这不是因为它的异步调用同步使用
result.get(timeout=1)
咱们可以让他返回json串
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
# Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_TIMEZONE='Europe/Oslo',
CELERY_ENABLE_UTC=True,
app . conf . update (
CELERY_TASK_SERIALIZER = 'json' ,
CELERY_ACCEPT_CONTENT = [ 'json' ] , && # Ignore other content
CELERY_RESULT_SERIALIZER = 'json' ,
CELERY_TIMEZONE = 'Europe/Oslo' ,
CELERY_ENABLE_UTC = True ,
咱们再来说说redis的执行方式:
redis的方法
# tasks.py
import time
from celery import Celery
celery = Celery('tasks', broker='redis://localhost:6379/0')
@celery.task
def osrun(good):
reok=os.popen(good).read()
return reok
redis 的方法
# tasks.py
import time
from celery import Celery
celery = Celery ( 'tasks' , broker = 'redis://localhost:6379/0' )
@ celery . task
def osrun ( good ) :
reok = os . popen ( good ) . read ( )
return reok
启动celery
celery -A tasks worker --loglevel=info
celery - A tasks worker -- loglevel = info
以后有他,我再也不怕堵塞了。但是我对他的理解还是有限,后期再更新下高级方面的功能。
已发表评论数()
已收藏到推刊!
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
排版有问题
没有分页内容
视频无法显示
图片无法显示

我要回帖

更多关于 redis优先级队列 的文章

 

随机推荐