rocketmq consumer group can’t commit its offset in python

I use module rocketmq_client_python(version=0.4.4) to code my function in python3.8.10 which create a consumer to listen to the rocketmq server and uses the callback function msgListener to process the information. After processing is done in the callback function, the callback function creates another producer and sends the message back.

The problem I met is, neither Pullconsumer and Pushconsumer can’t commit its queue offset to server, so next time I start the function, the consumer reconsumes all the information on the queue from the beginning of the queue.

I checked the Pullconsumer definition code, which says that every Pullconsumer.pull() will try to find the offset value of “topic@queueID” under consumer.offset_table, and initialize it to 0 if it doesn’t. I find through debugging that the offset_table of the consumer instance generated every time the function is started is empty, perhaps this is one of the causes of the problem? Here is the code of Pullconsumer.pull()

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code>class PullConsumer(object):
offset_table = {}
def __init__(self, group_id):
self._handle = dll.CreatePullConsumer(_to_bytes(group_id))
if self._handle is None:
raise NullPointerException('CreatePullConsumer returned null pointer')
...
def get_message_queue_offset(self, mq):
offset = self.offset_table.get(self._get_mq_key(mq), 0)
return offset
def set_message_queue_offset(self, mq, offset):
self.offset_table[self._get_mq_key(mq)] = offset
def pull(self, topic, expression='*', max_num=32):
message_queue = POINTER(_CMessageQueue)()
queue_size = c_int()
ffi_check(dll.FetchSubscriptionMessageQueues(
self._handle,
_to_bytes(topic),
ctypes.pointer(message_queue),
ctypes.pointer(queue_size)
))
for i in range(int(queue_size.value)):
mq = message_queue[i]
tmp_offset = ctypes.c_longlong(self.get_message_queue_offset(mq))
has_new_msg = True
while has_new_msg:
pull_res = dll.Pull(
self._handle,
ctypes.pointer(mq),
_to_bytes(expression),
tmp_offset,
max_num,
)
if pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT:
tmp_offset = pull_res.nextBeginOffset
self.set_message_queue_offset(mq, tmp_offset)
if pull_res.pullStatus == _CPullStatus.FOUND:
for i in range(int(pull_res.size)):
yield RecvMessage(pull_res.msgFoundList[i])
elif pull_res.pullStatus == _CPullStatus.NO_MATCHED_MSG:
pass
elif pull_res.pullStatus == _CPullStatus.NO_NEW_MSG:
has_new_msg = False
elif pull_res.pullStatus == _CPullStatus.OFFSET_ILLEGAL:
pass
else:
pass
dll.ReleasePullResult(pull_res) # NOTE: No need to check ffi return code here
ffi_check(dll.ReleaseSubscriptionMessageQueue(message_queue))
</code>
<code>class PullConsumer(object): offset_table = {} def __init__(self, group_id): self._handle = dll.CreatePullConsumer(_to_bytes(group_id)) if self._handle is None: raise NullPointerException('CreatePullConsumer returned null pointer') ... def get_message_queue_offset(self, mq): offset = self.offset_table.get(self._get_mq_key(mq), 0) return offset def set_message_queue_offset(self, mq, offset): self.offset_table[self._get_mq_key(mq)] = offset def pull(self, topic, expression='*', max_num=32): message_queue = POINTER(_CMessageQueue)() queue_size = c_int() ffi_check(dll.FetchSubscriptionMessageQueues( self._handle, _to_bytes(topic), ctypes.pointer(message_queue), ctypes.pointer(queue_size) )) for i in range(int(queue_size.value)): mq = message_queue[i] tmp_offset = ctypes.c_longlong(self.get_message_queue_offset(mq)) has_new_msg = True while has_new_msg: pull_res = dll.Pull( self._handle, ctypes.pointer(mq), _to_bytes(expression), tmp_offset, max_num, ) if pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT: tmp_offset = pull_res.nextBeginOffset self.set_message_queue_offset(mq, tmp_offset) if pull_res.pullStatus == _CPullStatus.FOUND: for i in range(int(pull_res.size)): yield RecvMessage(pull_res.msgFoundList[i]) elif pull_res.pullStatus == _CPullStatus.NO_MATCHED_MSG: pass elif pull_res.pullStatus == _CPullStatus.NO_NEW_MSG: has_new_msg = False elif pull_res.pullStatus == _CPullStatus.OFFSET_ILLEGAL: pass else: pass dll.ReleasePullResult(pull_res) # NOTE: No need to check ffi return code here ffi_check(dll.ReleaseSubscriptionMessageQueue(message_queue)) </code>
class PullConsumer(object):
    offset_table = {}
    def __init__(self, group_id):
        self._handle = dll.CreatePullConsumer(_to_bytes(group_id))
        if self._handle is None:
            raise NullPointerException('CreatePullConsumer returned null pointer')
    
    ...

    def get_message_queue_offset(self, mq):
        offset = self.offset_table.get(self._get_mq_key(mq), 0)
        return offset

    def set_message_queue_offset(self, mq, offset):
        self.offset_table[self._get_mq_key(mq)] = offset

    def pull(self, topic, expression='*', max_num=32):
        message_queue = POINTER(_CMessageQueue)()
        queue_size = c_int()
        ffi_check(dll.FetchSubscriptionMessageQueues(
            self._handle,
            _to_bytes(topic),
            ctypes.pointer(message_queue),
            ctypes.pointer(queue_size)
        ))
        for i in range(int(queue_size.value)):
            mq = message_queue[i]
            tmp_offset = ctypes.c_longlong(self.get_message_queue_offset(mq))

            has_new_msg = True
            while has_new_msg:
                pull_res = dll.Pull(
                    self._handle,
                    ctypes.pointer(mq),
                    _to_bytes(expression),
                    tmp_offset,
                    max_num,
                )

                if pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT:
                    tmp_offset = pull_res.nextBeginOffset
                    self.set_message_queue_offset(mq, tmp_offset)

                if pull_res.pullStatus == _CPullStatus.FOUND:
                    for i in range(int(pull_res.size)):
                        yield RecvMessage(pull_res.msgFoundList[i])
                elif pull_res.pullStatus == _CPullStatus.NO_MATCHED_MSG:
                    pass
                elif pull_res.pullStatus == _CPullStatus.NO_NEW_MSG:
                    has_new_msg = False
                elif pull_res.pullStatus == _CPullStatus.OFFSET_ILLEGAL:
                    pass
                else:
                    pass
                dll.ReleasePullResult(pull_res)  # NOTE: No need to check ffi return code here
        ffi_check(dll.ReleaseSubscriptionMessageQueue(message_queue))

