当前位置:网站首页>Keda 2.7.1 brief analysis of scaledjob code
A brief analysis of the KEDA 2.7.1 ScaledJob code
2022-06-26 02:05:00 【weixin_40455001】
The official description of ScaledJob: "As an alternate to scaling event-driven code as deployments you can also run and scale your code as Kubernetes Jobs."
In essence, ScaledJob controls the number of Jobs based on various metrics. The core code lives in the following Go files:
- scale_jobs.go
- scaledjob_controller.go
scaledjob_controller is similar to scaledobject_controller; it goes through the requestScaleLoop function:
// requestScaleLoop hands the given ScaledJob to the reconciler's scale handler
// via HandleScalableObject and returns whatever error that call reports.
// A verbosity-1 log line is written before the hand-off.
func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
	logger.V(1).Info("Starting a new ScaleLoop")

	if err := r.scaleHandler.HandleScalableObject(ctx, scaledJob); err != nil {
		return err
	}
	return nil
}
This calls the HandleScalableObject function in scale_handler.go, which handles both ScaledObject and ScaledJob:
// Dispatch on the concrete type held by scalableObject. Both supported kinds
// are treated the same way: two goroutines are started, one running
// startPushScalers and one running startScaleLoop.
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
// Each goroutine is handed a separate obj.DeepCopy() result rather than obj
// itself; the same scalingMutex is passed to both.
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
case *kedav1alpha1.ScaledJob:
// Same pair of goroutines, this time for a ScaledJob.
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
}
// There is no default case: for any other type nothing is started and nil is
// still returned.
return nil
The core of scale_jobs.go is RequestJobScale and createJobs, which ultimately create Jobs (and thereby adjust the number of Jobs) through batchv1 "k8s.io/api/batch/v1".
// RequestJobScale performs one scaling pass for scaledJob.
//
// It logs the running and pending Job counts, asks the ScaledJob's scaling
// strategy for the effective maximum (clamped to be non-negative), and, when
// isActive is true, records the last-active time and calls createJobs with
// scaleTo and that effective maximum. It then updates the Active condition if
// it is unknown or disagrees with isActive, and finally calls cleanUp.
//
// Failures are logged, not returned. If updating the Active condition fails,
// the function returns immediately and cleanUp is not called.
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) {
	logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)

	running := e.getRunningJobCount(ctx, scaledJob)
	pending := e.getPendingJobCount(ctx, scaledJob)
	logger.Info("Scaling Jobs", "Number of running Jobs", running)
	logger.Info("Scaling Jobs", "Number of pending Jobs ", pending)

	strategy := NewScalingStrategy(logger, scaledJob)
	jobLimit := strategy.GetEffectiveMaxScale(maxScale, running, pending, scaledJob.MaxReplicaCount())
	if jobLimit < 0 {
		// A negative limit is treated as "no room for new Jobs".
		jobLimit = 0
	}

	if !isActive {
		logger.V(1).Info("No change in activity")
	} else {
		logger.V(1).Info("At least one scaler is active")

		activeAt := metav1.Now()
		scaledJob.Status.LastActiveTime = &activeAt
		if err := e.updateLastActiveTime(ctx, logger, scaledJob); err != nil {
			// Logged only; Job creation still proceeds.
			logger.Error(err, "Failed to update last active time")
		}

		e.createJobs(ctx, logger, scaledJob, scaleTo, jobLimit)
	}

	// Only touch the Active condition when it is unknown or out of date.
	activeCond := scaledJob.Status.Conditions.GetActiveCondition()
	if activeCond.IsUnknown() || activeCond.IsTrue() != isActive {
		status := metav1.ConditionFalse
		reason := "ScalerNotActive"
		message := "Scaling is not performed because triggers are not active"
		failure := "Error setting active condition when triggers are not active"
		if isActive {
			status = metav1.ConditionTrue
			reason = "ScalerActive"
			message = "Scaling is performed because triggers are active"
			failure = "Error setting active condition when triggers are active"
		}

		if err := e.setActiveCondition(ctx, logger, scaledJob, status, reason, message); err != nil {
			logger.Error(err, failure)
			return
		}
	}

	if err := e.cleanUp(ctx, scaledJob); err != nil {
		logger.Error(err, "Failed to cleanUp jobs")
	}
}
// createJobs creates Jobs for scaledJob from its JobTargetRef template.
//
// scaleTo is the requested number of Jobs and maxScale the upper bound; the
// number actually attempted is min(scaleTo, maxScale). When that number is
// zero or negative nothing is created and no "created" log line or event is
// emitted.
//
// The ScaledJob's pod template is updated in place: its GenerateName is set to
// "<name>-" and the "scaledjob.keda.sh/name" label is set to the ScaledJob
// name. Every Job carries a standard label set, onto which the ScaledJob's own
// labels are copied (overriding entries with the same key).
//
// Errors are logged rather than returned. The final log line reports how many
// Jobs were actually created, and the KEDAJobsCreated event is recorded only
// when at least one Job was created.
func (e *scaleExecutor) createJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) {
	name := scaledJob.GetName()

	podTemplate := &scaledJob.Spec.JobTargetRef.Template
	podTemplate.GenerateName = name + "-"
	if podTemplate.Labels == nil {
		podTemplate.Labels = map[string]string{}
	}
	podTemplate.Labels["scaledjob.keda.sh/name"] = name

	logger.Info("Creating jobs", "Effective number of max jobs", maxScale)
	if scaleTo > maxScale {
		scaleTo = maxScale
	}
	logger.Info("Creating jobs", "Number of jobs", scaleTo)

	// Nothing to do: avoid recording a misleading "Created 0 jobs" event on
	// every pass while the ScaledJob is already at its limit.
	if scaleTo <= 0 {
		logger.V(1).Info("No jobs to create")
		return
	}

	labels := map[string]string{
		"app.kubernetes.io/name":       name,
		"app.kubernetes.io/version":    version.Version,
		"app.kubernetes.io/part-of":    name,
		"app.kubernetes.io/managed-by": "keda-operator",
		"scaledjob.keda.sh/name":       name,
	}
	for key, value := range scaledJob.ObjectMeta.Labels {
		labels[key] = value
	}

	// Count successes so the summary log and event reflect what really
	// happened. The loop stays in int64 to avoid truncating scaleTo on
	// platforms where int is 32 bits wide.
	var created int64
	for i := int64(0); i < scaleTo; i++ {
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: name + "-",
				Namespace:    scaledJob.GetNamespace(),
				Labels:       labels,
			},
			Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(),
		}

		// Job doesn't allow RestartPolicyAlways, it seems like this value is set by the client as a default one,
		// we should set this property to allowed value in that case
		if job.Spec.Template.Spec.RestartPolicy == "" {
			logger.V(1).Info("Job RestartPolicy is not set, setting it to 'OnFailure', to avoid setting it to the client's default value 'Always'")
			job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
		}

		// Set ScaledJob instance as the owner and controller. A failure here is
		// logged and creation is still attempted, as before.
		if err := controllerutil.SetControllerReference(scaledJob, job, e.reconcilerScheme); err != nil {
			logger.Error(err, "Failed to set ScaledJob as the owner of the new Job")
		}

		if err := e.client.Create(ctx, job); err != nil {
			logger.Error(err, "Failed to create a new Job")
			continue
		}
		created++
	}

	logger.Info("Created jobs", "Number of jobs", created)
	if created > 0 {
		e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAJobsCreated, "Created %d jobs", created)
	}
}
Other useful links:
https://livewyer.io/blog/2021/06/17/keda-showcase-autoscaling-based-on-prometheus-redis/
https://blog.csdn.net/github_19391267/article/details/109634935
边栏推荐
- UN make (6) conditional execution of makefile
- 如何高效的完成每日的任务?
- 将weishi相机图片进行转换
- Playful girl wangyixuan was invited to serve as the Promotion Ambassador for the global finals of the sixth season perfect children's model
- 创建OpenGl窗口
- 字节序问题
- Wanglaoji pharmaceutical's public welfare activity of "caring for the most lovely people under the scorching sun" was launched in Hangzhou
- How to set an achievable annual goal?
- A lost note for konjaku beginner
- Disruptor (I) sequence
猜你喜欢

Exploring temporary information for dynamic network embedding

一分钟了解同步、异步、阻塞和非阻塞的区别

One stop solution EMQ for hundreds of millions of communication of Internet of things

-- EGFR FISH probe solution

【无标题】vsbiji esp....32

Dataframe extracts data from a column and converts it into a list

A lost note for konjaku beginner

cv==biaoding---open----cv001

Abnova anti GBA monoclonal antibody solution

Energetic girl wangyujie was invited to be the spokesperson for the global finals of the sixth season perfect children's model
随机推荐
buffer
Disruptor(一)Sequence
LeetCode 31 ~ 40
Connectez Le projecteur
螺旋矩阵
Record a weird picture upload problem
The answer skills and examples of practical cases of the second construction company are full of essence
如何制定一个可实现的年度目标?
静态库动态库的使用
Differences and functions of TOS cos DSCP
反向输出一个整数
How to efficiently complete daily tasks?
17.11 std:: atomic continuation, std:: async in-depth discussion
Output Lua print to the cocos2d console output window
Use of static library and dynamic library
cv==biaoding---open----cv001
连接投影仪
A lost note for konjaku beginner
记录一个诡异的图片上传问题
Calibration...