Lately we had problems with long running processes blocking other processes. So I created following golang program which is able to check processes based on specified config file.

package main

import (
    "database/sql"
    "flag"
    "fmt"
    "net/http"
    //"github.com/araddon/dateparse"
    "io/ioutil"
    "log"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "time"

    yaml "gopkg.in/yaml.v2"

    _ "github.com/lib/pq"
)

const (
    version          = `2019-09-19`
    placeholdersHelp = `check.action values:
check = only report cases - intended for analysis 
cancel = cancel query but preserve session
terminate = terminate whole session`
)

type data struct {
    pgURI        string
    pgDB         *sql.DB
    skipErrors   bool
    slackChannel string
    slackUser    string
    slackURL     string
}

type queryfile struct {
    PostgreSQLURI string `yaml:"uri"` // PostgreSQL URI
    Slack         struct {
        URL     string `yaml:"url,omitempty"`
        User    string `yaml:"user,omitempty"`
        Channel string `yaml:"channel,omitempty"`
    } `yaml:"slack,omitempty"`
    Check []struct {
        Name          string `yaml:"name"`
        RunTimeLimit  string `yaml:"runtime,omitempty"`         // interval in postgresql format like '1hour', '3minutes' etc.
        WaitEvent     string `yaml:"wait_event,omitempty"`      // as in pg_stat_activity - relation
        WaitEventType string `yaml:"wait_event_type,omitempty"` // as in pg_stat_activity - Lock
        State         string `yaml:"state"`                     // as in pg_stat_activity
        Query         string `yaml:"query,omitempty"`           // mask for query check
        Action        string `yaml:"action"`                    // what to do with query - check, cancel, terminate - default is terminate
    } `yaml:"check"`
}

type queryResult struct {
    processID            string
    processState         string
    processRunTime       string
    processWaitEventType string
    processWaitEvent     string
    processUseName       string
    processClientAddr    string
    processQuery         string
}

var (
    printHelp        = false
    showProgress     = false
    maximumRuns      = 5
    stepForMsg       = 10000
    runID            = ""
    processedLimit   = 50000
    printDebugMsg    = false
    debugLevel       = 0
    skipSlackMessage = false

    cancelProcessQueryMask    = `select pg_cancel_backend(${PID})`
    terminateProcessQueryMask = `select pg_terminate_backend(${PID})`
    queryRunTimeQueryMask     = ` and query_runtime > interval'${RUNTIMELIMIT}' `
    waitEventQueryMask        = ` and wait_event = '${WAITEVENT}' `
    waitEventTypeQueryMask    = ` and wait_event_type = '${WAITEVENTTYPE}' `
    queryStateQueryMask       = ` and state = '${QUERYSTATE}' `
    checkProcessesQueryMask   = `SELECT pid::text "processid", state, query_runtime::text as query_runtime, wait_event_type, wait_event, usename, client_addr, query
from public.view_pg_stat_activity where query not ilike '%vacuum%' and pid <> pg_backend_pid() ${QUERYRUNTIME} ${WAITEVENT} ${WAIREVENTTYPE} ${QUERYSTATE} order by query_runtime`
)

func printProgramVersion() {
    printMsg(os.Args[0], "version: ", version)
    return
}

