Qorus Integration Engine® Enterprise Edition 6.0.15_prod
OMQ::DbRemoteReceive Class Reference

A class for inbound/reading data from the remote instance; I/O is handled in a background thread and the received data is made available through the getData() and getAllData() methods.


Public Member Methods

 constructor (DbRemoteBase remote, string stream, string table_name, *hash< auto > options)
 constructor taking an OMQ::DbRemoteBase object for the remote connection
 
 constructor (DbRemoteBase remote, string table_name, *hash< auto > options)
 constructor taking an OMQ::DbRemoteBase object for the remote connection and assuming the "select" stream
 
 constructor (string remote, string datasource, string stream, string table_name, *hash< auto > options)
 constructor taking a string giving the name of the remote connection for the remote server
 
 constructor (string remote, string datasource, string table_name, *hash< auto > options)
 constructor taking a string giving the name of the remote connection for the remote server and assuming the "select" stream
 
 constructor (QorusSystemRestHelper remote, string datasource, string stream, string table_name, *hash< auto > options)
 constructor taking an OMQ::QorusSystemRestHelper object for the remote connection
 
 constructor (QorusSystemRestHelper remote, string datasource, string table_name, *hash< auto > options)
 constructor taking an OMQ::QorusSystemRestHelper object for the remote connection and assuming the "select" stream
 
 openStream (string stream, string table_name, *hash< auto > options)
 flushes the current stream and reopens a new stream with the same remote connection and in the same datasource
 
 commit ()
 Commit remote transaction.
 
 rollback (bool action=DO_DISCONNECT)
 Rollback remote transaction.
 
 disconnect ()
 Disconnects the connection.
 
*hash< auto > getData (*timeout timeout_ms)
 returns queued data as soon as it is available; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue; once this method returns NOTHING, the end of stream data condition has been reached; do not call this method again after it returns NOTHING
 
*hash< auto > getAllData (*timeout timeout_ms)
 returns all data received by the object in a single call; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue
 
*list< auto > getDataRows (*timeout timeout_ms)
 returns queued data as a list<auto> of rows as soon as it is available; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue; once this method returns NOTHING, the end of stream data condition has been reached; do not call this method again after it returns NOTHING
 
*list< auto > getAllDataRows (*timeout timeout_ms)
 returns all data received by the object in a single call as a list<auto> of rows; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue
 
OMQ::StreamConfig config ()
 returns the configuration object
 
Public Member Methods inherited from OMQ::AbstractParallelReceiveStream
 destructor ()
 disconnects and aborts the I/O thread if it's still running
 
nothing cleanup ()
 called when the thread resource is still allocated and the thread exits or one of Qore::throw_thread_resource_exceptions() or Qore::throw_thread_resource_exceptions_to_mark() is called
 
abstract private throwAbortedExceptionImpl (string meth)
 throw a user-friendly exception about why the I/O thread was aborted and the connection forcibly closed and how to avoid such situations in the future
 

Private Member Methods

 constructor (*hash< auto > options, DbRemoteBase remote, string sql, *list< auto > args)
 Private constructor.
 
 constructor (*hash< auto > options, string remote, string datasource, string sql, *list< auto > args)
 Private constructor.
 
 constructor (*hash< auto > options, QorusSystemRestHelper remote, string datasource, string sql, *list< auto > args)
 Private constructor.
 
 throwAbortedExceptionImpl (string meth)
 throw a user-friendly exception about why the I/O thread was aborted and the connection forcibly closed and how to avoid such situations in the future
 
 startStreamImpl ()
 opens the remote transaction if necessary
 
 socketThreadImpl ()
 receive reimplementation
 
nothing recvDataImpl (auto rdata)
 An abstract method to handle incoming data.
 
nothing recvDataDoneImpl (*string err)
 called when all data has been received or the background I/O operation terminates due to an error
 
 checkComplete (string meth)
 checks if data is requested after the transfer is complete
 
Private Member Methods inherited from OMQ::AbstractParallelReceiveStream
 abortInternMethod (string meth)
 if the I/O thread is running, then abort it, disconnect the connection, and throw a user-friendly exception explaining what happened and how to avoid such situations in the future
 
 abortIntern (string meth, string fmt)
 if the I/O thread is running, then abort it, disconnect the connection, and throw a user-friendly exception explaining what happened and how to avoid such situations in the future
 

Private Attributes

DbSelectStreamConfig m_config
 stream configuration
 
bool complete
 flag indicating that all data has been taken from the Queue
 

Detailed Description

A class for inbound/reading data from the remote instance; I/O is handled in a background thread and the received data is made available through the getData() and getAllData() methods.

Transaction Management
The DbRemoteReceive object automatically starts or continues a remote transaction by including the "Qorus-Connection: Continue-Persistent" header when opening the stream if the "transaction" constructor() option is set to True. Remote transactions must be explicitly committed (by calling DbRemoteReceive::commit() for example) or aborted (by calling DbRemoteReceive::disconnect() for example); otherwise the remote transaction status is left unchanged by the DbRemoteReceive object.

To explicitly start a transaction before stream operations, see AbstractParallelStream::beginTransaction()
Data Format
This class serializes data in column format, meaning a hash (with keys giving column names) of lists (giving the row values for each column). This is the most efficient data serialization technique from the point of view of the packet size and therefore of network performance. Additionally, this format is the native format to use with bulk DML on the local end.

Data in row format (lists of hashes) can be received by using the DbRemoteReceive::getDataRows() and DbRemoteReceive::getAllDataRows() methods.
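
The relationship between the two formats can be illustrated with a short, language-neutral sketch. The Python below is not part of the Qorus API; the function name columns_to_rows and the sample column names are hypothetical and only show how a hash of lists (column format) corresponds to a list of hashes (row format).

```python
from typing import Any


def columns_to_rows(columns: dict[str, list[Any]]) -> list[dict[str, Any]]:
    """Convert column format (dict of lists) to row format (list of dicts)."""
    if not columns:
        return []
    names = list(columns)
    # zip(*...) walks all column lists in parallel, yielding one tuple per row
    return [dict(zip(names, values)) for values in zip(*(columns[n] for n in names))]


# One block in column format: keys are column names, values are per-row lists
block = {"id": [1, 2, 3], "name": ["a", "b", "c"]}
for row in columns_to_rows(block):
    print(row)
```

In column format each column name is stored once per block, whereas row format repeats every column name in every row, which is consistent with the packet-size argument above.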
Threaded I/O Implementation
This class creates a background thread that handles the socket I/O so that socket I/O can be executed in parallel to the main data handling thread (the thread that creates the object).

There is a Queue object that receives row data from the socket I/O thread; the default Queue size is 2 (representing a number of blocks), which allows two blocks of data to be queued before the socket I/O thread blocks waiting for the main data handling thread to retrieve data with DbRemoteReceive::getData().

If the Queue size is 2 and the block size is 1000 then at most 2000 rows will be queued for receiving before the I/O thread will block in the internal DbRemoteReceive::recvDataImpl() method.

This allows the main data handling thread to stay loosely in sync with the I/O thread so that memory usage is optimized and furthermore serves to avoid stalling the main data handling thread due to lack of data.
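
The queueing behavior described above can be modeled with a bounded queue shared by two threads. This is a minimal Python sketch under stated assumptions, not the Qorus implementation: the names io_thread and receive_all, the constants, and the use of None as an end-of-stream marker are all illustrative.

```python
import queue
import threading

BLOCK_SIZE = 1000        # rows per block (models the "block" option)
BLOCK_QUEUE_SIZE = 2     # blocks that may wait in the queue (models "block_queue_size")


def io_thread(q: queue.Queue, total_rows: int) -> None:
    """Stand-in for the socket I/O thread: posts blocks, then an end marker."""
    for start in range(0, total_rows, BLOCK_SIZE):
        stop = min(start + BLOCK_SIZE, total_rows)
        # put() blocks while the queue already holds BLOCK_QUEUE_SIZE blocks
        q.put({"id": list(range(start, stop))})
    q.put(None)  # hypothetical end-of-stream marker


def receive_all(total_rows: int) -> int:
    """Consume blocks until the end marker; return the number of rows seen."""
    q: queue.Queue = queue.Queue(maxsize=BLOCK_QUEUE_SIZE)
    worker = threading.Thread(target=io_thread, args=(q, total_rows), daemon=True)
    worker.start()
    seen = 0
    # get() blocks while the queue is empty, like a receive call with no timeout
    while (block := q.get()) is not None:
        seen += len(block["id"])
    worker.join()
    return seen


print(receive_all(4500))
```

With these values at most BLOCK_QUEUE_SIZE * BLOCK_SIZE = 2000 rows wait in the queue at any time, so the producer cannot run arbitrarily far ahead of the consumer.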
DbRemoteReceive Usage
The stream is set up in the constructor, which also starts the background socket I/O thread, which posts data on an internal queue. The data received is then made available in the getData(), getAllData(), getDataRows(), or getAllDataRows() methods.
Example:
DbRemoteReceive recv(qrest, "omquser", table_name, {"block": 2000});
while (*hash<auto> h = recv.getData()) {
    processData(h);
}
Note
a remote transaction is only started in this object's constructor if the transaction option is set, otherwise the remote transaction status is left unchanged

Member Function Documentation

◆ commit()

OMQ::DbRemoteReceive::commit ( )

Commit remote transaction.

Note
exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread

◆ constructor() [1/6]

OMQ::DbRemoteReceive::constructor ( DbRemoteBase  remote,
string  stream,
string  table_name,
*hash< auto >  options 
)

constructor taking an OMQ::DbRemoteBase object for the remote connection

Example:
DbRemoteReceive recv(db, "select", table_name);
while (*hash<auto> h = recv.getData()) {
    processData(h);
}
Parameters
remote: an OMQ::DbRemoteBase object
stream: the system.sqlutil service stream to be used
table_name: a string with the remote table name located in the remote datasource
options: optional Streaming API Constructor Options as follows:
  • "block": data block size; the number of rows to retrieve from the remote database and send in each block
  • "block_queue_size": the number of blocks to queue for receiving before the I/O thread will block (default: 2); the total number of rows that can be queued = block_queue_size * block
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid values
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "select": select options for the source select statement
  • "transaction": start a remote transaction; if this option is not given, the remote transaction status is left unchanged
Note
  • a remote transaction is only started in this object's constructor if the transaction option is set, otherwise the remote transaction status is left unchanged
  • the forupdate select_option must be manually included in the "select" option hash to initiate a "select for update"; this is not automatically set by setting the "transaction" option
  • the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ constructor() [2/6]

OMQ::DbRemoteReceive::constructor ( DbRemoteBase  remote,
string  table_name,
*hash< auto >  options 
)

constructor taking an OMQ::DbRemoteBase object for the remote connection and assuming the "select" stream

Example:
DbRemoteReceive recv(db, table_name);
while (*hash<auto> h = recv.getData()) {
    processData(h);
}
Parameters
remote: an OMQ::DbRemoteBase object
table_name: a string with the remote table name located in the remote datasource
options: optional Streaming API Constructor Options as follows:
  • "block": data block size; the number of rows to retrieve from the remote database and send in each block
  • "block_queue_size": the number of blocks to queue for receiving before the I/O thread will block (default: 2); the total number of rows that can be queued = block_queue_size * block
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid values
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "select": select options for the source select statement
  • "transaction": start a remote transaction; if this option is not given, the remote transaction status is left unchanged
Note
  • this constructor always uses the system.sqlutil service stream: "select"
  • a remote transaction is only started in this object's constructor if the transaction option is set, otherwise the remote transaction status is left unchanged
  • the forupdate select_option must be manually included in the "select" option hash to initiate a "select for update"; this is not automatically set by setting the "transaction" option
  • the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ constructor() [3/6]

OMQ::DbRemoteReceive::constructor ( QorusSystemRestHelper  remote,
string  datasource,
string  stream,
string  table_name,
*hash< auto >  options 
)

constructor taking an OMQ::QorusSystemRestHelper object for the remote connection

Example:
DbRemoteReceive recv(qrest, datasource_name, "select", table_name);
while (*hash<auto> h = recv.getData())
    processData(h);
Parameters
remote: an OMQ::QorusSystemRestHelper object
datasource: a string with the name of the remote datasource to use
stream: the system.sqlutil service stream to be used
table_name: a string with the remote table name located in the remote datasource
options: optional Streaming API Constructor Options as follows:
  • "block": data block size; the number of rows to retrieve from the remote database and send in each block
  • "block_queue_size": the number of blocks to queue for receiving before the I/O thread will block (default: 2); the total number of rows that can be queued = block_queue_size * block
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid values
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "select": select options for the source select statement
  • "transaction": start a remote transaction; if this option is not given, the remote transaction status is left unchanged
Note
  • a remote transaction is only started in this object's constructor if the transaction option is set, otherwise the remote transaction status is left unchanged
  • the forupdate select_option must be manually included in the "select" option hash to initiate a "select for update"; this is not automatically set by setting the "transaction" option
  • the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ constructor() [4/6]

OMQ::DbRemoteReceive::constructor ( QorusSystemRestHelper  remote,
string  datasource,
string  table_name,
*hash< auto >  options 
)

constructor taking an OMQ::QorusSystemRestHelper object for the remote connection and assuming the "select" stream

Example:
DbRemoteReceive recv(qrest, datasource_name, table_name);
while (*hash<auto> h = recv.getData()) {
    processData(h);
}
Parameters
remote: an OMQ::QorusSystemRestHelper object
datasource: a string with the name of the remote datasource to use
table_name: a string with the remote table name located in the remote datasource
options: optional Streaming API Constructor Options as follows:
  • "block": data block size; the number of rows to retrieve from the remote database and send in each block
  • "block_queue_size": the number of blocks to queue for receiving before the I/O thread will block (default: 2); the total number of rows that can be queued = block_queue_size * block
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid values
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "select": select options for the source select statement
  • "transaction": start a remote transaction; if this option is not given, the remote transaction status is left unchanged
Note
  • this constructor always uses the system.sqlutil service stream: "select"
  • a remote transaction is only started in this object's constructor if the transaction option is set, otherwise the remote transaction status is left unchanged
  • the forupdate select_option must be manually included in the "select" option hash to initiate a "select for update"; this is not automatically set by setting the "transaction" option
  • the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ constructor() [5/6]

OMQ::DbRemoteReceive::constructor ( string  remote,
string  datasource,
string  stream,
string  table_name,
*hash< auto >  options 
)

constructor taking a string giving the name of the remote connection for the remote server

Example:
DbRemoteReceive recv(remote_name, datasource_name, "select", table_name);
while (*hash<auto> h = recv.getData())
    processData(h);
Parameters
remote: a string giving the name of the remote connection for the remote server
datasource: a string with the name of the remote datasource to use
stream: the system.sqlutil service stream to be used
table_name: a string with the remote table name located in the remote datasource
options: optional Streaming API Constructor Options as follows:
  • "block": data block size; the number of rows to retrieve from the remote database and send in each block
  • "block_queue_size": the number of blocks to queue for receiving before the I/O thread will block (default: 2); the total number of rows that can be queued = block_queue_size * block
  • "loglevel": a default log level option for logging; see Log Levels for valid values
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "select": select options for the source select statement
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "transaction": start a remote transaction; if this option is not given, the remote transaction status is left unchanged
Note
  • a remote transaction is only started in this object's constructor if the transaction option is set, otherwise the remote transaction status is left unchanged
  • the forupdate select_option must be manually included in the "select" option hash to initiate a "select for update"; this is not automatically set by setting the "transaction" option
  • the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ constructor() [6/6]

OMQ::DbRemoteReceive::constructor ( string  remote,
string  datasource,
string  table_name,
*hash< auto >  options 
)

constructor taking a string giving the name of the remote connection for the remote server and assuming the "select" stream

Example:
DbRemoteReceive recv(remote_name, datasource_name, table_name);
while (*hash<auto> h = recv.getData()) {
    processData(h);
}
Parameters
remote: a string giving the name of the remote connection for the remote server
datasource: a string with the name of the remote datasource to use
table_name: a string with the remote table name located in the remote datasource
options: optional Streaming API Constructor Options as follows:
  • "block": data block size; the number of rows to retrieve from the remote database and send in each block
  • "block_queue_size": the number of blocks to queue for receiving before the I/O thread will block (default: 2); the total number of rows that can be queued = block_queue_size * block
  • "loglevel": a default log level option for logging; see Log Levels for valid values
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "select": select options for the source select statement
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "transaction": start a remote transaction; if this option is not given, the remote transaction status is left unchanged
Note
  • this constructor always uses the system.sqlutil service stream: "select"
  • a remote transaction is only started in this object's constructor if the transaction option is set, otherwise the remote transaction status is left unchanged
  • the forupdate select_option must be manually included in the "select" option hash to initiate a "select for update"; this is not automatically set by setting the "transaction" option
  • the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ getAllData()

*hash< auto > OMQ::DbRemoteReceive::getAllData ( *timeout  timeout_ms)

returns all data received by the object in a single call; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue

Examples:
DbRemoteReceive recv(qrest, "omquser", "select", table_name, ("block": 2000));
*hash<auto> h = recv.getAllData();
Parameters
timeout_ms: an override for the timeout for waiting on the queue; integers are interpreted as milliseconds; relative date/time values are interpreted literally with a maximum resolution of milliseconds; if the timeout limit is exceeded before data is available, a QUEUE-TIMEOUT error is thrown. If no value is given here, the default queue timeout value configured for the object is used (see the "queue_timeout" option in the constructor())
Returns
a hash is returned where keys = columns, values = lists of row values, or NOTHING if no data was returned
Exceptions
DB-REMOTE-RECEIVE-ERROR: this exception is thrown if this method is called after it returns NOTHING signifying end of stream
QUEUE-TIMEOUT: no data was posted to the queue in the timeout period
STREAM-TERMINATED: if the I/O thread was terminated prematurely, this exception will be thrown
Note
  • exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
  • any exception thrown in this method (I/O error or timeout error) will cause all data to be lost; when this method returns (either normally or due to an exception), the data complete flag is set and no more data can be retrieved from the object
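
Conceptually, returning all data in a single call amounts to concatenating the per-column lists of every received block. The Python sketch below illustrates that idea only; it is an assumption about the resulting shape, not the actual Qorus implementation, and the name merge_column_blocks is hypothetical.

```python
from typing import Any, Iterable, Optional


def merge_column_blocks(
    blocks: Iterable[dict[str, list[Any]]],
) -> Optional[dict[str, list[Any]]]:
    """Concatenate column-format blocks; return None if no data was received."""
    merged: dict[str, list[Any]] = {}
    for block in blocks:
        for column, values in block.items():
            # Append this block's values to the running list for the column
            merged.setdefault(column, []).extend(values)
    return merged or None


blocks = [
    {"id": [1, 2], "name": ["a", "b"]},
    {"id": [3], "name": ["c"]},
]
print(merge_column_blocks(blocks))
```

Because the whole result is held in memory at once, retrieving data block by block with getData() may be preferable for large result sets.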

◆ getAllDataRows()

*list< auto > OMQ::DbRemoteReceive::getAllDataRows ( *timeout  timeout_ms)

returns all data received by the object in a single call as a list<auto> of rows; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue

Examples:
DbRemoteReceive recv(qrest, "omquser", "select", table_name, ("block": 2000));
*list<auto> l = recv.getAllDataRows();
Parameters
timeout_ms: an override for the timeout for waiting on the queue; integers are interpreted as milliseconds; relative date/time values are interpreted literally with a maximum resolution of milliseconds; if the timeout limit is exceeded before data is available, a QUEUE-TIMEOUT error is thrown. If no value is given here, the default queue timeout value configured for the object is used (see the "queue_timeout" option in the constructor())
Returns
a list<auto> of rows or NOTHING if no data was returned
Exceptions
DB-REMOTE-RECEIVE-ERROR: this exception is thrown if this method is called after it returns NOTHING signifying end of stream
QUEUE-TIMEOUT: no data was posted to the queue in the timeout period
STREAM-TERMINATED: if the I/O thread was terminated prematurely, this exception will be thrown
Note
  • exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
  • any exception thrown in this method (I/O error or timeout error) will cause all data to be lost; when this method returns (either normally or due to an exception), the data complete flag is set and no more data can be retrieved from the object
  • the native column format is converted to row format for the return value of this method

◆ getData()

*hash< auto > OMQ::DbRemoteReceive::getData ( *timeout  timeout_ms)

returns queued data as soon as it is available; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue; once this method returns NOTHING, the end of stream data condition has been reached; do not call this method again after it returns NOTHING

Examples:
DbRemoteReceive recv(qrest, "omquser", "select", table_name, ("block": 2000));
while (*hash<auto> h = recv.getData())
    processData(h);
Parameters
timeout_ms: an override for the timeout for waiting on the queue; integers are interpreted as milliseconds; relative date/time values are interpreted literally with a maximum resolution of milliseconds; if the timeout limit is exceeded before data is available, a QUEUE-TIMEOUT error is thrown. If no value is given here, the default queue timeout value configured for the object is used (see the "queue_timeout" option in the constructor())
Returns
a hash of lists: keys = columns, values = lists of row values or NOTHING if all data has been received (do not call this method again after it returns NOTHING)
Exceptions
DB-REMOTE-RECEIVE-ERROR: this exception is thrown if this method is called after it returns NOTHING signifying end of stream
QUEUE-TIMEOUT: this exception is thrown if a timeout occurs on the Queue
STREAM-TERMINATED: if the I/O thread was terminated prematurely, this exception will be thrown
Note
exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread

◆ getDataRows()

*list< auto > OMQ::DbRemoteReceive::getDataRows ( *timeout  timeout_ms)

returns queued data as a list<auto> of rows as soon as it is available; if the timeout is omitted or is less than or equal to 0, no timeout is used and the call blocks until data is available on the queue; once this method returns NOTHING, the end of stream data condition has been reached; do not call this method again after it returns NOTHING

Examples:
DbRemoteReceive recv(qrest, "omquser", "select", table_name, ("block": 2000));
while (*list<auto> l = recv.getDataRows())
    map processDataRow($1), l;
Parameters
timeout_ms: an override for the timeout for waiting on the queue; integers are interpreted as milliseconds; relative date/time values are interpreted literally with a maximum resolution of milliseconds; if the timeout limit is exceeded before data is available, a QUEUE-TIMEOUT error is thrown. If no value is given here, the default queue timeout value configured for the object is used (see the "queue_timeout" option in the constructor())
Returns
a list<auto> of row data or NOTHING if all data has been received (do not call this method again after it returns NOTHING)
Exceptions
DB-REMOTE-RECEIVE-ERROR: this exception is thrown if this method is called after it returns NOTHING signifying end of stream
QUEUE-TIMEOUT: this exception is thrown if a timeout occurs on the Queue
STREAM-TERMINATED: if the I/O thread was terminated prematurely, this exception will be thrown
Note
  • exceptions thrown in the socket I/O thread will be rethrown when this method is called so that errors can be handled in the main data thread
  • the native column format is converted to row format for the return value of this method

◆ openStream()

OMQ::DbRemoteReceive::openStream ( string  stream,
string  table_name,
*hash< auto >  options 
)

flushes the current stream and reopens a new stream with the same remote connection and in the same datasource

Example:
out.openStream("update", "table2");
Parameters
stream: the system.sqlutil service stream to be used
table_name: a string with the remote table name located in the remote datasource
options: optional Streaming API Constructor Options as follows:
  • "timeout": an HTTP socket timeout value in milliseconds; used locally and in the remote for socket I/O and queue operations; default value: 5m
  • "loglevel": a default log level option for logging; see Log Levels for valid values
  • "block": data block size
  • "no_remote_timeout": if True the "timeout" option will not be sent to the remote
  • "queue_block_size": the number of blocks to queue for sending before the main data thread will block (default: 2)
  • "queue_timeout": the number of milliseconds to wait for queue data before throwing a QUEUE-TIMEOUT exception
Note
the explicit or default timeout value here overrides any socket I/O timeout set for the remote connection object

◆ recvDataImpl()

nothing OMQ::DbRemoteReceive::recvDataImpl ( auto  rdata)
privatevirtual

An abstract method to handle incoming data.

Parameters
rdata: data arrives in hash format: keys = columns, values = lists of row values

Implements DataStreamClient::DataStreamRecvMessage.

◆ rollback()

OMQ::DbRemoteReceive::rollback ( bool  action = DO_DISCONNECT)

Rollback remote transaction.

Note
  • when streaming, it is normally better to disconnect the connection if an error occurs rather than call rollback(): if a chunked transfer is interrupted, HTTP calls will fail anyway, and the remote end will roll back the transaction in any case unless an explicit commit is executed
Parameters
action: an optional bool value that determines whether the connection is kept open or closed; closing the connection is the default
See also
DO_DISCONNECT
DONT_DISCONNECT
disconnect()
