Investigating RemoteWorker retries in Slony-I

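The customer's problem was:

An error occurred when adding a new slave DB node to a running Slony-I environment.

The log showed the same error over and over, each time followed by a restart of the copy (the restarted portion of the log is omitted):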

CONFIG remoteWorkerThread_1: connected to provider DB
CONFIG remoteWorkerThread_1: prepare to copy table "tst"."a_tbl"
CONFIG remoteWorkerThread_1: prepare to copy table "tst"."b_tbl"
CONFIG remoteWorkerThread_1: prepare to copy table "tst"."c_tbl"
CONFIG remoteWorkerThread_1: all tables for set 1 found on subscriber
CONFIG remoteWorkerThread_1: copy sequence "tst"."a_no_seq"
CONFIG remoteWorkerThread_1: copy sequence "tst"."b_no_seq"
CONFIG remoteWorkerThread_1: copy sequence "tst"."c_no_seq"

CONFIG remoteWorkerThread_1: copy table "tst"."a_tbl"
CONFIG remoteWorkerThread_1: Begin COPY of table "tst"."a_tbl"
NOTICE:  truncate of "tst"."a_tbl" succeeded
CONFIG remoteWorkerThread_1: 33778 bytes copied for table "tst"."a_tbl"
CONFIG remoteWorkerThread_1: 27.97 seconds to copy table "tst"."a_tbl"

CONFIG remoteWorkerThread_1: copy table "tst"."b_tbl"
CONFIG remoteWorkerThread_1: Begin COPY of table "tst"."b_tbl"
ERROR  remoteWorkerThread_1: "select "_mycluster".copyFields(2);" 

WARN   remoteWorkerThread_1: data copy for set 1 failed 1 times - sleep 15 seconds
NOTICE:  Slony-I: Logswitch to sl_log_2 initiated
CONTEXT:  SQL statement "SELECT "_mycluster".logswitch_start()"

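After consulting references and talking with the customer, I found that the cause was their network environment: the existing nodes and the newly added node were on different network segments. They also used a network tool to monitor the network, and under certain conditions that tool would cut network connections.

This was exactly what caused the error. I then analyzed the code and found that the remote worker is very diligent: if the data copy fails, as it does on a communication error, it keeps retrying.

The retry is done inside remoteWorkerThread_main. In the ENABLE_SUBSCRIPTION branch, an inner while (true) loop calls copy_set() until it returns 0, logging a warning and sleeping after each failure.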

/* ----------                                                    
 * slon_remoteWorkerThread                                                    
 *                                                    
 * Listen for events on the local database connection. This means, events                                                    
 * generated by the local node only.                                                    
 * ----------                                                    
 */                                                    
void *                                                    
remoteWorkerThread_main(void *cdata)                                                    
{                                                    
    …                                                
    /*                                                
     * Work until shutdown or node destruction                                                
     */                                                
    while (true)                                                
      {                                                
        …                                            
        /*                                            
         * Event type specific processing                                            
         */                                            
        if (strcmp(event->ev_type, "SYNC") == 0)                                            
        {                                            
            …                                        
        }                                            
        else    /* not SYNC */                                        
        {                                            
            …                                        
            /*                                        
             * Simple configuration events. Call the corresponding runtime                                        
             * config function, add the query to call the configuration event                                        
             * specific stored procedure.                                        
             */                                        
            if (strcmp(event->ev_type, "STORE_NODE") == 0)                                        
            {                                        
                …                                    
            }                                        
            …                                        
            else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)                                        
            {                                        
                …                                    
                int         copy_set_retries = 0;                                    
                …                                    
                                                    
                                                    
                if (sub_receiver == rtcfg_nodeid &&                                    
                    event->ev_origin == node->no_id)                                
                {                                    
                    ScheduleStatus            sched_rc;                    
                    int            sleeptime = 15;                    
                    …                                
                    while (true)                                
                    {                                
                        …                            
                        /*                            
                         * If the copy succeeds, exit the loop and let the                            
                         * transaction commit.                            
                         */                            
                        if (copy_set(node, local_conn, sub_set, event) == 0)                            
                        {                            
                            …                        
                            copy_set_retries = 0;                        
                            break;                        
                        }                            
                        copy_set_retries++;                            
                                                    
                        /*                            
                         * Data copy for new enabled set has failed. Rollback                            
                         * the transaction, sleep and try again.                            
                         */                            
                        slon_log(SLON_WARN, "remoteWorkerThread_%d: "                            
                                 "data copy for set %d failed %d times - "                    
                                 "sleep %d seconds\n",
                                 node->no_id, sub_set, copy_set_retries,                    
                                 sleeptime);                    
                        …                            
                    }                                
                }                                    
                else                                    
                {                                    
                    …                                
                }                                    
                …                                    
            }                                        
            …                                        
            else                                        
            {                                        
                …                                    
            }                                        
                                                    
            /*                                        
             * All simple configuration events fall through here. Commit the                                        
             * transaction.                                        
             */                                        
            …                                        
        }                                            
        …                                            
    }                                                
    …                                                
}                                                    
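
The retry behaviour in the excerpt above can be reduced to a small standalone sketch. This is only an illustration, not Slony-I code: `attempt_fn`, `retry_until_success` and `flaky_copy` are hypothetical names, and `sleep()` stands in for the elided rollback-and-wait step. As in the excerpt, there is no upper bound on the number of retries.

```c
#include <assert.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical callback type: returns 0 on success and non-zero on
 * failure, mirroring how copy_set() is checked in the excerpt. */
typedef int (*attempt_fn)(void *ctx);

/*
 * Call attempt() until it succeeds, sleeping between failures.
 * Returns the number of failed attempts. There is deliberately no
 * retry limit, as in the loop shown in the excerpt.
 */
static int
retry_until_success(attempt_fn attempt, void *ctx, unsigned int sleeptime)
{
    int failures = 0;

    assert(attempt != NULL);
    while (attempt(ctx) != 0)
    {
        failures++;
        sleep(sleeptime);   /* the excerpt declares sleeptime = 15 */
    }
    return failures;
}

/* Illustrative stand-in for a flaky copy: fails while *remaining > 0. */
static int
flaky_copy(void *ctx)
{
    int *remaining = (int *) ctx;

    if (*remaining > 0)
    {
        (*remaining)--;
        return -1;
    }
    return 0;
}
```

Calling `retry_until_success(flaky_copy, &n, 15)` with `n` set to the number of failures to simulate returns once the stand-in copy finally succeeds. If the underlying fault never goes away, as with a network tool that keeps cutting the connection, a loop of this shape never exits, which matches the repeating log.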
                                                    
                                                    
/* ----------                                                    
 * copy_set                                                    
 * ----------                                                    
 */                                                    
static int                                                    
copy_set(SlonNode *node, SlonConn *local_conn, int set_id,                                                    
         SlonWorkMsg_event *event)                                            
{                                                    
    …                                                
    /*                                                
     * Connect to the provider DB                                                
     */                                                
    …                                                
    slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                                
             "connected to provider DB\n",
             node->no_id);                                        
    …                                                
    /*                                                
     * For each table in the set                                                
     */                                                
    for (tupno1 = 0; tupno1 < ntuples1; tupno1++)                                                
    {                                                
        char       *tab_fqname = PQgetvalue(res1, tupno1, 1);                                        
                                                    
        gettimeofday(&tv_start2, NULL);                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 "prepare to copy table %s\n",
                 node->no_id, tab_fqname);                                    
                                                    
        (void) slon_mkquery(&query3, "select * from %s limit 0;",                                            
                            tab_fqname);                        
        res2 = PQexec(loc_dbconn, dstring_data(&query3));                                            
        …                                            
    }                                                
    …                                                
    slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                                
             "all tables for set %d found on subscriber\n",
             node->no_id, set_id);                                        
    …                                                
    for (tupno1 = 0; tupno1 < ntuples1; tupno1++)                                                
    {                                                
        …                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 "copy sequence %s\n",
                 node->no_id, seq_fqname);                                    
        …                                            
    }                                                
    …                                                
                                                    
    /*                                                
     * For each table in the set                                                
     */                                                
    for (tupno1 = 0; tupno1 < ntuples1; tupno1++)                                                
    {                                                
        …                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 "copy table %s\n",
                 node->no_id, tab_fqname);                                    
        …                                            
        if (omit_copy) {                                            
            …                                        
        } else {                                            
            slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                        
                 "Begin COPY of table %s\n",
                 node->no_id, tab_fqname);                                    
                                                    
            (void) slon_mkquery(&query2, "select %s.copyFields(%d);",                                        
                            rtcfg_namespace, tab_id);                        
                                                    
            res3 = PQexec(pro_dbconn, dstring_data(&query2));                                        
                                                    
            if (PQresultStatus(res3) != PGRES_TUPLES_OK)                                        
            {                                        
                slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",
                         node->no_id, dstring_data(&query2),                            
                         PQresultErrorMessage(res3));                            
                …                                    
                return -1;                                    
            }                                        
                                                    
        …                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 INT64_FORMAT " bytes copied for table %s\n",
                 node->no_id, copysize, tab_fqname);                                    
        …                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 "%.3f seconds to copy table %s\n",
                 node->no_id,                                    
                 TIMEVAL_DIFF(&tv_start2, &tv_now), tab_fqname);                                    
    }                                                
    …                                                
    return 0;                                                
}                                                    
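
One consequence of the code above is worth spelling out: copy_set() returns -1 as soon as one table fails, and each call starts its table loop again at tupno1 = 0. A retry therefore appears to begin again from the first table, which fits the log restarting from the top. The sketch below illustrates only that shape. `CopyState`, `copy_set_sketch` and `NTABLES` are hypothetical names used for illustration, and no real database work is done.

```c
#include <assert.h>
#include <stddef.h>

#define NTABLES 3

/* Hypothetical bookkeeping for the sketch; not part of Slony-I. */
typedef struct
{
    int copies[NTABLES];   /* how many times each table was copied */
    int fail_at;           /* index of the table whose copy fails, or -1 */
} CopyState;

/*
 * Copy the tables in order and return -1 at the first failure, the way
 * copy_set() returns -1 when the copyFields() query fails. No progress
 * marker is kept, so the next call starts again from table 0.
 */
static int
copy_set_sketch(CopyState *st)
{
    int i;

    assert(st != NULL);
    for (i = 0; i < NTABLES; i++)
    {
        if (i == st->fail_at)
            return -1;      /* fail fast: remaining tables are skipped */
        st->copies[i]++;
    }
    return 0;
}
```

With `fail_at` set to 1, the first call copies only table 0 and returns -1. Once `fail_at` is cleared to -1, the second call copies all three tables, so table 0 ends up copied twice. In the customer's log this corresponds to "tst"."a_tbl" being copied successfully before the failure on "tst"."b_tbl".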
Original article: https://www.cnblogs.com/gaojian/p/3227224.html