func main() {
    runID = "[" + strconv.Itoa(rand.Intn(1000000000)) + "] "
    var d data

    var configFile string

    var err error
    var qf queryfile

    paramConfigfile := flag.String("configfile", "", "config file for the task")
    paramCheckOnly := flag.Bool("check_only", false, "only do check, do not terminate any query")

    paramHelp := flag.Bool("help", false, "print help")
    paramDebug := flag.Bool("debug", false, "print debug messages")

    paramProgress := flag.Bool("progress", false, "show progress during inserts into PostgreSQL")
    paramPrintVersion := flag.Bool("version", false, "print version of the code")

    paramDebugLevel := flag.Int("debug_level", debugLevel, "level of debug messages: 0 = default, common messages from run / 1 = show variables, queries etc. / 2 = deep debug, shows inserted data etc.")
    flag.Parse()

    configFile = *paramConfigfile
    printVersion := *paramPrintVersion
    doCheckOnly := *paramCheckOnly

    if printVersion == true {
        printProgramVersion()
        log.Fatalln()
    }

    printHelp = *paramHelp
    printDebugMsg = *paramDebug
    debugLevel = *paramDebugLevel
    showProgress = *paramProgress

    printProgramVersion()
    debugMsg(0, "config file: ", configFile)
    debugMsg(0, "printHelp: ", printHelp)
    debugMsg(0, "printDebugMsg: ", printDebugMsg)
    debugMsg(0, "debugLevel: ", debugLevel)
    debugMsg(0, "doCheckOnly: ", doCheckOnly)

    if printHelp == true || configFile == "" {
        printProgramVersion()
        if configFile == "" {
            printMsg("you have to specify config file")
        }
        flag.PrintDefaults()
        printMsg(placeholdersHelp)
        log.Fatalln()
    }

    if _, err := os.Stat(configFile); os.IsNotExist(err) {
        log.Fatalln("ERROR: Cannot find config file", configFile, "message:", err)
    }

    yamlFile, err := ioutil.ReadFile(configFile)
    if err != nil {
        log.Fatalln(runID, "ERROR: cannot read config file: ", err)
    }

    err = yaml.Unmarshal(yamlFile, &qf)
    if err != nil {
        log.Fatalln(runID, "ERROR: cannot unmarshal config file: ", err)
    }

    if qf.PostgreSQLURI == "" {
        log.Fatalln(runID, "ERROR: config file must contain PostgreSQL URI")
    }

    d.pgURI = qf.PostgreSQLURI
    checkValue("source postgresql URI", d.pgURI, true, printDebugMsg)

    d.slackURL = qf.Slack.URL
    d.slackUser = qf.Slack.User
    d.slackChannel = qf.Slack.Channel
    debugMsg(0, "d.slackURL: ", d.slackURL)
    debugMsg(0, "d.slackUser: ", d.slackUser)
    debugMsg(0, "d.slackChannel: ", d.slackChannel)

    debugMsg(0, "opening PG source connection...")
    d.pgDB, err = sql.Open("postgres", d.pgURI)
    if err != nil {
        log.Fatalln("ERROR: Cannot connect into postgresql db: ", err)
    }
    defer func() {
        if errClose := d.pgDB.Close(); err != nil {
            log.Println("closing source database:", errClose.Error())
        }
    }()
    if err = d.pgDB.Ping(); err != nil {
        log.Fatalln("ERROR: Cannot ping postgresql db: ", err)
    }

    osHostname, err := os.Hostname()
    if err != nil {
        log.Fatalln("ERROR: cannot read hostname: ", err)
    }

    for index, checkPart := range qf.Check {
        debugMsg(1, "check index: ", index)
        debugMsg(1, "checkPart.Name: ", checkPart.Name)
        debugMsg(1, "checkPart.RunTimeLimit: ", checkPart.RunTimeLimit)
        debugMsg(1, "checkPart.WaitEvent: ", checkPart.WaitEvent)
        debugMsg(1, "checkPart.WaitEventType: ", checkPart.WaitEventType)
        debugMsg(1, "checkPart.State: ", checkPart.State)
        debugMsg(1, "checkPart.Query: ", checkPart.Query)
        debugMsg(1, "checkPart.Action: ", checkPart.Action)

        if checkPart.State == "" {
            log.Fatalln("ERROR: check ", checkPart.Name, " does not contain state specification")
        }
        checkProcessesQuery := checkProcessesQueryMask
        queryRunTimeQuery := ""
        if checkPart.RunTimeLimit != "" {
            queryRunTimeQuery = strings.Replace(queryRunTimeQueryMask, "${RUNTIMELIMIT}", checkPart.RunTimeLimit, -1)
        }
        checkProcessesQuery = strings.Replace(checkProcessesQuery, "${QUERYRUNTIME}", queryRunTimeQuery, -1)
        waitEventQuery := ""
        if checkPart.WaitEvent != "" {
            waitEventQuery = strings.Replace(waitEventQueryMask, "${WAITEVENT}", checkPart.WaitEvent, -1)
        }
        checkProcessesQuery = strings.Replace(checkProcessesQuery, "${WAITEVENT}", waitEventQuery, -1)
        waitEventTypeQuery := ""
        if checkPart.WaitEventType != "" {
            waitEventTypeQuery = strings.Replace(waitEventTypeQueryMask, "${WAITEVENTTYPE}", checkPart.WaitEventType, -1)
        }
        checkProcessesQuery = strings.Replace(checkProcessesQuery, "${WAIREVENTTYPE}", waitEventTypeQuery, -1)
        queryStateQuery := ""
        if checkPart.State != "" {
            queryStateQuery = strings.Replace(queryStateQueryMask, "${QUERYSTATE}", checkPart.State, -1)
        }
        checkProcessesQuery = strings.Replace(checkProcessesQuery, "${QUERYSTATE}", queryStateQuery, -1)
        debugMsg(1, "checkProcessesQuery: ", checkProcessesQuery)

        rows, err := d.pgDB.Query(checkProcessesQuery)
        if err != nil {
            log.Fatalln("ERROR: could not run query for partitioning values:", err)
        }

        taskMsg := fmt.Sprint("[", osHostname, "] task: ", index, ": ", checkPart.Name, " (", checkPart.Action, ")")
        printMsg(taskMsg)
        var qr queryResult
        rowsFound := 0
        if rows != nil {
            for rows.Next() {
                if err = rows.Scan(&qr.processID, &qr.processState, &qr.processRunTime, &qr.processWaitEventType,
                    &qr.processWaitEvent, &qr.processUseName, &qr.processClientAddr, &qr.processQuery); err != nil {
                    log.Fatalln("ERROR: cannot query processes: ", err)
                }
                debugMsg(2, "query result: ", qr)
                processMsg := fmt.Sprint("pid:", qr.processID, ", query: ", qr.processQuery, ", runtime: ", qr.processRunTime)
                if doCheckOnly == false {
                    if checkPart.Action == "terminate" {
                        terminateProcessQuery := strings.Replace(terminateProcessQueryMask, "${PID}", qr.processID, -1)
                        outputMsg := taskMsg + " - terminating session: " + processMsg
                        debugMsg(0, "terminateProcessQuery: ", terminateProcessQuery)
                        printMsg(outputMsg)
                        runPgQuery(d.pgDB, terminateProcessQuery, "cancel query", true)
                        sendSlackMessage(outputMsg, d)
                    } else if checkPart.Action == "cancel" {
                        cancelProcessQuery := strings.Replace(cancelProcessQueryMask, "${PID}", qr.processID, -1)
                        outputMsg := taskMsg + " - canceling query: " + processMsg
                        debugMsg(0, "cancelProcessQuery: ", cancelProcessQuery)
                        printMsg(outputMsg)
                        runPgQuery(d.pgDB, cancelProcessQuery, "cancel query", true)
                        sendSlackMessage(outputMsg, d)
                    } else if checkPart.Action == "check" {
                        printMsg("Checking process: ", processMsg)
                    }
                } else {
                    printMsg("required check only - found process: ", processMsg)
                }
                rowsFound++
            }
        }
        printMsg("Rows processed: ", rowsFound)
    }

    printMsg("ALL DONE")

}

