参考资料:
kubernetes里的event事件生成机制 - xiaoqing
一、概述
学习背景:
之前同事问我kubectl get event 这个命令到底是怎么回事,为什么只能拿到一段时间的,这段时间是多久?在这里写下笔记
kubernetes 中 kubelet 负责维护整个 pod 的生命周期,当有 pod 创建、崩溃等情况发生时都会产生事件(event)
生产与消费
kubelet 产生 event 发送给 apiserver,apiserver 将其存储到 etcd,且只保存 --event-ttl 时间内的数据。
当我们使用
kubectl get event
时,拉取到的就是 event-ttl 时间内的 event。
二、kubelet生产event
总体浏览
kubelet 通过 client-go 把 event 推送给 apiserver,apiserver 再将其持久化到 etcd。
makeEventRecorder:
makeEventRecorder 创建了 eventBroadcaster 事件广播器
事件广播器创建之后,又会创建EventWatcher,一个用来生产日志,一个用来生产Event给apiServer,StartStructuredLogging用来生产日志,StartRecordingToSink用来生产事件给apiServer
-
// makeEventRecorder wires up kubeDeps.Recorder: it creates an event
// broadcaster, registers a recorder on it, and starts two watchers —
// StartStructuredLogging (writes events to the kubelet log) and
// StartRecordingToSink (ships events to the apiserver when an event
// client is configured). It is a no-op if a Recorder is already set.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	// One broadcaster fans incoming events out to every registered watcher.
	eventBroadcaster := record.NewBroadcaster()
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	// Log events at verbosity 3.
	eventBroadcaster.StartStructuredLogging(3)
	if kubeDeps.EventClient != nil {
		klog.V(4).InfoS("Sending events to api server")
		// Second watcher: push events to the apiserver through the event client.
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.InfoS("No api server defined - no events will be sent to API server")
	}
}
StartRecordingToSink具体实现:
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
	// Periodically refresh recurring event series so their observed time
	// stays current on the apiserver.
	go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
	// Periodically close out event series that have stopped recurring.
	go wait.Until(e.finishSeries, finishTime, stopCh)
	// Consume events from the broadcaster and write them to the sink.
	e.startRecordingEvents(stopCh)
}
StartStructuredLogging具体实现:
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
	return e.StartEventWatcher(
		func(obj runtime.Object) {
			event, ok := obj.(*eventsv1.Event)
			if !ok {
				klog.Errorf("unexpected type, expected eventsv1.Event")
				return
			}
			// Emit the event as a structured log line at the requested verbosity.
			klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
		})
}
StartEventWatcher 实现:
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher, err := e.Watch()
	if err != nil {
		klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
	}
	// Drain the watch channel in a goroutine, dispatching each event to the handler.
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should
				// ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}
