GH-49258: [C++][Python] Add public APIs for reading and serializing IPC dictionary messages #49262
rustyconover wants to merge 1 commit into apache:main
Thanks for opening a pull request! If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename the pull request title in the following format? or See also:
…ary messages

This adds low-level APIs for working with IPC dictionary messages outside of the stream/file reader/writer context, enabling message-at-a-time IPC workflows with dictionary-encoded data.

C++ changes:
- Add public ReadDictionary(Message, DictionaryMemo*, IpcReadOptions) to read a single dictionary message into a memo
- Add CollectAndSerializeDictionaries(RecordBatch, DictionaryMemo*, IpcWriteOptions) to serialize dictionary messages with pointer-based deduplication
- Expose dictionary_memo() accessor on RecordBatchStreamReader and RecordBatchFileReader
- Refactor internal ReadDictionary to ReadDictionaryMessage in StreamDecoderInternal; make dictionary_memo_ protected

Python changes:
- Add ipc.read_dictionary_message() to populate a DictionaryMemo from a dictionary Message or Buffer
- Add RecordBatch.serialize_dictionaries() to serialize dictionary IPC messages with memo-based deduplication
- Add dictionary_memo property on RecordBatchStreamReader and RecordBatchFileReader
- Add DictionaryMemo.wrap() for non-owning references to reader memos
- Add read_dictionary_message to API docs
- Comprehensive test coverage for all new APIs
749cac7 to 59ec9b1
They were successful on re-run.
BTW, I opened an issue for one of the failures (the segfault one) as I've seen this happening on other PRs:
raulcd left a comment
Hi @rustyconover, I don't think it's a bug; I think it's a new feature you are asking for :)
I am not an expert in this area, but from my understanding you are trying to do something that is not the responsibility of RecordBatch. RecordBatch.serialize is a building block, not the full serialization protocol.
RecordBatch.serialize serializes a record batch data message. Dictionary messages are a protocol-level concern handled by the stream/file IPC format.
To keep the separation of concerns clear, would it make more sense to have something like ipc.serialize_record_batch_with_dictionaries() instead of adding serialize_dictionaries to RecordBatch?
What would your expectation be for DictionaryDeltas within your use case?
Hi @raulcd,

Thanks for looking at this.

My use case is "message-at-a-time" IPC over shared memory. I have two processes (client and server) transferring Arrow data via shared memory plus a pipe. Shared memory holds the IPC message bodies; the pipe conveys (offset, length) pairs telling the other side where to read. On my MacBook Air, pipe throughput is ~4 GB/s vs ~20 GB/s for shared memory, so the payoff is significant.

The workflow looks roughly like this:

    # Producer
    # (write_to_shm and notify_via_pipe are application helpers; write_to_shm
    # is assumed here to return where the bytes landed)
    memo = pa.ipc.DictionaryMemo()
    for batch in batches:
        # Serialize any new dictionaries not yet in the memo
        for dict_buf in batch.serialize_dictionaries(memo):
            offset, length = write_to_shm(dict_buf)
            notify_via_pipe(offset, length)
        # Serialize the record batch (indices only)
        batch_buf = batch.serialize()
        offset, length = write_to_shm(batch_buf)
        notify_via_pipe(offset, length)

    # Consumer
    memo = pa.ipc.DictionaryMemo()
    for offset, length in read_pipe():
        msg = pa.ipc.read_message(shm[offset:offset+length])
        if msg.type == 'dictionary':
            pa.ipc.read_dictionary_message(msg, memo)
        elif msg.type == 'record batch':
            batch = pa.ipc.read_record_batch(msg, schema, memo)

Why not a combined function?

In a streaming workflow, dictionaries are often written once and reused across many batches. A combined function would either re-serialize dictionaries unnecessarily or need the same DictionaryMemo tracking anyway. Keeping them separate gives the caller control over when dictionaries are emitted, which is exactly what the stream/file writers do internally. I'm just exposing that building block.

That said, I'm not attached to the method living on RecordBatch. If you'd prefer it as a free function like ipc.serialize_dictionaries(batch, memo), I'm happy to move it. The important thing is that the memo-based deduplication is available to users.

Happy to jump on a call if I can make this easier to understand or demo.

Rusty
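The transport described above (bodies in shared memory, (offset, length) pairs on a pipe) can be sketched with the standard library alone. This is only an illustration under assumptions: the anonymous mmap stands in for real cross-process shared memory, the payloads are placeholder bytes rather than Arrow IPC messages, and FRAME, send_message, and recv_message are hypothetical names that are not part of the PR.

```python
import mmap
import os
import struct

# Fixed-size frame sent over the pipe: two little-endian unsigned 64-bit ints.
FRAME = struct.Struct("<QQ")


def send_message(shm, write_fd, offset, payload):
    """Copy payload into the shared region and announce it on the pipe."""
    end = offset + len(payload)
    shm[offset:end] = payload
    os.write(write_fd, FRAME.pack(offset, len(payload)))
    return end  # next free offset in the region


def recv_message(shm, read_fd):
    """Read one (offset, length) frame and return the bytes it points at.

    Short reads are not handled; a real consumer would loop until
    FRAME.size bytes have arrived.
    """
    offset, length = FRAME.unpack(os.read(read_fd, FRAME.size))
    return bytes(shm[offset:offset + length])


def demo():
    shm = mmap.mmap(-1, 4096)  # anonymous mapping standing in for shared memory
    read_fd, write_fd = os.pipe()
    try:
        cursor = 0
        payloads = [b"dictionary-message", b"record-batch-message"]
        for payload in payloads:
            cursor = send_message(shm, write_fd, cursor, payload)
        return [recv_message(shm, read_fd) for _ in payloads]
    finally:
        os.close(read_fd)
        os.close(write_fd)
        shm.close()
```

Only the small fixed-size frame crosses the pipe, so the slower channel carries 16 bytes per message regardless of how large the message body is.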
Hi @rustyconover, no need to jump on a call (even though I'm happy to have a chat and a coffee).
@raulcd, I'm a little nervous about delta dictionaries; I need to read more about them. My anxiety is about a consumer that keeps a record batch in memory while a delta is applied to the dictionary it references as a new record batch arrives. If that older record batch is used afterwards, would it see the updated dictionary and potentially have incorrect values? Or are dictionary deltas append-only? I just don't know enough about dictionary deltas to not be anxious. But I'll learn.
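For context on the question above: in the Arrow IPC format, a dictionary batch flagged isDelta appends its values to the existing dictionary with the same id, while a non-delta dictionary batch for that id replaces it. The toy model below is not pyarrow and says nothing about how a real reader shares or copies dictionary arrays; ToyBatch and apply_delta are hypothetical names used only to show why append-only growth leaves previously decoded indices valid.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyBatch:
    """Indices plus a reference to the dictionary they were decoded against."""
    indices: tuple
    dictionary: tuple

    def decode(self):
        return [self.dictionary[i] for i in self.indices]


def apply_delta(dictionary, delta):
    """Append-only update: builds a new tuple, leaving the old one untouched."""
    return dictionary + tuple(delta)


def demo():
    dict_v1 = ("red", "green")
    old_batch = ToyBatch(indices=(0, 1, 0), dictionary=dict_v1)

    dict_v2 = apply_delta(dict_v1, ["blue"])
    new_batch = ToyBatch(indices=(2, 0), dictionary=dict_v2)

    # Decoding the old indices against the grown dictionary gives the same values,
    # because a delta only adds entries past the existing ones.
    regrown = ToyBatch(old_batch.indices, dict_v2)
    return old_batch.decode(), regrown.decode(), new_batch.decode()
```

A replacement (non-delta) dictionary is the case where old indices could change meaning, which is why it matters whether an older batch keeps its own reference to the dictionary it was read with.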
This adds low-level APIs for working with IPC dictionary messages outside of the stream/file reader/writer context, enabling message-at-a-time IPC workflows with dictionary-encoded data. I'm working with record batches that contain dictionaries and are serialized to shared memory, so I need additional methods to handle dictionary IPC messages.
I am addressing my issues from #49258.
C++ changes:
- Add public ReadDictionary(Message, DictionaryMemo*, IpcReadOptions) to read a single dictionary message into a memo
- Add CollectAndSerializeDictionaries(RecordBatch, DictionaryMemo*, IpcWriteOptions) to serialize dictionary messages with pointer-based deduplication
- Expose dictionary_memo() accessor on RecordBatchStreamReader and RecordBatchFileReader
- Refactor internal ReadDictionary to ReadDictionaryMessage in StreamDecoderInternal; make dictionary_memo_ protected

Python changes:
- Add ipc.read_dictionary_message() to populate a DictionaryMemo from a dictionary Message or Buffer
- Add RecordBatch.serialize_dictionaries() to serialize dictionary IPC messages with memo-based deduplication
- Add dictionary_memo property on RecordBatchStreamReader and RecordBatchFileReader
- Add DictionaryMemo.wrap() for non-owning references to reader memos
- Add read_dictionary_message to API docs

AI Disclosure: I used Claude to help me prepare this diff and PR. I will be responsible for all bugs, problems or inconsistencies.
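The idea of memo-based, pointer-based deduplication mentioned in the change list can be sketched in plain Python. This is a toy model under assumptions, not the PR's implementation: ToyMemo and collect_new_dictionaries are hypothetical names, a batch is modeled as a mapping of column name to (indices, dictionary), and object identity stands in for pointer comparison.

```python
class ToyMemo:
    """Remembers which dictionary objects have already been emitted, by identity."""

    def __init__(self):
        # id(dictionary) -> dictionary; holding the object keeps its id unique.
        self._seen = {}

    def add_if_new(self, dictionary):
        key = id(dictionary)
        if key in self._seen:
            return False
        self._seen[key] = dictionary
        return True


def collect_new_dictionaries(batch, memo):
    """Return the dictionaries in `batch` that the memo has not seen yet.

    `batch` is a hypothetical mapping of column name -> (indices, dictionary).
    """
    return [d for _, d in batch.values() if memo.add_if_new(d)]


def demo():
    colors = ["red", "green"]
    sizes = ["S", "M", "L"]
    batch1 = {"color": ([0, 1], colors)}
    batch2 = {"color": ([1, 1], colors), "size": ([2, 0], sizes)}
    # Same values as `colors`, but a different object.
    equal_but_distinct = {"color": ([0], list(colors))}

    memo = ToyMemo()
    return [
        collect_new_dictionaries(batch1, memo),
        collect_new_dictionaries(batch2, memo),
        collect_new_dictionaries(equal_but_distinct, memo),
    ]
```

In this model the second batch emits only the new sizes dictionary, while the third batch emits its dictionary again even though the values are equal, since identity-based tracking does not compare contents. Whether the real API behaves the same way for equal-but-distinct dictionaries is an assumption to verify against the PR.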
Are there any user-facing changes?
Yes, they are documented above in the Python changes.
This PR includes breaking changes to public APIs.
I'm making dictionary_memo_ protected on StreamDecoderInternal.