func runPgQuery(pgDB *sql.DB, query string, description string, exitOnError bool) {
    debugMsg(0, description, ": ", query)
    _, err := pgDB.Exec(query)
    if err != nil {
        if exitOnError == true {
            log.Fatalln("ERROR: Cannot run ", description, ": ", err, " Query: ", query)
        } else {
            printMsg("WARNING: Cannot run ", description, ": ", err, " Query: ", query)
        }
    }
}

func runPgQueryCommit(pgDB *sql.DB, query string, description string, exitOnError bool) {
    debugMsg(0, "runPgQueryCommit: ", description, ": ", query)
    pgTrans, err := pgDB.Begin() // start transaction[3] pgquerycommit
    if err != nil {
        log.Fatal("runPgQueryCommit - ERROR: cannot open transaction on select db:", err)
    }
    stmt, err := pgTrans.Prepare(query)
    if err != nil {
        log.Fatalln("runPgQueryCommit - ERROR: cannot prepare statement (", query, "): ", err)
    }
    result, err := stmt.Exec()
    if err != nil {
        if exitOnError == true {
            log.Fatalln("ERROR: Cannot run ", description, ": ", err, " Query: ", query)
        } else {
            printMsg("WARNING: Cannot run ", description, ": ", err, " Query: ", query)
        }
    }
    debugMsg(1, "runPgQueryCommit: ", description, " output: ", result)
    err = pgTrans.Commit() // end transaction[3] pgquerycommit
    if err != nil {
        log.Fatalln("ERROR: cannot commit pg transaction:", err)
    }
}

