技術向上

プログラミングの学び、気になるテクノロジーやビジネストレンドを発信

Cloud Pub SubをトリガーにしたCloud Functionsをデプロイする【Go】【GCP】

Cloud Pub/Sub

Cloud Pub/Subは、GCPの環境を使って、手軽にPublisherとSubscriberを構成できる、Googleによるクラウドツールです。

概要については、こちらに記載していますのでご覧ください。

Cloud Functions

Cloud Functionsは、GCPの環境を使って、手軽に関数単位でデプロイできる、Googleによるクラウドツールです。

概要については、こちらに記載していますのでご覧ください。


構成

あるTokenにPublishされたものをSubscribeすることをトリガーに、 Cloud Functionsとしてデプロイした関数を起動することができます。

Pubilsh
↓
あるTopic
↓ Subscribe
Cloud Functions上にデプロイした関数の発動


必要な設定は、Cloud Functionsに関数をデプロイするときに、トリガー(何をきっかけに関数を発動するか)にPub/SubのPublishイベントを指定し、あらかじめ設定しておいたPub/SubのTopicを指定することです。

ソースコードの記述

今回は関数をGolangで記述する例を紹介します。

// グローバルに定義した型や定数、変数は、関数がデプロイされている状態であれば、確保され続ける

type pubSubMessage struct { // この中から必要なものだけ指定すればOK
    Attrs       map[string]string `json:"attributes"`
    Data        []byte            `json:"data"` // base64-encoded data  送られるメッセージ
    ID          string            `json:"messageId"`
    PublishTime string            `json:"publishTime"`
}

type publishedData struct {
    Orders     []*orderData `json:"orders"`
}

type orderData struct {
    OrderID                 string `json:"order_id"`
    OrderStatus             string `json:"order_status"`
}

type got struct {
    Orders     []*gotOrder `json:"orders"`
}


type gotOrder struct {
    ID int64 `json:"id"`
}


var ordersURL *url.URL

func init() {  // 関数内で繰り返し使用する環境変数をinitで最初に読み込む。関数デプロイ時に実行される
    var err error
    ordersURL, err = url.Parse(os.Getenv("INTEGRATION_URL"))
    if err != nil {
        panic("error occurred parsing INTEGRATION_URL")  もし、指定した環境変数が存在しなければ、デプロイを失敗させる
    }
    ordersURL.Path = path.Join(ordersURL.Path, "orders")
}

func UpdateOrder(ctx context.Context, m pubSubMessage) error { // context.ContextとPub/Subメッセージを引数に取るのが決まった形式。 errorを返すことで、コンソール画面で一覧できる。
    var pData publishedData

    if err := json.Unmarshal(m.Data, &pData); err != nil {
        return fmt.Errorf("json unmarshal error, from m.Data to publishedData: %v", err)
    }

    orders := pData.Orders
    client := http.Client{Timeout: 15 * time.Second}

    for _, order := range orders {
        if err := func() error { // for文の中で、defer でBodyをCloseする際、オーバーヘッドを減らすために即時実行関数を使用
            req, err := createRequest(ctx, http.MethodGet,
                ordersURL.String(), nil)
            if err != nil {
                return err
            }

            q := req.URL.Query()
            q.Set("ec_order_id", order.OrderID)
            req.URL.RawQuery = q.Encode()

            resp, err := client.Do(req)
            if err != nil {
                return fmt.Errorf("http GET request error: %v", err)
            }
            defer resp.Body.Close()

            isSuccess := http.StatusOK <= resp.StatusCode && resp.StatusCode <= http.StatusPartialContent  // 206までなら成功と扱う
            if !isSuccess {
                return fmt.Errorf("failed to request GET: status code = %d", resp.StatusCode)
            }

            var got gotModel
            err = json.NewDecoder(resp.Body).Decode(&got)  / Streamのまま扱うことで、メモリ効率を下げない(↔︎ioutil.ReadAllからのjson.Unmarshal)
            if err != nil {
                return fmt.Errorf("json decode, from response body to got:  %w", err)
            }

            reqData, err := json.Marshal(order)
            if err != nil {
                return fmt.Errorf("json marshal error, order: %v", err)
            }
            for _, gotData := range got.Orders {
                ordersURL.Path = path.Join(ordersURL.Path, strconv.FormatInt(gotData.ID, 10))
                req, err = createRequest(ctx, http.MethodPut,
                    ordersURL.String(),
                    bytes.NewReader(reqData))
                if err != nil {
                    return err
                }

                resp, err = client.Do(req)
                if err != nil {
                    return fmt.Errorf("http PUT request error: %v", err)
                }
                defer resp.Body.Close()

                isSuccess = http.StatusOK <= resp.StatusCode && resp.StatusCode <= http.StatusPartialContent
                if isSuccess {
                    if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {  // bodyの結果を利用しないので、切り捨てる
                        log.Printf("[WARN] discard response body error: %v", err)
                    }
                } else {
                    b, err := ioutil.ReadAll(resp.Body)  // なぜエラーになったのかを知りたいため、bodyを読む
                    if err != nil {
                        log.Printf("failed to read response body, PUT: %v", err)
                    }
                    return fmt.Errorf("failed to request PUT: %s", string(b))
                }
                ordersURL.Path = path.Dir(ordersURL.Path)  // 最後の"/"の後に指定されたパスを切り捨てる、pathのメソッド(ファイルに対するディレクトリの意味)。
            }
            return nil
        }(); err != nil {  // for文の中のdefer resp.Body.Close()のためにifで囲ったので、ここでエラーをキャッチする必要がある
            return err
        }
    }
    return nil
}


func createRequest(
    ctx context.Context,
    method, uri,
    body io.Reader,
) (*http.Request, error) {
    req, err := http.NewRequest(method, uri, body)
    if err != nil {
        return nil, fmt.Errorf("create request error: %v", err)
    }
    req = req.WithContext(ctx)  // forのなかで、reqは繰り返し生成されるので、header、contextを都度設定
    req.Header.Add("user-agent", "UpdateOrder/1.0")
    req.Header.Add("content-type", "application/json; charset=utf-8")
    return req, nil
}


デプロイ

下記のようにコマンドをターミナルで実行し、デプロイします。

gcloud functions deploy <関数名> --runtime go111  --trigger-resource <topicID project/~などは不要> --trigger-event google.pubsub.topic.publish --region <リージョン。例:asia-northeast1> --set-env-vars <好きな変数名>=<好きな値> --timeout <任意。タイムアウトに指定する秒数。Maxは540>


このコマンドは、デプロイする関数が書かれているファイルと同じディレクトリで実行します。
関数名は先ほどのソースコードで言えば、UpdateOrderとなります。GCPのプロジェクト上で一意である必要がありますが、--entry-pointオプションを使って、ソースコード上の同じ関数を別名でデプロイすることも可能です。

タイムアウトはデフォルトで1分です。処理時間がかかりそうな関数の場合は、--timeoutオプションを指定して、伸ばしましょう。

--set-env-varsオプションを使えば、ソースコード上で使える環境変数を設定できます。


Pub/Sub、Cloud Functions共に、そこまで使用しなければ、無料で使えます。
Webサービスのアカウント登録後のメール送信など、ある程度時間のかかる処理をPublish Subscriber構成にしてUXを毀損させないようにするなど、手軽に試してみるといいと思います。


Cloud Pub/Sub概要【GCP】 - 技術向上

Cloud Functions概要【GCP】 - 技術向上