I also found that although my function consumed and processed the information and passed the result back to the server, the id of the conusmer group defined by my function was not available on the server. I repeated the experiment several times, and each time I consumed information without group information, while other consumers could leave their ids after consumption

I put my code of Pullconsumer version in the end of problem, and thx for any suggestions and help!

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code>import os
from rocketmq.client import Producer, Message, PullConsumer
from otherPyFileICoded import mainFun
def messageListener(msg):
time_start = time.time()
resultRaw = mainFun(msg.body)
time_end = time.time()
logging.info("Running time:%f", time_end - time_start)
if resultRaw['code']=='0000':
logging.info("successed,message id: %s", resultRaw['id'])
else:
logging.info("failed, message id: %s", resultRaw['id'])
resultJson = json.dumps(resultRaw, ensure_ascii=False)
# build producer
producer = Producer('producer online')
producer.set_namesrv_addr(scAddr)
producer.start()
msg = Message(topic="topic_answer")
msg.set_body(resultJson)
send_result = producer.send_sync(msg)
logging.info(f"send message to serve successed. 信息ID: {send_result.msg_id}")
producer.shutdown()
def main():
global scAddr
if 'scAddr' in os.environ:
scAddr = os.environ.get('scAddr')
logging.info("Succeeded in obtaining the listening address from the environment variable : %s",
scAddr)
else:
scAddr = "default address"
logging.info("Failed in obtaining the listening address from the environment variable, default values will be used: %s",
scAddr)
# build consumer
consumer = PullConsumer("consumer online")
consumer.set_namesrv_addr(scAddr)
try:
consumer.start()
logging.info("Successed in build consumer")
except Exception as e:
logging.info(f"Failed in build consumer: %s", e)
while True:
msgList = consumer.pull(topic="topic_require")
for msg in msgList:
messageListener(msg)
logging.info("wait for next pull")
time.sleep(5)
if __name__ == '__main__':
# build logging
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler(sys.stdout)
formatter = logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logging.info("start function")
main()
</code>
<code>import os from rocketmq.client import Producer, Message, PullConsumer from otherPyFileICoded import mainFun def messageListener(msg): time_start = time.time() resultRaw = mainFun(msg.body) time_end = time.time() logging.info("Running time:%f", time_end - time_start) if resultRaw['code']=='0000': logging.info("successed,message id: %s", resultRaw['id']) else: logging.info("failed, message id: %s", resultRaw['id']) resultJson = json.dumps(resultRaw, ensure_ascii=False) # build producer producer = Producer('producer online') producer.set_namesrv_addr(scAddr) producer.start() msg = Message(topic="topic_answer") msg.set_body(resultJson) send_result = producer.send_sync(msg) logging.info(f"send message to serve successed. 信息ID: {send_result.msg_id}") producer.shutdown() def main(): global scAddr if 'scAddr' in os.environ: scAddr = os.environ.get('scAddr') logging.info("Succeeded in obtaining the listening address from the environment variable : %s", scAddr) else: scAddr = "default address" logging.info("Failed in obtaining the listening address from the environment variable, default values will be used: %s", scAddr) # build consumer consumer = PullConsumer("consumer online") consumer.set_namesrv_addr(scAddr) try: consumer.start() logging.info("Successed in build consumer") except Exception as e: logging.info(f"Failed in build consumer: %s", e) while True: msgList = consumer.pull(topic="topic_require") for msg in msgList: messageListener(msg) logging.info("wait for next pull") time.sleep(5) if __name__ == '__main__': # build logging logger = logging.getLogger(__name__) console_handler = logging.StreamHandler(sys.stdout) formatter = logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) logger.addHandler(console_handler) logging.info("start function") main() </code>
import os
from rocketmq.client import Producer, Message, PullConsumer
from otherPyFileICoded import mainFun



