API Reference

DataHub

class datahub.models.compress.CompressFormat(value)[source]

CompressFormat enum class. Supported values: NONE, LZ4, ZLIB, DEFLATE

class datahub.DataHub(access_id, access_key, endpoint=None, compress_format=CompressFormat.LZ4, **kwargs)[source]

Main entrance to DataHub.

Convenient operations on DataHub objects are provided. Please refer to DataHub docs to see the details.

Generally, basic operations such as create, list, delete, update are provided for each DataHub object. Take the project as an example.

To create a DataHub instance, access_id and access_key are required and must be correct, otherwise a SignatureNotMatch error will be thrown.

Parameters:
  • access_id – Aliyun Access ID

  • access_key – Aliyun Access Key

  • endpoint – Rest service URL

  • enable_pb – enable protobuf for put/get records; the default value is False in versions <= 2.11 and True in versions >= 2.12

  • protocol_type (datahub.core.DatahubProtocolType) – protocol type. It is recommended to use this param; ‘enable_pb’ will no longer be supported in future versions

  • compress_format (datahub.models.compress.CompressFormat) – compress format, default value is LZ4

  • enable_schema_register – enable schema register; only supported in batch mode, where the default value is True

Example:

>>> datahub = DataHub('**your access id**', '**your access key**', '**endpoint**')
>>> datahub_pb = DataHub('**your access id**', '**your access key**', '**endpoint**', protocol_type=DatahubProtocolType.PB)
>>> datahub_batch = DataHub('**your access id**', '**your access key**', '**endpoint**', protocol_type=DatahubProtocolType.BATCH, enable_schema_register=True)
>>> datahub_lz4 = DataHub('**your access id**', '**your access key**', '**endpoint**', compress_format=CompressFormat.LZ4)
>>>
>>> project_result = datahub.get_project('datahub_test')
>>>
>>> print(project_result is None)
>>>
append_connector_field(project_name, topic_name, connector_id, field_name)[source]

Append field to a connector

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible for connector type

  • field_name – field name

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or field_name is empty; connector_type is wrong type

append_field(project_name, topic_name, field_name, field_type)[source]

Append field to a tuple topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • field_name – field name

  • field_type (datahub.models.FieldType) – field type

Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or field_name is empty; field_type is wrong type

create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment, extend_mode=None)[source]

Create blob topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_count – shard count

  • life_cycle – life cycle

  • comment – comment

  • extend_mode – whether to use the expansion method to increase shards

Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty; topic_name is not valid; life_cycle is not positive

Raise:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raise:

datahub.exceptions.ResourceExistException if the topic already exists

create_connector(project_name, topic_name, connector_type, column_fields, config, start_time=-1)[source]

Create a data connector

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_type (datahub.models.ConnectorType) – connector type

  • column_fields (list) – column fields

  • config (datahub.models.ConnectorConfig) – connector config

  • start_time (int) – start timestamp in milliseconds

Returns:

connector id

Return type:

datahub.models.CreateConnectorResult

Raise:

datahub.exceptions.ResourceExistException if the connector already exists

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the column field or config is invalid; project_name or topic_name is empty; connector_type or config is wrong type

create_project(project_name, comment)[source]

Create a new project by given name and comment

Parameters:
  • project_name – project name

  • comment – description of project

Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name is not valid

Raise:

datahub.exceptions.ResourceExistException if the project already exists

create_subscription(project_name, topic_name, comment)[source]

Create subscription to a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • comment – comment for subscription

Returns:

create result contains subscription id

Return type:

datahub.models.results.CreateSubscriptionResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.LimitExceededException if limit of subscription number is exceeded

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty

create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment, extend_mode=None)[source]

Create tuple topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_count – shard count

  • life_cycle – life cycle

  • record_schema (datahub.models.RecordSchema) – record schema for tuple record type

  • comment – comment

  • extend_mode – whether to use the expansion method to increase shards

Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty; topic_name is not valid; life_cycle is not positive; record_schema is wrong type

Raise:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raise:

datahub.exceptions.ResourceExistException if the topic already exists

delete_connector(project_name, topic_name, connector_id)[source]

Delete a data connector

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible for connector type

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type is wrong type

delete_project(project_name)[source]

Delete the project by given name

Parameters:

project_name – project name

Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty

delete_subscription(project_name, topic_name, sub_id)[source]

Delete subscription by subscription id

Parameters:
  • project_name – project name

  • topic_name – topic name

  • sub_id – subscription id

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

delete_topic(project_name, topic_name)[source]

Delete a topic

Parameters:
  • topic_name – topic name

  • project_name – project name

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty

delete_topic_schema(project_name, topic_name, version_id)[source]

Delete a specific schema version of a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • version_id – version id

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty

extend_shard(project_name, topic_name, shard_count)[source]

Extend shard

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_count – target shard count after extension

Returns:

none

get_blob_records(project_name, topic_name, shard_id, cursor, limit_num=0, sub_id=None)[source]

Get records from a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_id – shard id

  • cursor – the cursor

  • limit_num – maximum number of records to read

Returns:

result includes record list, start sequence, record count and next cursor

Return type:

datahub.models.GetRecordsResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidParameterException if the cursor is invalid; project_name, topic_name, shard_id, or cursor is empty

Raise:

datahub.exceptions.DatahubException if crc is wrong in pb mode

get_connector(project_name, topic_name, connector_id)[source]

Get a data connector

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible for connector type

Returns:

data connector info

Return type:

datahub.models.GetConnectorResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type or config is wrong type

get_connector_done_time(project_name, topic_name, connector_id)[source]

Get connector done time

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible for connector type

get_connector_shard_status(project_name, topic_name, connector_id, shard_id='')[source]

Get data connector shard status

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible for connector type

  • shard_id – shard id

Returns:

data connector shard status

Return type:

datahub.models.results.GetConnectorShardStatusResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic, shard or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or shard_id is empty; connector_type is wrong type

get_cursor(project_name, topic_name, shard_id, cursor_type, param=-1)[source]

Get a cursor. Before invoking get_blob_records/get_tuple_records, you must invoke this to obtain a cursor first

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_id – shard id

  • cursor_type (datahub.models.CursorType) – cursor type

  • param – a system timestamp if cursor_type == CursorType.SYSTEM_TIME, or a sequence number if cursor_type == CursorType.SEQUENCE

Returns:

cursor info

Return type:

datahub.models.GetCursorResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidParameterException if the param is invalid; project_name, topic_name or shard_id is empty; cursor_type is wrong type; param is missing
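The usual consumption loop fetches a cursor once and then advances with the next_cursor returned by each read. The sketch below shows that loop shape with a hypothetical in-memory stub standing in for the DataHub client; the stub's method names mirror this API, but the stub itself and its integer cursors are assumptions for illustration only.

```python
# Minimal in-memory stub illustrating the cursor-advance read loop.
# A real consumer would use a datahub.DataHub instance; this stub is
# a hypothetical stand-in so the loop shape is self-contained.
class StubResult:
    def __init__(self, records, next_cursor):
        self.records = records
        self.record_count = len(records)
        self.next_cursor = next_cursor

class StubClient:
    def __init__(self, data):
        self._data = data  # records in one shard; a cursor is just an index here

    def get_cursor(self, project, topic, shard_id, cursor_type):
        return 0  # OLDEST: start of the shard

    def get_blob_records(self, project, topic, shard_id, cursor, limit_num):
        batch = self._data[cursor:cursor + limit_num]
        return StubResult(batch, cursor + len(batch))

client = StubClient([b'a', b'b', b'c', b'd', b'e'])
cursor = client.get_cursor('proj', 'topic', '0', 'OLDEST')
consumed = []
while True:
    result = client.get_blob_records('proj', 'topic', '0', cursor, limit_num=2)
    if result.record_count == 0:
        break  # caught up; a real consumer would sleep and retry
    consumed.extend(result.records)
    cursor = result.next_cursor  # always advance with next_cursor
```

The key point is that the cursor is opaque to the caller: it is obtained once via get_cursor and thereafter only ever replaced by the next_cursor from the previous read.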

get_metering_info(project_name, topic_name, shard_id)[source]

Get a shard metering info

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_id – shard id

Returns:

the shard metering info

Return type:

datahub.models.GetMeteringInfoResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project_name, topic_name or shard_id is empty

get_project(project_name)[source]

Get a project by given name

Parameters:

project_name – project name

Returns:

the project info

Return type:

datahub.models.GetProjectResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty

get_subscription(project_name, topic_name, sub_id)[source]

Get subscription

Parameters:
  • project_name – project name

  • topic_name – topic name

  • sub_id – subscription id

Returns:

subscription info

Return type:

datahub.models.results.GetSubscriptionResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

get_subscription_offset(project_name, topic_name, sub_id, shard_ids=None)[source]

Get subscription offset

Parameters:
  • project_name – project name

  • topic_name – topic name

  • sub_id – subscription id

  • shard_ids – shard ids

Returns:

offset info

Return type:

datahub.models.results.GetSubscriptionOffsetResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

get_topic(project_name, topic_name)[source]

Get a topic

Parameters:
  • topic_name – topic name

  • project_name – project name

Returns:

topic info

Return type:

datahub.models.GetTopicResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty

get_topic_schema(project_name, topic_name, schema=None, version_id=-1)[source]

Get the schema for a given version id, or the version id for a given schema, of a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • schema – schema

  • version_id – version id

Returns:

schema by version id or version id by schema

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty

get_tuple_records(project_name, topic_name, shard_id, record_schema=None, cursor='', limit_num=0, sub_id=None)[source]

Get records from a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_id – shard id

  • record_schema (datahub.models.RecordSchema) – tuple record schema

  • cursor – the cursor

  • limit_num – maximum number of records to read

Returns:

result includes record list, start sequence, record count and next cursor

Return type:

datahub.models.GetRecordsResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidParameterException if the cursor is invalid; project_name, topic_name, shard_id, or cursor is empty

Raise:

datahub.exceptions.DatahubException if crc is wrong in pb mode

heart_beat(project_name, topic_name, consumer_group, consumer_id, version_id, hold_shard_list, read_end_shard_list)[source]

Send a heartbeat to the server so that the server knows the consumer status

Parameters:
  • project_name – project name

  • topic_name – topic name

  • consumer_group – consumer group to join (use the sub_id of the subscription)

  • consumer_id – consumer id obtained at the time of join group

  • version_id – offset version id obtained at the time of join group

  • hold_shard_list – the shard list held by consumer

  • read_end_shard_list – the list of shards that have been read to the end

Returns:

heartbeat result contains version id, shard list and total plan

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or consumer_group does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or consumer_group is empty; hold_shard_list is None
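Taken together, join_group, heart_beat, sync_group and leave_group form the collaborative-consumption lifecycle. The sketch below shows the call order with a hypothetical stub client; the stub's return values (consumer id, version id, shard plan) are made up for illustration and a real client would be a datahub.DataHub instance.

```python
# Hypothetical stub sketching the collaborative-consumption lifecycle:
# join_group -> periodic heart_beat -> sync_group -> leave_group.
class StubGroupClient:
    def join_group(self, project, topic, consumer_group, session_timeout):
        return ('consumer-1', 1)          # (consumer_id, version_id)

    def heart_beat(self, project, topic, consumer_group, consumer_id,
                   version_id, hold_shard_list, read_end_shard_list):
        # the server replies with the shards this consumer should hold
        return {'version_id': version_id, 'shard_list': ['0', '1']}

    def sync_group(self, project, topic, consumer_group, consumer_id,
                   version_id, release_shard_list, read_end_shard_list):
        pass                              # report released / finished shards

    def leave_group(self, project, topic, consumer_group, consumer_id, version_id):
        pass                              # give the assignment back

client = StubGroupClient()
consumer_id, version_id = client.join_group('proj', 'topic', 'sub-id', 60)
plan = client.heart_beat('proj', 'topic', 'sub-id', consumer_id, version_id,
                         hold_shard_list=[], read_end_shard_list=[])
held = plan['shard_list']                 # shards assigned by the server
client.sync_group('proj', 'topic', 'sub-id', consumer_id, version_id,
                  release_shard_list=[], read_end_shard_list=[])
client.leave_group('proj', 'topic', 'sub-id', consumer_id, version_id)
```

In a real consumer, heart_beat runs on a timer shorter than session_timeout; missing heartbeats causes the server to reassign the held shards.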

init_and_get_subscription_offset(project_name, topic_name, sub_id, shard_ids)[source]

Open subscription offset session

Parameters:
  • project_name – project name

  • topic_name – topic name

  • sub_id – subscription id

  • shard_ids – shard ids

Returns:

offset info

Return type:

datahub.models.InitAndGetSubscriptionOffsetResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name, sub_id or shard_id is empty

join_group(project_name, topic_name, consumer_group, session_timeout)[source]

Join a consumer group

Parameters:
  • project_name – project name

  • topic_name – topic name

  • consumer_group – consumer group to join (use the sub_id of the subscription)

  • session_timeout – session timeout

Returns:

consumer id and version id

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or consumer_group does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or consumer_group is empty

leave_group(project_name, topic_name, consumer_group, consumer_id, version_id)[source]

Leave a consumer group

Parameters:
  • project_name – project name

  • topic_name – topic name

  • consumer_group – consumer group to join (use the sub_id of the subscription)

  • consumer_id – consumer id obtained at the time of join group

  • version_id – offset version id obtained at the time of join group

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or consumer_group does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or consumer_group is empty

list_connector(project_name, topic_name)[source]

List data connectors of a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

Returns:

data connector names list

Return type:

datahub.models.ListConnectorResult

Raise:

datahub.exceptions.InvalidParameterException if the project_name or topic_name is empty

list_project()[source]

List all project names

Returns:

projects in datahub server

Return type:

datahub.models.results.ListProjectResult

list_shard(project_name, topic_name)[source]

List all shards of a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

Returns:

shards info

Return type:

datahub.models.ListShardResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty

list_subscription(project_name, topic_name, query_key, page_index, page_size)[source]

Query subscriptions in the range [start, end)

start = (page_index - 1) * page_size + 1

end = start + page_size

Parameters:
  • project_name – project name

  • topic_name – topic name

  • query_key – query key for search

  • page_index – page index

  • page_size – page size

Returns:

subscription info list

Return type:

datahub.models.results.ListSubscriptionResult

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; page_index <= 0 or page_size < 0
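The [start, end) window above is plain arithmetic; a minimal sketch of the computation:

```python
def subscription_page_range(page_index, page_size):
    # Half-open range [start, end) queried by list_subscription,
    # following the formula given above.
    start = (page_index - 1) * page_size + 1
    end = start + page_size
    return start, end

# page 3 with page_size 5 starts at the 11th subscription
page_start, page_end = subscription_page_range(3, 5)
```

Note page_index is 1-based, which is why page_index <= 0 raises InvalidParameterException.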

list_topic(project_name)[source]

Get all topics of a project

Parameters:

project_name – project name

Returns:

all topics of the project

Return type:

datahub.models.ListTopicResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty

list_topic_schema(project_name, topic_name, page_number=-1, page_size=-1)[source]

List all schemas of a topic

Parameters:
  • project_name – project_name

  • topic_name – topic_name

  • page_number – page number

  • page_size – page size

Returns:

all schema info of a topic

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty

merge_shard(project_name, topic_name, shard_id, adj_shard_id)[source]

Merge shards

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_id – shard id

  • adj_shard_id – adjacent shard id

Returns:

shards info after the merge

Return type:

datahub.models.MergeShardResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidOperationException if the shard is not active

Raise:

datahub.exceptions.InvalidParameterException if the shards not adjacent; project name, topic name, shard id or adjacent shard id is empty

Raise:

datahub.exceptions.LimitExceededException if merge shard operation limit exceeded

put_records(project_name, topic_name, record_list)[source]

Put records to a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • record_list (list) – record list

Returns:

failed records info

Return type:

datahub.models.PutRecordsResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the record is not well-formed; project_name or topic_name is empty

Raise:

datahub.exceptions.InvalidOperationException if the shard is not active

Raise:

datahub.exceptions.LimitExceededException if query rate or throughput rate limit exceeded

Raise:

datahub.exceptions.DatahubException if crc is wrong in pb mode
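Since put_records reports partial failures through the returned result rather than raising, a common pattern is to retry only the rejected records. A minimal sketch using hypothetical stub classes in place of the client and datahub.models.PutRecordsResult (the failed_record_count / failed_record_list attribute names and the index field mirror this API, but the stubs themselves and the flaky behaviour are assumptions):

```python
# Hypothetical stubs sketching "retry only the failed subset".
class FailedEntry:
    def __init__(self, index, error_code, error_message):
        self.index = index                    # position in the submitted batch
        self.error_code = error_code
        self.error_message = error_message

class StubPutResult:
    def __init__(self, failed):
        self.failed_record_count = len(failed)
        self.failed_record_list = failed

class FlakyClient:
    def __init__(self):
        self.calls = 0

    def put_records(self, project, topic, records):
        self.calls += 1
        if self.calls == 1:  # first attempt: record at index 1 is rejected
            return StubPutResult([FailedEntry(1, 'MalformedRecord', 'bad')])
        return StubPutResult([])              # retry succeeds

def put_with_retry(client, project, topic, records, max_retries=3):
    for _ in range(max_retries):
        result = client.put_records(project, topic, records)
        if result.failed_record_count == 0:
            return True
        # keep only the records the server rejected and try again
        records = [records[f.index] for f in result.failed_record_list]
    return False

client = FlakyClient()
ok = put_with_retry(client, 'proj', 'topic', ['r0', 'r1', 'r2'])
```

Because index refers to positions in the most recently submitted batch, the retry list must be rebuilt from the previous attempt's records each round, as above.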

put_records_by_shard(project_name, topic_name, shard_id, record_list)[source]

Put records to a specific shard of a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_id – shard id

  • record_list (list) – record list

Returns:

failed records info

Return type:

datahub.models.PutRecordsResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the record is not well-formed; project_name, topic_name or shard_id is empty

Raise:

datahub.exceptions.InvalidOperationException if the shard is not active

Raise:

datahub.exceptions.LimitExceededException if query rate or throughput rate limit exceeded

Raise:

datahub.exceptions.DatahubException if crc is wrong in pb mode

register_topic_schema(project_name, topic_name, schema)[source]

Register schema in a topic

Parameters:
  • project_name – project name

  • topic_name – topic name

  • schema – schema

Returns:

version id

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty

reload_connector(project_name, topic_name, connector_id, shard_id='')[source]

Reload a data connector for the given shard id, or reload all shards if no shard id is given

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible for connector type

  • shard_id – shard id

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type is wrong type

reset_subscription_offset(project_name, topic_name, sub_id, offsets)[source]

Reset subscription offset

Parameters:
  • project_name – project name

  • topic_name – topic name

  • sub_id – subscription id

  • offsets (dict) – offsets

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty; offsets is wrong type

split_shard(project_name, topic_name, shard_id, split_key='')[source]

Split shard

Parameters:
  • project_name – project name

  • topic_name – topic name

  • shard_id – split shard id

  • split_key – split key; if not given, the median is chosen

Returns:

shards info after split

Return type:

datahub.models.SplitShardResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidOperationException if the shard is not active

Raise:

datahub.exceptions.InvalidParameterException if the key range is invalid; project name, topic name or shard id is empty

Raise:

datahub.exceptions.LimitExceededException if split shard operation limit exceeded
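When split_key is omitted the server picks the median of the shard's hash-key range itself. For intuition, the midpoint of a fixed-width hexadecimal key range can be computed as below; this is only a sketch assuming 32-character hex hash keys (an assumption for illustration), and the server's own median choice is authoritative.

```python
def hashkey_midpoint(begin_hash_key, end_hash_key, width=32):
    # Midpoint of a shard's hash-key range, assuming keys are fixed-width
    # hexadecimal strings. Interpret both bounds as integers, average,
    # and format back with leading zeros preserved.
    lo = int(begin_hash_key, 16)
    hi = int(end_hash_key, 16)
    return format((lo + hi) // 2, '0{}x'.format(width))

mid = hashkey_midpoint('0' * 32, 'f' * 32)  # middle of the full key space
```

Passing such a midpoint as split_key gives a roughly even split of the shard's key range, which is what the server's default median achieves.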

sync_group(project_name, topic_name, consumer_group, consumer_id, version_id, release_shard_list, read_end_shard_list)[source]

Sync consumer group info

Parameters:
  • project_name – project name

  • topic_name – topic name

  • consumer_group – consumer group to join (use the sub_id of the subscription)

  • consumer_id – consumer id obtained at the time of join group

  • version_id – offset version id obtained at the time of join group

  • release_shard_list – the shard list to release

  • read_end_shard_list – the list of shards that have been read to the end

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or consumer_group does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or consumer_group is empty

update_connector(project_name, topic_name, connector_id, config)[source]

Update the config of a data connector

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible for connector type

  • config (datahub.models.ConnectorConfig) – connector config

Returns:

none

update_connector_offset(project_name, topic_name, connector_id, shard_id, connector_offset)[source]

Update data connector offset

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible for connector type

  • shard_id – shard id

  • connector_offset (datahub.models.ConnectorOffset) – current sequence

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type or connector_offset is wrong type

update_connector_state(project_name, topic_name, connector_id, connector_state)[source]

Update data connector state

Parameters:
  • project_name – project name

  • topic_name – topic name

  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible for connector type

  • connector_state (datahub.models.ConnectorState) – connector state

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type or connector_state is wrong type

update_project(project_name, comment)[source]

Update project comment

Parameters:
  • project_name – project name

  • comment – new description of project

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty or comment is invalid

update_subscription(project_name, topic_name, sub_id, comment)[source]

Update subscription

Parameters:
  • project_name – project name

  • topic_name – topic name

  • sub_id – subscription id

  • comment – new comment

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

update_subscription_offset(project_name, topic_name, sub_id, offsets)[source]

Update subscription offset

Parameters:
  • project_name – project name

  • topic_name – topic name

  • sub_id – subscription id

  • offsets (dict) – offsets

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidOperationException if the offset session closed or offset version changed

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty; offsets is wrong type
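A typical checkpointing flow opens the offset session once with init_and_get_subscription_offset, advances the per-shard offsets as records are consumed, and periodically commits them with update_subscription_offset. A minimal sketch with a hypothetical in-memory stub client (the offset fields mirror datahub.models.OffsetWithSession, but the stub and its values are assumptions for illustration):

```python
# Hypothetical stub sketching the offset-session checkpoint pattern:
# open the session once, then periodically commit consumed offsets.
class StubOffset:
    def __init__(self, sequence, timestamp, version, session_id):
        self.sequence = sequence
        self.timestamp = timestamp
        self.version = version        # changes if someone resets the offset
        self.session_id = session_id  # invalid once another session opens

class StubOffsetClient:
    def __init__(self):
        self.committed = {}

    def init_and_get_subscription_offset(self, project, topic, sub_id, shard_ids):
        # one offset-with-session per shard; the values here are made up
        return {sid: StubOffset(0, 0, 1, 100) for sid in shard_ids}

    def update_subscription_offset(self, project, topic, sub_id, offsets):
        self.committed.update(offsets)

client = StubOffsetClient()
offsets = client.init_and_get_subscription_offset('proj', 'topic', 'sub-id', ['0'])
offset = offsets['0']
for seq in range(1, 6):        # pretend we consumed records 1..5
    offset.sequence = seq
client.update_subscription_offset('proj', 'topic', 'sub-id', {'0': offset})
```

Committing with a stale session or version is what triggers the InvalidOperationException described above, so the offsets returned by the init call should be the ones mutated and committed.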

update_subscription_state(project_name, topic_name, sub_id, state)[source]

Update subscription state

Parameters:
  • project_name – project name

  • topic_name – topic name

  • sub_id – subscription id

  • state (datahub.models.SubscriptionState) – subscription state

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty; state is wrong type

update_topic(project_name, topic_name, life_cycle, comment)[source]

Update topic info, only life cycle and comment can be modified.

Parameters:
  • topic_name – topic name

  • project_name – project name

  • life_cycle – life cycle of topic

  • comment – topic comment

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty; life_cycle is not positive

wait_shards_ready(project_name, topic_name, timeout=30)[source]

Wait for all shards to be in ACTIVE or CLOSED state. It is usually invoked after creating a topic, and blocks until all shards are ACTIVE or CLOSED, or until the timeout expires.

Parameters:
  • project_name – project name

  • topic_name – topic name

  • timeout – -1 means block until all shards are ACTIVE or CLOSED; otherwise wait at most timeout seconds

Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty; timeout < 0

Raise:

datahub.exceptions.DatahubException if timeout
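The blocking behaviour can be pictured as a poll loop over shard states. A minimal, self-contained sketch with the states injected (ACTIVE/CLOSED stand for the shard states named above; the polling granularity and the injected state source are assumptions for illustration):

```python
def wait_shards_ready_sketch(get_states, timeout):
    # Poll until every shard is ACTIVE or CLOSED. timeout=-1 blocks
    # forever; otherwise give up after `timeout` polls (a stand-in for
    # seconds). get_states is injected so the sketch is runnable.
    polls = 0
    while True:
        if all(s in ('ACTIVE', 'CLOSED') for s in get_states()):
            return True
        polls += 1
        if timeout >= 0 and polls > timeout:
            raise TimeoutError('shards not ready in time')

# simulate a shard that becomes ACTIVE on the third poll
states = iter([['OPENING', 'ACTIVE'],
               ['OPENING', 'ACTIVE'],
               ['ACTIVE', 'ACTIVE']])
ready = wait_shards_ready_sketch(lambda: next(states), timeout=30)
```

The real method performs the same loop against list_shard and raises a DatahubException rather than TimeoutError when the deadline passes.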

Auth

class datahub.auth.AccountType[source]

Account type.

Only the ‘aliyun’ type is supported now.

class datahub.auth.Account(*args, **kwargs)[source]

Base Account Class.

get_type()[source]

Get the account type; subclasses must implement this.

Returns:

the account type

Return type:

datahub.auth.AccountType

sign_request(request)[source]

Generate a signature for the request; subclasses must implement this.

Parameters:

request – request object

Returns:

none

class datahub.auth.AliyunAccount(*args, **kwargs)[source]

Aliyun account implementation, derived from datahub.auth.Account.

get_type()[source]

Get account type.

Returns:

the account type

Return type:

datahub.auth.AccountType

sign_request(request)[source]

Generate a signature for the request.

Parameters:

request – request object

Returns:

none

Schema

class datahub.models.FieldType(value)[source]
Field type enum. DataHub supports the following field types:

TINYINT, SMALLINT, INTEGER, BIGINT, STRING, BOOLEAN, TIMESTAMP, FLOAT, DOUBLE, DECIMAL

class datahub.models.Field(name, field_type, comment='', allow_null=True)[source]
Members:

name (str): field name

type (str): field type

comment (str): field comment

class datahub.models.RecordSchema(field_list=None)[source]

Record schema class, used by TUPLE type topics.

Members:

fields (list): fields

Example:

>>> schema = RecordSchema.from_lists(
...     ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
...     [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]
... )

Record

class datahub.models.RecordType(value)[source]

Record type enum, there are two types: TUPLE and BLOB

class datahub.models.BlobRecord(blob_data=None, values=None)[source]

Blob type record class

Members:

blob_data (str): blob data

class datahub.models.TupleRecord(field_list=None, schema=None, values=None)[source]

Tuple type record class

Members:

field_list (list): fields

name_indices (list): values

Example:

>>> schema = RecordSchema.from_lists(['name', 'id'], [FieldType.STRING, FieldType.STRING])
>>> record = TupleRecord(schema=schema, values=['test', 'test2'])
>>> record.values[0:2]
['test', 'test2']
>>> record.set_value('name', 'test1')
>>> record.values[0]
'test1'
>>> record.set_value(0, 'test3')
>>> record.get_value(0)
'test3'
>>> record.get_value('name')
'test3'
>>> len(record.values)
2
>>> record.has_field('name')
True

class datahub.models.FailedRecord(index, error_code, error_message)[source]

Failed record info

Members:

index (int): index of the failed record

error_code (str): error code

error_message (str): error message

Shard

class datahub.models.ShardState(value)[source]

Shard state, there are: OPENING, ACTIVE, CLOSED, CLOSING

class datahub.models.ShardBase(shard_id, begin_hash_key, end_hash_key)[source]

Shard base info

Members:

shard_id (str): shard id

begin_hash_key (str): begin hash key

end_hash_key (str): end hash key

class datahub.models.Shard(shard_id, begin_hash_key, end_hash_key, state, closed_time, parent_shard_ids, left_shard_id, right_shard_id)[source]

Shard info

Members:

shard_id (str): shard id

begin_hash_key (str): begin hash key

end_hash_key (str): end hash key

state (datahub.models.ShardState): shard state

closed_time (str): closed time

parent_shard_ids (list): parent shard ids

left_shard_id (str): left shard id

right_shard_id (str): right shard id

class datahub.models.ShardContext(shard_id, start_sequence, end_sequence, current_sequence)[source]

Shard context

Members:

shard_id (str): shard id

start_sequence (str): start sequence

end_sequence (str): end sequence

current_sequence (str): current sequence

Cursor

class datahub.models.CursorType(value)[source]

Cursor type enum, there are: OLDEST, LATEST, SYSTEM_TIME, SEQUENCE

Offset

class datahub.models.OffsetBase(sequence, timestamp)[source]

offset base class

Members:

sequence (int): sequence

timestamp (int): timestamp

class datahub.models.OffsetWithVersion(sequence, timestamp, version)[source]

offset with version class

Members:

sequence (int): sequence

timestamp (int): timestamp

version (int): version

class datahub.models.OffsetWithSession(sequence, timestamp, version, session_id)[source]

offset with session class

Members:

sequence (int): sequence

timestamp (int): timestamp

version (int): version

session_id (int): session id

Subscription

class datahub.models.SubscriptionState(value)[source]

Subscription state, there are: INACTIVE, ACTIVE

class datahub.models.Subscription(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]

subscription info

Members:

comment (str): subscription description

create_time (int): create time

is_owner (bool): owner or not

last_modify_time (int): last modify time

state (datahub.models.SubscriptionState): subscription state

sub_id (str): subscription id

topic_name (str): topic name

type (int): type

Connector

class datahub.models.ConnectorState(value)[source]

ConnectorState enum class, there are: CONNECTOR_RUNNING, CONNECTOR_STOPPED

class datahub.models.AuthMode(value)[source]

AuthMode enum class, there are: ak, sts

class datahub.models.PartitionMode(value)[source]

PartitionMode enum class, there are: USER_DEFINE, SYSTEM_TIME, EVENT_TIME

class datahub.models.ConnectorType(value)[source]

ConnectorType enum class, there are: SINK_ODPS, SINK_ADS, SINK_ES, SINK_FC, SINK_MYSQL, SINK_OSS, SINK_OTS, SINK_HOLOGRES, SINK_DATAHUB

class datahub.models.OdpsConnectorConfig(project_name, table_name, odps_endpoint, tunnel_endpoint, access_id, access_key, partition_mode, time_range, partition_config=None)[source]

Connector config for odps

Members:

project_name (str): odps project name

table_name (str): odps table name

odps_endpoint (str): odps endpoint

tunnel_endpoint (str): tunnel endpoint

access_id (str): odps access id

access_key (str): odps access key

partition_mode (datahub.models.connector.PartitionMode): partition mode

time_range (int): time range

partition_config(collections.OrderedDict): partition config

class datahub.models.DatabaseConnectorConfig(host, port, database, user, password, table, ignore)[source]

Connector config for database

Members:

host (str): host

port (int): port

database (str): database

user (str): user

password (str): password

table (str): table

ignore (bool): ignore insert error

class datahub.models.EsConnectorConfig(index, endpoint, user, password, id_fields, type_fields, proxy_mode)[source]

Connector config for ElasticSearch

Members:

index (str): index

endpoint (str): endpoint

user (str): user

password (str): password

id_fields (list): id fields

type_fields (list): type fields

proxy_mode (bool): proxy mode

class datahub.models.FcConnectorConfig(endpoint, service, func, auth_mode, access_id='', access_key='')[source]

Connector config for FunctionCompute

Members:

endpoint (str): endpoint

service (str): service

func (str): function

auth_mode (datahub.models.connector.AuthMode): auth mode

access_id (str): access id

access_key (str): access key

class datahub.models.OssConnectorConfig(endpoint, bucket, prefix, time_format, time_range, auth_mode, access_id='', access_key='')[source]

Connector config for ObjectStoreService

Members:

endpoint (str): endpoint

bucket (str): bucket

prefix (str): prefix

time_format (str): time format

time_range (int): time range

auth_mode (datahub.models.connector.AuthMode): auth mode

access_id (str): access id

access_key (str): access key

class datahub.models.OtsConnectorConfig(endpoint, instance, table, auth_mode, access_id='', access_key='', write_mode=WriteMode.PUT)[source]

Connector config for OpenTableStore

Members:

endpoint (str): endpoint

instance (str): instance

table (str): table

auth_mode (datahub.models.connector.AuthMode): auth mode

access_id (str): access id

access_key (str): access key

write_mode (datahub.models.connector.WriteMode): write mode

Results

class datahub.models.results.ListProjectResult(project_names)[source]

Result of list projects api

Members:

project_names (list): list of project names

class datahub.models.results.GetProjectResult(project_name, comment, create_time, last_modify_time)[source]

Result of get project api

Members:

project_name (str): project name

comment (str): project description

create_time (int): create time

last_modify_time(int): last modify time

class datahub.models.results.ListTopicResult(topic_names)[source]

Result of list topics api

Members:

topic_names (list): list of topic names

class datahub.models.results.GetTopicResult(**kwargs)[source]

Result of get topic api

Members:

project_name (str): project name

topic_name (str): topic name

shard_count (int): shard count

life_cycle (int): life cycle

record_type (datahub.models.RecordType): record type

record_schema (datahub.models.RecordSchema): record schema

comment (str): topic description

create_time (int): create time

last_modify_time(int): last modify time

class datahub.models.results.ListShardResult(shards, protocol, interval)[source]

Result of list shards api

Members:

shards (list): list of datahub.models.Shard

class datahub.models.results.MergeShardResult(shard_id, begin_hash_key, end_hash_key)[source]

Result of merge shard api

Members:

shard_id (str): shard id

begin_hash_key (str): begin hash key

end_hash_key (str): end hash key

class datahub.models.results.SplitShardResult(new_shards)[source]

Result of split shard api

Members:

new_shards (list): list of datahub.models.ShardBase

class datahub.models.results.GetCursorResult(cursor, record_time, sequence)[source]

Result of get cursor api

Members:

cursor (str): cursor

record_time (int): record time

sequence (int): sequence

class datahub.models.results.PutRecordsResult(failed_record_count, failed_records)[source]

Result of put records api

Members:

failed_record_count (int): failed record count

failed_records (list): list of datahub.models.FailedRecord
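put_records only partially fails: the result carries the records that were rejected. A sketch of checking it after a write; the client and names are placeholders, and the function is only defined here, not executed:

```python
def put_with_report(datahub_client, project_name, topic_name, records):
    """Put records and report any per-record failures."""
    result = datahub_client.put_records(project_name, topic_name, records)
    for failed in result.failed_records:
        # Each FailedRecord tells you which record failed and why
        print('record %d failed: %s (%s)'
              % (failed.index, failed.error_message, failed.error_code))
    return result.failed_record_count == 0
```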

class datahub.models.results.GetRecordsResult(next_cursor, record_count, start_seq, records)[source]

Result of get records api

Members:

next_cursor (str): next cursor

record_count (int): record count

start_seq (int): start sequence

records (list): list of datahub.models.BlobRecord/datahub.models.TupleRecord

class datahub.models.results.GetMeteringInfoResult(active_time, storage)[source]

Result of get metering info api

Members:

active_time (int): active time

storage (int): storage

class datahub.models.results.ListConnectorResult(connector_names, connector_ids)[source]

Result of list data connector

Members:

connector_names (list): list of data connector names

connector_ids (list): list of data connector ids

class datahub.models.results.GetConnectorResult(cluster_addr, connector_id, connector_type, state, creator, owner, create_time, column_fields, config, extra_config, shard_contexts, sub_id)[source]

Result of get data connector

Members:

cluster_addr (str): cluster address

connector_id (str): connector id

type (datahub.models.ConnectorType): connector type

state (datahub.models.ConnectorState): connector state

creator (str): creator

owner (str): owner

create_time (int): create time

column_fields (list): list of column fields

config (datahub.models.OdpsConnectorConfig): config

extra_config (dict): extra config

shard_contexts (list): list of datahub.models.ShardContext

sub_id (str): subscription id used by connector

class datahub.models.results.GetConnectorShardStatusResult(shard_status_infos)[source]

Result of get data connector shard status

Members:

shard_status_infos (dict): shard status entry map

class datahub.models.results.InitAndGetSubscriptionOffsetResult(offsets)[source]

Result of init and get subscription offset api

Members:

offsets (list): list of datahub.models.OffsetWithSession

class datahub.models.results.GetSubscriptionOffsetResult(offsets)[source]

Result of get subscription offset api

Members:

offsets (list): list of datahub.models.OffsetWithVersion
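A typical consume session opens offsets with a session, then commits them back. A sketch of that flow, assuming a client instance and placeholder names; the offsets structure is passed through unchanged, and the function is only defined here, not executed:

```python
def commit_offsets(datahub_client, project_name, topic_name, sub_id, shard_ids):
    """Open a subscription session and commit the current offsets back."""
    init_result = datahub_client.init_and_get_subscription_offset(
        project_name, topic_name, sub_id, shard_ids)
    # Offsets carry the session info needed for a valid commit
    offsets = init_result.offsets
    datahub_client.update_subscription_offset(project_name, topic_name,
                                              sub_id, offsets)
```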

class datahub.models.results.CreateSubscriptionResult(sub_id)[source]

Result of create subscription api

Members:

sub_id (str): subscription id

class datahub.models.results.GetSubscriptionResult(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]

Result of get subscription api

Members:

comment (str): subscription description

create_time (int): create time

is_owner (bool): owner or not

last_modify_time (int): last modify time

state (datahub.models.SubscriptionState): subscription state

sub_id (str): subscription id

topic_name (str): topic name

type (int): type

class datahub.models.results.ListSubscriptionResult(total_count, subscriptions)[source]

Result of query subscription api

Members:

total_count (int): total count of subscriptions

subscriptions (list): list of datahub.models.Subscription

Exceptions

class datahub.exceptions.DatahubException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Base exception class for errors raised while handling a request to the DataHub server.

class datahub.exceptions.ResourceExistException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the DataHub object you are creating already exists.

class datahub.exceptions.ResourceNotFoundException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the DataHub object you are operating on does not exist.

class datahub.exceptions.InvalidParameterException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when a request parameter is invalid.

class datahub.exceptions.InvalidOperationException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the operation is not supported, e.g. an unsupported shard operation.

class datahub.exceptions.LimitExceededException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when too many requests are made and a limit is exceeded.

class datahub.exceptions.InternalServerException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the DataHub server encounters an internal error.

class datahub.exceptions.AuthorizationFailedException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when authorization fails, e.g. the signature check does not pass.

class datahub.exceptions.NoPermissionException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the operation is attempted without sufficient permission.

Producer

class datahub.client.ProducerConfig(access_id, access_key, endpoint, protocol_type=DatahubProtocolType.PB, compress_format=CompressFormat.LZ4)[source]

Config for datahub producer

Members:

access_id (string): Aliyun access id

access_key (string): Aliyun access key

endpoint (string): Datahub endpoint

protocol_type (datahub.core.DatahubProtocolType): Protocol type for datahub client

compress_format (datahub.models.compress.CompressFormat): Compress format for records data

retry_times (int): Retry times when request error

async_thread_limit (int): Thread num limit for thread pool in message writer

thread_queue_limit (int): Task num limit for queue in thread pool in message writer

logging_level (int): Logging level

logging_filename (string): Logging file name

max_async_buffer_records (int): Max buffer records number to PutRecords once. Only valid when write async.

max_async_buffer_size (int): Max buffer size to PutRecords once. Only valid when write async.

max_async_buffer_time (int): Max buffer time to PutRecords once. Only valid when write async.

max_record_pack_queue_limit (int): Max ready record pack limit for queue. Only valid when write async.

class datahub.client.DatahubProducer(project_name, topic_name, producer_config, shard_ids=None)[source]

Producer client for datahub

Members:

project_name (string): project name

topic_name (string): topic name

producer_config (datahub.client.common.ProducerConfig): config for producer client

shard_ids (list): list of str: shards you want to produce to.

default is None, means write to all shards evenly

Consumer

class datahub.client.ConsumerConfig(access_id, access_key, endpoint, protocol_type=DatahubProtocolType.PB, compress_format=CompressFormat.LZ4)[source]

Config for datahub consumer

Members:

access_id (string): Aliyun access id

access_key (string): Aliyun access key

endpoint (string): Datahub endpoint

protocol_type (datahub.core.DatahubProtocolType): Protocol type for datahub client

compress_format (datahub.models.compress.CompressFormat): Compress format for records data

retry_times (int): Retry times when request error

async_thread_limit (int): Thread num limit for thread pool in message reader

thread_queue_limit (int): Task num limit for queue in thread pool in message reader

logging_level (int): Logging level

logging_filename (string): Logging file name

auto_ack_offset (bool): Auto ack offset for fetched records or not

session_timeout (int): Session timeout

max_record_buffer_size (int): Max record buffer size in consumer

fetch_limit (int): Fetch num limit need to consume

class datahub.client.DatahubConsumer(project_name, topic_name, sub_id, consumer_config, shard_ids=None, timestamp=-1)[source]

Consumer client for datahub

Members:

project_name (string): project name

topic_name (string): topic name

sub_id (string): subscription id for consume

consumer_config (datahub.client.common.ConsumerConfig): config for consumer client

shard_ids (list): list of str: shards you want to consume.

default is None, means shards are allocated automatically by the datahub server

timestamp (int): start timestamp for consumption.

default is -1, means start from the subscription offset