API Reference
DataHub
- class datahub.models.compress.CompressFormat(value)[source]
CompressFormat enum class. Values:
NONE, LZ4, ZLIB, DEFLATE
- class datahub.DataHub(access_id, access_key, endpoint=None, compress_format=CompressFormat.LZ4, **kwargs)[source]
Main entrance to DataHub.
Convenient operations on DataHub objects are provided. Please refer to DataHub docs to see the details.
Generally, basic operations such as create, list, delete and update are provided for each DataHub object. Take the project as an example.
To create a DataHub instance, access_id and access_key are required and must be correct, or a SignatureNotMatch error will be thrown.
- Parameters:
access_id – Aliyun Access ID
secret_access_key – Aliyun Access Key
endpoint – Rest service URL
enable_pb – enable protobuf when putting/getting records; the default value is False in version <= 2.11 and True in version >= 2.12
protocol_type (datahub.core.DatahubProtocolType) – protocol type. Using this param is recommended; ‘enable_pb’ will no longer be supported in future versions
compress_format (datahub.models.compress.CompressFormat) – compress format, default value is NONE
enable_schema_register – enable schema register, only supported in batch mode; default value is True in batch mode
- Example:
>>> datahub = DataHub('**your access id**', '**your access key**', '**endpoint**')
>>> datahub_pb = DataHub('**your access id**', '**your access key**', '**endpoint**', protocol_type=DatahubProtocolType.PB)
>>> datahub_batch = DataHub('**your access id**', '**your access key**', '**endpoint**', protocol_type=DatahubProtocolType.BATCH, enable_schema_register=True)
>>> datahub_lz4 = DataHub('**your access id**', '**your access key**', '**endpoint**', compress_format=CompressFormat.LZ4)
>>>
>>> project_result = datahub.get_project('datahub_test')
>>>
>>> print(project_result is None)
- append_connector_field(project_name, topic_name, connector_id, field_name)[source]
Append field to a connector
- Parameters:
project_name – project name
topic_name – topic name
connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
field_name – field name
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or connector does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or field_name is empty, or connector_type is of the wrong type
- append_field(project_name, topic_name, field_name, field_type)[source]
Append field to a tuple topic
- Parameters:
project_name – project name
topic_name – topic name
field_name – field name
field_type (datahub.models.FieldType) – field type
- Returns:
none
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or field_name is empty, or field_type is of the wrong type
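As a sketch of evolving a tuple topic's schema with append_field (the helper name and the project/topic arguments are placeholders, and running it assumes the pydatahub package is installed):

```python
# Hypothetical helper sketching append_field; pydatahub must be installed
# for the body to run, so the import is kept inside the function.
def add_string_field(dh, project_name, topic_name, field_name):
    from datahub.models import FieldType
    # Appended fields are nullable, so records written before the schema
    # change remain readable afterwards.
    dh.append_field(project_name, topic_name, field_name, FieldType.STRING)
```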
- create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment, extend_mode=None)[source]
Create blob topic
- Parameters:
project_name – project name
topic_name – topic name
shard_count – shard count
life_cycle – life cycle
comment – comment
extend_mode – use expansion method to increase shard
- Returns:
none
- Raise:
datahub.exceptions.InvalidParameterException
if project_name is empty, topic_name is not valid, or life_cycle is not positive
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project does not exist
- Raise:
datahub.exceptions.ResourceExistException
if the topic already exists
- create_connector(project_name, topic_name, connector_type, column_fields, config, start_time=-1)[source]
Create a data connector
- Parameters:
project_name – project name
topic_name – topic name
connector_type (datahub.models.ConnectorType) – connector type
column_fields (list) – column fields
config (datahub.models.ConnectorConfig) – connector config
start_time (int) – start timestamp in milliseconds
- Returns:
connector id
- Return type:
datahub.models.CreateConnectorResult
- Raise:
datahub.exceptions.ResourceExistException
if the connector already exists
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the column field or config is invalid, project_name or topic_name is empty, or connector_type or config is of the wrong type
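A hypothetical sketch of creating an ODPS sink connector. Every name, endpoint and credential below is a placeholder, and the `connector_id` attribute on the result is an assumption about the CreateConnectorResult object; pydatahub must be installed to run it.

```python
# Sketch only: placeholder names/credentials throughout; the lazy import
# keeps the snippet loadable without pydatahub installed.
def create_odps_sink(dh):
    from datahub.models import ConnectorType, OdpsConnectorConfig, PartitionMode
    config = OdpsConnectorConfig(
        'odps_project', 'odps_table', 'odps_endpoint', 'tunnel_endpoint',
        'access_id', 'access_key', PartitionMode.SYSTEM_TIME, 15)
    # column_fields must match field names in the topic's schema.
    result = dh.create_connector('my_project', 'my_topic',
                                 ConnectorType.SINK_ODPS,
                                 ['f1', 'f2'], config)
    return result.connector_id  # assumed attribute name
```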
- create_project(project_name, comment)[source]
Create a new project by given name and comment
- Parameters:
project_name – project name
comment – description of project
- Returns:
none
- Raise:
datahub.exceptions.InvalidParameterException
if project_name is not valid
- Raise:
datahub.exceptions.ResourceExistException
if the project already exists
- create_subscription(project_name, topic_name, comment)[source]
Create subscription to a topic
- Parameters:
project_name – project name
topic_name – topic name
comment – comment for subscription
- Returns:
create result contains subscription id
- Return type:
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.LimitExceededException
if the limit on the number of subscriptions is exceeded
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty
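A short sketch of creating a subscription and keeping its id for the offset APIs (placeholder names; the `sub_id` attribute on the result is an assumption; assumes pydatahub is installed):

```python
# Sketch: create a subscription and return the id used by the
# subscription-offset APIs below. Names are placeholders.
def make_subscription(dh, project_name, topic_name):
    result = dh.create_subscription(project_name, topic_name, 'demo subscription')
    return result.sub_id  # assumed attribute name on the create result
```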
- create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment, extend_mode=None)[source]
Create tuple topic
- Parameters:
project_name – project name
topic_name – topic name
shard_count – shard count
life_cycle – life cycle
record_schema (datahub.models.RecordSchema) – record schema for tuple record type
comment – comment
extend_mode – use expansion method to increase shard
- Returns:
none
- Raise:
datahub.exceptions.InvalidParameterException
if project_name is empty, topic_name is not valid, life_cycle is not positive, or record_schema is of the wrong type
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project does not exist
- Raise:
datahub.exceptions.ResourceExistException
if the topic already exists
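An end-to-end bootstrap sketch combining create_project, create_tuple_topic and wait_shards_ready (all names are placeholders; assumes pydatahub is installed):

```python
# Bootstrap sketch: create a project and a 3-shard tuple topic, then
# block until every shard is active or closed. Placeholder names only.
def bootstrap(dh):
    from datahub.models import RecordSchema, FieldType
    dh.create_project('my_project', 'demo project')
    schema = RecordSchema.from_lists(
        ['id', 'name'], [FieldType.BIGINT, FieldType.STRING])
    dh.create_tuple_topic('my_project', 'my_topic',
                          shard_count=3, life_cycle=7,
                          record_schema=schema, comment='demo topic')
    # Shards are not immediately usable after creation.
    dh.wait_shards_ready('my_project', 'my_topic')
```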
- delete_connector(project_name, topic_name, connector_id)[source]
Delete a data connector
- Parameters:
project_name – project name
topic_name – topic name
connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or connector does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty, or connector_type is of the wrong type
- delete_project(project_name)[source]
Delete the project by given name
- Parameters:
project_name – project name
- Returns:
none
- Raise:
datahub.exceptions.InvalidParameterException
if project_name is empty
- delete_subscription(project_name, topic_name, sub_id)[source]
Delete subscription by subscription id
- Parameters:
project_name – project name
topic_name – topic name
sub_id – subscription id
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or subscription does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or sub_id is empty
- delete_topic(project_name, topic_name)[source]
Delete a topic
- Parameters:
topic_name – topic name
project_name – project name
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the project name or topic name is empty
- delete_topic_schema(project_name, topic_name, version_id)[source]
Delete the specified schema version of a topic
- Parameters:
project_name – project name
topic_name – topic name
version_id – version id
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty
- extend_shard(project_name, topic_name, shard_count)[source]
Extend shard
- Parameters:
project_name – project name
topic_name – topic name
shard_count – shard count extend to
- Returns:
none
- get_blob_records(project_name, topic_name, shard_id, cursor, limit_num=0, sub_id=None)[source]
Get records from a topic
- Parameters:
project_name – project name
topic_name – topic name
shard_id – shard id
cursor – the cursor
limit_num – the number of records to read
- Returns:
result include record list, start sequence, record num and next cursor
- Return type:
datahub.models.GetRecordsResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or shard does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the cursor is invalid, or project_name, topic_name, shard_id or cursor is empty
- Raise:
datahub.exceptions.DatahubException
if crc is wrong in pb mode
- get_connector(project_name, topic_name, connector_id)[source]
Get a data connector
- Parameters:
project_name – project name
topic_name – topic name
connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
- Returns:
data connector info
- Return type:
datahub.models.GetConnectorResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or connector does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty, or connector_type or config is of the wrong type
- get_connector_done_time(project_name, topic_name, connector_id)[source]
Get connector done time
- Parameters:
project_name – project name
topic_name – topic name
connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
- get_connector_shard_status(project_name, topic_name, connector_id, shard_id='')[source]
Get data connector shard status
- Parameters:
project_name – project name
topic_name – topic name
connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
shard_id – shard id
- Returns:
data connector shard status
- Return type:
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic, shard or connector does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or shard_id is empty, or connector_type is of the wrong type
- get_cursor(project_name, topic_name, shard_id, cursor_type, param=-1)[source]
Get a cursor. Before invoking get_blob_records/get_tuple_records, you must invoke this method to get a cursor first
- Parameters:
project_name – project name
topic_name – topic name
shard_id – shard id
cursor_type (datahub.models.CursorType) – cursor type
param – param is a system timestamp if cursor_type == CursorType.SYSTEM_TIME, or a sequence if cursor_type == CursorType.SEQUENCE
- Returns:
cursor info
- Return type:
datahub.models.GetCursorResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or shard does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the param is invalid; project_name, topic_name or shard_id is empty; cursor_type is of the wrong type; or param is missing
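A sketch of the different cursor types (placeholder names; the OLDEST/LATEST enum values and the `cursor` attribute on the result are assumptions about CursorType and GetCursorResult; assumes pydatahub is installed):

```python
# Cursor sketch: OLDEST/LATEST need no param; SYSTEM_TIME takes a
# millisecond timestamp. All names here are placeholders.
import time

def cursors(dh, project_name, topic_name, shard_id):
    from datahub.models import CursorType
    oldest = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
    latest = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
    hour_ago = dh.get_cursor(project_name, topic_name, shard_id,
                             CursorType.SYSTEM_TIME,
                             int((time.time() - 3600) * 1000))
    # .cursor is the opaque token passed to get_blob/tuple_records
    return oldest.cursor, latest.cursor, hour_ago.cursor
```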
- get_metering_info(project_name, topic_name, shard_id)[source]
Get a shard metering info
- Parameters:
project_name – project name
topic_name – topic name
shard_id – shard id
- Returns:
the shard metering info
- Return type:
datahub.models.GetMeteringInfoResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or shard does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the project_name, topic_name or shard_id is empty
- get_project(project_name)[source]
Get a project by given name
- Parameters:
project_name – project name
- Returns:
the right project
- Return type:
datahub.models.GetProjectResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name is empty
- get_subscription(project_name, topic_name, sub_id)[source]
Get subscription
- Parameters:
project_name – project name
topic_name – topic name
sub_id – subscription id
- Returns:
subscription info
- Return type:
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or subscription does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or sub_id is empty
- get_subscription_offset(project_name, topic_name, sub_id, shard_ids=None)[source]
Get subscription offset
- Parameters:
project_name – project name
topic_name – topic name
sub_id – subscription id
shard_ids – shard ids
- Returns:
offset info
- Return type:
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or subscription does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or sub_id is empty
- get_topic(project_name, topic_name)[source]
Get a topic
- Parameters:
topic_name – topic name
project_name – project name
- Returns:
topic info
- Return type:
datahub.models.GetTopicResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the project name or topic name is empty
- get_topic_schema(project_name, topic_name, schema=None, version_id=-1)[source]
Get the specified schema (by version id) or the version id (by schema) of a topic
- Parameters:
project_name – project name
topic_name – topic name
schema – schema
version_id – version id
- Returns:
schema by version id or version id by schema
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty
- get_tuple_records(project_name, topic_name, shard_id, record_schema=None, cursor='', limit_num=0, sub_id=None)[source]
Get records from a topic
- Parameters:
project_name – project name
topic_name – topic name
shard_id – shard id
record_schema (datahub.models.RecordSchema) – tuple record schema
cursor – the cursor
limit_num – the number of records to read
- Returns:
result include record list, start sequence, record num and next cursor
- Return type:
datahub.models.GetRecordsResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or shard does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the cursor is invalid, or project_name, topic_name, shard_id or cursor is empty
- Raise:
datahub.exceptions.DatahubException
if crc is wrong in pb mode
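The usual read loop gets a cursor once, then follows the next cursor from each result. A sketch (placeholder names; the `records`, `record_count` and `next_cursor` attributes are assumptions about GetRecordsResult; assumes pydatahub is installed):

```python
# Read-loop sketch: start from the oldest cursor and follow next_cursor
# until the shard's tail is reached. Placeholder names only.
def read_all(dh, project_name, topic_name, shard_id, schema):
    from datahub.models import CursorType
    cursor = dh.get_cursor(project_name, topic_name, shard_id,
                           CursorType.OLDEST).cursor
    while True:
        result = dh.get_tuple_records(project_name, topic_name, shard_id,
                                      schema, cursor, limit_num=100)
        for record in result.records:
            print(record.values)
        if result.record_count == 0:
            break  # caught up; no new records yet
        cursor = result.next_cursor
```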
- heart_beat(project_name, topic_name, consumer_group, consumer_id, version_id, hold_shard_list, read_end_shard_list)[source]
Send a heartbeat to the server so that the server knows the consumer's status
- Parameters:
project_name – project name
topic_name – topic name
consumer_group – consumer group; use the sub_id of the subscription you want to join
consumer_id – consumer id obtained at the time of join group
version_id – offset version id obtained at the time of join group
hold_shard_list – the shard list held by the consumer
read_end_shard_list – the shard list that has been fully read
- Returns:
heartbeat result contains version id, shard list and total plan
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or consumer_group does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or consumer_group is empty, or hold_shard_list is None
- init_and_get_subscription_offset(project_name, topic_name, sub_id, shard_ids)[source]
Open subscription offset session
- Parameters:
project_name – project name
topic_name – topic name
sub_id – subscription id
shard_ids – shard ids
- Returns:
offset info
- Return type:
datahub.models.InitAndGetSubscriptionOffsetResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or subscription does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name, sub_id or shard_id is empty
- join_group(project_name, topic_name, consumer_group, session_timeout)[source]
Join a consumer group
- Parameters:
project_name – project name
topic_name – topic name
consumer_group – consumer group; use the sub_id of the subscription you want to join
session_timeout – session timeout
- Returns:
consumer id and version id
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or consumer_group does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or consumer_group is empty
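The consumer-group lifecycle pairs join_group with periodic heart_beat calls and a final leave_group. A sketch (placeholder names; the `consumer_id`, `version_id` and `shard_list` result attributes are assumptions; assumes pydatahub is installed):

```python
# Consumer-group lifecycle sketch: join, heartbeat, leave. Error
# handling and the actual read loop are omitted for brevity.
def consume_session(dh, project_name, topic_name, sub_id):
    join = dh.join_group(project_name, topic_name, sub_id,
                         session_timeout=60000)
    consumer_id, version_id = join.consumer_id, join.version_id
    try:
        # heart_beat reports held shards and learns the assignment plan
        plan = dh.heart_beat(project_name, topic_name, sub_id,
                             consumer_id, version_id, [], [])
        for shard_id in plan.shard_list:
            pass  # read assigned shards here
    finally:
        dh.leave_group(project_name, topic_name, sub_id,
                       consumer_id, version_id)
```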
- leave_group(project_name, topic_name, consumer_group, consumer_id, version_id)[source]
Leave a consumer group
- Parameters:
project_name – project name
topic_name – topic name
consumer_group – consumer group; use the sub_id of the subscription you joined
consumer_id – consumer id obtained at the time of join group
version_id – offset version id obtained at the time of join group
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or consumer_group does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or consumer_group is empty
- list_connector(project_name, topic_name)[source]
List data connectors of a topic
- Parameters:
project_name – project name
topic_name – topic name
- Returns:
data connector names list
- Return type:
datahub.models.ListConnectorResult
- Raise:
datahub.exceptions.InvalidParameterException
if the project_name or topic_name is empty
- list_shard(project_name, topic_name)[source]
List all shards of a topic
- Parameters:
project_name – project name
topic_name – topic name
- Returns:
shards info
- Return type:
datahub.models.ListShardResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the project name or topic name is empty
- list_subscription(project_name, topic_name, query_key, page_index, page_size)[source]
Query subscription in range [start, end)
start = (page_index - 1) * page_size + 1
end = start + page_size
- Parameters:
project_name – project name
topic_name – topic name
query_key – query key for search
page_index – page index
page_size – page size
- Returns:
subscription info list
- Return type:
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty; page_index <= 0 or page_size < 0
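The [start, end) range above can be expressed as a pure helper (a hypothetical name, not part of the SDK) to show how page_index and page_size map onto it:

```python
# Hypothetical helper mirroring the paging formula in the docs above:
# start = (page_index - 1) * page_size + 1, end = start + page_size.
def subscription_page_range(page_index, page_size):
    start = (page_index - 1) * page_size + 1
    end = start + page_size
    return start, end
```

For example, page_index=3 with page_size=5 queries subscriptions 11 through 15.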
- list_topic(project_name)[source]
Get all topics of a project
- Parameters:
project_name – project name
- Returns:
all topics of the project
- Return type:
datahub.models.ListTopicResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name is empty
- list_topic_schema(project_name, topic_name, page_number=-1, page_size=-1)[source]
List all schemas of a topic
- Parameters:
project_name – project_name
topic_name – topic_name
page_number – page number
page_size – page size
- Returns:
all schema info of a topic
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty
- merge_shard(project_name, topic_name, shard_id, adj_shard_id)[source]
Merge shards
- Parameters:
project_name – project name
topic_name – topic name
shard_id – shard id
adj_shard_id – adjacent shard id
- Returns:
shards info after merged
- Return type:
datahub.models.MergeShardResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or shard does not exist
- Raise:
datahub.exceptions.InvalidOperationException
if the shard is not active
- Raise:
datahub.exceptions.InvalidParameterException
if the shards are not adjacent, or project name, topic name, shard id or adjacent shard id is empty
- Raise:
datahub.exceptions.LimitExceededException
if merge shard operation limit exceeded
- put_records(project_name, topic_name, record_list)[source]
Put records to a topic
- Parameters:
project_name – project name
topic_name – topic name
record_list (
list
) – record list
- Returns:
failed records info
- Return type:
datahub.models.PutRecordsResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the record is not well-formed, or project_name or topic_name is empty
- Raise:
datahub.exceptions.InvalidOperationException
if the shard is not active
- Raise:
datahub.exceptions.LimitExceededException
if the query rate or throughput rate limit is exceeded
- Raise:
datahub.exceptions.DatahubException
if crc is wrong in pb mode
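Because failed puts are reported per record by index (see FailedRecord below), a common pattern is to retry only the failed subset. A sketch (helper names are hypothetical; the `failed_record_count` and `failed_records` attributes are assumptions about PutRecordsResult; assumes pydatahub for the second function):

```python
# Pick out the records that failed, by their reported index into the
# original record_list. Pure Python, so it is demonstrated with a stub.
def select_failed(record_list, failed_records):
    return [record_list[f.index] for f in failed_records]

# Minimal stand-in for FailedRecord, for the demo below only.
class _Failed:
    def __init__(self, index):
        self.index = index

retry_list = select_failed(['a', 'b', 'c'], [_Failed(0), _Failed(2)])

# Hypothetical retry wrapper around put_records (placeholder names).
def put_with_retry(dh, project_name, topic_name, record_list, max_retries=3):
    for _ in range(max_retries):
        result = dh.put_records(project_name, topic_name, record_list)
        if result.failed_record_count == 0:
            return
        record_list = select_failed(record_list, result.failed_records)
    raise RuntimeError('%d records still failing' % len(record_list))
```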
- put_records_by_shard(project_name, topic_name, shard_id, record_list)[source]
Put records to specific shard of topic
- Parameters:
project_name – project name
topic_name – topic name
shard_id – shard id
record_list (
list
) – record list
- Returns:
failed records info
- Return type:
datahub.models.PutRecordsResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the record is not well-formed, or project_name, topic_name or shard_id is empty
- Raise:
datahub.exceptions.InvalidOperationException
if the shard is not active
- Raise:
datahub.exceptions.LimitExceededException
if the query rate or throughput rate limit is exceeded
- Raise:
datahub.exceptions.DatahubException
if crc is wrong in pb mode
- register_topic_schema(project_name, topic_name, schema)[source]
Register schema in a topic
- Parameters:
project_name – project name
topic_name – topic name
schema – schema
- Returns:
version id
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty
- reload_connector(project_name, topic_name, connector_id, shard_id='')[source]
Reload the data connector for the given shard id, or reload all shards if no shard id is given
- Parameters:
project_name – project name
topic_name – topic name
connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
shard_id – shard id
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or connector does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty, or connector_type is of the wrong type
- reset_subscription_offset(project_name, topic_name, sub_id, offsets)[source]
Update subscription offset
- Parameters:
project_name – project name
topic_name – topic name
sub_id – subscription id
offsets – offsets
- Type:
dict
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or subscription does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or sub_id is empty, or offsets is of the wrong type
- split_shard(project_name, topic_name, shard_id, split_key='')[source]
Split shard
- Parameters:
project_name – project name
topic_name – topic name
shard_id – split shard id
split_key – split key, if not given, choose the median
- Returns:
shards info after split
- Return type:
datahub.models.SplitShardResult
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or shard does not exist
- Raise:
datahub.exceptions.InvalidOperationException
if the shard is not active
- Raise:
datahub.exceptions.InvalidParameterException
if the key range is invalid, or project name, topic name or shard id is empty
- Raise:
datahub.exceptions.LimitExceededException
if split shard operation limit exceeded
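A shard-scaling sketch combining list_shard and split_shard (placeholder names; the `shards`, `shard_id` and `new_shards` result attributes are assumptions about the list/split result objects; assumes pydatahub is installed):

```python
# Scaling sketch: split the first listed shard at its median hash key
# (the default when split_key is omitted). Placeholder names only.
def split_first_shard(dh, project_name, topic_name):
    shards = dh.list_shard(project_name, topic_name).shards
    result = dh.split_shard(project_name, topic_name, shards[0].shard_id)
    # the parent shard becomes closed; two child shards are created
    return [s.shard_id for s in result.new_shards]
```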
- sync_group(project_name, topic_name, consumer_group, consumer_id, version_id, release_shard_list, read_end_shard_list)[source]
Sync consumer group info
- Parameters:
project_name – project name
topic_name – topic name
consumer_group – consumer group; use the sub_id of the subscription you joined
consumer_id – consumer id obtained at the time of join group
version_id – offset version id obtained at the time of join group
release_shard_list – the shard list to release
read_end_shard_list – the shard list that has been fully read
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or consumer_group does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or consumer_group is empty
- update_connector(project_name, topic_name, connector_id, config)[source]
Update a data connector config
- Parameters:
project_name – project name
topic_name – topic name
connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
config (datahub.models.ConnectorConfig) – connector config
- Returns:
none
- update_connector_offset(project_name, topic_name, connector_id, shard_id, connector_offset)[source]
Update data connector offset
- Parameters:
project_name – project name
topic_name – topic name
connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
shard_id – shard id
connector_offset (datahub.models.ConnectorOffset) – current sequence
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or connector does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty, or connector_type or connector_offset is of the wrong type
- update_connector_state(project_name, topic_name, connector_id, connector_state)[source]
Update data connector state
- Parameters:
project_name – project name
topic_name – topic name
connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
connector_state (datahub.models.ConnectorState) – connector state
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or connector does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name or topic_name is empty, or connector_type or connector_state is of the wrong type
- update_project(project_name, comment)[source]
Update project comment
- Parameters:
project_name – project name
comment – new description of project
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name is empty or comment is invalid
- update_subscription(project_name, topic_name, sub_id, comment)[source]
Update subscription
- Parameters:
project_name – project name
topic_name – topic name
sub_id – subscription id
comment – new comment
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or subscription does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or sub_id is empty
- update_subscription_offset(project_name, topic_name, sub_id, offsets)[source]
Update subscription offset
- Parameters:
project_name – project name
topic_name – topic name
sub_id – subscription id
offsets (
dict
) – offsets
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or subscription does not exist
- Raise:
datahub.exceptions.InvalidOperationException
if the offset session is closed or the offset version has changed
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or sub_id is empty, or offsets is of the wrong type
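Offset commits must happen inside the session opened by init_and_get_subscription_offset, or the server rejects them. A single-shard sketch (placeholder names; the `offsets` dict on the init result and the mutable `sequence`/`timestamp` fields are assumptions; assumes pydatahub is installed):

```python
# Offset-session sketch: open the session, advance the offset for one
# shard, and commit it back. Placeholder names throughout.
def commit_offset(dh, project_name, topic_name, sub_id,
                  shard_id, sequence, timestamp):
    init = dh.init_and_get_subscription_offset(project_name, topic_name,
                                               sub_id, [shard_id])
    offset = init.offsets[shard_id]  # carries the session/version info
    offset.sequence = sequence
    offset.timestamp = timestamp
    # committing outside the session raises InvalidOperationException
    dh.update_subscription_offset(project_name, topic_name, sub_id,
                                  {shard_id: offset})
```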
- update_subscription_state(project_name, topic_name, sub_id, state)[source]
Update subscription state
- Parameters:
project_name – project name
topic_name – topic name
sub_id – subscription id
state (
datahub.models.SubscriptionState
) – new state
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project, topic or subscription does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if project_name, topic_name or sub_id is empty, or state is of the wrong type
- update_topic(project_name, topic_name, life_cycle, comment)[source]
Update topic info, only life cycle and comment can be modified.
- Parameters:
topic_name – topic name
project_name – project name
life_cycle – life cycle of topic
comment – topic comment
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the project name or topic name is empty; life_cycle is not positive
- wait_shards_ready(project_name, topic_name, timeout=30)[source]
Wait until every shard's state is active or closed. It is usually invoked after creating a topic, and blocks until all shards are active or closed, or the timeout elapses.
- Parameters:
project_name – project name
topic_name – topic name
timeout – -1 means it will block until all shards are active or closed; otherwise it waits at most timeout seconds
- Returns:
none
- Raise:
datahub.exceptions.ResourceNotFoundException
if the project or topic does not exist
- Raise:
datahub.exceptions.InvalidParameterException
if the project name or topic name is empty, or timeout < 0
- Raise:
datahub.exceptions.DatahubException
if the wait times out
Auth
- class datahub.auth.Account(*args, **kwargs)[source]
Base Account Class.
- class datahub.auth.AliyunAccount(*args, **kwargs)[source]
Aliyun account implementation, derived from datahub.auth.Account
Schema
- class datahub.models.FieldType(value)[source]
- Field Types. DataHub supports the following field types:
TINYINT, SMALLINT, INTEGER, BIGINT, STRING, BOOLEAN, TIMESTAMP, FLOAT, DOUBLE, DECIMAL
- class datahub.models.Field(name, field_type, comment='', allow_null=True)[source]
- Members:
name (str): field name
type (str): field type
comment (str): field comment
- class datahub.models.RecordSchema(field_list=None)[source]
Record schema class, used by Tuple type topics.
- Members:
fields (list): fields
- Example:
>>> schema = RecordSchema.from_lists(
>>>     ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
>>>     [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
Record
- class datahub.models.BlobRecord(blob_data=None, values=None)[source]
Blob type record class
- Members:
blob_data (str): blob data
- class datahub.models.TupleRecord(field_list=None, schema=None, values=None)[source]
Tuple type record class
- Members:
field_list (list): fields
name_indices (list): values
- Example:
>>> schema = RecordSchema.from_lists(['name', 'id'], [FieldType.STRING, FieldType.STRING])
>>> record = TupleRecord(schema=schema, values=['test', 'test2'])
>>> record.values[0:2]
['test', 'test2']
>>> record.set_value('name', 'test1')
>>> record.values[0]
'test1'
>>> record.set_value(0, 'test3')
>>> record.get_value(0)
'test3'
>>> record.get_value('name')
'test3'
>>> len(record.values)
2
>>> record.has_field('name')
True
- class datahub.models.FailedRecord(index, error_code, error_message)[source]
Failed record info
- Members:
index (int): index of the failed record in the put request
error_code (str): error code
error_message (str): error message
Cursor
Offset
- class datahub.models.OffsetBase(sequence, timestamp)[source]
Offset base class
- Members:
sequence (int): sequence
timestamp (int): timestamp
Subscription
- class datahub.models.SubscriptionState(value)[source]
Subscription state enum class. Values:
INACTIVE, ACTIVE
- class datahub.models.Subscription(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]
subscription info
- Members:
comment (str): subscription description
create_time (int): create time
is_owner (bool): owner or not
last_modify_time (int): last modify time
state (datahub.models.SubscriptionState): subscription state
sub_id (str): subscription id
topic_name (str): topic name
type (int): type
Connector
- class datahub.models.ConnectorState(value)[source]
ConnectorState enum class. Values:
CONNECTOR_RUNNING, CONNECTOR_STOPPED
- class datahub.models.PartitionMode(value)[source]
PartitionMode enum class. Values:
USER_DEFINE, SYSTEM_TIME, EVENT_TIME
- class datahub.models.ConnectorType(value)[source]
ConnectorType enum class. Values:
SINK_ODPS, SINK_ADS, SINK_ES, SINK_FC, SINK_MYSQL, SINK_OSS, SINK_OTS, SINK_HOLOGRES, SINK_DATAHUB
- class datahub.models.OdpsConnectorConfig(project_name, table_name, odps_endpoint, tunnel_endpoint, access_id, access_key, partition_mode, time_range, partition_config=None)[source]
Connector config for odps
- Members:
project_name (str): odps project name
table_name (str): odps table name
odps_endpoint (str): odps endpoint
tunnel_endpoint (str): tunnel endpoint
access_id (str): odps access id
access_key (str): odps access key
partition_mode (datahub.models.connector.PartitionMode): partition mode
time_range (int): time range
partition_config (collections.OrderedDict): partition config
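The partition_config member is an ordered mapping from partition column name to the expression used to derive that partition's value. A minimal sketch of building one with the standard library; the column names ("ds", "hh") and the strftime-style format strings here are hypothetical, not values mandated by the SDK:

```python
from collections import OrderedDict

# Hypothetical partition layout: partition by day ("ds"), then hour ("hh").
# Order matters, which is why an OrderedDict is required.
partition_config = OrderedDict([
    ("ds", "%Y%m%d"),
    ("hh", "%H"),
])

# This mapping would then be passed as the final argument:
# OdpsConnectorConfig(..., partition_config=partition_config)
print(list(partition_config.items()))  # [('ds', '%Y%m%d'), ('hh', '%H')]
```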
- class datahub.models.DatabaseConnectorConfig(host, port, database, user, password, table, ignore)[source]
Connector config for database
- Members:
host (str): host
port (int): port
database (str): database
user (str): user
password (str): password
table (str): table
ignore (bool): ignore insert error
- class datahub.models.EsConnectorConfig(index, endpoint, user, password, id_fields, type_fields, proxy_mode)[source]
Connector config for ElasticSearch
- Members:
index (str): index
endpoint (str): endpoint
user (str): user
password (str): password
id_fields (list): id fields
type_fields (list): type fields
proxy_mode (bool): proxy mode
- class datahub.models.FcConnectorConfig(endpoint, service, func, auth_mode, access_id='', access_key='')[source]
Connector config for FunctionCompute
- Members:
endpoint (str): endpoint
service (str): service
func (str): function
auth_mode (datahub.models.connector.AuthMode): auth mode
access_id (str): access id
access_key (str): access key
- class datahub.models.OssConnectorConfig(endpoint, bucket, prefix, time_format, time_range, auth_mode, access_id='', access_key='')[source]
Connector config for ObjectStoreService
- Members:
endpoint (str): endpoint
bucket (str): bucket
prefix (str): prefix
time_format (str): time format
time_range (int): time range
auth_mode (datahub.models.connector.AuthMode): auth mode
access_id (str): access id
access_key (str): access key
- class datahub.models.OtsConnectorConfig(endpoint, instance, table, auth_mode, access_id='', access_key='', write_mode=WriteMode.PUT)[source]
Connector config for OpenTableStore
- Members:
endpoint (str): endpoint
instance (str): instance
table (str): table
auth_mode (datahub.models.connector.AuthMode): auth mode
access_id (str): access id
access_key (str): access key
write_mode (datahub.models.connector.WriteMode): write mode
Results
- class datahub.models.results.ListProjectResult(project_names)[source]
Result of list projects api
- Members:
project_names (list): list of project names
- class datahub.models.results.GetProjectResult(project_name, comment, create_time, last_modify_time)[source]
Result of get project api
- Members:
project_name (str): project name
comment (str): project description
create_time (int): create time
last_modify_time (int): last modify time
- class datahub.models.results.ListTopicResult(topic_names)[source]
Result of list topics api
- Members:
topic_names (list): list of topic names
- class datahub.models.results.GetTopicResult(**kwargs)[source]
Result of get topic api
- Members:
project_name (str): project name
topic_name (str): topic name
shard_count (int): shard count
life_cycle (int): life cycle
record_type (datahub.models.RecordType): record type
record_schema (datahub.models.RecordSchema): record schema
comment (str): topic description
create_time (int): create time
last_modify_time (int): last modify time
- class datahub.models.results.ListShardResult(shards, protocol, interval)[source]
Result of list shards api
- Members:
shards (list): list of datahub.models.Shard
- class datahub.models.results.MergeShardResult(shard_id, begin_hash_key, end_hash_key)[source]
Result of merge shard api
- Members:
shard_id (str): shard id
begin_hash_key (str): begin hash key
end_hash_key (str): end hash key
- class datahub.models.results.SplitShardResult(new_shards)[source]
Result of split shard api
- Members:
new_shards (list): list of datahub.models.ShardBase
- class datahub.models.results.GetCursorResult(cursor, record_time, sequence)[source]
Result of get cursor api
- Members:
cursor (str): cursor
record_time (int): record time
sequence (int): sequence
- class datahub.models.results.PutRecordsResult(failed_record_count, failed_records)[source]
Result of put records api
- Members:
failed_record_count (int): failed record count
failed_records (list): list of datahub.models.FailedRecord
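When failed_record_count is non-zero, a common pattern is to retry only the failed subset, using each FailedRecord's index to select the original records. A self-contained sketch of that selection step; the classes below are stand-ins mirroring the documented shape, not the real SDK classes:

```python
# Stand-ins mirroring the documented result shape (datahub.models.FailedRecord
# and datahub.models.results.PutRecordsResult).
class FailedRecord:
    def __init__(self, index, error_code, error_message):
        self.index = index
        self.error_code = error_code
        self.error_message = error_message

class PutRecordsResult:
    def __init__(self, failed_records):
        self.failed_records = failed_records
        self.failed_record_count = len(failed_records)

def select_retry_batch(records, result):
    """Pick the original records that failed, preserving request order."""
    return [records[fr.index] for fr in result.failed_records]

records = ["r0", "r1", "r2", "r3"]
result = PutRecordsResult([FailedRecord(1, "Throttled", "qps limit"),
                           FailedRecord(3, "Throttled", "qps limit")])
retry = select_retry_batch(records, result)
print(retry)  # ['r1', 'r3']
```

The retry batch would then be passed to put_records again, typically after a short backoff.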
- class datahub.models.results.GetRecordsResult(next_cursor, record_count, start_seq, records)[source]
Result of get records api
- Members:
next_cursor (str): next cursor
record_count (int): record count
start_seq (int): start sequence
records (list): list of datahub.models.BlobRecord / datahub.models.TupleRecord
- class datahub.models.results.GetMeteringInfoResult(active_time, storage)[source]
Result of get metering info api
- Members:
active_time (int): active time
storage (int): storage
- class datahub.models.results.ListConnectorResult(connector_names, connector_ids)[source]
Result of list data connector
- Members:
connector_names (list): list of data connector names
connector_ids (list): list of data connector ids
- class datahub.models.results.GetConnectorResult(cluster_addr, connector_id, connector_type, state, creator, owner, create_time, column_fields, config, extra_config, shard_contexts, sub_id)[source]
Result of get data connector
- Members:
cluster_addr (str): cluster address
connector_id (str): connector id
type (datahub.models.ConnectorType): connector type
state (datahub.models.ConnectorState): connector state
creator (str): creator
owner (str): owner
create_time (int): create time
column_fields (list): list of column fields
config (datahub.models.OdpsConnectorConfig): config
extra_config (dict): extra config
shard_contexts (list): list of datahub.models.ShardContext
sub_id (str): subscription id used by connector
- class datahub.models.results.GetConnectorShardStatusResult(shard_status_infos)[source]
Result of get data connector shard status
- Members:
shard_status_infos (dict): shard status entry map
- class datahub.models.results.InitAndGetSubscriptionOffsetResult(offsets)[source]
Result of init and get subscription offset api
- Members:
offsets (list): list of datahub.models.OffsetWithSession
- class datahub.models.results.GetSubscriptionOffsetResult(offsets)[source]
Result of get subscription offset api
- Members:
offsets (list): list of datahub.models.OffsetWithVersion
- class datahub.models.results.CreateSubscriptionResult(sub_id)[source]
Result of create subscription api
- Members:
sub_id (str): subscription id
- class datahub.models.results.GetSubscriptionResult(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]
Result of get subscription api
- Members:
comment (str): comment
create_time (int): create time
is_owner (bool): owner or not
last_modify_time (int): last modify time
state (datahub.models.SubscriptionState): subscription state
sub_id (str): subscription id
topic_name (str): topic name
type (int): type
Exceptions
- class datahub.exceptions.DatahubException(error_msg, status_code=-1, request_id=None, error_code=None)[source]
Base exception class for errors raised while handling requests to the DataHub server.
- class datahub.exceptions.ResourceExistException(error_msg, status_code=-1, request_id=None, error_code=None)[source]
Raised when the DataHub object you are creating already exists.
- class datahub.exceptions.ResourceNotFoundException(error_msg, status_code=-1, request_id=None, error_code=None)[source]
Raised when the DataHub object you are operating on does not exist.
- class datahub.exceptions.InvalidParameterException(error_msg, status_code=-1, request_id=None, error_code=None)[source]
Raised when a request parameter is invalid.
- class datahub.exceptions.InvalidOperationException(error_msg, status_code=-1, request_id=None, error_code=None)[source]
Raised when the requested shard operation is not supported.
- class datahub.exceptions.LimitExceededException(error_msg, status_code=-1, request_id=None, error_code=None)[source]
Raised when too many requests have been made (rate limit exceeded).
- class datahub.exceptions.InternalServerException(error_msg, status_code=-1, request_id=None, error_code=None)[source]
Raised when an internal error occurs on the DataHub server.
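All of the exception classes above derive from DatahubException, so handlers can catch specific failures first and fall back to the base class. A sketch of the usual ordering, written with stand-in classes so the snippet runs without a DataHub server; the recovery strategies named here are illustrative:

```python
# Stand-in hierarchy mirroring datahub.exceptions.
class DatahubException(Exception): pass
class ResourceNotFoundException(DatahubException): pass
class LimitExceededException(DatahubException): pass

def classify(exc):
    # Catch the most specific exceptions first, the base class last;
    # reversing this order would shadow the specific handlers.
    try:
        raise exc
    except ResourceNotFoundException:
        return "create-then-retry"
    except LimitExceededException:
        return "backoff-and-retry"
    except DatahubException:
        return "fail"

print(classify(ResourceNotFoundException("no such topic")))   # create-then-retry
print(classify(LimitExceededException("too many requests")))  # backoff-and-retry
```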
Producer
- class datahub.client.ProducerConfig(access_id, access_key, endpoint, protocol_type=DatahubProtocolType.PB, compress_format=CompressFormat.LZ4)[source]
Config for datahub producer
- Members:
access_id (string): Aliyun access id
access_key (string): Aliyun access key
endpoint (string): Datahub endpoint
protocol_type (datahub.core.DatahubProtocolType): Protocol type for datahub client
compress_format (datahub.models.compress.CompressFormat): Compress format for records data
retry_times (int): Retry times when request error
async_thread_limit (int): Thread num limit for thread pool in message writer
thread_queue_limit (int): Task num limit for queue in thread pool in message writer
logging_level (int): Logging level
logging_filename (string): Logging file name
max_async_buffer_records (int): Max buffer records number to PutRecords once. Only valid when write async.
max_async_buffer_size (int): Max buffer size to PutRecords once. Only valid when write async.
max_async_buffer_time (int): Max buffer time to PutRecords once. Only valid when write async.
max_record_pack_queue_limit (int): Max ready record pack limit for queue. Only valid when write async.
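The three max_async_buffer_* limits govern when the async writer flushes a buffered batch: whichever of the record-count, byte-size, or age thresholds trips first. A small restatement of that flush rule as code; this illustrates the documented semantics, not the SDK's actual implementation:

```python
def should_flush(buffered_records, buffered_bytes, buffer_age_ms,
                 max_records, max_size, max_time_ms):
    """Flush when any one of the three async-buffer limits is reached."""
    return (buffered_records >= max_records
            or buffered_bytes >= max_size
            or buffer_age_ms >= max_time_ms)

# 500 of 1000 records buffered, 2 MB of a 4 MB limit, but the buffer is
# older than the 5 s time limit, so it flushes anyway.
print(should_flush(500, 2 << 20, 6000, 1000, 4 << 20, 5000))  # True
```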
- class datahub.client.DatahubProducer(project_name, topic_name, producer_config, shard_ids=None)[source]
Producer client for datahub
- Members:
project_name (string): project name
topic_name (string): topic name
producer_config (datahub.client.common.ProducerConfig): config for producer client
shard_ids (list): list of string: shards you want to produce to. default is None, which means write to all shards evenly
Consumer
- class datahub.client.ConsumerConfig(access_id, access_key, endpoint, protocol_type=DatahubProtocolType.PB, compress_format=CompressFormat.LZ4)[source]
Config for datahub consumer
- Members:
access_id (string): Aliyun access id
access_key (string): Aliyun access key
endpoint (string): Datahub endpoint
protocol_type (datahub.core.DatahubProtocolType): Protocol type for datahub client
compress_format (datahub.models.compress.CompressFormat): Compress format for records data
retry_times (int): Retry times when request error
async_thread_limit (int): Thread num limit for thread pool in message reader
thread_queue_limit (int): Task num limit for queue in thread pool in message reader
logging_level (int): Logging level
logging_filename (string): Logging file name
auto_ack_offset (bool): Auto ack offset for fetched records or not
session_timeout (int): Session timeout
max_record_buffer_size (int): Max record buffer size in consumer
fetch_limit (int): Limit on the number of records fetched per request
- class datahub.client.DatahubConsumer(project_name, topic_name, sub_id, consumer_config, shard_ids=None, timestamp=-1)[source]
Consumer client for datahub
- Members:
project_name (string): project name
topic_name (string): topic name
sub_id (string): subscription id for consume
consumer_config (datahub.client.common.ConsumerConfig): config for consumer client
shard_ids (list): list of string: shards you want to consume. default is None, which means shards are allocated automatically by the datahub server
timestamp (int): start timestamp for consumption. default is -1, which means start from the subscription offset
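A typical consume loop reads records until none remain and, when auto_ack_offset is disabled, acks each record after processing it. Sketched here against a stand-in client so it runs without a live endpoint; the read()/ack() method names are assumptions for illustration, not the verified DatahubConsumer API:

```python
# Stand-in consumer; the real DatahubConsumer needs a live DataHub endpoint.
class FakeConsumer:
    def __init__(self, records):
        self._records = list(records)
        self.acked = 0
    def read(self):
        # Return the next record, or None when the stream is drained.
        return self._records.pop(0) if self._records else None
    def ack(self):
        self.acked += 1

def consume_all(consumer, auto_ack=False):
    seen = []
    while True:
        record = consumer.read()
        if record is None:
            break
        seen.append(record)
        if not auto_ack:   # manual ack when auto_ack_offset is False
            consumer.ack()
    return seen

c = FakeConsumer(["a", "b", "c"])
print(consume_all(c), c.acked)  # ['a', 'b', 'c'] 3
```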