Category Archives: Python

Leverage Google Cloud Logging + Monitoring for Custom Cloud SQL for Postgres or AlloyDB Alerts

As migrations to Cloud SQL and AlloyDB pick up speed, you will inevitably run into conditions where the cloud tooling has not quite caught up with the custom alerts and incidents you may be raising on-premises with tools such as Nagios or Oracle Enterprise Manager. One example is monitoring of replication tools such as the GoldenGate heartbeat table. While there are many ways to implement this, I want to demonstrate one that leverages Google Cloud Logging + Google Cloud Monitoring. This method also lets us keep a long-term log of values such as lag, or anything else you have built into the heartbeat mechanism. To demonstrate, let's use Python to query the database and create a Cloud Logging entry:

import argparse
import os
from datetime import datetime
from sqlalchemy import create_engine, text
from google.cloud import logging


def retrievePgAlert(
    username: str,
    password: str,
    hostname: str,
    portNumber: str,
    databaseName: str,
    alertType: str,
) -> None:

    alertList: list = []

    conn_string = f"postgresql+psycopg2://{username}:{password}@{hostname}:{portNumber}/{databaseName}?client_encoding=utf8"
    engine = create_engine(conn_string)
    with engine.connect() as con:

        if alertType == "ogg-lag":
            # Heartbeat rows whose lag exceeds the threshold within the last
            # checkIntervalMinutes minutes.
            sqlQuery = text(
                "select replicat, effective_date, lag from ogg.heartbeat "
                "where lag >= :lagAmt "
                "and effective_date >= now() - make_interval(mins => :intervalAmt)"
            )

        # oggLagAmt and checkIntervalMinutes are set at module level in the
        # __main__ block below.
        result = con.execute(
            sqlQuery, {"lagAmt": oggLagAmt, "intervalAmt": checkIntervalMinutes}
        ).fetchall()
        for row in result:
            alertList.append(row)

        if not alertList:
            print(f"No alerts as of {datetime.now().strftime('%m/%d/%Y %H:%M:%S')}")
        else:
            for alertText in alertList:
                print(
                    f"Replicat: {alertText[0]} at date {alertText[1]} has a total lag of: {alertText[2]} seconds"
                )

            # "args" is parsed in the __main__ block below.
            writeGcpCloudLoggingAlert(
                logger_alert_type=alertType,
                loggerName=args.loggerName,
                logger_message=alertList,
            )

    # The "with" block closes the connection; dispose of the engine's pool.
    engine.dispose()


def writeGcpCloudLoggingAlert(
    logger_alert_type: str,
    loggerName: str,
    logger_message: list,
) -> None:

    # Writes log entries to the given logger.
    logging_client = logging.Client()

    # This log can be found in the Cloud Logging console under 'Custom Logs'.
    logger = logging_client.logger(loggerName)

    # Struct log. The struct can be any JSON-serializable dictionary.
    if logger_alert_type == "ogg-lag":
        for alertFields in logger_message:
            replicatName = str(alertFields[0])
            effectiveDate = alertFields[1]
            lagAmount = int(alertFields[2])

            logger.log_struct(
                {
                    "alertType": logger_alert_type,
                    "replicat": replicatName,
                    "alertDate": effectiveDate.strftime("%m/%d/%Y, %H:%M:%S"),
                    "alertRetrievalDate": datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
                    "lagInSeconds": lagAmount,
                },
                severity="ERROR",
            )

    print("Wrote logs to {}.".format(logger.name))


def delete_logger(loggerName):
    """Deletes a logger and all its entries.

    Note that a deletion can take several minutes to take effect.
    """
    logging_client = logging.Client()
    logger = logging_client.logger(loggerName)

    logger.delete()

    print("Deleted all logging entries for {}".format(logger.name))


if __name__ == "__main__":

    cloudSQLHost: str = "127.0.0.1"
    hostname: str
    portNumber: str
    database: str
    username: str
    password: str
    oggLagAmt: int = 15
    checkIntervalMinutes: int = 20

    # Read connection details for the target host from ~/.pgpass
    # (format: hostname:port:database:username:password).
    with open(os.path.expanduser("~/.pgpass"), "r") as pgpassfile:
        for line in pgpassfile:
            if line.strip().split(":")[0] == cloudSQLHost:
                hostname, portNumber, database, username, password = line.strip().split(":")

    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "-loggerName",
        "--loggerName",
        type=str,
        help="GCP Cloud Log Namespace",
        default="postgres-alert",
    )
    parser.add_argument(
        "-alertType",
        "--alertType",
        type=str,
        help="Type of alert to log",
        default="ogg-lag",
    )
    args = parser.parse_args()

    if args.alertType == "ogg-lag":
        retrievePgAlert(
            hostname=hostname,
            username=username,
            password=password,
            portNumber=portNumber,
            databaseName=database,
            alertType=args.alertType,
        )

In this script we use the Google Cloud Logging API, SQLAlchemy, and a few other basic Python imports to query the heartbeat table for rows that exceed the lag threshold we are looking for.
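
Assuming the script is saved as pg_alert.py (the file name is arbitrary), it can be run as:

python3 pg_alert.py --loggerName=postgres-alert --alertType=ogg-lag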

Note: The check is not limited to this query; the Python code could test for any condition by changing the SQL, or even gather data by leveraging “gcloud” commands or REST API calls.
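
For instance, a hypothetical variant of the check (pg_stat_activity is a standard PostgreSQL view; the ten-minute threshold is purely illustrative) could flag sessions stuck idle in a transaction instead of heartbeat lag:

idleQuery = text(
    "select pid, usename, now() - state_change as idle_time "
    "from pg_stat_activity "
    "where state = 'idle in transaction' "
    "and state_change < now() - make_interval(mins => :idleMinutes)"
)
idleSessions = con.execute(idleQuery, {"idleMinutes": 10}).fetchall()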

If the condition is met, the script creates a JSON message, which is then written to the appropriate Google Cloud Logging namespace. An example of the JSON message is shown below (sensitive information such as the project id and instance id has been redacted):

{
  "insertId": "1b6fb35g18b606n",
  "jsonPayload": {
    "alertRetrievalDate": "01/20/2023, 18:47:20",
    "lagInSeconds": 15,
    "alertType": "ogg-lag",
    "alertDate": "01/20/2023, 18:34:55",
    "replicat": "r_hr"
  },
  "resource": {
    "type": "gce_instance",
    "labels": {
      "project_id": "[project id]",
      "instance_id": "****************",
      "zone": "projects/[project id]/zones/us-central1-c"
    }
  },
  "timestamp": "2023-01-20T18:47:20.103058301Z",
  "severity": "ERROR",
  "logName": "projects/[project id]/logs/postgres-alert",
  "receiveTimestamp": "2023-01-20T18:47:20.103058301Z"
}

Create a Cloud Logging Alert

Now that we have published a message to Cloud Logging, what can we do with it? Generally there are two paths: a log-based metric or a log-based alert. For this demonstration we will use the alert. To start the setup, navigate to the console page "Operations" -> "Logging" -> "Logs Explorer" and click "Create alert". In the dialog that follows, double-check in step 2 that the query retrieves the appropriate logs. In step 3, choose the time between notifications (this mutes duplicate alerts that occur within the interval) and how long past the last alert an incident stays open. In this case we will mute duplicate alerts for 5 minutes after the first alert (if another alert occurs at 6 minutes, a new notification fires), and incidents will remain open for 30 minutes past the last alert (no new incident is opened unless an alert occurs after that window). The query to be used within the alert is as follows:


logName="projects/[project id]/logs/postgres-alert"
AND severity="ERROR"
AND (jsonPayload.alertType = "ogg-lag")
AND (jsonPayload.lagInSeconds >= 15)
AND resource.labels.instance_id = [instance id]

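Before creating the alert, you can sanity-check that this filter matches the entries written by the script, for example with "gcloud logging read" (substitute your own project id and instance id):

gcloud logging read \
  'logName="projects/[project id]/logs/postgres-alert" AND severity="ERROR" AND jsonPayload.alertType="ogg-lag" AND jsonPayload.lagInSeconds>=15' \
  --limit=5 --format=json
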
The following dialogs outline the screens used to set up the alert.

The last step is to choose your notification method, which is managed through notification channels. The available channel types include (a CLI example follows the list):

  • Mobile Devices
  • PagerDuty Services
  • PagerDuty Sync
  • Slack
  • Webhooks
  • E-Mail
  • SMS
  • Pub/Sub
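
Channels are typically created in the console, but as a rough sketch, an e-mail channel can also be created from the command line (the display name and address below are placeholders):

gcloud beta monitoring channels create \
  --display-name="Postgres DBA Alerts" \
  --type=email \
  --channel-labels=email_address=dba-team@example.com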

Once all of this is defined, your alert is ready to fire as soon as you place the Python script on an appropriate schedule, such as Linux cron or Google Cloud Scheduler (a sample crontab entry is shown below). From there, we simply wait for an issue that matches the alert condition; when one occurs, a notification such as an e-mail is delivered to the configured notification channel.
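
For example, a crontab entry along these lines (the script path is hypothetical) would run the check every five minutes:

*/5 * * * * /usr/bin/python3 /opt/scripts/pg_alert.py --loggerName=postgres-alert --alertType=ogg-lag >> /var/log/pg_alert.log 2>&1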

As your migration to the cloud continues, keep an open mind and look for alternative ways to handle all of the operational “things” you are accustomed to in your on-premises environment. Most of the time there is a way to handle them in the cloud!