func curTime() string {
    return time.Now().UTC().Format(time.RFC3339) + ":"
}

func debugMsg(level int, t ...interface{}) {
    if printDebugMsg == true && level <= debugLevel {
        printMsg(t...)
    }
}

func printMsg(t ...interface{}) {
    fmt.Println(curTime(), fmt.Sprint(t...))
}

func checkValue(name string, value string, required bool, print bool) {
    if value == "" && required == true {
        log.Fatalln("ERROR: variable ", name, " cannot be empty!")
    }
    if print == true {
        debugMsg(0, name, ": ", value)
    }
}

func fullTableName(schema string, table string) (fullname string) {
    fullname = `"` + schema + `"."` + table + `"`
    return
}

func sendSlackMessage(msg string, d data) {
    slackMessage := strings.NewReader(fmt.Sprintf("{\"channel\": \"%s\", \"username\": \"%s\", \"text\": \"%s\" }", d.slackChannel, d.slackUser, msg))
    debugMsg(0, "sendSlackMessage: d.slackChannel - ", d.slackChannel)
    debugMsg(0, "sendSlackMessage: d.slackUser - ", d.slackUser)
    debugMsg(0, "sendSlackMessage: d.slackURL - ", d.slackURL)
    debugMsg(0, "sendSlackMessage: slackMessage - ", slackMessage)
    if d.slackURL != "" {
        if skipSlackMessage == false {
            req, err := http.NewRequest("POST", d.slackURL, slackMessage)
            if err != nil {
                printMsg("ERROR: Failed to create a request for slack message:", err, " message: ", msg)
                return
            }
            req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                printMsg("ERROR: Failed to send slack message:", err, " message: ", msg)
                return
            }
            defer resp.Body.Close()
            printMsg("Slack message sent: ", slackMessage, " - response: ", resp)
        } else {
            printMsg("skip Slack mode: ", slackMessage)
        }
    } else {
        printMsg("Slack URL is not defined: ", slackMessage)
    }
}

How config file looks like (yaml format):

check:

  • name: query active for more then 1 hour
    runtime: “1 hour”
    state: active
    action: check
  • name: query active for more then 6 hours
    runtime: “12 hours”
    state: active
    action: terminate
  • name: query waiting for lock on relation more then 10 minutes
    runtime: “10 minutes”
    state: active
    wait_event: relation
    wait_event_type: Lock
    action: terminate
  • name: idle in transaction for more then 1 hour
    runtime: “1 hour”
    state: “idle in transaction”
    action: check
  • name: idle in transaction for more then 6 hour
    runtime: “6 hours”
    state: “idle in transaction”
    action: terminate