
Inside the Kubernetes event mechanism


References:

kubernetes里的event事件生成机制 (How events are generated in Kubernetes) - xiaoqing

Kubernetes · GitHub

1. Overview

Background:

A colleague once asked me how kubectl get event actually works: why does it only return events from a limited window of time, and how long is that window? These notes are the answer.

Production: in Kubernetes, the kubelet maintains the whole lifecycle of the pods on its node, and events are generated whenever a pod is created, crashes, and so on.

Consumption: the kubelet sends these events to the apiserver, which stores them in etcd, but only for the duration configured by --event-ttl (1h by default).

So when we run

kubectl get event

we only pull back the events that are still within that event-ttl window.
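
To make the consumption side concrete, here is a minimal client-go sketch that lists the same events kubectl get event returns, i.e. whatever is still inside the apiserver's --event-ttl window. The kubeconfig path and the "default" namespace are illustrative assumptions, not something taken from the kubelet code below.

    package main

    import (
        "context"
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // Illustrative assumption: a kubeconfig at the usual ~/.kube/config location.
        config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            panic(err)
        }
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err)
        }

        // Roughly what `kubectl get event -n default` does: only events that have not
        // yet expired from etcd (the apiserver's --event-ttl window) come back.
        events, err := clientset.CoreV1().Events("default").List(context.TODO(), metav1.ListOptions{})
        if err != nil {
            panic(err)
        }
        for _, e := range events.Items {
            fmt.Printf("%s\t%s\t%s\t%s\n", e.LastTimestamp, e.Type, e.Reason, e.Message)
        }
    }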

2. How the kubelet produces events

The overall picture

The kubelet pushes events to the apiserver through client-go, and the apiserver then persists them in etcd.

makeEventRecorder:

makeEventRecorder creates the eventBroadcaster, the event broadcaster.

Once the broadcaster exists, two event watchers are registered on it: one produces log output and one delivers events to the apiserver. StartStructuredLogging sets up the logging watcher, and StartRecordingToSink sets up the watcher that sends events to the apiserver.


  
    // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
    func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
        if kubeDeps.Recorder != nil {
            return
        }
        eventBroadcaster := record.NewBroadcaster()
        kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
        eventBroadcaster.StartStructuredLogging(3)
        if kubeDeps.EventClient != nil {
            klog.V(4).InfoS("Sending events to api server")
            eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
        } else {
            klog.InfoS("No api server defined - no events will be sent to API server")
        }
    }
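
Once makeEventRecorder has wired kubeDeps.Recorder to the broadcaster, any kubelet code path can emit events through it. Below is a hedged, self-contained sketch using the same record API; the component name, pod object, reason and message are made up for illustration, and the apiserver sink is left out so only the structured-logging watcher fires.

    package main

    import (
        "time"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes/scheme"
        "k8s.io/client-go/tools/record"
    )

    func main() {
        // Same wiring as makeEventRecorder, minus the apiserver sink: events
        // only reach the structured-logging watcher here.
        broadcaster := record.NewBroadcaster()
        defer broadcaster.Shutdown()
        broadcaster.StartStructuredLogging(0)

        recorder := broadcaster.NewRecorder(scheme.Scheme,
            v1.EventSource{Component: "demo-component", Host: "node-1"})

        // The object the event is "about"; the kubelet would pass the real pod here.
        pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default", UID: "uid-1"}}

        recorder.Eventf(pod, v1.EventTypeNormal, "Pulled",
            "Successfully pulled image %q", "nginx:latest")

        // The broadcaster delivers asynchronously; give the watcher a moment to log.
        time.Sleep(100 * time.Millisecond)
    }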

The implementation of StartRecordingToSink:


  
    // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
    func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
        go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
        go wait.Until(e.finishSeries, finishTime, stopCh)
        e.startRecordingEvents(stopCh)
    }

The implementation of StartStructuredLogging:


  
    // StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
    // The return value can be ignored or used to stop recording, if desired.
    func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
        return e.StartEventWatcher(
            func(obj runtime.Object) {
                event, ok := obj.(*eventsv1.Event)
                if !ok {
                    klog.Errorf("unexpected type, expected eventsv1.Event")
                    return
                }
                klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
            })
    }

The implementation of StartEventWatcher:

  
    // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
    // The return value can be ignored or used to stop recording, if desired.
    func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
        watcher, err := e.Watch()
        if err != nil {
            klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
        }
        go func() {
            defer utilruntime.HandleCrash()
            for watchEvent := range watcher.ResultChan() {
                event, ok := watchEvent.Object.(*v1.Event)
                if !ok {
                    // This is all local, so there's no reason this should
                    // ever happen.
                    continue
                }
                eventHandler(event)
            }
        }()
        return watcher
    }

EventBroadcaster:

Starting the events processing pipeline:


  
    // Start events processing pipeline.
    c.EventBroadcaster.StartStructuredLogging(0)
    c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})


The recorder builds the Event in generateEvent and hands it to the EventBroadcaster (via ActionOrDrop), which then dispatches it to every registered watcher, i.e. to the log and to the apiserver:


  
    func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
        ref, err := ref.GetReference(recorder.scheme, object)
        if err != nil {
            klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
            return
        }
        if !util.ValidateEventType(eventtype) {
            klog.Errorf("Unsupported event type: '%v'", eventtype)
            return
        }
        event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
        event.Source = recorder.source

        // NOTE: events should be a non-blocking operation, but we also need to not
        // put this in a goroutine, otherwise we'll race to write to a closed channel
        // when we go to shut down this broadcaster. Just drop events if we get overloaded,
        // and log an error if that happens (we've configured the broadcaster to drop
        // outgoing events anyway).
        sent, err := recorder.ActionOrDrop(watch.Added, event)
        if err != nil {
            klog.Errorf("unable to record event: %v (will not retry!)", err)
            return
        }
        if !sent {
            klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
        }
    }

The concrete dispatch to the sink (the apiserver) happens in recordEvent:


  
    func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
        // ...
        // Update can fail because the event may have been removed and it no longer exists.
        if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
            // Making sure that ResourceVersion is empty on creation
            event.ResourceVersion = ""
            newEvent, err = sink.Create(event)
        }
        // ...
    }

sink.Create passes the event further down the stack; the call that finally sends it over the wire is CreateWithEventNamespace:


  
    func (e *events) CreateWithEventNamespace(event *v1.Event) (*v1.Event, error) {
        if e.ns != "" && event.Namespace != e.ns {
            return nil, fmt.Errorf("can't create an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
        }
        result := &v1.Event{}
        err := e.client.Post().
            NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
            Resource("events").
            Body(event).
            Do(context.TODO()).
            Into(result)
        return result, err
    }

How does the kubelet aggregate and rate-limit the events it produces? If the cluster generates a very large number of events, can they bring down the apiserver?

The kubelet (through client-go) both aggregates and rate-limits events, keeping its bookkeeping in LRU caches. By default, similar events that differ only in their message and occur more than 10 times within a 10-minute interval are folded into a single aggregate event:


  
    // if we see the same event that varies only by message
    // more than 10 times in a 10 minute period, aggregate the event
    defaultAggregateMaxEvents         = 10
    defaultAggregateIntervalInSeconds = 600

Aggregation with an LRU cache

I won't go over the LRU algorithm itself; it is a simple one. What matters is how the cache key is generated. getSpamKey builds the key used by the spam (rate-limiting) filter's LRU cache, from the event source and the involved object only:


  
    // getSpamKey builds unique event key based on source, involvedObject
    func getSpamKey(event *v1.Event) string {
        return strings.Join([]string{
            event.Source.Component,
            event.Source.Host,
            event.InvolvedObject.Kind,
            event.InvolvedObject.Namespace,
            event.InvolvedObject.Name,
            string(event.InvolvedObject.UID),
            event.InvolvedObject.APIVersion,
        },
            "")
    }
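
getSpamKey is unexported, so as a quick illustration the sketch below copies the same concatenation locally: two events about the same object from the same source map to one key even when reason and message differ, which means they share one rate-limiter bucket. The helper name spamKey and all field values are made up for the example.

    package main

    import (
        "fmt"
        "strings"

        v1 "k8s.io/api/core/v1"
    )

    // spamKey mirrors client-go's unexported getSpamKey: only the source and the
    // involved object go into the key, not the reason or the message.
    func spamKey(event *v1.Event) string {
        return strings.Join([]string{
            event.Source.Component,
            event.Source.Host,
            event.InvolvedObject.Kind,
            event.InvolvedObject.Namespace,
            event.InvolvedObject.Name,
            string(event.InvolvedObject.UID),
            event.InvolvedObject.APIVersion,
        }, "")
    }

    func main() {
        base := v1.Event{
            Source: v1.EventSource{Component: "kubelet", Host: "node-1"},
            InvolvedObject: v1.ObjectReference{
                Kind: "Pod", Namespace: "default", Name: "demo", UID: "uid-1", APIVersion: "v1",
            },
        }

        a, b := base, base
        a.Reason, a.Message = "BackOff", "Back-off restarting failed container"
        b.Reason, b.Message = "Pulling", `Pulling image "nginx"`

        // Same source + involved object => same key => same token bucket.
        fmt.Println(spamKey(&a) == spamKey(&b)) // true
    }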

The core of the LRU-based aggregation, EventAggregate:


  
    // EventAggregate checks if a similar event has been seen according to the
    // aggregation configuration (max events, max interval, etc) and returns:
    //
    // - The (potentially modified) event that should be created
    // - The cache key for the event, for correlation purposes. This will be set to
    //   the full key for normal events, and to the result of
    //   EventAggregatorMessageFunc for aggregate events.
    func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
        now := metav1.NewTime(e.clock.Now())
        var record aggregateRecord

        // eventKey is the full cache key for this event
        eventKey := getEventKey(newEvent)
        // aggregateKey is for the aggregate event, if one is needed.
        aggregateKey, localKey := e.keyFunc(newEvent)

        // Do we have a record of similar events in our cache?
        e.Lock()
        defer e.Unlock()
        value, found := e.cache.Get(aggregateKey)
        if found {
            record = value.(aggregateRecord)
        }

        // Is the previous record too old? If so, make a fresh one. Note: if we didn't
        // find a similar record, its lastTimestamp will be the zero value, so we
        // create a new one in that case.
        maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
        interval := now.Time.Sub(record.lastTimestamp.Time)
        if interval > maxInterval {
            record = aggregateRecord{localKeys: sets.NewString()}
        }

        // Write the new event into the aggregation record and put it on the cache
        record.localKeys.Insert(localKey)
        record.lastTimestamp = now
        e.cache.Add(aggregateKey, record)

        // If we are not yet over the threshold for unique events, don't correlate them
        if uint(record.localKeys.Len()) < e.maxEvents {
            return newEvent, eventKey
        }

        // do not grow our local key set any larger than max
        record.localKeys.PopAny()

        // create a new aggregate event, and return the aggregateKey as the cache key
        // (so that it can be overwritten.)
        eventCopy := &v1.Event{
            ObjectMeta: metav1.ObjectMeta{
                Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
                Namespace: newEvent.Namespace,
            },
            Count:          1,
            FirstTimestamp: now,
            InvolvedObject: newEvent.InvolvedObject,
            LastTimestamp:  now,
            Message:        e.messageFunc(newEvent),
            Type:           newEvent.Type,
            Reason:         newEvent.Reason,
            Source:         newEvent.Source,
        }
        return eventCopy, aggregateKey
    }

That covers the aggregation; now let's look at how events are rate-limited in the kubelet.

Event rate limiting

The rate limiting is applied in EventCorrelate, which filters, aggregates, counts and de-duplicates every incoming event:


  
    // EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
    func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
        if newEvent == nil {
            return nil, fmt.Errorf("event is nil")
        }
        aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
        observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
        if c.filterFunc(observedEvent) {
            return &EventCorrelateResult{Skip: true}, nil
        }
        return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
    }
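
EventCorrelator is exported from client-go, so the aggregate-then-filter path can be exercised locally. The sketch below is only an assumption-laden illustration: it assumes a recent client-go in which record.NewEventCorrelator takes a clock.PassiveClock (matching the PassiveRateLimiter code shown later), and every event field is invented. Feeding it events about one object that differ only in their message should start returning an aggregated message once the default threshold of 10 is crossed.

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/tools/record"
        "k8s.io/utils/clock"
    )

    func main() {
        // Assumed signature: NewEventCorrelator(clock.PassiveClock) in recent client-go.
        correlator := record.NewEventCorrelator(clock.RealClock{})

        for i := 0; i < 15; i++ {
            ev := &v1.Event{
                ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("demo.%d", i), Namespace: "default"},
                InvolvedObject: v1.ObjectReference{
                    Kind: "Pod", Namespace: "default", Name: "demo", UID: "uid-1", APIVersion: "v1",
                },
                Source:  v1.EventSource{Component: "demo-component", Host: "node-1"},
                Type:    v1.EventTypeWarning,
                Reason:  "FailedDemo",
                Message: fmt.Sprintf("attempt %d failed", i), // only the message differs
            }
            result, err := correlator.EventCorrelate(ev)
            if err != nil {
                panic(err)
            }
            if result.Skip {
                // The spam filter decided to drop this one entirely.
                fmt.Printf("i=%d dropped by rate limiter\n", i)
                continue
            }
            // Past the aggregation threshold the message is rewritten by the aggregator.
            fmt.Printf("i=%d message=%q\n", i, result.Event.Message)
        }
    }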

The package the Kubernetes code relies on for the token bucket is:

"golang.org/x/time/rate"

The call into the token bucket:


  
    // Filter controls that a given source+object are not exceeding the allowed rate.
    func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
        var record spamRecord

        // controls our cached information about this event
        eventKey := f.spamKeyFunc(event)

        // do we have a record of similar events in our cache?
        f.Lock()
        defer f.Unlock()
        value, found := f.cache.Get(eventKey)
        if found {
            record = value.(spamRecord)
        }

        // verify we have a rate limiter for this record
        if record.rateLimiter == nil {
            record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
        }

        // ensure we have available rate
        filter := !record.rateLimiter.TryAccept()

        // update the cache
        f.cache.Add(eventKey, record)

        return filter
    }


  
    // NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
    // except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
    // and uses a PassiveClock.
    func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
        limiter := rate.NewLimiter(rate.Limit(qps), burst)
        return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
    }

The core call:


  
    func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
        return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
    }
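
In other words, Kubernetes simply delegates to golang.org/x/time/rate. Here is a standalone sketch of the same non-blocking AllowN pattern; the qps and burst values are illustrative, not the client-go defaults.

    package main

    import (
        "fmt"
        "time"

        "golang.org/x/time/rate"
    )

    func main() {
        // Illustrative values: refill one token every two seconds, burst of 3.
        limiter := rate.NewLimiter(rate.Limit(0.5), 3)

        for i := 0; i < 5; i++ {
            // The same non-blocking call TryAccept makes: take one token if available.
            allowed := limiter.AllowN(time.Now(), 1)
            fmt.Printf("event %d allowed=%v\n", i, allowed)
        }
        // The first 3 events pass on the burst; the rest are filtered until tokens refill.
    }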

If you are not familiar with the token bucket algorithm, see:

GitHub - juju/ratelimit: Efficient token-bucket-based rate limiter package.

3. The apiserver stores events in etcd

Implementation:

kubernetes/apiserver.go at master · kubernetes/kubernetes · GitHub


  
    // CreateKubeAPIServer creates and wires a workable kube-apiserver
    func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
        kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
        if err != nil {
            return nil, err
        }
        return kubeAPIServer, nil
    }

kubernetes/storage_core.go at master · kubernetes/kubernetes · GitHub

The event storage is created with a TTL taken from --event-ttl (1h by default), which is why kubectl get event only ever shows roughly the last hour of events:


  
    func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
        // ...
        eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
        limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
        // ...
        return restStorage, apiGroupInfo, nil
    }


Reposted from: https://blog.csdn.net/qq_32783703/article/details/127379653