EventBroadcaster:
启动:
// Start events processing pipeline.
// NOTE(review): excerpt from a controller's startup routine — the enclosing
// function is not shown in this article.
c.EventBroadcaster.StartStructuredLogging(0)
c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})
创建客户端:
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
// (Same function as quoted above — repeated here for the "create client" step.)
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	eventBroadcaster := record.NewBroadcaster()
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	eventBroadcaster.StartStructuredLogging(3)
	if kubeDeps.EventClient != nil {
		klog.V(4).InfoS("Sending events to api server")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.InfoS("No api server defined - no events will be sent to API server")
	}
}
EventBroadcaster 负责把收到的event 分别派发到log 和 apiserver:
// generateEvent builds an Event for the given object and pushes it onto the
// broadcaster's queue, from which watchers fan it out to the log and to the
// apiserver sink. Invalid references or event types are logged and dropped.
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
	ref, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
		return
	}

	if !util.ValidateEventType(eventtype) {
		klog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}

	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source

	// NOTE: events should be a non-blocking operation, but we also need to not
	// put this in a goroutine, otherwise we'll race to write to a closed channel
	// when we go to shut down this broadcaster. Just drop events if we get overloaded,
	// and log an error if that happens (we've configured the broadcaster to drop
	// outgoing events anyway).
	sent, err := recorder.ActionOrDrop(watch.Added, event)
	if err != nil {
		klog.Errorf("unable to record event: %v (will not retry!)", err)
		return
	}
	if !sent {
		klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
	}
}
具体分发位置位于:
// recordEvent writes the (possibly aggregated/patched) event to the sink.
// NOTE(review): excerpt — sections elided in the original article are marked below.
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
	// ............ (elided) ............

	// Update can fail because the event may have been removed and it no longer exists.
	if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
		// Making sure that ResourceVersion is empty on creation
		event.ResourceVersion = ""
		newEvent, err = sink.Create(event)
	}

	// ............ (elided) ............
}
sink.Create 用来继续向下分发,最终的发送代码如下
// CreateWithEventNamespace posts the event to the apiserver via a REST call,
// scoped to the event's namespace. Creating an event in a namespace other
// than the client's is rejected up front.
func (e *events) CreateWithEventNamespace(event *v1.Event) (*v1.Event, error) {
	if e.ns != "" && event.Namespace != e.ns {
		return nil, fmt.Errorf("can't create an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
	}
	result := &v1.Event{}
	// POST /namespaces/{ns}/events — the actual wire call to the apiserver.
	err := e.client.Post().
		NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
		Resource("events").
		Body(event).
		Do(context.TODO()).
		Into(result)
	return result, err
}
kubelet生产事件 是如何做聚合和限速的?如果k8s产生非常多的事件会不会把apiServer打挂掉?
kubelet 做了聚合和限流:通过 LRU 缓存对相似事件做聚合,默认在 10 分钟窗口内同类事件超过 10 次就会被聚合上报
// if we see the same event that varies only by message
// more than 10 times in a 10 minute period, aggregate the event
// NOTE(review): fragment of a larger const block in the upstream source.
defaultAggregateMaxEvents = 10
defaultAggregateIntervalInSeconds = 600
lru 算法聚合
LRU 算法本身很简单,这里不再赘述;重点看 LRU 缓存中的 key 是如何生成的
// getSpamKey builds unique event key based on source, involvedObject
// (fields are concatenated with no separator; events that differ only by
// message share the same key and are rate-limited together).
func getSpamKey(event *v1.Event) string {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
	},
		"")
}
lru聚合关键代码:
// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
//   - The (potentially modified) event that should be created
//   - The cache key for the event, for correlation purposes. This will be set to
//     the full key for normal events, and to the result of
//     EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}
lru算法看完了,我们看一下kubelet中event是如何限速的
event限速
限速具体位置:
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	// First aggregate similar events, then let the logger observe the result
	// (it computes counts and a patch for repeated events).
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	// The spam filter may decide to drop the event entirely (rate limiting).
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
k8s官方使用的包是:
"golang.org/x/time/rate"
令牌桶算法调用:
// Filter controls that a given source+object are not exceeding the allowed rate.
// It returns true when the event should be dropped.
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord

	// controls our cached information about this event
	eventKey := f.spamKeyFunc(event)

	// do we have a record of similar events in our cache?
	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
	}

	// verify we have a rate limiter for this record
	// (one token bucket per spam key, created lazily)
	if record.rateLimiter == nil {
		record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
	}

	// ensure we have available rate
	filter := !record.rateLimiter.TryAccept()

	// update the cache
	f.cache.Add(eventKey, record)

	return filter
}
// NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
// except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
// and uses a PassiveClock.
func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
	// Backed by golang.org/x/time/rate's token bucket.
	limiter := rate.NewLimiter(rate.Limit(qps), burst)
	return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
}
核心调用:
// TryAccept reports whether one token is available right now, consuming it
// if so; it never blocks.
func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
	return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
}
如果有令牌桶不了解的可以看:
GitHub - juju/ratelimit: Efficient token-bucket-based rate limiter package.
三、apiserver 存储event到etcd
实现代码:
kubernetes/apiserver.go at master · kubernetes/kubernetes · GitHub
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
	if err != nil {
		return nil, err
	}

	return kubeAPIServer, nil
}
kubernetes/storage_core.go at master · kubernetes/kubernetes · GitHub
存储event-ttl 时间的数据
// NewLegacyRESTStorage wires up REST storage for the core ("legacy") API group.
// NOTE(review): excerpt — sections elided in the original article are marked below.
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	// ............ (elided) ............

	// Event storage is created with a TTL of --event-ttl seconds, which is
	// why etcd only keeps events for that window.
	eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
	if err != nil {
		return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
	}
	limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
	if err != nil {
		return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
	}

	// ............ (elided) ............

	return restStorage, apiGroupInfo, nil
}
转载:https://blog.csdn.net/qq_32783703/article/details/127379653