I have written a Python program that restores a Greenplum backup in parallel onto a cluster with a different number of segment instances. The script performs all the required steps mentioned in this Pivotal documentation.

Problem

  • Restoring backup in DCA with different configuration
    • Old method
      • Restoring pg_dump backup (Single Thread)
      • ~350 GB/Hour (Max)
    • Throughput
      • Backup size: 29TB
      • Restore time: ~84 Hours

Solution

  • Taking parallel backup using gpcrondump
  • Restoring multiple backup files in parallel
    • 1.8 TB / Hour
    • Backup Size: 29 TB
    • Restore time: ~16 Hours
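
The restore times quoted above follow from dividing the backup size by the throughput. Here is a quick sanity check, assuming decimal units (1 TB = 1000 GB) and a steady throughput for the whole run. The function name is only illustrative.

```python
def restore_hours(backup_size_tb, throughput_tb_per_hour):
    """Estimate restore time in hours at a constant throughput."""
    return backup_size_tb / float(throughput_tb_per_hour)


old = restore_hours(29, 0.35)  # single-threaded pg_dump restore, ~350 GB/hour
new = restore_hours(29, 1.8)   # parallel restore, ~1.8 TB/hour

print("old method: about %.0f hours" % old)
print("parallel:   about %.0f hours" % new)
print("speed-up:   about %.1fx" % (old / new))
```

The estimates land close to the ~84 and ~16 hours observed; the small gap on the old method is expected, since 350 GB/hour was the maximum rather than the average rate.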

Point to note

  • All the uncompressed backup files must be in a single directory. (Data Domain is the best option for this.)
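
One way to prepare such a directory is sketched below. It assumes the dump files are either plain or gzip-compressed with a `.gz` suffix, and that they are reachable under one or more source directories. The function name and the directory layout are assumptions for illustration, not part of the restore script.

```python
import gzip
import os
import shutil


def gather_uncompressed(source_dirs, target_dir):
    """Copy gp_dump_* files into target_dir, decompressing any .gz files."""
    if not os.path.isdir(target_dir):
        os.makedirs(target_dir)
    gathered = []
    for source_dir in source_dirs:
        for name in sorted(os.listdir(source_dir)):
            if not name.startswith("gp_dump_"):
                continue  # ignore anything that is not a dump file
            src = os.path.join(source_dir, name)
            if name.endswith(".gz"):
                # Strip the .gz suffix and write the decompressed content.
                dst = os.path.join(target_dir, name[:-3])
                with gzip.open(src, "rb") as fin, open(dst, "wb") as fout:
                    shutil.copyfileobj(fin, fout)
            else:
                dst = os.path.join(target_dir, name)
                shutil.copyfile(src, dst)
            gathered.append(dst)
    return gathered
```

The resulting `target_dir` is what would be passed to the script with `-u`.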

Testing

To test this script, I created a sample function that takes two arguments, a string and an integer. Internally it calls pg_sleep with the second (integer) argument; the string only labels the call.

Function

CREATE FUNCTION test_pg_sleep(TEXT, INTEGER) RETURNS VOID AS $$
DECLARE
    name ALIAS for $1;
    delay ALIAS for $2;
BEGIN
    PERFORM pg_sleep(delay);
END;
$$ LANGUAGE 'plpgsql' STRICT;

Backup files

Below are the backup files, each containing a call to the function above. I have given each file a different sleep value.

[~/sample_dumps] [0]
$ ls
gp_dump_-1_1_1234567890           gp_dump_2_10_1234567890           
gp_dump_2_2_1234567890            gp_dump_2_4_1234567890            
gp_dump_2_6_1234567890            gp_dump_2_8_1234567890
gp_dump_-1_1_1234567890_post_data gp_dump_2_1_1234567890            
gp_dump_2_3_1234567890            gp_dump_2_5_1234567890            
gp_dump_2_7_1234567890            gp_dump_2_9_1234567890
[~/sample_dumps] [0]
$ cat *
select test_pg_sleep('master_file',20)

select test_pg_sleep('post_data',10)

select test_pg_sleep('segment_10_file',50)

select test_pg_sleep('segment_1_file',5)

select test_pg_sleep('segment_2_file',10)

select test_pg_sleep('segment_3_file',15)

select test_pg_sleep('segment_4_file',20)

select test_pg_sleep('segment_5_file',25)

select test_pg_sleep('segment_6_file',30)

select test_pg_sleep('segment_7_file',35)

select test_pg_sleep('segment_8_file',40)

select test_pg_sleep('segment_9_file',45)
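
To recreate this test setup, the sample files can be generated with a few lines of Python. This is a minimal sketch that reproduces the file names and statements from the listing above. The helper name is hypothetical, and the rule that segment n sleeps for 5 × n seconds is read off the values shown.

```python
import os

DUMP_KEY = "1234567890"


def write_sample_dumps(directory, dump_key=DUMP_KEY, segments=10):
    """Create dummy dump files that only call test_pg_sleep()."""
    if not os.path.isdir(directory):
        os.makedirs(directory)
    contents = {
        "gp_dump_-1_1_%s" % dump_key:
            "select test_pg_sleep('master_file',20)\n",
        "gp_dump_-1_1_%s_post_data" % dump_key:
            "select test_pg_sleep('post_data',10)\n",
    }
    for n in range(1, segments + 1):
        # Segment n sleeps for 5 * n seconds, matching the listing above.
        contents["gp_dump_2_%d_%s" % (n, dump_key)] = (
            "select test_pg_sleep('segment_%d_file',%d)\n" % (n, 5 * n))
    for name, sql in contents.items():
        with open(os.path.join(directory, name), "w") as handle:
            handle.write(sql)
    return sorted(contents)
```

Calling `write_sample_dumps("sample_dumps")` creates the twelve files in a `sample_dumps` directory.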

Running Script

Below are the two sessions. In the first session I’m running the restore script, and in the second I’m monitoring the pg_stat_activity view.

Session 1:

python2.7 parallel_restore.py -d icprod -t 1234567890 -u sample_dumps/ -p 6

Session 2:

$ while true
> do
> sleep 1
> psql icprod -c "SELECT pid,query,now() - query_start as Query_Duration from pg_stat_activity where query not ilike '%pg_stat%';"
> done

Demo

Below is a live recording of the two sessions above. You may want to switch to fullscreen for a better view.

Help

[gpadmin@mdw ~]$ ./parallel_restore.py --help
Usage:
python2 parallel_restore.py -d <database_name> -u <backup_files_directory> -t <dump_key> -p <number of parallel processes>

Options:
  -h, --help            show this help message and exit
  -d DATABASE, --database=DATABASE
                        Specify target database to restore backup
  --host=HOST           Specify the target host
  -t TIMESTAMP, --timestamp=TIMESTAMP
                        Specify the timestamp of backup
  -p PARALLEL, --parallel-processes=PARALLEL
                        Specify number of parallel-processes
  -u DIRECTORY, --directory=DIRECTORY
                        Specify Backup directory

-d, --database

This option specifies the target database. If the target database doesn’t exist in the environment, the script exits immediately.

--host

This option specifies the target host. The default value is localhost.

-t, --timestamp

This option specifies the backup key generated by the gpcrondump utility. The script uses it to find the backup files that belong to the dump.
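
The file selection can be sketched as follows. The script itself builds the two possible master file names explicitly and matches segment files with a single regular expression. The sketch below uses the same naming patterns but groups the files in one pass; the function name is hypothetical.

```python
import re


def classify_backup_files(file_names, dump_key):
    """Split file names into master, post_data and segment dump files."""
    key = re.escape(dump_key)
    # The master dump is gp_dump_-1_1_<key> or gp_dump_1_1_<key>.
    master_re = re.compile(r"gp_dump_-?1_1_%s$" % key)
    post_data_re = re.compile(r"gp_dump_-?1_1_%s_post_data$" % key)
    segment_re = re.compile(r"gp_dump_\d+_\d+_%s$" % key)
    result = {"master": [], "post_data": [], "segments": []}
    for name in sorted(file_names):
        if master_re.match(name):
            result["master"].append(name)
        elif post_data_re.match(name):
            result["post_data"].append(name)
        elif segment_re.match(name):
            result["segments"].append(name)
    return result
```

With the sample directory above and the key `1234567890`, this yields one master file, one post_data file and ten segment files. Files carrying any other key are ignored.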

-p, --parallel-processes

This option specifies the number of parallel processes to run. The default value is 1.
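
The effect of this option can be illustrated with a small self-contained sketch. Note the substitutions: the script uses multiprocessing.Pool with a shared Value counter, whereas this sketch swaps in a thread pool (multiprocessing.pool.ThreadPool) with a threading.Lock, and a short time.sleep stands in for the psql call. All names here are hypothetical.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

completed = 0
completed_lock = threading.Lock()


def restore_one(task):
    """Pretend to restore one file; task is a (name, seconds) pair."""
    global completed
    name, seconds = task
    time.sleep(seconds)  # stand-in for running psql -f <file>
    with completed_lock:
        completed += 1
        done = completed  # snapshot taken while the lock is held
    return name, done


def restore_all(tasks, parallel=1):
    """Run restore_one over all tasks with at most `parallel` workers."""
    pool = ThreadPool(processes=parallel)
    try:
        return pool.map(restore_one, tasks)
    finally:
        pool.close()
        pool.join()
```

For example, `restore_all([("segment_%d_file" % n, 0.01 * n) for n in range(1, 11)], parallel=6)` keeps up to six tasks in flight and returns each file name together with the completion count at the moment it finished.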

-u, --directory

This option specifies the directory that contains the backup files.

--noanalyze

This option skips the analyze step that the script otherwise runs on the restored database once the restore completes.

Logging

This script stores its logs in /home/gpadmin/gpAdminLogs and generates multiple log files.

parallel_restore_<date>.log

This is the main log file, which records the script's progress.

parallel_restore_master_<dump_key>_<date>.log

This is the standard log file for the master backup file restore.

parallel_restore_master_<dump_key>_<date>.error

This is the error log file for the master backup file restore. Always check this log after a restore.

parallel_restore_<dump_key>_<date>.log

This is the standard log file for the segment backup file restore.

parallel_restore_<dump_key>_<date>.error

This is the error log file for the segment backup file restore. Always check this log after a restore.

parallel_restore_post_data_<dump_key>.log

This is the standard log file for the post data backup file restore.
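
Since the error logs should be checked after every restore, a small helper can do the check. The sketch below rebuilds the file names the way the program composes them (dump key plus a `%Y%m%d` date) and reports which error files are non-empty. Both function names are hypothetical, and the main `parallel_restore_<date>.log` is left out because the logging library writes it.

```python
import datetime
import os

LOG_DIR = "/home/gpadmin/gpAdminLogs"


def restore_log_paths(dump_key, date=None, log_dir=LOG_DIR):
    """Return the log and error file paths used for one restore run."""
    if date is None:
        date = datetime.datetime.now().strftime("%Y%m%d")
    names = {
        "master_log": "parallel_restore_master_%s_%s.log" % (dump_key, date),
        "master_error": "parallel_restore_master_%s_%s.error" % (dump_key, date),
        "segment_log": "parallel_restore_%s_%s.log" % (dump_key, date),
        "segment_error": "parallel_restore_%s_%s.error" % (dump_key, date),
        "post_data_log": "parallel_restore_post_data_%s.log" % dump_key,
    }
    return dict((key, os.path.join(log_dir, name)) for key, name in names.items())


def nonempty_error_logs(paths):
    """List the .error files that exist and contain at least one byte."""
    return sorted(
        path for key, path in paths.items()
        if key.endswith("_error")
        and os.path.isfile(path)
        and os.path.getsize(path) > 0
    )
```

An empty list from `nonempty_error_logs(restore_log_paths("1234567890"))` means psql wrote nothing to stderr during the master and segment restores on that date.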

Program:

#!/usr/bin/env python
from multiprocessing import Pool, Value
from pygresql.pg import DB
import optparse
from gppylib import gplog
import datetime
import re
import os
import sys
import smtplib


# Command line options
help_usage = """
python2 parallel_restore.py -d <database_name> -u <backup_files_directory> -t <dump_key> -p <number of parallel processes>"""

parser = optparse.OptionParser(usage=help_usage)
parser.add_option("-d", "--database", dest="database", action="store", help="Specify target database to restore backup")
parser.add_option("--host", dest="host",action="store", default='localhost', help="Specify the target host")
parser.add_option("-t", "--timestamp", dest="timestamp", action="store", help="Specify the timestamp of backup")
parser.add_option("-p", "--parallel-processes",dest="parallel", action="store", default=1, help="Specify number of parallel-processes")
parser.add_option("-u", "--directory", dest="directory", action="store", help="Specify Backup directory")
parser.add_option("--noanalyze", dest="noanalyze", action="store_true", help="Specify if you dont want to run analyze on restored database")
options, args = parser.parse_args()

# Program logging
logger = gplog.get_default_logger()
gplog.setup_tool_logging("parallel_restore", '', "gpadmin")

# Start

logger.info("Starting script with args " + str(' '.join(sys.argv)))

# Python version check
if sys.version_info[0] != 2:
    logger.error("This script supports only Python version 2... Exiting")
    sys.exit(1)

# Validating command line options (exit with a non-zero status on failure)

if options.timestamp:
    vDump_key = options.timestamp
else:
    logger.error("timestamp not supplied... exiting...")
    sys.exit(1)

if options.database:
    vDatabase = options.database
else:
    logger.error("database not supplied... exiting...")
    sys.exit(1)

if options.directory:
    vDirectory = options.directory
else:
    logger.error("directory not supplied... exiting...")
    sys.exit(1)

# Make sure the target database exists before restoring anything
con = DB(dbname='gpadmin', host=options.host)
if vDatabase not in con.get_databases():
    logger.error("Database doesn't exist... exiting")
    con.close()
    sys.exit(1)
con.close()

vProcesses = int(options.parallel)
vHost = options.host

now = datetime.datetime.now()
vDate = str(now.strftime("%Y%m%d"))

logger.info("Getting list of master and segment files")

backup_files = os.listdir(vDirectory)
master_files = [vDirectory + '/' + 'gp_dump_-1_1_%s' %vDump_key, vDirectory + '/' + 'gp_dump_1_1_%s' %vDump_key]
segment_file_regex = 'gp_dump_\d+_\d+_%s$' %vDump_key

# Getting Master file
master_file = None  # stays None when neither candidate exists, instead of raising NameError below
for file in master_files:
    if os.path.isfile(file):
        master_file = file
if not master_file:
    logger.error("Master file doesn't exist... Exiting...")
    sys.exit(1)

# Restoring master file
logger.info("Restoring master SQL file: %s" %master_file)
run_master_file = "psql %s -h %s -f %s >> /home/gpadmin/gpAdminLogs/parallel_restore_master_%s_%s.log 2>> /home/gpadmin/gpAdminLogs/parallel_restore_master_%s_%s.error" %(vDatabase, vHost, master_file, vDump_key, vDate, vDump_key, vDate)
os.popen(run_master_file)
logger.info("Restoring master SQL file completed")

def sendmail(body):
    SENDER = 'DBA-Greenplum@broadridge.com'
    RECEIVERS = 'DBA-Greenplum@broadridge.com'
    sender = SENDER
    receivers = RECEIVERS

    # A blank line must separate the mail headers from the body
    message = """From: """ + SENDER + """
To: """ + RECEIVERS + """
MIME-Version: 1.0
Content-type: text/html
Subject: Parallel restore status """ + vDatabase + """\n\n"""
    message = message + body
    try:
        smtpObj = smtplib.SMTP('localhost')
        smtpObj.sendmail(sender, receivers, message)
        smtpObj.quit()
    except smtplib.SMTPException:
        # SMTPException lives in smtplib; report through the script's logger
        logger.error("Unable to send email")

# Get segment files to restore
def get_segment_files():
    segment_files = []
    for file in backup_files:
        if re.match(segment_file_regex, file, re.S):
            segment_files.append(vDirectory + '/' + file)
    if master_file in segment_files:
        segment_files.remove(master_file)
    return segment_files

# Get table list to analyze
def get_tables():
    con = DB(dbname = vDatabase, host = vHost)
    tables = con.get_tables()
    con.close()
    return tables

# To get count of completed backup_files
counter = Value('i', 0)
total_segment_files = len(get_segment_files())


def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

# Function to run analyze

def run_analyze(table):
    global counter
    con = DB(dbname = vDatabase, host = vHost)
    con.query("analyze %s" %table)
    con.close()
    with counter.get_lock():
        counter.value += 1
    if counter.value % 50 == 0 or counter.value == len(get_tables()):
        logger.info("analyze status: completed " + str(counter.value) + " out of " + str(len(get_tables())) + " tables or partitions ")

# Function to run segment backup_files
def run_segment_files(segment_file):
    global counter
    os.popen('psql %s -f %s -h %s >> /home/gpadmin/gpAdminLogs/parallel_restore_%s_%s.log 2>> /home/gpadmin/gpAdminLogs/parallel_restore_%s_%s.error' %(vDatabase, segment_file, vHost, vDump_key, vDate, vDump_key, vDate))
    with counter.get_lock():
        counter.value += 1
    logger.info(str(counter.value) + " files completed out of " + str(total_segment_files) + " files")
    if counter.value % (vProcesses * 2) == 0:
        sendmail("Restore status: " + str(counter.value) + " files completed out of " + str(total_segment_files) + " files")



# Forking new #n processes for run_segment_files() Function
logger.info("Running segment file restore")
pool = Pool(initializer=init, initargs=(counter, ), processes=vProcesses)
pool.map(run_segment_files, get_segment_files())
pool.close()  # worker processes will terminate when all work already assigned has completed.
pool.join()  # to wait for the worker processes to terminate.

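# Running set schema and set val statements
# The with-block flushes and closes the file before psql reads it;
# mode 'w' avoids duplicated statements when the script is re-run with the same dump key
with open('/tmp/schema_path_set_val_%s.sql' %vDump_key,'w') as schema_path_set_val:
    for line in open(master_file,'r'):
        if 'SET search_path' in line or 'SELECT pg_catalog.setval' in line:
            schema_path_set_val.write(line)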


# Running post_data file
post_data_file = master_file + '_post_data'
run_schema_path_set_val = "psql %s -h %s -f /tmp/schema_path_set_val_%s.sql > /home/gpadmin/gpAdminLogs/parallel_restore_schema_path_set_val_%s.log" %(vDatabase, vHost, vDump_key, vDump_key)
run_post_data_file = "psql %s -h %s -f %s > /home/gpadmin/gpAdminLogs/parallel_restore_post_data_%s.log" %(vDatabase, vHost, post_data_file, vDump_key)

if os.path.isfile('/tmp/schema_path_set_val_%s.sql' %vDump_key):
    logger.info("Running schema_path_set_val")
    os.popen(run_schema_path_set_val)
else:
    logger.info("schema_path_set_val file doesn't exist")

if os.path.isfile(post_data_file):
    logger.info("Running post_data file")
    os.popen(run_post_data_file)
else:
    logger.info("Post data file doesn't exist")
logger.info("Restore completed")

if not options.noanalyze:
    sendmail("""
                Restore status: completed.\n
                Please review the log file in ~/gpAdminLogs directory (recommended)\n
                Running analyze:
                """)
    logger.info("Running analyze on " + str(len(get_tables())) + " tables or partitions")
    counter.value = 0  # restart the shared counter; it still holds the restored-file count
    pool = Pool(initializer=init, initargs=(counter, ), processes=vProcesses)
    pool.map(run_analyze, get_tables())
    pool.close()  # worker processes will terminate when all work already assigned has completed.
    pool.join()  # to wait for the worker processes to terminate.
    sendmail("Analyze status: completed")
else:
    sendmail("""
                Restore status: completed.\n
                Analyze is skipped by request; database performance may be adversely impacted until analyze is done.\n
                Please review the log file in ~/gpAdminLogs directory (recommended)""")
    logger.warning("---------------------------------------------------------------------------------------------------")
    logger.warning("Analyze bypassed on request; database performance may be adversely impacted until analyze is done.")
    logger.warning("Please review the log file in ~/gpAdminLogs directory (recommended)")
    logger.warning("---------------------------------------------------------------------------------------------------")
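
The program launches psql through os.popen with shell redirection, so a failed run is only visible in the .error files. As a possible variation, and not what the script above does, the same call can be made with subprocess, which also hands back psql's exit code. The helper names are hypothetical, and the sketch passes the database with psql's -d option rather than as a positional argument.

```python
import subprocess


def psql_command(database, host, sql_file):
    """Build the psql argument list for restoring one SQL file."""
    return ["psql", "-d", database, "-h", host, "-f", sql_file]


def run_logged(command, log_path, error_path):
    """Run command, appending stdout and stderr to the given files.

    Returns the exit code of the command.
    """
    with open(log_path, "ab") as log, open(error_path, "ab") as err:
        return subprocess.call(command, stdout=log, stderr=err)
```

A caller could then log a warning whenever `run_logged(psql_command(vDatabase, vHost, segment_file), log_path, error_path)` returns a non-zero value. Passing the arguments as a list also avoids shell quoting problems with unusual file names.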