Port Celery4 v2 protocol to Redis and PECL-AMQP #78
Comments
@gjedeer the celery4 branch isn't working for me (all of my messages are getting deleted by Celery). FYI, I also noticed while using this branch that ETA support is broken. ETA works with the 2.1.1 branch, but while testing the celery4 branch, ETA tasks were executed immediately. |
Do you have these issues with php-amqplib? Which Celery version? I believe I tested with 4.0.0. Regarding ETA, I believe there are no tests for this, somebody contributed the code but didn't contribute a test. |
@gjedeer celery 4.0.2. hey I noticed you have a celery4 branch and then a celery4pr66 branch with more commits... it's just the celery4 branch right? I forked your repo so trying to find the best base to start with. I plan to:
Create PR once tested in production in my environment. |
@gjedeer I'm wondering if the issue I was having was actually the message format, because I have celery upgraded to 3.1.25 which supports both version 1 and 2 of celery tasks. The celery4 branch works with 3.1.25, but not 4.0.2. I noticed the messages are coming through with an empty exchange though -- is that intended? Maybe 3.1.25 is better handling that in the message. |
Hey, thanks, I had not noticed that the exchange was empty. Moreover, someone did a Java client based on my work in this branch and it works somehow, so the bug has already propagated, haha. I was testing with 4.0.0 - don't remember if it was the release version or some rc. Your support for the ETA would be much appreciated. |
@gjedeer using the celery4 branch, rabbit will complain about a wrong destination and trash the message, and it shows me this raw delivery info: delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'MYQUEUE', 'delivery_tag': 7L, 'exchange': ''} I'm looking at getting the exchange passed to amqplib's basic_publish along w/ the routing key... what version of amqplib are you using? The latest 2.6.3 works for you? |
Hi, yes, I used 2.6.3. |
I just tried the celery4 branch with celery==4.0.2 and test.php posted the task successfully. I didn't do any changes to the protocol, just ported the settings in testscenario to the new syntax: https://github.com/gjedeer/celery-php/blob/celery4/testscenario/celeryconfig.py
and running php test.php in the other terminal worked perfectly. Could it be that you have some incompatible settings in Celery?
|
I have been meaning to follow up with you. Sorry for my delay I've just
been crunching getting this queue working.
It wasn't a bug with the celery4 branch, I wasn't properly defining my custom
exchange when initializing Celery.
So right now I have 4.0.2 working. BUT, I did confirm the eta support is
not working in that branch.
What I ended up doing is just setting the task protocol to 1 and using your
latest release and everything is working.
I also ran into an issue creating tasks from within the original celery4
task (with protocol 2), but using protocol 1 by default resolved it.
I just don't have the time to resolve that now that I have a working stack.
I'll follow up ASAP w eta support to celery 4 branch.
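
For anyone landing here with the same problem: the fallback described above is a single setting on the Celery side. A minimal sketch, assuming the lowercase Celery 4 setting names (the same style as the testscenario celeryconfig.py linked in this thread); broker, queues and routes are left out on purpose.

```python
# celeryconfig.py (fragment, not a complete config)
# Celery 4 sends task messages using protocol 2 by default.
# Pinning this to 1 makes tasks spawned from inside other tasks
# use the older protocol 1 message format.
task_protocol = 1
```

This only changes what Celery itself publishes. Per the comments above, the worker (3.1.25 and 4.x) accepts both protocol versions on the consuming side.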
…On Jan 7, 2017 12:04 PM, "GDR!" ***@***.***> wrote:
I just tried the celery4 branch with celery==4.0.2 and test.php posted the
task successfully. I didn't do any changes to the protocol, just ported the
settings in testscenario to the new syntax: https://github.com/gjedeer/
celery-php/blob/celery4/testscenario/celeryconfig.py
cd testscenario
celery worker -l DEBUG -c 20
and running php test.php
<https://github.com/gjedeer/celery-php/blob/celery4/test.php> in the
other terminal worked perfectly. Could it be that you have some
incompatible settings in Celery?
[celery4] ***@***.***:~/celery-php/testscenario# pip freeze
Warning: cannot find svn location for distribute==0.6.24dev-r0
SOAPpy==0.12.0
amqp==2.1.4
anyjson==0.3.3
argparse==1.2.1
billiard==3.5.0.2
celery==4.0.2
celery-with-redis==3.0
chardet==2.0.1
## FIXME: could not find svn URL in dependency_links for this package:
distribute==0.6.24dev-r0
fpconst==0.7.2
jedi==0.9.0
kombu==4.0.2
powerline-status==2.5
python-apt==0.8.8.2
python-dateutil==1.5
python-debian==0.1.21
python-debianbts==1.11
pytz==2016.10
redis==2.10.5
reportbug==6.4.4
six==1.6.1
vine==1.1.3
wsgiref==0.1.2
|
That's cool, thanks for your effort anyway. In the meanwhile, I just ran tests with the AMQPLib connector and they pass 100% (mind you, there's no test for ETA) |
PECL-AMQP connector is delayed until php-amqp/php-amqp#263 is solved. I tried writing a simple patch but, while it solved single NULL values like 'eta' or 'group', it serialized
into
which crashed Celery. For the record, the patch I tried:
|
Redis message looks as following
|
@gjedeer thanks for being so responsive! I'm not too hot with unit testing -- so it'd be cool to get that in there so you could see it doesn't work? I definitely confirmed the ETA does not work by queueing messages using protocol 1 on celery 3 and celery 4 with ETAs applied using the celery-php master (not celery4 branch). When I get over these other hurdles I'm working on maybe I'll have the time to look into how to write a unit test for it. I did look over the code and saw that ETA is moved to headers and is NOT in params anymore. I saw you were doing an array_merge, so maybe you just need to move the ETA param to the headers. |
I'm not hurrying or anything, it's just an issue I opened and you commented on and I'm using it to paste info for myself or whoever will work on this :) ETA doesn't work because I set the field to NULL, as I just discovered. |
Hey I just saw that redis message which brings up another thing... I see you're setting the correlation_id there -- but the correlation_id isn't being set in amqplibconnector... could that cause any issues with creating tasks from the parent task? Because I was able to get the celery4 branch to post the initial task, but then (existing code, working with celery3) task.delay()s within that task would fail once received with 'Wrong Destination!?!' |
Do you have any documentation on what correlation_id is? My AMQP knowledge is rather rusty and this seems to be a new thing |
@gjedeer haha yeah I get it. I meant comment wise. Yeah I know ETA is set to null. But you were previously doing a array_merge to overwrite the null value with the user provided one ... but since that was moved to headers I think that's why it's not working anymore. It looks like an easy fix just don't have time for now. Also, it's not just celery-php, so many accompanying libraries are lacking celery 4 support -- the rabbitmq mgmt module is buggy as hell when using task protocol 2 -- so for now sticking to task protocol 1 got me the benefits I needed from celery 4 without the new protocol, for now. |
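
To make the ETA point concrete, here is a rough sketch in Python rather than PHP of what "merge the user-supplied value over the NULL default, but in the headers" looks like. The header names follow the protocol 2 description in the Celery docs; `build_v2_headers` and `iso_eta` are hypothetical helper names made up for this illustration and are not celery-php functions.

```python
import uuid
from datetime import datetime, timedelta, timezone


def iso_eta(delay_seconds, now=None):
    """Return an ISO 8601 timestamp `delay_seconds` after `now` (UTC by default)."""
    now = now or datetime.now(timezone.utc)
    return (now + timedelta(seconds=delay_seconds)).isoformat()


def build_v2_headers(task_name, task_id=None, **overrides):
    """Build protocol-2-style headers, letting caller values replace the None defaults.

    Hypothetical helper: in protocol 2 the ETA travels in the message
    headers, so this is where the override has to be applied.
    """
    task_id = task_id or str(uuid.uuid4())
    headers = {
        "lang": "py",          # value used by Celery's own Python client
        "task": task_name,
        "id": task_id,
        "root_id": task_id,
        "parent_id": None,
        "group": None,
        "eta": None,           # default: run as soon as possible
        "expires": None,
        "retries": 0,
        "timelimit": [None, None],
    }
    unknown = set(overrides) - set(headers)
    if unknown:
        raise KeyError("unknown header(s): %s" % ", ".join(sorted(unknown)))
    headers.update(overrides)  # same idea as array_merge in PHP
    return headers


if __name__ == "__main__":
    print(build_v2_headers("tasks.add", eta=iso_eta(30)))
```

If the override is applied to the body (protocol 1 style) while the headers keep `eta` as None, the worker sees no ETA, which would match the "executed immediately" behaviour reported at the top of this thread.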
correlation_id is new in protocol 2... I think I remember reading it replaced the task id? Yes, confirmed here: http://docs.celeryproject.org/en/latest/internals/protocol.html Just search that page for correlation_id |
Could that have any impact on a task being created within a task (that has a properly defined task_route) ending up getting dropped for Wrong Destination? I was stumped so my only fix was to fallback to protocol 1 for now which is fine. |
Oh, flower is buggy as hell with celery 4 (task protocol 2) also because mochiweb is buggy because of it. hopefully these other projects mature quickly. |
Looks like I have interesting times ahead (client wanted Celery 4 for a project that goes live soon). Re. correlation_id: I added it in celery.php, so it should be added to all connectors now. |
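
For reference, a small Python sketch of what "correlation_id on every connector" amounts to, going by the protocol page linked above: the message properties carry the task id as `correlation_id`, next to the content type and encoding. `build_v2_properties` and `correlation_matches` are hypothetical names for this illustration only, and whether a missing `correlation_id` explains the "Wrong Destination" drops is not established in this thread.

```python
def build_v2_properties(task_id, reply_to=None):
    """Return message properties in the shape the protocol 2 docs describe."""
    properties = {
        "correlation_id": task_id,          # same value as headers["id"]
        "content_type": "application/json",
        "content_encoding": "utf-8",
    }
    if reply_to is not None:
        properties["reply_to"] = reply_to   # optional per the docs
    return properties


def correlation_matches(headers, properties):
    """Check that a message carries a correlation_id equal to its task id."""
    task_id = headers.get("id")
    return task_id is not None and properties.get("correlation_id") == task_id


if __name__ == "__main__":
    headers = {"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77", "task": "tasks.add"}
    print(correlation_matches(headers, build_v2_properties(headers["id"])))
```

A check like `correlation_matches` could be run against the output of each connector to spot one that forgets the property.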
\o/
|
More info on correlation_id. |
What are the unit tests testing though? Like I said, I was able to get the celery4 branch to create a task. I was wrong initially. BUT, that task spawns more tasks via task.apply_async(). This code hasn't been changed, but when using task protocol 2, the subtasks that are spawned go to the right queue but once consumed celery trashes them for 'Wrong Destination'. I think that might be a celery bug but it's hard to say. That's why I was wondering if maybe the format of the celery4 task that's coming in missing a piece of info (like correlation id) would cause a sub task not to route properly. It doesn't make sense though. Sorry just ranting here. |
Are you able to do that? You should be able to test easily... e.g.:
$celery->posttask() creates the initial task successfully, but the subsequent tasks are thrown away once consumed from a separate queue. That same architecture works fine with task protocol 1 (with celery 4), but not with task protocol 2. |
NOTE: i am NOT using any default celery values for exchange/queue/key |
Could you open a new issue for that and paste this code? I'll take a look at it... sometime. |
@gjedeer sure thing... maybe I'll beat you to it. Want me to create a separate issue to write a unit test for ETA as well? |
Yeah, please do. This megathread will be hard to manage :) |
Thanks to @pinepain there's this PR fixing the biggest PECL AMQP issue. Not all tests are passing yet but we're almost there. |
What is the state of Celery4 support? I'm interested in the redis broker in particular. |
It passes the unit tests which cover basic functionality. Users report that advanced stuff, in particular ETA support, does not work. There is a pull request promising to take care of that and you're welcome to test and improve upon that PR.
|
The celery4 branch contains a working Celery Protocol v2 implementation for PhpAmqpLib. The changes are not yet ported to the other backends.
It should be very straightforward for PECL AMQP - basically add support for application headers, and don't bind to an exchange when checking results.
For Redis we'll need to reverse engineer the serialization used for headers, which shouldn't be a major task either.
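
As a starting point for that reverse engineering, here is a Python sketch of the JSON envelope that gets pushed onto the Redis list. The field layout is an assumption based on how kombu's Redis transport appears to wrap messages (base64 body, headers and properties side by side); it should be compared against a message captured from a real Celery 4 producer before being relied on. `build_redis_envelope` is a hypothetical name.

```python
import base64
import json
import uuid


def build_redis_envelope(body, headers, task_id, routing_key, exchange=""):
    """Serialize a task message into a kombu-style JSON envelope (assumed layout)."""
    body_json = json.dumps(body)
    envelope = {
        # The task body is JSON, then base64, so it survives as a plain string.
        "body": base64.b64encode(body_json.encode("utf-8")).decode("ascii"),
        "content-encoding": "utf-8",
        "content-type": "application/json",
        # Protocol 2 headers go here as a plain JSON object.
        "headers": headers,
        "properties": {
            "correlation_id": task_id,
            "delivery_mode": 2,
            "delivery_info": {"exchange": exchange, "routing_key": routing_key},
            "priority": 0,
            "body_encoding": "base64",
            "delivery_tag": str(uuid.uuid4()),
        },
    }
    return json.dumps(envelope)


if __name__ == "__main__":
    # Protocol 2 body: (args, kwargs, embed)
    body = [[2, 2], {}, {"callbacks": None, "errbacks": None,
                         "chain": None, "chord": None}]
    headers = {"task": "tasks.add", "id": "abc", "eta": None}
    print(build_redis_envelope(body, headers, "abc", "celery"))
```

The resulting string would then be pushed onto the Redis list named after the queue. That step is left out here so the sketch runs without a Redis server.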
Pull requests welcome if you want this code to be released soon!