前言:
python下的celery是啥东西大家应该有了解,是一个异步的任务框架 。话说, 我以前写过一个报警平台的项目,也需要任务的扩展成分布式,当时总是觉得 用celery不是那么太靠谱,所以就自己写了一个分布式的任务派发的系统。
今个和朋友聊起了分布式爬虫,这哥们说 任务有时候经常的崩溃,但是celery的retry的机制有些意思,最后看了下文档 ,又研究了下retry的参数,然后把自己的一些实战分享给大家。
@celery .task(bind = True ,max_retries = 3 ,default_retry_delay = 1 * 6 ) def sum ( self ,num): f = open ( 'plog' , 'a' ) f.write( 'retry\n' ) except Exceptionasexc: raise self .retry(exc = exc,countdown = 60 ) |
其实最主要就那几个参数,官网写的也很干练,上来就给个例子。 呵呵 ~
bind=True 是开启
max_retries 是重新尝试的此时
default_retry_delay 是默认的间隔时间,尝试的时间
下面的代码,大家应该懂的。 就是捕捉异常。
countdown 也是时间,这个时间优先级是大于上面的default_retry_delay的。
这个时候我是可以看到,我刚才设置的,碰到异常之后,重新执行三遍的。
注意下,这个异常是我自己特意抛出去的,不懂的看上面的py。 还有一点是celery 自己会sleep 时间。 我定义了60s 。
原文:http://rfyiamcool.blog.51cto.com/1030776/1548051
然后咱们在测试下,重启celery,任务肯定是正常运行的,毕竟是放在队列里面的。启动celery的时候,他也只是从队列里面取任务。 我写入celery的时候,只要保证后端的队列没挂掉就可以了。
redis127.0.0.1:6379>lrangecelery0-1 1) "{\"body\":\"gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4hVBGFyZ3NxBFUEaXAgYXEFhXEGVQVjaG9yZHEHTlUJY2FsbGJhY2tzcQhOVQhlcnJiYWNrc3EJTlUHdGFza3NldHEKTlUCaWRxC1UkYTZkMTJkZTMtYjUzOC00ZjMxLWFiNzMtNjExNTQwYjY5NmZkcQxVB3JldHJpZXNxDUsDVQR0YXNrcQ5VCXRhc2tzLnN1bXEPVQl0aW1lbGltaXRxEE5OhnERVQNldGFxElUgMjAxNC0wOS0wMlQxMjoxMjozOC4wNDE4OTYrMDA6MDBxE1UGa3dhcmdzcRR9cRV1Lg==\",\"headers\":{\"redelivered\":true},\"content-type\":\"application/x-python-serialize\",\"properties\":{\"body_encoding\":\"base64\",\"delivery_info\":{\"priority\":0,\"routing_key\":\"celery\",\"exchange\":\"celery\"},\"delivery_mode\":2,\"correlation_id\":\"a6d12de3-b538-4f31-ab73-611540b696fd\",\"reply_to\":\"4459d9e6-2cff-35c9-be5b-45a2d976911e\",\"delivery_tag\":\"bd4480dd-d04a-4401-876b-831b30b55f4e\"},\"content-encoding\":\"binary\"}" 2) "{\"body\":\"gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4hVBGFyZ3NxBFUEaXAgYXEFhXEGVQVjaG9yZHEHTlUJY2FsbGJhY2tzcQhOVQhlcnJiYWNrc3EJTlUHdGFza3NldHEKTlUCaWRxC1UkNDczMzFhYTgtNzZhOC00N2E1LTg1MGItNzZkYTY0YjY2YzM1cQxVB3JldHJpZXNxDUsBVQR0YXNrcQ5VCXRhc2tzLnN1bXEPVQl0aW1lbGltaXRxEE5OhnERVQNldGFxElUgMjAxNC0wOS0wMlQxMjoxMjo1NS40NjA0MzArMDA6MDBxE1UGa3dhcmdzcRR9cRV1Lg==\",\"headers\":{\"redelivered\":true},\"content-type\":\"application/x-python-serialize\",\"properties\":{\"body_encoding\":\"base64\",\"delivery_info\":{\"priority\":0,\"routing_key\":\"celery\",\"exchange\":\"celery\"},\"delivery_mode\":2,\"correlation_id\":\"47331aa8-76a8-47a5-850b-76da64b66c35\",\"_to\":\"4459d9e6-2cff-35c9-be5b-45a2d976911e\",\"delivery_tag\":\"9fa3c120-0bfd-4453-9539-1465e6e820ff\"},\"content-encoding\":\"binary\"}" redis127.0.0.1:6379> |
其实我更关注的是崩溃的处理,比如 我们的celery已经做了分布式扩展了。 当一个node已经去到任务,但是突然oom了,sx了。 我原本以为celery 借助rabbotmq的ack机制,来处理这样的情况,但是我的测试结果告诉我,celery的retry机制,只是限于本地玩耍。 其实我们就算不用他的retry装饰器,也可以自己写个for循环,然后过滤下异常罢了。
我现在的做法是,每次获取到任务,做事情之前,先要回调一个接口,然后把我要做的事情push过去,然后把做个标示位,说自己正在干,如果10分钟之后,还没有把你删掉的话,你就再塞入队列中。
当然方法还是有些挫,但是已经线上跑了一段时间,没什么大问题,只是在任务太多的情况下,监控任务的线程貌似多到崩溃。 后期可以改用gevent pool的方式来进行轮训监控事件是否完成。
如果你是那种平台性质的任务发布,在页面上长时间是loading....的状态,你容易做出分析的。
但大家没这么倒霉的~ 只要异常处理的好。
个人还是自己开发的靠谱点。 celery可能是太强大了,忽略了一些地方。
本文出自 "峰云,就她了。" 博客,谢绝转载!
线上遇到的一个case,应用结构是nginx----->resin---java,其中nginx做lb,resin做为java的容器。在nginx层面做http code的监控,发现有4xx比例高的的报警:
分析nginx日志,发现是由于499比例高导致
xxxxxxxx-[29 /Oct/2012 :04:10:03+0800] "GET/GetConfiguration.jsp?peer_version=2.3.0.4779&peer_id=E00B3B81B458D7D5A3C2E2BD85865354HTTP/1.0" 4990 "-" "-" "-" "-" xxxxx:8080 "" - "" 0.001" xxxxxxxxxx-[29 /Oct/2012 :04:10:03+0800] "GET/GetConfiguration.jsp?peer_version=2.3.0.4779&peer_id=E00B3B81B458D7D5A3C2E2BD85865354HTTP/1.0" 4990 "-" "-" "-" "-" "xxxx:8080" "-" "0.000" |
499一般是由于后端响应超时造成,但是这里的499响应时间都在毫秒以内,证明不是超时导致,而是后端直接不能响应抛出。对于java应用来说会有两种情况,一是线程锁导致,另一个是堆栈导致。
使用jstack打印出堆栈分析,发现有类似下面的这种线程被锁:
"http-0.0.0.0:8080-65096$1864960835" daemonprio= 10 tid= 0x000000004c169800 nid= 0x4bf8 waiting for monitorentry[ 0x0000000043e8e000 .. 0x0000000043e8ed10 ] java.lang.Thread.State:BLOCKED(onobjectmonitor) atcom.caucho.server.log.AccessLog.log(AccessLog.java: 345 ) -waitingtolock< 0x00002aaab522a638 >(ajava.lang.Object) atcom.caucho.server.webapp.WebAppFilterChain.doFilter(WebAppFilterChain.java: 223 ) atcom.caucho.server.dispatch.ServletInvocation.service(ServletInvocation.java: 265 ) atcom.caucho.server.http.HttpRequest.handleRequest(HttpRequest.java: 273 ) atcom.caucho.server.port.TcpConnection.run(TcpConnection.java: 682 ) -locked< 0x00002aab3a6542f8 >(ajava.lang.Object) atcom.caucho.util.ThreadPool$Item.runTasks(ThreadPool.java: 730 ) atcom.caucho.util.ThreadPool$Item.run(ThreadPool.java: 649 ) atjava.lang.Thread.run(Thread.java: 619 ) |
这其实是一个bug,在resin 3.1.9和3.1.11中日志翻转会遇到access log锁问题,导致同步锁不能正常释放线程被锁住。
bugid:
http://bugs.caucho.com/view.php?id=3509
http://bugs.caucho.com/view.php?id=4821
解决方法也比较简单:
1.升级resin到4.0.2以上的版本
2.如果不care resin的日志,可以diable掉记录日志的功能
本文出自 "菜光光的博客" 博客,请务必保留此出处http://caiguangguang.blog.51cto.com/1652935/1548047
没有评论:
发表评论