[go: up one dir, main page]

Skip to content

Commit

Permalink
Add mydumper support for OceanBase 2.1, it is not robust
Browse files Browse the repository at this point in the history
  • Loading branch information
AcidGo committed Jun 3, 2020
1 parent 240b6dd commit 2ce5d99
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 16 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
# What are the new features?

Original mydumper is not support for OceanBase 2.1, but i need the features, so i add it.

**IMPORTANT: The OceanBase support is not robust, prohibited to use in production environment!!!**

-----



# What is mydumper? Why?

* Parallelism (hence, speed) and performance (avoids expensive character set conversion routines, efficient code overall)
Expand Down
93 changes: 77 additions & 16 deletions mydumper.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ gchar *daemon_binlog_directory = NULL;
gchar *logfile = NULL;
FILE *logoutfile = NULL;

gboolean is_ob = FALSE;
gboolean is_drds = FALSE;
gboolean no_schemas = FALSE;
gboolean no_data = FALSE;
gboolean no_locks = FALSE;
Expand Down Expand Up @@ -147,6 +149,10 @@ static GMutex *ll_mutex = NULL;
int errors;

static GOptionEntry entries[] = {
// Acid Add
{"is-ob", 0, 0, G_OPTION_ARG_NONE, &is_ob, "specific options for OceanBase 2.2", NULL},
{"is-drds", 0, 0, G_OPTION_ARG_NONE, &is_drds, "specific options for DRDS DGB version", NULL},
// EOF Acid Add
{"database", 'B', 0, G_OPTION_ARG_STRING, &db, "Database to dump", NULL},
{"tables-list", 'T', 0, G_OPTION_ARG_STRING, &tables_list,
"Comma delimited table list to dump (does not exclude regex option)",
Expand Down Expand Up @@ -593,11 +599,34 @@ void *process_queue(struct thread_data *td) {
mysql_query(thrconn, "SET SESSION wait_timeout = 2147483")) {
g_warning("Failed to increase wait_timeout: %s", mysql_error(thrconn));
}
if (mysql_query(thrconn,
"SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")) {
g_critical("Failed to set isolation level: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);

// specific for ob, skip the isolation level changing
if (!is_ob) {
if (mysql_query(thrconn,
"SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")) {
g_critical("Failed to set isolation level: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
}
} else {
if (mysql_query(thrconn,
"SET SESSION ob_query_timeout = 10000000000")) {
g_critical("Failed to set session ob_query_timeout: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
} else {
g_message("Set session ob_query_timeout to 10000000000 for thread %d and connection ID %lu",
td->thread_id, mysql_thread_id(thrconn));
}

if (mysql_query(thrconn,
"SET SESSION ob_trx_timeout = 10000000000")) {
g_critical("Failed to set session ob_trx_timeout: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
} else {
g_message("Set session ob_trx_timeout to 10000000000 for thread %d and connection ID %lu",
td->thread_id, mysql_thread_id(thrconn));
}
}

if (mysql_query(thrconn,
"START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */")) {
g_critical("Failed to start consistent snapshot: %s", mysql_error(thrconn));
Expand Down Expand Up @@ -874,10 +903,21 @@ void *process_queue_less_locking(struct thread_data *td) {
td->thread_id, mysql_thread_id(thrconn));
}

if ((detected_server == SERVER_TYPE_MYSQL) &&
if ((detected_server == SERVER_TYPE_MYSQL) && (!is_ob) &&
mysql_query(thrconn, "SET SESSION wait_timeout = 2147483")) {
g_warning("Failed to increase wait_timeout: %s", mysql_error(thrconn));
}

// specific for ob to change ob_query_timeout
if ((is_ob) &&
mysql_query(thrconn, "SET SESSION ob_query_timeout = 10000000000")) {
g_warning("Failed to increase ob_query_timeout: %s", mysql_error(thrconn));
} else {
if (is_ob) {
g_message("Connection Id %lu set the ob_query_timeout to 10000000000", mysql_thread_id(thrconn));
}
}

if (!skip_tz && mysql_query(thrconn, "/*!40103 SET TIME_ZONE='+00:00' */")) {
g_critical("Failed to set time zone: %s", mysql_error(thrconn));
}
Expand Down Expand Up @@ -1127,7 +1167,7 @@ int main(int argc, char *argv[]) {
ll_mutex = g_mutex_new();
ll_cond = g_cond_new();

context = g_option_context_new("multi-threaded MySQL dumping");
context = g_option_context_new("AcidGo custom - multi-threaded MySQL dumping");
GOptionGroup *main_group =
g_option_group_new("main", "Main Options", "Main Options", NULL, NULL);
g_option_group_add_entries(main_group, entries);
Expand All @@ -1148,7 +1188,7 @@ int main(int argc, char *argv[]) {
// \n",password,sizeof(password));

if (program_version) {
g_print("mydumper %s, built against MySQL %s\n", VERSION,
g_print("mydumper %s, AcidGo custom Version: 0.0.3, built against MySQL %s\n", VERSION,
MYSQL_VERSION_STR);
exit(EXIT_SUCCESS);
}
Expand Down Expand Up @@ -1303,6 +1343,27 @@ MYSQL *create_main_connection() {
mysql_query(conn, "SET SESSION wait_timeout = 2147483")) {
g_warning("Failed to increase wait_timeout: %s", mysql_error(conn));
}

// specific for ob to change ob_query_timeout
if ((is_ob) &&
mysql_query(conn, "SET SESSION ob_query_timeout = 10000000000")) {
g_warning("Failed to increase ob_query_timeout: %s", mysql_error(conn));
} else {
if (is_ob) {
g_message("The connection set the ob_query_timeout to 10000000000");
}
}

// specific for ob to change ob_trx_timeout
if ((is_ob) &&
mysql_query(conn, "SET SESSION ob_trx_timeout = 10000000000")) {
g_warning("Failed to increase ob_trx_timeout: %s", mysql_error(conn));
} else {
if (is_ob) {
g_message("The connection set the ob_trx_timeout to 10000000000");
}
}

if ((detected_server == SERVER_TYPE_MYSQL) &&
mysql_query(conn, "SET SESSION net_write_timeout = 2147483")) {
g_warning("Failed to increase net_write_timeout: %s", mysql_error(conn));
Expand Down Expand Up @@ -1368,7 +1429,7 @@ void *binlog_thread(void *data) {
MYSQL *conn;
conn = mysql_init(NULL);
if (defaults_file != NULL) {
mysql_options(conn, MYSQL_READ_DEFAULT_FILE, defaults_file);
mysql_options(thrconn, MYSQL_READ_DEFAULT_FILE, defaults_file);
}
mysql_options(conn, MYSQL_READ_DEFAULT_GROUP, "mydumper");

Expand Down Expand Up @@ -1460,7 +1521,8 @@ void start_dump(MYSQL *conn) {
This avoids stalling whole server with flush */

if (!no_locks) {
if (mysql_query(conn, "SHOW PROCESSLIST")) {
if ((!is_ob && mysql_query(conn, "SHOW PROCESSLIST")) ||
(is_ob && mysql_query(conn, "SHOW FULL PROCESSLIST"))) {
g_warning("Could not check PROCESSLIST, no long query guard enabled: %s",
mysql_error(conn));
} else {
Expand Down Expand Up @@ -1812,6 +1874,7 @@ void start_dump(MYSQL *conn) {
while ((row = mysql_fetch_row(databases))) {
if (!strcasecmp(row[0], "information_schema") ||
!strcasecmp(row[0], "performance_schema") ||
!strcasecmp(row[0], "oceanbase") ||
(!strcasecmp(row[0], "data_dictionary")))
continue;
dump_database(row[0], &conf);
Expand Down Expand Up @@ -2112,9 +2175,7 @@ GString *get_insertable_fields(MYSQL *conn, char *database, char *table) {
g_string_append(field_list, ",");
}

gchar *tb = g_strdup_printf("`%s`", row[0]);
g_string_append(field_list, tb);
g_free(tb);
g_string_append(field_list, row[0]);
}
mysql_free_result(res);

Expand Down Expand Up @@ -2169,7 +2230,7 @@ GList *get_chunks_for_table(MYSQL *conn, char *database, char *table,
while ((row = mysql_fetch_row(indexes))) {
if (!strcmp(row[3], "1")) {
if (row[6])
cardinality = strtoul(row[6], NULL, 10);
cardinality = strtoll(row[6], NULL, 10);
if (cardinality > max_cardinality) {
field = row[4];
max_cardinality = cardinality;
Expand Down Expand Up @@ -2220,8 +2281,8 @@ GList *get_chunks_for_table(MYSQL *conn, char *database, char *table,
case MYSQL_TYPE_LONGLONG:
case MYSQL_TYPE_INT24:
/* static stepping */
nmin = strtoul(min, NULL, 10);
nmax = strtoul(max, NULL, 10);
nmin = strtoll(min, NULL, 10);
nmax = strtoll(max, NULL, 10);
estimated_step = (nmax - nmin) / estimated_chunks + 1;
cutoff = nmin;
while (cutoff <= nmax) {
Expand Down Expand Up @@ -2317,7 +2378,7 @@ guint64 estimate_count(MYSQL *conn, char *database, char *table, char *field,
row = mysql_fetch_row(result);

if (row && row[i])
count = strtoul(row[i], NULL, 10);
count = strtoll(row[i], NULL, 10);

if (result)
mysql_free_result(result);
Expand Down

0 comments on commit 2ce5d99

Please sign in to comment.