def messageListener(msg):

    time_start = time.time()
    resultRaw = mainFun(msg.body)
    time_end = time.time()
    logging.info("Running time:%f", time_end - time_start)
    if resultRaw['code']=='0000':
        logging.info("successed,message id: %s", resultRaw['id'])
    else:
        logging.info("failed, message id: %s", resultRaw['id'])
    resultJson = json.dumps(resultRaw, ensure_ascii=False)

    # build producer
    producer = Producer('producer online')
    producer.set_namesrv_addr(scAddr)

    producer.start()
    msg = Message(topic="topic_answer")
    msg.set_body(resultJson)
    send_result = producer.send_sync(msg)
    logging.info(f"send message to serve successed. 信息ID: {send_result.msg_id}")
    producer.shutdown()


def main():

    global scAddr
    if 'scAddr' in os.environ:
        scAddr = os.environ.get('scAddr')
        logging.info("Succeeded in obtaining the listening address from the environment variable : %s",
                    scAddr)
    else:
        scAddr = "default address"
        logging.info("Failed in obtaining the listening address from the environment variable, default values will be used: %s",
                    scAddr)
    
    
    # build consumer
    consumer = PullConsumer("consumer online")
    consumer.set_namesrv_addr(scAddr)
    try:
        consumer.start()
        logging.info("Successed in build consumer")
    except Exception as e:
        logging.info(f"Failed in build consumer: %s", e)
    
    while True:
        msgList = consumer.pull(topic="topic_require")
        for msg in msgList:
            messageListener(msg)
        logging.info("wait for next pull")
        time.sleep(5)

    


if __name__ == '__main__':
    # build logging
    logger = logging.getLogger(__name__)
    console_handler = logging.StreamHandler(sys.stdout)
    formatter = logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    logging.info("start function")
    main()

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật