I use the rocketmq_client_python module (version 0.4.4)
with Python 3.8.10 to write a function that creates a consumer to listen to a RocketMQ server and uses the callback function msgListener
to process each message. After the callback function finishes processing, it creates a producer and sends the result back.
The problem I ran into is that neither PullConsumer
nor PushConsumer
commits its queue offset to the server, so the next time I start the function, the consumer re-consumes every message from the beginning of the queue.
I checked the PullConsumer
definition, which shows that every PullConsumer.pull()
call looks up the offset stored under the key “topic@queueID” in consumer.offset_table and uses 0 if no entry exists. Through debugging I found that the offset_table of the consumer instance created each time the function starts is empty; perhaps this is one of the causes of the problem? Here is the code of PullConsumer.pull():
class PullConsumer(object):
    """Pull-mode consumer that talks to the native client library via ``dll``.

    Consumption progress is tracked in ``offset_table``, a plain in-process
    dict. Nothing in this excerpt writes offsets anywhere else, so a freshly
    started process finds the table empty and pulls every queue from
    offset 0.
    """

    # Maps the key returned by ``self._get_mq_key(mq)`` to the next offset to
    # pull from. It is declared on the class, so every instance shares one dict.
    offset_table = {}

    def __init__(self, group_id):
        """Create the native pull-consumer handle for *group_id*.

        Raises:
            NullPointerException: if ``dll.CreatePullConsumer`` returns None.
        """
        self._handle = dll.CreatePullConsumer(_to_bytes(group_id))
        if self._handle is None:
            raise NullPointerException('CreatePullConsumer returned null pointer')

    # The rest of the class is elided in this excerpt (presumably including
    # ``_get_mq_key``, which the methods below call).
    ...

    def get_message_queue_offset(self, mq):
        """Return the stored offset for *mq*, or 0 when none is recorded."""
        offset = self.offset_table.get(self._get_mq_key(mq), 0)
        return offset

    def set_message_queue_offset(self, mq, offset):
        """Record *offset* as the next pull position for *mq*."""
        self.offset_table[self._get_mq_key(mq)] = offset

    def pull(self, topic, expression='*', max_num=32):
        """Yield the messages available on *topic*, one queue at a time.

        Each queue is pulled repeatedly, starting from its stored offset,
        until the broker reports ``NO_NEW_MSG``. After every pull that is not
        a ``BROKER_TIMEOUT`` the stored offset is advanced to the
        ``nextBeginOffset`` returned by the broker.

        Args:
            topic: topic whose message queues are fetched and pulled.
            expression: filter expression passed to ``dll.Pull``.
            max_num: maximum number of messages requested per pull.

        Yields:
            RecvMessage: one wrapper per message in each ``FOUND`` result.

        Raises:
            Whatever ``ffi_check`` raises when fetching or releasing the
            message-queue array fails.
        """
        message_queue = POINTER(_CMessageQueue)()
        queue_size = c_int()
        ffi_check(dll.FetchSubscriptionMessageQueues(
            self._handle,
            _to_bytes(topic),
            ctypes.pointer(message_queue),
            ctypes.pointer(queue_size)
        ))
        # This is a generator: the caller may stop iterating (or an exception
        # may be thrown) while suspended at ``yield``. The ``finally`` clauses
        # make sure the native allocations are released on those paths too.
        try:
            for queue_index in range(int(queue_size.value)):
                mq = message_queue[queue_index]
                tmp_offset = ctypes.c_longlong(self.get_message_queue_offset(mq))
                has_new_msg = True
                while has_new_msg:
                    pull_res = dll.Pull(
                        self._handle,
                        ctypes.pointer(mq),
                        _to_bytes(expression),
                        tmp_offset,
                        max_num,
                    )
                    try:
                        if pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT:
                            tmp_offset = pull_res.nextBeginOffset
                            self.set_message_queue_offset(mq, tmp_offset)
                        if pull_res.pullStatus == _CPullStatus.FOUND:
                            for msg_index in range(int(pull_res.size)):
                                yield RecvMessage(pull_res.msgFoundList[msg_index])
                        elif pull_res.pullStatus == _CPullStatus.NO_NEW_MSG:
                            has_new_msg = False
                        # Every other status (NO_MATCHED_MSG, OFFSET_ILLEGAL,
                        # BROKER_TIMEOUT, ...) simply pulls again.
                    finally:
                        # NOTE: No need to check ffi return code here
                        dll.ReleasePullResult(pull_res)
        finally:
            ffi_check(dll.ReleaseSubscriptionMessageQueue(message_queue))
I also found that although my function consumed and processed the messages and sent the results back to the server, the ID of the consumer group defined by my function did not appear on the server. I repeated the experiment several times, and each time the messages were consumed without any group information being recorded, whereas other consumers did leave their group IDs after consuming.
I have put the PullConsumer version of my code at the end of this question. Thanks for any suggestions and help!
import json
import logging
import os
import sys
import time

from rocketmq.client import Producer, Message, PullConsumer

from otherPyFileICoded import mainFun
def messageListener(msg):
    """Process one consumed message and publish the result to ``topic_answer``.

    The message body is handed to ``mainFun``. The returned dict is
    serialised to JSON and sent synchronously through a short-lived producer
    that is created, started and shut down inside this call.

    Args:
        msg: consumed message; only its ``body`` attribute is read.

    Relies on the module-level ``scAddr`` that ``main()`` sets before
    consuming.
    """
    time_start = time.time()
    resultRaw = mainFun(msg.body)
    time_end = time.time()
    logging.info("Running time:%f", time_end - time_start)
    if resultRaw['code'] == '0000':
        logging.info("successed,message id: %s", resultRaw['id'])
    else:
        logging.info("failed, message id: %s", resultRaw['id'])
    resultJson = json.dumps(resultRaw, ensure_ascii=False)

    # build producer
    # NOTE(review): the group name contains a space; RocketMQ group names are
    # presumably limited to letters, digits, '_', '-', '%' and '|' - confirm.
    producer = Producer('producer online')
    producer.set_namesrv_addr(scAddr)
    producer.start()
    try:
        # Use a separate name so the incoming ``msg`` is not rebound.
        reply = Message(topic="topic_answer")
        reply.set_body(resultJson)
        send_result = producer.send_sync(reply)
        logging.info(f"send message to serve successed. 信息ID: {send_result.msg_id}")
    finally:
        # Shut the producer down even when building or sending the reply fails.
        producer.shutdown()
def main():
    """Resolve the name-server address, start a pull consumer and poll forever.

    The address is read from the ``scAddr`` environment variable, falling
    back to a placeholder default, and stored in the module-level ``scAddr``
    that ``messageListener`` reads. Every message pulled from
    ``topic_require`` is passed to ``messageListener``, with a 5-second
    pause between pull rounds.

    Raises:
        Exception: whatever ``consumer.start()`` raises, after logging it.
            Polling with a consumer that failed to start cannot work.
    """
    global scAddr
    scAddr = os.environ.get('scAddr')
    if scAddr is not None:
        logging.info("Succeeded in obtaining the listening address from the environment variable : %s",
                     scAddr)
    else:
        scAddr = "default address"
        logging.info("Failed in obtaining the listening address from the environment variable, default values will be used: %s",
                     scAddr)

    # build consumer
    # NOTE(review): the group name contains a space; RocketMQ group names are
    # presumably limited to letters, digits, '_', '-', '%' and '|' - confirm.
    consumer = PullConsumer("consumer online")
    consumer.set_namesrv_addr(scAddr)
    try:
        consumer.start()
    except Exception as e:
        # Log with traceback and stop instead of polling an unstarted consumer.
        logging.exception("Failed in build consumer: %s", e)
        raise
    logging.info("Successed in build consumer")

    try:
        while True:
            for msg in consumer.pull(topic="topic_require"):
                messageListener(msg)
            logging.info("wait for next pull")
            time.sleep(5)
    finally:
        # Runs on Ctrl-C or any error so the native consumer is shut down.
        consumer.shutdown()
if __name__ == '__main__':
    # build logging
    # logging.basicConfig() returns None, so its result cannot be used as a
    # formatter. The rest of the script logs through the root logger
    # (logging.info), so the stdout handler is installed there via basicConfig.
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logging.basicConfig(level=logging.INFO, handlers=[console_handler])
    # Module logger kept for callers that want it; it propagates to the root
    # handler, so no handler is attached here (that would duplicate output).
    logger = logging.getLogger(__name__)
    logging.info("start function")
    main()