graphlearn_torch.channel

base

class ChannelBase[source]

Bases: ABC

A base class that initializes a channel for sample messages and provides send() and recv() routines.

abstract send(msg: Dict[str, Tensor], **kwargs)[source]
Send a sample message into channel, the implemented channel should

porcess this message data properly.

Parameters:

msg – The sample message to send.

abstract recv(**kwargs) Dict[str, Tensor][source]

Recv a sample message from channel.

mp_channel

class MpChannel(**kwargs)[source]

Bases: ChannelBase

A simple multiprocessing channel using torch.multiprocessing.Queue.

Parameters:

torch.multiprocessing.Queue. (The input arguments should be consistent with) –

send(msg: Dict[str, Tensor], **kwargs)[source]
Send a sample message into channel, the implemented channel should

porcess this message data properly.

Parameters:

msg – The sample message to send.

recv(**kwargs) Dict[str, Tensor][source]

Recv a sample message from channel.

remote_channel

class RemoteReceivingChannel(server_rank: int, producer_id: int, num_expected: int, prefetch_size: int = 4)[source]

Bases: ChannelBase

A pull-based receiving channel that can fetch sampled messages from remote sampling servers.

Parameters:
  • server_rank (int) – The rank of target server to fetch sampled messages.

  • producer_id (int) – The sequence id of created sampling producer on the target server.

  • num_expected (int) – The number of expected sampled messages at one epoch.

  • prefetch_size (int) – The number of messages to prefetch. (Default 4).

reset()[source]

Reset all states to start a new epoch consuming.

send(msg: Dict[str, Tensor], **kwargs)[source]
Send a sample message into channel, the implemented channel should

porcess this message data properly.

Parameters:

msg – The sample message to send.

recv(**kwargs) Dict[str, Tensor][source]

Recv a sample message from channel.

shm_channel

class ShmChannel(capacity: int = 128, shm_size: str | int = '256MB')[source]

Bases: ChannelBase

A communication channel for sample messages based on a shared-memory

queue, which is implemented in the underlying c++ lib.

Note that the underlying shared-memory buffer of this channel is pinnable,

which will achieve better performance when the consumer needs to copy data from channel to gpu device.

Parameters:
  • capacity – The max bufferd number of sample messages in channel.

  • shm_size – The allocated size (bytes) for underlying shared-memory.

When the producer send sample message to the channel, it will be limited by

both capacity and shm_size. E.g, if current number of buffered messages in channel reaches the capacity limit, or current used buffer memory reaches the shm_size limit, the current send operation will be blocked until some messages in channel are consumed and related resource are released.

pin_memory()[source]

Pin underlying shared-memory.

send(msg: Dict[str, Tensor], **kwargs)[source]
Send a sample message into channel, the implemented channel should

porcess this message data properly.

Parameters:

msg – The sample message to send.

recv(**kwargs) Dict[str, Tensor][source]

Recv a sample message from channel.