5.10.2. 컨테이너 소스
컨테이너 소스는 이벤트를 생성하고 이벤트를 싱크로 보내는 컨테이너 이미지를 생성합니다. 이미지 URI를 사용하는 ContainerSource
오브젝트와 컨테이너 이미지를 생성하여 사용자 지정 이벤트 소스를 생성할 수 있습니다.
5.10.2.1. 컨테이너 이미지 생성을 위한 지침
컨테이너 소스 컨트롤러에서 두 개의 환경 변수를 삽입합니다. K_SI 및
K_CE_OVERRIDES
. 이러한 변수는 sink
및 ceOverrides
사양에서 확인됩니다. 이벤트는 K_SINK
환경 변수에 지정된 싱크 URI로 전송됩니다. 해당 메시지는 CloudEvent
HTTP 형식을 사용하여 POST
로 보내야 합니다.
컨테이너 이미지의 예
다음은 하트비트 컨테이너 이미지의 예입니다.
package main import ( "context" "encoding/json" "flag" "fmt" "log" "os" "strconv" "time" duckv1 "knative.dev/pkg/apis/duck/v1" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/kelseyhightower/envconfig" ) type Heartbeat struct { Sequence int `json:"id"` Label string `json:"label"` } var ( eventSource string eventType string sink string label string periodStr string ) func init() { flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)") flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)") flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") flag.StringVar(&label, "label", "", "a special label") flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") } type envConfig struct { // Sink URL where to send heartbeat cloud events Sink string `envconfig:"K_SINK"` // CEOverrides are the CloudEvents overrides to be applied to the outbound event. CEOverrides string `envconfig:"K_CE_OVERRIDES"` // Name of this pod. Name string `envconfig:"POD_NAME" required:"true"` // Namespace this pod exists in. Namespace string `envconfig:"POD_NAMESPACE" required:"true"` // Whether to run continuously or exit. OneShot bool `envconfig:"ONE_SHOT" default:"false"` } func main() { flag.Parse() var env envConfig if err := envconfig.Process("", &env); err != nil { log.Printf("[ERROR] Failed to process env var: %s", err) os.Exit(1) } if env.Sink != "" { sink = env.Sink } var ceOverrides *duckv1.CloudEventOverrides if len(env.CEOverrides) > 0 { overrides := duckv1.CloudEventOverrides{} err := json.Unmarshal([]byte(env.CEOverrides), &overrides) if err != nil { log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err) os.Exit(1) } ceOverrides = &overrides } p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink)) if err != nil { log.Fatalf("failed to create http protocol: %s", err.Error()) } c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow()) if err != nil { log.Fatalf("failed to create client: %s", err.Error()) } var period time.Duration if p, err := strconv.Atoi(periodStr); err != nil { period = time.Duration(5) * time.Second } else { period = time.Duration(p) * time.Second } if eventSource == "" { eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name) log.Printf("Heartbeats Source: %s", eventSource) } if len(label) > 0 && label[0] == '"' { label, _ = strconv.Unquote(label) } hb := &Heartbeat{ Sequence: 0, Label: label, } ticker := time.NewTicker(period) for { hb.Sequence++ event := cloudevents.NewEvent("1.0") event.SetType(eventType) event.SetSource(eventSource) event.SetExtension("the", 42) event.SetExtension("heart", "yes") event.SetExtension("beats", true) if ceOverrides != nil && ceOverrides.Extensions != nil { for n, v := range ceOverrides.Extensions { event.SetExtension(n, v) } } if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil { log.Printf("failed to set cloudevents data: %s", err.Error()) } log.Printf("sending cloudevent to %s", sink) if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) { log.Printf("failed to send cloudevent: %v", res) } if env.OneShot { return } // Wait for next tick <-ticker.C } }
다음은 이전 하트비트 컨테이너 이미지를 참조하는 컨테이너 소스의 예입니다.
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: template: spec: containers: # This corresponds to a heartbeats image URI that you have built and published - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats name: heartbeats args: - --period=1 env: - name: POD_NAME value: "example-pod" - name: POD_NAMESPACE value: "event-test" sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: example-service ...