cube_dispatcher(1) PgQ consumer that is used to write source records into partitoned tables

SYNOPSIS


cube_dispatcher.py [switches] config.ini

DESCRIPTION

cube_dispatcher is PgQ consumer that reads url encoded records from source queue and writes them into partitioned tables according to configuration file. Used to prepare data for business intelligence. Name of the table is read from producer field in event. Batch creation time is used for partitioning. All records created in same day will go into same table partion. If partiton does not exist cube dispatcer will create it according to template.

Events are usually procuded by pgq.logutriga(). Logutriga adds all the data of the record into the event (also in case of updates and deletes).

cube_dispatcher can be used in to modes:

keep_all

keeps all the data that comes in. If record is updated several times during one day then table partiton for that day will contain several instances of that record.

keep_latest

only last instance of each record is kept for each day. That also means that all tables must have primary keys so cube dispatcher can delete previous versions of records before inserting new data.

QUICK-START

Basic cube_dispatcher setup and usage can be summarized by the following steps:

1. pgq and logutriga must be installed in source databases. See pgqadm man page for details. target database must also have pgq_ext schema.

2. edit a cube_dispatcher configuration file, say cube_dispatcher_sample.ini

3. create source queue

$ pgqadm.py ticker.ini create <queue>

4. create target database and parent tables in it.

5. launch cube dispatcher in daemon mode

$ cube_dispatcher.py cube_dispatcher_sample.ini -d

6. start producing events (create logutriga trggers on tables) CREATE OR REPLACE TRIGGER trig_cube_replica AFTER INSERT OR UPDATE ON some_table FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga(<queue>)

CONFIG

Common configuration parameters

job_name

Name for particulat job the script does. Script will log under this name to logdb/logserver. The name is also used as default for PgQ consumer name. It should be unique.

pidfile

Location for pid file. If not given, script is disallowed to daemonize.

logfile

Location for log file.

loop_delay

If continuisly running process, how long to sleep after each work loop, in seconds. Default: 1.

connection_lifetime

Close and reconnect older database connections.

log_count

Number of log files to keep. Default: 3

log_size

Max size for one log file. File is rotated if max size is reached. Default: 10485760 (10M)

use_skylog

If set, search for [./skylog.ini, ~/.skylog.ini, /etc/skylog.ini]. If found then the file is used as config file for Pythons logging module. It allows setting up fully customizable logging setup.

Common PgQ consumer parameters

pgq_queue_name

Queue name to attach to. No default.

pgq_consumer_id

Consumers ID to use when registering. Default: %(job_name)s

Config options specific to cube_dispatcher

src_db

Connect string for source database where the queue resides.

dst_db

Connect string for target database where the tables should be created.

mode

Operation mode for cube_dispatcher. Either keep_all or keep_latest.

dateformat

Optional parameter to specify how to suffix data tables. Default is YYYY_MM_DD which creates per-day tables. With YYYY_MM per-month tables can be created. If explicitly set empty, partitioning is disabled.

part_template

SQL fragment for table creation. Various magic replacements are done there:

_PKEY

comma separated list of primery key columns.

_PARENT

schema-qualified parent table name.

_DEST_TABLE

schema-qualified partition table.

_SCHEMA_TABLE

same as DEST_TABLE but dots replaced with "_", to allow use as index names.

Example config file

[cube_dispatcher]
job_name          = some_queue_to_cube

src_db            = dbname=sourcedb_test
dst_db            = dbname=dataminedb_test

pgq_queue_name    = udata.some_queue

logfile           = ~/log/%(job_name)s.log
pidfile           = ~/pid/%(job_name)s.pid

# how many rows are kept: keep_latest, keep_all
mode = keep_latest

# to_char() fmt for table suffix
#dateformat = YYYY_MM_DD
# following disables table suffixes:
#dateformat =

part_template =
     create table _DEST_TABLE (like _PARENT);
     alter table only _DEST_TABLE add primary key (_PKEY);

LOGUTRIGA EVENT FORMAT

PgQ trigger function pgq.logutriga() sends table change event into queue in following format:

ev_type

(op || ":" || pkey_fields). Where op is either "I", "U" or "D", corresponging to insert, update or delete. And pkey_fields is comma-separated list of primary key fields for table. Operation type is always present but pkey_fields list can be empty, if table has no primary keys. Example: I:col1,col2

ev_data

Urlencoded record of data. It uses db-specific urlecoding where existence of = is meaningful - missing = means NULL, present = means literal value. Example: id=3&name=str&nullvalue&emptyvalue=

ev_extra1

Fully qualified table name.

COMMAND LINE SWITCHES

Following switches are common to all skytools.DBScript-based Python programs.

-h, --help

show help message and exit

-q, --quiet

make program silent

-v, --verbose

make program more verbose

-d, --daemon

make program go background

Following switches are used to control already running process. The pidfile is read from config then signal is sent to process id specified there.

-r, --reload

reload config (send SIGHUP)

-s, --stop

stop program safely (send SIGINT)

-k, --kill

kill program immidiately (send SIGTERM)